ActionMQ 是开源的JMS实现

1.下载ActiveMQ

去官方网站下载:http://activemq.apache.org/

2.运行ActiveMQ

解压缩apache-activemq-5.5.1-bin.zip,然后双击apache-activemq-5.8.0\bin\activemq.bat运行ActiveMQ程序。

启动ActiveMQ以后,登陆:http://localhost:8161/admin/,创建一个Queue,命名为FirstQueue。如下图

目录结构如下:


运行在网页中,它会让你登录用户方可进入,默认的用户名和密码是admin,




3 . 测试
ActiveMQ默认使用的TCP连接端口是61616,通过查看该端口的信息可以测试ActiveMQ是否成功启动
window环境运行  netstat -an|find "61616"

Class源代码中可以看到


4.创建Eclipse JAVA项目并运行

创建project:ActiveMQ-5.8,并导入apache-activemq-5.8.0\lib目录下需要用到的jar文件,项目结构如下图所示:


若找不到相应的包,使用以下连接下载也行 http://repo1.maven.org/maven2/org/apache/activemq/activemq-rar/

3.1.Sender.java

[java]  view plain copy print ?
  1. package cn.jms;  
  2.   
  3. import javax.jms.Connection;  
  4. import javax.jms.ConnectionFactory;  
  5. import javax.jms.DeliveryMode;  
  6. import javax.jms.Destination;  
  7. import javax.jms.MessageProducer;  
  8. import javax.jms.Session;  
  9. import javax.jms.TextMessage;  
  10. import org.apache.activemq.ActiveMQConnection;  
  11. import org.apache.activemq.ActiveMQConnectionFactory;  
  12.   
  13. public class Sender {  
  14.     private static final int SEND_NUMBER = 5;  
  15.   
  16.     public static void main(String[] args) {  
  17.         // ConnectionFactory :连接工厂,JMS 用它创建连接  
  18.         ConnectionFactory connectionFactory;  
  19.         // Connection :JMS 客户端到JMS Provider 的连接  
  20.         Connection connection = null;  
  21.         // Session: 一个发送或接收消息的线程  
  22.         Session session;  
  23.         // Destination :消息的目的地;消息发送给谁.  
  24.         Destination destination;  
  25.         // MessageProducer:消息发送者  
  26.         MessageProducer producer;  
  27.         // TextMessage message;  
  28.         // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar  
  29.         connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");  
  30.         try {  
  31.             // 构造从工厂得到连接对象  
  32.             connection = connectionFactory.createConnection();  
  33.             // 启动  
  34.             connection.start();  
  35.             // 获取操作连接  
  36.             session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);  
  37.             // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置  
  38.             destination = session.createQueue("FirstQueue");  
  39.             // 得到消息生成者【发送者】  
  40.             producer = session.createProducer(destination);  
  41.             // 设置不持久化,此处学习,实际根据项目决定  
  42.             producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  
  43.             // 构造消息,此处写死,项目就是参数,或者方法获取  
  44.             sendMessage(session, producer);  
  45.             session.commit();  
  46.         } catch (Exception e) {  
  47.             e.printStackTrace();  
  48.         } finally {  
  49.             try {  
  50.                 if (null != connection)  
  51.                     connection.close();  
  52.             } catch (Throwable ignore) {  
  53.             }  
  54.         }  
  55.     }  
  56.   
  57.     public static void sendMessage(Session session, MessageProducer producer)throws Exception {  
  58.         for (int i = 1; i <= SEND_NUMBER; i++) {  
  59.             TextMessage message = session.createTextMessage("ActiveMq 发送的消息" + i);  
  60.             // 发送消息到目的地方  
  61.             System.out.println("发送消息:" + "ActiveMq 发送的消息" + i);  
  62.             producer.send(message);  
  63.         }  
  64.     }  
  65. }  

3.2.Receiver.java

