Saturday, 6 October 2012

Streamulus V1.0

Streamulus has evolved since v0.1 first came out, and it's probably time to mark a new release point. Most of the changes were cleanup and low-level performance improvements. The main API change is that the strops' Filter mechanism was replaced by the possibility to return a boost::optional. boost::none means no-output. This API is more natural and easier to work with.  The examples have been updated to work in this way.

The spreadsheet utility was also updated to work with the new API. It uses streamulus to support reactive programming in C++.

Monday, 6 August 2012

"Spreadsheet programming" in C++

In a previous post I explained that Streamulus is not just-a-spreadsheet because beyond managing dependencies between scalar values, it defines a programming language for computations over ordered event streams. As such, it can be used as the backbone of a CEP system.

Then it occurred to me that it would be fun to write a spreadsheet library (a la the Lisp Cells library) and that this could be a good way of demonstrating some of the capabilities of Streamulus.  So I did, here it is.  I don't know of another C++ library that does this, so please let me know if there is one.

Usage is like so:

#include "spreadsheet.hpp"

int main (int argc, const char * argv[])
    using namespace spreadsheet;

    Spreadsheet sheet;

    // Create some cells
    Cell<double> a = sheet.NewCell<double>(); 
    Cell<double> b = sheet.NewCell<double>(); 
    Cell<double> c = sheet.NewCell<double>(); 

    // Assign expressions to them:
    c.Set(SQRT(a()*a() + b()*b()));

    // print the values:
    std::cout << " a=" << a.Value() 
              << " b=" << b.Value() 
              << " c=" << c.Value() << std::endl;

    // change a and see it propagate:

    std::cout << " a=" << a.Value() 
              << " b=" << b.Value() 
              << " c=" << c.Value() << std::endl;

This program will print:
 a=3 b=4 c=5 
 a=4 b=4 c=5.65685

There are a few limitations in the first version, which can be helped with some more work (such as the need to write a() and b() rather than simply a and b in the expression above). See the README file and the example code for more information. 

Sunday, 8 July 2012

Data Streams vs. Streams of Data

The Streamulus engine applies techniques that are used in Reactive Programming systems, which maintain a dependency diagram between different components and automatically propagate changes to inputs. Spreadsheets are the most well known example of reactive systems - once the dependencies between the cells have been set up, a change in any cell propagates to all cells that depend on it.

However, there is a significant difference between something like a spreadsheet and a stream processing system: A spreadsheet encodes relationships between scalar values. Once the value in a cell changes, its previous value no longer matters. In a stream processing problem, on the other hand, the entities we are reasoning about are not the individual scalar values that are streaming in, but rather the data streams themselves. They have meaningful properties that depend on multiple elements as well as on their order, such as moving average or historical volatility.

From the implementation point of view, this means that we need to be more careful about how exactly we propagate data through the dependency graph. For example, assume that we have a spreadsheet with four cells: A is an input cell where we place a number, and then we have three cells B,C,D which collectively compute the value of (A+1)/(A+2):

B := A+1
C := A+2
D := B/C

Assume that the value in cell A is 0 (so B=1, C=2 and D=1/2), and we change the value of A to 1. How should we propagate the change? The most efficient update order is to first update B=2 and C=3, and finally recompute D=2/3. However, if your spreadsheet computes the cells in a different order, such as: B=2 D=1, C=3, D=2/3 then the final result is the same. It is a waste of time to compute D twice, but logically the result is correct.

Now consider the same arithmetic over streams. If A is the stream {0,1}, then the value of the stream (A+1)/(A+2) is exactly the stream {1/2, 2/3}. If we produce the stream {1/2, 1, 2/3} because we computed D at an intermediate stage, then the output is logically incorrect.

Push and Pull

The computation graph for our expression looks like this:

What is the algorithm for propagating changes to A? The incorrect scheme we attempted above is push: after a cell is updated, every cell that depends on it is recursively updated. The reason that this did not work is because there are two paths from A to D, so A affects D in two different ways and we recomputed D while taking account of only one of the effects.  If D knows that there is a dependency between its inputs, it can wait until both are updated before recomputing its value and propagating it on. However, if our system is implemented in a reasonable way, the D node is a simple division operator that has no knowledge of the topology of the graph.

