Spring 4+JMS+ActiveMQ example with @JmsListener & @EnableJms

In the previous post over Spring with JMS, we have seen how applications can communicate using JMS, leveraging Spring’s support for JMS. This post goes a step further, and shows an alternative to javax.jms.MessageListener which allows us to create MessageListeners using plain POJO’s, thanks to Spring’s very own @JmsListener, used in combination with JmsListenerConnectionFactory and @EnableJMS.


Following technologies being used:

  • Spring 4.3.0.RELEASE
  • Spring JMS 4.3.0.RELEASE
  • ActiveMQ 5.13.3
  • Maven 3
  • JDK 1.7
  • Tomcat 8.0.21
  • Eclipse MARS.1 Release 4.5.1
  • logback 1.1.7

Let’s begin by configuring the listener first.

package com.websystique.spring.messaging;

import javax.jms.JMSException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

import com.websystique.spring.model.InventoryResponse;

@Component
public class MessageReceiver {

	static final Logger LOG = LoggerFactory.getLogger(MessageReceiver.class);
	private static final String ORDER_RESPONSE_QUEUE = "order-response-queue";
	
	@JmsListener(destination = ORDER_RESPONSE_QUEUE)
	public void receiveMessage(final Message<InventoryResponse> message) throws JMSException {
		LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++");
		MessageHeaders headers =  message.getHeaders();
		LOG.info("Application : headers received : {}", headers);
		
		InventoryResponse response = message.getPayload();
		LOG.info("Application : response received : {}",response);	
		LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++");
	}
}

Shown above is a POJO, which does not implement javax.jms.MessageListener, just annotated with @JmsListener which marks a method to be the target of a JMS message listener on the specified destination(). What we still have to do is to :

  • 1. Configure a message-listener-container [ with JmsListenerContainerFactory] : which listens on a destination [can be the one used with @JmsListener] and when any message arrives on this destination, it retrieved that message and passes to the bean annotated with @JmsListener for that destination.
  • 2. Use @EnableJms which enables detection of JmsListener annotations on any Spring-managed bean in the container.

Below shown is the configuration we have just discussed:

package com.websystique.spring.configuration;

import javax.jms.ConnectionFactory;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

@Configuration
@EnableJms
public class MessagingListnerConfiguration {

	@Autowired
	ConnectionFactory connectionFactory;
	
	@Bean
	public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        	DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        	factory.setConnectionFactory(connectionFactory);
        	factory.setConcurrency("1-1");
        	return factory;
	}

}

DefaultJmsListenerContainerFactory is a JmsListenerContainerFactory implementation to build a regular DefaultMessageListenerContainer. You can configure several properties. At the very least, it needs a connection factory. Additionally, we have specified the concurrency [max number of concurrent users/consumers] using setConcurrency(“lowwe-upper”). You can also use setConcurrency(“upper”) which means lower will be 1.

Note : You can also specify the container factory to be used in @JmsLietener annoatation, with containerFactory attribute defining the name of the JmsListenerContainerFactory bean to use. When none is set a JmsListenerContainerFactory bean with name jmsListenerContainerFactory is assumed to be present.

That’s all you need to configure the Message listeners using @JmsListener. Complete codes of both producer & consumer applications is shown below [as well as in download section].

Start the damn applications, Bill

We will be using Apache Active MQ. In case you did not, download ActiveMQ Message Broker, unzip it, goto bin directory and start it using $ ./activemq start.

