In the previous post on Spring with JMS, we saw how applications can communicate with each other using JMS, leveraging Spring’s JMS support. This post goes a step further and shows an alternative to javax.jms.MessageListener that lets us create message listeners from plain POJOs, thanks to Spring’s @JmsListener annotation, used in combination with a JmsListenerContainerFactory and @EnableJms. Let’s get started.
The following technologies are used, with versions as declared in the pom.xml further down: Spring 4.3.0.RELEASE (spring-core, spring-context, spring-jms), ActiveMQ 5.13.3 (activemq-spring), Logback 1.1.7, and Maven with Java 1.7 as the compiler source and target.
Let’s begin by configuring the listener first.
package com.websystique.spring.messaging;

import javax.jms.JMSException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

import com.websystique.spring.model.InventoryResponse;

@Component
public class MessageReceiver {

    static final Logger LOG = LoggerFactory.getLogger(MessageReceiver.class);

    private static final String ORDER_RESPONSE_QUEUE = "order-response-queue";

    @JmsListener(destination = ORDER_RESPONSE_QUEUE)
    public void receiveMessage(final Message<InventoryResponse> message) throws JMSException {
        LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++");
        MessageHeaders headers = message.getHeaders();
        LOG.info("Application : headers received : {}", headers);

        InventoryResponse response = message.getPayload();
        LOG.info("Application : response received : {}",response);
        LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++");
    }
}
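Shown above is a POJO that does not implement javax.jms.MessageListener. It is simply annotated with @JmsListener, which marks a method as the target of a JMS message listener on the specified destination. What we still have to do is configure a JmsListenerContainerFactory, which creates the listener container that invokes this method, and add @EnableJms to a @Configuration class so that @JmsListener annotations are detected.
The configuration we have just discussed is shown below: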
package com.websystique.spring.configuration;

import javax.jms.ConnectionFactory;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

@Configuration
@EnableJms
public class MessagingListnerConfiguration {

    @Autowired
    ConnectionFactory connectionFactory;

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrency("1-1");
        return factory;
    }
}
DefaultJmsListenerContainerFactory is a JmsListenerContainerFactory implementation that builds a regular DefaultMessageListenerContainer. You can configure several properties; at the very least, it needs a connection factory. Additionally, we have specified the concurrency [the number of concurrent consumers] using setConcurrency("lower-upper"); the value "1-1" above means exactly one consumer. You can also use setConcurrency("upper"), in which case the lower limit is 1.
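To make the two concurrency formats concrete, here is a minimal stand-alone sketch of how such a string can be read. It only mimics the convention described above and is not Spring's own code; the class name ConcurrencyLimits and its parse method are hypothetical names chosen for this illustration.

```java
/**
 * Illustration only: interprets a concurrency string following the
 * "lower-upper" / "upper" convention. Hypothetical helper, not part of Spring.
 */
final class ConcurrencyLimits {

    final int lower;
    final int upper;

    private ConcurrencyLimits(int lower, int upper) {
        this.lower = lower;
        this.upper = upper;
    }

    static ConcurrencyLimits parse(String concurrency) {
        int separator = concurrency.indexOf('-');
        if (separator != -1) {
            // "lower-upper": both limits are given explicitly
            int lower = Integer.parseInt(concurrency.substring(0, separator).trim());
            int upper = Integer.parseInt(concurrency.substring(separator + 1).trim());
            return new ConcurrencyLimits(lower, upper);
        }
        // "upper": only the upper limit is given, the lower limit is 1
        return new ConcurrencyLimits(1, Integer.parseInt(concurrency.trim()));
    }

    public static void main(String[] args) {
        ConcurrencyLimits fixed = parse("1-1");
        ConcurrencyLimits range = parse("3-10");
        ConcurrencyLimits upperOnly = parse("5");
        System.out.println(fixed.lower + ".." + fixed.upper);         // prints 1..1
        System.out.println(range.lower + ".." + range.upper);         // prints 3..10
        System.out.println(upperOnly.lower + ".." + upperOnly.upper); // prints 1..5
    }
}
```

With "1-1", as used in this post, the container keeps a single consumer, so messages on the queue are handled one at a time.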
Note: You can also specify the container factory to use in the @JmsListener annotation, through the containerFactory attribute, which takes the name of the JmsListenerContainerFactory bean. When none is set, a JmsListenerContainerFactory bean named jmsListenerContainerFactory is assumed to be present.
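As a sketch of what that looks like, here is a variant of the receiver above that names the factory bean explicitly. It is meant as a replacement for the MessageReceiver shown earlier, not an additional bean, and it assumes the jmsListenerContainerFactory bean from MessagingListnerConfiguration is present. Since that is also the default name, the behaviour should be the same as omitting the attribute.

```java
package com.websystique.spring.messaging;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import com.websystique.spring.model.InventoryResponse;

// Variant of MessageReceiver: same listener, with the container factory named explicitly.
@Component
public class MessageReceiver {

    static final Logger LOG = LoggerFactory.getLogger(MessageReceiver.class);

    // containerFactory refers to a bean name; here it is the bean defined in
    // MessagingListnerConfiguration, which is also the name assumed by default.
    @JmsListener(destination = "order-response-queue",
                 containerFactory = "jmsListenerContainerFactory")
    public void receiveMessage(final Message<InventoryResponse> message) {
        LOG.info("Application : response received : {}", message.getPayload());
    }
}
```

The attribute becomes useful once an application defines more than one factory bean, for example with different concurrency settings per queue.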
That’s all you need to configure message listeners using @JmsListener. The complete code of both the producer and consumer applications is shown below [as well as in the download section].
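For some intuition about what annotation-driven listeners involve, the following plain-Java sketch finds an annotated method on a POJO by reflection and invokes it with a payload. It is a conceptual illustration only, not how Spring implements @JmsListener; the @QueueListener annotation, ResponseReceiver and ListenerDispatcher are hypothetical names made up for this example.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for @JmsListener; not a Spring annotation. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface QueueListener {
    String destination();
}

/** A plain POJO: it implements no listener interface, it only has an annotated method. */
final class ResponseReceiver {

    final List<String> received = new ArrayList<String>();

    @QueueListener(destination = "order-response-queue")
    public void receiveMessage(String payload) {
        received.add(payload);
    }
}

final class ListenerDispatcher {

    /** Invokes every @QueueListener method of the bean bound to the destination; returns how many ran. */
    static int dispatch(Object bean, String destination, String payload) {
        int invoked = 0;
        for (Method method : bean.getClass().getDeclaredMethods()) {
            QueueListener listener = method.getAnnotation(QueueListener.class);
            if (listener != null && listener.destination().equals(destination)) {
                try {
                    method.invoke(bean, payload);
                    invoked++;
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException(e);
                } catch (InvocationTargetException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        return invoked;
    }

    public static void main(String[] args) {
        ResponseReceiver receiver = new ResponseReceiver();
        int invoked = dispatch(receiver, "order-response-queue", "order processed");
        // prints: 1 listener(s) invoked, received [order processed]
        System.out.println(invoked + " listener(s) invoked, received " + receiver.received);
    }
}
```

In the real setup, the listener container created by the factory plays the dispatcher's role and receives the messages from the broker.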
We will be using Apache ActiveMQ. If you have not already done so, download the ActiveMQ message broker, unzip it, go to the bin directory and start it using $ ./activemq start.
You can quickly check the WebConsole [available at http://localhost:8161/admin/ with credentials admin/admin].
Application(s) Overview:
In this example, we have two applications, A and B, communicating with each other by sending messages on queues. Although we could have opted for a Topic instead, a Queue is perfect for this one-producer, one-consumer scenario.
A sends a message [a POJO] to a queue [order-queue] and listens for the response on another queue [order-response-queue]. B listens on order-queue and, upon receiving the message, sends a reply [a POJO response object] to A on order-response-queue. A short and simple example of inter-application communication using JMS.
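The message flow just described can be sketched without a broker. In the minimal simulation below, two in-memory BlockingQueue instances stand in for order-queue and order-response-queue, and plain strings stand in for the Product and InventoryResponse objects. The class RequestReplySketch and its method names are hypothetical; the real applications use JMS and ActiveMQ as shown in the rest of this post.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** In-memory illustration of the A/B request-reply flow; no JMS involved. */
final class RequestReplySketch {

    // Stand-ins for the two broker queues
    private final BlockingQueue<String> orderQueue = new LinkedBlockingQueue<String>();
    private final BlockingQueue<String> orderResponseQueue = new LinkedBlockingQueue<String>();

    /** Application B: waits for one order on order-queue and replies on order-response-queue. */
    void startConsumerB() {
        Thread consumer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    String productId = orderQueue.take();
                    orderResponseQueue.put("processed:" + productId);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "application-B");
        consumer.setDaemon(true);
        consumer.start();
    }

    /** Application A: sends an order, then waits up to five seconds for the reply (null on timeout). */
    String sendOrderFromA(String productId) {
        try {
            orderQueue.put(productId);
            return orderResponseQueue.poll(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the reply", e);
        }
    }

    public static void main(String[] args) {
        RequestReplySketch sketch = new RequestReplySketch();
        sketch.startConsumerB();
        System.out.println(sketch.sendOrderFromA("product-1")); // prints processed:product-1
    }
}
```

As with the real queues, the order can be sent before B is started: the message simply waits on the queue until a consumer picks it up.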
Below are the main classes of both the Producer [A] and Consumer [B] applications.
package com.websystique.spring.configuration;

import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;

import com.websystique.spring.model.Product;
import com.websystique.spring.service.ProductService;
import com.websystique.spring.util.BasicUtil;

public class ProducerApplication {

    static final Logger LOG = LoggerFactory.getLogger(ProducerApplication.class);

    private static AtomicInteger id = new AtomicInteger();

    public static void main(String[] args){
        AbstractApplicationContext context = new AnnotationConfigApplicationContext(
                AppConfig.class);

        ProductService productService = (ProductService) context.getBean("productService");

        Product product = getProduct();
        LOG.info("Application : sending order {}", product);
        productService.sendProduct(product);

        try {
            Thread.sleep(60000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        ((AbstractApplicationContext)context).close();
    }

    private static Product getProduct(){
        Product p = new Product();
        p.setName("Product "+id.incrementAndGet());
        p.setProductId(BasicUtil.getUniqueId());
        p.setQuantity(2);
        return p;
    }
}
package com.websystique.spring.configuration;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.support.AbstractApplicationContext;

public class ConsumerApplication {

    static final Logger LOG = LoggerFactory.getLogger(ConsumerApplication.class);

    public static void main(String[] args) {
        AbstractApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);

        try {
            Thread.sleep(60000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        ((AbstractApplicationContext) context).close();
    }
}
Start Application A [Producer]. Go to the web console and click on the Queues tab. You should see two queues created, with a message on one queue [A has just sent a message] while the other is empty.
Now start Application B [Consumer] and check the queues on the web console again. You will see the message dequeued from the first queue [B has just processed it], and a new message enqueued [B has sent the response to A] and eventually dequeued [A got the response] on the second queue.
Logs of Producer Application:
Jun 26, 2016 2:50:50 PM org.springframework.context.annotation.AnnotationConfigApplicationContext prepareRefresh INFO: Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@5cca2df1: startup date [Sun Jun 26 14:50:50 CEST 2016]; root of context hierarchy Jun 26, 2016 2:50:51 PM org.springframework.context.support.DefaultLifecycleProcessor start INFO: Starting beans in phase 2147483647 14:50:51.343 [main] DEBUG org.apache.activemq.transport.WireFormatNegotiator - Sending: WireFormatInfo { version=11, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 14:50:51.414 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54635] DEBUG org.apache.activemq.transport.InactivityMonitor - Using min of local: WireFormatInfo { version=11, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=11, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 14:50:51.415 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54635] DEBUG org.apache.activemq.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=11, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, 
TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 14:50:51.415 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54635] DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616@54635 before negotiation: OpenWireFormat{version=11, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807} 14:50:51.415 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54635] DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616@54635 after negotiation: OpenWireFormat{version=11, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600} 14:50:51.548 [DefaultMessageListenerContainer-1] DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Session Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@6efeb87[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] 14:50:51.607 [main] INFO com.websystique.spring.configuration.ProducerApplication - Application : sending order Product [productId=cf669916-2558-41f3-9f93-05125788173d, name=Product 1, quantity=2] 14:50:51.607 [main] INFO com.websystique.spring.service.ProductServiceImpl - +++++++++++++++++++++++++++++++++++++++++++++++++++++ 14:50:51.607 [main] INFO com.websystique.spring.service.ProductServiceImpl - Application : sending order request Product [productId=cf669916-2558-41f3-9f93-05125788173d, name=Product 1, quantity=2] 14:50:51.613 [main] DEBUG org.apache.activemq.transport.WireFormatNegotiator - Sending: WireFormatInfo { version=11, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, 
magic=[A,c,t,i,v,e,M,Q]} 14:50:51.616 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54636] DEBUG org.apache.activemq.transport.InactivityMonitor - Using min of local: WireFormatInfo { version=11, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=11, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 14:50:51.616 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54636] DEBUG org.apache.activemq.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=11, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 14:50:51.616 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54636] DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616@54636 before negotiation: OpenWireFormat{version=11, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807} 14:50:51.616 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54636] DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616@54636 after negotiation: OpenWireFormat{version=11, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600} 14:50:51.786 [main] DEBUG 
org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@5769fd07[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds. 14:50:51.787 [main] DEBUG org.apache.activemq.transport.tcp.TcpTransport - Stopping transport tcp://localhost/127.0.0.1:61616@54636 14:50:51.788 [main] DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@5bef330a[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] 14:50:51.789 [ActiveMQ Task-1] DEBUG org.apache.activemq.transport.tcp.TcpTransport - Closed socket Socket[addr=localhost/127.0.0.1,port=61616,localport=54636] 14:50:51.789 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@5bef330a[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1] 14:50:51.790 [main] INFO com.websystique.spring.service.ProductServiceImpl - +++++++++++++++++++++++++++++++++++++++++++++++++++++ 14:51:11.417 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10000ms elapsed since last write check. 14:51:11.419 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 14:51:21.418 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check. 
14:51:21.418 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 14:51:31.418 [ActiveMQ InactivityMonitor ReadCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - 30001ms elapsed since last read check. 14:51:31.419 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check. 14:51:31.419 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 14:51:41.419 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10000ms elapsed since last write check. 14:51:41.419 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 14:51:47.329 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - +++++++++++++++++++++++++++++++++++++++++++++++++++++ 14:51:47.330 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - Application : headers received : {timestamp=1466945507330, id=401a34a7-a0f9-8ab1-efd7-5a590987733f, jms_deliveryMode=2, jms_timestamp=1466945507317, jms_redelivered=false, jms_destination=queue://order-response-queue, jms_priority=4, jms_expiration=0, jms_messageId=ID:dragon-54643-1466945506965-1:1:1:1:1} 14:51:47.331 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - Application : response received : InventoryResponse [productId=cf669916-2558-41f3-9f93-05125788173d, returnCode=200, comment=Order Processed successfully] 14:51:47.331 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - +++++++++++++++++++++++++++++++++++++++++++++++++++++ 14:51:51.420 [ActiveMQ 
InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check. Jun 26, 2016 2:51:51 PM org.springframework.context.annotation.AnnotationConfigApplicationContext doClose INFO: Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@5cca2df1: startup date [Sun Jun 26 14:50:50 CEST 2016]; root of context hierarchy Jun 26, 2016 2:51:51 PM org.springframework.context.support.DefaultLifecycleProcessor stop INFO: Stopping beans in phase 2147483647 14:51:52.332 [DefaultMessageListenerContainer-1] DEBUG org.apache.activemq.ActiveMQMessageConsumer - remove: ID:dragon-54634-1466945451200-1:1:1:1, lastDeliveredSequenceId: 12 14:51:52.338 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@6efeb87[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2] is shutdown: true and terminated: true took: 0.000 seconds. 14:51:52.338 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@5f4e7ecf[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds. 
14:51:52.339 [main] DEBUG org.apache.activemq.transport.tcp.TcpTransport - Stopping transport tcp://localhost/127.0.0.1:61616@54635 14:51:52.339 [main] DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@1a6778eb[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] 14:51:52.339 [ActiveMQ Task-1] DEBUG org.apache.activemq.transport.tcp.TcpTransport - Closed socket Socket[addr=localhost/127.0.0.1,port=61616,localport=54635] 14:51:52.340 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@1a6778eb[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]
Logs of Consumer Application:
Jun 26, 2016 2:51:46 PM org.springframework.context.annotation.AnnotationConfigApplicationContext prepareRefresh INFO: Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@6c09e6e0: startup date [Sun Jun 26 14:51:46 CEST 2016]; root of context hierarchy Jun 26, 2016 2:51:46 PM org.springframework.context.support.DefaultLifecycleProcessor start INFO: Starting beans in phase 2147483647 14:51:47.091 [main] DEBUG org.apache.activemq.transport.WireFormatNegotiator - Sending: WireFormatInfo { version=11, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 14:51:47.097 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54644] DEBUG org.apache.activemq.transport.WireFormatNegotiator - Received WireFormat: WireFormatInfo { version=11, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} 14:51:47.099 [main] DEBUG org.apache.activemq.transport.InactivityMonitor - Using min of local: WireFormatInfo { version=11, properties={MaxFrameSize=9223372036854775807, CacheSize=1024, CacheEnabled=true, Host=localhost, SizePrefixDisabled=false, MaxInactivityDurationInitalDelay=10000, TcpNoDelayEnabled=true, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, magic=[A,c,t,i,v,e,M,Q]} and remote: WireFormatInfo { version=11, properties={CacheSize=1024, MaxFrameSize=104857600, CacheEnabled=true, SizePrefixDisabled=false, TcpNoDelayEnabled=true, MaxInactivityDurationInitalDelay=10000, MaxInactivityDuration=30000, TightEncodingEnabled=true, StackTraceEnabled=true}, 
magic=[A,c,t,i,v,e,M,Q]} 14:51:47.101 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54644] DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616@54644 before negotiation: OpenWireFormat{version=11, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false, maxFrameSize=9223372036854775807} 14:51:47.101 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54644] DEBUG org.apache.activemq.transport.WireFormatNegotiator - tcp://localhost/127.0.0.1:61616@54644 after negotiation: OpenWireFormat{version=11, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false, maxFrameSize=104857600} 14:51:47.166 [ActiveMQ Transport: tcp://localhost/127.0.0.1:61616@54644] DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Session Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@5f47af7e[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] 14:51:47.190 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - ---------------------------------------------------- 14:51:47.299 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - Application : headers received : {timestamp=1466945507299, id=04629f2a-99b2-ad9d-dc91-641c5cb834ac, jms_deliveryMode=2, jms_timestamp=1466945451665, jms_redelivered=false, jms_destination=queue://order-queue, jms_priority=4, jms_expiration=0, jms_messageId=ID:dragon-54634-1466945451200-1:2:1:1:1} 14:51:47.299 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - Application : product : Product [productId=cf669916-2558-41f3-9f93-05125788173d, name=Product 1, quantity=2] 14:51:47.300 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.service.OrderServiceImpl - Inventory : sending order confirmation InventoryResponse 
[productId=cf669916-2558-41f3-9f93-05125788173d, returnCode=200, comment=Order Processed successfully] 14:51:47.395 [DefaultMessageListenerContainer-1] INFO com.websystique.spring.messaging.MessageReceiver - ---------------------------------------------------- 14:52:07.101 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check. 14:52:07.103 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 14:52:17.101 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10000ms elapsed since last write check. 14:52:17.101 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 14:52:27.101 [ActiveMQ InactivityMonitor ReadCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - 30001ms elapsed since last read check. 14:52:27.102 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check. 14:52:27.102 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 14:52:37.106 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10004ms elapsed since last write check. 14:52:37.107 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] 14:52:47.107 [ActiveMQ InactivityMonitor WriteCheckTimer] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - WriteChecker: 10001ms elapsed since last write check. 
14:52:47.107 [ActiveMQ InactivityMonitor Worker] DEBUG org.apache.activemq.transport.AbstractInactivityMonitor - Running WriteCheck[tcp://127.0.0.1:61616] Jun 26, 2016 2:52:47 PM org.springframework.context.annotation.AnnotationConfigApplicationContext doClose INFO: Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@6c09e6e0: startup date [Sun Jun 26 14:51:46 CEST 2016]; root of context hierarchy Jun 26, 2016 2:52:47 PM org.springframework.context.support.DefaultLifecycleProcessor stop INFO: Stopping beans in phase 2147483647 14:52:47.407 [main] DEBUG org.apache.activemq.ActiveMQMessageConsumer - remove: ID:dragon-54643-1466945506965-1:1:1:1, lastDeliveredSequenceId: 6 14:52:47.414 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@5f47af7e[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2] is shutdown: true and terminated: true took: 0.000 seconds. 14:52:47.415 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@6c45e3b9[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] is shutdown: true and terminated: true took: 0.000 seconds. 
14:52:47.416 [main] DEBUG org.apache.activemq.transport.tcp.TcpTransport - Stopping transport tcp://localhost/127.0.0.1:61616@54644 14:52:47.418 [main] DEBUG org.apache.activemq.thread.TaskRunnerFactory - Initialized TaskRunnerFactory[ActiveMQ Task] using ExecutorService: java.util.concurrent.ThreadPoolExecutor@c6d1896[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0] 14:52:47.419 [ActiveMQ Task-1] DEBUG org.apache.activemq.transport.tcp.TcpTransport - Closed socket Socket[addr=localhost/127.0.0.1,port=61616,localport=54644] 14:52:47.419 [main] DEBUG org.apache.activemq.util.ThreadPoolUtils - Forcing shutdown of ExecutorService: java.util.concurrent.ThreadPoolExecutor@c6d1896[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]
Let’s start by coding the Producer Application [A]. The Consumer Application [B] is exactly the same as A, but listens and sends on the opposite queues.
Project Structure
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.websystique.spring</groupId>
    <artifactId>SpringJMSActiveMQProducerEx2</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <name>SpringJMSActiveMQProducerEx2</name>

    <properties>
        <springframework.version>4.3.0.RELEASE</springframework.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${springframework.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${springframework.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${springframework.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-spring</artifactId>
            <version>5.13.3</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.1.7</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
package com.websystique.spring.configuration;

import java.util.Arrays;

import org.apache.activemq.spring.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.core.JmsTemplate;

@Configuration
public class MessagingConfiguration {

    private static final String DEFAULT_BROKER_URL = "tcp://localhost:61616";

    private static final String ORDER_QUEUE = "order-queue";

    @Bean
    public ActiveMQConnectionFactory connectionFactory(){
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(DEFAULT_BROKER_URL);
        connectionFactory.setTrustedPackages(Arrays.asList("com.websystique.spring","java.util"));
        return connectionFactory;
    }

    @Bean
    public JmsTemplate jmsTemplate(){
        JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(connectionFactory());
        template.setDefaultDestinationName(ORDER_QUEUE);
        return template;
    }
}
package com.websystique.spring.configuration;

import javax.jms.ConnectionFactory;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

@Configuration
@EnableJms
public class MessagingListnerConfiguration {

    @Autowired
    ConnectionFactory connectionFactory;

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrency("1-1");
        return factory;
    }
}
package com.websystique.spring.configuration;

import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Configuration
@ComponentScan(basePackages = "com.websystique.spring")
@Import({MessagingConfiguration.class, MessagingListnerConfiguration.class})
public class AppConfig {

    //Put Other Application configuration here.
}
package com.websystique.spring.messaging;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

import com.websystique.spring.model.Product;

@Component
public class MessageSender {

    @Autowired
    JmsTemplate jmsTemplate;

    public void sendMessage(final Product product) {

        jmsTemplate.send(new MessageCreator(){
            @Override
            public Message createMessage(Session session) throws JMSException{
                ObjectMessage objectMessage = session.createObjectMessage(product);
                return objectMessage;
            }
        });
    }
}
package com.websystique.spring.messaging;

import javax.jms.JMSException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

import com.websystique.spring.model.InventoryResponse;

@Component
public class MessageReceiver {

    static final Logger LOG = LoggerFactory.getLogger(MessageReceiver.class);

    private static final String ORDER_RESPONSE_QUEUE = "order-response-queue";

    @JmsListener(destination = ORDER_RESPONSE_QUEUE)
    public void receiveMessage(final Message<InventoryResponse> message) throws JMSException {
        LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++");
        MessageHeaders headers = message.getHeaders();
        LOG.info("Application : headers received : {}", headers);

        InventoryResponse response = message.getPayload();
        LOG.info("Application : response received : {}",response);
        LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++");
    }
}
```java
package com.websystique.spring.service;

import com.websystique.spring.model.Product;

public interface ProductService {

    public void sendProduct(Product product);
}
```
```java
package com.websystique.spring.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.websystique.spring.messaging.MessageSender;
import com.websystique.spring.model.Product;

@Service("productService")
public class ProductServiceImpl implements ProductService {

    static final Logger LOG = LoggerFactory.getLogger(ProductServiceImpl.class);

    @Autowired
    MessageSender messageSender;

    @Override
    public void sendProduct(Product product) {
        LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++");
        LOG.info("Application : sending order request {}", product);
        messageSender.sendMessage(product);
        LOG.info("+++++++++++++++++++++++++++++++++++++++++++++++++++++");
    }
}
```
```java
package com.websystique.spring.model;

import java.io.Serializable;

// Serializable so that it can be carried as the payload of a JMS ObjectMessage.
public class Product implements Serializable {

    private String productId;

    private String name;

    private int quantity;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getQuantity() {
        return quantity;
    }

    public void setQuantity(int quantity) {
        this.quantity = quantity;
    }

    public String getProductId() {
        return productId;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    // Equality and hash code are based on productId only.
    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((productId == null) ? 0 : productId.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        Product other = (Product) obj;
        if (productId == null) {
            if (other.productId != null)
                return false;
        } else if (!productId.equals(other.productId))
            return false;
        return true;
    }

    @Override
    public String toString() {
        return "Product [productId=" + productId + ", name=" + name + ", quantity=" + quantity + "]";
    }
}
```
```java
package com.websystique.spring.model;

import java.io.Serializable;

// Serializable so that it can be carried as the payload of a JMS ObjectMessage.
public class InventoryResponse implements Serializable {

    private String productId;

    private int returnCode;

    private String comment;

    public String getProductId() {
        return productId;
    }

    public void setProductId(String productId) {
        this.productId = productId;
    }

    public int getReturnCode() {
        return returnCode;
    }

    public void setReturnCode(int returnCode) {
        this.returnCode = returnCode;
    }

    public String getComment() {
        return comment;
    }

    public void setComment(String comment) {
        this.comment = comment;
    }

    @Override
    public String toString() {
        return "InventoryResponse [productId=" + productId + ", returnCode=" + returnCode
                + ", comment=" + comment + "]";
    }
}
```
```java
package com.websystique.spring.util;

import java.util.UUID;

public class BasicUtil {

    // Returns a random UUID string, usable as a unique identifier.
    public static String getUniqueId() {
        return UUID.randomUUID().toString();
    }
}
```
Project Structure
pom.xml
```xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.websystique.spring</groupId>
    <artifactId>SpringJMSActiveMQConsumerEx2</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <name>SpringJMSActiveMQConsumerEx2</name>

    <properties>
        <springframework.version>4.3.0.RELEASE</springframework.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${springframework.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${springframework.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>${springframework.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-spring</artifactId>
            <version>5.13.3</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.1.7</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
```
```java
package com.websystique.spring.configuration;

import java.util.Arrays;

import org.apache.activemq.spring.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.core.JmsTemplate;

@Configuration
public class MessagingConfiguration {

    private static final String DEFAULT_BROKER_URL = "tcp://localhost:61616";

    private static final String ORDER_RESPONSE_QUEUE = "order-response-queue";

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL(DEFAULT_BROKER_URL);
        // Packages whose classes may be deserialized from incoming ObjectMessages.
        connectionFactory.setTrustedPackages(Arrays.asList("com.websystique.spring", "java.util"));
        return connectionFactory;
    }

    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(connectionFactory());
        // Messages sent without an explicit destination go to this queue.
        template.setDefaultDestinationName(ORDER_RESPONSE_QUEUE);
        return template;
    }
}
```
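The `setTrustedPackages` call in the configuration above matters because `ObjectMessage` payloads travel as Java-serialized objects, and the ActiveMQ 5.13.x client only deserializes classes from packages it has been told to trust. The sketch below is not ActiveMQ's implementation. It is a plain-JDK illustration of the same allow-list idea, and the class names `TrustedPackagesObjectInputStream` and `TrustedPackagesDemo` are hypothetical. It also simplifies matters by checking only a package-name prefix, so arrays and primitives are not handled.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InvalidClassException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not an ActiveMQ class): refuses to load any class
// whose name does not start with one of the trusted package prefixes.
class TrustedPackagesObjectInputStream extends ObjectInputStream {

    private final List<String> trustedPackages;

    TrustedPackagesObjectInputStream(InputStream in, List<String> trustedPackages) throws IOException {
        super(in);
        this.trustedPackages = trustedPackages;
    }

    @Override
    protected Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException {
        String className = desc.getName();
        for (String trusted : trustedPackages) {
            if (className.startsWith(trusted + ".")) {
                return super.resolveClass(desc);
            }
        }
        throw new InvalidClassException(className, "class is not in a trusted package");
    }
}

class TrustedPackagesDemo {

    // Serializes a value the same way an ObjectMessage body is serialized: plain Java serialization.
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    // Deserializes the bytes, accepting only classes from the given packages.
    static Object deserialize(byte[] data, List<String> trustedPackages)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new TrustedPackagesObjectInputStream(new ByteArrayInputStream(data), trustedPackages)) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] payload = serialize(new ArrayList<String>(Arrays.asList("a", "b")));

        // java.util is trusted, so the ArrayList is accepted.
        System.out.println(deserialize(payload, Arrays.asList("java.util")));

        // Only the application package is trusted here, so the ArrayList is rejected.
        try {
            deserialize(payload, Arrays.asList("com.websystique.spring"));
        } catch (InvalidClassException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In the tutorial's configuration, leaving `com.websystique.spring` out of the trusted list would be expected to make the listener fail when it tries to read a `Product` or `InventoryResponse` payload.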
```java
package com.websystique.spring.configuration;

import javax.jms.ConnectionFactory;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

@Configuration
@EnableJms
public class MessagingListnerConfiguration {

    @Autowired
    ConnectionFactory connectionFactory;

    // Creates the listener containers behind every @JmsListener method.
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrency("1-1");
        return factory;
    }
}
```
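The `"1-1"` passed to `setConcurrency` above is a "lower-upper" limit on the number of concurrent consumers, so this listener runs with exactly one. Spring also accepts a single number such as `"5"`, which it treats as the upper limit with a lower limit of 1. The following plain-Java sketch only illustrates how such a string can be read. It is not Spring's source code, and `ConcurrencyLimits` is a hypothetical name introduced for this example.

```java
// Hypothetical value class (not part of Spring): holds the parsed limits.
final class ConcurrencyLimits {

    final int min;
    final int max;

    ConcurrencyLimits(int min, int max) {
        this.min = min;
        this.max = max;
    }

    // "lower-upper" gives both limits; a bare number is the upper limit with a lower limit of 1.
    static ConcurrencyLimits parse(String concurrency) {
        int separator = concurrency.indexOf('-');
        if (separator != -1) {
            int lower = Integer.parseInt(concurrency.substring(0, separator).trim());
            int upper = Integer.parseInt(concurrency.substring(separator + 1).trim());
            return new ConcurrencyLimits(lower, upper);
        }
        return new ConcurrencyLimits(1, Integer.parseInt(concurrency.trim()));
    }

    public static void main(String[] args) {
        ConcurrencyLimits limits = parse("1-1");
        System.out.println("min=" + limits.min + ", max=" + limits.max);
    }
}
```

Raising the upper limit (for example `"1-5"`) lets the container add consumers under load, at the cost of losing strict message ordering on the queue.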
```java
package com.websystique.spring.configuration;

import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Configuration
@ComponentScan(basePackages = "com.websystique.spring")
@Import({MessagingConfiguration.class, MessagingListnerConfiguration.class})
public class AppConfig {

    // Put other application configuration here.
}
```
```java
package com.websystique.spring.messaging;

import javax.jms.JMSException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

import com.websystique.spring.model.Product;
import com.websystique.spring.service.OrderService;

@Component
public class MessageReceiver {

    static final Logger LOG = LoggerFactory.getLogger(MessageReceiver.class);

    private static final String ORDER_QUEUE = "order-queue";

    @Autowired
    OrderService orderService;

    // Invoked for every message that arrives on order-queue.
    @JmsListener(destination = ORDER_QUEUE)
    public void receiveMessage(final Message<Product> message) throws JMSException {
        LOG.info("----------------------------------------------------");
        MessageHeaders headers = message.getHeaders();
        LOG.info("Application : headers received : {}", headers);

        Product product = message.getPayload();
        LOG.info("Application : product : {}", product);

        orderService.processOrder(product);
        LOG.info("----------------------------------------------------");
    }
}
```
```java
package com.websystique.spring.messaging;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Component;

import com.websystique.spring.model.InventoryResponse;

@Component
public class MessageSender {

    @Autowired
    JmsTemplate jmsTemplate;

    // Sends the response as an ObjectMessage to the JmsTemplate's default destination.
    public void sendMessage(final InventoryResponse inventoryResponse) {
        jmsTemplate.send(new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                ObjectMessage objectMessage = session.createObjectMessage(inventoryResponse);
                return objectMessage;
            }
        });
    }
}
```
```java
package com.websystique.spring.service;

import com.websystique.spring.model.Product;

public interface OrderService {

    public void processOrder(Product product);
}
```
```java
package com.websystique.spring.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.websystique.spring.messaging.MessageSender;
import com.websystique.spring.model.InventoryResponse;
import com.websystique.spring.model.Product;

@Service("orderService")
public class OrderServiceImpl implements OrderService {

    static final Logger LOG = LoggerFactory.getLogger(OrderServiceImpl.class);

    @Autowired
    MessageSender messageSender;

    @Override
    public void processOrder(Product product) {
        InventoryResponse response = prepareResponse(product);
        LOG.info("Inventory : sending order confirmation {}", response);
        messageSender.sendMessage(response);
    }

    // Builds a success response for the given product.
    private InventoryResponse prepareResponse(Product product) {
        InventoryResponse response = new InventoryResponse();
        response.setProductId(product.getProductId());
        response.setReturnCode(200);
        response.setComment("Order Processed successfully");
        return response;
    }
}
```
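Taken together, the two applications form a request/response loop: the producer puts a `Product` on `order-queue`, the consumer's listener picks it up, and the resulting `InventoryResponse` travels back on `order-response-queue`. The sketch below mimics that flow in a single JVM, assuming in-memory `BlockingQueue`s as a stand-in for the ActiveMQ broker and trimmed-down copies of the model classes. `RoundTripSketch` and its `roundTrip` method are hypothetical names used only for illustration. No Spring or JMS is involved.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class RoundTripSketch {

    // Trimmed-down stand-ins for the tutorial's model classes.
    static class Product {
        final String productId;
        final int quantity;

        Product(String productId, int quantity) {
            this.productId = productId;
            this.quantity = quantity;
        }
    }

    static class InventoryResponse {
        final String productId;
        final int returnCode;
        final String comment;

        InventoryResponse(String productId, int returnCode, String comment) {
            this.productId = productId;
            this.returnCode = returnCode;
            this.comment = comment;
        }
    }

    // Same values as OrderServiceImpl.prepareResponse in the consumer application.
    static InventoryResponse prepareResponse(Product product) {
        return new InventoryResponse(product.productId, 200, "Order Processed successfully");
    }

    // Sends one product through the simulated queues and returns the response (null on timeout).
    static InventoryResponse roundTrip(Product product) throws InterruptedException {
        // In-memory substitutes for "order-queue" and "order-response-queue".
        final BlockingQueue<Product> orderQueue = new LinkedBlockingQueue<Product>();
        final BlockingQueue<InventoryResponse> orderResponseQueue =
                new LinkedBlockingQueue<InventoryResponse>();

        // Plays the consumer application: waits for an order, then replies.
        Thread consumer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Product received = orderQueue.take();
                    orderResponseQueue.put(prepareResponse(received));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.start();

        // Plays the producer application: sends the order, then waits for the reply.
        orderQueue.put(product);
        InventoryResponse response = orderResponseQueue.poll(5, TimeUnit.SECONDS);
        consumer.join(5000);
        return response;
    }

    public static void main(String[] args) throws InterruptedException {
        InventoryResponse response = roundTrip(new Product("P-100", 2));
        System.out.println(response.productId + " -> " + response.returnCode
                + " (" + response.comment + ")");
    }
}
```

In the real setup the two sides run as separate processes and the broker holds the messages, so either application can be restarted without the other noticing.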
That’s it. As we saw, Spring’s JMS support is easy to use. Feel free to comment and suggest improvements.
Producer Application :
Consumer Application :