Could a pull method work? Before updating a node, recursively update anything it depends on. There are two problems with this approach. First, it departs from the reactive programming model - rather than inputs automatically propagating to the outputs, we need to actively request the outputs to update themselves. Second, each such update may need to involve a larger portion of the graph than is strictly necessary; Since a node does not necessarily emit a change to its output stream for every change to its input streams, it is not possible to know which nodes will need to do something in reaction to a change in a certain input.

The right thing to do is to propagate the changes forward from the input nodes, in topological order. Some systems, such as Boost accumulators and TPIE, apply a static update order. When constructing the graph, they also create a list of the nodes in topological order, and use this list to direct propagation of data through the graph.

Streamulus, on the other hand, dynamically discovers which nodes need to be updated. Like the push method, it begins at the inputs that have updates and traverses the graph forward. However, it does not immediately ask every dependent node to update itself. Rather, it inserts all successors of the updated input nodes into a priority queue, where the priority of a node is its topological order index. It then repeatedly removes the highest priority node from the queue (the one with the smallest topological order index) and activates it. If this node chooses to emit an output, its successor nodes are inserted into the priority queue (if they were not already there) so that they will be activated as well.

The advantage of the dynamic approach is that we only traverse parts of the graph that need to be traversed. The disadvantage is the performance cost of queueing. The static update order will be more efficient if updates tend to propagate through all or most nodes of the graph. When there are many inputs, each of which affects a small fraction of the nodes, or when there are nodes that filter streams such that inputs do not usually propagate beyond them, the dynamic approach scales better.

In the future, we plan to develop a hybrid method that computes a static update order for certain subgraphs and applies dynamic decisions only at the junctions between them.

Thursday, 21 June 2012

The Problem of Streams

Why do we need an embedded language for stream processing?

I will use an example to demonstrate the kind of problems I ran into when writing code for stream processing applications, and show how Streamulus solves them. I'd be interested in alternative solutions; leave a comment or email me.

A Simple Stream Processing Application

Our input is a volatile time series, such as the price of a stock or the blood pressure of a patient. We would like to receive an alert when there has been a significant change in the direction of the time series. One way to do this is to compute two exponentially decaying moving averages, with different decay rates, and issue an alert when they cross each other:

When the slow-decaying moving average crosses the fast-decaying one from below, this indicates that the time series recently changed direction from increasing to decreasing. In financial technical analysis this case is called a "death cross", while the opposite case is termed a "golden cross". 

The Algorithm

We maintain the two moving averages, and whenever they are recomputed, compare their values. A crossing occurs whenever the result of the comparison changes. Here is a flow chart of this computation:

An Object-Oriented Implementation

So you are a seasoned C++ programmer and you want to implement the cross detection algorithm. Chances are, you would begin by implementing a moving-average calculator. Something like this:

  struct TimeValue
    clock_t time;
    double value;

  // Exponentially decaying moving average
  class Mavg
    Mavg(int decay_factor)
      : first(true)
      , df(decay_factor)

    double Update(const TimeValue& tick)
      if (!first)
        double alpha = 1-1/exp(df*(tick.time-prev_time)); 
        mavg += alpha*(tick.value - mavg);
        mavg = tick.value;
        first = false;
      prev_time = tick.time;
      return mavg;

    double Get() const
      return mavg;

    clock_t prev_time;
    bool    first;
    int     df; // decay factor
    double  mavg;

Then you would use two instances of Mavg in your cross-detection class, like so:

  class CrossDetection
      : slow(1)
      , fast(10)
      , first(true)

    void Update(const TimeValue& tick)
      bool comp = (slow.Update(tick) < fast.Update(tick));
      if ( (comp != prev_comp) && !first)
      first = false;
      prev_comp = comp;

    Mavg slow; 
    Mavg fast;
    bool first;
    bool prev_comp;
To use CrossDetection, you first create an instance:

  CrossDetection cross_detection; 

and then invoke its Update function whenever a new value for the time series arrives:

  void HandleInput(const TimeValue& tick) 
All is well until, one day, another part of the program needs to use the values of the moving averages. You are asked to refactor the code to make that possible.

