ActiveMQ之集群(主从)搭建-yellowcong
ActiveMq的集群是依赖于Zookeeper的,ActiveMq的集群是,主从集群的方式,主的挂掉,才会自动切换到从,从的才开始运行。
·
ActiveMq的集群是依赖于Zookeeper的,ActiveMq的集群是,主从集群的方式,主的挂掉,才会自动切换到从,从的才开始运行。
安装准备
zookeeper集群安装
Shell实战之一键自动部署Zookeeper-yellowcong
Zookeeper之集群搭建-yellowcong
系统架构
Zookeeper架构
节点 | 服务 | 目录 |
---|---|---|
192.168.100.10:2182 | zookeeper1 | /usr/local/zookeeper/zookeeper1 |
192.168.100.10:2183 | zookeeper2 | /usr/local/zookeeper/zookeeper2 |
192.168.100.10:2184 | zookeeper3 | /usr/local/zookeeper/zookeeper3 |
ActiveMQ架构
节点 | 消息端口 | jett管控台端口 | 安装目录 |
---|---|---|---|
192.168.100.10:62621 | 51511 | 8161 | /usr/local/activemq-cluster/node1 |
192.168.100.10:62622 | 51512 | 8162 | /usr/local/activemq-cluster/node2 |
192.168.100.10:62623 | 51513 | 8163 | /usr/local/activemq-cluster/node3 |
安装ActiveMq
#下载activemq
wget http://yellowcong.qiniudn.com/apache-activemq-5.11.1-bin.tar.gz
#解压安装包,解压到node1目录,注意-C是大写的
tar -zxvf apache-activemq-5.11.1-bin.tar.gz -C node1/
#修改jetty
vim node1/apache-activemq-5.11.1/conf/jetty.xml
#103行 左右,修改 端口 ,同样其他几个节点也修改
<property name="port" value="8161"/>
#修改brokername
#不要忘了,3个MQ实例的brokerName必须一致,要不然你会在集群启动时出现:
#Not enough cluster members when using LevelDB replication
#这样的错误。
vim node1/apache-activemq-5.11.1/conf/activemq.xml
#40 行左右 brokerName
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="activemq-cluster" dataDirectory="${activemq.data}">
#修改persistenceAdapter
#参数说明
# replicas 节点数
# bind 绑定端口号
# zkAddress zookeeeper的集群 ip
# hostname 主机名称
# zkPath 在zookeeepr上存储的地址
# 62621 修改为指定端口
<replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:62621"
zkAddress="192.168.100.10:2182,192.168.100.10:2183,192.168.100.10:2184"
zkPassword=""
hostname="yellowcong"
sync="local_disk"
zkPath="/activemq/leveldb-stores"
/>
#修改该通信端口51511 修改为指定端口
<transportConnector name="openwire" uri="tcp://0.0.0.0:51511?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
修改jetty的端口号,也就是修改管控台的端口
修改broke的名称
启动服务
#启动服务
node1/apache-activemq-5.11.1/bin/activemq start
node2/apache-activemq-5.11.1/bin/activemq start
node3/apache-activemq-5.11.1/bin/activemq start
#停止服务
node1/apache-activemq-5.11.1/bin/activemq stop
node2/apache-activemq-5.11.1/bin/activemq stop
node3/apache-activemq-5.11.1/bin/activemq stop
#查看日志
tail -f node1/apache-activemq-5.11.1/data/activemq.log
tail -f node2/apache-activemq-5.11.1/data/activemq.log
tail -f node3/apache-activemq-5.11.1/data/activemq.log
#连接zookeeper查看节点信息
zookeeper/zookeeper1/zookeeper-3.4.10/bin/zkCli.sh -server 192.168.100.10:2182
在zookeeper存储的节点信息
在eclipse插件中查看
elected 有数据的节点,表示是主节点,其他节点是备用节点,处于待机状态,activemq其中一个节点挂掉,另外一个才会启动,而不是都处于工作状态,那么意味着,只有启动的节点的管控台可以查看,没有启动的节点,管控台是没有运行的
java测试
集群后,连接方式改为了failover 了,其他的和原来的一样,没啥区别
private static final String ACTIVEMQ_HOST = "failover:(tcp://192.168.100.10:51511,tcp://192.168.50.133:51512,tcp://192.168.50.134:51513)?Randomize=false";
案例
package com.yellowcong.provice;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 创建用户:狂飙的yellowcong<br/>
* 创建日期:2017年12月9日<br/>
* 创建时间:下午1:29:28<br/>
* 机能概要:
*/
public class Provider {
// activemq的服务器地址
private static final String ACTIVEMQ_HOST = "failover:(tcp://192.168.100.10:51511,tcp://192.168.50.133:51512,tcp://192.168.50.134:51513)?Randomize=false";
// 用户名
private static final String USERNAME = null;
// 密码
private static final String PASSWORD = null;
public static void main(String[] args) throws Exception {
provider();
//通过大小过滤
customerByAge();
}
public static Session getSession() {
Session session = null;
try {
// 1.获取工厂连接类
ConnectionFactory fc = new ActiveMQConnectionFactory(USERNAME, PASSWORD, ACTIVEMQ_HOST);
// 2.获取连接
Connection conn = fc.createConnection();
conn.start();
// 3.创建session
session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return session;
}
/**
* 创建用户:狂飙的yellowcong<br/>
* 创建日期:2017年12月9日<br/>
* 创建时间:下午2:39:20<br/>
* 机能概要:添加数据
*
* @throws JMSException
*/
public static void provider() throws JMSException {
Session session = getSession();
// 添加生产者
MessageProducer producter = session.createProducer(null);
MapMessage msg = session.createMapMessage();
msg.setString("username", "张三");
msg.setInt("age", 12);
msg.setString("nickname", "zhangsan");
// xxProperty 是用来检索的,过滤消息的
msg.setIntProperty("age", 12);
msg.setStringProperty("nickname", "zhangsan");
MapMessage msg2 = session.createMapMessage();
msg2.setString("username", "李四");
msg2.setInt("age", 18);
msg2.setString("nickname", "lisi");
msg2.setIntProperty("age", 18);
msg2.setStringProperty("nickname", "lisi");
MapMessage msg3 = session.createMapMessage();
msg3.setString("username", "王五");
msg3.setInt("age", 122);
msg3.setString("nickname", "wangwu");
msg3.setIntProperty("age", 122);
msg3.setStringProperty("nickname", "wangwu");
MapMessage msg4 = session.createMapMessage();
msg4.setString("username", "赵六");
msg4.setInt("age", 24);
msg4.setString("nickname", "zhaoliu");
msg4.setIntProperty("age", 24);
msg4.setStringProperty("nickname", "zhaoliu");
MapMessage msg5 = session.createMapMessage();
msg5.setString("username", "黄聪");
msg5.setInt("age", 24);
msg5.setString("nickname", "yellowcong");
msg5.setIntProperty("age", 24);
msg5.setStringProperty("nickname", "yellowcong");
Destination destination = session.createQueue("test");
// 发送消息
producter.send(destination, msg, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60);
producter.send(destination, msg2, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60);
producter.send(destination, msg3, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60);
producter.send(destination, msg4, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60);
producter.send(destination, msg5, DeliveryMode.NON_PERSISTENT, 4, 1000 * 60);
session.commit();
}
/**
* 创建用户:狂飙的yellowcong<br/>
* 创建日期:2017年12月9日<br/>
* 创建时间:下午2:47:56<br/>
* 机能概要:消费者
* @throws Exception
*/
public static void customerByAge() throws Exception {
Session session = getSession();
// 创建消费者
Destination destination = session.createQueue("test");
//查询年大于 18的数据
//字符串的过滤,需要有''
MessageConsumer consumer = session.createConsumer(destination, "age >18 OR nickname = 'yellowcong'");
// 设定消费者的监听器
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
// TODO Auto-generated method stub
// 当是Map 消息的情况
if (message instanceof MapMessage) {
try {
MapMessage msg = (MapMessage) message;
//接受消息
msg.acknowledge();
String username = msg.getString("username");
String nickname = msg.getString("nickname");
int age = msg.getInt("age");
System.out.printf("获取消费者信息\t%s:%s:%d\r\n", username, nickname, age);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
});
}
}
参考文章
http://wosyingjun.iteye.com/blog/2314683
https://www.cnblogs.com/shihaiming/p/6018916.html
更多推荐
已为社区贡献22条内容
所有评论(0)