As mentionned before, here is the use case:

  1. Listen on a queue for new messages
  2. Validate that the message is properly formed (has a body and has the proper headers)
  3. Save the content to a file (file name is in one of the headers)

Spring configuration

<bean id="myRouteBuilder" class="default.MyRouteBuilder"/>
    <bean id="myFilter" class="default.MyMessageFilter"/>
    <bean id="myEnricher" class="default.MyMessageEnricher"/>

    <camel:camelContext id="myCamelContext">
    <camel:routeBuilder ref="myRouteBuilder"/>
    </camel:camelContext>

    <bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
        <property name="connectionFactory">
        <bean class="org.apache.activemq.ActiveMQConnectionFactory">
             <property name="brokerURL" value="tcp://localhost:61616"/>
        </bean>
    </property>
</bean>

The important bean here is the camelContext bean. The RouteBuilder refered to will be initialized (by calling up the configure() method on it) when the spring container gets initialized.

The ‘jms’ bean is used to configure the JMS component which can be refered to by name inside the route builder.

The 2 other beans (myEnricher and myFilter) are simply used by the route builder. A spring bean lookup is used to get a handle on these from the router.

1 gotcha: My ide (Eclipse) was giving me a lot of greif over the suggested namespace setup. Save yourself some trouble, and save the camel-spring.xsd offline  and change your namesapce declaration to something like:

<beans xmlns:camel="http://camel.apache.org/schema/spring"
xsi:schemaLocation="
http://camel.apache.org/schema/spring classpath:META-INF/camel-spring.xsd">

The RouteBuilder

This is where you build the Came routing for any messages. In this case, remember that we want to route incoming queue messages to a file (1 file per message). Ideally, any messages we can’t deal with and where there is an error in processing the message would be put into a DLQ, and any invalid messages would be put onto another queue.

public class MyRouteBuilder extends RouteBuilder implements ApplicationContextAware {
private ApplicationContext context;

@Override
public void configure() throws Exception {
    Predicate messageFilter = (Predicate) context.getBean("myFilter");
    Processor contentEnricher = (Processor) context.getBean("myEnricher");

    errorHandler(
       deadLetterChannel("jms:queue:my.dlq").
       delay(2000).
       maximumRedeliveries(5)
    );

    from("jms:queue:my.queue").
    choice().when(messageFilter).process(contentEnricher).to("file://var/tmp").
    otherwise().to("jms:queue:my.queue.invalid");
}

Explanation:

  • The first section is a simple error handler that will force a message to a DLQ in the event of an exception. A lot more can be done, but this is all I needed
  • Second we define the source compoment: “jms:” refers to the “jms” bean in the spring containter. Had we named that bean “activemqconnection” we would have used “activemqconnection:queue:my.queue” instead.
  • We then apply a filter (implements Predicate) to determine if our message is valid (in my case, I check for specific header values). If the Predicate returns false, then we route the message to another queue.
  • If the predicate passes, the message is stored in the /var/tmp directory. Specifically, for the Camel File component, the myEnricher bean takes care of setting a special header called CamelFileName which tells the component which name to give the saved file. Had that header not been present, the file would have been saved with a defaulting mechanism. An alternative would have been to use the ?fileName option in the component url (file://var/tmp?fileName=xxxxxx). See the docs for the File2 compoment.

Other classes

This isn’t super interesting, but it gives an idea how simple it is to setup a content enricher (which is really jsut a process) and a filter (Predicate)

Enricher

A note: This could have been done using the Camel DSL language, but I prefered to bring it out.

public class MyMessageEnricher implements Processor {
    private static final Logger logger = LogManager.getLogger(MyMessageEnricher.class);

    public void process(Exchange exchange) throws Exception {
       if (logger.isTraceEnabled()) {
            logger.trace("Enriching received message");
       }

    String fileName = (String) exchange.getIn().getHeader("fileName");
    exchange.getIn().setHeader(Exchange.FILE_NAME, fileName);

    if (logger.isTraceEnabled()) {
        logger.trace("Enriched content header: " + exchange.getIn().getHeader(Exchange.FILE_NAME).toString());
    }
  }
}

Filter

(This also could have been implemented using the DSL)

public class MyMessageFilter implements Predicate {
    private static final Logger logger = LogManager.getLogger(MyMessageFilter.class);

    public boolean matches(Exchange exchange) {
        if (logger.isTraceEnabled()) {
           logger.trace("Excecuting message filter");
        }

       Message in = exchange.getIn();
          for (String header : MyMessageHeaderKeys.MANDATORY_HEADERS) {
             if (in.getHeader(header) == null) {
                logger.warn("Filtering out message because of missing header: " + header);
                return false;
             }
          }

          if (in.getBody() == null) {
             logger.warn("Filtering out message because there is no content");
             return false;
          }

          if (logger.isTraceEnabled()) {
              logger.trace("Message was found to be valid. Continuing route");
          }
          return true;
      }
}

Next post, I’ll explain what brings all this together. The test cases.