The sql: component allows you to work with databases using JDBC queries. The difference between this component and the JDBC component is that with SQL the query is a property of the endpoint, and the message payload is used as the parameters passed to the query.
This component uses spring-jdbc behind the scenes for the SQL handling.
Maven users will need to add the following dependency to their pom.xml for this component:
<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-sql</artifactId>
    <!-- use the same version as your Camel core version -->
    <version>x.x.x</version>
</dependency>
The SQL component also supports:
a JDBC based repository for the Idempotent Consumer EIP pattern. See further below.
a JDBC based repository for the Aggregator EIP pattern. See further below.
In Camel 2.10 or older the SQL component can be used only as a producer. From Camel 2.11 onwards this component can create both consumer endpoints (i.e. used within a from() statement) and producer endpoints (used within a to() statement).
This component can be used as a Transactional Client.
The SQL component uses the following endpoint URI notation:
sql:select * from table where id=# order by name[?options]
From Camel 2.11 onwards you can use named parameters by using the :#name style as shown:
sql:select * from table where id=:#myId order by name[?options]
When using named parameters, Camel will look up the values in the following order of precedence:
1. from the message body, if it is a java.util.Map
2. from the message headers
If a named parameter cannot be resolved, an exception is thrown.
From Camel 2.14 onward you can use Simple expressions as parameters as shown:
sql:select * from table where id=:#${property.myId} order by name[?options]
Notice that the standard ? symbol that denotes the parameters to an SQL query is substituted with the # symbol, because the ? symbol is used to specify options for the endpoint. The ? symbol replacement can be configured on a per-endpoint basis.
You can append query options to the URI in the following format,
?option=value&option=value&...
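For example (the option names used here are described in the table below, and the data source name is illustrative):
sql:select * from table where id=# order by name?dataSourceRef=myDS&batch=true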
Option | Type | Default | Description |
---|---|---|---|
batch | boolean | false | Camel 2.7.5, 2.8.4 and 2.9: Execute SQL batch update statements. See notes below on how the treatment of the inbound message body changes if this is set to true. |
dataSourceRef | String | null | Deprecated and will be removed in Camel 3.0: Reference to a DataSource to look up in the registry. |
dataSource | String | null | Camel 2.11: Reference to a DataSource to look up in the registry. |
placeholder | String | # | Specifies a character that will be replaced to ? in the SQL query. |
template.xxx | | null | Sets additional options on the Spring JdbcTemplate that is used behind the scenes to execute the queries. |
allowNamedParameters | boolean | true | Camel 2.11: Whether to allow using named parameters in the queries. |
processingStrategy | | null | Camel 2.11: SQL consumer only: Allows for plugging in a custom org.apache.camel.component.sql.SqlProcessingStrategy to execute queries when the consumer has processed the rows/batch. |
prepareStatementStrategy | | null | Camel 2.11: Allows to plugin a custom org.apache.camel.component.sql.SqlPrepareStatementStrategy to control preparation of the query and prepared statement. |
consumer.delay | long | 500 | Camel 2.11: SQL consumer only: Delay in milliseconds between each poll. |
consumer.initialDelay | long | 1000 | Camel 2.11: SQL consumer only: Milliseconds before polling starts. |
consumer.useFixedDelay | boolean | | Camel 2.11: SQL consumer only: Set to true to use a fixed delay between polls, otherwise a fixed rate is used. See ScheduledExecutorService in the JDK for details. |
maxMessagesPerPoll | int | 0 | Camel 2.11: SQL consumer only: An integer value to define the maximum number of messages to gather per poll. By default, no maximum is set. |
consumer.useIterator | boolean | true | Camel 2.11: SQL consumer only: If true each row returned when polling is processed individually. If false the entire java.util.List of data is set as the IN body. |
consumer.routeEmptyResultSet | boolean | false | Camel 2.11: SQL consumer only: Whether to route a single empty Exchange if there was no data to poll. |
consumer.onConsume | String | null | Camel 2.11: SQL consumer only: After processing each row this query can be executed, if the Exchange was processed successfully, for example to mark the row as processed. The query can have parameters. |
consumer.onConsumeFailed | String | null | Camel 2.11: SQL consumer only: After processing each row this query can be executed, if the Exchange failed, for example to mark the row as failed. The query can have parameters. |
consumer.onConsumeBatchComplete | String | null | Camel 2.11: SQL consumer only: After processing the entire batch, this query can be executed to bulk update rows etc. The query cannot have parameters. |
consumer.expectedUpdateCount | int | -1 | Camel 2.11: SQL consumer only: If using consumer.onConsume then this option can be used to set an expected number of rows being updated. Typically you may set this to 1. |
consumer.breakBatchOnConsumeFail | boolean | false | Camel 2.11: SQL consumer only: If using consumer.onConsume and it fails, then this option controls whether to break out of the batch or continue processing the next row from the batch. |
alwaysPopulateStatement | boolean | false | Camel 2.11: SQL producer only: If enabled then the populateStatement method from org.apache.camel.component.sql.SqlPrepareStatementStrategy is always invoked, also if there are no expected parameters to be prepared; when disabled (default) it is only invoked if there is one or more expected parameters to be set. |
separator | char | , | Camel 2.11.1: The separator to use when parameter values are taken from the message body (if the body is a String type), to be inserted at # placeholders. Notice if you use named parameters, then a Map type is used instead. |
outputType | String | SelectList | Camel 2.12.0: Make the output of the consumer or producer SelectList (a List of Map) or SelectOne (a single Java object) in the following way: a) If the query has only a single column, then that JDBC column object is returned (such as a SELECT COUNT query returning a Long). b) If the query has more than one column, then a Map of that result is returned. c) If outputClass is set, then the query result is converted into a Java bean object by calling the setters that match the column names (the class is assumed to have a default constructor). d) If the query results in more than one row, a non-unique result exception is thrown. |
outputClass | String | null | Camel 2.12.0: Specify the full package and class name to use as conversion when outputType=SelectOne. |
parametersCount | int | 0 | Camel 2.11.2/2.12.0: If set greater than zero, then Camel will use this count value of parameters to replace instead of querying via the JDBC metadata API. This is useful if the JDBC vendor could not return the correct parameters count, in which case the user may override it. |
noop | boolean | false | Camel 2.12.0: If set, will ignore the results of the SQL query and use the existing IN message as the OUT message for the continuation of processing. |
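For example, a sketch (route, query and dataSourceRef name are illustrative) of using outputType=SelectOne so a single-row, single-column result comes back as a plain Java object instead of a List of Maps:

// returns a single Long (the count) instead of a List<Map<String, Object>>
from("direct:count")
    .to("sql:select count(*) from projects?dataSourceRef=myDS&outputType=SelectOne")
    .to("mock:count");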
The SQL component tries to convert the message body to an object of
java.util.Iterator
type and then uses this iterator to fill the query
parameters (where each query parameter is represented by a #
symbol (or
configured placeholder) in the endpoint URI). If the message body is not an array or
collection, the conversion results in an iterator that iterates over only one object, which
is the body itself.
For example, if the message body is an instance of java.util.List
, the
first item in the list is substituted into the first occurrence of #
in the SQL
query, the second item in the list is substituted into the second occurrence of
#
, and so on.
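A minimal sketch of this (the route, endpoint names and dataSourceRef are illustrative) where a List body supplies one value per # placeholder, in order:

// the query has two # placeholders, so the body must supply two values in order
from("direct:query")
    .to("sql:select * from projects where license = # and id > # order by id?dataSourceRef=myDS")
    .to("mock:result");

// the first list element binds to the first #, the second to the second #
template.sendBody("direct:query", Arrays.asList("ASF", 1));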
For select
operations, the result is an instance of
List<Map<String, Object>>
type, as returned by the JdbcTemplate.queryForList() method. For update
operations, the result
is the number of updated rows, returned as an Integer
.
When performing update
operations, the SQL Component stores the update
count in the following message headers:
Header | Description |
---|---|
CamelSqlUpdateCount | The number of rows updated for update operations, returned as an Integer. |
CamelSqlRowCount | The number of rows returned for select operations, returned as an Integer. |
CamelSqlQuery | Query to execute. This query takes precedence over the query specified in the endpoint URI. Note that query parameters in the header are represented by a ? instead of a # symbol. |
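For example, a sketch (route and dataSourceRef name are illustrative) of reading the update count after an update statement:

from("direct:update")
    .to("sql:update projects set license = # where id = #?dataSourceRef=myDS")
    // for update operations the body is the number of updated rows,
    // which is also available in the CamelSqlUpdateCount header
    .log("updated ${header.CamelSqlUpdateCount} row(s)");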
When performing insert operations, the SQL Component stores the rows with the generated keys and the number of these rows in the following message headers (available as of Camel 2.12.4, 2.13.1):
Header | Description |
---|---|
CamelSqlGeneratedKeysRowCount | The number of rows in the header that contains generated keys. |
CamelSqlGeneratedKeyRows | Rows that contain the generated keys (a list of maps of keys). |
Available as of Camel 2.12.4, 2.13.1 and 2.14
If you insert data using SQL INSERT, then the RDBMS may support auto generated keys. You can instruct the SQL producer to return the generated keys in headers.
To do that set the header CamelSqlRetrieveGeneratedKeys=true
. Then the
generated keys will be provided as headers with the keys listed in the table above.
You can see more details in this unit test.
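A rough sketch of this (the insert statement, route and dataSourceRef name are illustrative) that asks the producer for generated keys and reads them back from the headers:

from("direct:insertProject")
    .to("sql:insert into projects (project, license) values (#, #)?dataSourceRef=myDS");

Exchange out = template.request("direct:insertProject", exchange -> {
    // instruct the SQL producer to retrieve the generated keys for this insert
    exchange.getIn().setHeader("CamelSqlRetrieveGeneratedKeys", true);
    exchange.getIn().setBody(Arrays.asList("Karaf", "ASF"));
});

// a list of maps, one map of generated key(s) per inserted row
List<?> generatedKeys = out.getOut().getHeader("CamelSqlGeneratedKeyRows", List.class);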
A reference to a DataSource
can be set in the URI as shown:
sql:select * from table where id=# order by name?dataSource=myDS
In the sample below we execute a query and retrieve the result as a List of rows, where each row is a Map<String, Object> and the key is the column name.
First, we set up a table to use for our sample. As this is based on a unit test, we do it in Java code:
// this is the database we create with some initial data for our unit test
jdbcTemplate.execute("create table projects (id integer primary key,"
    + "project varchar(10), license varchar(5))");
jdbcTemplate.execute("insert into projects values (1, 'Camel', 'ASF')");
jdbcTemplate.execute("insert into projects values (2, 'AMQ', 'ASF')");
jdbcTemplate.execute("insert into projects values (3, 'Linux', 'XXX')");
Then we configure our route and our sql component. Notice that we use a direct endpoint in front of the sql endpoint. This allows us to send an exchange to the direct endpoint with the URI direct:simple, which is much easier for the client to use than the long sql: URI. Note that the DataSource is looked up in the registry, so we can use standard Spring XML to configure our DataSource.
from("direct:simple") .to("sql:select * from projects where license=# order by id? dataSourceRef=jdbc/myDataSource").to("mock:result");
And then we fire the message into the direct
endpoint that will route it
to our sql
component that queries the database.
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMessageCount(1);

// send the query to direct that will route it to the sql where we will
// execute the query and bind the parameters with the data from the body.
// The body only contains one value in this case (XXX) but if we should
// use multiple values then the body will be iterated so we could supply
// a List<String> instead containing each binding value.
template.sendBody("direct:simple", "XXX");

mock.assertIsSatisfied();

// the result is a List
List received = assertIsInstanceOf(List.class, mock.getReceivedExchanges().get(0).getIn().getBody());

// and each row in the list is a Map
Map row = assertIsInstanceOf(Map.class, received.get(0));

// and we should be able to get the project from the map that should be Linux
assertEquals("Linux", row.get("PROJECT"));
We could configure the DataSource
in Spring XML as follows:
<jee:jndi-lookup id="myDS" jndi-name="jdbc/myDataSource"/>
This feature is available starting with Camel 2.11.
In the route below, we want to get all the projects from the projects table. Notice the SQL query has 2 named parameters, :#lic and :#min. Camel will then look up these parameters in the message body or message headers. Notice in the example below we set two headers with constant values for the named parameters:
from("direct:projects") .setHeader("lic", constant("ASF")) .setHeader("min", constant(123)) .to("sql:select * from projects where license = :#lic and id > :#min order by id")
Though if the message body is a java.util.Map
then the named parameters
will be taken from the body.
from("direct:projects") .to("sql:select * from projects where license = :#lic and id > :#min order by id")
Available as of Camel 2.14
In the route below, we want to get all the projects from the database. It uses the body of the exchange for defining the license and uses the value of a property as the second parameter.
from("direct:projects") .setBody(constant("ASF")) .setProperty("min", constant(123)) .to("sql:select * from projects where license = :#${body} and id > :#${property.min} order by id")
Available as of Camel 2.7: In this section we will use the JDBC based idempotent repository.
From Camel 2.9 onwards there is an abstract class org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository you can extend to build a custom JDBC idempotent repository.
First we have to create the database table which will be used by the idempotent repository. For Camel 2.7, we use the following schema:
CREATE TABLE CAMEL_MESSAGEPROCESSED (
    processorName VARCHAR(255),
    messageId VARCHAR(100)
)
In Camel 2.8, we added the createdAt column:
CREATE TABLE CAMEL_MESSAGEPROCESSED (
    processorName VARCHAR(255),
    messageId VARCHAR(100),
    createdAt TIMESTAMP
)
The SQL Server TIMESTAMP type is a fixed-length binary-string type. It does not map to any of the JDBC time types: DATE, TIME, or TIMESTAMP.
We recommend having a unique constraint on the columns processorName and messageId. Because the syntax for this constraint differs from database to database, we do not show it here.
Second we need to set up a javax.sql.DataSource in the Spring XML file:
<jdbc:embedded-database id="dataSource" type="DERBY" />
And we can create our JDBC idempotent repository in the Spring XML file as well:
<bean id="messageIdRepository" class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository"> <constructor-arg ref="dataSource" /> <constructor-arg value="myProcessorName" /> </bean> <camel:camelContext> <camel:errorHandler id="deadLetterChannel" type="DeadLetterChannel" deadLetterUri="mock:error"> <camel:redeliveryPolicy maximumRedeliveries="0" maximumRedeliveryDelay="0" logStackTrace="false" /> </camel:errorHandler> <camel:route id="JdbcMessageIdRepositoryTest" errorHandlerRef="deadLetterChannel"> <camel:from uri="direct:start" /> <camel:idempotentConsumer messageIdRepositoryRef="messageIdRepository"> <camel:header>messageId</camel:header> <camel:to uri="mock:result" /> </camel:idempotentConsumer> </camel:route> </camel:camelContext>
Important: The JdbcAggregationRepository is provided in the camel-sql component.
JdbcAggregationRepository is an AggregationRepository which persists the aggregated messages on the fly. This ensures that you will not lose messages, as the default aggregator uses an in-memory only AggregationRepository.
The JdbcAggregationRepository, together with Camel, provides persistent support for the Aggregator.
It has the following options:
Option | Type | Description |
---|---|---|
dataSource | DataSource | Mandatory: The javax.sql.DataSource to use for accessing the database. |
repositoryName | String | Mandatory: The name of the repository. |
transactionManager | TransactionManager | Mandatory: The org.springframework.transaction.PlatformTransactionManager to manage transactions for the database. |
lobHandler | LobHandler | A org.springframework.jdbc.support.lob.LobHandler to handle Lob types in the database; use this option to set a vendor specific LobHandler, for example when using Oracle. |
returnOldExchange | boolean | Whether the get operation should return the old existing Exchange if any existed. By default this option is false, to optimize, as the old Exchange is not needed when aggregating. |
useRecovery | boolean | Whether or not recovery is enabled. This option is by default true. When enabled the Camel Aggregator automatically recovers failed aggregated Exchanges and has them resubmitted. |
recoveryInterval | long | If recovery is enabled then a background task is run every x'th time to scan for failed exchanges to recover and resubmit. By default this interval is 5000 milliseconds. |
maximumRedeliveries | int | Allows you to limit the maximum number of redelivery attempts for a recovered exchange. If enabled then the Exchange will be moved to the dead letter channel if all redelivery attempts failed. By default this option is disabled. If this option is used then the deadLetterUri option must also be provided. |
deadLetterUri | String | An endpoint uri for a Dead Letter Channel where exhausted recovered Exchanges will be moved. If this option is used then the maximumRedeliveries option must also be provided. |
storeBodyAsText | boolean | Starting with Camel 2.11, whether to store the message body as String which is human readable. By default this option is false, storing the body in binary format. |
headersToStoreAsText | List<String> | Starting with Camel 2.11, allows for storing headers as a human-readable String. By default this option is disabled, meaning they will be stored in binary format. |
optimisticLocking | boolean | Starting with Camel 2.12, to turn on optimistic locking, which often would be needed in clustered environments where multiple Camel applications share the same JDBC based aggregation repository. |
jdbcOptimisticLockingExceptionMapper | | Starting with Camel 2.12, allows to plugin a custom org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper to map vendor specific exceptions to an optimistic locking error, so Camel can perform a retry. |
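A minimal sketch of configuring the repository in Java and using it with the Aggregator (the correlation header, aggregation strategy, completion size, transaction manager and data source here are illustrative):

// mandatory options: repository name, transaction manager and data source
JdbcAggregationRepository repo = new JdbcAggregationRepository();
repo.setRepositoryName("aggregation");
repo.setTransactionManager(transactionManager);
repo.setDataSource(dataSource);

// keep the aggregated state in the JDBC repository while aggregating
from("direct:start")
    .aggregate(header("id"), new UseLatestAggregationStrategy())
        .aggregationRepository(repo)
        .completionSize(5)
    .to("mock:aggregated");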
JdbcAggregationRepository will preserve only Serializable compatible data types. If a data type is not such a type, it is dropped and a WARN is logged. It only persists the Message body and the Message headers; the Exchange properties are not persisted.
Note from Camel 2.11 onwards you can store the message body and select(ed) headers as String in separate columns.
The JdbcAggregationRepository will by default recover any failed Exchange. It does this by running a background task that scans for failed Exchanges in the persistent store. You can use the checkInterval option to set how often this task runs. The recovery works transactionally, which ensures that Camel will try to recover and redeliver the failed Exchange. Any Exchange found to be recoverable will be restored from the persistent store, resubmitted and sent out again.
The following headers are set when an Exchange is being recovered/redelivered:
Header | Type | Description |
---|---|---|
Exchange.REDELIVERED | Boolean | Is set to true to indicate the Exchange is being redelivered. |
Exchange.REDELIVERY_COUNTER | Integer | The redelivery attempt, starting from 1. |
Only when an Exchange has been successfully processed will it be marked as complete, which happens when the confirm method is invoked on the AggregationRepository. This means that if the same Exchange fails again it will be retried until it succeeds.
You can use the maximumRedeliveries option to limit the maximum number of redelivery attempts for a given recovered Exchange. You must also set the deadLetterUri option so Camel knows where to send the Exchange when the maximumRedeliveries limit has been hit.
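For example, a sketch of tuning recovery on a repository configured as in the earlier sketch (the values shown are illustrative, not recommendations):

// recovery is on by default; scan for failed exchanges every 10 seconds
repo.setUseRecovery(true);
repo.setRecoveryInterval(10000);

// after 3 failed redelivery attempts move the exchange to a dead letter endpoint
repo.setMaximumRedeliveries(3);
repo.setDeadLetterUri("jms:queue:dead");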
You can see some examples in the unit tests of camel-sql, for example this test.
To be operational, each aggregator uses two tables: the aggregation table and the completed table. By convention the completed table has the same name as the aggregation table, suffixed with "_COMPLETED". The name must be configured in the Spring bean with the RepositoryName property. In the following example, aggregation will be used.
The table structure definition of both tables is identical: in both cases a String value is used as the key (id), whereas a Blob contains the exchange serialized as a byte array. However, one difference should be remembered: the id field does not have the same content depending on the table. In the aggregation table, id holds the correlation Id used by the component to aggregate the messages. In the completed table, id holds the id of the exchange stored in the corresponding blob field.
Here is the SQL query used to create the tables, just replace
"aggregation"
with your aggregator repository name.
CREATE TABLE aggregation (
    id varchar(255) NOT NULL,
    exchange blob NOT NULL,
    constraint aggregation_pk PRIMARY KEY (id)
);

CREATE TABLE aggregation_completed (
    id varchar(255) NOT NULL,
    exchange blob NOT NULL,
    constraint aggregation_completed_pk PRIMARY KEY (id)
);
Available as of Camel 2.11
You can configure the JdbcAggregationRepository
to store message body and
select(ed) headers as String in separate columns. For example to store the body, and the
following two headers companyName
and accountName
use the
following SQL:
CREATE TABLE aggregationRepo3 (
    id varchar(255) NOT NULL,
    exchange blob NOT NULL,
    body varchar(1000),
    companyName varchar(1000),
    accountName varchar(1000),
    constraint aggregationRepo3_pk PRIMARY KEY (id)
);

CREATE TABLE aggregationRepo3_completed (
    id varchar(255) NOT NULL,
    exchange blob NOT NULL,
    body varchar(1000),
    companyName varchar(1000),
    accountName varchar(1000),
    constraint aggregationRepo3_completed_pk PRIMARY KEY (id)
);
And then configure the repository to enable this behavior as shown below:
<bean id="repo3" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="repositoryName" value="aggregationRepo3"/> <property name="transactionManager" ref="txManager3"/> <property name="dataSource" ref="dataSource3"/> <!-- configure to store the message body and following headers as text in the repo --> <property name="storeBodyAsText" value="true"/> <property name="headersToStoreAsText"> <list> <value>companyName</value> <value>accountName</value> </list> </property> </bean>
Since they can contain any type of payload, Exchanges are not serializable by design. Instead, an Exchange is converted into a byte array to be stored in a database BLOB field. All those conversions are handled by the JdbcCodec class. One detail of the code requires your attention: the ClassLoadingAwareObjectInputStream.
The ClassLoadingAwareObjectInputStream has been reused from the Apache ActiveMQ project. It wraps an ObjectInputStream and uses it with the ContextClassLoader rather than the currentThread one. The benefit is to be able to load classes exposed by other bundles. This allows the exchange body and headers to have custom typed object references.
The start method verifies the connection to the database and the presence of the required tables. If anything is wrong, it will fail during startup.
Depending on the targeted environment, the aggregator might need some configuration. As you already know, each aggregator should have its own repository (with the corresponding pair of tables created in the database) and a data source. If the default lobHandler is not adapted to your database system, it can be injected with the lobHandler property.
Here is the declaration for Oracle:
<bean id="lobHandler" class="org.springframework.jdbc.support.lob.OracleLobHandler"> <property name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/> </bean> <bean id="nativeJdbcExtractor" class="org.springframework.jdbc. // support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/> <bean id="repo" class= "org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="transactionManager" ref="transactionManager"/> <property name="repositoryName" value="aggregation"/> <property name="dataSource" ref="dataSource"/> <!-- Only with Oracle, else use default --> <property name="lobHandler" ref="lobHandler"/> </bean>
From Camel 2.12 onwards you can turn on optimisticLocking and use this JDBC based aggregation repository in a clustered environment where multiple Camel applications share the same database for the aggregation repository. If there is a race condition, the JDBC driver will throw a vendor specific exception which the JdbcAggregationRepository can react upon. To know which caused exceptions from the JDBC driver are regarded as an optimistic locking error, we need a mapper. Therefore there is an org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper which allows you to implement your custom logic if needed. There is a default implementation, org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper, which works as follows:
The following checks are done:
1. If the caused exception is an SQLException, then the SQLState is checked if it starts with 23.
2. If the caused exception is a DataIntegrityViolationException.
3. If the caused exception class name has "ConstraintViolation" in its name.
4. Optional checking for FQN class name matches, if any class names have been configured.
You can in addition add FQN class names, and if any of the caused exceptions (or any nested exception) equals any of the FQN class names, then it is an optimistic locking error.
Here is an example, where we define 2 extra FQN class names from the JDBC vendor.
<bean id="repo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="transactionManager" ref="transactionManager"/> <property name="repositoryName" value="aggregation"/> <property name="dataSource" ref="dataSource"/> <property name"jdbcOptimisticLockingExceptionMapper" ref="myExceptionMapper"/> </bean> <!-- use the default mapper with extra FQN class names from our JDBC driver --> <bean id="myExceptionMapper" class="org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLocking ExceptionMapper"> <property name="classNames"> <util:set> <value>com.foo.sql.MyViolationExceptoion</value> <value>com.foo.sql.MyOtherViolationExceptoion</value> </util:set> </property> </bean>