MongoDB operations - producer endpoints - 6.3

Talend ESB Mediation Developer Guide

Query operations

findById

This operation retrieves only one element from the collection whose _id field matches the content of the IN message body. The incoming object can be anything that has an equivalent to a BSON type. See http://bsonspec.org/#/specification and http://www.mongodb.org/display/DOCS/Java+Types.

from("direct:findById")
    .to("mongodb:myDb?database=flights&collection=tickets&operation=findById")
    .to("mock:resultFindById");

This operation supports specifying a fields filter. See Specifying a fields filter.
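The endpoint URIs in this section all follow the same pattern of a connection name followed by database, collection and operation parameters. The following is a minimal sketch of assembling such a URI from its parts, so that each parameter appears exactly once. The class MongoDbUriSketch and its method endpointUri are hypothetical names introduced for illustration and are not part of Camel; the sketch also assumes the values need no URL encoding.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of Camel: assembles a mongodb: endpoint URI.
final class MongoDbUriSketch {

    // connectionName is the first URI segment ("myDb" in the examples above).
    static String endpointUri(String connectionName, String database,
                              String collection, String operation) {
        // LinkedHashMap keeps the parameters in insertion order.
        Map<String, String> params = new LinkedHashMap<>();
        params.put("database", database);
        params.put("collection", collection);
        params.put("operation", operation);

        // Assumption: values contain no characters that need URL encoding.
        StringJoiner query = new StringJoiner("&");
        params.forEach((key, value) -> query.add(key + "=" + value));
        return "mongodb:" + connectionName + "?" + query;
    }

    public static void main(String[] args) {
        System.out.println(endpointUri("myDb", "flights", "tickets", "findById"));
    }
}
```

The resulting string can be passed to .to(...) in a route in place of a hand-written URI.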

findOneByQuery

Use this operation to retrieve a single element from the collection that matches a MongoDB query. The query object is extracted from the IN message body, so it must be a DBObject or convertible to one, for example a JSON String or a HashMap. See Type conversions for more information.

Example with no query (returns any object of the collection):

from("direct:findOneByQuery")
    .to("mongodb:myDb?database=flights&collection=tickets&operation=findOneByQuery")
    .to("mock:resultFindOneByQuery");

Example with a query (returns one matching result):

from("direct:findOneByQuery")
    .setBody().constant("{ \"name\": \"Raul Kripalani\" }")
    .to("mongodb:myDb?database=flights&collection=tickets&operation=findOneByQuery")
    .to("mock:resultFindOneByQuery");

This operation supports specifying a fields filter. See Specifying a fields filter.
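When the query is supplied as a JSON String, as in the example with a query above, values that contain quotes or backslashes have to be escaped. The following is a minimal sketch of building a simple equality query string. The class JsonQuerySketch and its methods are hypothetical names introduced for illustration; they are not part of Camel or the MongoDB driver, and control characters are deliberately not handled.

```java
// Hypothetical helper, not part of Camel or the MongoDB driver.
final class JsonQuerySketch {

    // Escapes backslashes first, then double quotes, so the value stays
    // inside its JSON string. Control characters are not handled here.
    static String escape(String raw) {
        return raw.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Builds an equality query of the form { "field": "value" }.
    static String equalsQuery(String field, String value) {
        return "{ \"" + escape(field) + "\": \"" + escape(value) + "\" }";
    }

    public static void main(String[] args) {
        System.out.println(equalsQuery("name", "Raul Kripalani"));
    }
}
```

The returned String can be set as the IN message body before the exchange reaches the findOneByQuery or findAll endpoint.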

findAll

The findAll operation returns all documents matching a query. If no query is supplied, all documents contained in the collection are returned. The query object is extracted from the IN message body, so it must be a DBObject or convertible to one, for example a JSON String or a HashMap. See Type conversions for more information.

Example with no query (returns all objects in the collection):

from("direct:findAll")
    .to("mongodb:myDb?database=flights&collection=tickets&operation=findAll")
    .to("mock:resultFindAll");

Example with a query (returns all matching results):

from("direct:findAll")
    .setBody().constant("{ \"name\": \"Raul Kripalani\" }")
    .to("mongodb:myDb?database=flights&collection=tickets&operation=findAll")
    .to("mock:resultFindAll");

Paging and efficient retrieval are supported via the following headers:

| Header key | Quick constant | Description (extracted from MongoDB API doc) | Expected type |
|---|---|---|---|
| CamelMongoDbNumToSkip | MongoDbConstants.NUM_TO_SKIP | Discards a given number of elements at the beginning of the cursor. | int/Integer |
| CamelMongoDbLimit | MongoDbConstants.LIMIT | Limits the number of elements returned. | int/Integer |
| CamelMongoDbBatchSize | MongoDbConstants.BATCH_SIZE | Limits the number of elements returned in one batch. A cursor typically fetches a batch of result objects and stores them locally. If batchSize is positive, it represents the size of each batch of objects retrieved. It can be adjusted to optimize performance and limit data transfer. If batchSize is negative, it limits the number of objects returned to those that fit within the max batch size limit (usually 4MB), and the cursor is closed. For example, if batchSize is -10, the server returns a maximum of 10 documents, and only as many as fit in 4MB, then closes the cursor. Note that this feature is different from limit() in that documents must fit within a maximum size, and it removes the need to send a request to close the cursor server-side. The batch size can be changed even after a cursor is iterated, in which case the setting applies to the next batch retrieval. | int/Integer |
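To illustrate how the skip and limit headers combine for paging, the following minimal sketch computes the header values for a zero-based page index. The class PagingHeadersSketch and its method pageHeaders are hypothetical names introduced for illustration; the header keys are written as string literals taken from the table above rather than through MongoDbConstants, so that the sketch has no dependencies.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: computes the paging headers for one page of findAll.
final class PagingHeadersSketch {

    // pageIndex is zero-based; pageSize is the number of documents per page.
    static Map<String, Object> pageHeaders(int pageIndex, int pageSize) {
        if (pageIndex < 0 || pageSize <= 0) {
            throw new IllegalArgumentException("pageIndex must be >= 0 and pageSize > 0");
        }
        Map<String, Object> headers = new HashMap<>();
        // Skip all documents on earlier pages; multiplyExact fails fast on overflow.
        headers.put("CamelMongoDbNumToSkip", Math.multiplyExact(pageIndex, pageSize));
        // Return at most one page of documents.
        headers.put("CamelMongoDbLimit", pageSize);
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(pageHeaders(2, 25));
    }
}
```

The resulting map could be passed to a call such as template.requestBodyAndHeaders("direct:findAll", query, headers), where query is whatever body the route expects.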

Additionally, you can set a sort criterion by putting the relevant DBObject describing your sort order in the CamelMongoDbSortBy header (quick constant: MongoDbConstants.SORT_BY).

The findAll operation also returns the following OUT headers to enable you to iterate through result pages if you are using paging:

| Header key | Quick constant | Description (extracted from MongoDB API doc) | Expected type |
|---|---|---|---|
| CamelMongoDbResultTotalSize | MongoDbConstants.RESULT_TOTAL_SIZE | Number of objects matching the query. This does not take limit/skip into consideration. | int/Integer |
| CamelMongoDbResultPageSize | MongoDbConstants.RESULT_PAGE_SIZE | Number of objects returned in the current page. This takes limit/skip into consideration. | int/Integer |

This operation supports specifying a fields filter. See Specifying a fields filter.
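As an illustration of how the total size returned by findAll can drive page iteration, the following minimal sketch derives the number of pages and whether a further page exists. The class PageCountSketch and its methods are hypothetical names introduced for illustration; the resultTotalSize argument stands for the value read from the CamelMongoDbResultTotalSize OUT header.

```java
// Hypothetical helper: page arithmetic based on the total size header.
final class PageCountSketch {

    // Number of pages needed to hold resultTotalSize documents (ceiling division).
    static int totalPages(int resultTotalSize, int pageSize) {
        if (resultTotalSize < 0 || pageSize <= 0) {
            throw new IllegalArgumentException("sizes must be non-negative and pageSize > 0");
        }
        // Use long arithmetic so the addition cannot overflow.
        return (int) (((long) resultTotalSize + pageSize - 1) / pageSize);
    }

    // True if a page exists after the given zero-based page index.
    static boolean hasNextPage(int pageIndex, int resultTotalSize, int pageSize) {
        return pageIndex + 1 < totalPages(resultTotalSize, pageSize);
    }

    public static void main(String[] args) {
        System.out.println(totalPages(101, 25));
        System.out.println(hasNextPage(4, 101, 25));
    }
}
```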

Specifying a fields filter

Query operations will, by default, return the matching objects in their entirety (with all their fields). If your documents are large and you only require retrieving a subset of their fields, you can specify a field filter in all query operations, simply by setting the relevant DBObject (or type convertible to DBObject, such as a JSON String, Map, etc.) on the CamelMongoDbFieldsFilter header, constant shortcut: MongoDbConstants.FIELDS_FILTER.

Here is an example that uses MongoDB's BasicDBObjectBuilder to simplify the creation of DBObjects. It retrieves all fields except _id and boringField:

// route:
from("direct:findAll")
    .to("mongodb:myDb?database=flights&collection=tickets&operation=findAll");

// exclude _id and boringField from the returned documents
DBObject fieldFilter = BasicDBObjectBuilder.start().add("_id", 0).add("boringField", 0).get();
Object result = template.requestBodyAndHeader("direct:findAll", (Object) null,
    MongoDbConstants.FIELDS_FILTER, fieldFilter);
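Since the fields filter may also be given as a type convertible to DBObject, such as a Map, the same exclusion filter can be sketched without the MongoDB driver classes. The class FieldsFilterSketch and its method exclude are hypothetical names introduced for illustration; the sketch relies on the Map-to-DBObject conversion mentioned above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: builds a fields filter that excludes the given fields.
final class FieldsFilterSketch {

    // Each excluded field is mapped to 0, mirroring the DBObject example above.
    static Map<String, Object> exclude(String... fieldNames) {
        Map<String, Object> filter = new LinkedHashMap<>();
        for (String name : fieldNames) {
            filter.put(name, 0);
        }
        return filter;
    }

    public static void main(String[] args) {
        System.out.println(exclude("_id", "boringField"));
    }
}
```

The returned map would be set on the CamelMongoDbFieldsFilter header in the same way as the DBObject in the example above.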

Create/update operations

insert

Inserts a new object into the MongoDB collection, taken from the IN message body. Type conversion is attempted to turn it into DBObject or a List.

Two modes are supported: single insert and multiple insert. For multiple insert, the endpoint will expect a List, Array or Collections of objects of any type, as long as they are - or can be converted to - DBObject. All objects are inserted at once. The endpoint will intelligently decide which backend operation to invoke (single or multiple insert) depending on the input.

Example:

from("direct:insert")
    .to("mongodb:myDb?database=flights&collection=tickets&operation=insert");

The operation will return a WriteResult, and depending on the WriteConcern or the value of the invokeGetLastError option, getLastError() would have been called already or not. If you want to access the ultimate result of the write operation, you need to retrieve the CommandResult by calling getLastError() or getCachedLastError() on the WriteResult. Then you can verify the result by calling CommandResult.ok(), CommandResult.getErrorMessage() and/or CommandResult.getException().

Note that the new object's _id must be unique in the collection. If you don't specify the value, MongoDB will automatically generate one for you. But if you do specify it and it is not unique, the insert operation will fail (and for Camel to notice, you will need to enable invokeGetLastError or set a WriteConcern that waits for the write result).

This is not a limitation of the component, but it is how things work in MongoDB for higher throughput. If you are using a custom _id, you are expected to ensure at the application level that it is unique (and this is a good practice too).
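The distinction between single and multiple insert described in this section depends on the shape of the message body. The following sketch shows one way such a decision could be made from the body's runtime type. It is an illustration only and not the component's actual logic; the class InsertModeSketch, the Mode enum and the method modeFor are hypothetical names.

```java
import java.util.Arrays;
import java.util.Collection;

// Hypothetical illustration, not the component's code: classifies a message
// body as a single or multiple insert based on its runtime type.
final class InsertModeSketch {

    enum Mode { SINGLE, MULTIPLE }

    static Mode modeFor(Object body) {
        // Lists and other collections, as well as arrays, hold several objects.
        boolean isArray = body != null && body.getClass().isArray();
        if (body instanceof Collection || isArray) {
            return Mode.MULTIPLE;
        }
        // Anything else is treated as one object to insert.
        return Mode.SINGLE;
    }

    public static void main(String[] args) {
        System.out.println(modeFor("{ \"name\": \"Raul Kripalani\" }"));
        System.out.println(modeFor(Arrays.asList("{ \"a\": 1 }", "{ \"a\": 2 }")));
    }
}
```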

save

The save operation is equivalent to an upsert (UPdate, inSERT) operation, where the record will be updated, and if it doesn't exist, it will be inserted, all in one atomic operation. MongoDB will perform the matching based on the _id field.

Beware that in case of an update, the object is replaced entirely and the usage of MongoDB's $modifiers is not permitted. Therefore, if you want to manipulate the object if it already exists, you have two options:

  1. perform a query to retrieve the entire object first along with all its fields (may not be efficient), alter it inside Camel and then save it.

  2. use the update operation with $modifiers, which will execute the update at the server-side instead. You can enable the upsert flag, in which case if an insert is required, MongoDB will apply the $modifiers to the filter query object and insert the result.

For example:

from("direct:insert")
    .to("mongodb:myDb?database=flights&collection=tickets&operation=save");

update

Update one or multiple records on the collection. Requires a List<DBObject> as the IN message body containing exactly 2 elements:

  • Element 1 (index 0) => filter query => determines what objects will be affected, same as a typical query object

  • Element 2 (index 1) => update rules => how matched objects will be updated. All modifier operations from MongoDB are supported.

By default, MongoDB will only update 1 object even if multiple objects match the filter query. To instruct MongoDB to update all matching records, set the CamelMongoDbMultiUpdate IN message header to true.

A header with key CamelMongoDbRecordsAffected will be returned (MongoDbConstants.RECORDS_AFFECTED constant) with the number of records updated (copied from WriteResult.getN()).

Supports the following IN message headers:

| Header key | Quick constant | Description (extracted from MongoDB API doc) | Expected type |
|---|---|---|---|
| CamelMongoDbMultiUpdate | MongoDbConstants.MULTIUPDATE | Whether the update should be applied to all matching objects. See http://www.mongodb.org/display/DOCS/Atomic+Operations | boolean/Boolean |
| CamelMongoDbUpsert | MongoDbConstants.UPSERT | Whether the database should create the element if it does not exist. | boolean/Boolean |

For example, the following will update all records whose filterField field equals true by setting the value of the "scientist" field to "Darwin":

// route:
from("direct:update")
    .to("mongodb:myDb?database=science&collection=notableScientists&operation=update");

// element 0 is the filter query, element 1 holds the update rules
DBObject filterField = new BasicDBObject("filterField", true);
DBObject updateObj = new BasicDBObject("$set", new BasicDBObject("scientist", "Darwin"));
Object result = template.requestBodyAndHeader("direct:update",
    new Object[] {filterField, updateObj}, MongoDbConstants.MULTIUPDATE, true);
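When both update headers are needed on the same exchange, they can be collected in a single map. The following minimal sketch shows this. The class UpdateHeadersSketch and its method updateHeaders are hypothetical names introduced for illustration, and the header keys are written as string literals rather than through MongoDbConstants, so that the sketch has no dependencies.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: collects the IN headers understood by the update operation.
final class UpdateHeadersSketch {

    static Map<String, Object> updateHeaders(boolean multiUpdate, boolean upsert) {
        Map<String, Object> headers = new HashMap<>();
        // true: update every matching object instead of only the first one.
        headers.put("CamelMongoDbMultiUpdate", multiUpdate);
        // true: create the element if no object matches the filter query.
        headers.put("CamelMongoDbUpsert", upsert);
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(updateHeaders(true, false));
    }
}
```

The map could then be passed with a call such as template.requestBodyAndHeaders("direct:update", body, headers), where body is the two-element list described above.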

Delete operations

remove

Remove matching records from the collection. The IN message body will act as the removal filter query, and is expected to be of type DBObject or a type convertible to it.

The following example will remove all objects whose field 'conditionField' equals true, in the science database, notableScientists collection:

// route:
from("direct:remove")
    .to("mongodb:myDb?database=science&collection=notableScientists&operation=remove");

// the body acts as the removal filter query
DBObject conditionField = new BasicDBObject("conditionField", true);
Object result = template.requestBody("direct:remove", conditionField);

A header with key CamelMongoDbRecordsAffected is returned (MongoDbConstants.RECORDS_AFFECTED constant) with type int, containing the number of records deleted (copied from WriteResult.getN()).
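Code that consumes the result of a remove or update may want to read the affected-records count defensively. The following minimal sketch does so from a plain header map. The class RecordsAffectedSketch and its method recordsAffected are hypothetical names introduced for illustration, and treating a missing header as 0 is an assumption of this sketch, not documented behavior.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical helper: reads the affected-records count from message headers.
final class RecordsAffectedSketch {

    static int recordsAffected(Map<String, Object> headers) {
        Object value = headers.get("CamelMongoDbRecordsAffected");
        // Assumption: a missing or non-numeric header is reported as 0.
        return (value instanceof Number) ? ((Number) value).intValue() : 0;
    }

    public static void main(String[] args) {
        Map<String, Object> headers =
            Collections.<String, Object>singletonMap("CamelMongoDbRecordsAffected", 3);
        System.out.println(recordsAffected(headers));
    }
}
```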

Other operations

count

Returns the total number of objects in a collection, returning a Long as the OUT message body.

The following example counts the records in the "dynamicCollectionName" collection. Because dynamicity is enabled, the operation runs against the collection named in the MongoDbConstants.COLLECTION header ("dynamicCollectionName") rather than the "flights" collection configured on the endpoint URI.

// route:
from("direct:count")
    .to("mongodb:myDb?database=tickets&collection=flights&operation=count&dynamicity=true");

// the COLLECTION header overrides the collection configured on the endpoint
Object result = template.requestBodyAndHeader("direct:count", "irrelevantBody",
    MongoDbConstants.COLLECTION, "dynamicCollectionName");
assertTrue("Result is not of type Long", result instanceof Long);

getDbStats

Equivalent of running the db.stats() command in the MongoDB shell, which displays useful statistic figures about the database.

For example:

> db.stats();
{
    "db" : "test",
    "collections" : 7,
    "objects" : 719,
    "avgObjSize" : 59.73296244784423,
    "dataSize" : 42948,
    "storageSize" : 1000058880,
    "numExtents" : 9,
    "indexes" : 4,
    "indexSize" : 32704,
    "fileSize" : 1275068416,
    "nsSizeMB" : 16,
    "ok" : 1
}

Usage example:

// route:
from("direct:getDbStats")
    .to("mongodb:myDb?database=flights&collection=tickets&operation=getDbStats");

Object result = template.requestBody("direct:getDbStats", "irrelevantBody");
assertTrue("Result is not of type DBObject", result instanceof DBObject);

The operation will return a data structure similar to the one displayed in the shell, in the form of a DBObject in the OUT message body.

getColStats

Equivalent of running the db.collection.stats() command in the MongoDB shell, which displays useful statistic figures about the collection.

For example:

> db.camelTest.stats();
{
    "ns" : "test.camelTest",
    "count" : 100,
    "size" : 5792,
    "avgObjSize" : 57.92,
    "storageSize" : 20480,
    "numExtents" : 2,
    "nindexes" : 1,
    "lastExtentSize" : 16384,
    "paddingFactor" : 1,
    "flags" : 1,
    "totalIndexSize" : 8176,
    "indexSizes" : {
        "_id_" : 8176
    },
    "ok" : 1
}

Usage example:

// route:
from("direct:getColStats")
    .to("mongodb:myDb?database=flights&collection=tickets&operation=getColStats");

Object result = template.requestBody("direct:getColStats", "irrelevantBody");
assertTrue("Result is not of type DBObject", result instanceof DBObject);

The operation will return a data structure similar to the one displayed in the shell, in the form of a DBObject in the OUT message body.

Dynamic operations

An Exchange can override the endpoint's fixed operation by setting the CamelMongoDbOperation header, defined by the MongoDbConstants.OPERATION_HEADER constant.

The values supported are determined by the MongoDbOperation enumeration and match the accepted values for the operation parameter on the endpoint URI.

For example:

// route:
from("direct:insert")
    .to("mongodb:myDb?database=flights&collection=tickets&operation=insert");

// the OPERATION_HEADER value overrides the insert operation fixed on the endpoint
Object result = template.requestBodyAndHeader("direct:insert", "irrelevantBody",
    MongoDbConstants.OPERATION_HEADER, "count");
assertTrue("Result is not of type Long", result instanceof Long);
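Because the operation header is a free-form value, it can be useful to validate it before it is set on the exchange. The following minimal sketch checks a name against the operations described on this page. The class OperationNamesSketch and its method requireKnown are hypothetical names introduced for illustration; the list is taken from this page only, and the actual MongoDbOperation enumeration may contain further values.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: validates an operation name before it is used as
// the value of the CamelMongoDbOperation header.
final class OperationNamesSketch {

    // Only the operations described on this page; the real enumeration may differ.
    static final List<String> KNOWN = Arrays.asList(
        "findById", "findOneByQuery", "findAll",
        "insert", "save", "update", "remove",
        "count", "getDbStats", "getColStats");

    // Returns the name unchanged if it is known, otherwise fails fast.
    static String requireKnown(String operation) {
        if (!KNOWN.contains(operation)) {
            throw new IllegalArgumentException("Unknown operation: " + operation);
        }
        return operation;
    }

    public static void main(String[] args) {
        System.out.println(requireKnown("count"));
    }
}
```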