RabbitMQ(一):环境安装(Docker容器化)及第一个程序
Rabbit MQ之一:环境安装、hello world程序消息中间件docker安装RabbitMQ容器部署JavaAPI消息中间件1、消息(message)是指在应用间传送的数据。消息可以非常简单,比如一个文本字符串;也可以很复杂,比如对象。2、消息中间件(Message Queue Middleware,简称MQ),是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通讯...
消息中间件
1、消息(message)是指在应用间传送的数据。消息可以非常简单,比如一个文本字符串;也可以很复杂,比如对象。
2、消息中间件(Message Queue Middleware,简称MQ),是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通讯来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间通信。
3、RabbitMQ支持事务、消息传递有丰富的安全机制;kafka性能比Rabbit MQ优秀、但是不支持事务,Kafka适用于对消息安全要求一般的日志系统,Rabbit MQ适用于订单系统、注册登陆的验证码等场景;Rocket MQ是阿里开发并贡献给apache维护的项目,但是一些高级特性是阿里云平台提供并收费的,因此一般的消息队列场景、Rabbit MQ会优先考虑,注重极致性能的场景、优选kafka。
Rabbit MQ的安装需要Erlang的环境,我们直接使用docker安装Rabbit MQ会省去很多配置工作
docker安装
可以在windows10或Mac中直接安装docker服务、不需要借助centos7虚拟机,在生产环境、绝大部分还是以centos为服务器的、因此在centos7中安装docker服务很有必要。
- 首先、需要搭建一个centos7的虚拟机。安装过程参考官网:https://docs.docker.com/install/linux/docker-ce/centos/
目前最新版docker版本为docker-ce-18.09.1,整理后的安装过程如下 - 移除系统之前的docker服务残留:
yum remove docker \
docker-client \
docker-client-latest \
docker-common \
docker-latest \
docker-latest-logrotate \
docker-logrotate \
docker-engine
- 安装必要的包
yum install -y yum-utils \
device-mapper-persistent-data \
lvm2
- 设置稳定的仓库
yum-config-manager \
--add-repo \
https://download.docker.com/linux/centos/docker-ce.repo
- 测试仓库
yum-config-manager --enable docker-ce-nightly
yum-config-manager --enable docker-ce-test
- 执行安装docker命令,不指定版本、默认安装最新版本
yum install docker-ce
- 查看版本历史的命令。如果需要指定一个版本、可以先执行以下命令查看版本历史,再选择安装哪个版本,这一步非必须
yum list docker-ce --showduplicates | sort -r
- 启动docker服务
systemctl start docker
- 查看docker版本并运行hello world测试安装是否成功
docker --version
docker run hello-world
RabbitMQ容器部署
部署Rabbit MQ容器也非常简单,只需要拉取高手共享的Rabbit MQ镜像、然后本地启动容器就可以了。
- 拉取Rabbit MQ镜像
docker pull rabbitmq:management
- 运行
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
-hostname:指定容器主机名称
-name:指定容器名称
-p:将mq端口号映射到本地
15672:控制台端口号
5672:应用访问端口号
在本地Windows或mac宿主机访问:http://localhost:15672 ,账户密码为:admin/admin,在启动容器命令中可以修改
JavaAPI
- git代码地址,feature分支
rabbitmq-api : springboot整合
rabbitmq-original : amqp原始方式API实现
- 生产者
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author : zhaochuanzhen
* @date : 2019/1/25 10:15
* @Desc:
*/
public class HelloProducer {
private static final String HOST = "127.0.0.1";
private static final Integer POST = 5672;
private static final String USERNAME = "admin";
private static final String PASSWORD = "admin";
private static final String QUEUE_NAME = "user_queue";
private static final String EXCHANGE_NAME = "user_exchange";
private static final String EXCHANGE_TYPE = "direct";
private static final String ROUTING_KEY = "user.single";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(POST);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
Connection conn = null;
Channel channel = null;
try {
conn = factory.newConnection();
channel = conn.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, null);
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, "陈独秀".getBytes());
conn.addShutdownListener(new ShutdownListener() {
@Override
public void shutdownCompleted(ShutdownSignalException cause) {
System.out.println("shutdownCompleted ==> " + cause.getLocalizedMessage());
}
});
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
if (channel != null) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (conn != null) {
try {
conn.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
- 消费者
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* @author : zhaochuanzhen
* @date : 2019/1/26 10:45
* @Desc:
*/
public class HelloConsumer {
private static final String HOST = "127.0.0.1";
private static final Integer POST = 5672;
private static final String USERNAME = "admin";
private static final String PASSWORD = "admin";
private static final String QUEUE_NAME = "user_queue";
private static final String EXCHANGE_NAME = "user_exchange";
private static final String EXCHANGE_TYPE = "direct";
private static final String ROUTING_KEY = "user.single";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setPort(POST);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
boolean autoACK = false;
String consumerTag = "consumerTag1";
Connection conn = null;
Channel channel = null;
try {
conn = factory.newConnection();
channel = conn.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true);
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, null);
channel.basicQos(64);
Channel finalChannel = channel;
channel.basicConsume(QUEUE_NAME, autoACK, consumerTag, new DefaultConsumer(finalChannel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
finalChannel.basicAck(deliveryTag, false);
System.out.println(new String(body));
}
});
TimeUnit.SECONDS.sleep(5);
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (channel != null) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (conn != null) {
try {
conn.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
- 首先运行生产者,结果如下:Ready显示当前就绪消息数为1
- 运行消费者,可以看到控制台打印的消息内容,并且Rabbit MQ管理页面显示当前就绪消息为0.
这只是Rabbit MQ最基本的操作,若要继续深入,需要理解交换器、队列、路由键、绑定、绑定键等概念,关于队列还有:死信队列、延迟队列、优先级队列等不同场景适用的队列。关于这些代码的编写逻辑、将在写完基本概念后、逐行讲解。
更多推荐
所有评论(0)