Categories: spring

Spring 4+JMS+ActiveMQ example with @JmsListener & @EnableJms

In the previous post over Spring with JMS, we have seen how applications can communicate among each other using JMS, leveraging Spring’s support for JMS. This post goes a step further, and shows an alternative to javax.jms.MessageListener which allows us to create MessageListeners using plain POJO’s, thanks to Spring’s very own @JmsListener, used in combination with JmsListenerConnectionFactory and @EnableJMS. Let’s get started.


Following technologies being used:

  • Spring 4.3.0.RELEASE
  • Spring JMS 4.3.0.RELEASE
  • ActiveMQ 5.13.3
  • Maven 3
  • JDK 1.7
  • Tomcat 8.0.21
  • Eclipse MARS.1 Release 4.5.1
  • logback 1.1.7

Let’s begin by configuring the listener first.

package com.websystique.spring.messaging;

import javax.jms.JMSException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

import com.websystique.spring.model.InventoryResponse;

@Component
public class MessageReceiver {

 static final Logger LOG = LoggerFactory.getLogger(MessageReceiver.class);
 private static final String ORDER_RESPONSE_QUEUE = "order-response-queue";
 
 @JmsListener(destination = ORDER_RESPONSE_QUEUE)
 public void receiveMessage(final Message<InventoryResponse> message) throws JMSException {
  LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++");
  MessageHeaders headers =  message.getHeaders();
  LOG.info("Application : headers received : {}", headers);
  
  InventoryResponse response = message.getPayload();
  LOG.info("Application : response received : {}",response); 
  LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++");
 }
}

Shown above is a POJO, which does not implement javax.jms.MessageListener, just annotated with @JmsListener which marks a method to be the target of a JMS message listener on the specified destination(). What we still have to do is to :

  • 1. Configure a message-listener-container [ with JmsListenerContainerFactory] : which listens on a destination [can be the one used with @JmsListener] and when any message arrives on this destination, it retrieved that message and passes to the bean annotated with @JmsListener for that destination.
  • 2. Use @EnableJms which enables detection of JmsListener annotations on any Spring-managed bean in the container.

Below shown is the configuration we have just discussed:

package com.websystique.spring.configuration;

import javax.jms.ConnectionFactory;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

@Configuration
@EnableJms
public class MessagingListnerConfiguration {

 @Autowired
 ConnectionFactory connectionFactory;
 
 @Bean
 public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
         DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
         factory.setConnectionFactory(connectionFactory);
         factory.setConcurrency("1-1");
         return factory;
 }

}

DefaultJmsListenerContainerFactory is a JmsListenerContainerFactory implementation to build a regular DefaultMessageListenerContainer. You can configure several properties. At the very least, it needs a connection factory. Additionally, we have specified the concurrency [max number of concurrent users/consumers] using setConcurrency(“lowwe-upper”). You can also use setConcurrency(“upper”) which means lower will be 1.

Note : You can also specify the container factory to be used in @JmsLietener annoatation, with containerFactory attribute defining the name of the JmsListenerContainerFactory bean to use. When none is set a JmsListenerContainerFactory bean with name jmsListenerContainerFactory is assumed to be present.

That’s all you need to configure the Message listeners using @JmsListener. Complete codes of both producer & consumer applications is shown below [as well as in download section].

Start the damn applications, Bill

We will be using Apache Active MQ. In case you did not, download ActiveMQ Message Broker, unzip it, goto bin directory and start it using $ ./activemq start.

