Sunday, 8 July 2012

Data Streams vs. Streams of Data

The Streamulus engine applies techniques used in Reactive Programming systems, which maintain a dependency graph between components and automatically propagate changes from the inputs to everything that depends on them. Spreadsheets are the best-known example of a reactive system: once the dependencies between the cells have been set up, a change in any cell propagates to all cells that depend on it.

However, there is a significant difference between something like a spreadsheet and a stream processing system: A spreadsheet encodes relationships between scalar values. Once the value in a cell changes, its previous value no longer matters. In a stream processing problem, on the other hand, the entities we are reasoning about are not the individual scalar values that are streaming in, but rather the data streams themselves. They have meaningful properties that depend on multiple elements as well as on their order, such as moving average or historical volatility.

From the implementation point of view, this means that we need to be more careful about exactly how we propagate data through the dependency graph. For example, assume that we have a spreadsheet with four cells: A is an input cell where we place a number, and three further cells B, C and D collectively compute the value of (A+1)/(A+2):

B := A+1
C := A+2
D := B/C

Assume that the value in cell A is 0 (so B=1, C=2 and D=1/2), and we change the value of A to 1. How should we propagate the change? The most efficient update order is to first update B=2 and C=3, and finally recompute D=2/3. However, if your spreadsheet computes the cells in a different order, such as B=2, D=1, C=3, D=2/3, then the final result is the same. The intermediate D=1 comes from dividing the new B=2 by the stale C=2. It is a waste of time to compute D twice, but logically the result is correct.

Now consider the same arithmetic over streams. If A is the stream {0,1}, then the value of the stream (A+1)/(A+2) is exactly the stream {1/2, 2/3}. If we produce the stream {1/2, 1, 2/3} because we computed D at an intermediate stage, then the output is logically incorrect.

Push and Pull

In the computation graph for our expression, the input A feeds both B and C, and both B and C feed D.

What is the algorithm for propagating changes to A? The incorrect scheme we attempted above is push: after a cell is updated, every cell that depends on it is recursively updated. This did not work because there are two paths from A to D, so A affects D in two different ways, and we recomputed D while taking only one of the effects into account. If D knew that there was a dependency between its inputs, it could wait until both were updated before recomputing its value and propagating it on. However, if our system is implemented in a reasonable way, the D node is a simple division operator that has no knowledge of the topology of the graph.
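To make the failure concrete, here is a minimal sketch of the naive push scheme on the four-cell example. The dictionaries and the function names (push, set_input) are made up for this illustration and have nothing to do with the Streamulus API.

```python
from fractions import Fraction

# Toy graph for B := A+1, C := A+2, D := B/C (all names are illustrative).
# The values start in the consistent state for A = 0.
values = {"A": Fraction(0), "B": Fraction(1), "C": Fraction(2), "D": Fraction(1, 2)}
compute = {
    "B": lambda v: v["A"] + 1,
    "C": lambda v: v["A"] + 2,
    "D": lambda v: v["B"] / v["C"],
}
successors = {"A": ["B", "C"], "B": ["D"], "C": ["D"], "D": []}

# Every value emitted by D, in order, starting with the one for A = 0.
d_output = [values["D"]]

def push(node):
    """Naive push: recompute each dependent immediately, then recurse into it."""
    for succ in successors[node]:
        values[succ] = compute[succ](values)
        if succ == "D":
            d_output.append(values[succ])
        push(succ)

def set_input(x):
    values["A"] = Fraction(x)
    push("A")

set_input(1)
print(d_output)
```

After the second input, D's output stream holds 1/2, 1, 2/3. The spurious 1 in the middle was computed from the new B and the stale C.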

Could a pull method work? In a pull scheme, before updating a node we recursively update everything it depends on. There are two problems with this approach. First, it departs from the reactive programming model: rather than inputs automatically propagating to the outputs, we need to actively request the outputs to update themselves. Second, each such update may need to involve a larger portion of the graph than is strictly necessary. Since a node does not necessarily emit a change to its output stream for every change to its input streams, it is not possible to know in advance which nodes will need to do something in reaction to a change in a certain input.
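For comparison, a pull scheme on the same toy graph might look roughly like the sketch below. The pull function and the fresh set are assumptions made for this example only. The output is correct, but note that the caller has to ask for D explicitly after every input.

```python
from fractions import Fraction

# Same toy graph, this time described by what each node depends on.
predecessors = {"A": [], "B": ["A"], "C": ["A"], "D": ["B", "C"]}
compute = {
    "B": lambda v: v["A"] + 1,
    "C": lambda v: v["A"] + 2,
    "D": lambda v: v["B"] / v["C"],
}

def pull(node, values, fresh):
    """Bring `node` up to date by first updating everything it depends on."""
    if node in fresh:
        return values[node]
    for dep in predecessors[node]:
        pull(dep, values, fresh)
    values[node] = compute[node](values)
    fresh.add(node)
    return values[node]

d_output = []
for x in [0, 1]:
    values = {"A": Fraction(x)}
    # Nothing happens until somebody requests the output.
    d_output.append(pull("D", values, fresh={"A"}))
print(d_output)
```

Here every request walks the whole subgraph behind D, whether or not anything in it actually changed.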

The right thing to do is to propagate the changes forward from the input nodes, in topological order. Some systems, such as Boost accumulators and TPIE, apply a static update order. When constructing the graph, they also create a list of the nodes in topological order, and use this list to direct propagation of data through the graph.
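The static approach can be sketched as follows. This is only an illustration of the idea, not how Boost accumulators or TPIE implement it; it uses the standard library's graphlib module (Python 3.9 or later), and the run function is a made-up name.

```python
from fractions import Fraction
from graphlib import TopologicalSorter

# Each node maps to the nodes it depends on.
predecessors = {"A": [], "B": ["A"], "C": ["A"], "D": ["B", "C"]}
compute = {
    "B": lambda v: v["A"] + 1,
    "C": lambda v: v["A"] + 2,
    "D": lambda v: v["B"] / v["C"],
}

# Computed once, when the graph is constructed. Input nodes are skipped
# because they are set directly rather than computed.
order = [n for n in TopologicalSorter(predecessors).static_order() if n in compute]

def run(stream):
    """Feed each input element through all nodes in the fixed order."""
    values, out = {}, []
    for x in stream:
        values["A"] = Fraction(x)
        for node in order:
            values[node] = compute[node](values)
        out.append(values["D"])
    return out

result = run([0, 1])
print(order, result)
```

D always comes after both B and C in the list, so it is computed exactly once per input element and the output stream is 1/2, 2/3. The price is that every node is visited for every input.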

Streamulus, on the other hand, dynamically discovers which nodes need to be updated. Like the push method, it begins at the inputs that have updates and traverses the graph forward. However, it does not immediately ask every dependent node to update itself. Rather, it inserts all successors of the updated input nodes into a priority queue, where the priority of a node is its topological order index. It then repeatedly removes the highest priority node from the queue (the one with the smallest topological order index) and activates it. If this node chooses to emit an output, its successor nodes are inserted into the priority queue (if they were not already there) so that they will be activated as well.
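The following is a rough sketch of this kind of priority-queue propagation, written with Python's heapq. It is not the Streamulus implementation; propagate, topo_index and the convention that a node returns None when it chooses not to emit are all assumptions made for the example.

```python
import heapq
from fractions import Fraction

# Topological order index of each node; a smaller index is activated first.
topo_index = {"A": 0, "B": 1, "C": 2, "D": 3}
successors = {"A": ["B", "C"], "B": ["D"], "C": ["D"], "D": []}
compute = {
    "B": lambda v: v["A"] + 1,
    "C": lambda v: v["A"] + 2,
    "D": lambda v: v["B"] / v["C"],
}

def propagate(values, updated_inputs):
    """Activate only nodes reachable from the updated inputs, in topological order."""
    heap, queued, activated = [], set(), []

    def enqueue(nodes):
        for n in nodes:
            if n not in queued:  # each node is queued at most once at a time
                queued.add(n)
                heapq.heappush(heap, (topo_index[n], n))

    for inp in updated_inputs:
        enqueue(successors[inp])
    while heap:
        _, node = heapq.heappop(heap)
        queued.discard(node)
        activated.append(node)
        new_value = compute[node](values)
        if new_value is not None:  # a filtering node could return None to emit nothing
            values[node] = new_value
            enqueue(successors[node])
    return activated

values = {"A": Fraction(0), "B": Fraction(1), "C": Fraction(2), "D": Fraction(1, 2)}
values["A"] = Fraction(1)
activated = propagate(values, ["A"])
print(activated, values["D"])
```

Both B and C try to enqueue D, but it enters the queue only once and is activated after both of them, so it sees consistent inputs and emits 2/3 without the spurious intermediate value.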

The advantage of the dynamic approach is that we only traverse parts of the graph that need to be traversed. The disadvantage is the performance cost of queueing. The static update order will be more efficient if updates tend to propagate through all or most nodes of the graph. When there are many inputs, each of which affects a small fraction of the nodes, or when there are nodes that filter streams such that inputs do not usually propagate beyond them, the dynamic approach scales better.

In the future, we plan to develop a hybrid method that computes a static update order for certain subgraphs and applies dynamic decisions only at the junctions between them.