消息中间件

1、消息(message)是指在应用间传送的数据。消息可以非常简单,比如一个文本字符串;也可以很复杂,比如对象。
2、消息中间件(Message Queue Middleware,简称MQ),是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通讯来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间通信。
3、RabbitMQ支持事务、消息传递有丰富的安全机制;kafka性能比Rabbit MQ优秀、但是不支持事务,Kafka适用于对消息安全要求一般的日志系统,Rabbit MQ适用于订单系统、注册登陆的验证码等场景;Rocket MQ是阿里开发并贡献给apache维护的项目,但是一些高级特性是阿里云平台提供并收费的,因此一般的消息队列场景、Rabbit MQ会优先考虑,注重极致性能的场景、优选kafka。

Rabbit MQ的安装需要Erlang的环境,我们直接使用docker安装Rabbit MQ会省去很多配置工作

docker安装

可以在windows10或Mac中直接安装docker服务、不需要借助centos7虚拟机,在生产环境、绝大部分还是以centos为服务器的、因此在centos7中安装docker服务很有必要。

yum remove docker \
    docker-client \
    docker-client-latest \
    docker-common \
    docker-latest \
    docker-latest-logrotate \
    docker-logrotate \
    docker-engine
  • 安装必要的包
  yum install -y yum-utils \
  device-mapper-persistent-data \
  lvm2
  • 设置稳定的仓库
    yum-config-manager \
    --add-repo \
    https://download.docker.com/linux/centos/docker-ce.repo
  • 测试仓库
yum-config-manager --enable docker-ce-nightly

yum-config-manager --enable docker-ce-test
  • 执行安装docker命令,不指定版本、默认安装最新版本
yum install docker-ce
  • 查看版本历史的命令。如果需要指定一个版本、可以先执行以下命令查看版本历史,再选择安装哪个版本,这一步非必须
yum list docker-ce --showduplicates | sort -r
  • 启动docker服务
systemctl start docker
  • 查看docker版本并运行hello world测试安装是否成功
docker --version

docker run hello-world

RabbitMQ容器部署

部署Rabbit MQ容器也非常简单,只需要拉取高手共享的Rabbit MQ镜像、然后本地启动容器就可以了。

  • 拉取Rabbit MQ镜像
docker pull rabbitmq:management
  • 运行
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management

-hostname:指定容器主机名称
-name:指定容器名称
-p:将mq端口号映射到本地
15672:控制台端口号
5672:应用访问端口号
在本地Windows或mac宿主机访问:http://localhost:15672 ,账户密码为:admin/admin,在启动容器命令中可以修改

JavaAPI

- git代码地址,feature分支
rabbitmq-api : springboot整合
rabbitmq-original : amqp原始方式API实现
在这里插入图片描述

  • 生产者
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @author : zhaochuanzhen
 * @date : 2019/1/25 10:15
 * @Desc:
 */
public class HelloProducer {

    private static final String HOST = "127.0.0.1";
    private static final Integer POST = 5672;
    private static final String USERNAME = "admin";
    private static final String PASSWORD = "admin";
    
    private static final String QUEUE_NAME = "user_queue";
    private static final String EXCHANGE_NAME = "user_exchange";
    private static final String EXCHANGE_TYPE = "direct";
    private static final String ROUTING_KEY = "user.single";
    
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(POST);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);

        Connection conn = null;
        Channel channel = null;
        try {
            conn = factory.newConnection();
            channel = conn.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true);
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, null);
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, "陈独秀".getBytes());

            conn.addShutdownListener(new ShutdownListener() {
                @Override
                public void shutdownCompleted(ShutdownSignalException cause) {
                    System.out.println("shutdownCompleted ==> " + cause.getLocalizedMessage());
                }
            });
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }

            if (conn != null) {
                try {
                    conn.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

  • 消费者
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * @author : zhaochuanzhen
 * @date : 2019/1/26 10:45
 * @Desc:
 */
public class HelloConsumer {

    private static final String HOST = "127.0.0.1";
    private static final Integer POST = 5672;
    private static final String USERNAME = "admin";
    private static final String PASSWORD = "admin";
    
    private static final String QUEUE_NAME = "user_queue";
    private static final String EXCHANGE_NAME = "user_exchange";
    private static final String EXCHANGE_TYPE = "direct";
    private static final String ROUTING_KEY = "user.single";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(POST);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);

        boolean autoACK = false;
        String consumerTag = "consumerTag1";

        Connection conn = null;
        Channel channel = null;
        try {
            conn = factory.newConnection();
            channel = conn.createChannel();
            channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE, true);
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, null);

            channel.basicQos(64);
            Channel finalChannel = channel;
            channel.basicConsume(QUEUE_NAME, autoACK, consumerTag, new DefaultConsumer(finalChannel) {

                @Override
                public void handleDelivery(String consumerTag,
                                           Envelope envelope,
                                           AMQP.BasicProperties properties,
                                           byte[] body) throws IOException {
                    String routingKey = envelope.getRoutingKey();
                    String contentType = properties.getContentType();
                    long deliveryTag = envelope.getDeliveryTag();
                    finalChannel.basicAck(deliveryTag, false);
                    System.out.println(new String(body));
                }
            });

            TimeUnit.SECONDS.sleep(5);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            if (channel != null) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

  • 首先运行生产者,结果如下:Ready显示当前就绪消息数为1
    在这里插入图片描述
  • 运行消费者,可以看到控制台打印的消息内容,并且Rabbit MQ管理页面显示当前就绪消息为0.
    在这里插入图片描述

这只是Rabbit MQ最基本的操作,若要继续深入,需要理解交换器、队列、路由键、绑定、绑定键等概念,关于队列还有:死信队列、延迟队列、优先级队列等不同场景适用的队列。关于这些代码的编写逻辑、将在写完基本概念后、逐行讲解。

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