Camel Aggregator and completionPredicate by example

AdjustingForInflation picture AdjustingForInflation · Feb 5, 2014 · Viewed 8.4k times · Source

I have the following Camel route:

<route id="myRoute">
    <from uri="direct:aggregator" />

    <aggregate strategy="aggregatorStrategy" completionInterval="60000" completionSize="500">
        <correlationExpression>
            <xpath>/fizz/buzz</xpath>
        </correlationExpression>

        <to uri="bean:postProcessor?method=run" />
    </aggregator>
</route>

As you can see it aggregates either the first 500 messages that the <aggregator/> receives, or all the messages within a 1 minute interval, and then sends the aggregated message on to a bean called postProcessor.

You can think of this aggregation logic as follows:

AGGREGATE UNTIL:
    We have received 500 messages
    OR
    1 minute has elapsed

THEN:
    Send to postProcessor

Or in pseudo-code: aggregateUntil(weHave500Message() || 1minHasElapsed()). I'd like to change this logic to:

AGGREGATE UNTIL:
    We have received 500 messages
    OR
    1 minute has elapsed
    OR
    A message is received that has a property called "fireNow" and a value of "true"

THEN:
    Send to postProcessor

Or, again in pseudo-code: aggregateUntil(weHave500Message() || 1minHasElapsed() || messageHasProperty("fireNow", "true")).

In other words, aggregate until either of the 3 conditions are met. Any ideas how I could implement this? I have a feeling I can finagle this with completionPredicate and perhaps eagerCheckCompletion, but not seeing the forest through the trees here.

Answer

EmirCalabuch picture EmirCalabuch · Feb 6, 2014

You can use completionSize and completionInterval together and add a completionPredicate to the equation, like this (you do need to use eagerCheckCompletion="true" because we need to test the incoming message, not the aggregated message):

<route>
    <from uri="direct:aggregator" />
    <aggregate completionSize="500" completionInterval="60000" eagerCheckCompletion="true">
        <correlationExpression>
             <xpath>/fizz/buzz</xpath>
        </correlationExpression>
        <completion-predicate>
             <simple>${property.fireNow} == 'true'</simple>
        </completion-predicate>
        <to uri="bean:postProcessor?method=run" />
    </aggregate>
</route>

An alternative could be to create a compound predicate that tests all three conditions and use only that predicate as completion-predicate but this solution is inferior because the interval timeout is triggered by a separate thread, whereas using a compound predicate there is no thread to trigger the aggregation on timeout expiration.