Advanced Topics
Route Concurrency¶
Camel supports route concurrency in a number of places, and using these constructs can improve application performance.
The following is a summary of the concurrency constructs available in Camel. However, I would strongly advise reading Chapter 10 of Camel in Action for more details.
Parallel processing EIPs¶
The following Camel EIP implementations support parallel processing:
- multicast
- recipient list
- splitter
- delayer
- wiretap
- error handler
If you enable parallel processing for any of these, it is better to also provide a custom thread pool tuned for your use case rather than relying on Camel's default thread pool profile.
Throttler EIP
The throttler allows you to ensure that a specific endpoint does not get overloaded, or that you don't exceed an agreed SLA with an external service.
from("seda:a")
    .throttle(3)
    .timePeriodMillis(10000)
    .to("log:result", "mock:result")
The above example throttles all messages received on seda:a before they are sent to mock:result, ensuring that a maximum of 3 messages are sent in any 10-second window.
Note that you would typically use the default time period of one second.
So throttling to 100 requests per second between two endpoints would look more like this:
from("seda:a")
    .throttle(100)
    .to("seda:b")
Threads DSL construct¶
Some Camel endpoints (such as the File consumer) are single-threaded by design and cannot be parallelized at the endpoint level.
In the case of the File consumer, a single thread picks up one file at a time and processes it through the route until it reaches the end of the route; only then does the consumer thread pick up the next file.
This is where the Camel threads() construct can be useful.
As in the Threads DSL example below, the File consumer thread can pick up a file and hand it to a thread from the threads() construct for further processing.
The File consumer can then pick up another file without waiting for the previous Exchange to finish processing.
SEDA component¶
SEDA is another way to achieve parallelism in Camel.
The SEDA component has an in-memory queue that accumulates incoming messages from the producer, and concurrent consumers that process those messages in parallel on multiple threads.
Asynchronous redelivery/retry¶
If you are using an error handler with a redelivery policy as part of the routing process, you can configure it to be asynchronous and perform the redeliveries in a separate thread.
The redelivery then uses a separate thread pool and does not block the main request processing thread while waiting.
If you need long-delayed redeliveries, it might be a better approach to use JMS broker redelivery (which is different from consumer redelivery), where the redeliveries are persisted on the message broker and not kept in the Camel application's memory.
Another benefit of this mechanism is that the redeliveries will survive a Platform 6 restart and also play nicely when the application is clustered.
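As a minimal sketch of the asynchronous redelivery described above, the route below configures Camel's default error handler with asyncDelayedRedelivery(). The endpoint seda:orders, the route id and the retry values are assumptions chosen for illustration only; the p6cmb target is borrowed from the other examples on this page.

```groovy
import org.apache.camel.builder.RouteBuilder

camel.getCtx().addRoutes(new RouteBuilder() {
    void configure() {
        // Retry up to 5 times, 30 seconds apart (illustrative values).
        // asyncDelayedRedelivery() schedules the waiting on a separate
        // thread pool so the calling thread is not blocked during the delay.
        errorHandler(defaultErrorHandler()
            .maximumRedeliveries(5)
            .redeliveryDelay(30000)
            .asyncDelayedRedelivery())

        // "seda:orders" is a hypothetical endpoint used only for this sketch
        from("seda:orders")
            .routeId("AsyncRedeliveryRoute")
            .to("p6cmb://scripts?platform6.request.action=execute&id=Test")
    }
})
```

Pending redeliveries configured this way still live in application memory, so the broker-based approach remains the better fit for long delays.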
Examples¶
Parallel processing¶
// import java.util.concurrent.Executors
import java.util.concurrent.ExecutorService
import org.apache.camel.builder.ThreadPoolBuilder

def executor = new ThreadPoolBuilder(camel.getCtx()).poolSize(10).maxPoolSize(150).maxQueueSize(150).build("MyCustomPool")
// Alternative way to create an executor
// def executor = Executors.newFixedThreadPool(4)

camel.getCtx().addRoutes(new RouteBuilder() {
    void configure() {
        from("timer://myTimer?period=10s")
            .routeId("MyPooledRoute")
            .description("Master Timer")
            .multicast()
            .parallelProcessing()
            .executorService(executor)
            .to("direct:a", "direct:b", "direct:c", "direct:d")

        from("direct:a")
            .routeId("ConsumerA")
            .description("My Consumer A")
            .to("p6cmb://scripts?platform6.request.action=execute&id=Test")

        from("direct:b")
            .routeId("ConsumerB")
            .description("My Consumer B")
            .to("p6cmb://scripts?platform6.request.action=execute&id=Test")

        from("direct:c")
            .routeId("ConsumerC")
            .description("My Consumer C")
            .to("p6cmb://scripts?platform6.request.action=execute&id=Test")

        from("direct:d")
            .routeId("ConsumerD")
            .description("My Consumer D")
            .to("p6cmb://scripts?platform6.request.action=execute&id=Test")
    }
})
Threads DSL¶
camel.getCtx().addRoutes(new RouteBuilder() {
    void configure() {
        from("file:/opt/b2box5.data/resources/ftp.in?antInclude=*.xml&move=.processed")
            .to("file:/opt/b2box5.data/tmp")
            // Use up to ten threads to process the FileTesting script
            .threads(2, 10)
            .to("p6cmb://scripts?platform6.request.action=execute&id=FileTesting")
            .routeId("myFile2Route")
    }
})
SEDA component¶
See: https://camel.apache.org/seda.html
camel.getCtx().addRoutes(new RouteBuilder() {
    void configure() {
        from("timer://myTimer?period=10s")
            .routeId("MyPooledRoute")
            .description("Master Timer")
            .to("seda:a")

        from("seda:a?concurrentConsumers=4")
            .routeId("ConsumerA")
            .description("My Consumer A")
            .to("p6cmb://scripts?platform6.request.action=execute&id=Test")
    }
})
The Throttler EIP¶
See: https://people.apache.org/~dkulp/camel/throttler.html
camel.getCtx().addRoutes(new RouteBuilder() {
    void configure() {
        from("timer://myTimer?period=10s")
            .routeId("MyPooledRoute")
            .description("Master Timer")
            .throttle(100)
            .to("vm:a")

        from("vm:a?concurrentConsumers=4")
            .routeId("ConsumerA")
            .description("My Consumer A")
            .to("p6cmb://scripts?platform6.request.action=execute&id=Test")
    }
})
Camel Aggregation & Strategies¶
So what are aggregation strategies anyway? Simple: they are implementations of the org.apache.camel.processor.aggregate.AggregationStrategy interface that allow you to specify exactly how two exchanges will be merged. This specification can be as simple or as complex as your use case requires. Maybe you just want to take the first response and ignore all others. Maybe you want to combine the XML bodies into a list and then merge a select few headers.
See: http://camel.apache.org/aggregator2.html
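To make the idea of merging two exchanges concrete, here is a minimal sketch of a custom strategy that collects every incoming message body into a single list. The class name BodyListAggregationStrategy is hypothetical and not part of Camel or Platform 6; the sketch assumes the Camel 2.x interface named above, where the first call receives a null oldExchange.

```groovy
import org.apache.camel.Exchange
import org.apache.camel.processor.aggregate.AggregationStrategy

// Hypothetical strategy: gather all incoming bodies into one List
class BodyListAggregationStrategy implements AggregationStrategy {
    Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        def body = newExchange.getIn().getBody()
        if (oldExchange == null) {
            // First exchange of the group: start the list with its body
            newExchange.getIn().setBody([body])
            return newExchange
        }
        // Later exchanges: append the new body to the list carried
        // by the already aggregated exchange
        oldExchange.getIn().getBody(List).add(body)
        return oldExchange
    }
}
```

Such a strategy would be passed to an EIP in the same way as the built-in ones described below, for example as the second argument of aggregate(...).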
org.apache.camel.processor.aggregate.UseLatestAggregationStrategy¶
It’s the default strategy for most Camel EIPs that accept aggregation strategies. So if you don’t specify any strategy, this is likely the one you’re using. Basically, it takes the last exchange it receives and just uses that, ignoring any others that were aggregated before it. One example use case is the Aggregator: perhaps you’re receiving many messages as input, but you want to buffer them (giving the user time to send in corrections or updates) and then send only the latest message to the backend after some period of inactivity.
Example
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy

from('p6sftpd:///')
    .description('SFTP Listener')
    .aggregate(header('path'), new UseLatestAggregationStrategy()).completionTimeout(10000)
    .to("log:p6.sftpd")
    .routeId('SFTP_Listener')
org.apache.camel.processor.aggregate.UseOriginalAggregationStrategy¶
As the name would suggest, it “merges” two exchanges together by completely ignoring the new exchange and just taking the original.
org.apache.camel.processor.aggregate.GroupedMessageAggregationStrategy¶
This combines the exchange messages into a list and then passes the list itself along to the next processor.
When used with a completion option, you can effectively ‘batch’ the messages into a List of Message.
Not all processors accept lists of messages as input; however, it is possible to use the split component to solve this.
See: http://camel.apache.org/splitter.html
import org.apache.camel.processor.aggregate.GroupedMessageAggregationStrategy

from('p6sftpd:///')
    .description('SFTP Listener')
    .aggregate(header('user'), new GroupedMessageAggregationStrategy()).completionTimeout(10000)
    .split().message().streaming()
    .to("log:p6.sftpd")
    .routeId('SFTP_Listener')
The p6cmb component supports a List of Message.
If your service would rather process a list of Messages than be invoked once for each single message, the GroupedMessageAggregationStrategy can be used without the splitter component.
import org.apache.camel.processor.aggregate.GroupedMessageAggregationStrategy

camel.getCtx().addRoutes(new RouteBuilder() {
    void configure() {
        from('p6sftpd:///')
            .description('SFTP Listener')
            .aggregate(header('user'), new GroupedMessageAggregationStrategy()).completionTimeout(10000)
            .to('p6cmb://scripts?platform6.request.action=execute&id=BatchSFTPListener')
            .routeId('SFTP_Listener')
    }
})
Here a JSON list of maps is passed to the scripts service in the common message attachment p6AggregateList.
Using Groovy, this can quickly be decoded and used as follows:
import groovy.json.JsonSlurper

def listOfMaps = new JsonSlurper().parseText(pipeline.get('p6AggregateList'))
listOfMaps.each { mp ->
    println "User: " + mp['user']
    println "Action: " + mp['action']
    println "Path: " + mp['path']
}