Posts tagged camel
Test that camel route
May 22nd
You might want to get a bit of context by reading the 2 first parts of this series.
This time, I’ll show you how a route is tested by using mock producers and consumers and setting up the expectations. As I always remind the team, this is unit testing, not integration testing. At this point, we need to trust that the Camel team has properly tested all it’s components. What we want to do is check that 1) our route is routing messages to the proper endpoints, and 2) our enrichers or filters are kicking in properly.
To better understand the rest of this post, reading through this might be usefull.
Route Builder modifications
In the previous example, I had hard coded all route and processor information directly into the RouteBuilder implementation.
Obviously, this won’t cut it if we want to test. So the first thing we want to do is externalize the routeconfiguration information to a class (backed by an interface. You’ll se why later).
class MyRouteBuilder extends RouteBuilder {
private IRouteBuilderConiguration config;
@Override
public void configure() throws Exception {
errorHandler(
deadLetterChannel(config.getDlqEndpointUrl()).
delay(config.getErrorHandlerDelay()).
maximumRedeliveries(config.getErrorHandlerRetries())
);
from(config.getSourceEndpointUrl()).
choice().
when(config.getMessageFilter()).
process(config.getMessageEnricher()).to(config.getMessageEndpointUrl()).
otherwise().to(config.getErrorEndpointUrl());
}
}
What’s missing here is a concrete implementation of the IRouteBuilderConfiguration interface. It would look something like:
public class MyRouteBuilderConfiguration implements IRouteBuilderConfiguration {
public String getMessageEndpointUrl() {
return ....
}
....
}
I used spring to inject the configuration into the builder, but you can use whatever you want.
<bean id="myRouteBuilder" class="default.MyRouteBuilder">
<property name="configuration">
<bean class="default.MyRouteBuilderConfiguration">
<property name="messageFilter" ref="myFilter"/>
<property name="messageEnricher" ref="myEnricher"/>
</bean>
</property>
</bean>
Test class
Once that’s done, then it’s simply a case of following what’s in the Camel testing example page with a few variations.
- Overide the createRouteBuilder() method, but make sure to send back your own implementation
- Inject a different IRouteBuilderConfiguration implementation which uses mock and direct endpoints.
For example:
public class TestMessageRouter extends CamelTestSupport {
private static final String SOURCE_COMPONENT_URL = "direct:start";
private static final String DLQ_COMPONENT_URL = "mock:dlq";
private static final String INVALID_COMPONENT_URL = "mock:invalid";
private static final String ENDPOINT_URL = "mock:endpoint";
@EndpointInject(uri = DLQ_COMPONENT_URL)
protected MockEndpoint dlqEndpoint;
@EndpointInject(uri = INVALID_COMPONENT_URL)
protected MockEndpoint invalidEndpoint;
@EndpointInject(uri = ENDPOINT_URL)
protected MockEndpoint endpoint;
@Produce(uri= SOURCE_COMPONENT_URL)
protected ProducerTemplate producerTemplate;
public void testMessageOk() throws Exception {
producerTemplate.sendBodyAndHeaders("a", getValidHeaders());
invalidEndpoint.expectedMessageCount(0);
dlqEndpoint.expectedMessageCount(0);
endpoint.expectedMessageCount(1);
dlqEndpoint.assertIsSatisfied();
invalidEndpoint.assertIsSatisfied();
endpoint.assertIsSatisfied();
Message inMessage = endpoint.getReceivedExchanges().get(0).getIn();
Assert.assertEquals("a", inMessage.getBody());
}
public void testWithInvalidBodyType() throws Exception {
Map<String, Object> headers =createValidHeaders();
producerTemplate.sendBodyAndHeaders(Integer.valueOf(1), headers);
invalidEndpoint.expectedMessageCount(1);
endpoint.expectedMessageCount(0);
dlqEndpoint.expectedMessageCount(0);
dlqEndpoint.assertIsSatisfied();
invalidEndpoint.assertIsSatisfied();
endpoint.assertIsSatisfied();
}
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
MyRouteBuilder builder = new MyRouteBuilder();
builder.setConfiguration(new TestRunnerRouteConfiguration());
return builder;
}
class TestRunnerRouteConfiguration implements IRouteBuilderConfiguration {
public Endpoint getMessageEndpoint() {
return endpoint;
}
public String getDLQCompomentUrl() {
return DLQ_COMPONENT_URL;
}
public String getInvalidMessageComponentUrl() {
return INVALID_COMPONENT_URL;
}
public String getSourceComponentUrl() {
return SOURCE_COMPONENT_URL;
}
....
}
}
The example isn’t complete, but it does give a good idea of what can be done. The learning curve isn’t extremely steep with Camel, but it’s there and you most likely won’t pickup all the concepts immediately. We’re not talking weeks and days, just a few hours.
I guess that covers it for now. I’ll be looking to setup a Component that can publish to Tumblr, just as an excersise, so if I get around to that, I’ll post the results here.
A bit more meat: Camel applied : JMS to File
May 13th
As mentionned before, here is the use case:
- Listen on a queue for new messages
- Validate that the message is properly formed (has a body and has the proper headers)
- Save the content to a file (file name is in one of the headers)
Spring configuration
<bean id="myRouteBuilder" class="default.MyRouteBuilder"/>
<bean id="myFilter" class="default.MyMessageFilter"/>
<bean id="myEnricher" class="default.MyMessageEnricher"/>
<camel:camelContext id="myCamelContext">
<camel:routeBuilder ref="myRouteBuilder"/>
</camel:camelContext>
<bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://localhost:61616"/>
</bean>
</property>
</bean>
The important bean here is the camelContext bean. The RouteBuilder refered to will be initialized (by calling up the configure() method on it) when the spring container gets initialized.
The ‘jms’ bean is used to configure the JMS component which can be refered to by name inside the route builder.
The 2 other beans (myEnricher and myFilter) are simply used by the route builder. A spring bean lookup is used to get a handle on these from the router.
1 gotcha: My ide (Eclipse) was giving me a lot of greif over the suggested namespace setup. Save yourself some trouble, and save the camel-spring.xsd offline and change your namesapce declaration to something like:
<beans xmlns:camel="http://camel.apache.org/schema/spring" xsi:schemaLocation=" http://camel.apache.org/schema/spring classpath:META-INF/camel-spring.xsd">
The RouteBuilder
This is where you build the Came routing for any messages. In this case, remember that we want to route incoming queue messages to a file (1 file per message). Ideally, any messages we can’t deal with and where there is an error in processing the message would be put into a DLQ, and any invalid messages would be put onto another queue.
public class MyRouteBuilder extends RouteBuilder implements ApplicationContextAware {
private ApplicationContext context;
@Override
public void configure() throws Exception {
Predicate messageFilter = (Predicate) context.getBean("myFilter");
Processor contentEnricher = (Processor) context.getBean("myEnricher");
errorHandler(
deadLetterChannel("jms:queue:my.dlq").
delay(2000).
maximumRedeliveries(5)
);
from("jms:queue:my.queue").
choice().when(messageFilter).process(contentEnricher).to("file://var/tmp").
otherwise().to("jms:queue:my.queue.invalid");
}
Explanation:
- The first section is a simple error handler that will force a message to a DLQ in the event of an exception. A lot more can be done, but this is all I needed
- Second we define the source compoment: “jms:” refers to the “jms” bean in the spring containter. Had we named that bean “activemqconnection” we would have used “activemqconnection:queue:my.queue” instead.
- We then apply a filter (implements Predicate) to determine if our message is valid (in my case, I check for specific header values). If the Predicate returns false, then we route the message to another queue.
- If the predicate passes, the message is stored in the /var/tmp directory. Specifically, for the Camel File component, the myEnricher bean takes care of setting a special header called CamelFileName which tells the component which name to give the saved file. Had that header not been present, the file would have been saved with a defaulting mechanism. An alternative would have been to use the ?fileName option in the component url (file://var/tmp?fileName=xxxxxx). See the docs for the File2 compoment.
Other classes
This isn’t super interesting, but it gives an idea how simple it is to setup a content enricher (which is really jsut a process) and a filter (Predicate)
Enricher
A note: This could have been done using the Camel DSL language, but I prefered to bring it out.
public class MyMessageEnricher implements Processor {
private static final Logger logger = LogManager.getLogger(MyMessageEnricher.class);
public void process(Exchange exchange) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("Enriching received message");
}
String fileName = (String) exchange.getIn().getHeader("fileName");
exchange.getIn().setHeader(Exchange.FILE_NAME, fileName);
if (logger.isTraceEnabled()) {
logger.trace("Enriched content header: " + exchange.getIn().getHeader(Exchange.FILE_NAME).toString());
}
}
}
Filter
(This also could have been implemented using the DSL)
public class MyMessageFilter implements Predicate {
private static final Logger logger = LogManager.getLogger(MyMessageFilter.class);
public boolean matches(Exchange exchange) {
if (logger.isTraceEnabled()) {
logger.trace("Excecuting message filter");
}
Message in = exchange.getIn();
for (String header : MyMessageHeaderKeys.MANDATORY_HEADERS) {
if (in.getHeader(header) == null) {
logger.warn("Filtering out message because of missing header: " + header);
return false;
}
}
if (in.getBody() == null) {
logger.warn("Filtering out message because there is no content");
return false;
}
if (logger.isTraceEnabled()) {
logger.trace("Message was found to be valid. Continuing route");
}
return true;
}
}
Next post, I’ll explain what brings all this together. The test cases.
Apache Camel. 3 Words: Wow
May 11th
Here’s my current use case:
- Receive a message on a queue
- Save the body of the message to a file, where the name is determined by a header value
Easy enough, right? Agreed. But to get something solid and reliable, you’re going to do a lot of plumbing:
- Setup a message listener that recovers from disconnects
- Make sure all message consumption is transacted
- Apply some sort of retry mechanism
- Put messages you can’t read or deal with onto a deal letter queue
- Handle all io locking issues that can arise
Enter Camel: With a bit of XML configuration and a simple RouteBuilder class, I can do most of this work in a few simple lines.
Obviously, having read the EIP book will help, but the documentation is stellar. You get examples for all the different patterns, as well as some amazing documentation for all components.
That being said, the full power of Camel (for this guy, and at this moment in time) comes from:
- How easy it is to add different steps in your route, without breaking anything else.
- Testability
- The amount of components you can route from and to.
- Spring integration
- Component
In my above use case, I already know that the messages will eventually need to be aggregated before getting writen to file. Check. I already know that eventually, I won’t be writing to file, but most likely to FTP. Check. I won’t have much to do short of changing a bit of code in my route, recompile, re-test and voila, all done!
(I’ve always said — ok, for the last few years — that a good senior programmer knows how to find the tools, api’s and frameworks to make his job easier, but more importantly, allow him/her to concentrate on the core buisness code. This is an example of that.)
