Advanced Topics

Route Concurrency

Camel offers route concurrency support in a number of places. Using these constructs can also improve application performance.

The following is a summary of a number of concurrency constructs available in Camel. However, I would strongly advise reading Chapter 10 of Camel in Action for more details.

Parallel processing EIPs

The following Camel EIP implementations support parallel processing:

  • multicast
  • recipient list
  • splitter
  • delayer
  • wiretap
  • error handler

If you enable parallel processing for these EIPs, it is better to also provide a custom thread pool tuned specifically for your use case rather than relying on Camel's default thread pool profile.
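
For example, here is a minimal sketch (the profile id myParallelProfile and the pool sizes are purely illustrative) of registering a custom thread pool profile that parallel EIPs can then reference by id; the Examples section below shows the alternative ThreadPoolBuilder approach:

import org.apache.camel.builder.ThreadPoolProfileBuilder

// Define a thread pool profile tuned for this workload (sizes are illustrative)
def profile = new ThreadPoolProfileBuilder("myParallelProfile")
    .poolSize(5)
    .maxPoolSize(20)
    .maxQueueSize(200)
    .build()

// Register it with the CamelContext so EIPs can reference it, e.g.:
// .multicast().parallelProcessing().executorServiceRef("myParallelProfile")
camel.getCtx().getExecutorServiceManager().registerThreadPoolProfile(profile)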

Throttler EIP

The throttler allows you to ensure that a specific endpoint does not get overloaded, or that you don't exceed an agreed SLA with an external service.

from("seda:a")
    .throttle(3)
    .timePeriodMillis(10000)
    .to("log:result", "mock:result")

The above example throttles all messages received on seda:a before they are sent to mock:result, ensuring that a maximum of 3 messages are sent in any 10-second window. Note that you would typically use the default time period of one second, so throttling requests at 100 requests per second between two endpoints looks more like this:

from("seda:a")
    .throttle(100)
    .to("seda:b")

Threads DSL construct

Some Camel endpoints (such as the File consumer) are single-threaded by design and cannot be parallelized at the endpoint level.

In the case of the File consumer, a single thread picks up one file at a time and processes it through the route; only when the Exchange reaches the end of the route does the consumer thread pick up the next file.

This is where Camel's threads() construct can be useful. As in the example below, the File consumer thread can pick up a file and hand it to a thread from the threads() pool for further processing.

The File consumer can then pick up another file without waiting for the previous Exchange to complete processing fully.

SEDA component

SEDA is another way to achieve parallelism in Camel.

The SEDA component uses an in-memory queue to accumulate incoming messages from the producer, and concurrent consumers to process those messages in parallel on multiple threads.

Asynchronous redelivery/retry

If you are using an error handler with a redelivery policy as part of the routing process, you can configure it to be asynchronous and perform the redeliveries in a separate thread.

This uses a separate thread pool for the redelivery, so the main request processing thread is not blocked while waiting.

If you need long redelivery delays, a better approach may be JMS broker redelivery (which is different from consumer redelivery), where the redeliveries are persisted on the message broker rather than kept in the Camel application's memory.

Another benefit of this mechanism is that redeliveries survive a platform6 restart and also behave well when the application is clustered.
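
As an illustration, a minimal sketch (the seda:in endpoint and the retry values are placeholders) of enabling asynchronous delayed redelivery on the default error handler:

import org.apache.camel.builder.RouteBuilder

camel.getCtx().addRoutes(new RouteBuilder() {
    void configure() {
        // Redelivery attempts are scheduled on a separate thread pool, so the
        // consumer thread is not blocked while waiting out the redelivery delay
        errorHandler(defaultErrorHandler()
            .maximumRedeliveries(5)
            .redeliveryDelay(2000)
            .asyncDelayedRedelivery())

        from("seda:in")
            .routeId("AsyncRedeliveryRoute")
            .to("p6cmb://scripts?platform6.request.action=execute&id=Test")
    }
})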

Examples

Parallel processing

// import java.util.concurrent.Executors

import java.util.concurrent.ExecutorService
import org.apache.camel.builder.ThreadPoolBuilder

def executor = new ThreadPoolBuilder(camel.getCtx()).poolSize(10).maxPoolSize(150).maxQueueSize(150).build("MyCustomPool")
// Alternative way to create an executor
// def executor = Executors.newFixedThreadPool(4)

camel.getCtx().addRoutes(new RouteBuilder() {
    void configure() {
        from("timer://myTimer?period=10s")
            .routeId("MyPooledRoute")
            .description("Master Timer")
            .multicast()        
            .parallelProcessing()
            .executorService(executor)
            .to("direct:a", "direct:b", "direct:c", "direct:d")

        from("direct:a")
            .routeId("ConsumerA")
            .description("My Consumer A")
            .to("p6cmb://scripts?platform6.request.action=execute&id=Test")

        from("direct:b")
            .routeId("ConsumerB")
            .description("My Consumer B")
            .to("p6cmb://scripts?platform6.request.action=execute&id=Test")

        from("direct:c")
            .routeId("ConsumerC")
            .description("My Consumer C")
            .to("p6cmb://scripts?platform6.request.action=execute&id=Test")

        from("direct:d")
            .routeId("ConsumerD")
            .description("My Consumer D")
            .to("p6cmb://scripts?platform6.request.action=execute&id=Test")
    }
})

Threads DSL

See: https://camel.apache.org/maven/camel-2.15.0/camel-core/apidocs/org/apache/camel/model/ProcessorDefinition.html#threads()

camel.getCtx().addRoutes(new RouteBuilder() {
    void configure() {
        from("file:/opt/b2box5.data/resources/ftp.in?antInclude=*.xml&move=.processed")
            .to("file:/opt/b2box5.data/tmp")
            // Use up to ten threads to process the FileTesting script
            .threads(2,10)
            .to( "p6cmb://scripts?platform6.request.action=execute&id=FileTesting" )
            .routeId("myFile2Route")
    }
})

SEDA component

See: https://camel.apache.org/seda.html

camel.getCtx().addRoutes(new RouteBuilder() {
    void configure() {
        from("timer://myTimer?period=10s")
            .routeId("MyPooledRoute")
            .description("Master Timer")
            .to("seda:a")

        from("seda:a?concurrentConsumers=4")
            .routeId("ConsumerA")
            .description("My Consumer A")
            .to("p6cmb://scripts?platform6.request.action=execute&id=Test")
    }
})

The Throttler EIP

See: https://people.apache.org/~dkulp/camel/throttler.html

camel.getCtx().addRoutes(new RouteBuilder() {
    void configure() {
        from("timer://myTimer?period=10s")
            .routeId("MyPooledRoute")
            .description("Master Timer")
            .throttle(100)
            .to("vm:a")

        from("vm:a?concurrentConsumers=4")
            .routeId("ConsumerA")
            .description("My Consumer A")
            .to("p6cmb://scripts?platform6.request.action=execute&id=Test")
    }
})

Camel Aggregation & Strategies

So what are AggregationStrategies anyway? Simple… they’re implementations of the org.apache.camel.processor.aggregate.AggregationStrategy interface that allow you to specify exactly how two exchanges will be merged. This specification can be as simple or as complex as you require for your use case. Maybe you just want to take the first response and ignore all others. Maybe you want to combine the XML bodies into a list and then merge a select few headers.

See: http://camel.apache.org/aggregator2.html
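
For illustration, here is a minimal custom strategy sketch (the class name BodyListAggregationStrategy is made up for this example) that gathers the incoming message bodies into a single list:

import org.apache.camel.Exchange
import org.apache.camel.processor.aggregate.AggregationStrategy

// Collects each incoming message body into one List on the aggregated exchange
class BodyListAggregationStrategy implements AggregationStrategy {
    Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        if (oldExchange == null) {
            // First exchange of the group: start the list with its body
            newExchange.getIn().setBody([newExchange.getIn().getBody()])
            return newExchange
        }
        // Subsequent exchanges: append the new body to the existing list
        oldExchange.getIn().getBody(List) << newExchange.getIn().getBody()
        return oldExchange
    }
}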

org.apache.camel.processor.aggregate.UseLatestAggregationStrategy

It’s the default strategy for most Camel EIPs that accept aggregation strategies, so if you don’t specify a strategy, this is likely the one you’re using. Basically, it takes the last exchange it receives and uses that, ignoring any others that may have been aggregated before it. One example use case is the Aggregator: perhaps you receive many messages as input, but you want to buffer them (giving the user time to send in corrections/updates) and then only send the latest message to the backend after some period of inactivity.

Example

import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy

from('p6sftpd:///')
    .description('SFTP Listener')
    .aggregate(header('path'), new UseLatestAggregationStrategy()).completionTimeout(10000) 
    .to("log:p6.sftpd")
    .routeId('SFTP_Listener')

Here we aggregate based on the absolute path of a file event emitted by the p6sftpd component. For example, if a user connects to the SFTP server, deletes a file and then uploads a new file to replace it (same name), this aggregation strategy ensures that only the last (FILE_WRITTEN) event is processed.

org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy

As the name suggests, it “merges” two exchanges by completely ignoring the new exchange and simply keeping the original.
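
A minimal sketch (the seda:orders endpoint name is illustrative) of the typical use with the Splitter, where each part is processed individually but the route then continues with the original, pre-split message:

import org.apache.camel.builder.RouteBuilder
import org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy

camel.getCtx().addRoutes(new RouteBuilder() {
    void configure() {
        from("seda:orders")
            .routeId("SplitButKeepOriginal")
            // Each split part is sent to the service, but once the split completes
            // the route carries on with the original (unsplit) message
            .split(body(), new UseOriginalAggregationStrategy())
                .to("p6cmb://scripts?platform6.request.action=execute&id=Test")
            .end()
            .to("log:original.message.restored")
    }
})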

org.apache.camel.processor.aggregate.GroupedMessageAggregationStrategy

This will combine the exchange messages into a list and then pass the list itself along to the next processor.

When used with a completion option, you can effectively ‘batch’ the messages into a List of Message.

Not all processors accept lists of messages as input; however, the Splitter EIP can be used to break the list back into individual messages.

See: http://camel.apache.org/splitter.html

import org.apache.camel.processor.aggregate.GroupedMessageAggregationStrategy

from('p6sftpd:///')
    .description('SFTP Listener')
    .aggregate(header('user'), new GroupedMessageAggregationStrategy()).completionTimeout(10000) 
    .split().message().streaming()
    .to("log:p6.sftpd")
    .routeId('SFTP_Listener')        

Here we aggregate based on the user operating the SFTP server. After ten seconds of inactivity, the collected events are split and streamed to the next processor (log, in this case).

The p6cmb component supports the List of Message

If your service would rather process a list of Messages than be invoked once per message, the GroupedMessageAggregationStrategy can be used without the Splitter.

import org.apache.camel.processor.aggregate.GroupedMessageAggregationStrategy

camel.getCtx().addRoutes(new RouteBuilder() {

    void configure() {

        from('p6sftpd:///')
            .description('SFTP Listener')
            .aggregate(header('user'), new GroupedMessageAggregationStrategy()).completionTimeout(10000) 
            .to('p6cmb://scripts?platform6.request.action=execute&id=BatchSFTPListener')
            .routeId('SFTP_Listener')
    }
})

Here a JSON list of Map[:] is passed to the scripts service in the common message attachment p6AggregateList. Using Groovy, this can quickly be decoded and used as follows:

import groovy.json.JsonSlurper

def listOfMaps = new JsonSlurper().parseText(pipeline.get('p6AggregateList'))

listOfMaps.each { mp ->
    println "User: " + mp['user']
    println "Action: " + mp['action']
    println "Path: " + mp['path']
}