Calculating number of messages per second in a rolling window?

Rudiger · Feb 16, 2010 · Viewed 14.4k times

I have messages coming into my program with millisecond resolution (anywhere from zero to a couple hundred messages a millisecond).

I'd like to do some analysis. Specifically, I want to maintain multiple rolling windows of the message counts, updated as messages come in. For example,

  • # of messages in last second
  • # of messages in last minute
  • # of messages in last half-hour divided by # of messages in last hour

I can't just maintain a simple count like "1,017 messages in last second", since I won't know when a message is older than 1 second and therefore should no longer be in the count...

I thought of maintaining a queue of all the messages, searching for the youngest message that's older than one second, and inferring the count from the index. However, this seems like it would be too slow, and would eat up a lot of memory.

What can I do to keep track of these counts in my program so that I can efficiently get these values in real-time?

Answer

Antti Huima · Feb 16, 2010

This is most easily handled with a cyclic buffer.

A cyclic buffer has a fixed number of elements and a pointer into it. When you add an element to the buffer, you write it at the pointer and advance the pointer to the next slot. When the pointer moves past the end of the fixed-length buffer, it wraps around to the beginning, so the newest element overwrites the oldest. It's a space- and time-efficient way to store the "last N" items.
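To make the wrap-around concrete, here is a minimal C sketch of such a buffer. The names `ring`, `ring_push`, `ring_recent` and the capacity of 4 are made up for illustration and do not come from any library.

```c
#include <assert.h>
#include <stddef.h>

#define RING_CAPACITY 4  /* hypothetical size, kept small for illustration */

struct ring {
    int    items[RING_CAPACITY];
    size_t next;  /* slot the next push will write to */
    size_t len;   /* number of valid items, at most RING_CAPACITY */
};

/* Store a value, overwriting the oldest one once the buffer is full. */
static void ring_push(struct ring *r, int value)
{
    r->items[r->next] = value;
    r->next = (r->next + 1) % RING_CAPACITY;  /* wrap back to the start */
    if (r->len < RING_CAPACITY)
        r->len++;
}

/* Return the i-th most recent value (0 = newest). */
static int ring_recent(const struct ring *r, size_t i)
{
    assert(i < r->len);
    /* Adding RING_CAPACITY first keeps the unsigned index from going below zero. */
    return r->items[(r->next + RING_CAPACITY - 1 - i) % RING_CAPACITY];
}
```

Starting from a zero-initialized `struct ring` and pushing 1 through 6, the buffer holds 3, 4, 5, 6: the first two values have been overwritten without any shifting or allocation.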

Now in your case you could have one cyclic buffer of 1,000 counters, each one counting the number of messages during one millisecond. Adding up all 1,000 counters gives you the total count during the last second. Of course you can optimize the reporting part by updating the total incrementally, i.e. when you insert, deduct from the total the number you are overwriting and then add the new number.

You can then have another cyclic buffer with 60 slots that counts the aggregate number of messages in whole seconds: once a second, you take the total count of the millisecond buffer and write it into the seconds buffer. The same scheme extends to minutes and hours.

Here is C-like pseudocode:

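int msecbuf[1000];       // messages per millisecond over the last second; starts zeroed
int secbuf[60];          // messages per second over the last minute; starts zeroed
int msecptr = 0, secptr = 0;
int count = 0;           // messages seen during the current millisecond
int msec_total_ctr = 0;  // running sum of msecbuf = messages in the last second
int sec_total_ctr = 0;   // running sum of secbuf = messages in the last minute

void msg_received() { count++; }

// Call once per millisecond.
void every_msec() {
  msec_total_ctr -= msecbuf[msecptr];  // drop the slot that is now a second old
  msecbuf[msecptr] = count;
  msec_total_ctr += count;
  count = 0;
  msecptr = (msecptr + 1) % 1000;
}

// Call once per second, after the every_msec() call that closes that second.
void every_sec() {
  sec_total_ctr -= secbuf[secptr];     // drop the slot that is now a minute old
  secbuf[secptr] = msec_total_ctr;
  sec_total_ctr += msec_total_ctr;
  secptr = (secptr + 1) % 60;
}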
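The same idea can be packaged as one reusable window type and cascaded up to the hour, which also covers the half-hour over hour ratio from the question. The following is only a sketch under some assumptions: `every_msec()` is called exactly once per millisecond, `stats_init()` is called first, and the names `window`, `window_tick`, `window_recent_sum`, `stats_init` and `half_hour_over_hour` are invented here for illustration.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MAX_SLOTS 1000  /* largest window used below */

struct window {
    long   slots[MAX_SLOTS];
    size_t nslots;  /* number of slots in use */
    size_t pos;     /* slot the next tick overwrites (the oldest one) */
    long   total;   /* running sum of all slots */
};

static void window_init(struct window *w, size_t nslots)
{
    assert(nslots > 0 && nslots <= MAX_SLOTS);
    memset(w, 0, sizeof *w);
    w->nslots = nslots;
}

/* Replace the oldest slot with `count`, keeping the running total in sync. */
static void window_tick(struct window *w, long count)
{
    w->total += count - w->slots[w->pos];
    w->slots[w->pos] = count;
    w->pos = (w->pos + 1) % w->nslots;
}

/* Sum of the n most recently written slots. */
static long window_recent_sum(const struct window *w, size_t n)
{
    long sum = 0;
    assert(n <= w->nslots);
    for (size_t i = 1; i <= n; i++)
        sum += w->slots[(w->pos + w->nslots - i) % w->nslots];
    return sum;
}

static struct window msec_win;  /* 1000 slots of 1 ms  = last second */
static struct window sec_win;   /* 60 slots of 1 s     = last minute */
static struct window min_win;   /* 60 slots of 1 min   = last hour   */
static long pending;            /* messages since the last millisecond tick */
static unsigned long ticks;     /* milliseconds elapsed since stats_init() */

static void stats_init(void)
{
    window_init(&msec_win, 1000);
    window_init(&sec_win, 60);
    window_init(&min_win, 60);
    pending = 0;
    ticks = 0;
}

static void msg_received(void) { pending++; }

static void every_msec(void)
{
    window_tick(&msec_win, pending);
    pending = 0;
    if (++ticks % 1000 == 0) {                     /* a full second has closed */
        window_tick(&sec_win, msec_win.total);
        if (ticks % 60000 == 0)                    /* a full minute has closed */
            window_tick(&min_win, sec_win.total);
    }
}

/* Messages in the last half-hour divided by messages in the last hour. */
static double half_hour_over_hour(void)
{
    if (min_win.total == 0)
        return 0.0;
    return (double)window_recent_sum(&min_win, 30) / (double)min_win.total;
}
```

Reading `msec_win.total`, `sec_win.total` or `min_win.total` is O(1). The coarser windows only advance when a whole second or minute closes, so the minute and hour figures can lag by up to one second and one minute respectively; whether that is acceptable depends on the analysis. Memory use is fixed at three arrays regardless of message rate.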