Use cases - 6.3

Talend ESB Mediation Developer Guide

EnrichVersion
6.3
EnrichProdName
Talend Data Fabric
Talend Data Services Platform
Talend ESB
Talend MDM Platform
Talend Open Studio for ESB
Talend Real-Time Big Data Platform
task
Design and Development
EnrichPlatform
Talend ESB

Reading from a znode

The following snippet will read the data from the znode '/somepath/somenode/' provided that it already exists. The data retrieved will be placed into an exchange and passed onto the rest of the route.

from("zookeeper://localhost:39913/somepath/somenode").to("mock:result");

If the node does not yet exist then a flag can be supplied to have the endpoint await its creation:

from("zookeeper://localhost:39913/somepath/somenode?awaitCreation=true").
   to("mock:result");

When data is read due to a WatchedEvent received from the ZooKeeper ensemble, the CamelZookeeperEventType header will hold the ZooKeeper's EventType value from that WatchedEvent. If the data is read initially (not triggered by a WatchedEvent) the CamelZookeeperEventType header will not be set.

Writing to a znode

The following snippet will write the payload of the exchange into the znode at '/somepath/somenode/' provided that it already exists:

from("direct:write-to-znode")
   .to("zookeeper://localhost:39913/somepath/somenode");

For flexibility, the endpoint allows the target znode to be specified dynamically as a message header. If a header keyed by the string 'CamelZooKeeperNode' is present then the value of the header will be used as the path to the znode on the server. For instance using the same route definition above, the following code snippet will write the data not to '/somepath/somenode' but to the path from the header '/somepath/someothernode'

Exchange e = createExchangeWithBody(testPayload);
template.sendBodyAndHeader("direct:write-to-znode", 
   e, "CamelZooKeeperNode", "/somepath/someothernode");

To also create the node if it does not exist the 'create' option should be used.

from("direct:create-and-write-to-znode").
   to("zookeeper://localhost:39913/somepath/somenode?create=true");

Starting with Camel 2.11 it will also be possible to delete a node using the header 'CamelZookeeperOperation' by setting it to 'DELETE':

from("direct:delete-znode")
    .setHeader(ZooKeeperMessage.ZOOKEEPER_OPERATION, constant("DELETE"))
    .to("zookeeper://localhost:39913/somepath/somenode"); 

Or equivalently:

<route> 
    <from uri="direct:delete-znode" /> 
    <setHeader headerName="CamelZookeeperOperation"> 
        <constant>DELETE</constant> 
    </setHeader> 
    <to uri="zookeeper://localhost:39913/somepath/somenode" /> 
</route> 

ZooKeeper nodes can have different types, they can be 'Ephemeral' or 'Persistent' and 'Sequenced' or 'Unsequenced'. Information of each type is described on the ZooKeeper site. By default endpoints will create unsequenced, ephemeral nodes, but the type can be easily manipulated via a uri config parameter or via a special message header. The values expected for the create mode are simply the names from the CreateMode enumeration

  • PERSISTENT
  • PERSISTENT_SEQUENTIAL
  • EPHEMERAL
  • EPHEMERAL_SEQUENTIAL

For example to create a persistent znode via the URI config:

from("direct:create-and-write-to-persistent-znode")
   .to("zookeeper://localhost:39913/somepath/somenode?create=true   //
&createMode=PERSISTENT");

Or using the header 'CamelZookeeperCreateMode'

Exchange e = createExchangeWithBody(testPayload);
template.sendBodyAndHeader("direct:create-and-write-to-persistent-znode", 
   e, "CamelZooKeeperCreateMode", "PERSISTENT");