在上一篇《初识JMS》的博文中,小编已经带大家了解了JMS的基本情况,今天小编就和大家一起分享 在消息系统中比较常用的activemq这个传统的消息中间件:

一、基本介绍

       简单来说,activemq就是一个消息的接收和转发的容器,用于消息推送;它是Apache公司出品的一个开源的消息队列软件,运行在jvm下,支持多种语言。

       那么接下来小编就从基本的下载安装一点点的给大家介绍它:

二、下载安装

        在下载之前我们需要明确,activemq有Linux和Windows环境两种,两种方式的安装过程类似,这里仅以Windows为例:

        首先我们需要在官网http://activemq.apache.org/中下载稳定版本的 apache-activemq-**-bin.zip(zip是Windows版本,tar为Linux版本)。下载成功之后我们将该文件解压到本地的某个位置,点击bin目录下的win32(或win64,这里根据当前Windows环境的位数来选择):

       

         双击该目录下的activemq.bat:

            

        如上图所示,出现 “access to all MBeans is allowed”标志着当前的activemq服务已经启动成功;

       (PS:Windows的版本是可以直接解压运行的,但Linux需要解压安装并配置,后续将会奉上Linux的安装步骤)

        启动成功后,我们可以通过以下网址来访问该服务:htpp://localhost:8161/admin ,默认的用户名和密码均为admin:


     PS:停止当前activemq服务的操作为:ctrl+C ,之后在弹出的提示中输入 Y,当前的activemq服务就会被shutdown;   

     截止到这里,我们在Windows环境中的activemq服务就已经成功安装并启动了,下面开始介绍它在项目中的具体应用:

三、具体应用

     目标:使用activemq实现消息的生产者和消息的消费者完成消息队列的输入和输出(共10条数据);

     实现过程:

         1.创建一个新的java project,并引入需要的jar包

       

     2.编写当前项目中的两个java类:消息的生产者(producer.java),消息的消费者(Consumer.java)

      producer.java:

<span style="font-size:18px;">package com.tgb.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息的生产者(发送者) 
 * @author ysc 
 *
 */
public class JMSProducer {

    //默认连接用户名
    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //默认连接密码
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //默认连接地址
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;
    //发送的消息数量
    private static final int SENDNUM = 10;

    public static void main(String[] args) {
        //连接工厂
        ConnectionFactory connectionFactory;
        //连接
        Connection connection = null;
        //会话 接受或者发送消息的线程
        Session session;
        //消息的目的地
        Destination destination;
        //消息生产者
        MessageProducer messageProducer;
        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME, JMSProducer.PASSWORD, JMSProducer.BROKEURL);

        try {
            //通过连接工厂获取连接
            connection = connectionFactory.createConnection();
            //启动连接
            connection.start();
            //创建session
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            //创建一个名称为HelloWorld的消息队列
            destination = session.createQueue("HelloWorld");
            //创建消息生产者
            messageProducer = session.createProducer(destination);
            //发送消息
            sendMessage(session, messageProducer);

            session.commit();

        } catch (Exception e) {
            e.printStackTrace();
        }finally{
            if(connection != null){
                try {
                    connection.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

    }
    /**
     * 发送消息
     * @param session
     * @param messageProducer  消息生产者
     * @throws Exception
     */
    public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{
        for (int i = 0; i < JMSProducer.SENDNUM; i++) {
            //创建一条文本消息 
            TextMessage message = session.createTextMessage("ActiveMQ 发送消息" +i);
            System.out.println("发送消息:Activemq 发送消息" + i);
            //通过消息生产者发出消息 
            messageProducer.send(message);
        }

    }
}
</span>

     Consumer.java:

<span style="font-size:18px;">package com.tgb.activemq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class JMSConsumer {

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名
    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码
    private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址

    public static void main(String[] args) {
        ConnectionFactory connectionFactory;//连接工厂
        Connection connection = null;//连接

        Session session;//会话 接受或者发送消息的线程
        Destination destination;//消息的目的地

        MessageConsumer messageConsumer;//消息的消费者

        //实例化连接工厂
        connectionFactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME, JMSConsumer.PASSWORD, JMSConsumer.BROKEURL);

        try {
            //通过连接工厂获取连接
            connection = connectionFactory.createConnection();
            //启动连接
            connection.start();
            //创建session
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建一个连接HelloWorld的消息队列
            destination = session.createQueue("HelloWorld");
            //创建消息消费者
            messageConsumer = session.createConsumer(destination);

            while (true) {
                TextMessage textMessage = (TextMessage) messageConsumer.receive(100000);
                if(textMessage != null){
                    System.out.println("收到的消息:" + textMessage.getText());
                }else {
                    break;
                }
            }


        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}</span>

四、部署运行

        1.运行消息的生产者

             

      运行完之后我们会发现,activemq的网址中队列已经发生变化:

       

         点击HelloWorld 会发现,其中有我们消息的生产者产生的10条消息:

        

    2.运行消息的消息的消费者:

       

     网页中发生如下的变化:

      


      说明当前产生的10条消息数据,已经在消息消费者类运行的那一刻被消费完毕;       

五、总结

      activemq具有Apache公司的强大的支持,是较流行,功能强大的即时通讯和集成模式的开源服务器,从上述的执行过程我们可以将它的执行步骤总结如下:

          

     消息生产者产生消息的步骤:

        1)创建连接使用的工厂类:JMS connectionFactory

        2)使用管理对象JMS ConnectionFactory建立连接connection,并启动

        3)使用连接connection建立会话session

        4)使用会话session和管理对象创建消息生产者MessagerSender

        5)使用消息生产者发送消息

     消息的消费者接收消息的步骤:

        1)~3)与消息的生产者产生消息的步骤一致;

        4)使用会话session和管理对象Destination创建消息接收者MessagerReceiver

        5)使用消息接收者接收消息,需要用setMessageListener将messageListener接口绑定到MessageReceiver消息接收者必须实现了MessageListener接口,需要定义onMessage事件的方法;

     以上就是windows版本的activemq从下载到使用的整个过程,希望能帮大家更好的理解activemq;

   

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