Thursday, January 5, 2012

Handling ActiveMQ disconnects in Camel CXF Routes

Suppose ActiveMQ is set up with default connection handling, using the failover protocol in the Camel config XML:

failover:(nio://0.0.0.0:61616,nio://0.0.0.0:61617)

With the default settings, the failover transport keeps retrying the connection, long enough that your route will probably time out first.  So in a Request/Reply scenario (MEP = InOut), you send an exchange to an activemq endpoint and wait for a reply for the default 20 seconds (the JMS component's requestTimeout):

from("cxf:bean:myService").to("activemq:queue:myQueue")

Now if there is no ActiveMQ instance running and you hit your endpoint with SoapUI, you will see an ExchangeTimedOutException in the log:

ERROR DefaultErrorHandler:232 - Failed delivery for exchangeId: ID-StoneCold-iMac-local-xxxxxxxx-0-1. Exhausted after delivery attempt: 1 caught: org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 20000 millis. Exchange[Message: [com.my.request@4f39ec5c]]
org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 20000 millis. Exchange[Message: [com.my.request@4f39ec5c]]

Now it may be expected that in highly available systems there is always a passive instance of ActiveMQ waiting to take over, and that the timeout is set high enough that this error should (theoretically) not happen.  But many people are confused when ActiveMQ is not running and the route still appears to be working, with no JMSException thrown for Connection Refused.

But what if you want to handle the connection issue after X number of retries and let the client know there is a problem?

The first thing is to set parameters on the connection URL that define how disconnects are handled:

failover:(nio://0.0.0.0:61616,nio://0.0.0.0:61617)?maxReconnectAttempts=3

So above we are telling the failover transport to try to reconnect 3 times before failing; other parameters can be found in the ActiveMQ failover transport documentation under Transport Options.  This lets you define how the disconnect is handled when ActiveMQ shuts down.
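As a rough sketch of how such a URL is put together, the plain-Java helper below joins broker addresses and transport options into a failover URI. The class and method names (FailoverUri, build) are hypothetical and not part of ActiveMQ or Camel; maxReconnectAttempts comes from the URL above, and initialReconnectDelay is a second failover option added only to show how several options are chained.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper, not an ActiveMQ or Camel class: it only builds the URI string.
class FailoverUri {

    // Joins the broker URIs and the transport options into one failover: URI.
    public static String build(List<String> brokers, Map<String, String> options) {
        String base = "failover:(" + String.join(",", brokers) + ")";
        if (options.isEmpty()) {
            return base;
        }
        // Options are appended as a query string: ?key=value&key=value
        String query = options.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.joining("&"));
        return base + "?" + query;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("maxReconnectAttempts", "3");
        options.put("initialReconnectDelay", "100"); // milliseconds, illustrative value
        List<String> brokers = Arrays.asList("nio://0.0.0.0:61616", "nio://0.0.0.0:61617");
        System.out.println(build(brokers, options));
    }
}
```

If the resulting URL is pasted into the Camel config XML, remember that each & between options has to be written as &amp;amp; inside an XML attribute.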

Now if you run the route you will see that you get a SOAP Fault returned with a JMSException.

Still not what you are looking for?  Perhaps you want to handle the failure with a nice response object and a nice response code instead of an exception.  To do that, you need exception handling in your route.  There are a couple of different ways to handle exceptions; we will look at the route-specific one.  Other info on exception handling in Camel can be found in the Camel documentation.

from("cxf:bean:myService")
    // Route-scoped handler for JMS failures (javax.jms.JMSException)
    .onException(JMSException.class)
        .maximumRedeliveries(2)
        .process(new Processor() {
            public void process(Exchange exchange) throws Exception {
                // Build a friendly response instead of letting the fault through
                MyResponse myResponse = new MyResponse();
                myResponse.setMyError("Crap, it didn't work");
                exchange.getIn().setBody(myResponse);
            }
        })
    .end()
    .to("activemq:queue:myQueue");

Now that you have the exception handling in place, hit the CXF endpoint with SoapUI and you will see that you still get the JMSException... WTF!

The reason is that the exception is not marked as handled, so it is passed up the stack, where the service endpoint still processes it and sends a SOAP Fault.  So the final route needs to look like this:

from("cxf:bean:myService")
    // Route-scoped handler for JMS failures (javax.jms.JMSException)
    .onException(JMSException.class)
        .maximumRedeliveries(2)
        // Mark the exception as handled so it is not returned as a SOAP Fault
        .handled(true)
        .process(new Processor() {
            public void process(Exchange exchange) throws Exception {
                MyResponse myResponse = new MyResponse();
                myResponse.setMyError("Crap, it didn't work, but here is a nice message for you");
                exchange.getIn().setBody(myResponse);
            }
        })
    .end()
    .to("activemq:queue:myQueue");

Notice the onException now has handled(true) set.  This will pass back whatever you set in your onException block; in this case I am just using a processor to create a MyResponse object and set the error message.  Now the client will receive the MyResponse object instead of a SOAP Fault when the connection to ActiveMQ drops.
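To make the difference between a handled and an unhandled exception concrete, here is a toy model in plain Java. It is not Camel's API and says nothing about how Camel is implemented; the names (HandledDemo, sendToBroker, call) are made up for illustration. It only mirrors the behaviour described above: the fallback logic runs in both cases, but the caller gets the reply only when the failure is marked as handled.

```java
import java.util.function.Function;

// Toy model only: this is NOT Camel's API, just the control flow in miniature.
class HandledDemo {

    // Stand-in for sending to a broker that is down.
    static String sendToBroker() {
        throw new IllegalStateException("Connection refused");
    }

    // Runs the fallback on failure; the handled flag decides what the caller sees.
    public static String call(boolean handled, Function<RuntimeException, String> fallback) {
        try {
            return sendToBroker();
        } catch (RuntimeException e) {
            String reply = fallback.apply(e); // the fallback runs either way
            if (handled) {
                return reply;                 // handled: caller gets a normal reply
            }
            throw e;                          // not handled: caller still gets the exception
        }
    }

    public static void main(String[] args) {
        Function<RuntimeException, String> fallback =
                e -> "Broker unavailable: " + e.getMessage();

        // Handled: the friendly message comes back as the result.
        System.out.println(call(true, fallback));

        // Not handled: the fallback ran, but the exception still reaches the caller.
        try {
            call(false, fallback);
        } catch (RuntimeException e) {
            System.out.println("Caller saw exception: " + e.getMessage());
        }
    }
}
```

In the route, the CXF endpoint plays the role of the caller: without handled(true) it still sees the exception and turns it into a SOAP Fault.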

