# The Problem of Streams

Why do we need an embedded language for stream processing?

I will use an example to demonstrate the kind of problems I ran into when writing code for stream processing applications, and show how Streamulus solves them. I'd be interested in alternative solutions; leave a comment or email me.

### A Simple Stream Processing Application

Our input is a volatile time series, such as the price of a stock or the blood pressure of a patient. We would like to receive an alert when there has been a significant change in the direction of the time series. One way to do this is to compute two exponentially decaying moving averages, with different decay rates, and issue an alert when they cross each other:

When the slow-decaying moving average crosses the fast-decaying one from below, this indicates that the time series recently changed direction from increasing to decreasing. In financial technical analysis this case is called a "death cross", while the opposite case is termed a "golden cross".

### The Algorithm

We maintain the two moving averages, and whenever they are recomputed, compare their values. A crossing occurs whenever the result of the comparison changes. Here is a flow chart of this computation:

### An Object-Oriented Implementation

So you are a seasoned C++ programmer and you want to implement the cross detection algorithm. Chances are, you would begin by implementing a moving-average calculator. Something like this:
```
struct TimeValue
{
clock_t time;
double value;
};

// Exponentially decaying moving average
class Mavg
{
public:
Mavg(int decay_factor)
: first(true)
, df(decay_factor)
{
}

double Update(const TimeValue& tick)
{
if (!first)
{
double alpha = 1-1/exp(df*(tick.time-prev_time));
mavg += alpha*(tick.value - mavg);
}
else
{
mavg = tick.value;
first = false;
}
prev_time = tick.time;
return mavg;
}

double Get() const
{
return mavg;
}

private:
clock_t prev_time;
bool    first;
int     df; // decay factor
double  mavg;
};
```

Then you would use two instances of Mavg in your cross-detection class, like so:
```
class CrossDetection
{
public:
CrossDetection()
: slow(1)
, fast(10)
, first(true)
{
}

void Update(const TimeValue& tick)
{
bool comp = (slow.Update(tick) < fast.Update(tick));
if ( (comp != prev_comp) && !first)
first = false;
prev_comp = comp;
}

private:
Mavg slow;
Mavg fast;
bool first;
bool prev_comp;
};
```
To use CrossDetection, you first create an instance:
```
CrossDetection cross_detection;
```

and then invoke its Update function whenever a new value for the time series arrives:
```
void HandleInput(const TimeValue& tick)
{
cross_detection.Update(tick);
}
```
All is well until, one day, another part of the program needs to use the values of the moving averages. You are asked to refactor the code to make that possible.

### The Refactored Code

Your new cross detection class will not construct and maintain the moving averages. Rather, its constructor receives references to the moving averages and its Update function merely probes them for their values:
```
class CrossDetection
{
public:
CrossDetection(const Mavg& slow_mavg, const Mavg& fast_mavg)
: slow(slow_mavg)
, fast(fast_mavg)
, first(true)
{
}

void Update()
{
bool comp = (slow.Get() < fast.Get());
if ( (comp != prev_comp) && ! first)
first = false;
prev_comp = comp;
}

private:
const Mavg& slow;
const Mavg& fast;
bool  first;
bool  prev_comp;
};
```

The responsibility for updating the moving averages was transferred to the user of the cross-detection class, whose setup code will look like this:
```
Mavg slow(1), fast(10);
CrossDetection cross_detection(slow, fast);
SomethingElse something_else(slow, fast);
```

because CrossDetection and some other class both want to use the moving averages. So far so good, but handling a new event became more complicated:
```
void HandleInput(const TimeValue& tick)
{
slow.Update(tick);
fast.Update(tick);
cross_detection.Update(); // depends on slow and fast
something_else.Update();  // depends on slow and fast
}
```

With the refactored class, we are forcing the user to write code which is vulnerable for two related but different reasons.

First, the dependence of cross_detection on slow and fast needs to be stated twice: Once when references to slow and fast are passed into the cross_detection constructor, and again in HandleInput, where they all must be updated together, in the correct order.
Whenever an element of the program's logic needs to be repeated twice, we run the risk that it won't be. For example, suppose that one day you decide to use a different moving average in cross_detection, so you change the setup code:

```
Mavg slow(1), moderate(5), fast(10);
CrossDetection cross_detection(moderate, fast);
SomethingElse something_else(slow, fast);
```

You must remember to also add this line to HandleInput:
```
moderate.Update(tick);
```

The second problem is that the dependence of cross_detection on slow and fast in HandleInput is implicit. There is nothing in the code of this function to suggest that this dependence exists, which is why we added comments in the hope that whoever might try to modify this function will be aware of the invisible connections between its lines. However, comments are not a substitute for clear code. This function is fragile and can break in subtle ways - you might remove one of the moving average updates, or relocate it after the cross_detection.Update() call, creating a logical bug. Or, you might replace the last two lines with something that does not depend on slow and fast, but leave the first two lines that update them, creating a performance bug.

### The Streamulus Way

The examples that come with Streamulus contain a complete program for the cross detection problem. The code is structured as follows.

First, there is a collection of functors that implement the stream operators we need, such as the operator that issues a cross alert whenever it is invoked (the rightmost box in the diagram above):
```
{
template< class Sig >
struct result
{
typedef bool type;
};

bool operator()(const bool is_golden_cross)
{
if (is_golden_cross)
std::bout << "Golden cross detected!" << std::endl;
else
std::bout << "Death cross detected!" << std::endl;
return is_golden_cross;
}
};
```
Next comes the subscription stage, where we define input streams and build expression over them, using Streamified versions of the functors we defined as well as built-in stream operators:
```
// create an input stream:
InputStream< TimeValue >::type ts =
NewInputStream< TimeValue >("Time Series");

// create a streamulus instance:
Streamulus engine;

// create moving averages:
Mavg mavg1(1), mavg10(10);

// subscribe the moving averages:
typedef Subscription< double >::type Subs;
Subs slow = engine.Subscribe(Streamify(mavg1)(ts));
Subs fast = engine.Subscribe(Streamify(mavg10)(ts));

engine.Subscribe(
Streamify< unique< bool > >( slow < fast )));
```

So far, this is not very different from the object oriented solution we described above - first we define the types that we will use, then we create instances of them and hook them up together.

The main advantage of Streamulus is in the way we handle a new input value:
```
void HandleInput(const TimeValue& tick)
{
InputStreamPut(ts, tick);
}
```

We merely insert the new value into the input stream ts, and the data structure that Streamulus created from our expression will propagate it through the expression and update whatever needs to be updated by invoking the operators whose inputs have changed. Importantly, only those operators will be invoked, while unaffected parts of the expression will not be recomputed. Now, if we decide to use a different moving average in the expression, HandleInput doesn't need to change.