[java]  view plain copy print ?
  1. package cn.jms;  
  2.   
  3. import javax.jms.Connection;  
  4. import javax.jms.ConnectionFactory;  
  5. import javax.jms.Destination;  
  6. import javax.jms.MessageConsumer;  
  7. import javax.jms.Session;  
  8. import javax.jms.TextMessage;  
  9. import org.apache.activemq.ActiveMQConnection;  
  10. import org.apache.activemq.ActiveMQConnectionFactory;  
  11.   
  12. public class Receiver {  
  13.     public static void main(String[] args) {  
  14.         // ConnectionFactory :连接工厂,JMS 用它创建连接  
  15.         ConnectionFactory connectionFactory;  
  16.         // Connection :JMS 客户端到JMS Provider 的连接  
  17.         Connection connection = null;  
  18.         // Session: 一个发送或接收消息的线程  
  19.         Session session;  
  20.         // Destination :消息的目的地;消息发送给谁.  
  21.         Destination destination;  
  22.         // 消费者,消息接收者  
  23.         MessageConsumer consumer;  
  24.         connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");  
  25.         try {  
  26.             // 构造从工厂得到连接对象  
  27.             connection = connectionFactory.createConnection();  
  28.             // 启动  
  29.             connection.start();  
  30.             // 获取操作连接  
  31.             session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  
  32.             // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置  
  33.             destination = session.createQueue("FirstQueue");  
  34.             consumer = session.createConsumer(destination);  
  35.             while (true) {  
  36.                 // 设置接收者接收消息的时间,为了便于测试,这里设定为100s  
  37.                 TextMessage message = (TextMessage) consumer.receive(100000);  
  38.                 if (null != message) {  
  39.                     System.out.println("收到消息" + message.getText());  
  40.                 } else {  
  41.                     break;  
  42.                 }  
  43.             }  
  44.         } catch (Exception e) {  
  45.             e.printStackTrace();  
  46.         } finally {  
  47.             try {  
  48.                 if (null != connection)  
  49.                     connection.close();  
  50.             } catch (Throwable ignore) {  
  51.             }  
  52.         }  
  53.     }  
  54. }  

4.注意事项

先启动apache-activemq-5.8.0\bin\activemq.bat运行ActiveMQ程序。在运行main方法 ,否则回报连接失败的错误,如下

[java]  view plain copy print ?
  1. SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.  
  2. javax.jms.JMSException: Could not connect to broker URL: tcp://localhost:61616. Reason: java.net.ConnectException: Connection refused: connect  
  3.     at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:35)  
  4.     at org.apache.activemq.ActiveMQConnectionFactory.createActiveMQConnection(ActiveMQConnectionFactory.java:293)  
  5.     at org.apache.activemq.ActiveMQConnectionFactory.createActiveMQConnection(ActiveMQConnectionFactory.java:238)  
  6.     at org.apache.activemq.ActiveMQConnectionFactory.createConnection(ActiveMQConnectionFactory.java:184)  
  7.     at cn.jms.Sender.main(Sender.java:32)  
  8. Caused by: java.net.ConnectException: Connection refused: connect  
  9.     at java.net.PlainSocketImpl.socketConnect(Native Method)  
  10.     at java.net.PlainSocketImpl.doConnect(Unknown Source)  
  11.     at java.net.PlainSocketImpl.connectToAddress(Unknown Source)  
  12.     at java.net.PlainSocketImpl.connect(Unknown Source)  
  13.     at java.net.SocksSocketImpl.connect(Unknown Source)  
  14.     at java.net.Socket.connect(Unknown Source)  
  15.     at org.apache.activemq.transport.tcp.TcpTransport.connect(TcpTransport.java:496)  
  16.     at org.apache.activemq.transport.tcp.TcpTransport.doStart(TcpTransport.java:459)  
  17.     at org.apache.activemq.util.ServiceSupport.start(ServiceSupport.java:55)  
  18.     at org.apache.activemq.transport.AbstractInactivityMonitor.start(AbstractInactivityMonitor.java:140)  
  19.     at org.apache.activemq.transport.TransportFilter.start(TransportFilter.java:58)  
  20.     at org.apache.activemq.transport.WireFormatNegotiator.start(WireFormatNegotiator.java:72)  
  21.     at org.apache.activemq.transport.TransportFilter.start(TransportFilter.java:58)  
  22.     at org.apache.activemq.transport.TransportFilter.start(TransportFilter.java:58)  
  23.     at org.apache.activemq.ActiveMQConnectionFactory.createActiveMQConnection(ActiveMQConnectionFactory.java:273)  
  24.     ... 3 more  

5.测试过程

1、先运行Sender,输出以下信息,若先运行Receiver则无输出

发送消息:ActiveMq 发送的消息1
发送消息:ActiveMq 发送的消息2
发送消息:ActiveMq 发送的消息3
发送消息:ActiveMq 发送的消息4
发送消息:ActiveMq 发送的消息5

2、运行Receiver

收到消息ActiveMq 发送的消息1
收到消息ActiveMq 发送的消息2
收到消息ActiveMq 发送的消息3
收到消息ActiveMq 发送的消息4
收到消息ActiveMq 发送的消息5

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