The Refactored Code

Your new cross detection class will not construct and maintain the moving averages. Rather, its constructor receives references to the moving averages and its Update function merely probes them for their values:

  class CrossDetection
    CrossDetection(const Mavg& slow_mavg, const Mavg& fast_mavg)
      : slow(slow_mavg)
      , fast(fast_mavg)
      , first(true)
    void Update()
      bool comp = (slow.Get() < fast.Get());
      if ( (comp != prev_comp) && ! first)
      first = false;
      prev_comp = comp;

    const Mavg& slow;
    const Mavg& fast;
    bool  first;
    bool  prev_comp;

The responsibility for updating the moving averages was transferred to the user of the cross-detection class, whose setup code will look like this:

  Mavg slow(1), fast(10);
  CrossDetection cross_detection(slow, fast);
  SomethingElse something_else(slow, fast);

because CrossDetection and some other class both want to use the moving averages. So far so good, but handling a new event became more complicated:

  void HandleInput(const TimeValue& tick)   
    cross_detection.Update(); // depends on slow and fast
    something_else.Update();  // depends on slow and fast

With the refactored class, we are forcing the user to write code which is vulnerable for two related but different reasons.

First, the dependence of cross_detection on slow and fast needs to be stated twice: Once when references to slow and fast are passed into the cross_detection constructor, and again in HandleInput, where they all must be updated together, in the correct order.
Whenever an element of the program's logic needs to be repeated twice, we run the risk that it won't be. For example, suppose that one day you decide to use a different moving average in cross_detection, so you change the setup code:

  Mavg slow(1), moderate(5), fast(10);
  CrossDetection cross_detection(moderate, fast);
  SomethingElse something_else(slow, fast);

You must remember to also add this line to HandleInput:


The second problem is that the dependence of cross_detection on slow and fast in HandleInput is implicit. There is nothing in the code of this function to suggest that this dependence exists, which is why we added comments in the hope that whoever might try to modify this function will be aware of the invisible connections between its lines. However, comments are not a substitute for clear code. This function is fragile and can break in subtle ways - you might remove one of the moving average updates, or relocate it after the cross_detection.Update() call, creating a logical bug. Or, you might replace the last two lines with something that does not depend on slow and fast, but leave the first two lines that update them, creating a performance bug.

The Streamulus Way

The examples that come with Streamulus contain a complete program for the cross detection problem. The code is structured as follows.

First, there is a collection of functors that implement the stream operators we need, such as the operator that issues a cross alert whenever it is invoked (the rightmost box in the diagram above):

  struct cross_alert
    template< class Sig >
    struct result
      typedef bool type;

    bool operator()(const bool is_golden_cross) 
      if (is_golden_cross)
        std::bout << "Golden cross detected!" << std::endl;
        std::bout << "Death cross detected!" << std::endl; 
      return is_golden_cross;  
Next comes the subscription stage, where we define input streams and build expression over them, using Streamified versions of the functors we defined as well as built-in stream operators:

  // create an input stream:
  InputStream< TimeValue >::type ts = 
                   NewInputStream< TimeValue >("Time Series");

  // create a streamulus instance:
  Streamulus engine;

  // create moving averages:
  Mavg mavg1(1), mavg10(10);

  // subscribe the moving averages:
  typedef Subscription< double >::type Subs;
  Subs slow = engine.Subscribe(Streamify(mavg1)(ts));
  Subs fast = engine.Subscribe(Streamify(mavg10)(ts));

  // subscribe the cross-alert expression:
              Streamify< cross_alert >(
                      Streamify< unique< bool > >( slow < fast )));

So far, this is not very different from the object oriented solution we described above - first we define the types that we will use, then we create instances of them and hook them up together.

The main advantage of Streamulus is in the way we handle a new input value:

  void HandleInput(const TimeValue& tick)
    InputStreamPut(ts, tick);

We merely insert the new value into the input stream ts, and the data structure that Streamulus created from our expression will propagate it through the expression and update whatever needs to be updated by invoking the operators whose inputs have changed. Importantly, only those operators will be invoked, while unaffected parts of the expression will not be recomputed. Now, if we decide to use a different moving average in the expression, HandleInput doesn't need to change.