In the previous post over Spring with JMS, we have seen how applications can communicate among each other using JMS, leveraging Spring’s support for JMS. This post goes a step further, and shows an alternative to javax.jms.MessageListener which allows us to create MessageListeners using plain POJO’s, thanks to Spring’s very own @JmsListener, used in combination with JmsListenerConnectionFactory and @EnableJMS. Let’s get started.
- Spring Boot+AngularJS+Spring Data+Hibernate+MySQL CRUD App
- Spring Boot WAR deployment example
- Spring Boot REST API Tutorial
- Spring Boot Introduction + Hello World Example
- Secure Spring REST API using OAuth2
- AngularJS+Spring Security using Basic Authentication
- Secure Spring REST API using Basic Authentication
- Spring 4 MVC+JPA2+Hibernate Many-to-many Example
- Spring 4 Caching Annotations Tutorial
- Spring 4 Cache Tutorial with EhCache
- Spring 4 Email Template Library Example
- Spring 4 Email With Attachment Tutorial
- Spring 4 Email Integration Tutorial
- Spring MVC 4+JMS+ActiveMQ Integration Example
- Spring 4+JMS+ActiveMQ Integration Example
- Spring MVC 4+Hibernate 4 Many-to-many JSP Example
- Spring MVC 4+AngularJS Example
- Spring MVC 4+Hibernate 4+MySQL+Maven integration example using annotations
- Spring MVC4 FileUpload-Download Hibernate+MySQL Example
- TestNG Mockito Integration Example Stubbing Void Methods
- Maven surefire plugin and TestNG Example
- Spring MVC 4 Form Validation and Resource Handling
Following technologies being used:
- Spring 4.3.0.RELEASE
- Spring JMS 4.3.0.RELEASE
- ActiveMQ 5.13.3
- Maven 3
- JDK 1.7
- Tomcat 8.0.21
- Eclipse MARS.1 Release 4.5.1
- logback 1.1.7
Let’s begin by configuring the listener first.
package com.websystique.spring.messaging; import javax.jms.JMSException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.annotation.JmsListener; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.stereotype.Component; import com.websystique.spring.model.InventoryResponse; @Component public class MessageReceiver { static final Logger LOG = LoggerFactory.getLogger(MessageReceiver.class); private static final String ORDER_RESPONSE_QUEUE = "order-response-queue"; @JmsListener(destination = ORDER_RESPONSE_QUEUE) public void receiveMessage(final Message<InventoryResponse> message) throws JMSException { LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++"); MessageHeaders headers = message.getHeaders(); LOG.info("Application : headers received : {}", headers); InventoryResponse response = message.getPayload(); LOG.info("Application : response received : {}",response); LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++"); } }
Shown above is a POJO, which does not implement javax.jms.MessageListener, just annotated with @JmsListener which marks a method to be the target of a JMS message listener on the specified destination(). What we still have to do is to :
- 1. Configure a message-listener-container [ with JmsListenerContainerFactory] : which listens on a destination [can be the one used with @JmsListener] and when any message arrives on this destination, it retrieved that message and passes to the bean annotated with @JmsListener for that destination.
- 2. Use @EnableJms which enables detection of JmsListener annotations on any Spring-managed bean in the container.
Below shown is the configuration we have just discussed:
package com.websystique.spring.configuration; import javax.jms.ConnectionFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; @Configuration @EnableJms public class MessagingListnerConfiguration { @Autowired ConnectionFactory connectionFactory; @Bean public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setConcurrency("1-1"); return factory; } }
DefaultJmsListenerContainerFactory is a JmsListenerContainerFactory implementation to build a regular DefaultMessageListenerContainer. You can configure several properties. At the very least, it needs a connection factory. Additionally, we have specified the concurrency [max number of concurrent users/consumers] using setConcurrency(“lowwe-upper”). You can also use setConcurrency(“upper”) which means lower will be 1.
Note : You can also specify the container factory to be used in @JmsLietener annoatation, with containerFactory attribute defining the name of the JmsListenerContainerFactory bean to use. When none is set a JmsListenerContainerFactory bean with name jmsListenerContainerFactory is assumed to be present.
That’s all you need to configure the Message listeners using @JmsListener. Complete codes of both producer & consumer applications is shown below [as well as in download section].
Start the damn applications, Bill
We will be using Apache Active MQ. In case you did not, download ActiveMQ Message Broker, unzip it, goto bin directory and start it using $ ./activemq start.
You can quickly check the WebConsole [available at http://localhost:8161/admin/
with credentials admin/admin
.
Application(s) Overview:
In this example, we have two applications A & B trying to communicate with each other via sending Messages on Queues. Although we could have opted for Topic instead, Queue is perfect for this one producer-one consumer scenario.
A sends a message [a pojo object] to a Queue [order-queue] and listens for the response on another Queue [order-response-queue]. B is listening on order-queue, and upon receiving the message, B will send a reply [a pojo response object] to A on order-response-queue. A short but simple example of inter-application communication using JMS.
Below are the main’s of both Producer [A] & Consumer[B] applications.
package com.websystique.spring.configuration; import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.support.AbstractApplicationContext; import com.websystique.spring.model.Product; import com.websystique.spring.service.ProductService; import com.websystique.spring.util.BasicUtil; public class ProducerApplication { static final Logger LOG = LoggerFactory.getLogger(ProducerApplication.class); private static AtomicInteger id = new AtomicInteger(); public static void main(String[] args){ AbstractApplicationContext context = new AnnotationConfigApplicationContext( AppConfig.class); ProductService productService = (ProductService) context.getBean("productService"); Product product = getProduct(); LOG.info("Application : sending order {}", product); productService.sendProduct(product); try { Thread.sleep(60000); } catch (InterruptedException e) { e.printStackTrace(); } ((AbstractApplicationContext)context).close(); } private static Product getProduct(){ Product p = new Product(); p.setName("Product "+id.incrementAndGet()); p.setProductId(BasicUtil.getUniqueId()); p.setQuantity(2); return p; } }
package com.websystique.spring.configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.support.AbstractApplicationContext; public class ConsumerApplication { static final Logger LOG = LoggerFactory.getLogger(ConsumerApplication.class); public static void main(String[] args) { AbstractApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class); try { Thread.sleep(60000); } catch (InterruptedException e) { e.printStackTrace(); } ((AbstractApplicationContext) context).close(); } }
Start Application A [Producer]. Goto web-console, and click on queue tab. You should see two queues being created with a message on one queue [A has just sent a message] while other empty.
Now start Application B [Consumer], again check queues on web-console, you will see message being de-queued on first queue [B has just processed it] and new message being enqueued [B has sent the response to A] and eventually dequeued [A got the response] on second queue.
Logs of Producer Application:
Jun 26, 2016 2:50:50 PM org.springframework.context.annotation.AnnotationConfigApplicationContext prepareRefresh INFO: Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@5cca2df1: startup date [Sun Jun 26 14:50:50 CEST 2016]; root of context hierarchy Jun 26, 2016 2:50:51 PM org.springframework.context.support.DefaultLifecycleProcessor start INFO: Starting beans in phase 2147483647 14:50:51.343 [main] DEBUG org.apache.activemq.transport.WireFormatNegotiator - Sending: WireFormatInfo { version=11, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 14:50:51.414 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54635] DEBUG org.apache.activemq.transport.InactivityMonitor - Using min of local: WireFormatInfo { version=11, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=11, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 14:50:51.415 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54635] DEBUG org.apache.activemq.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=11, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 14:50:51.415 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54635] DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616@54635 before negotiation: OpenWireFormat{version=11, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807} 14:50:51.415 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54635] DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616@54635 after negotiation: OpenWireFormat{version=11, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600} 14:50:51.548 [DefaultMessageListenerContainer-1] DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Session Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@6efeb87[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] 14:50:51.607 [main] INFO com.websystique.spring.configuration.ProducerApplication - Application : sending order Product [productId=cf669916-2558-41f3-9f93-05125788173d, name=Product 1, quantity=2] 14:50:51.607 [main] INFO com.websystique.spring.service.ProductServiceImpl - +++++++++++++++++++++++++++++++++++++++++++++++++++++ 14:50:51.607 [main] INFO com.websystique.spring.service.ProductServiceImpl - Application : sending order request Product [productId=cf669916-2558-41f3-9f93-05125788173d, name=Product 1, quantity=2] 14:50:51.613 [main] DEBUG org.apache.activemq.transport.WireFormatNegotiator - Sending: WireFormatInfo { version=11, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 14:50:51.616 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54636] DEBUG org.apache.activemq.transport.InactivityMonitor - Using min of local: WireFormatInfo { version=11, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=11, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 14:50:51.616 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54636] DEBUG org.apache.activemq.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=11, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 14:50:51.616 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54636] DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616@54636 before negotiation: OpenWireFormat{version=11, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807} 14:50:51.616 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54636] DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616@54636 after negotiation: OpenWireFormat{version=11, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600} 14:50:51.786 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@5769fd07[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds. 14:50:51.787 [main] DEBUG org.apache.activemq.transport.tcp.TcpTransport - Stopping transport tcp://localhost/127.0.0.1:61616@54636 14:50:51.788 [main] DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@5bef330a[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] 14:50:51.789 [ActiveMQ Task-1] DEBUG org.apache.activemq.transport.tcp.TcpTransport - Closed socket Socket[addr=localhost/127.0.0.1,port=61616,localport=54636] 14:50:51.789 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@5bef330a[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1] 14:50:51.790 [main] INFO com.websystique.spring.service.ProductServiceImpl - +++++++++++++++++++++++++++++++++++++++++++++++++++++ 14:51:11.417 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10000ms elapsed since last write check. 14:51:11.419 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 14:51:21.418 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check. 14:51:21.418 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 14:51:31.418 [ActiveMQ InactivityMonitor ReadCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - 30001ms elapsed since last read check. 14:51:31.419 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check. 14:51:31.419 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 14:51:41.419 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10000ms elapsed since last write check. 14:51:41.419 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 14:51:47.329 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - +++++++++++++++++++++++++++++++++++++++++++++++++++++ 14:51:47.330 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - Application : headers received : {timestamp=1466945507330, id=401a34a7-a0f9-8ab1-efd7-5a590987733f, jms_deliveryMode=2, jms_timestamp=1466945507317, jms_redelivered=false, jms_destination=queue://order-response-queue, jms_priority=4, jms_expiration=0, jms_messageId=ID:dragon-54643-1466945506965-1:1:1:1:1} 14:51:47.331 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - Application : response received : InventoryResponse [productId=cf669916-2558-41f3-9f93-05125788173d, returnCode=200, comment=Order Processed successfully] 14:51:47.331 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - +++++++++++++++++++++++++++++++++++++++++++++++++++++ 14:51:51.420 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check. Jun 26, 2016 2:51:51 PM org.springframework.context.annotation.AnnotationConfigApplicationContext doClose INFO: Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@5cca2df1: startup date [Sun Jun 26 14:50:50 CEST 2016]; root of context hierarchy Jun 26, 2016 2:51:51 PM org.springframework.context.support.DefaultLifecycleProcessor stop INFO: Stopping beans in phase 2147483647 14:51:52.332 [DefaultMessageListenerContainer-1] DEBUG org.apache.activemq.ActiveMQMessageConsumer - remove: ID:dragon-54634-1466945451200-1:1:1:1, lastDeliveredSequenceId: 12 14:51:52.338 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@6efeb87[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2] is shutdown: true and terminated: true took: 0.000 seconds. 14:51:52.338 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@5f4e7ecf[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds. 14:51:52.339 [main] DEBUG org.apache.activemq.transport.tcp.TcpTransport - Stopping transport tcp://localhost/127.0.0.1:61616@54635 14:51:52.339 [main] DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@1a6778eb[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] 14:51:52.339 [ActiveMQ Task-1] DEBUG org.apache.activemq.transport.tcp.TcpTransport - Closed socket Socket[addr=localhost/127.0.0.1,port=61616,localport=54635] 14:51:52.340 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@1a6778eb[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]
Logs of Consumer Application:
Jun 26, 2016 2:51:46 PM org.springframework.context.annotation.AnnotationConfigApplicationContext prepareRefresh INFO: Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@6c09e6e0: startup date [Sun Jun 26 14:51:46 CEST 2016]; root of context hierarchy Jun 26, 2016 2:51:46 PM org.springframework.context.support.DefaultLifecycleProcessor start INFO: Starting beans in phase 2147483647 14:51:47.091 [main] DEBUG org.apache.activemq.transport.WireFormatNegotiator - Sending: WireFormatInfo { version=11, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 14:51:47.097 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54644] DEBUG org.apache.activemq.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=11, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 14:51:47.099 [main] DEBUG org.apache.activemq.transport.InactivityMonitor - Using min of local: WireFormatInfo { version=11, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=11, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 14:51:47.101 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54644] DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616@54644 before negotiation: OpenWireFormat{version=11, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807} 14:51:47.101 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54644] DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616@54644 after negotiation: OpenWireFormat{version=11, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600} 14:51:47.166 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54644] DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Session Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@5f47af7e[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] 14:51:47.190 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - ---------------------------------------------------- 14:51:47.299 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - Application : headers received : {timestamp=1466945507299, id=04629f2a-99b2-ad9d-dc91-641c5cb834ac, jms_deliveryMode=2, jms_timestamp=1466945451665, jms_redelivered=false, jms_destination=queue://order-queue, jms_priority=4, jms_expiration=0, jms_messageId=ID:dragon-54634-1466945451200-1:2:1:1:1} 14:51:47.299 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - Application : product : Product [productId=cf669916-2558-41f3-9f93-05125788173d, name=Product 1, quantity=2] 14:51:47.300 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.service.OrderServiceImpl - Inventory : sending order confirmation InventoryResponse [productId=cf669916-2558-41f3-9f93-05125788173d, returnCode=200, comment=Order Processed successfully] 14:51:47.395 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - ---------------------------------------------------- 14:52:07.101 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check. 14:52:07.103 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 14:52:17.101 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10000ms elapsed since last write check. 14:52:17.101 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 14:52:27.101 [ActiveMQ InactivityMonitor ReadCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - 30001ms elapsed since last read check. 14:52:27.102 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check. 14:52:27.102 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 14:52:37.106 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10004ms elapsed since last write check. 14:52:37.107 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 14:52:47.107 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check. 14:52:47.107 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] Jun 26, 2016 2:52:47 PM org.springframework.context.annotation.AnnotationConfigApplicationContext doClose INFO: Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@6c09e6e0: startup date [Sun Jun 26 14:51:46 CEST 2016]; root of context hierarchy Jun 26, 2016 2:52:47 PM org.springframework.context.support.DefaultLifecycleProcessor stop INFO: Stopping beans in phase 2147483647 14:52:47.407 [main] DEBUG org.apache.activemq.ActiveMQMessageConsumer - remove: ID:dragon-54643-1466945506965-1:1:1:1, lastDeliveredSequenceId: 6 14:52:47.414 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@5f47af7e[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2] is shutdown: true and terminated: true took: 0.000 seconds. 14:52:47.415 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@6c45e3b9[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds. 14:52:47.416 [main] DEBUG org.apache.activemq.transport.tcp.TcpTransport - Stopping transport tcp://localhost/127.0.0.1:61616@54644 14:52:47.418 [main] DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@c6d1896[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] 14:52:47.419 [ActiveMQ Task-1] DEBUG org.apache.activemq.transport.tcp.TcpTransport - Closed socket Socket[addr=localhost/127.0.0.1,port=61616,localport=54644] 14:52:47.419 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@c6d1896[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]
Complete Code for Producer+Consumer Applications
Let’s start with coding Producer Application [A]. Consumer Application [B] is exactly same as A, but listening and sending on opposite queues.
Producer Application
Project Structure
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.websystique.spring</groupId> <artifactId>SpringJMSActiveMQProducerEx2</artifactId> <version>1.0.0</version> <packaging>jar</packaging> <name>SpringJMSActiveMQProducerEx2</name> <properties> <springframework.version>4.3.0.RELEASE</springframework.version> </properties> <dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>${springframework.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${springframework.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${springframework.version}</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-spring</artifactId> <version>5.13.3</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.1.7</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.2</version> <configuration> <source>1.7</source> <target>1.7</target> </configuration> </plugin> </plugins> </build> </project>
package com.websystique.spring.configuration; import java.util.Arrays; import org.apache.activemq.spring.ActiveMQConnectionFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.core.JmsTemplate; @Configuration public class MessagingConfiguration { private static final String DEFAULT_BROKER_URL = "tcp://localhost:61616"; private static final String ORDER_QUEUE = "order-queue"; @Bean public ActiveMQConnectionFactory connectionFactory(){ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); connectionFactory.setBrokerURL(DEFAULT_BROKER_URL); connectionFactory.setTrustedPackages(Arrays.asList("com.websystique.spring","java.util")); return connectionFactory; } @Bean public JmsTemplate jmsTemplate(){ JmsTemplate template = new JmsTemplate(); template.setConnectionFactory(connectionFactory()); template.setDefaultDestinationName(ORDER_QUEUE); return template; } }
package com.websystique.spring.configuration; import javax.jms.ConnectionFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; @Configuration @EnableJms public class MessagingListnerConfiguration { @Autowired ConnectionFactory connectionFactory; @Bean public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setConcurrency("1-1"); return factory; } }
package com.websystique.spring.configuration; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; @Configuration @ComponentScan(basePackages = "com.websystique.spring") @Import({MessagingConfiguration.class, MessagingListnerConfiguration.class}) public class AppConfig { //Put Other Application configuration here. }
package com.websystique.spring.messaging; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.ObjectMessage; import javax.jms.Session; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Component; import com.websystique.spring.model.Product; @Component public class MessageSender { @Autowired JmsTemplate jmsTemplate; public void sendMessage(final Product product) { jmsTemplate.send(new MessageCreator(){ @Override public Message createMessage(Session session) throws JMSException{ ObjectMessage objectMessage = session.createObjectMessage(product); return objectMessage; } }); } }
package com.websystique.spring.messaging; import javax.jms.JMSException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jms.annotation.JmsListener; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.stereotype.Component; import com.websystique.spring.model.InventoryResponse; @Component public class MessageReceiver { static final Logger LOG = LoggerFactory.getLogger(MessageReceiver.class); private static final String ORDER_RESPONSE_QUEUE = "order-response-queue"; @JmsListener(destination = ORDER_RESPONSE_QUEUE) public void receiveMessage(final Message<InventoryResponse> message) throws JMSException { LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++"); MessageHeaders headers = message.getHeaders(); LOG.info("Application : headers received : {}", headers); InventoryResponse response = message.getPayload(); LOG.info("Application : response received : {}",response); LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++"); } }
package com.websystique.spring.service; import com.websystique.spring.model.Product; public interface ProductService { public void sendProduct(Product product); }
package com.websystique.spring.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.websystique.spring.messaging.MessageSender; import com.websystique.spring.model.Product; @Service("productService") public class ProductServiceImpl implements ProductService{ static final Logger LOG = LoggerFactory.getLogger(ProductServiceImpl.class); @Autowired MessageSender messageSender; @Override public void sendProduct(Product product) { LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++"); LOG.info("Application : sending order request {}", product); messageSender.sendMessage(product); LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++"); } }
package com.websystique.spring.model; import java.io.Serializable; public class Product implements Serializable { private String productId; private String name; private int quantity; public String getName() { return name; } public void setName(String name) { this.name = name; } public int getQuantity() { return quantity; } public void setQuantity(int quantity) { this.quantity = quantity; } public String getProductId() { return productId; } public void setProductId(String productId) { this.productId = productId; } @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + ((productId == null) ? 0 : productId.hashCode()); return result; } @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; Product other = (Product) obj; if (productId == null) { if (other.productId != null) return false; } else if (!productId.equals(other.productId)) return false; return true; } @Override public String toString() { return "Product [productId=" + productId + ", name=" + name + ", quantity=" + quantity + "]"; } }
package com.websystique.spring.model; import java.io.Serializable; public class InventoryResponse implements Serializable{ private String productId; private int returnCode; private String comment; public String getProductId() { return productId; } public void setProductId(String productId) { this.productId = productId; } public int getReturnCode() { return returnCode; } public void setReturnCode(int returnCode) { this.returnCode = returnCode; } public String getComment() { return comment; } public void setComment(String comment) { this.comment = comment; } @Override public String toString() { return "InventoryResponse [productId=" + productId + ", returnCode=" + returnCode + ", comment=" + comment + "]"; } }
package com.websystique.spring.util; import java.util.UUID; public class BasicUtil { public static String getUniqueId(){ return UUID.randomUUID().toString(); } }
Consumer Application
Project Structure
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.websystique.spring</groupId> <artifactId>SpringJMSActiveMQConsumerEx2</artifactId> <version>1.0.0</version> <packaging>jar</packaging> <name>SpringJMSActiveMQConsumerEx2</name> <properties> <springframework.version>4.3.0.RELEASE</springframework.version> </properties> <dependencies> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-core</artifactId> <version>${springframework.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context</artifactId> <version>${springframework.version}</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> <version>${springframework.version}</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-spring</artifactId> <version>5.13.3</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.1.7</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.2</version> <configuration> <source>1.7</source> <target>1.7</target> </configuration> </plugin> </plugins> </build> </project>
package com.websystique.spring.configuration; import java.util.Arrays; import org.apache.activemq.spring.ActiveMQConnectionFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.core.JmsTemplate; @Configuration public class MessagingConfiguration { private static final String DEFAULT_BROKER_URL = "tcp://localhost:61616"; private static final String ORDER_RESPONSE_QUEUE = "order-response-queue"; @Bean public ActiveMQConnectionFactory connectionFactory(){ ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); connectionFactory.setBrokerURL(DEFAULT_BROKER_URL); connectionFactory.setTrustedPackages(Arrays.asList("com.websystique.spring","java.util")); return connectionFactory; } @Bean public JmsTemplate jmsTemplate(){ JmsTemplate template = new JmsTemplate(); template.setConnectionFactory(connectionFactory()); template.setDefaultDestinationName(ORDER_RESPONSE_QUEUE); return template; } }
package com.websystique.spring.configuration; import javax.jms.ConnectionFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; @Configuration @EnableJms public class MessagingListnerConfiguration { @Autowired ConnectionFactory connectionFactory; @Bean public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setConcurrency("1-1"); return factory; } }
package com.websystique.spring.configuration; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; @Configuration @ComponentScan(basePackages = "com.websystique.spring") @Import({MessagingConfiguration.class, MessagingListnerConfiguration.class}) public class AppConfig { //Put Other Application configuration here. }
package com.websystique.spring.messaging; import javax.jms.JMSException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.annotation.JmsListener; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders; import org.springframework.stereotype.Component; import com.websystique.spring.model.Product; import com.websystique.spring.service.OrderService; @Component public class MessageReceiver { static final Logger LOG = LoggerFactory.getLogger(MessageReceiver.class); private static final String ORDER_QUEUE = "order-queue"; @Autowired OrderService orderService; @JmsListener(destination = ORDER_QUEUE) public void receiveMessage(final Message<Product> message) throws JMSException { LOG.info("----------------------------------------------------"); MessageHeaders headers = message.getHeaders(); LOG.info("Application : headers received : {}", headers); Product product = message.getPayload(); LOG.info("Application : product : {}",product); orderService.processOrder(product); LOG.info("----------------------------------------------------"); } }
package com.websystique.spring.messaging; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.ObjectMessage; import javax.jms.Session; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsTemplate; import org.springframework.jms.core.MessageCreator; import org.springframework.stereotype.Component; import com.websystique.spring.model.InventoryResponse; @Component public class MessageSender { @Autowired JmsTemplate jmsTemplate; public void sendMessage(final InventoryResponse inventoryResponse) { jmsTemplate.send(new MessageCreator(){ @Override public Message createMessage(Session session) throws JMSException{ ObjectMessage objectMessage = session.createObjectMessage(inventoryResponse); return objectMessage; } }); } }
package com.websystique.spring.service; import com.websystique.spring.model.Product; public interface OrderService { public void processOrder(Product product); }
package com.websystique.spring.service; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.websystique.spring.messaging.MessageSender; import com.websystique.spring.model.InventoryResponse; import com.websystique.spring.model.Product; @Service("orderService") public class OrderServiceImpl implements OrderService{ static final Logger LOG = LoggerFactory.getLogger(OrderServiceImpl.class); @Autowired MessageSender messageSender; @Override public void processOrder(Product product) { InventoryResponse response = prepareResponse(product); LOG.info("Inventory : sending order confirmation {}", response); messageSender.sendMessage(response); } private InventoryResponse prepareResponse(Product product){ InventoryResponse response = new InventoryResponse(); response.setProductId(product.getProductId()); response.setReturnCode(200); response.setComment("Order Processed successfully"); return response; } }
That’s it. As we saw, SPRING JMS support is easy to use. Feel free to Comment, and suggest improvements.
Download Source Code
Producer Application :
Consumer Application :
References
If you like tutorials on this site, why not take a step further and connect me on Facebook , Google Plus & Twitter as well? I would love to hear your thoughts on these articles, it will help improve further our learning process.