Camel Component: Cache - 6.3

Talend ESB Mediation Developer Guide

EnrichVersion
6.3
EnrichProdName
Talend Data Fabric
Talend Data Services Platform
Talend ESB
Talend MDM Platform
Talend Open Studio for ESB
Talend Real-Time Big Data Platform
task
Design and Development
EnrichPlatform
Talend ESB

The cache component enables you to perform caching operations using EHCache as the Cache Implementation. The cache itself is created on demand or if a cache of that name already exists then it is simply utilized with its original settings.

This component supports producer and event based consumer endpoints.

The Cache consumer is an event based consumer and can be used to listen and respond to specific cache activities. If you need to perform selections from a pre-existing cache, use the processors defined for the cache component.

Maven users will need to add the following dependency to their pom.xml for this component:

<dependency>
   <groupId>org.apache.camel</groupId>
   <artifactId>camel-cache</artifactId>
   <!-- use the same version as your Camel core version -->
   <version>x.x.x</version>
</dependency>

URI format and Options

cache://cacheName[?options]

You can append query options to the URI in the following format, ?option=value&option=value&..., where option can be:

Name

Default Value

Description

maxElementsInMemory

1000

The numer of elements that may be stored in the defined cache

memoryStore-EvictionPolicy

MemoryStore-Eviction-Policy.LFU

The number of elements that may be stored in the defined cache. Options include

  • MemoryStoreEvictionPolicy.LFU - Least frequently used

  • MemoryStoreEvictionPolicy.LRU - Least recently used

  • MemoryStoreEvictionPolicy.FIFO - first in first out, the oldest element by creation time

overflowToDisk

true

Specifies whether cache may overflow to disk

eternal

false

Sets whether elements are eternal. If eternal, timeouts are ignored and the element never expires.

timeToLiveSeconds

300

The maximum time between creation time and when an element expires. Is used only if the element is not eternal.

timeToIdleSeconds

300

The maximum amount of time between accesses before an element expires

diskPersistent

false

Whether the disk store persists between restarts of the Virtual Machine.

diskExpiryThread-IntervalSeconds

120

The number of seconds between runs of the disk expiry thread.

cacheManagerFactory

null

If you want to use a custom factory which instantiates and creates the EHCache net.sf.ehcache.CacheManager. Use type of abstract org.apache.camel. component.cache. CacheManagerFactory.

eventListenerRegistrynullSets a list of EHCache net.sf.ehcache.event.CacheEventListener for all new caches - no need to define it per cache in EHCache xml config anymore. Use type of org.apache.camel. component.cache. CacheEventListenerRegistry.
cacheLoaderRegistrynullSets a list of org.apache.camel. component.cache. CacheLoaderWrapper that extends EHCache net.sf.ehcache.loader.CacheLoader for all new caches - no need to define it per cache in EHCache xml config anymore. Use type of org.apache.camel. component.cache. CacheLoaderRegistry
keynullTo configure using a cache key by default. If a key is provided in the message header, then the key from the header takes precedence.
operationnullTo configure using an cache operation by default. If an operation in the message header, then the operation from the header takes precedence.

Cache Component options

Name

Default Value

Description

configuration

To use a custom org.apache.camel.component.

cache.CacheConfiguration configuration.

cacheManagerFactory

To use a custom org.apache.camel.component.

cache.CacheManagerFactory.

configurationFile

Camel 2.13/2.12.3: To configure the location of the ehcache.xml file to use, such as classpath:com/foo/mycache.xml to load from classpath. If no configuration is given, then the default settings from EHCache is used.

Sending/Receiving Messages to/from the cache

Message Headers

Header

Description

CamelCacheOperation

The operation to be performed on the cache. These headers are removed from the exchange after the cache operation is performed. Valid options are

  • CamelCacheGet

  • CamelCacheCheck

  • CamelCacheAdd

  • CamelCacheUpdate

  • CamelCacheDelete

  • CamelCacheDeleteAll

CamelCacheKey

The cache key used to store the Message in the cache. The cache key is optional if the CamelCacheOperation is CamelCacheDeleteAll.

Starting with Camel 2.11, the CamelCacheAdd and CamelCacheUpdate operations support additional headers:

Header

Type

Description