You can quickly check the WebConsole [available at http://localhost:8161/admin/ with credentials admin/admin.
SpringJMSEX2_img1

Application(s) Overview:

In this example, we have two applications A & B trying to communicate with each other via sending Messages on Queues. Although we could have opted for Topic instead, Queue is perfect for this one producer-one consumer scenario.

A sends a message [a pojo object] to a Queue [order-queue] and listens for the response on another Queue [order-response-queue]. B is listening on order-queue, and upon receiving the message, B will send a reply [a pojo response object] to A on order-response-queue. A short but simple example of inter-application communication using JMS.

Below are the main’s of both Producer [A] & Consumer[B] applications.

package com.websystique.spring.configuration;

import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;

import com.websystique.spring.model.Product;
import com.websystique.spring.service.ProductService;
import com.websystique.spring.util.BasicUtil;

public class ProducerApplication {

	static final Logger LOG = LoggerFactory.getLogger(ProducerApplication.class);
	
	private static AtomicInteger id = new AtomicInteger();
	
	public static void main(String[] args){
		AbstractApplicationContext context = new AnnotationConfigApplicationContext(
                AppConfig.class);
 
        ProductService productService = (ProductService) context.getBean("productService");
        
        
        Product product = getProduct();
        LOG.info("Application : sending order {}", product);
        productService.sendProduct(product);
        
        try {
			Thread.sleep(60000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
        ((AbstractApplicationContext)context).close();
	}
	
	
	private static Product getProduct(){
		Product p = new Product();
		p.setName("Product "+id.incrementAndGet());
		p.setProductId(BasicUtil.getUniqueId());
		p.setQuantity(2);
		return p;
	}
}
package com.websystique.spring.configuration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;

public class ConsumerApplication {

	static final Logger LOG = LoggerFactory.getLogger(ConsumerApplication.class);

	public static void main(String[] args) {
		AbstractApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);

		try {
			Thread.sleep(60000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}

		((AbstractApplicationContext) context).close();
	}

}

Start Application A [Producer]. Goto web-console, and click on queue tab. You should see two queues being created with a message on one queue [A has just sent a message] while other empty.

SpringJMSEX2_img4

Now start Application B [Consumer], again check queues on web-console, you will see message being de-queued on first queue [B has just processed it] and new message being enqueued [B has sent the response to A] and eventually dequeued [A got the response] on second queue.
SpringJMSEX2_img5

Logs of Producer Application:

Jun 26, 2016 2:50:50 PM org.springframework.context.annotation.AnnotationConfigApplicationContext prepareRefresh
INFO: Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@5cca2df1: startup date [Sun Jun 26 14:50:50 CEST 2016]; root of context hierarchy
Jun 26, 2016 2:50:51 PM org.springframework.context.support.DefaultLifecycleProcessor start
INFO: Starting beans in phase 2147483647
14:50:51.343 [main] DEBUG org.apache.activemq.transport.WireFormatNegotiator - Sending: WireFormatInfo { version=11, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
14:50:51.414 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54635] DEBUG org.apache.activemq.transport.InactivityMonitor - Using min of local: WireFormatInfo { version=11, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=11, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
14:50:51.415 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54635] DEBUG org.apache.activemq.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=11, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
14:50:51.415 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54635] DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616@54635 before negotiation: OpenWireFormat{version=11, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
14:50:51.415 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54635] DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616@54635 after negotiation: OpenWireFormat{version=11, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600}
14:50:51.548 [DefaultMessageListenerContainer-1] DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Session Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@6efeb87[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
14:50:51.607 [main] INFO com.websystique.spring.configuration.ProducerApplication - Application : sending order Product [productId=cf669916-2558-41f3-9f93-05125788173d, name=Product 1, quantity=2]
14:50:51.607 [main] INFO com.websystique.spring.service.ProductServiceImpl - +++++++++++++++++++++++++++++++++++++++++++++++++++++
14:50:51.607 [main] INFO com.websystique.spring.service.ProductServiceImpl - Application : sending order request Product [productId=cf669916-2558-41f3-9f93-05125788173d, name=Product 1, quantity=2]
14:50:51.613 [main] DEBUG org.apache.activemq.transport.WireFormatNegotiator - Sending: WireFormatInfo { version=11, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
14:50:51.616 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54636] DEBUG org.apache.activemq.transport.InactivityMonitor - Using min of local: WireFormatInfo { version=11, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=11, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
14:50:51.616 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54636] DEBUG org.apache.activemq.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=11, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
14:50:51.616 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54636] DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616@54636 before negotiation: OpenWireFormat{version=11, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
14:50:51.616 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54636] DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616@54636 after negotiation: OpenWireFormat{version=11, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600}
14:50:51.786 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@5769fd07[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds.
14:50:51.787 [main] DEBUG org.apache.activemq.transport.tcp.TcpTransport - Stopping transport tcp://localhost/127.0.0.1:61616@54636
14:50:51.788 [main] DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@5bef330a[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
14:50:51.789 [ActiveMQ Task-1] DEBUG org.apache.activemq.transport.tcp.TcpTransport - Closed socket Socket[addr=localhost/127.0.0.1,port=61616,localport=54636]
14:50:51.789 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@5bef330a[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]
14:50:51.790 [main] INFO com.websystique.spring.service.ProductServiceImpl - +++++++++++++++++++++++++++++++++++++++++++++++++++++
14:51:11.417 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10000ms elapsed since last write check.
14:51:11.419 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
14:51:21.418 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
14:51:21.418 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
14:51:31.418 [ActiveMQ InactivityMonitor ReadCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - 30001ms elapsed since last read check.
14:51:31.419 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
14:51:31.419 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
14:51:41.419 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10000ms elapsed since last write check.
14:51:41.419 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
14:51:47.329 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - +++++++++++++++++++++++++++++++++++++++++++++++++++++
14:51:47.330 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - Application : headers received : {timestamp=1466945507330, id=401a34a7-a0f9-8ab1-efd7-5a590987733f, jms_deliveryMode=2, jms_timestamp=1466945507317, jms_redelivered=false, jms_destination=queue://order-response-queue, jms_priority=4, jms_expiration=0, jms_messageId=ID:dragon-54643-1466945506965-1:1:1:1:1}
14:51:47.331 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - Application : response received : InventoryResponse [productId=cf669916-2558-41f3-9f93-05125788173d, returnCode=200, comment=Order Processed successfully]
14:51:47.331 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - +++++++++++++++++++++++++++++++++++++++++++++++++++++
14:51:51.420 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
Jun 26, 2016 2:51:51 PM org.springframework.context.annotation.AnnotationConfigApplicationContext doClose
INFO: Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@5cca2df1: startup date [Sun Jun 26 14:50:50 CEST 2016]; root of context hierarchy
Jun 26, 2016 2:51:51 PM org.springframework.context.support.DefaultLifecycleProcessor stop
INFO: Stopping beans in phase 2147483647
14:51:52.332 [DefaultMessageListenerContainer-1] DEBUG org.apache.activemq.ActiveMQMessageConsumer - remove: ID:dragon-54634-1466945451200-1:1:1:1, lastDeliveredSequenceId: 12
14:51:52.338 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@6efeb87[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2] is shutdown: true and terminated: true took: 0.000 seconds.
14:51:52.338 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@5f4e7ecf[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds.
14:51:52.339 [main] DEBUG org.apache.activemq.transport.tcp.TcpTransport - Stopping transport tcp://localhost/127.0.0.1:61616@54635
14:51:52.339 [main] DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@1a6778eb[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
14:51:52.339 [ActiveMQ Task-1] DEBUG org.apache.activemq.transport.tcp.TcpTransport - Closed socket Socket[addr=localhost/127.0.0.1,port=61616,localport=54635]
14:51:52.340 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@1a6778eb[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]

Logs of Consumer Application:

Jun 26, 2016 2:51:46 PM org.springframework.context.annotation.AnnotationConfigApplicationContext prepareRefresh
INFO: Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@6c09e6e0: startup date [Sun Jun 26 14:51:46 CEST 2016]; root of context hierarchy
Jun 26, 2016 2:51:46 PM org.springframework.context.support.DefaultLifecycleProcessor start
INFO: Starting beans in phase 2147483647
14:51:47.091 [main] DEBUG org.apache.activemq.transport.WireFormatNegotiator - Sending: WireFormatInfo { version=11, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
14:51:47.097 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54644] DEBUG org.apache.activemq.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=11, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
14:51:47.099 [main] DEBUG org.apache.activemq.transport.InactivityMonitor - Using min of local: WireFormatInfo { version=11, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=11, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
14:51:47.101 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54644] DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616@54644 before negotiation: OpenWireFormat{version=11, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807}
14:51:47.101 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54644] DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616@54644 after negotiation: OpenWireFormat{version=11, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600}
14:51:47.166 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54644] DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Session Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@5f47af7e[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
14:51:47.190 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - ----------------------------------------------------
14:51:47.299 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - Application : headers received : {timestamp=1466945507299, id=04629f2a-99b2-ad9d-dc91-641c5cb834ac, jms_deliveryMode=2, jms_timestamp=1466945451665, jms_redelivered=false, jms_destination=queue://order-queue, jms_priority=4, jms_expiration=0, jms_messageId=ID:dragon-54634-1466945451200-1:2:1:1:1}
14:51:47.299 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - Application : product : Product [productId=cf669916-2558-41f3-9f93-05125788173d, name=Product 1, quantity=2]
14:51:47.300 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.service.OrderServiceImpl - Inventory : sending order confirmation InventoryResponse [productId=cf669916-2558-41f3-9f93-05125788173d, returnCode=200, comment=Order Processed successfully]
14:51:47.395 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - ----------------------------------------------------
14:52:07.101 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
14:52:07.103 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
14:52:17.101 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10000ms elapsed since last write check.
14:52:17.101 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
14:52:27.101 [ActiveMQ InactivityMonitor ReadCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - 30001ms elapsed since last read check.
14:52:27.102 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
14:52:27.102 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
14:52:37.106 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10004ms elapsed since last write check.
14:52:37.107 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
14:52:47.107 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check.
14:52:47.107 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616]
Jun 26, 2016 2:52:47 PM org.springframework.context.annotation.AnnotationConfigApplicationContext doClose
INFO: Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@6c09e6e0: startup date [Sun Jun 26 14:51:46 CEST 2016]; root of context hierarchy
Jun 26, 2016 2:52:47 PM org.springframework.context.support.DefaultLifecycleProcessor stop
INFO: Stopping beans in phase 2147483647
14:52:47.407 [main] DEBUG org.apache.activemq.ActiveMQMessageConsumer - remove: ID:dragon-54643-1466945506965-1:1:1:1, lastDeliveredSequenceId: 6
14:52:47.414 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@5f47af7e[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2] is shutdown: true and terminated: true took: 0.000 seconds.
14:52:47.415 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@6c45e3b9[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds.
14:52:47.416 [main] DEBUG org.apache.activemq.transport.tcp.TcpTransport - Stopping transport tcp://localhost/127.0.0.1:61616@54644
14:52:47.418 [main] DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@c6d1896[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
14:52:47.419 [ActiveMQ Task-1] DEBUG org.apache.activemq.transport.tcp.TcpTransport - Closed socket Socket[addr=localhost/127.0.0.1,port=61616,localport=54644]
14:52:47.419 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@c6d1896[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]

Complete Code for Producer+Consumer Applications

Let’s start with coding Producer Application [A]. Consumer Application [B] is exactly same as A, but listening and sending on opposite queues.

Producer Application

Project Structure
SpringJMSEX2_img2

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.websystique.spring</groupId>
	<artifactId>SpringJMSActiveMQProducerEx2</artifactId>
	<version>1.0.0</version>
	<packaging>jar</packaging>

	<name>SpringJMSActiveMQProducerEx2</name>

	<properties>
		<springframework.version>4.3.0.RELEASE</springframework.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-core</artifactId>
			<version>${springframework.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context</artifactId>
			<version>${springframework.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jms</artifactId>
			<version>${springframework.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-spring</artifactId>
			<version>5.13.3</version>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
			<version>1.1.7</version>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.2</version>
				<configuration>
					<source>1.7</source>
					<target>1.7</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>
package com.websystique.spring.configuration;

import java.util.Arrays;

import org.apache.activemq.spring.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.core.JmsTemplate;

@Configuration
public class MessagingConfiguration {

	private static final String DEFAULT_BROKER_URL = "tcp://localhost:61616";
	
	private static final String ORDER_QUEUE = "order-queue";

	@Bean
	public ActiveMQConnectionFactory connectionFactory(){
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
		connectionFactory.setBrokerURL(DEFAULT_BROKER_URL);
		connectionFactory.setTrustedPackages(Arrays.asList("com.websystique.spring","java.util"));
		return connectionFactory;
	}
	
	@Bean 
	public JmsTemplate jmsTemplate(){
		JmsTemplate template = new JmsTemplate();
		template.setConnectionFactory(connectionFactory());
		template.setDefaultDestinationName(ORDER_QUEUE);
		return template;
	}
	
}
package com.websystique.spring.configuration;

import javax.jms.ConnectionFactory;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

@Configuration
@EnableJms
public class MessagingListnerConfiguration {

	@Autowired
	ConnectionFactory connectionFactory;
	
	@Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrency("1-1");
        return factory;
    }

}
package com.websystique.spring.configuration;

import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Configuration
@ComponentScan(basePackages = "com.websystique.spring")
@Import({MessagingConfiguration.class, MessagingListnerConfiguration.class})
public class AppConfig {
	
	//Put Other Application configuration here.
}
package com.websystique.spring.messaging;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

import com.websystique.spring.model.Product;

@Component
public class MessageSender {

	@Autowired
	JmsTemplate jmsTemplate;

	public void sendMessage(final Product product) {

		jmsTemplate.send(new MessageCreator(){
				@Override
				public Message createMessage(Session session) throws JMSException{
					ObjectMessage objectMessage = session.createObjectMessage(product);
					return objectMessage;
				}
			});
	}

}
package com.websystique.spring.messaging;

import javax.jms.JMSException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

import com.websystique.spring.model.InventoryResponse;

@Component
public class MessageReceiver {

	static final Logger LOG = LoggerFactory.getLogger(MessageReceiver.class);
	private static final String ORDER_RESPONSE_QUEUE = "order-response-queue";
	
	@JmsListener(destination = ORDER_RESPONSE_QUEUE)
	public void receiveMessage(final Message<InventoryResponse> message) throws JMSException {
		LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++");
		MessageHeaders headers =  message.getHeaders();
		LOG.info("Application : headers received : {}", headers);
		
		InventoryResponse response = message.getPayload();
		LOG.info("Application : response received : {}",response);	
		LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++");
	}
}
package com.websystique.spring.service;

import com.websystique.spring.model.Product;

public interface ProductService {

	public void sendProduct(Product product);
}

package com.websystique.spring.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.websystique.spring.messaging.MessageSender;
import com.websystique.spring.model.Product;

@Service("productService")
public class ProductServiceImpl implements ProductService{

	static final Logger LOG = LoggerFactory.getLogger(ProductServiceImpl.class);
	
	@Autowired
	MessageSender messageSender;
	
	@Override
	public void sendProduct(Product product) {
		LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++");
		LOG.info("Application : sending order request {}", product);
		messageSender.sendMessage(product);
		LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++");
	}
}
package com.websystique.spring.model;

import java.io.Serializable;

public class Product implements Serializable {

	private String productId;
	
	private String name;
	
	private int quantity;

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public int getQuantity() {
		return quantity;
	}

	public void setQuantity(int quantity) {
		this.quantity = quantity;
	}

	public String getProductId() {
		return productId;
	}

	public void setProductId(String productId) {
		this.productId = productId;
	}

	@Override
	public int hashCode() {
		final int prime = 31;
		int result = 1;
		result = prime * result + ((productId == null) ? 0 : productId.hashCode());
		return result;
	}

	@Override
	public boolean equals(Object obj) {
		if (this == obj)
			return true;
		if (obj == null)
			return false;
		if (getClass() != obj.getClass())
			return false;
		Product other = (Product) obj;
		if (productId == null) {
			if (other.productId != null)
				return false;
		} else if (!productId.equals(other.productId))
			return false;
		return true;
	}

	@Override
	public String toString() {
		return "Product [productId=" + productId + ", name=" + name + ", quantity=" + quantity + "]";
	}
}
package com.websystique.spring.model;

import java.io.Serializable;

public class InventoryResponse implements Serializable{

	private String productId;
	private int returnCode;
	private String comment;

	public String getProductId() {
		return productId;
	}
	public void setProductId(String productId) {
		this.productId = productId;
	}
	public int getReturnCode() {
		return returnCode;
	}
	public void setReturnCode(int returnCode) {
		this.returnCode = returnCode;
	}
	public String getComment() {
		return comment;
	}
	public void setComment(String comment) {
		this.comment = comment;
	}
	@Override
	public String toString() {
		return "InventoryResponse [productId=" + productId + ", returnCode=" + returnCode + ", comment=" + comment
				+ "]";
	}
}
package com.websystique.spring.util;

import java.util.UUID;

public class BasicUtil {

	public static String getUniqueId(){
		return UUID.randomUUID().toString();
	}
}

Consumer Application

Project Structure
SpringJMSEX2_img3
pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.websystique.spring</groupId>
	<artifactId>SpringJMSActiveMQConsumerEx2</artifactId>
	<version>1.0.0</version>
	<packaging>jar</packaging>

	<name>SpringJMSActiveMQConsumerEx2</name>

	<properties>
		<springframework.version>4.3.0.RELEASE</springframework.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-core</artifactId>
			<version>${springframework.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-context</artifactId>
			<version>${springframework.version}</version>
		</dependency>
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-jms</artifactId>
			<version>${springframework.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.activemq</groupId>
			<artifactId>activemq-spring</artifactId>
			<version>5.13.3</version>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
			<version>1.1.7</version>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.2</version>
				<configuration>
					<source>1.7</source>
					<target>1.7</target>
				</configuration>
			</plugin>
		</plugins>
	</build>
</project>
package com.websystique.spring.configuration;

import java.util.Arrays;

import org.apache.activemq.spring.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.core.JmsTemplate;

@Configuration
public class MessagingConfiguration {

	private static final String DEFAULT_BROKER_URL = "tcp://localhost:61616";
	
	private static final String ORDER_RESPONSE_QUEUE = "order-response-queue";

	@Bean
	public ActiveMQConnectionFactory connectionFactory(){
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
		connectionFactory.setBrokerURL(DEFAULT_BROKER_URL);
		connectionFactory.setTrustedPackages(Arrays.asList("com.websystique.spring","java.util"));
		return connectionFactory;
	}
	
	@Bean 
	public JmsTemplate jmsTemplate(){
		JmsTemplate template = new JmsTemplate();
		template.setConnectionFactory(connectionFactory());
		template.setDefaultDestinationName(ORDER_RESPONSE_QUEUE);
		return template;
	}
	
}
package com.websystique.spring.configuration;

import javax.jms.ConnectionFactory;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

@Configuration
@EnableJms
public class MessagingListnerConfiguration {

	@Autowired
	ConnectionFactory connectionFactory;

	@Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrency("1-1");
        return factory;
    }

}

package com.websystique.spring.configuration;

import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Configuration
@ComponentScan(basePackages = "com.websystique.spring")
@Import({MessagingConfiguration.class, MessagingListnerConfiguration.class})
public class AppConfig {
	
	//Put Other Application configuration here.
	
}
package com.websystique.spring.messaging;

import javax.jms.JMSException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

import com.websystique.spring.model.Product;
import com.websystique.spring.service.OrderService;

@Component
public class MessageReceiver {

	static final Logger LOG = LoggerFactory.getLogger(MessageReceiver.class);
	private static final String ORDER_QUEUE = "order-queue";
	
	@Autowired
	OrderService orderService;

	@JmsListener(destination = ORDER_QUEUE)
	public void receiveMessage(final Message<Product> message) throws JMSException {
		LOG.info("----------------------------------------------------");
		MessageHeaders headers =  message.getHeaders();
		LOG.info("Application : headers received : {}", headers);
		
		Product product = message.getPayload();
		LOG.info("Application : product : {}",product);	

		orderService.processOrder(product);
		LOG.info("----------------------------------------------------");

	}
	
}

package com.websystique.spring.messaging;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

import com.websystique.spring.model.InventoryResponse;

@Component
public class MessageSender {

	@Autowired
	JmsTemplate jmsTemplate;

	public void sendMessage(final InventoryResponse inventoryResponse) {

		jmsTemplate.send(new MessageCreator(){
				@Override
				public Message createMessage(Session session) throws JMSException{
					ObjectMessage objectMessage = session.createObjectMessage(inventoryResponse);
					return objectMessage;
				}
			});
	}

}
package com.websystique.spring.service;

import com.websystique.spring.model.Product;

public interface OrderService {

	public void processOrder(Product product);
	
	
}
package com.websystique.spring.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.websystique.spring.messaging.MessageSender;
import com.websystique.spring.model.InventoryResponse;
import com.websystique.spring.model.Product;

@Service("orderService")
public class OrderServiceImpl implements OrderService{

	static final Logger LOG = LoggerFactory.getLogger(OrderServiceImpl.class);
	
	@Autowired
	MessageSender messageSender;
	
	@Override
	public void processOrder(Product product) {
		
		InventoryResponse response = prepareResponse(product);
		LOG.info("Inventory : sending order confirmation {}", response);
		messageSender.sendMessage(response);
	}
	
	private InventoryResponse prepareResponse(Product product){
		InventoryResponse response = new InventoryResponse();
		response.setProductId(product.getProductId());
		response.setReturnCode(200);
		response.setComment("Order Processed successfully");
		return response;
	}
}

That’s it. As we saw, SPRING JMS support is easy to use. Feel free to Comment, and suggest improvements.

Download Source Code

Producer Application :

Consumer Application :



References

If you like tutorials on this site, why not take a step further and connect me on Facebook , Google Plus & Twitter as well? I would love to hear your thoughts on these articles, it will help me improve further our learning process.

If you appreciate the effort I have put in this learning site, help me improve the visibility of this site towards global audience by sharing and linking this site from within and beyond your network. You & your friends can always link my site from your site on www.websystique.com, and share the learning.

After all, we are here to learn together, aren’t we?