How to throttle event stream using RX?

Alex · Jul 9, 2010 · Viewed 18k times

I want to throttle an event stream so that my delegate is called as soon as the first event is received, but is not called again for 1 second even if further events arrive. Once that 1-second timeout has expired, if a subsequent event was received, I want my delegate to be called again.

Is there a simple way to do this using Reactive Extensions?

Sample code:

static void Main(string[] args)
{
    Console.WriteLine("Running...");

    var generator = Observable
        .GenerateWithTime(1, x => x <= 100, x => x, x => TimeSpan.FromMilliseconds(1), x => x + 1)
        .Timestamp();

    var builder = new StringBuilder();

    generator
        .Sample(TimeSpan.FromSeconds(1))
        .Finally(() => Console.WriteLine(builder.ToString()))
        .Subscribe(feed =>
                   builder.AppendLine(string.Format("Observed {0:000}, generated at {1}, observed at {2}",
                                                    feed.Value,
                                                    feed.Timestamp.ToString("mm:ss.fff"),
                                                    DateTime.Now.ToString("mm:ss.fff"))));

    Console.ReadKey();
}

Current output:

Running...
Observed 064, generated at 41:43.602, observed at 41:43.602
Observed 100, generated at 41:44.165, observed at 41:44.602

But I want to observe the following (the timestamps will obviously differ):

Running...
Observed 001, generated at 41:43.602, observed at 41:43.602
....
Observed 100, generated at 41:44.165, observed at 41:44.602

Answer

cRichter · Jul 9, 2010

Okay,

there are four scenarios here:

1) You want a value only after the stream has been quiet for the given time span. Throttle restarts its timer on every new event, so if events keep arriving faster than the time span, nothing is emitted until the stream pauses.

observableStream.Throttle(timeSpan)

2) You want the latest event produced in each one-second interval. All other events in that interval are dropped.

observableStream.Sample(TimeSpan.FromSeconds(1))

3) You want all events that happened in the last second, delivered together as one batch every second.

observableStream.BufferWithTime(timeSpan)

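4) You want to combine the events with a one-second timer through your own selector. The selector runs whenever either the stream or the timer produces a value (once both have produced at least one), and its result is what you observe.

observableStream.CombineLatest(Observable.Interval(TimeSpan.FromSeconds(1)), selectorOnEachEvent)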
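
None of the four operators above lets the first event through immediately and then goes quiet for a second, which is what the question describes. One way this might be sketched is a timestamp filter. The name ThrottleFirst is hypothetical (it is not an Rx operator), the code assumes a current System.Reactive package rather than the 2010 build used in the question, and it only covers the leading edge: an event suppressed inside the window is dropped, not replayed when the window closes.

```csharp
using System;
using System.Reactive.Linq;

public static class ThrottleFirstExtensions
{
    // Hypothetical helper, not part of Rx: passes an event through, then
    // drops every event that arrives less than `window` after the last
    // event that was passed through.
    public static IObservable<T> ThrottleFirst<T>(this IObservable<T> source, TimeSpan window)
    {
        // Defer gives every subscriber its own "last emitted" state.
        return Observable.Defer(() =>
        {
            DateTimeOffset? lastEmitted = null;

            return source
                .Timestamp() // attach the arrival time to each value
                .Where(x =>
                {
                    bool insideWindow = lastEmitted.HasValue
                        && x.Timestamp - lastEmitted.Value < window;
                    if (insideWindow)
                    {
                        return false; // still inside the quiet period: drop
                    }

                    lastEmitted = x.Timestamp; // open a new quiet period
                    return true;
                })
                .Select(x => x.Value); // strip the timestamp again
        });
    }
}
```

With the sample from the question, this would be used as generator.ThrottleFirst(TimeSpan.FromSeconds(1)) in place of the Sample call. If the last suppressed value must also be delivered when the window expires, a timer-based variant is needed, which this sketch does not attempt.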