Stream Resequencing - 6.3

Talend ESB Mediation Developer Guide

EnrichVersion
6.3
EnrichProdName
Talend Data Fabric
Talend Data Services Platform
Talend ESB
Talend MDM Platform
Talend Open Studio for ESB
Talend Real-Time Big Data Platform
task
Design and Development
EnrichPlatform
Talend ESB

The next example shows how to use the stream-processing resequencer. Messages are re-ordered based on their sequence numbers given by a seqnum header using gap detection and timeouts on the level of individual messages.

Using the Fluent Builders

from("direct:start").resequence(header("seqnum")).
   stream().to("mock:result");

The stream-processing resequencer can be further configured via the capacity() and timeout() methods.

from("direct:start")
   .resequence(header("seqnum")).stream().capacity(5000).timeout(4000L)
   .to("mock:result")

This sets the resequencer's capacity to 5000 and the timeout to 4000 ms (by default, the capacity is 1000 and the timeout is 1000 ms). Alternatively, you can provide a configuration object.

from("direct:start")
   .resequence(header("seqnum")).stream(
   new StreamResequencerConfig(5000, 4000L)).to("mock:result")

The stream-processing resequencer algorithm is based on the detection of gaps in a message stream rather than on a fixed batch size. Gap detection in combination with timeouts removes the constraint of having to know the number of messages of a sequence (i.e. the batch size) in advance. Messages must contain a unique sequence number for which a predecessor and a successor is known. For example a message with the sequence number 3 has a predecessor message with the sequence number 2 and a successor message with the sequence number 4. The message sequence 2,3,5 has a gap because the sucessor of 3 is missing. The resequencer therefore has to retain message 5 until message 4 arrives (or a timeout occurs).

If the maximum time difference between messages (with successor/predecessor relationship with respect to the sequence number) in a message stream is known, then the resequencer's timeout parameter should be set to this value. In this case it is guaranteed that all messages of a stream are delivered in correct order to the next processor. The lower the timeout value is compared to the out-of-sequence time difference the higher is the probability for out-of-sequence messages delivered by this resequencer. Large timeout values should be supported by sufficiently high capacity values. The capacity parameter is used to prevent the resequencer from running out of memory.

By default, the stream resequencer expects long sequence numbers but other sequence numbers types can be supported as well by providing a custom expression.

public class MyFileNameExpression implements Expression {
    
    public String getFileName(Exchange exchange) {
        return exchange.getIn().getBody(String.class);
    }
    
    public Object evaluate(Exchange exchange) {
        // parse the file name with YYYYMMDD-DNNN pattern
        String fileName = getFileName(exchange);
        String[] files = fileName.split("-D");
        Long answer = Long.parseLong(files[0]) * 1000 + 
           Long.parseLong(files[1]);
        return answer;
    }
    

    public <T> T evaluate(Exchange exchange, Class<T> type) {
        Object result = evaluate(exchange);
        return exchange.getContext().getTypeConverter().convertTo(type, 
           result);
    }

}

or custom comparator via the comparator() method

ExpressionResultComparator<Exchange> comparator = new MyComparator();
from("direct:start")
    .resequence(header("seqnum")).stream().comparator(comparator)
    .to("mock:result");

or via a StreamResequencerConfig object.

ExpressionResultComparator<Exchange> comparator = new MyComparator();
StreamResequencerConfig config = new StreamResequencerConfig(100, 1000L, 
comparator);

from("direct:start")
    .resequence(header("seqnum")).stream(config)
    .to("mock:result");

Using the Spring XML Extensions

<camelContext id="camel" 
  xmlns="http://camel.apache.org/schema/spring">
  <route>
    <from uri="direct:start"/>
    <resequence>
      <simple>in.header.seqnum</simple>
      <to uri="mock:result" />
      <stream-config capacity="5000" timeout="4000"/>
    </resequence>
  </route>
</camelContext>