**

ThinkPHP操作RabbitMQ(下载,安装,队列)

**
1.docker下载RabbitMQ
拉取rabbitmq
docker pull rabbitmq:3.7.7-management
根据下载的镜像创建和启动容器
docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -v pwd/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin df80af9ca0c9
使用命令:docker ps 查看正在运行容器
这时输入你的服务器ip:15672就可访问了。

注:15672端口和5672端口要在安全组放行

2.下载rabbitmq-c
下载地址:https://github.com/alanxz/rabbitmq-c/releases

wget -c https://github.com/alanxz/rabbitmq-c/releases/download/v0.8.0/rabbitmq-c-0.8.0.tar.gz

tar zxf rabbitmq-c-0.8.0.tar.gz

cd rabbitmq-c-0.8.0

./configure --prefix=/usr/local/rabbitmq-c-0.8.0

make && make install
3.下载amqp
注:php安装位置要正确。
官网下载地址:http://pecl.php.net/package/amqp

wget -c http://pecl.php.net/get/amqp-1.9.3.tgz

tar zxf amqp-1.9.3.tgz

cd amqp-1.9.3

/usr/local/php/bin/phpize

./configure --with-php-config=/usr/local/php/bin/php-config --with-amqp --with-librabbitmq-dir=/usr/local/rabbitmq-c-0.8.0

make && make install
下载完成后修改php.ini配置文件。extension =/你的amqp.so位置路径/ amqp.so
查看phpinfo信息
在这里插入图片描述
4运行。
可在cli端运行,也可访问运行(建议cli运行)

	<?php


namespace app\index\controller;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use think\Db;
class Rabbit
{
  
    /*
     * 生产者步骤
     * 1.建立连接
     * 2.建立信道
     * 3.声明队列
     * 4.发布消息
     * 5.关闭信道
     * 6.关闭连接
     */
    public function send()
    {
        //连接服务器
        $connection = new AMQPStreamConnection('106.54.211.184', 5672, 'guest', 'guest');
        //连接信道。 $channel_id 信道id,不传则获取$channel[“”]信道,再无则循环$this->channle数组,下标从1到最大信道数找第一个不是AMQPChannel对象的下标,实例化并返回AMQPChannel对象,无则抛出异常No free channel ids
        $channel = $connection->channel();
        //队列声明,第一个参数:路由键,第三个参数:是否持久化,第四个参数:是否排外,第五个参数:是否自动删除
        $channel->queue_declare('hello',false, false, false, false);
        //$data  string类型 要发送的消息
        //$properties array类型 设置的属性,比如设置该消息持久化[‘delivery_mode’=>2]
        //$msg = new AMQPMessage($data,$properties)
        $count = self::$count++;
        dump($count);
        $data = array('a'=>1,'b'=>2,'c'=>3);
        $data = json_encode($data);
        $msg = new AMQPMessage($data);
        //第一个值:AMQPMessage对象.第二个值:交换机名字。第三个值:路由键 如果交换机类型
        $channel->basic_publish($msg, '', 'hello');
        $channel->close();
        $connection->close();
    }

    /*
     * 消费者步骤
     * 1.建立连接
     * 2.建立信道
     * 3.声明交换器
     * 4.声明队列
     * 5.绑定交换器和队列
     * 6.消费队列
     * 7.关闭信道
     * 8.关闭连接
     */
    public function out()
    {
     
        $connection = new AMQPStreamConnection('106.54.211.184', 5672, 'guest', 'guest');
        $channel = $connection->channel();
        $channel->queue_declare('hello', false, false, false, false);
        echo ' [*] Waiting for messages. To exit press CTRL++C', "\n";
        $callback = function($msg) {
            $data = json_decode($msg->body,true);
            dump($data);
            Db::name('demo')->insert(['mes'=>$data]);
        };
        $channel->basic_consume('hello', '', false, true, false, false, $callback);
        while(count($channel->callbacks)) {
            $channel->wait();
        }

        $channel->close();
        $connection->close();


    }
}

运行send函数,即可添加队列信息。这里只是简单demo
注:5672端口要在安全组放行。此处链接5672端口
这时在cli运行启动文件的out方法,既订阅了队列,只要有消息进入队列,就会自动进行消费
注:如果开启两个cli订阅频道,队列进来的消息会平均发送给所有订阅该频道的消费者。

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