MongoDB offers a mechanism to consume ongoing data from a collection as it arrives, by
keeping the cursor open just like the tail -f command of *nix systems. This
mechanism is significantly more efficient than a scheduled poll, because the
server pushes new data to the client as it becomes available, rather than making the client
ping back at scheduled intervals to fetch new data. It also reduces otherwise redundant
network traffic.
There is only one requirement for using tailable cursors: the collection must be a "capped collection", meaning that it will only hold N objects, and when the limit is reached, MongoDB discards the oldest objects first, in the same order they were originally inserted. For more information, please refer to: http://www.mongodb.org/display/DOCS/Tailable+Cursors.
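The eviction behaviour of a capped collection can be pictured with a small in-memory model. This is only a sketch: CappedBuffer is a hypothetical class written for illustration, not part of MongoDB or Camel, and it models only the N-object limit described above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical in-memory stand-in for a capped collection; not a MongoDB API. */
public class CappedBuffer<T> {
    private final int maxObjects;
    private final Deque<T> objects = new ArrayDeque<>();

    public CappedBuffer(int maxObjects) {
        if (maxObjects <= 0) {
            throw new IllegalArgumentException("maxObjects must be positive");
        }
        this.maxObjects = maxObjects;
    }

    /** Appends a new object, discarding the oldest one once the cap is reached. */
    public void insert(T object) {
        if (objects.size() == maxObjects) {
            objects.removeFirst();
        }
        objects.addLast(object);
    }

    /** Returns a copy of the contents in insertion (natural) order. */
    public List<T> contents() {
        return new ArrayList<>(objects);
    }
}
```

With a cap of three objects, inserting five leaves only the three most recent, still in the order they were inserted.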
The Camel MongoDB component implements a tailable cursor consumer, making this feature available for you to use in your Camel routes. As new objects are inserted, MongoDB will push them as DBObjects in natural order to your tailable cursor consumer, which transforms each of them into an Exchange and triggers your route logic.
To turn a cursor into a tailable cursor, a few special flags must be passed to
MongoDB when the cursor is first created. Once created, the cursor stays open and
blocks on each call to the
DBCursor.next() method until new data arrives.
However, the MongoDB server reserves the right to kill your cursor if new data
doesn't appear after an indeterminate period. If you want to continue consuming
new data, you have to regenerate the cursor. To do so, you have to remember the
position where you left off, or else you will start consuming from the top again.
The Camel MongoDB tailable cursor consumer takes care of all these tasks for you. You just need to provide the key of a field in your data whose value always increases, e.g. a timestamp, a sequential ID, etc. This field acts as a marker to position your cursor every time it is regenerated. It can be of any datatype supported by MongoDB. Dates, Strings and Integers are found to work well. We call this mechanism "tail tracking" in the context of this component.
The consumer will remember the last value of this field and, whenever the cursor is to
be regenerated, it will run the query with a filter like:
increasingField > lastValue, so that only unread data is consumed.
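The effect of that filter can be sketched with an in-memory simulation. The class below is illustrative only: TailTrackingSketch and its method names are hypothetical, a plain list stands in for the capped collection, and this is not the component's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: simulates tail tracking over an in-memory list of increasing values. */
public class TailTrackingSketch {
    // Last consumed value of the increasing field; null until something is consumed.
    private Long lastValue;

    /**
     * Simulates regenerating the cursor: returns only the records whose
     * increasing field is greater than the remembered last value.
     */
    public List<Long> regenerateAndRead(List<Long> collection) {
        List<Long> unread = new ArrayList<>();
        for (Long value : collection) {
            if (lastValue == null || value > lastValue) {
                unread.add(value);
            }
        }
        // Records arrive in natural (insertion) order, so the last one read is the newest.
        if (!unread.isEmpty()) {
            lastValue = unread.get(unread.size() - 1);
        }
        return unread;
    }

    public Long getLastValue() {
        return lastValue;
    }
}
```

Calling regenerateAndRead a second time without new inserts returns an empty list, which is the point of the marker: already consumed records are not delivered again.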
Setting the increasing field: Set the key of the
increasing field with the
tailTrackIncreasingField option on the endpoint URI. In
Camel 2.10, it must be a top-level field in your data, as nested navigation for this field
is not yet supported. That is, the "timestamp" field is okay, but "nested.timestamp" will
not work. Please open a ticket in the Camel JIRA if you do require support for nested
increasing fields.
Cursor regeneration delay: One thing to note is that
if new data is not already available upon initialisation, MongoDB will kill the cursor
instantly. Since we don't want to overwhelm the server with regeneration attempts in this case, a
cursorRegenerationDelay option has been introduced (with a default value of
1000ms), which you can modify to suit your needs.
from("mongodb:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime")
    .id("tailableCursorConsumer1")
    .autoStartup(false)
    .to("mock:test");
The above route will consume from the "flights.cancellations" capped collection, using "departureTime" as the increasing field, with the default cursor regeneration delay of 1000ms.
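The role of the regeneration delay can be pictured as a retry loop that pauses between attempts. This is a minimal sketch under stated assumptions: RegenerationDelaySketch, openWithDelay and the maxAttempts bound are hypothetical names introduced for illustration, and the BooleanSupplier stands in for "open a cursor and see whether the server keeps it alive". It does not reproduce the component's internals.

```java
import java.util.function.BooleanSupplier;

/** Illustrative only: a retry loop that waits between cursor regeneration attempts. */
public class RegenerationDelaySketch {

    /**
     * Tries to open a cursor until one survives or maxAttempts is reached,
     * sleeping delayMillis between failed attempts so the server is not flooded.
     * Returns the number of attempts that were made.
     */
    public static int openWithDelay(BooleanSupplier cursorSurvives, long delayMillis, int maxAttempts) {
        int attempts = 0;
        while (attempts < maxAttempts) {
            attempts++;
            if (cursorSurvives.getAsBoolean()) {
                break; // the cursor stayed open, so consumption can start
            }
            if (attempts < maxAttempts) {
                try {
                    Thread.sleep(delayMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and stop retrying
                    break;
                }
            }
        }
        return attempts;
    }
}
```

Without the pause, an empty collection would cause the loop to reopen the cursor as fast as the server can kill it; a larger delay trades a little latency on the first record for less load.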
Standard tail tracking is volatile: the last value is kept only in memory. In practice you will need to restart your Camel container every now and then, at which point the last value is lost and your tailable cursor consumer starts consuming from the top again, very likely sending duplicate records into your route.
To overcome this situation, you can enable the persistent tail tracking feature, which also keeps track of the last consumed increasing value in a special collection inside your MongoDB database. When the consumer initialises again, it will restore the last tracked value and continue as if nothing happened.
The last read value is persisted on two occasions: every time the cursor is regenerated and when the consumer shuts down. We may consider persisting at regular intervals too in the future (flush every 5 seconds) for added robustness if the demand is there. To request this feature, please open a ticket in the Camel JIRA.
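The persistence points described above can be sketched as follows. Everything here is hypothetical and for illustration only: PersistentTailTrackerSketch is not a Camel class, and a plain Map stands in for the tracking collection in MongoDB.

```java
import java.util.Map;

/** Illustrative only: models when the last tracked value is written to durable storage. */
public class PersistentTailTrackerSketch {
    private final Map<String, Object> trackingStore; // stands in for the tracking collection
    private final String persistentId;               // identifies this consumer's entry
    private Object lastValue;

    public PersistentTailTrackerSketch(Map<String, Object> trackingStore, String persistentId) {
        this.trackingStore = trackingStore;
        this.persistentId = persistentId;
        // On initialisation, restore whatever a previous run persisted (may be null).
        this.lastValue = trackingStore.get(persistentId);
    }

    /** Called for every consumed record; only updates memory. */
    public void recordConsumed(Object value) {
        lastValue = value;
    }

    /** First persistence point: the cursor is being regenerated. */
    public void onCursorRegenerated() {
        persist();
    }

    /** Second persistence point: the consumer is shutting down. */
    public void onShutdown() {
        persist();
    }

    private void persist() {
        if (lastValue != null) {
            trackingStore.put(persistentId, lastValue);
        }
    }

    public Object getLastValue() {
        return lastValue;
    }
}
```

If a new tracker is created over the same store (for example a HashMap) with the same id after onShutdown has run, it starts from the restored value rather than from the top. A crash between the two persistence points would lose the in-memory value, which is the gap that persisting at regular intervals would narrow.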
To enable persistent tail tracking, set at least the following options on the endpoint URI:
persistentTailTracking option to true
persistentId option to a unique identifier for this consumer, so that the same collection can be reused across many consumers
Additionally, you can set the
tailTrackDb, tailTrackCollection and tailTrackField options to customise
where the runtime information will be stored. Refer to the endpoint options table at the
top of this page for descriptions of each option.
For example, the following route will consume from the "flights.cancellations" capped
collection, using "departureTime" as the increasing field, with the default cursor
regeneration delay of 1000ms and persistent tail tracking turned on. It persists under the
"cancellationsTracker" id in the "flights.camelTailTracking" collection, storing the last processed
value under the "lastTrackingValue" field (camelTailTracking and
lastTrackingValue are defaults).
from("mongodb:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true"
        + "&persistentId=cancellationsTracker")
    .id("tailableCursorConsumer2")
    .autoStartup(false)
    .to("mock:test");
Below is another example identical to the one above, but where the persistent tail tracking runtime information will be stored under the "trackers.camelTrackers" collection, in the "lastProcessedDepartureTime" field:
from("mongodb:myDb?database=flights&collection=cancellations&tailTrackIncreasingField=departureTime&persistentTailTracking=true"
        + "&persistentId=cancellationsTracker&tailTrackDb=trackers&tailTrackCollection=camelTrackers"
        + "&tailTrackField=lastProcessedDepartureTime")
    .id("tailableCursorConsumer3")
    .autoStartup(false)
    .to("mock:test");