Data::EventStream

Perl extension for processing event streams


Introduction

Suppose you receive a stream of events, where each event represents the state of some process at a given time. In many cases it is useful to calculate certain parameters of this stream over a sliding window, for example an average over the last 5 minutes. Data::EventStream is a module that helps you solve this task. A Data::EventStream object represents a stream of events. You can attach aggregator objects to the stream, and they will be updated each time an event enters or leaves the sliding window.

Windows

There are two basic types of sliding window: length-based and time-based. A length-based window holds a certain number of events, e.g. the last 10 events. If a new event enters a length window that already contains the maximum number of events, the oldest event leaves the window before the new event enters it. A time-based window holds events for a specified period of time, e.g. the last 10 minutes. When the time is updated, all events older than the specified period leave the window. A window may combine time and length limits; for example, it is possible to define a sliding window that holds up to the last 10 events, each for up to 10 minutes.

A window may be defined as a “batch” window, which means that when the maximum number of events has been added to it, or it covers the full period of time, it shrinks to zero and starts growing again. Each aggregator has its own sliding window, so you can attach aggregators that calculate parameters over different periods of time or numbers of events to a single event stream.
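The length and time rules above can be modelled in a few lines of plain Perl. This is only a sketch, not Data::EventStream's own code: the class `SketchWindow` and its `count`/`duration` arguments are hypothetical, batch mode is left out, and it assumes an event leaves a time window once it is exactly `duration` old.

```perl
use 5.014;    # package NAME BLOCK syntax; also enables strict
use warnings;

# Hypothetical stand-alone model of a sliding window; not part of the
# Data::EventStream API. Events are stored as [time, value] pairs, oldest first.
package SketchWindow {
    sub new {
        my ($class, %args) = @_;
        return bless {
            count    => $args{count},       # length limit (undef = no limit)
            duration => $args{duration},    # time limit (undef = no limit)
            events   => [],
        }, $class;
    }

    # Advance the clock: with a time limit, every event whose timestamp is
    # at or before $now - duration leaves the window (assumed boundary rule).
    sub set_time {
        my ($self, $now) = @_;
        return unless defined $self->{duration};
        my $events = $self->{events};
        shift @$events
            while @$events && $events->[0][0] <= $now - $self->{duration};
        return;
    }

    # Add an event: expire old events first, then, if the window is already
    # full, let the oldest event leave before the new one enters.
    sub add {
        my ($self, $time, $value) = @_;
        $self->set_time($time);
        my $events = $self->{events};
        shift @$events
            if defined $self->{count} && @$events >= $self->{count};
        push @$events, [ $time, $value ];
        return;
    }

    # Values currently inside the window, oldest first.
    sub contents { return map { $_->[1] } @{ $_[0]{events} } }
}

# Length-based window: the last 3 events.
my $by_length = SketchWindow->new(count => 3);
$by_length->add($_, "e$_") for 1 .. 5;
print join(',', $by_length->contents), "\n";    # e3,e4,e5

# Time-based window: the last 10 time units.
my $by_time = SketchWindow->new(duration => 10);
$by_time->add($_, "e$_") for 1, 5, 12, 14;
$by_time->set_time(20);
print join(',', $by_time->contents), "\n";      # e12,e14

# Combined: at most 2 events, none older than 10 time units.
my $both = SketchWindow->new(count => 2, duration => 10);
$both->add($_, "e$_") for 1, 5, 12, 13;
print join(',', $both->contents), "\n";         # e12,e13
```

Keeping events in arrival order means expiry only ever has to look at the front of the list, which is why both limits are enforced with `shift`.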

Aggregators

An aggregator is an object that implements four methods: enter, leave, reset, and window_update. When an event enters or leaves the sliding window, the enter or leave method is invoked with two parameters: the event, and the window object that describes the current sliding window. For batch windows, the reset method is invoked when the window reaches its maximum allowed size; the leave method is not invoked in this case. Finally, the window_update method is invoked when the time changes.
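As an illustration of this interface, here is a sketch of an aggregator that keeps a running average. The class name `SketchAverage`, the event layout (`{ value => ... }`) and the `value` accessor are assumptions made for the example; the window argument is accepted to match the described signatures but is not used, and the loop at the end drives the methods by hand instead of through a real Data::EventStream object.

```perl
use 5.014;    # package NAME BLOCK syntax; also enables strict
use warnings;

# Hypothetical aggregator: average of the 'value' field of the events
# currently in its window.
package SketchAverage {
    sub new { return bless { sum => 0, count => 0 }, shift }

    # An event entered the window.
    sub enter {
        my ($self, $event, $window) = @_;
        $self->{sum} += $event->{value};
        $self->{count}++;
        return;
    }

    # An event left the window.
    sub leave {
        my ($self, $event, $window) = @_;
        $self->{sum} -= $event->{value};
        $self->{count}--;
        return;
    }

    # A batch window shrank to zero; leave() is not called for the
    # discarded events, so all state is cleared here.
    sub reset {
        my ($self, $window) = @_;
        @{$self}{qw(sum count)} = (0, 0);
        return;
    }

    # Time changed; an average does not depend on the clock, so nothing to do.
    sub window_update { return }

    # Current average, or undef for an empty window.
    sub value {
        my $self = shift;
        return $self->{count} ? $self->{sum} / $self->{count} : undef;
    }
}

# Simulate a length-based window of 3 events: when a 4th event arrives,
# the oldest one leaves before the new one enters.
my $avg = SketchAverage->new;
my @window;
for my $v (10, 20, 30, 40) {
    my $event = { value => $v };
    if (@window == 3) {
        my $oldest = shift @window;
        $avg->leave($oldest, undef);
    }
    push @window, $event;
    $avg->enter($event, undef);
}
print $avg->value, "\n";    # 30 (average of 20, 30, 40)
```

Because enter and leave only adjust a sum and a count, each update costs constant time regardless of the window size.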

Time

Data::EventStream does not use real time. Instead, it allows you to set the initial time when you create an object and then update it using the set_time method, or simply by adding new events that carry an updated time. Time is an arbitrary numeric value that may only increase: an attempt to set a new time that is less than the current time, or to add an event with a timestamp less than the current time, will trigger an error.
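The monotonic-time rule can be sketched as a tiny clock object. `SketchClock` and its `now` accessor are hypothetical names, not the module's API; the sketch treats setting the same time again as allowed, since only a time less than the current one is described as an error.

```perl
use 5.014;    # package NAME BLOCK syntax; also enables strict
use warnings;

# Hypothetical clock enforcing "time may only increase".
package SketchClock {
    sub new {
        my ($class, %args) = @_;
        return bless { time => $args{time} // 0 }, $class;
    }

    # Reject any time earlier than the current one; equal is accepted.
    sub set_time {
        my ($self, $time) = @_;
        die "time can't go backwards ($time < $self->{time})\n"
            if $time < $self->{time};
        $self->{time} = $time;
        return;
    }

    sub now { return $_[0]{time} }
}

my $clock = SketchClock->new(time => 100);
$clock->set_time(105);                        # accepted
$clock->set_time(105);                        # accepted: time did not decrease
my $ok = eval { $clock->set_time(90); 1 };    # dies: 90 < 105
print $ok ? "accepted\n" : "rejected: $@";
```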

Events

Events are arbitrary data structures: a hash, an object, an array, a scalar, or any other data type. Data::EventStream only needs to extract a timestamp from an event, and only if you use time-based sliding windows. For that purpose you should specify the time_sub attribute when creating the Data::EventStream object. This attribute should be a reference to a subroutine that returns the timestamp of the event passed to it as its only argument. For example, if your events are hash references that store the timestamp in the timestamp element, you can create a Data::EventStream object like this:

```perl
use Data::EventStream;

my $es = Data::EventStream->new(
    time_sub => sub { shift->{timestamp} },    # returns the event's timestamp
);
```

In a similar manner, some aggregators may require you to specify a value_sub that extracts a value from the event.
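To show how such extractor subs keep the event layout out of the aggregator itself, here is a sketch of a sum aggregator that receives a value_sub in its constructor. The class `SketchSum`, its constructor argument, and the `price` field are hypothetical; real aggregators may accept value_sub differently, and this one is driven by hand rather than attached to a stream.

```perl
use 5.014;    # package NAME BLOCK syntax; also enables strict
use warnings;

# Hypothetical aggregator that is told how to pull a number out of an event.
package SketchSum {
    sub new {
        my ($class, %args) = @_;
        die "value_sub is required\n" unless ref($args{value_sub}) eq 'CODE';
        return bless { value_sub => $args{value_sub}, sum => 0 }, $class;
    }

    sub enter { my ($self, $event) = @_; $self->{sum} += $self->{value_sub}->($event); return }
    sub leave { my ($self, $event) = @_; $self->{sum} -= $self->{value_sub}->($event); return }
    sub sum   { return $_[0]{sum} }
}

# Events are hashes here; only the two extractor subs depend on that layout.
my $time_sub = sub { shift->{timestamp} };
my @events = (
    { timestamp => 100, price => 1.5 },
    { timestamp => 160, price => 2.5 },
);

my $sum = SketchSum->new(value_sub => sub { shift->{price} });
$sum->enter($_) for @events;

my @times = map { $time_sub->($_) } @events;
print "times: @times\n";        # times: 100 160
print "sum: ", $sum->sum, "\n"; # sum: 4
```

Switching to array-based events would only require different subs, for example `sub { shift->[0] }` for the time and `sub { shift->[1] }` for the value.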

Now let’s have a look at some examples.