简介:

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、 安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。

使用场景:

应用解耦、流量肖峰、异步处理。

六种应用模式:

1、简单模式
在这里插入图片描述

注释:P:生产者(发送数据) 队列:存储数据(消息缓冲器) C:消费者(获取数据)

生产者消费者均为应用程序。

2、工作队列
在这里插入图片描述

注释:默认情况下,RabbitMQ将按顺序将每条消息发送给下一个消费者。平均而言,每个消费者将获得相同数量的消息。这种分发消息的方式称为循环法。与三个或更多工人一起尝试。

消息确认:分两种自动确认(易丢失数据)和手动确认(不易丢失数据)
手动确认:我们通过autoAck = true 标志明确地将它们关闭。一旦我们完成任务,就应该将此标志设置为false并从工作人员发送适当的确认。若忘记返回确认会造成消息无法释放,内存堆积。

消息持久性:分消息持久化与消息非持久化
消息持久化:数据写入磁盘,数据不易丢失(队列和消息都标记为持久)

消息非持久化:数据写入内存,数据存储速度快

公平派遣

能者多劳

使用basicQos方法和 prefetchCount = 1设置。在处理并确认前一个消息之前,不要向工作人员发送新消息。相反,它会将它发送给下一个仍然不忙的工人。
3、发布订阅

基本上,发布的消息将被广播给所有接收者。

Publish/Subscribe

注释:P:生产者(发送数据) X:交换机 队列:存储数据(消息缓冲器) C:消费者(获取数据)

RabbitMQ中消息传递模型的核心思想是生产者永远不会将任何消息直接发送到队列。实际上,生产者通常甚至不知道消息是否会被传递到任何队列。

流程:消息发送到Exchanges(交换机),Exchanges把消息发到绑定的队列,消费者获取队列消息。
四种Exchanges类型:direct, topic, headers and fanout

发布订阅用的交换机类型为fanout

4、路由

Routing

交换机类型:direct

直接交换背后的路由算法很简单 - 消息进入队列,其 绑定密钥与消息的路由密钥完全匹配。

5、主题

在这里插入图片描述

交换机类型:topic

发送到主题交换的消息不能具有任意 routing_key - 它必须是由点分隔的单词列表。单词可以是任何内容,但通常它们指定与消息相关的一些功能。一些有效的路由密钥示例:“ stock.usd.nyse ”,“ nyse.vmw ”,“ quick.orange.rabbit ”。路由密钥中可以包含任意数量的单词,最多可达255个字节。

绑定密钥也必须采用相同的形式。主题交换背后的逻辑 类似于直接交换- 使用特定路由密钥发送的消息将被传递到与匹配绑定密钥绑定的所有队列。但是,绑定键有两个重要的特殊情况:

*(星号)可以替代一个单词。
#(hash)可以替换零个或多个单词。
6、远程过程调用(RPC)

在这里插入图片描述

使用RabbitMQ构建RPC系统:客户端和可伸缩的RPC服务器。由于我们没有任何值得分发的耗时任务,我们将创建一个返回Fibonacci数字的虚拟RPC服务。

RPC将这样工作:

对于RPC请求,客户端发送带有两个属性的消息: replyTo,设置为仅为请求创建的匿名独占队列;以及correlationId,设置为每个请求的唯一值。

请求被发送到rpc_queue队列。

RPC worker(aka:server)正在等待该队列上的请求。当出现请求时,它会执行该作业,并使用来自replyTo字段的队列将带有结果的消息发送回客户端。

客户端等待回复队列上的数据。出现消息时,它会检查correlationId属性。如果它与请求中的值匹配,则返回对应用程序的响应。

安装环境:

我的环境:mac、nginx、php7.2、thinkphp5.0

1、安装RabbitMQ:
支持windows和linux,各自找对应版本安装。
安装完成可登录查看界面:
http://localhost:15672/

  • 默认账号密码是:guest
  • 添加vhost (相当于数据库)
  • 创建账号、并给账号配置vhost

2、php安装amqp扩展:
在这里插入图片描述

3、安装php-amqplib扩展包:

composer require php-amqplib/php-amqplib

模型及生产者实现:

1、RabbitMQ模型以及生产者类代码目录
在这里插入图片描述

RabbitMq.php // RabbitMq 模型

<?php

