Data::EventStream

Perl extension for processing event streams


Time weighted moving average

Now we will assume that each event contains a timestamp and a value, and that the stream of events defines a step function that changes its value on each event. In this case the average value equals the integral of this function over a given interval divided by the length of the interval.
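To make the definition concrete, here is a minimal brute-force sketch that integrates the step function directly. The helper name time_weighted_average and the [time, value] pair format are assumptions made for illustration; they are not part of Data::EventStream. The sketch also assumes that events are sorted by time and that the first event is at or before the start of the interval.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Data::EventStream.
# $events is an array ref of [time, value] pairs sorted by time;
# the first event is assumed to be at or before $start.
sub time_weighted_average {
    my ($events, $start, $end) = @_;
    return if $end <= $start;
    my $sum = 0;
    for my $i (0 .. $#$events) {
        my ($time, $value) = @{ $events->[$i] };
        # the value holds until the next event, or until the end of the interval
        my $next = $i < $#$events ? $events->[$i + 1][0] : $end;
        # clip the segment [$time, $next] to the interval [$start, $end]
        my $from = $time < $start ? $start : $time;
        my $to   = $next > $end   ? $end   : $next;
        $sum += ($to - $from) * $value if $to > $from;
    }
    return $sum / ($end - $start);
}

# The value is 10 on [0, 30) and 20 on [30, 60]:
# (10 * 30 + 20 * 30) / 60 = 15
print time_weighted_average([[0, 10], [30, 20]], 0, 60), "\n";
```

This recomputes the whole integral on every call. The aggregator described in the rest of this section instead keeps a running sum and adjusts it as the window moves.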

Aggregator

Since each event carries both a timestamp and a value, the aggregator requires a time_value_sub attribute: a subroutine that extracts both from the event.

package TWMA;
use strict;
use warnings;

sub new {
    my ($class, %params) = @_;
    die "time_value_sub parameter must be specified" unless $params{time_value_sub};
    $params{_sum} = 0;
    return bless \%params, $class;
}

The _sum attribute will contain the integral of our function over the sliding window. To calculate the duration we will use the _start_point and _end_point attributes, which contain the timestamp and function value at the start and end of the sliding window respectively. To calculate the average we divide _sum by the duration:

sub duration {
    my ($self) = @_;
    $self->{_start_point} ? $self->{_end_point}[0] - $self->{_start_point}[0] : 0;
}

sub average {
    my ($self) = @_;
    $self->duration ? $self->{_sum} / $self->duration : undef;
}

When an event enters the window we should update _end_point with the time and value of the event, and when an event leaves the window we should update _start_point with the sliding window start time and the value of the leaving event:

sub enter {
    my ($self, $event, $win) = @_;
    my ($time, $value) = $self->{time_value_sub}->($event);
    $self->{_end_point} = [$time, $value];
    unless ($self->{_start_point}) {
        # this is the first event we've got
        $self->{_start_point} = [$time, $value];
    }
}

sub leave {
    my ($self, $event, $win) = @_;
    my ($time, $value) = $self->{time_value_sub}->($event);
    $self->{_start_point} = [$win->start_time, $value];
}

On reset we set _sum to zero, but preserve the current value of our function in the _start_point and _end_point attributes:

sub reset {
    my ($self, $win) = @_;
    $self->{_sum} = 0;
    $self->{_start_point} = [$win->start_time, $self->{_end_point}[1]];
    $self->{_end_point}[0] = $win->end_time;
}

The window_update method is called when time has changed. It should subtract from _sum the integral over the interval that has left the window, and add to it the integral over the interval that has entered the window:

sub window_update {
    my ($self, $win) = @_;
    # return if we didn't receive a single event yet
    return unless $self->{_start_point};
    if ($win->start_time > $self->{_start_point}[0]) {
        $self->{_sum} -=
            ($win->start_time - $self->{_start_point}[0]) * $self->{_start_point}[1];
        $self->{_start_point}[0] = $win->start_time;
    }
    if ($win->end_time > $self->{_end_point}[0]) {
        $self->{_sum} +=
            ($win->end_time - $self->{_end_point}[0]) * $self->{_end_point}[1];
        $self->{_end_point}[0] = $win->end_time;
    }
}
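The arithmetic in window_update can be traced by hand. The sketch below uses plain variables rather than the aggregator object, and assumes the aggregator has already seen events (time 0, value 10) and (time 30, value 20), with the current time at 30 and a 60-second window. The variable names are illustrative only.

```perl
use strict;
use warnings;

# Assumed state after the two events, with the current time at 30
my $sum   = 10 * (30 - 0);    # integral over [0, 30]
my @start = (0, 10);          # plays the role of _start_point
my @end   = (30, 20);         # plays the role of _end_point

# Time advances to 70, so a 60-second window now covers [10, 70]
my ($win_start, $win_end) = (10, 70);

# the interval [0, 10] has left the window: subtract 10 * 10
$sum -= ($win_start - $start[0]) * $start[1];
$start[0] = $win_start;

# the interval [30, 70] has entered the window: add 40 * 20
$sum += ($win_end - $end[0]) * $end[1];
$end[0] = $win_end;

# 300 - 100 + 800 = 1000, over a duration of 60
printf "sum=%d average=%.2f\n", $sum, $sum / ($end[0] - $start[0]);
```

The result agrees with integrating directly over [10, 70]: the value is 10 on [10, 30) and 20 on [30, 70], which gives 200 + 800 = 1000.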

Processing event stream

We will assume that events are hashes with time and price elements, and we will calculate a moving average over the last minute as well as minutely averages. By minutely averages I mean that once a minute has finished we reset the aggregator.

use feature 'say';
use Data::EventStream;

my $now = time;
my $es = Data::EventStream->new(time_sub => sub { shift->{time} }, time => $now);
# time_value_sub receives the event as its only argument
my $moving = TWMA->new(time_value_sub => sub { ($_[0]->{time}, $_[0]->{price}) });
$es->add_aggregator(
    $moving,
    duration => 60,
    on_enter => sub {
        my $val = shift->average;
        say "Last minute average: $val" if defined $val;
    },
);
my $minutely = TWMA->new(time_value_sub => sub { ($_[0]->{time}, $_[0]->{price}) });
$es->add_aggregator(
    $minutely,
    duration   => 60,
    batch      => 1,
    start_time => $now - ($now % 60),
    on_reset   => sub {
        my $val = shift->average;
        say "Resetting aggregator, last minute average: $val" if defined $val;
    }
);
while (my $event = wait_for_new_event()) {
    $es->add_event($event);
}

Here we defined on_enter and on_reset callbacks for the aggregators. The on_enter callback is called just after an event has entered the aggregator (after the enter method has been called on the aggregator); the on_reset callback is called when the aggregator's window is full, just before the reset method is called on the aggregator. There is also an on_leave callback that is called when an event leaves the aggregator, just before the leave method is called.

The first aggregator calculates the moving average over the last 60 seconds. The second calculates the average since the start of the current minute and is reset every minute, so it produces minutely averages.
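The wait_for_new_event function used in the loop is not defined in this example. One possible stand-in, shown here purely as an assumption, reads events from standard input, one per line, in a hypothetical "epoch-seconds price" format. Both the input format and the parse_event helper are invented for illustration.

```perl
use strict;
use warnings;

# Hypothetical input format: "<epoch seconds> <price>", one event per line.
# Returns a hash ref with time and price elements, or nothing if the line
# does not match.
sub parse_event {
    my ($line) = @_;
    my ($time, $price) = $line =~ /^\s*(\d+)\s+(-?\d+(?:\.\d+)?)\s*$/
        or return;
    return { time => $time, price => $price };
}

# Blocks on STDIN until a well-formed line arrives, skipping malformed
# lines. Returns undef at end of input, which ends the processing loop.
sub wait_for_new_event {
    while (defined(my $line = <STDIN>)) {
        my $event = parse_event($line);
        return $event if $event;
    }
    return;
}
```

With these definitions in place, the script above can be fed a file of recorded events on standard input. Note that events are expected to arrive in time order.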