I have the following Camel route:
<route id="myRoute">
<from uri="direct:aggregator" />
<aggregate strategy="aggregatorStrategy" completionInterval="60000" completionSize="500">
<correlationExpression>
<xpath>/fizz/buzz</xpath>
</correlationExpression>
<to uri="bean:postProcessor?method=run" />
</aggregator>
</route>
As you can see it aggregates either the first 500 messages that the <aggregator/>
receives, or all the messages within a 1 minute interval, and then sends the aggregated message on to a bean called postProcessor
.
You can think of this aggregation logic as follows:
AGGREGATE UNTIL:
We have received 500 messages
OR
1 minute has elapsed
THEN:
Send to postProcessor
Or in pseudo-code: aggregateUntil(weHave500Message() || 1minHasElapsed())
. I'd like to change this logic to:
AGGREGATE UNTIL:
We have received 500 messages
OR
1 minute has elapsed
OR
A message is received that has a property called "fireNow" and a value of "true"
THEN:
Send to postProcessor
Or, again in pseudo-code: aggregateUntil(weHave500Message() || 1minHasElapsed() || messageHasProperty("fireNow", "true"))
.
In other words, aggregate until either of the 3 conditions are met. Any ideas how I could implement this? I have a feeling I can finagle this with completionPredicate
and perhaps eagerCheckCompletion
, but not seeing the forest through the trees here.
You can use completionSize
and completionInterval
together and add a completionPredicate
to the equation, like this (you do need to use eagerCheckCompletion="true"
because we need to test the incoming message, not the aggregated message):
<route>
<from uri="direct:aggregator" />
<aggregate completionSize="500" completionInterval="60000" eagerCheckCompletion="true">
<correlationExpression>
<xpath>/fizz/buzz</xpath>
</correlationExpression>
<completion-predicate>
<simple>${property.fireNow} == 'true'</simple>
</completion-predicate>
<to uri="bean:postProcessor?method=run" />
</aggregate>
</route>
An alternative could be to create a compound predicate that tests all three conditions and use only that predicate as completion-predicate
but this solution is inferior because the interval timeout is triggered by a separate thread, whereas using a compound predicate there is no thread to trigger the aggregation on timeout expiration.