Splitter - 6.3

Talend ESB Mediation Developer Guide

Version: 6.3
Products: Talend Data Fabric, Talend Data Services Platform, Talend ESB, Talend MDM Platform, Talend Open Studio for ESB, Talend Real-Time Big Data Platform
Task: Design and Development
Platform: Talend ESB

The Splitter from the EIP patterns allows you to split a message into a number of pieces and process them individually.

You need to specify a Splitter as split(). In earlier versions of Camel, you needed to use splitter().

Options:

strategyRef (no default)
Refers to an AggregationStrategy used to assemble the replies from the sub-messages into a single outgoing message from the Splitter. See the note below on what the Splitter returns by default. From Camel 2.12 onwards, you can also use a POJO as the AggregationStrategy; see the Aggregate page for more details.

strategyMethodName (no default)
Camel 2.12: Explicitly declares the name of the method to use when using a POJO as the AggregationStrategy. See the Aggregate page for more details.

strategyMethodAllowNull (default: false)
Camel 2.12: Applies when using a POJO as the AggregationStrategy. If false, the aggregate method is not called for the very first split message. If true, null is used as the oldExchange for the very first split message. See the Aggregate page for more details.

parallelProcessing (default: false)
If enabled, the sub-messages are processed concurrently. Note that the caller thread still waits until all sub-messages have been fully processed before it continues.

parallelAggregate (default: false)
Camel 2.14: If enabled, the aggregate method on the AggregationStrategy can be called concurrently. This requires the AggregationStrategy implementation to be thread-safe. By default this option is false, meaning that Camel synchronizes calls to the aggregate method. In some use cases, enabling it can achieve higher performance when the AggregationStrategy is thread-safe.

executorServiceRef (no default)
Refers to a custom thread pool to be used for parallel processing. If you set this option, parallel processing is automatically implied, so you do not have to enable parallelProcessing as well.

stopOnException (default: false)
Whether or not to stop processing immediately when an exception occurs. If disabled, Camel continues splitting and processing the sub-messages even if one of them fails. You can deal with exceptions in the AggregationStrategy class, where you have full control over how to handle them.

streaming (default: false)
If enabled, Camel splits in a streaming fashion, which means it splits the input message into chunks. This reduces memory overhead; for example, if you split big messages, it is recommended to enable streaming. If streaming is enabled, the sub-message replies are aggregated out of order, that is, in the order in which they come back. If disabled, Camel processes the sub-message replies in the same order as they were split.

timeout (no default)
Sets a total timeout in milliseconds. If the Splitter has not been able to split and process all replies within the given time frame, the timeout triggers and the Splitter breaks out and continues. If you provide a TimeoutAwareAggregationStrategy, its timeout method is invoked before breaking out. If the timeout is reached while tasks are still running, certain tasks that Camel cannot shut down gracefully may continue to run, so use this option with care.

onPrepareRef (no default)
Refers to a custom Processor that prepares the sub-message of the Exchange before it is processed. This allows you to do any custom logic, such as deep-cloning the message payload if needed.

shareUnitOfWork (default: false)
Whether the unit of work should be shared. See Sharing Unit of Work below for more details.
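To illustrate how parallelProcessing and timeout interact, here is a minimal plain-Java sketch built only on java.util.concurrent. It is an analogy, not Camel's implementation: the class SplitTimeoutSketch and the convention that parts whose names start with "slow" take too long are hypothetical. The caller blocks until every part has finished or the timeout expires, and any part still running at that point is cancelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical analogy for parallelProcessing + timeout; this is not Camel code.
class SplitTimeoutSketch {

    // Processes every part concurrently and returns, in split order, the replies
    // that finished within timeoutMillis. Parts whose names start with "slow"
    // simulate a sub-message that takes too long.
    static List<String> processWithTimeout(List<String> parts, long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, parts.size()));
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (String part : parts) {
                tasks.add(() -> {
                    if (part.startsWith("slow")) {
                        Thread.sleep(5_000);
                    }
                    return part.toUpperCase();
                });
            }
            // invokeAll blocks the caller until all tasks finish or the timeout
            // expires; any task still running at that point is cancelled
            List<Future<String>> futures =
                    pool.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
            List<String> replies = new ArrayList<>();
            for (Future<String> future : futures) {
                try {
                    replies.add(future.get());
                } catch (CancellationException | ExecutionException e) {
                    // timed out or failed: this reply is left out
                }
            }
            return replies;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for parts", e);
        } finally {
            // interrupts stragglers; a task that ignores interrupts would keep running
            pool.shutdownNow();
        }
    }
}
```

For example, processing "a", "slow-b" and "c" with a 500 ms timeout returns only the replies for "a" and "c". Cancellation relies on the task reacting to interruption, which is why a task that cannot be stopped gracefully may keep running after the timeout.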

Exchange Properties:

CamelSplitIndex (int)
A split counter that increases for each Exchange being split. The counter starts from 0.

CamelSplitSize (int)
The total number of Exchanges that were split. In stream-based splitting, this property is not set on every split Exchange; it is set only on the completed (last) Exchange.

CamelSplitComplete (boolean)
Whether or not this Exchange is the last.
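The following plain-Java sketch shows how the three properties relate to each other for a non-streaming split. The class SplitPropertiesSketch is hypothetical and only computes the values; in a real route you would read them from the Exchange, for example with exchange.getProperty(Exchange.SPLIT_INDEX, Integer.class).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of CamelSplitIndex, CamelSplitSize and
// CamelSplitComplete for a non-streaming split; not Camel code.
class SplitPropertiesSketch {

    // Returns one "part:index/size/complete" string per split part.
    static List<String> describe(List<String> parts) {
        int size = parts.size();                       // CamelSplitSize
        List<String> result = new ArrayList<>();
        for (int index = 0; index < size; index++) {   // CamelSplitIndex starts at 0
            boolean complete = index == size - 1;      // CamelSplitComplete: true only for the last part
            result.add(parts.get(index) + ":" + index + "/" + size + "/" + complete);
        }
        return result;
    }
}
```

For the parts x, y and z, the sketch produces x:0/3/false, y:1/3/false and z:2/3/true.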

Note

In Camel 2.2 or older, the Splitter by default returns the last split message.

In Camel 2.3 and newer, the Splitter by default returns the original input message.

For all versions, you can override this by supplying your own strategy as an AggregationStrategy. See the Camel website for the split aggregate request/reply sample. It uses the same strategy that the Aggregator supports, so the Splitter can be viewed as having a built-in, lightweight Aggregator.
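As a rough model of this behavior, the following plain-Java sketch folds the replies with a java.util.function.BinaryOperator standing in for an AggregationStrategy. It is a hypothetical analogy (AggregationSketch and its JOIN strategy are made up for illustration): without a strategy it returns the original input message, and with a strategy the first call receives null as the old value, much as oldExchange is null on the first call to aggregate(oldExchange, newExchange) in Camel's AggregationStrategy interface.

```java
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical model of how a Splitter combines sub-message replies; not Camel code.
class AggregationSketch {

    // Example strategy: joins replies with '+'. oldValue is null on the first call,
    // so the first reply is returned as-is.
    static final BinaryOperator<String> JOIN =
            (oldValue, newValue) -> oldValue == null ? newValue : oldValue + "+" + newValue;

    // Without a strategy, the original input is returned (mirroring the
    // "return the original input message" default); with one, the replies
    // are folded in order, starting from null.
    static String aggregate(String originalInput, List<String> replies,
                            BinaryOperator<String> strategy) {
        if (strategy == null) {
            return originalInput;
        }
        String result = null;
        for (String reply : replies) {
            result = strategy.apply(result, reply);
        }
        return result;
    }
}
```

Aggregating the replies A, B and C for the input "a,b,c" returns "a,b,c" without a strategy and "A+B+C" with JOIN.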

Example

The following example shows how to take a request from the seda:a endpoint, split it into pieces using an Expression, and then forward each piece to seda:b.

Using the Fluent Builders

RouteBuilder builder = new RouteBuilder() {
    public void configure() {
        errorHandler(deadLetterChannel("mock:error"));

        from("seda:a")
            .split(body(String.class).tokenize("\n"))
                .to("seda:b");
    }
};

The Splitter can use any Expression language, so you can use any of the Languages Supported, such as XPath, XQuery, SQL, or one of the Scripting Languages, to perform the split. For example:

from("activemq:my.queue")
    .split(xpath("//foo/bar"))
    .convertBodyTo(String.class)
    .to("file://some/directory");
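To see what an XPath split expression selects, the following plain-Java sketch evaluates an expression such as //foo/bar with the JDK's javax.xml.xpath API and returns the text of each matching node. The class XPathSplitSketch is hypothetical; in a route, each matched node becomes a sub-message, and this sketch only collects their text content for readability. Note that it parses the whole document into memory first, which is the limitation discussed under Stream based.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Hypothetical illustration of which parts an XPath split expression selects.
class XPathSplitSketch {

    // Returns the text content of every node matched by the expression.
    static List<String> split(String xml, String expression) {
        XPath xpath = XPathFactory.newInstance().newXPath();
        try {
            // the whole document is parsed into a DOM before evaluation
            NodeList nodes = (NodeList) xpath.evaluate(
                    expression, new InputSource(new StringReader(xml)), XPathConstants.NODESET);
            List<String> parts = new ArrayList<>();
            for (int i = 0; i < nodes.getLength(); i++) {
                parts.add(nodes.item(i).getTextContent());
            }
            return parts;
        } catch (XPathExpressionException e) {
            throw new IllegalArgumentException("invalid XPath expression or XML", e);
        }
    }
}
```

For the payload <foo><bar>1</bar><bar>2</bar><baz>x</baz></foo>, the expression //foo/bar yields two parts, 1 and 2.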

Using the Spring XML Extensions

<camelContext errorHandlerRef="errorHandler" 
    xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="seda:a"/>
        <split>
            <xpath>/invoice/lineItems</xpath>
            <to uri="seda:b"/>
        </split>
    </route>
</camelContext>

For further examples of this pattern in use, see the Splitter JUnit test cases in the Apache Camel source code.

Using Tokenizer from Spring XML Extensions

You can use the tokenizer expression in the Spring DSL to split bodies or headers using a token. This is a common use case, so Camel provides a special tokenize tag for it. In the sample below, we split the body using @ as the separator. You can of course use a comma, a space, or even a regex pattern; for a regex pattern, also set regex="true".

<camelContext xmlns="http://camel.apache.org/schema/spring">
    <route>
        <from uri="direct:start"/>
        <split>
            <tokenize token="@"/>
            <to uri="mock:result"/>
        </split>
    </route>
</camelContext>
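The difference between a literal token and a regex token can be sketched in plain Java with String.split. This is a hypothetical illustration (TokenizeSketch is not part of Camel), assuming the tokenizer treats the token as literal text unless regex="true" is set.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical illustration of literal versus regex tokenizing; not Camel code.
class TokenizeSketch {

    // Literal token, like <tokenize token="@"/>: special regex characters are quoted.
    static List<String> byToken(String body, String token) {
        return Arrays.asList(body.split(Pattern.quote(token)));
    }

    // Regex token, like <tokenize token="..." regex="true"/>.
    static List<String> byRegex(String body, String regex) {
        return Arrays.asList(body.split(regex));
    }
}
```

With a literal token, "1.2.3" split on "." yields 1, 2 and 3, whereas the regex [,;]\s* splits "a, b;c" into a, b and c.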

Splitting a Collection, Iterator or Array

A common use case is to split a Collection, Iterator or Array from the message. In the sample below we simply use an Expression to identify the value to split.

from("direct:splitUsingBody").split(body()).to("mock:result");
 
from("direct:splitUsingHeader").split(header("foo")).to("mock:result"); 

In Spring XML you can use the Simple language to identify the value to split.

<split>
   <simple>${body}</simple>
   <to uri="mock:result"/>
</split>
 
<split>
   <simple>${header.foo}</simple>
   <to uri="mock:result"/>
</split> 
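Whatever the expression selects has to be turned into a sequence of parts. The plain-Java sketch below shows one way to do that for a Collection, an Iterator, or an array (including primitive arrays). SplitValueSketch is a hypothetical helper and does not reproduce Camel's own, more complete conversion logic; in this sketch, any other value simply becomes a single part.

```java
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical helper, not Camel's own conversion logic.
class SplitValueSketch {

    // Turns the value selected by the split expression into an Iterator of parts.
    static Iterator<?> toIterator(Object value) {
        if (value == null) {
            return Collections.emptyIterator();
        }
        if (value instanceof Iterator) {
            return (Iterator<?>) value;
        }
        if (value instanceof Iterable) {   // covers every java.util.Collection
            return ((Iterable<?>) value).iterator();
        }
        if (value.getClass().isArray()) {
            // java.lang.reflect.Array works for Object[] and primitive arrays alike
            List<Object> items = new ArrayList<>();
            for (int i = 0; i < Array.getLength(value); i++) {
                items.add(Array.get(value, i));
            }
            return items.iterator();
        }
        // any other value is treated as a single part in this sketch
        return Collections.singletonList(value).iterator();
    }

    // Collects all parts into a list, for example for inspection in a test.
    static List<Object> parts(Object value) {
        List<Object> result = new ArrayList<>();
        Iterator<?> it = toIterator(value);
        while (it.hasNext()) {
            result.add(it.next());
        }
        return result;
    }
}
```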

Parallel execution of distinct 'parts'

If you want to execute all parts in parallel, you can use a special notation of split() with two arguments, where the second one is a boolean flag that specifies whether processing should be parallel. For example:

XPathBuilder xPathBuilder = new XPathBuilder("//foo/bar");
from("activemq:my.queue").split(xPathBuilder, true)
    .to("activemq:my.parts");

In Camel 2.0, the boolean option was refactored into a builder method, parallelProcessing(), so it is easier to understand what the route does when a method is used instead of true|false.

XPathBuilder xPathBuilder = new XPathBuilder("//foo/bar"); 
from("activemq:my.queue").split(xPathBuilder).parallelProcessing().
    to("activemq:my.parts");
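The effect of parallelProcessing() can be approximated in plain Java with CompletableFuture and a fixed thread pool. In this hypothetical sketch (ParallelSplitSketch is not a Camel class), all parts are submitted at once and the caller then waits for every result, matching the note in the options table that the caller thread waits until all sub-messages have been processed.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical analogy for a parallel split; not Camel code.
class ParallelSplitSketch {

    static <T, R> List<R> processInParallel(List<T> parts, Function<T, R> processor, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // submit every part at once so they can run concurrently
            List<CompletableFuture<R>> futures = parts.stream()
                    .map(part -> CompletableFuture.supplyAsync(() -> processor.apply(part), pool))
                    .collect(Collectors.toList());
            // the caller blocks here until every part is done; join() is applied
            // in submission order, so the results keep the original split order
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }
}
```

For example, processInParallel(List.of("a", "b", "c"), s -> s.toUpperCase(), 3) returns A, B and C in that order.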

Stream based

Both the Java XPath engine and XQuery load the entire XML content into memory, so they are not well suited for very big XML payloads. Instead, you can use a custom Expression that iterates over the XML payload in a streamed fashion. Alternatively, you can use the Tokenizer language, which supports this when you supply the start and end tokens. From Camel 2.14, you can also use the XMLTokenizer language, which is specifically provided for tokenizing XML documents.

You can split streams by enabling the streaming mode using the streaming builder method.

from("direct:streaming").split(body().tokenize(",")).streaming().
    to("activemq:my.parts");
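Conceptually, streaming means that each part is produced and processed while the input is still being read, instead of building the complete list of parts first. A minimal plain-Java sketch of that idea, assuming a comma-separated body read through a java.io.Reader, uses java.util.Scanner with a delimiter; StreamingSplitSketch is a hypothetical name, not a Camel API.

```java
import java.io.Reader;
import java.util.Scanner;
import java.util.function.Consumer;

// Hypothetical analogy for streaming mode; not Camel code.
class StreamingSplitSketch {

    // Reads comma-separated tokens lazily and hands each one to the processor as
    // soon as it is found; Scanner reads the input in buffered chunks rather
    // than loading it all at once. Returns the number of tokens processed.
    static int splitStreaming(Reader input, Consumer<String> processor) {
        int count = 0;
        try (Scanner scanner = new Scanner(input)) {
            scanner.useDelimiter(",");
            while (scanner.hasNext()) {
                processor.accept(scanner.next());
                count++;
            }
        }
        return count;
    }
}
```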

You can also supply your custom splitter to use with streaming like this:

import static org.apache.camel.builder.ExpressionBuilder.beanExpression;

from("direct:streaming")
    .split(beanExpression(new MyCustomIteratorFactory(), "iterator"))
    .streaming().to("activemq:my.parts");
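MyCustomIteratorFactory is not shown in the example. A minimal hypothetical version might look like the following, assuming Camel's bean binding can convert the message body to a java.io.Reader and that you want one part per line. Because BufferedReader.lines() is lazy, each line is read only when the Splitter asks for the next part.

```java
import java.io.BufferedReader;
import java.io.Reader;
import java.util.Iterator;

// Hypothetical implementation of the MyCustomIteratorFactory bean used above.
// For use with Camel, declare it public in its own source file.
class MyCustomIteratorFactory {

    // Invoked by name ("iterator") through beanExpression; assumes the message
    // body can be converted to a java.io.Reader.
    public Iterator<String> iterator(Reader body) {
        // lines() is lazy: each line is read only when the next part is requested
        return new BufferedReader(body).lines().iterator();
    }
}
```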

Streaming big XML payloads using Tokenizer language

There are two tokenizers that can be used to tokenize an XML payload. The first uses the same principle as the text tokenizer to scan the XML payload and extract a sequence of tokens. The second is the XMLTokenizer language (from Camel 2.14), which is specifically provided for tokenizing XML documents.

If you have a big XML payload from a file source and want to split it in streaming mode, you can use the Tokenizer language with start/end tokens to do this with a low memory footprint. (The Camel StAX component can also be used to split big XML files in streaming mode.) See the Camel website for an example.
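The start/end token idea can be sketched in plain Java by scanning a Reader for each fragment between a literal start token and a literal end token, handing each fragment on as soon as it is found instead of building a DOM. XmlTokenSketch is a hypothetical class, not the Tokenizer language itself; in this sketch the tokens are matched as exact text, so a start tag with attributes such as <order id="1"> would not match the token "<order>".

```java
import java.io.Reader;
import java.util.Scanner;
import java.util.function.Consumer;
import java.util.regex.Pattern;

// Hypothetical analogy for start/end token splitting; not Camel code.
class XmlTokenSketch {

    // Passes each startToken...endToken fragment to the handler as soon as it is
    // found and returns the number of fragments.
    static int forEachPair(Reader xml, String startToken, String endToken, Consumer<String> handler) {
        Pattern pair = Pattern.compile(
                Pattern.quote(startToken) + ".*?" + Pattern.quote(endToken), Pattern.DOTALL);
        int count = 0;
        try (Scanner scanner = new Scanner(xml)) {
            String fragment;
            // horizon 0 means "no limit": the Scanner keeps reading input until
            // the next match is found or the input ends
            while ((fragment = scanner.findWithinHorizon(pair, 0)) != null) {
                handler.accept(fragment);
                count++;
            }
        }
        return count;
    }
}
```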

Specifying a custom aggregation strategy

This is specified in the same way as for the Aggregator, for example by referring to your AggregationStrategy with the strategyRef option.

Specifying a custom ThreadPoolExecutor

You can customize the underlying ThreadPoolExecutor used in the parallel splitter. In the Java DSL try something like this:

XPathBuilder xPathBuilder = new XPathBuilder("//foo/bar"); 

ExecutorService pool = ...

from("activemq:my.queue")
   .split(xPathBuilder).parallelProcessing().executorService(pool)
   .to("activemq:my.parts");
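The example leaves the creation of pool open. One possible way to build it with plain java.util.concurrent is a fixed-size ThreadPoolExecutor with a bounded queue; the factory class SplitterPools, its method, and the sizes used below are hypothetical choices, not Camel defaults.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical factory for the pool passed to executorService(pool).
class SplitterPools {

    // Fixed number of worker threads plus a bounded work queue, so that a very
    // large split cannot queue an unlimited number of sub-messages in memory.
    static ThreadPoolExecutor boundedPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads,                    // core and maximum pool size
                60L, TimeUnit.SECONDS,               // keep-alive for threads above the core size
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.CallerRunsPolicy()); // queue full: run the task on the submitting thread
    }
}
```

For example, ExecutorService pool = SplitterPools.boundedPool(4, 100); could then be passed to executorService(pool). With CallerRunsPolicy, a full queue makes the submitting thread process the task itself, which slows the producer down instead of rejecting work. Remember to shut the pool down when your application stops.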

Using a Pojo to do the splitting

Because the Splitter can use any Expression to do the actual splitting, we can leverage this and use a method expression to invoke a bean (see the Camel Bean component) that returns the split parts. The bean should return a value that is iterable, such as a java.util.Collection, a java.util.Iterator, or an array.

In the route, we define the Expression as a method call that invokes our bean, which we have registered with the id mySplitterBean in the Registry.

from("direct:body")
    // here we use a POJO bean mySplitterBean to split the payload
    .split().method("mySplitterBean", "splitBody")
    .to("mock:result");
from("direct:message")
    // here we use a POJO bean mySplitterBean to split the message
    // based on a certain header value
    .split().method("mySplitterBean", "splitMessage")
    .to("mock:result");

The logic for our bean is simple. Notice that we use Camel Bean Binding to pass in the message body as a String object.

import java.util.ArrayList;
import java.util.List;

import org.apache.camel.Body;
import org.apache.camel.Header;
import org.apache.camel.Message;
import org.apache.camel.impl.DefaultMessage;

public class MySplitterBean {

    /**
     * The split body method returns something that is iterable,
     * such as a java.util.List.
     *
     * @param body the payload of the incoming message
     * @return a list containing each split part
     */
    public List<String> splitBody(String body) {
        // since this is based on a unit test, you can of course
        // use different logic for splitting, as Camel has out-of-the-box
        // support for splitting a String based on comma; but this is
        // for show and tell: since this is Java code, you have full
        // control over how you split your messages
        List<String> answer = new ArrayList<String>();
        String[] parts = body.split(",");
        for (String part : parts) {
            answer.add(part);
        }
        return answer;
    }

    /**
     * The split message method returns something that is iterable,
     * such as a java.util.List.
     *
     * @param header the header of the incoming message
     * @param body the payload of the incoming message
     * @return a list containing each split part
     */
    public List<Message> splitMessage(@Header(value = "user") String header,
                                      @Body String body) {
        // we can leverage the Parameter Binding Annotations
        // http://camel.apache.org/parameter-binding-annotations.html
        // to access the message header and body at the same time,
        // then create the messages that we want; the splitter will
        // take care of the rest.
        // *NOTE* this feature requires Camel version >= 1.6.1
        List<Message> answer = new ArrayList<Message>();
        String[] parts = header.split(",");
        for (String part : parts) {
            DefaultMessage message = new DefaultMessage();
            message.setHeader("user", part);
            message.setBody(body);
            answer.add(message);
        }
        return answer;
    }
}

Stop processing in case of exceptions

By default, the Splitter continues to process the entire Exchange even if one of the split messages throws an exception during routing. For example, suppose you have an Exchange with 1000 rows that you split, routing each sub-message. If an exception is thrown while processing the 17th sub-message, by default Camel still processes the remaining 983 messages. You have the chance to remedy or handle this in the AggregationStrategy.

But sometimes you just want Camel to stop and let the exception be propagated back, so that the Camel error handler can handle it. You can do this by specifying that the Splitter should stop when an exception occurs, using the stopOnException option as shown below:

from("direct:start")
        .split(body().tokenize(",")).stopOnException()
            .process(new MyProcessor())
            .to("mock:split");

And using XML DSL you specify it as follows:

<route>
   <from uri="direct:start"/>
   <split stopOnException="true">
      <tokenize token=","/>
      <process ref="myProcessor"/>
      <to uri="mock:split"/>
   </split>
</route>
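The difference between the default behavior and stopOnException can be modeled in plain Java using the 1000-row example, with a failure at row 17. StopOnExceptionSketch is a hypothetical class; in a real route, the failed sub-message is handled by the AggregationStrategy (default behavior) or propagated to the error handler (stopOnException), which this sketch does not model.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of the default behavior versus stopOnException; not Camel code.
class StopOnExceptionSketch {

    // Counts of successfully processed and failed rows for one run.
    static final class Outcome {
        final int succeeded;
        final int failed;

        Outcome(int succeeded, int failed) {
            this.succeeded = succeeded;
            this.failed = failed;
        }
    }

    static Outcome run(List<Integer> rows, Consumer<Integer> processor, boolean stopOnException) {
        int succeeded = 0;
        int failed = 0;
        for (Integer row : rows) {
            try {
                processor.accept(row);
                succeeded++;
            } catch (RuntimeException e) {
                failed++;
                if (stopOnException) {
                    // stop immediately and propagate; the remaining rows are never processed
                    throw new IllegalStateException(
                            "stopped at row " + row + " after " + succeeded + " successful rows", e);
                }
                // default: record the failure and continue with the next row
            }
        }
        return new Outcome(succeeded, failed);
    }
}
```

Without stopping, 999 rows succeed and 1 fails, because the 983 rows after the failure are still processed; with stopOnException, the run ends at row 17 after 16 successful rows.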

Sharing Unit of Work

By default, the Splitter does not share a unit of work between the parent exchange and each split exchange. This means each sub-exchange has its own individual unit of work. For example, you may have a use case where you want to split a big message and regard that process as an atomic, isolated operation that either succeeds or fails as a whole. In case of a failure, you want the big message to be moved into a dead letter queue. To support this use case, you have to share the unit of work on the Splitter. See the online example maintained on the Apache Camel site for more information.

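errorHandler(deadLetterChannel("activemq:dead").useOriginalMessage()); // hypothetical DLQ
XPathBuilder xPathBuilder = new XPathBuilder("//foo/bar");
from("activemq:my.queue")
    // share the unit of work so the whole split succeeds or fails as one
    .split(xPathBuilder).shareUnitOfWork()
    .to("activemq:my.parts");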