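1. Introduction

As of ActiveMQ 5.9, the traditional pure Master-Slave cluster mode is gone, and a new Master-Slave implementation based on ZooKeeper + LevelDB is available. The other two modes, shared file system and shared database, still exist. This article uses ZooKeeper to manage the nodes and make ActiveMQ highly available.

For instructions on installing the ZooKeeper cluster, see the author's other post: "Installing a ZooKeeper cluster on Linux".

2. Cluster deployment

2.1 Node information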

| Host | Cluster port (activemq.xml: replicatedLevelDB.bind) | Message port (activemq.xml: transportConnector.uri) | Console port (jetty.xml: jettyPort.port) | Install directory |
| --- | --- | --- | --- | --- |
| 182.61.53.64 | tcp://0.0.0.0:62621 | 51511 | 8161 | /home/env/activemq |
| 182.61.61.90 | tcp://0.0.0.0:62622 | 51512 | 8161 | /home/env/activemq |
| 182.61.32.229 | tcp://0.0.0.0:62623 | 51513 | 8161 | /home/env/activemq |

2.2 Installing ActiveMQ

Install ActiveMQ on each of the three servers.

For installation instructions, see the author's other post: "Installing ActiveMQ on Linux".

 

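3. Configuring high availability

3.1 Edit activemq.xml

3.1.1 Set brokerName

Note: brokerName must be identical on all three machines. Here it is set to "activemqCluster" on each of them.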
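The post does not show the edited element for this step. As a rough sketch, assuming the stock conf/activemq.xml shipped with ActiveMQ 5.x (where the default is brokerName="localhost"), the opening broker tag might look like this after the change. The xmlns and dataDirectory attributes are the stock defaults and are assumed to be left untouched.

```xml
<!-- conf/activemq.xml on all three nodes: the same brokerName everywhere -->
<broker xmlns="http://activemq.apache.org/schema/core"
        brokerName="activemqCluster"
        dataDirectory="${activemq.data}">
    <!-- ... rest of the broker configuration unchanged ... -->
</broker>
```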

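3.1.2 Add a replicatedLevelDB element to persistenceAdapter

Note: zkAddress is the address list of the three-node ZooKeeper cluster set up earlier.

The snippet below is the configuration for 182.61.32.229; the other two nodes are edited the same way. Make sure there are no stray spaces between the ZooKeeper addresses or inside any ip:port pair.

bind is tcp://0.0.0.0:62621, tcp://0.0.0.0:62622, and tcp://0.0.0.0:62623 on the three nodes respectively.

hostname must be each server's own IP (182.61.53.64 and 182.61.61.90 for the other two nodes).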

         <persistenceAdapter>
            
            <!--  <kahaDB directory="${activemq.data}/kahadb"/>  -->
            
            <!-- activemq zookeeper high availability -->
            <replicatedLevelDB
                directory="${activemq.data}/leveldb"
                replicas="3"
                bind="tcp://0.0.0.0:62623"
                zkAddress="182.61.53.64:2181,182.61.61.90:2181,182.61.32.229:2181"
                hostname="182.61.32.229"
                zkPath="/activemq/leveldb-stores"
                />
                
        </persistenceAdapter>

 

3.1.3 Change the message port URI

Note: change the port in the transportConnector uri to 51511, 51512, and 51513 on the three nodes respectively.
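The edited connector is not shown in the post. A minimal sketch for the first node, assuming the stock openwire connector from the shipped activemq.xml (default port 61616), could look like the following. The query parameters are the defaults from the stock file and may differ in your version; only the port is changed here.

```xml
<transportConnectors>
    <!-- node 182.61.53.64; use 51512 on 182.61.61.90 and 51513 on 182.61.32.229 -->
    <transportConnector name="openwire"
        uri="tcp://0.0.0.0:51511?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
```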

 

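4. Testing

4.1 Web console access test

Open the web console of each of the three ActiveMQ nodes in turn: only one of them (the current master) is reachable.

Shut that node down and try the other two again: one of them is now reachable.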
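The same check can be scripted instead of clicking through a browser. The following is a minimal sketch using only the JDK; the class name ConsoleProbe and the 2-second timeout are illustrative choices, not part of the original setup, and the endpoints are the console ports from the node table. It only tests whether a TCP connection can be opened, which is a rough stand-in for "the console is reachable".

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Arrays;
import java.util.List;

/** Checks which nodes currently accept TCP connections on the web console port. */
final class ConsoleProbe {

    /** Host part of a "host:port" endpoint. */
    static String hostOf(String endpoint) {
        return endpoint.substring(0, endpoint.lastIndexOf(':'));
    }

    /** Port part of a "host:port" endpoint. */
    static int portOf(String endpoint) {
        return Integer.parseInt(endpoint.substring(endpoint.lastIndexOf(':') + 1));
    }

    /** True if a TCP connection to the endpoint succeeds within timeoutMillis. */
    static boolean accepts(String endpoint, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(hostOf(endpoint), portOf(endpoint)), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false; // refused, unreachable, or timed out
        }
    }

    public static void main(String[] args) {
        // Console endpoints taken from the node table in section 2.1.
        List<String> consoles = Arrays.asList(
                "182.61.53.64:8161", "182.61.61.90:8161", "182.61.32.229:8161");
        for (String endpoint : consoles) {
            System.out.println(endpoint + " -> " + (accepts(endpoint, 2000) ? "open" : "closed"));
        }
    }
}
```

Running it before and after stopping the master should show the "open" entry move from one node to another, matching what the browser test shows.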

4.2 Code test

4.2.1 Code

A. pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.yzx</groupId>
    <artifactId>activemqdemo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>activemqdemo</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.3.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- activemq -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
        </dependency>
        <!-- activemq -->

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.40</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>

B. application.yml

server:
  port: 8080

spring:
  activemq:
    #broker-url: tcp://47.106.116.228:61616
    broker-url: failover:(tcp://182.61.53.64:51511,tcp://182.61.61.90:51512,tcp://182.61.32.229:51513)
    user: admin
    password: admin
    pool:
      enabled: true
    packages:
      trust-all: true   # required when sending objects via ObjectMessage; without trusted packages a ClassNotFound exception is thrown
  jms:
    pub-sub-domain: true  # enable topic (publish/subscribe) messaging

C. ActiveMqConfig

package com.yzx.activemqdemo.demo1;


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

import javax.jms.ConnectionFactory;

@Configuration
public class ActiveMqConfig {


    // ListenerContainer for queue mode
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
    }


    // ListenerContainer for topic mode
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
        DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
        bean.setPubSubDomain(true);
        bean.setConnectionFactory(activeMQConnectionFactory);
        return bean;
    }

}

D. MqProducer

package com.yzx.activemqdemo.demo1;

import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;

@Service
public class MqProducer {


    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;


    /**
     * Sends a string message to a queue.
     *
     * @param queueName name of the queue
     * @param message   the string payload
     */
    public void sendStringQueue(String queueName, String message) {
        this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), message);
    }


}

E. QueueConsumer

package com.yzx.activemqdemo.demo1;

import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class QueueConsumer {

    @JmsListener(destination = "stringQueue", containerFactory = "jmsListenerContainerQueue")
    public void receiveStringQueue(String msg) {
        System.out.println("接收到消息...." + msg);
    }


}

F. ActivemqdemoApplicationTests

package com.yzx.activemqdemo;

import com.yzx.activemqdemo.demo1.MqProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ActivemqdemoApplicationTests {

    @Autowired
    private MqProducer mqProducer;


    @Test
    public void testStringQueue() {

        for (int i = 1; i <= 1000; i++) {
            System.out.println("第" + i + "次发送字符串队列消息");
            mqProducer.sendStringQueue("stringQueue", "消息:" + i);
        }
    }

}

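4.2.2 Results

In the console output below, "第N次发送字符串队列消息" is the producer's "sending string queue message, attempt N" line, and "接收到消息....消息:N" is the consumer's "received message N" line.

A. The first connection attempt happens to hit the master, and messages are sent right away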

2018-07-29 18:21:34.503  INFO 25764 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport       : Successfully connected to tcp://182.61.53.64:51511
2018-07-29 18:21:34.695  INFO 25764 --- [           main] c.y.a.ActivemqdemoApplicationTests       : Started ActivemqdemoApplicationTests in 4.931 seconds (JVM running for 6.787)
第1次发送字符串队列消息
第2次发送字符串队列消息
第3次发送字符串队列消息
接收到消息....消息:1
接收到消息....消息:2

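B. The first attempt does not hit the master: an exception is logged first, then the client reconnects to the master and sends messages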

2018-07-29 18:23:08.454  INFO 23884 --- [ActiveMQ Task-1] o.a.a.t.failover.FailoverTransport       : Successfully connected to tcp://182.61.32.229:51513
2018-07-29 18:23:17.796  WARN 23884 --- [229:51513@55464] o.a.a.t.failover.FailoverTransport       : Transport (tcp://182.61.32.229:51513) failed , attempting to automatically reconnect: {}

java.net.SocketException: Software caused connection abort: recv failed
	at java.net.SocketInputStream.socketRead0(Native Method) ~[na:1.8.0_171]
	at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) ~[na:1.8.0_171]
	at java.net.SocketInputStream.read(SocketInputStream.java:189) ~[na:1.8.0_171]
	at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[na:1.8.0_171]
	at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50) ~[activemq-client-5.15.4.jar:5.15.4]
	at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:634) ~[activemq-client-5.15.4.jar:5.15.4]
	at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:59) ~[activemq-client-5.15.4.jar:5.15.4]
	at org.apache.activemq.transport.tcp.TcpTransport$2.read(TcpTransport.java:619) ~[activemq-client-5.15.4.jar:5.15.4]
	at java.io.DataInputStream.readInt(DataInputStream.java:387) ~[na:1.8.0_171]
	at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268) ~[activemq-client-5.15.4.jar:5.15.4]
	at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240) ~[activemq-client-5.15.4.jar:5.15.4]
	at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232) ~[activemq-client-5.15.4.jar:5.15.4]
	at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215) ~[activemq-client-5.15.4.jar:5.15.4]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]

2018-07-29 18:23:17.847  INFO 23884 --- [ActiveMQ Task-2] o.a.a.t.failover.FailoverTransport       : Successfully reconnected to tcp://182.61.53.64:51511
2018-07-29 18:23:17.914  INFO 23884 --- [           main] c.y.a.ActivemqdemoApplicationTests       : Started ActivemqdemoApplicationTests in 14.434 seconds (JVM running for 16.723)
第1次发送字符串队列消息
第2次发送字符串队列消息
接收到消息....消息:1

 

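C. The master goes down while messages are being sent: once ZooKeeper has elected a new master, the remaining messages continue to be sent and consumed.

To make this easy to test, the number of messages is set to 1000, and ActiveMQ on the master node (182.61.53.64) is shut down partway through.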

第237次发送字符串队列消息
接收到消息....消息:236
2018-07-29 18:35:11.204  WARN 26616 --- [.64:51511@55365] o.a.a.t.failover.FailoverTransport       : Transport (tcp://182.61.53.64:51511) failed , attempting to automatically reconnect: {}

java.net.SocketException: Software caused connection abort: recv failed
	at java.net.SocketInputStream.socketRead0(Native Method) ~[na:1.8.0_171]
	at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) ~[na:1.8.0_171]
	at java.net.SocketInputStream.read(SocketInputStream.java:189) ~[na:1.8.0_171]
	at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[na:1.8.0_171]
	at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50) ~[activemq-client-5.15.4.jar:5.15.4]
	at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:634) ~[activemq-client-5.15.4.jar:5.15.4]
	at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:59) ~[activemq-client-5.15.4.jar:5.15.4]
	at org.apache.activemq.transport.tcp.TcpTransport$2.read(TcpTransport.java:619) ~[activemq-client-5.15.4.jar:5.15.4]
	at java.io.DataInputStream.readInt(DataInputStream.java:387) ~[na:1.8.0_171]
	at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268) ~[activemq-client-5.15.4.jar:5.15.4]
	at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240) ~[activemq-client-5.15.4.jar:5.15.4]
	at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232) ~[activemq-client-5.15.4.jar:5.15.4]
	at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215) ~[activemq-client-5.15.4.jar:5.15.4]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]

2018-07-29 18:35:39.313  INFO 26616 --- [ActiveMQ Task-2] o.a.a.t.failover.FailoverTransport       : Successfully reconnected to tcp://182.61.61.90:51512
2018-07-29 18:35:39.452  WARN 26616 --- [ Session Task-4] o.a.activemq.ActiveMQMessageConsumer     : ID:PC--20180608YMM-55364-1532860506351-1:1:10:1 suppressing duplicate delivery on connection, poison acking: MessageDispatch {commandId = 0, responseRequired = false, consumerId = ID:PC--20180608YMM-55364-1532860506351-1:1:10:1, destination = queue://stringQueue, message = ActiveMQTextMessage {commandId = 498, responseRequired = true, messageId = ID:PC--20180608YMM-55364-1532860506351-1:1:13:1:236, originalDestination = null, originalTransactionId = null, producerId = ID:PC--20180608YMM-55364-1532860506351-1:1:13:1, destination = queue://stringQueue, transactionId = null, expiration = 0, timestamp = 1532860511134, arrival = 0, brokerInTime = 1532860510868, brokerOutTime = 1532860539178, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@4311891c, marshalledProperties = org.apache.activemq.util.ByteSequence@139629ab, dataStructure = null, redeliveryCounter = 0, size = 0, properties = {timestamp=1532860511133}, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = 消息:236}, redeliveryCounter = 0}
第238次发送字符串队列消息
接收到消息....消息:237
第239次发送字符串队列消息
接收到消息....消息:238
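In this run the ActiveMQ client itself noticed that message 236 was dispatched again after the failover and suppressed it ("suppressing duplicate delivery" in the log). If a consumer's side effects must never run twice, one option, which is not part of the original demo, is to also remember recently seen message IDs in the application. The sketch below uses only the JDK; the class name RecentMessageIds, the capacity, and the sample IDs are all made up for illustration.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Remembers the most recent message IDs so a redelivered message can be skipped. */
final class RecentMessageIds {

    private final Set<String> ids;

    RecentMessageIds(final int capacity) {
        // Insertion-ordered map that drops its oldest entry once capacity is exceeded.
        Map<String, Boolean> bounded = new LinkedHashMap<String, Boolean>() {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
        this.ids = Collections.newSetFromMap(bounded);
    }

    /** Returns true the first time an ID is seen, false for a repeat still in memory. */
    synchronized boolean firstTime(String messageId) {
        return ids.add(messageId);
    }

    public static void main(String[] args) {
        RecentMessageIds seen = new RecentMessageIds(1000);
        String[] deliveries = {"ID:1:236", "ID:1:236", "ID:1:237"}; // made-up IDs
        for (String id : deliveries) {
            System.out.println(id + (seen.firstTime(id) ? " -> process" : " -> skip duplicate"));
        }
    }
}
```

In a listener, the ID could come from javax.jms.Message#getJMSMessageID() if the method takes the JMS message rather than the converted String. This only guards against repeats within one JVM and within the remembered window.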

 