CamelCacheTimeToLive

Integer

Time to live in seconds

CamelCacheTimeToIdle

Integer

Time to idle in seconds

CamelCacheEternal

Integer

Whether the content is eternal

The CamelCacheAdd and CamelCacheUpdate operations support additional headers:

Header

Type

Description

CamelCacheTimeToLive

IntegerCamel 2.11: Time to live in seconds.

CamelCacheTimeToIdle

IntegerCamel 2.11: Time to idle in seconds.

CamelCacheEternal

IntegerCamel 2.11: Whether the content is eternal.

Cache Producer

Sending data to the cache involves the ability to direct payloads in exchanges to be stored in a pre-existing or created-on-demand cache. The mechanics of doing this involve

  • setting the Message Exchange Headers shown above.

  • ensuring that the Message Exchange Body contains the message directed to the cache

Cache Consumer

Receiving data from the cache involves the ability of the CacheConsumer to listen on a pre-existing or created-on-demand Cache using an event Listener and receive automatic notifications when any cache activity take place (i.e., Add, Update, Delete, or DeleteAll). Upon such an activity taking place

  • an exchange containing Message Exchange Headers and a Message Exchange Body containing the just added/updated payload is placed and sent.

  • in case of a CamelCacheDeleteAll operation, the Message Exchange Header CamelCacheKey and the Message Exchange Body are not populated.

Cache Processors

There are a set of nice processors with the ability to perform cache lookups and selectively replace payload content at the

  • body

  • token

  • xpath level

Cache Usage Samples

Example: Configuring the cache

from("cache://MyApplicationCache" +
   "?maxElementsInMemory=1000" + 
   "&memoryStoreEvictionPolicy=" +
   "MemoryStoreEvictionPolicy.LFU" +
   "&overflowToDisk=true" +
   "&eternal=true" +
   "&timeToLiveSeconds=300" + 
   "&timeToIdleSeconds=true" +
   "&diskPersistent=true" +
   "&diskExpiryThreadIntervalSeconds=300")

Example: Adding keys to the cache

RouteBuilder builder = new RouteBuilder() {
   public void configure() {
      from("direct:start")
         .setHeader(CacheConstants.CACHE_OPERATION, 
             constant(CacheConstants.CACHE_OPERATION_ADD)) 
         .setHeader(CacheConstants.CACHE_KEY, 
             constant("Ralph_Waldo_Emerson")) 
         .to("cache://TestCache1")
   }
};

Example: Updating existing keys in a cache

RouteBuilder builder = new RouteBuilder() {
   public void configure() {
      from("direct:start")
         .setHeader(CacheConstants.CACHE_OPERATION, 
             constant(CacheConstants.CACHE_OPERATION_UPDATE)) 
         .setHeader(CacheConstants.CACHE_KEY, 
             constant("Ralph_Waldo_Emerson"))
         .to("cache://TestCache1")
   }
};

Example: Deleting existing keys in a cache

RouteBuilder builder = new RouteBuilder() {
   public void configure() {
      from("direct:start")
         .setHeader(CacheConstants.CACHE_OPERATION, 
             constant(CacheConstants.CACHE_DELETE)) 
         .setHeader(CacheConstants.CACHE_KEY, 
             constant("Ralph_Waldo_Emerson"))
         .to("cache://TestCache1")
   }
};

Example: Deleting all existing keys in a cache

RouteBuilder builder = new RouteBuilder() {
   public void configure() {
      from("direct:start")
         .setHeader(CacheConstants.CACHE_OPERATION, 
             constant(CacheConstants.CACHE_DELETEALL)) 
         .to("cache://TestCache1");
   }
};

Example: Notifying any changes registering in a Cache to Processors and other Producers

RouteBuilder builder = new RouteBuilder() {
   public void configure() {
      from("cache://TestCache1").process(new Processor() {
         public void process(Exchange exchange) throws Exception {
            String operation = 
               (String) exchange.getIn().getHeader(
               CacheConstants.CACHE_OPERATION);
            String key = (String) 
               exchange.getIn().getHeader(CacheConstants.CACHE_KEY);
            Object body = exchange.getIn().getBody();
           // Do something
         } 
      })
   } 
};

Example: Using Processors to selectively replace payload with cache values