namespace RabbitMq;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// RabbitMq 模型
class RabbitMq
{
    static private $instance;
    static private $connection;
    static private $channel;
    const DIRECT = 'direct';
    const TOPIC = 'topic';
    const HEADERS = 'headers';
    const FANOUT = 'fanout';
    static private $exchangeNames = [
        self::DIRECT=>'direct_exchange',
        self::TOPIC=>'topic_exchange',
        self::HEADERS=>'headers_exchange',
        self::FANOUT=>'fanout_exchange',
    ];
    const SEVERITYS = [
        'info',
        'warning',
        'error'
    ];
    static private $exchangeName = '';

    /**
     * RabbitMq constructor.
     * @param $exchangeType
     */
    private function __construct($exchangeType)
    {
        // 取默认配置
        $config = [
            'host'     => config('rabbit_mq_conf.host'), //ip
            'port'     => config('rabbit_mq_conf.port'),      //端口号
            'user'     => config('rabbit_mq_conf.username'),     //用户
            'password' => config('rabbit_mq_conf.password'), //密码
            'vhost'    => config('rabbit_mq_conf.vhost')         //虚拟host
        ];

        //创建链接
        self::$connection = new AMQPStreamConnection($config['host'], $config['port'], $config['user'], $config['password'], $config['vhost']);
        self::$channel = self::$connection->channel();
        if (!empty($exchangeType)){
            self::$exchangeName = self::$exchangeNames[$exchangeType];
            self::$channel->exchange_declare(
                self::$exchangeName, //交换机名称
                $exchangeType, //路由类型
                false, //don't check if a queue with the same name exists 是否检测同名队列
                true, //the queue will not survive server restarts 是否开启队列持久化
                false //the queue will be deleted once the channel is closed. 通道关闭后是否删除队列
            );
        }
    }

    /**
     * 实例化
     * @param string $exchangeType
     * @return RabbitMq
     */
    public static function instance($exchangeType='')
    {
        if (!self::$instance instanceof self){
            self::$instance = new self($exchangeType);
        }
        return self::$instance;
    }

    /**
     * 防止被外部复制
     */
    private function __clone(){}

    /**
     * 简单的发送
     * @param $body
     */
    public function send($body)
    {
        self::$channel->queue_declare('hello',false,false,false);
        $msg = new AMQPMessage($body);
        self::$channel->basic_publish($msg,'','hello');
        echo "[X] Sent ".$body."'\n";
    }

    /**
     * 简单的接收
     * @param $queueName
     * @param $callback
     */
    public function receive($callback)
    {
        self::$channel->queue_declare('hello',false,false,false,true);
        echo "[*] Waiting for messages. To exit press CTRL+C\n";

        self::$channel->basic_consume("hello","",false,false,false,false,$callback);

        while(count(self::$channel->callbacks)){
            self::$channel->wait();
        }
    }

    /**
     * 添加工作队列
     * @param string $data
     * @param string $queueName
     */
    public function addTask($data="",$queueName)
    {
        self::$channel->queue_declare($queueName,false,true,false,true);
        if (empty($data))$data = 'Hello World!';
        $msg = new AMQPMessage(
            $data,
            array("delivery_mode" => AMQPMessage::DELIVERY_MODE_PERSISTENT)
        );
        self::$channel->basic_publish($msg,"",$queueName);

//        echo "[x] Sent $data \n";
    }

    /**
     * 执行工作队列
     * @param $callback
     * @param $queueName
     */
    public function workTask($callback,$queueName)
    {
        self::$channel->queue_declare($queueName,false,true,false,true);
        echo " [*] Waiting for messages. To exit press CTRL+C", "\n";

        self::$channel->basic_qos(null,1,null);
        self::$channel->basic_consume($queueName,"",false,false,false,false,$callback);

        while(count(self::$channel->callbacks)){
            self::$channel->wait();
        }
    }

    /**
     * 发布
     * @param string $data
     */
    public function sendQueue($data="")
    {
        if (empty($data))$data = 'info:Hello World!';
        $msg = new AMQPMessage($data);
        self::$channel->basic_publish($msg,self::$exchangeName);
        echo "[x] Sent $data \n";
    }

    /**
     * 订阅
     * @param $callback
     */
    public function subscribeQueue($callback)
    {
        list($queue_name,,) = self::$channel->queue_declare(
            "", //队列名称
            false, //dont check if a queue with the same name exists 是否检测同名队列
            true, //the queue will not survive server restarts 是否开启队列持久化
            true, //the queue might be accessed by other channels 队列是否可以被其他队列访问
            false //the queue will be deleted once the channel is closed. 通道关闭后是否删除队列
        );
        self::$channel->queue_bind($queue_name,self::$exchangeName);
        echo "[*] Waiting for logs. To exit press CTRL+C \n";
        self::$channel->basic_consume($queue_name,"",false,true,false,false,$callback);

        while(count(self::$channel->callbacks)){
            self::$channel->wait();
        }
    }

