Advanced Topics

Route Concurrency

Camel supports route concurrency in a number of places. Using these constructs can also improve application performance.

The following is a summary of the concurrency constructs available in Camel. For more details, I would strongly advise reading Chapter 10 of Camel in Action.

Parallel processing EIPs

The following Camel EIP implementations support parallel processing:

  • multicast
  • recipient list
  • splitter
  • delayer
  • wiretap
  • error handler

If you enable parallel processing for any of these, it is better to also provide a custom thread pool tuned for your use case rather than relying on Camel's default thread pool profile.

Throttler EIP

The throttler allows you to ensure that a specific endpoint does not get overloaded, or that you don't exceed an agreed SLA with some external service.

from("seda:a")
    .throttle(3)
    .timePeriodMillis(10000)
    .to("log:result", "mock:result")

The example above throttles all messages received on seda:a before they are sent to log:result and mock:result, ensuring that at most 3 messages are sent in any 10-second window. Typically you would use the default time period of one second. To throttle at 100 requests per second between two endpoints, the route would look more like this:

from("seda:a")
    .throttle(100)
    .to("seda:b")

Threads DSL construct

Some Camel endpoints (such as the File consumer) are single-threaded by design and cannot be parallelized at the endpoint level.

In the case of the File consumer, a single thread picks up one file at a time and processes it through to the end of the route. Only then does the consumer thread pick up the next file.

This is where the Camel threads() construct can be useful. As in the Threads DSL example below, the File consumer thread can pick up a file and pass it to a thread from the threads() pool for further processing.

Then the File consumer can pick another file without waiting for the previous Exchange to complete processing fully.

SEDA component

SEDA is another way to achieve parallelism in Camel.

The SEDA component uses an in-memory queue to accumulate incoming messages from the producer, and concurrent consumers that process those incoming requests in parallel on multiple threads.
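The queue-plus-consumers mechanism described above can be sketched with plain JDK classes. This is only an illustration of the idea, not Camel's actual implementation: the names consumerCount, messageCount, and handled are made up for the sketch, and consumerCount stands in for the concurrentConsumers option.

```groovy
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit

int consumerCount = 4    // hypothetical; plays the role of concurrentConsumers=4
int messageCount = 20    // hypothetical number of messages to push through

def queue = new LinkedBlockingQueue<String>()    // in-memory queue between producer and consumers
def handled = ConcurrentHashMap.newKeySet()      // thread-safe record of processed messages
def done = new CountDownLatch(messageCount)
def consumers = Executors.newFixedThreadPool(consumerCount)

// Each consumer thread blocks on the queue and processes messages as they arrive
consumerCount.times {
    consumers.execute({
        try {
            while (true) {
                String message = queue.take()
                handled.add(message)
                done.countDown()
            }
        } catch (InterruptedException ignored) {
            // shutdownNow() below interrupts the blocked take() and ends the loop
        }
    } as Runnable)
}

// The producer only enqueues; it does not wait for any message to be processed
(1..messageCount).each { queue.put("message-" + it) }

boolean allProcessed = done.await(5, TimeUnit.SECONDS)
consumers.shutdownNow()
println "processed ${handled.size()} messages, all done: ${allProcessed}"
```

Because the producer returns as soon as a message is queued, a slow consumer does not hold it up. The trade-off is that queued messages live only in memory.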

Asynchronous redelivery/retry

If you are using an error handler with a redelivery policy as part of the routing process, you can configure it to be asynchronous and perform the redeliveries in a separate thread.

The redelivery then uses a separate thread pool and does not block the main request processing thread while waiting.
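As a minimal sketch of such a configuration, the route below enables asynchronous delayed redelivery on Camel's default error handler. It assumes the same camel binding used in the examples further down. The retry count, delay, seda:a source, and route id are illustrative values only.

```groovy
import org.apache.camel.builder.RouteBuilder

camel.getCtx().addRoutes(new RouteBuilder() {
    void configure() {
        // Retry failed exchanges up to 3 times, 5 seconds apart (illustrative values).
        // asyncDelayedRedelivery() lets the wait between attempts happen off the
        // calling thread instead of blocking it for the whole delay.
        errorHandler(defaultErrorHandler()
            .maximumRedeliveries(3)
            .redeliveryDelay(5000)
            .asyncDelayedRedelivery())

        from("seda:a")
            .routeId("AsyncRedeliveryRoute")   // hypothetical route id
            .to("p6cmb://scripts?platform6.request.action=execute&id=Test")
    }
})
```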

If you need long delayed redeliveries, it may be better to use JMS broker redelivery (which is different from consumer redelivery), where the redeliveries are persisted on the message broker and not kept in Camel application memory.

Another benefit of this mechanism is that the redeliveries survive a Platform 6 restart and also play nicely when the application is clustered.

Examples

Parallel processing

import org.apache.camel.builder.ThreadPoolBuilder
// import java.util.concurrent.Executors  // only needed for the alternative below

// Build a dedicated pool: core size 10, maximum size 150, and a task queue of up to 150 entries
def executor = new ThreadPoolBuilder(camel.getCtx()).poolSize(10).maxPoolSize(150).maxQueueSize(150).build("MyCustomPool")
// Alternative way to create an executor
// def executor = Executors.newFixedThreadPool(4)

camel.getCtx().addRoutes(new RouteBuilder() {
    void configure() {
        from("timer://myTimer?period=10s")
            .routeId("MyPooledRoute")
            .description("Master Timer")
            // Send each exchange to all four endpoints in parallel on the custom pool
            .multicast()
            .parallelProcessing()
            .executorService(executor)
            .to("direct:a", "direct:b", "direct:c", "direct:d")

        from("direct:a")
            .routeId("ConsumerA")
            .description("My Consumer A")
            .to("p6cmb://scripts?platform6.request.action=execute&id=Test")

        from("direct:b")
            .routeId("ConsumerB")
            .description("My Consumer B")
            .to("p6cmb://scripts?platform6.request.action=execute&id=Test")

        from("direct:c")
            .routeId("ConsumerC")
            .description("My Consumer C")
            .to("p6cmb://scripts?platform6.request.action=execute&id=Test")

        from("direct:d")
            .routeId("ConsumerD")
            .description("My Consumer D")
            .to("p6cmb://scripts?platform6.request.action=execute&id=Test")
    }
})

Threads DSL

See: https://camel.apache.org/maven/camel-2.15.0/camel-core/apidocs/org/apache/camel/model/ProcessorDefinition.html#threads()

camel.getCtx().addRoutes(new RouteBuilder() {
    void configure() {
        from("file:/opt/b2box5.data/resources/ftp.in?antInclude=*.xml&move=.processed")
            .routeId("myFile2Route")
            .to("file:/opt/b2box5.data/tmp")
            // Hand off to a pool of 2 threads that can grow to 10 to run the FileTesting script
            .threads(2, 10)
            .to("p6cmb://scripts?platform6.request.action=execute&id=FileTesting")
    }
})

SEDA component

See: https://camel.apache.org/seda.html

camel.getCtx().addRoutes(new RouteBuilder() {
    void configure() {
        from("timer://myTimer?period=10s")
            .routeId("MyPooledRoute")
            .description("Master Timer")
            .to("seda:a")

        from("seda:a?concurrentConsumers=4")
            .routeId("ConsumerA")
            .description("My Consumer A")
            .to("p6cmb://scripts?platform6.request.action=execute&id=Test")
    }
})

Throttler EIP

See: https://people.apache.org/~dkulp/camel/throttler.html

camel.getCtx().addRoutes(new RouteBuilder() {
    void configure() {
        from("timer://myTimer?period=10s")
            .routeId("MyPooledRoute")
            .description("Master Timer")
            .throttle(100)
            .to("vm:a")

        from("vm:a?concurrentConsumers=4")
            .routeId("ConsumerA")
            .description("My Consumer A")
            .to("p6cmb://scripts?platform6.request.action=execute&id=Test")
    }
})