RouteBuilder builder = new RouteBuilder() {
   public void configure() {
     //Message Body Replacer
      from("cache://TestCache1")
         .filter(header(CacheConstants.CACHE_KEY).isEqualTo("greeting")) 
         .process(new CacheBasedMessageBodyReplacer(
     							"cache://TestCache1","farewell"))
         .to("direct:next");  
      //Message Token replacer
      from("cache://TestCache1")
         .filter(header(CacheConstants.CACHE_KEY).isEqualTo("quote"))
         .process(new CacheBasedTokenReplacer(
    							"cache://TestCache1","novel","#novel#"))
         .process(new CacheBasedTokenReplacer(
    							"cache://TestCache1","author","#author#"))
         .process(new CacheBasedTokenReplacer(
    							"cache://TestCache1","number","#number#"))
         .to("direct:next");

      //Message XPath replacer
      from("cache://TestCache1")
         .filter(header(CacheConstants.CACHE_KEY)
         .isEqualTo("XML_FRAGMENT")) 
         .process(new CacheBasedXPathReplacer(
    							"cache://TestCache1","book1","/books/book1"))
         .process (new CacheBasedXPathReplacer(
    							"cache://TestCache1","book2","/books/book2"))
         .to("direct:next");
   }
};

Example: Getting an entry from the Cache

from("direct:start")
   // Prepare headers
   .setHeader(CacheConstants.CACHE_OPERATION, constant(
      CacheConstants.CACHE_OPERATION_GET))
   .setHeader(CacheConstants.CACHE_KEY, constant("Ralph_Waldo_Emerson"))
   .to("cache://TestCache1").
   // Check if entry was not found
   .choice().when(header(
      CacheConstants.CACHE_ELEMENT_WAS_FOUND).isNull()).
   // If not found, get the payload and put it to cache
   .to("cxf:bean:someHeavyweightOperation")
   .setHeader(CacheConstants.CACHE_OPERATION, constant(
      CacheConstants.CACHE_OPERATION_ADD))
   .setHeader(CacheConstants.CACHE_KEY, 
      constant("Ralph_Waldo_Emerson"))
   .to("cache://TestCache1")
   .end()
   .to("direct:nextPhase");

Example: Checking for an entry in the Cache

Note: The CHECK command tests existence of an entry in the cache but doesn't place a message in the body.

from("direct:start")
   // Prepare headers
   .setHeader(CacheConstants.CACHE_OPERATION, 
      constant(CacheConstants.CACHE_OPERATION_CHECK))
   .setHeader(CacheConstants.CACHE_KEY, constant("Ralph_Waldo_Emerson"))
   .to("cache://TestCache1").
   // Check if entry was not found
   .choice().when(header(
      CacheConstants.CACHE_ELEMENT_WAS_FOUND).isNull())
   // If not found, get the payload and put it to cache
   .to("cxf:bean:someHeavyweightOperation").
   .setHeader(CacheConstants.CACHE_OPERATION, 
       constant(CacheConstants.CACHE_OPERATION_ADD))
   .setHeader(CacheConstants.CACHE_KEY, 
       constant("Ralph_Waldo_Emerson"))
   .to("cache://TestCache1")
   .end();

Management of EHCache

EHCache has its own statistics and management from JMX.

Here's a snippet on how to expose them via JMX in a Spring application context:

<bean id="ehCacheManagementService" 
   class="net.sf.ehcache.management.ManagementService" 
   init-method="init" lazy-init="false">
   <constructor-arg>
      <bean class="net.sf.ehcache.CacheManager" 
         factory-method="getInstance"/>
   </constructor-arg>
   <constructor-arg>
      <bean class="org.springframework.jmx.support.JmxUtils" 
         factory-method="locateMBeanServer"/>
   </constructor-arg>
   <constructor-arg value="true"/>
   <constructor-arg value="true"/>
   <constructor-arg value="true"/>
   <constructor-arg value="true"/>
</bean>

Of course the same thing can be done in straight Java:

ManagementService.registerMBeans(CacheManager.getInstance(), 
   mbeanServer, true, true, true, true);

You can get cache hits, misses, in-memory hits, disk hits, size stats this way. You can also change CacheConfiguration parameters on the fly.