JBoss.org Community Documentation
The JMS publish/subscribe (Pub-Sub) message model is a one-to-many model. A publisher sends a message to a topic and all active subscribers of the topic receive the message. Subscribers that are not actively listening to the topic will miss the published message. Example 7.2, “A Pub-Sub JMS client example” shows a complete JMS client that sends a javax.jms.TextMessage
to a topic and asynchronously receives the message from the same topic.
package org.jboss.book.jms.ex1; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSubscriber; import javax.jms.TopicSession; import javax.jms.TextMessage; import javax.naming.InitialContext; import javax.naming.NamingException; import EDU.oswego.cs.dl.util.concurrent.CountDown; /** * A complete JMS client example program that sends a TextMessage to * a Topic and asynchronously receives the message from the same * Topic. * * @author Scott.Stark@jboss.org * @version $Revision: 1.9 $ */ public class TopicSendRecvClient { static CountDown done = new CountDown(1); TopicConnection conn = null; TopicSession session = null; Topic topic = null; public static class ExListener implements MessageListener { public void onMessage(Message msg) { done.release(); TextMessage tm = (TextMessage) msg; try { System.out.println("onMessage, recv text=" + tm.getText()); } catch(Throwable t) { t.printStackTrace(); } } } public void setupPubSub() throws JMSException, NamingException { InitialContext iniCtx = new InitialContext(); Object tmp = iniCtx.lookup("ConnectionFactory"); TopicConnectionFactory tcf = (TopicConnectionFactory) tmp; conn = tcf.createTopicConnection(); topic = (Topic) iniCtx.lookup("topic/testTopic"); session = conn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); conn.start(); } public void sendRecvAsync(String text) throws JMSException, NamingException { System.out.println("Begin sendRecvAsync"); // Setup the PubSub connection, session setupPubSub(); // Set the async listener TopicSubscriber recv = session.createSubscriber(topic); recv.setMessageListener(new ExListener()); // Send a text msg TopicPublisher send = session.createPublisher(topic); TextMessage tm = session.createTextMessage(text); send.publish(tm); System.out.println("sendRecvAsync, sent text=" + 
tm.getText()); send.close(); System.out.println("End sendRecvAsync"); } public void stop() throws JMSException { conn.stop(); session.close(); conn.close(); } public static void main(String args[]) throws Exception { System.out.println("Begin TopicSendRecvClient, now=" + System.currentTimeMillis()); TopicSendRecvClient client = new TopicSendRecvClient(); client.sendRecvAsync("A text msg, now="+System.currentTimeMillis()); client.done.acquire(); client.stop(); System.out.println("End TopicSendRecvClient"); System.exit(0); } }
Example 7.2. A Pub-Sub JMS client example
The client may be run using the following command line:
[examples]$ ant -Dchap=jms -Dex=1ps run-example ... run-example1ps: [java] Begin TopicSendRecvClient, now=1102809427043 [java] Begin sendRecvAsync [java] onMessage, recv text=A text msg, now=1102809427071 [java] sendRecvAsync, sent text=A text msg, now=1102809427071 [java] End sendRecvAsync [java] End TopicSendRecvClient
Now let's break the publisher and subscribers into separate programs to demonstrate that subscribers only receive messages while they are listening to a topic. Example 7.3, “A JMS publisher client” shows a variation of the previous pub-sub client that only publishes messages to the topic/testTopic
topic. The subscriber-only client is shown in Example 7.4, “A JMS subscriber client”.
package org.jboss.book.jms.ex1; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSlistubscriber; import javax.jms.TopicSession; import javax.jms.TextMessage; import javax.naming.InitialContext; import javax.naming.NamingException; /** * A JMS client example program that sends a TextMessage to a Topic * * @author Scott.Stark@jboss.org * @version $Revision: 1.9 $ */ public class TopicSendClient { TopicConnection conn = null; TopicSession session = null; Topic topic = null; public void setupPubSub() throws JMSException, NamingException { InitialContext iniCtx = new InitialContext(); Object tmp = iniCtx.lookup("ConnectionFactory"); TopicConnectionFactory tcf = (TopicConnectionFactory) tmp; conn = tcf.createTopicConnection(); topic = (Topic) iniCtx.lookup("topic/testTopic"); session = conn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); conn.start(); } public void sendAsync(String text) throws JMSException, NamingException { System.out.println("Begin sendAsync"); // Setup the pub/sub connection, session setupPubSub(); // Send a text msg TopicPublisher send = session.createPublisher(topic); TextMessage tm = session.createTextMessage(text); send.publish(tm); System.out.println("sendAsync, sent text=" + tm.getText()); send.close(); System.out.println("End sendAsync"); } public void stop() throws JMSException { conn.stop(); session.close(); conn.close(); } public static void main(String args[]) throws Exception { System.out.println("Begin TopicSendClient, now=" + System.currentTimeMillis()); TopicSendClient client = new TopicSendClient(); client.sendAsync("A text msg, now="+System.currentTimeMillis()); client.stop(); System.out.println("End TopicSendClient"); System.exit(0); } }
Example 7.3. A JMS publisher client
package org.jboss.book.jms.ex1; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicConnectionFactory; import javax.jms.TopicPublisher; import javax.jms.TopicSubscriber; import javax.jms.TopicSession; import javax.jms.TextMessage; import javax.naming.InitialContext; import javax.naming.NamingException; /** * A JMS client example program that synchronously receives a message a Topic * * @author Scott.Stark@jboss.org * @version $Revision: 1.9 $ */ public class TopicRecvClient { TopicConnection conn = null; TopicSession session = null; Topic topic = null; public void setupPubSub() throws JMSException, NamingException { InitialContext iniCtx = new InitialContext(); Object tmp = iniCtx.lookup("ConnectionFactory"); TopicConnectionFactory tcf = (TopicConnectionFactory) tmp; conn = tcf.createTopicConnection(); topic = (Topic) iniCtx.lookup("topic/testTopic"); session = conn.createTopicSession(false, TopicSession.AUTO_ACKNOWLEDGE); conn.start(); } public void recvSync() throws JMSException, NamingException { System.out.println("Begin recvSync"); // Setup the pub/sub connection, session setupPubSub(); // Wait upto 5 seconds for the message TopicSubscriber recv = session.createSubscriber(topic); Message msg = recv.receive(5000); if (msg == null) { System.out.println("Timed out waiting for msg"); } else { System.out.println("TopicSubscriber.recv, msgt="+msg); } } public void stop() throws JMSException { conn.stop(); session.close(); conn.close(); } public static void main(String args[]) throws Exception { System.out.println("Begin TopicRecvClient, now=" + System.currentTimeMillis()); TopicRecvClient client = new TopicRecvClient(); client.recvSync(); client.stop(); System.out.println("End TopicRecvClient"); System.exit(0); } }
Example 7.4. A JMS subscriber client
Run the TopicSendClient
followed by the TopicRecvClient
as follows:
[examples]$ ant -Dchap=jms -Dex=1ps2 run-example ... run-example1ps2: [java] Begin TopicSendClient, now=1102810007899 [java] Begin sendAsync [java] sendAsync, sent text=A text msg, now=1102810007909 [java] End sendAsync [java] End TopicSendClient [java] Begin TopicRecvClient, now=1102810011524 [java] Begin recvSync [java] Timed out waiting for msg [java] End TopicRecvClient
The output shows that the topic subscriber client (TopicRecvClient
) fails to receive the message sent by the publisher: it was not subscribed to the topic when the message was published, so its receive call times out.