You can quickly check the WebConsole [available at http://localhost:8161/admin/ with credentials admin/admin.

Application(s) Overview:

In this example, we have two applications A & B trying to communicate with each other via sending Messages on Queues. Although we could have opted for Topic instead, Queue is perfect for this one producer-one consumer scenario.

A sends a message [a pojo object] to a Queue [order-queue] and listens for the response on another Queue [order-response-queue]. B is listening on order-queue, and upon receiving the message, B will send a reply [a pojo response object] to A on order-response-queue. A short but simple example of inter-application communication using JMS.

Below are the main’s of both Producer [A] & Consumer[B] applications.

package com.websystique.spring.configuration;

import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;

import com.websystique.spring.model.Product;
import com.websystique.spring.service.ProductService;
import com.websystique.spring.util.BasicUtil;

public class ProducerApplication {

 static final Logger LOG = LoggerFactory.getLogger(ProducerApplication.class);
 
 private static AtomicInteger id = new AtomicInteger();
 
 public static void main(String[] args){
  AbstractApplicationContext context = new AnnotationConfigApplicationContext(
                AppConfig.class);
 
        ProductService productService = (ProductService) context.getBean("productService");
        
        
        Product product = getProduct();
        LOG.info("Application : sending order {}", product);
        productService.sendProduct(product);
        
        try {
   Thread.sleep(60000);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
        ((AbstractApplicationContext)context).close();
 }
 
 
 private static Product getProduct(){
  Product p = new Product();
  p.setName("Product "+id.incrementAndGet());
  p.setProductId(BasicUtil.getUniqueId());
  p.setQuantity(2);
  return p;
 }
}
package com.websystique.spring.configuration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;

public class ConsumerApplication {

 static final Logger LOG = LoggerFactory.getLogger(ConsumerApplication.class);

 public static void main(String[] args) {
  AbstractApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);

  try {
   Thread.sleep(60000);
  } catch (InterruptedException e) {
   e.printStackTrace();
  }

  ((AbstractApplicationContext) context).close();
 }

}

Start Application A [Producer]. Goto web-console, and click on queue tab. You should see two queues being created with a message on one queue [A has just sent a message] while other empty.

Now start Application B [Consumer], again check queues on web-console, you will see message being de-queued on first queue [B has just processed it] and new message being enqueued [B has sent the response to A] and eventually dequeued [A got the response] on second queue.

Logs of Producer Application:

Jun 26, 2016 2:50:50 PM org.springframework.context.annotation.AnnotationConfigApplicationContext prepareRefresh
INFO: Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@5cca2df1: startup date [Sun Jun 26 14:50:50 CEST 2016]; root of context hierarchy
Jun 26, 2016 2:50:51 PM org.springframework.context.support.DefaultLifecycleProcessor start
INFO: Starting beans in phase 2147483647
14:50:51.343 [main] DEBUG org.apache.activemq.transport.WireFormatNegotiator - Sending: WireFormatInfo { version=11, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
14:50:51.414 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54635] DEBUG org.apache.activemq.transport.InactivityMonitor - Using min of local: WireFormatInfo { version=11, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=11, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
14:50:51.415 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54635] DEBUG org.apache.activemq.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=11, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
14:50:51.415 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54635] DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616@54635 before negotiation: OpenWireFormat{version=11, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
14:50:51.415 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54635] DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616@54635 after negotiation: OpenWireFormat{version=11, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600}
14:50:51.548 [DefaultMessageListenerContainer-1] DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Session Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@6efeb87[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
14:50:51.607 [main] INFO com.websystique.spring.configuration.ProducerApplication - Application : sending order Product [productId=cf669916-2558-41f3-9f93-05125788173d, name=Product 1, quantity=2]
14:50:51.607 [main] INFO com.websystique.spring.service.ProductServiceImpl - +++++++++++++++++++++++++++++++++++++++++++++++++++++
14:50:51.607 [main] INFO com.websystique.spring.service.ProductServiceImpl - Application : sending order request Product [productId=cf669916-2558-41f3-9f93-05125788173d, name=Product 1, quantity=2]
14:50:51.613 [main] DEBUG org.apache.activemq.transport.WireFormatNegotiator - Sending: WireFormatInfo { version=11, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
14:50:51.616 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54636] DEBUG org.apache.activemq.transport.InactivityMonitor - Using min of local: WireFormatInfo { version=11, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=11, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
14:50:51.616 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54636] DEBUG org.apache.activemq.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=11, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
14:50:51.616 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54636] DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616@54636 before negotiation: OpenWireFormat{version=11, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
14:50:51.616 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54636] DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616@54636 after negotiation: OpenWireFormat{version=11, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600}
14:50:51.786 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@5769fd07[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds.
14:50:51.787 [main] DEBUG org.apache.activemq.transport.tcp.TcpTransport - Stopping transport tcp://localhost/127.0.0.1:61616@54636
14:50:51.788 [main] DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@5bef330a[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
14:50:51.789 [ActiveMQ Task-1] DEBUG org.apache.activemq.transport.tcp.TcpTransport - Closed socket Socket[addr=localhost/127.0.0.1,port=61616,localport=54636]
14:50:51.789 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@5bef330a[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]
14:50:51.790 [main] INFO com.websystique.spring.service.ProductServiceImpl - +++++++++++++++++++++++++++++++++++++++++++++++++++++
14:51:11.417 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10000ms elapsed since last write check.
14:51:11.419 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
14:51:21.418 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
14:51:21.418 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
14:51:31.418 [ActiveMQ InactivityMonitor ReadCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - 30001ms elapsed since last read check.
14:51:31.419 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
14:51:31.419 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
14:51:41.419 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10000ms elapsed since last write check.
14:51:41.419 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
14:51:47.329 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - +++++++++++++++++++++++++++++++++++++++++++++++++++++
14:51:47.330 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - Application : headers received : {timestamp=1466945507330, id=401a34a7-a0f9-8ab1-efd7-5a590987733f, jms_deliveryMode=2, jms_timestamp=1466945507317, jms_redelivered=false, jms_destination=queue://order-response-queue, jms_priority=4, jms_expiration=0, jms_messageId=ID:dragon-54643-1466945506965-1:1:1:1:1}
14:51:47.331 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - Application : response received : InventoryResponse [productId=cf669916-2558-41f3-9f93-05125788173d, returnCode=200, comment=Order Processed successfully]
14:51:47.331 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - +++++++++++++++++++++++++++++++++++++++++++++++++++++
14:51:51.420 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
Jun 26, 2016 2:51:51 PM org.springframework.context.annotation.AnnotationConfigApplicationContext doClose
INFO: Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@5cca2df1: startup date [Sun Jun 26 14:50:50 CEST 2016]; root of context hierarchy
Jun 26, 2016 2:51:51 PM org.springframework.context.support.DefaultLifecycleProcessor stop
INFO: Stopping beans in phase 2147483647
14:51:52.332 [DefaultMessageListenerContainer-1] DEBUG org.apache.activemq.ActiveMQMessageConsumer - remove: ID:dragon-54634-1466945451200-1:1:1:1, lastDeliveredSequenceId: 12
14:51:52.338 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@6efeb87[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2] is shutdown: true and terminated: true took: 0.000 seconds.
14:51:52.338 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@5f4e7ecf[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds.
14:51:52.339 [main] DEBUG org.apache.activemq.transport.tcp.TcpTransport - Stopping transport tcp://localhost/127.0.0.1:61616@54635
14:51:52.339 [main] DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@1a6778eb[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
14:51:52.339 [ActiveMQ Task-1] DEBUG org.apache.activemq.transport.tcp.TcpTransport - Closed socket Socket[addr=localhost/127.0.0.1,port=61616,localport=54635]
14:51:52.340 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@1a6778eb[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]

Logs of Consumer Application:

Jun 26, 2016 2:51:46 PM org.springframework.context.annotation.AnnotationConfigApplicationContext prepareRefresh
INFO: Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@6c09e6e0: startup date [Sun Jun 26 14:51:46 CEST 2016]; root of context hierarchy
Jun 26, 2016 2:51:46 PM org.springframework.context.support.DefaultLifecycleProcessor start
INFO: Starting beans in phase 2147483647
14:51:47.091 [main] DEBUG org.apache.activemq.transport.WireFormatNegotiator - Sending: WireFormatInfo { version=11, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
14:51:47.097 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54644] DEBUG org.apache.activemq.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=11, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
14:51:47.099 [main] DEBUG org.apache.activemq.transport.InactivityMonitor - Using min of local: WireFormatInfo { version=11, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=11, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
14:51:47.101 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54644] DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616@54644 before negotiation: OpenWireFormat{version=11, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
14:51:47.101 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54644] DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616@54644 after negotiation: OpenWireFormat{version=11, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600}
14:51:47.166 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54644] DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Session Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@5f47af7e[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
14:51:47.190 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - ----------------------------------------------------
14:51:47.299 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - Application : headers received : {timestamp=1466945507299, id=04629f2a-99b2-ad9d-dc91-641c5cb834ac, jms_deliveryMode=2, jms_timestamp=1466945451665, jms_redelivered=false, jms_destination=queue://order-queue, jms_priority=4, jms_expiration=0, jms_messageId=ID:dragon-54634-1466945451200-1:2:1:1:1}
14:51:47.299 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - Application : product : Product [productId=cf669916-2558-41f3-9f93-05125788173d, name=Product 1, quantity=2]
14:51:47.300 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.service.OrderServiceImpl - Inventory : sending order confirmation InventoryResponse [productId=cf669916-2558-41f3-9f93-05125788173d, returnCode=200, comment=Order Processed successfully]
14:51:47.395 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - ----------------------------------------------------
14:52:07.101 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
14:52:07.103 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
14:52:17.101 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10000ms elapsed since last write check.
14:52:17.101 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
14:52:27.101 [ActiveMQ InactivityMonitor ReadCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - 30001ms elapsed since last read check.
14:52:27.102 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
14:52:27.102 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
14:52:37.106 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10004ms elapsed since last write check.
14:52:37.107 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
14:52:47.107 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
14:52:47.107 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
Jun 26, 2016 2:52:47 PM org.springframework.context.annotation.AnnotationConfigApplicationContext doClose
INFO: Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@6c09e6e0: startup date [Sun Jun 26 14:51:46 CEST 2016]; root of context hierarchy
Jun 26, 2016 2:52:47 PM org.springframework.context.support.DefaultLifecycleProcessor stop
INFO: Stopping beans in phase 2147483647
14:52:47.407 [main] DEBUG org.apache.activemq.ActiveMQMessageConsumer - remove: ID:dragon-54643-1466945506965-1:1:1:1, lastDeliveredSequenceId: 6
14:52:47.414 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@5f47af7e[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2] is shutdown: true and terminated: true took: 0.000 seconds.
14:52:47.415 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@6c45e3b9[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds.
14:52:47.416 [main] DEBUG org.apache.activemq.transport.tcp.TcpTransport - Stopping transport tcp://localhost/127.0.0.1:61616@54644
14:52:47.418 [main] DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@c6d1896[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
14:52:47.419 [ActiveMQ Task-1] DEBUG org.apache.activemq.transport.tcp.TcpTransport - Closed socket Socket[addr=localhost/127.0.0.1,port=61616,localport=54644]
14:52:47.419 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@c6d1896[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]

Complete Code for Producer+Consumer Applications

Let’s start with coding Producer Application [A]. Consumer Application [B] is exactly same as A, but listening and sending on opposite queues.

Producer Application

Project Structure

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>

 <groupId>com.websystique.spring</groupId>
 <artifactId>SpringJMSActiveMQProducerEx2</artifactId>
 <version>1.0.0</version>
 <packaging>jar</packaging>

 <name>SpringJMSActiveMQProducerEx2</name>

 <properties>
  <springframework.version>4.3.0.RELEASE</springframework.version>
 </properties>

 <dependencies>
  <dependency>
   <groupId>org.springframework</groupId>
   <artifactId>spring-core</artifactId>
   <version>${springframework.version}</version>
  </dependency>
  <dependency>
   <groupId>org.springframework</groupId>
   <artifactId>spring-context</artifactId>
   <version>${springframework.version}</version>
  </dependency>
  <dependency>
   <groupId>org.springframework</groupId>
   <artifactId>spring-jms</artifactId>
   <version>${springframework.version}</version>
  </dependency>
  <dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-spring</artifactId>
   <version>5.13.3</version>
  </dependency>
  <dependency>
   <groupId>ch.qos.logback</groupId>
   <artifactId>logback-classic</artifactId>
   <version>1.1.7</version>
  </dependency>
 </dependencies>

 <build>
  <plugins>
   <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.2</version>
    <configuration>
     <source>1.7</source>
     <target>1.7</target>
    </configuration>
   </plugin>
  </plugins>
 </build>
</project>
package com.websystique.spring.configuration;

import java.util.Arrays;

import org.apache.activemq.spring.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.core.JmsTemplate;

@Configuration
public class MessagingConfiguration {

 private static final String DEFAULT_BROKER_URL = "tcp://localhost:61616";
 
 private static final String ORDER_QUEUE = "order-queue";

 @Bean
 public ActiveMQConnectionFactory connectionFactory(){
  ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
  connectionFactory.setBrokerURL(DEFAULT_BROKER_URL);
  connectionFactory.setTrustedPackages(Arrays.asList("com.websystique.spring","java.util"));
  return connectionFactory;
 }
 
 @Bean 
 public JmsTemplate jmsTemplate(){
  JmsTemplate template = new JmsTemplate();
  template.setConnectionFactory(connectionFactory());
  template.setDefaultDestinationName(ORDER_QUEUE);
  return template;
 }
 
}
package com.websystique.spring.configuration;

import javax.jms.ConnectionFactory;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

@Configuration
@EnableJms
public class MessagingListnerConfiguration {

 @Autowired
 ConnectionFactory connectionFactory;
 
 @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrency("1-1");
        return factory;
    }

}
package com.websystique.spring.configuration;

import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Configuration
@ComponentScan(basePackages = "com.websystique.spring")
@Import({MessagingConfiguration.class, MessagingListnerConfiguration.class})
public class AppConfig {
 
 //Put Other Application configuration here.
}
package com.websystique.spring.messaging;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

import com.websystique.spring.model.Product;

@Component
public class MessageSender {

 @Autowired
 JmsTemplate jmsTemplate;

 public void sendMessage(final Product product) {

  jmsTemplate.send(new MessageCreator(){
    @Override
    public Message createMessage(Session session) throws JMSException{
     ObjectMessage objectMessage = session.createObjectMessage(product);
     return objectMessage;
    }
   });
 }

}
package com.websystique.spring.messaging;

import javax.jms.JMSException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

import com.websystique.spring.model.InventoryResponse;

@Component
public class MessageReceiver {

 static final Logger LOG = LoggerFactory.getLogger(MessageReceiver.class);
 private static final String ORDER_RESPONSE_QUEUE = "order-response-queue";
 
 @JmsListener(destination = ORDER_RESPONSE_QUEUE)
 public void receiveMessage(final Message<InventoryResponse> message) throws JMSException {
  LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++");
  MessageHeaders headers =  message.getHeaders();
  LOG.info("Application : headers received : {}", headers);
  
  InventoryResponse response = message.getPayload();
  LOG.info("Application : response received : {}",response); 
  LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++");
 }
}
package com.websystique.spring.service;

import com.websystique.spring.model.Product;

public interface ProductService {

 public void sendProduct(Product product);
}

package com.websystique.spring.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.websystique.spring.messaging.MessageSender;
import com.websystique.spring.model.Product;

@Service("productService")
public class ProductServiceImpl implements ProductService{

 static final Logger LOG = LoggerFactory.getLogger(ProductServiceImpl.class);
 
 @Autowired
 MessageSender messageSender;
 
 @Override
 public void sendProduct(Product product) {
  LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++");
  LOG.info("Application : sending order request {}", product);
  messageSender.sendMessage(product);
  LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++");
 }
}
package com.websystique.spring.model;

import java.io.Serializable;

public class Product implements Serializable {

 private String productId;
 
 private String name;
 
 private int quantity;

 public String getName() {
  return name;
 }

 public void setName(String name) {
  this.name = name;
 }

 public int getQuantity() {
  return quantity;
 }

 public void setQuantity(int quantity) {
  this.quantity = quantity;
 }

 public String getProductId() {
  return productId;
 }

 public void setProductId(String productId) {
  this.productId = productId;
 }

 @Override
 public int hashCode() {
  final int prime = 31;
  int result = 1;
  result = prime * result + ((productId == null) ? 0 : productId.hashCode());
  return result;
 }

 @Override
 public boolean equals(Object obj) {
  if (this == obj)
   return true;
  if (obj == null)
   return false;
  if (getClass() != obj.getClass())
   return false;
  Product other = (Product) obj;
  if (productId == null) {
   if (other.productId != null)
    return false;
  } else if (!productId.equals(other.productId))
   return false;
  return true;
 }

 @Override
 public String toString() {
  return "Product [productId=" + productId + ", name=" + name + ", quantity=" + quantity + "]";
 }
}
package com.websystique.spring.model;

import java.io.Serializable;

public class InventoryResponse implements Serializable{

 private String productId;
 private int returnCode;
 private String comment;

 public String getProductId() {
  return productId;
 }
 public void setProductId(String productId) {
  this.productId = productId;
 }
 public int getReturnCode() {
  return returnCode;
 }
 public void setReturnCode(int returnCode) {
  this.returnCode = returnCode;
 }
 public String getComment() {
  return comment;
 }
 public void setComment(String comment) {
  this.comment = comment;
 }
 @Override
 public String toString() {
  return "InventoryResponse [productId=" + productId + ", returnCode=" + returnCode + ", comment=" + comment
    + "]";
 }
}
package com.websystique.spring.util;

import java.util.UUID;

public class BasicUtil {

 public static String getUniqueId(){
  return UUID.randomUUID().toString();
 }
}

Consumer Application

Project Structure

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <modelVersion>4.0.0</modelVersion>

 <groupId>com.websystique.spring</groupId>
 <artifactId>SpringJMSActiveMQConsumerEx2</artifactId>
 <version>1.0.0</version>
 <packaging>jar</packaging>

 <name>SpringJMSActiveMQConsumerEx2</name>

 <properties>
  <springframework.version>4.3.0.RELEASE</springframework.version>
 </properties>

 <dependencies>
  <dependency>
   <groupId>org.springframework</groupId>
   <artifactId>spring-core</artifactId>
   <version>${springframework.version}</version>
  </dependency>
  <dependency>
   <groupId>org.springframework</groupId>
   <artifactId>spring-context</artifactId>
   <version>${springframework.version}</version>
  </dependency>
  <dependency>
   <groupId>org.springframework</groupId>
   <artifactId>spring-jms</artifactId>
   <version>${springframework.version}</version>
  </dependency>
  <dependency>
   <groupId>org.apache.activemq</groupId>
   <artifactId>activemq-spring</artifactId>
   <version>5.13.3</version>
  </dependency>
  <dependency>
   <groupId>ch.qos.logback</groupId>
   <artifactId>logback-classic</artifactId>
   <version>1.1.7</version>
  </dependency>
 </dependencies>

 <build>
  <plugins>
   <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>3.2</version>
    <configuration>
     <source>1.7</source>
     <target>1.7</target>
    </configuration>
   </plugin>
  </plugins>
 </build>
</project>
package com.websystique.spring.configuration;

import java.util.Arrays;

import org.apache.activemq.spring.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.core.JmsTemplate;

@Configuration
public class MessagingConfiguration {

 private static final String DEFAULT_BROKER_URL = "tcp://localhost:61616";
 
 private static final String ORDER_RESPONSE_QUEUE = "order-response-queue";

 @Bean
 public ActiveMQConnectionFactory connectionFactory(){
  ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
  connectionFactory.setBrokerURL(DEFAULT_BROKER_URL);
  connectionFactory.setTrustedPackages(Arrays.asList("com.websystique.spring","java.util"));
  return connectionFactory;
 }
 
 @Bean 
 public JmsTemplate jmsTemplate(){
  JmsTemplate template = new JmsTemplate();
  template.setConnectionFactory(connectionFactory());
  template.setDefaultDestinationName(ORDER_RESPONSE_QUEUE);
  return template;
 }
 
}
package com.websystique.spring.configuration;

import javax.jms.ConnectionFactory;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

@Configuration
@EnableJms
public class MessagingListnerConfiguration {

 @Autowired
 ConnectionFactory connectionFactory;

 @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrency("1-1");
        return factory;
    }

}

package com.websystique.spring.configuration;

import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Configuration
@ComponentScan(basePackages = "com.websystique.spring")
@Import({MessagingConfiguration.class, MessagingListnerConfiguration.class})
public class AppConfig {
 
 //Put Other Application configuration here.
 
}
package com.websystique.spring.messaging;

import javax.jms.JMSException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

import com.websystique.spring.model.Product;
import com.websystique.spring.service.OrderService;

@Component
public class MessageReceiver {

 static final Logger LOG = LoggerFactory.getLogger(MessageReceiver.class);
 private static final String ORDER_QUEUE = "order-queue";
 
 @Autowired
 OrderService orderService;

 @JmsListener(destination = ORDER_QUEUE)
 public void receiveMessage(final Message<Product> message) throws JMSException {
  LOG.info("----------------------------------------------------");
  MessageHeaders headers =  message.getHeaders();
  LOG.info("Application : headers received : {}", headers);
  
  Product product = message.getPayload();
  LOG.info("Application : product : {}",product); 

  orderService.processOrder(product);
  LOG.info("----------------------------------------------------");

 }
 
}

package com.websystique.spring.messaging;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

import com.websystique.spring.model.InventoryResponse;

@Component
public class MessageSender {

 @Autowired
 JmsTemplate jmsTemplate;

 public void sendMessage(final InventoryResponse inventoryResponse) {

  jmsTemplate.send(new MessageCreator(){
    @Override
    public Message createMessage(Session session) throws JMSException{
     ObjectMessage objectMessage = session.createObjectMessage(inventoryResponse);
     return objectMessage;
    }
   });
 }

}
package com.websystique.spring.service;

import com.websystique.spring.model.Product;

public interface OrderService {

 public void processOrder(Product product);
 
 
}
package com.websystique.spring.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.websystique.spring.messaging.MessageSender;
import com.websystique.spring.model.InventoryResponse;
import com.websystique.spring.model.Product;

@Service("orderService")
public class OrderServiceImpl implements OrderService{

 static final Logger LOG = LoggerFactory.getLogger(OrderServiceImpl.class);
 
 @Autowired
 MessageSender messageSender;
 
 @Override
 public void processOrder(Product product) {
  
  InventoryResponse response = prepareResponse(product);
  LOG.info("Inventory : sending order confirmation {}", response);
  messageSender.sendMessage(response);
 }
 
 private InventoryResponse prepareResponse(Product product){
  InventoryResponse response = new InventoryResponse();
  response.setProductId(product.getProductId());
  response.setReturnCode(200);
  response.setComment("Order Processed successfully");
  return response;
 }
}

That’s it. As we saw, SPRING JMS support is easy to use. Feel free to Comment, and suggest improvements.

Download Source Code

Producer Application :

Consumer Application :


References

View Comments

  • Hello, Jack speaking. I've bookmarked your site and make it a habit to check in daily. The information is top-notch, and I appreciate your efforts.

  • Hey, cool post You can check if there's a problem with your website with Internet Explorer. Because of this issue, many readers will overlook your excellent writing because IE is still the most popular browser.

  • Hi, The above example using ActiveMQ is working for me.
    But, I need to migrate to JCAPS using springboot. I am not able to get the connectionFactory using the JCAPS jar and create the connection.

    I have implemented a standalone java program. It is working well for a single message.

    Please help me on how to get the connectionfactory from the container and do the process inside the listener.

    Thank you in advance.

Share
Published by

Recent Posts

Spring Boot + AngularJS + Spring Data + JPA CRUD App Example

In this post we will be developing a full-blown CRUD application using Spring Boot, AngularJS, Spring Data, JPA/Hibernate and MySQL,…

8 years ago

Spring Boot Rest API Example

Spring Boot complements Spring REST support by providing default dependencies/converters out of the box. Writing RESTful services in Spring Boot…

8 years ago

Spring Boot WAR deployment example

Being able to start the application as standalone jar is great, but sometimes it might not be possible to run…

8 years ago

Spring Boot Introduction + hello world example

Spring framework has taken the software development industry by storm. Dependency Injection, rock solid MVC framework, Transaction management, messaging support,…

8 years ago

Secure Spring REST API using OAuth2

Let's secure our Spring REST API using OAuth2 this time, a simple guide showing what is required to secure a…

8 years ago

AngularJS+Spring Security using Basic Authentication

This post shows how an AngularJS application can consume a REST API which is secured with Basic authentication using Spring…

8 years ago