Time weighted moving average
Now we will assume that each event contain timestamp and value, and that stream of events defines step function that changes its value on each event. In this case average value will be equal to integral of this function on a given interval divided by the length of the interval.
Aggregator
In this case event contains both timestamp and value, so aggregator will
require time_value_sub attribute that extracts both from the event.
package TWMA;
use strict;
use warnings;
sub new {
    my ($class, %params) = @_;
    die "time_value_sub parameter must be specified" unless $params{time_value_sub};
    $params{_sum} = 0;
    return bless \%params, $class;
}
_sum attribute will contain integral of our function over the sliding window.
To calculate duration we will use _start_point and _end_point attributes
which will contain timestamp and function value for start and end of the
sliding window respectively. To calculate the average we will have to divide
_sum by duration:
sub duration {
    my ($self) = @_;
    $self->{_start_point} ? $self->{_end_point}[0] - $self->{_start_point}[0] : 0;
}
sub average {
    my ($self) = @_;
    $self->duration ? $self->{_sum} / $self->duration : undef;
}
When event enters window we should update _last_point with time and value of
the event, and when event leaves window we should update _end_point with
sliding window start time and value of the leaving event:
sub enter {
    my ($self, $event, $win) = @_;
    my ($time, $value) = $self->{time_value_sub}->($event);
    $self->{_end_point} = [$time, $value];
    unless ($self->{_start_point}) {
        # this is the first event we've got
        $self->{_start_point} = [$time, $value];
    }
}
sub leave {
    my ($self, $event, $win) = @_;
    my ($time, $value) = $self->{time_value_sub}->($event);
    $self->{_start_point} = [$win->start_time, $value];
}
For reset we will set _sum to zero, but we will preserve the current value of
our function in _start_point and _end_point attributes:
sub reset {
    my ($self, $win) = @_;
    $self->{_sum} = 0;
    $self->{_start_point} = [$win->start_time, $self->{_end_point}[1]];
    $self->{_end_point}[0] = $win->end_time;
}
window_update method is called when time has changed. It should subtract from
the _sum integral for the interval that left the window, and it should add to
it integral for the interval that has been added to the window:
sub window_update {
    my ($self, $win) = @_;
    # return if we didn't receive a single event yet
    return unless $self->{_start_point};
    if ($win->start_time > $self->{_start_point}[0]) {
        $self->{_sum} -=
            ($win->start_time - $self->{_start_point}[0]) * $self->{_start_point}[1];
        $self->{_start_point}[0] = $win->start_time;
    }
    if ($win->end_time > $self->{_end_point}[0]) {
        $self->{_sum} +=
            ($win->end_time - $self->{_end_point}[0]) * $self->{_end_point}[1];
        $self->{_end_point}[0] = $win->end_time;
    }
}
Processing event stream
We will assume that events are hashes with time and price elements, and we
will calculate moving average for the last 1 minute and minutely averages. By
minutely averages here I mean that once minute is finished we will reset the
aggregator.
my $now = time;
my $es = Data::EventStream->new(time_sub => sub { shift->{time} }, time => $now);
my $moving = TWMA->new(time_value_sub => sub { ($_[0]->{time}, $_[1]->{price}) });
$es->add_aggregator(
    $moving,
    duration => 60,
    on_enter => sub {
        my $val = shift->average;
        say "Last minute average: $val" if defined $val;
    },
);
my $minutely = TWMA->new(time_value_sub => sub { ($_[0]->{time}, $_[1]->{price}) });
$es->add_aggregator(
    $minutely,
    duration   => 60,
    batch      => 1,
    start_time => $now - ($now % 60),
    on_reset   => sub {
        my $val = shift->average;
        say "Reseting aggregator, last minute average: $val" if defined $val;
    }
);
while (my $event = wait_for_new_event()) {
    $es->add_event($event);
}
Here we defined on_enter and on_reset callbacks for aggregators. on_enter
callback is called just after event has entered aggregator (after enter
method has been called on aggregator), on_reset callback is called when
aggregator’s window is full, just before calling reset method on aggregator.
There is also on_leave callback that is called when event leaves aggregator,
just before leave method is called.
First aggregator calculates moving average for the last 60 seconds, second aggregator calculates moving average since the start of the current minute, and is reset every minute, so it produces minutely averages.