    /**
     * 发送(直接交换机)
     * @param $routingKey
     * @param string $data
     */
    public function sendDirect($routingKey, $data = "")
    {
        if (empty($data))$data = "Hello World!";
        $msg = new AMQPMessage($data);
        self::$channel->basic_publish($msg,self::$exchangeName,$routingKey);
        echo "[x] Sent $routingKey:$data \n";
    }

    /**
     * 接收(直接交换机)
     * @param \Closure $callback
     * @param array $bindingKeys
     */
    public function receiveDirect(\Closure $callback, array $bindingKeys)
    {
        list($queue_namme,,) = self::$channel->queue_declare("",false,true,true,false);
        foreach ($bindingKeys as $bindingKey){
            self::$channel->queue_bind($queue_namme,self::$exchangeName,$bindingKey);
        }
        echo "[x] Waiting for logs. To exit press CTRL+C \n";
        self::$channel->basic_consume($queue_namme,"",false,true,false,false,$callback);
        while(count(self::$channel->callbacks)){
            self::$channel->wait();
        }
    }

    /**
     * 发送(主题交换机)
     * @param $routingKey
     * @param string $data
     */
    public function sendTopic($routingKey, $data = "")
    {
        if (empty($data)) $data = "Hello World!";
        $msg = new AMQPMessage($data);
        self::$channel->basic_publish($msg,self::$exchangeName,$routingKey);
        echo " [x] Sent ",$routingKey,':',$data," \n";
    }

    /**
     * 接收(主题交换机)
     * @param \Closure $callback
     * @param array $bindingKeys
     */
    public function receiveTopic(\Closure $callback, array $bindingKeys)
    {
        list($queueName,,) = self::$channel->queue_declare("",false,true,true,false);
        foreach ($bindingKeys as $bindingKey){
            self::$channel->queue_bind($queueName,self::$exchangeName,$bindingKey);
        }

        echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";
        self::$channel->basic_consume($queueName, '', false, true, false, false, $callback);

        while(count(self::$channel->callbacks)) {
            self::$channel->wait();
        }
    }

    /**
     * 销毁
     */
    public function __destruct()
    {
        // TODO: Implement __destruct() method.
        self::$channel->close();
        self::$connection->close();
    }
}

RabbitMqWork.php // RabbitMq 各个模式方法操作类

<?php
namespace RabbitMq;

require 'RabbitMq.php';

// RabbitMq 各个模式方法操作类
class RabbitMqWork
{
    private $RabbitMq;
    public function __construct($exchageType='')
    {
        $this->RabbitMq = RabbitMq::instance($exchageType);
    }

    /**
     * 发送(普通)
     * @param $body
     */
    public function send($body)
    {
        $this->RabbitMq->send($body);
    }

    /**
     * 接收(普通)
     * @param $callback
     */
    public function receive($callback)
    {
        $this->RabbitMq->receive($callback);
    }

    /**
     * 发送(工作队列)
     * @param $data
     * @param $queueName
     */
    public function addTask($data,$queueName)
    {
        $this->RabbitMq->addTask($data,$queueName);
    }

    /**
     * 接收(工作队列)
     * @param $callback
     * @param $queueName
     */
    public function workTask($callback,$queueName)
    {
        $this->RabbitMq->workTask($callback,$queueName);
    }

    /**
     * 发布(扇形交换机)
     * @param $data
     */
    public function sendQueue($data)
    {
        $this->RabbitMq->sendQueue($data);
    }

    /**
     * 订阅(扇形交换机)
     * @param $callback
     */
    public function subscribeQueue($callback)
    {
        $this->RabbitMq->subscribeQueue($callback);
    }

    /**
     * 发送(直接交换机)
     * @param $bindingKey
     * @param $data
     */
    public function sendDirect($routingKey, $data)
    {
        $this->RabbitMq->sendDirect($routingKey,$data);
    }

    /**
     * 接收(直接交换机)
     * @param \Closure $callback
     * @param array $bindingKeys
     */
    public function receiveDirect(\Closure $callback, array $bindingKeys)
    {
        $this->RabbitMq->receiveDirect($callback,$bindingKeys);
    }

    /**
     * 发送(主题交换机)
     * @param $routingKey
     * @param $data
     */
    public function sendTopic($routingKey, $data)
    {
        $this->RabbitMq->sendTopic($routingKey,$data);
    }

    /**
     * 接收(主题交换机)
     * @param \Closure $callback
     * @param array $bindingKeys
     */
    public function receiveTopic(\Closure $callback, array $bindingKeys)
    {
        $this->RabbitMq->receiveTopic($callback,$bindingKeys);
    }
}

