Using a custom scheduler - 6.3

Talend ESB Mediation Developer Guide

EnrichVersion
6.3
EnrichProdName
Talend Data Fabric
Talend Data Services Platform
Talend ESB
Talend MDM Platform
Talend Open Studio for ESB
Talend Real-Time Big Data Platform
task
Design and Development
EnrichPlatform
Talend ESB

Available as of Camel 2.12:

The SPI interface org.apache.camel.spi.ScheduledPollConsumerScheduler allows to implement a custom scheduler to control when the Polling Consumer runs. The default implementation is based on the JDKs ScheduledExecutorService with a single thread in the thread pool. There is a CRON based implementation in the Quartz2, and Spring components.

For an example of developing and using a custom scheduler, see the unit test org.apache.camel.component.file.FileConsumerCustomSchedulerTest from the source code in camel-core.

Controlling the error handling using PollingConsumerPollStrategy

org.apache.camel.PollingConsumerPollStrategy is a pluggable strategy that you can configure on the ScheduledPollConsumer. The default implementation org.apache.camel.impl.DefaultPollingConsumerPollStrategy will log the caused exception at WARN level and then ignore this issue.

The strategy interface provides the following 3 methods

  • begin

    • void begin(Consumer consumer, Endpoint endpoint)

  • begin (Camel 2.3)

    • boolean begin(Consumer consumer, Endpoint endpoint)

  • commit

    • void commit(Consumer consumer, Endpoint endpoint)

  • commit (Camel 2.6)

    • void commit(Consumer consumer, Endpoint endpoint, int polledMessages

  • rollback

    • boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, Exception e) throws Exception

The begin method returns a boolean which indicates whether or not to skipping polling. So you can implement your custom logic and return false if you do not want to poll this time.

The commit method has an additional parameter containing the number of message that was actually polled. For example if there was no messages polled, the value would be zero, and you can react accordingly.

The most interesting is the rollback as it allows you do handle the caused exception and decide what to do.

For instance if we want to provide a retry feature to a scheduled consumer we can implement the PollingConsumerPollStrategy method and put the retry logic in the rollback method. Let's just retry up until 3 times:

public boolean rollback(Consumer consumer, Endpoint endpoint, 
    int retryCounter, Exception e) throws Exception {
    if (retryCounter < 3) {
        // return true to tell Camel that it 
        // should retry the poll immediately
        return true;
    }
    // okay we give up do not retry anymore
    return false;
}

Notice that we are given the Consumer as a parameter. We could use this to restart the consumer as we can invoke stop and start:

// error occurred let's restart the consumer, 
// that could maybe resolve the issue
consumer.stop();
consumer.start();

Notice: If you implement the begin operation make sure to avoid throwing exceptions as in such a case the poll operation is not invoked and Camel will invoke the rollback directly.

Configuring an Endpoint to use PollingConsumerPollStrategy

To configure an Endpoint to use a custom PollingConsumerPollStrategy you use the option pollStrategy. For example in the file consumer below we want to use our custom strategy defined in the Registry with the bean id myPoll :

from("file://inbox/?pollStrategy=#myPoll").to("activemq:queue:inbox")