SendQueue.php // 实际发送队列消息任务类

<?php
namespace RabbitMq;

// 实际发送队列消息任务类
class SendQueue
{

    /**
     * Explain: 发送
     * @param $fid
     * User: smt \(^o^)/~
     * Date: 2020-10-13 17:02
     */
    public static function sendSyncFullEsQueue($fid){
        $RabbitMqWork = new RabbitMqWork();
        $RabbitMqWork->addTask($fid,'sync_full_to_es');
        return true;
    }


}

消费者实现:

1、目录
在这里插入图片描述
2、配置增加 /application/command.php
在这里插入图片描述
3、脚本代码

<?php

namespace app\console\controller;
use app\api\controller\Exception;
use RabbitMq\RabbitMqWork;
use searchEngine\ElasticService;
use think\Cache;
use think\console\Command;
use think\console\Input;
use think\console\Output;

/*
 * 队列脚本
 * */

class Rabbitmq extends Command
{
    protected function configure()
    {
        parent::configure();
        $this->setName('rabbitmq')
            ->addArgument('action', 2, "方法")// 设置参数
            ->setDescription('计划任务');
    }

    // 默认方法
    protected function execute(Input $input, Output $output)
    {
        $action = $input->getArgument('action');

        // 防止重复执行
        $sTaskTitle = "rabbitmq_" . $action;
        $oCache     = new Cache();

        // 业务逻辑
        $this->$action();

        $oCache->rm($sTaskTitle);

        return true;
    }

    /**
     * 同步搭配商品信息到es工作队列
     * Explain: work模式:一个生产者,多个消费者,每个消费者获取到的消息唯一。
     * User: smt \(^o^)/~
     * Date: 2020-10-13 10:38
     * php think rabbitmq syncFullEsWorkQueue ( 可启动多个 )
     */
    public function syncFullEsWorkQueue(){
        $RabbitMqWork = new RabbitMqWork();
        $queueName = 'sync_full_to_es';
        $callback = function ($msg){
            try{
                // 出现消费异常记录日志
                // throw new Exception(10101,$msg->body);

                echo " [x] Get to... ", $msg->body, "\n";

                // fid,同步单个搭配信息,存在则更新,不存在则插入
                $res = ElasticService::syncOneFullToEs($msg->body);
                if (empty($res))
                    throw new Exception(10101,$msg->body);

                echo " [x] ok ", "\n";
            }catch (Exception $e){
                // 记录消费异常日志
                add_debug_log($e->getMessage(),'syncFullEsWorkQueue','同步搭配商品信息到es队列出错');
            }
            // 处理成功或失败都发送确认ack
            $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
        };
        $RabbitMqWork->workTask($callback,$queueName);
    }

    /**
     * Explain: 路由模式:发送消息到交换机并且要指定路由key ,消费者将队列绑定到交换机时需要指定路由key
     * User: smt \(^o^)/~
     * Date: 2020-10-13 10:38
     */
    public function directQueue(){
        $RabbitMqWork = new RabbitMqWork(RabbitMq::DIRECT);
        $callback = function ($msg){
            echo "[x] ".$msg->delivery_info['routing_key'].":$msg->body \n";
        };
        $RabbitMqWork->receiveDirect($callback,RabbitMq::SEVERITYS);
    }

    /**
     * Explain: 主题topic模式:将路由键和某模式进行匹配,此时队列需要绑定在一个模式上,“#”匹配一个词或多个词,“*”只匹配一个词。
     * User: smt \(^o^)/~
     * Date: 2020-10-13 10:38
     */
    public function topicQueue(){
        $RabbitMqWork = new RabbitMqWork(RabbitMq::TOPIC);
        $callback = function ($msg){
            echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
        };
        $bindingKeys = [
            '*.orange.*',
            '*.*.rabbit',
            'lazy.#'
        ];
        $RabbitMqWork->receiveTopic($callback,$bindingKeys);
    }

    /**
     * Explain: 订阅模式:一个生产者发送的消息会被多个消费者获取。
     * User: smt \(^o^)/~
     * Date: 2020-10-13 10:38
     */
    public function sendQueue(){
        $RabbitMqWork = new RabbitMqWork(RabbitMq::FANOUT);
        $callback = function ($msg){
            echo 'Receive:';
            echo "Msg:$msg->body \n";
        };
        $RabbitMqWork->subscribeQueue($callback);
    }
}


启动消费者:

php think rabbitmq syncFullEsWorkQueue

end

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