ThinkPHP操作RabbitMQ(下载,安装,队列)
**ThinkPHP操作RabbitMQ(下载,安装,队列)**1.docker下载RabbitMQ拉取rabbitmqdocker pull rabbitmq:3.7.7-management根据下载的镜像创建和启动容器docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -v pwd/data:/var/li...
**
ThinkPHP操作RabbitMQ(下载,安装,队列)
**
1.docker下载RabbitMQ
拉取rabbitmq
docker pull rabbitmq:3.7.7-management
根据下载的镜像创建和启动容器
docker run -d --name rabbitmq3.7.7 -p 5672:5672 -p 15672:15672 -v pwd
/data:/var/lib/rabbitmq --hostname myRabbit -e RABBITMQ_DEFAULT_VHOST=my_vhost -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin df80af9ca0c9
使用命令:docker ps 查看正在运行容器
这时输入你的服务器ip:15672就可访问了。
注:15672端口和5672端口要在安全组放行
2.下载rabbitmq-c
下载地址:https://github.com/alanxz/rabbitmq-c/releases
wget -c https://github.com/alanxz/rabbitmq-c/releases/download/v0.8.0/rabbitmq-c-0.8.0.tar.gz
tar zxf rabbitmq-c-0.8.0.tar.gz
cd rabbitmq-c-0.8.0
./configure --prefix=/usr/local/rabbitmq-c-0.8.0
make && make install
3.下载amqp
注:php安装位置要正确。
官网下载地址:http://pecl.php.net/package/amqp
wget -c http://pecl.php.net/get/amqp-1.9.3.tgz
tar zxf amqp-1.9.3.tgz
cd amqp-1.9.3
/usr/local/php/bin/phpize
./configure --with-php-config=/usr/local/php/bin/php-config --with-amqp --with-librabbitmq-dir=/usr/local/rabbitmq-c-0.8.0
make && make install
下载完成后修改php.ini配置文件。extension =/你的amqp.so位置路径/ amqp.so
查看phpinfo信息
4运行。
可在cli端运行,也可访问运行(建议cli运行)
<?php
namespace app\index\controller;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use think\Db;
class Rabbit
{
/*
* 生产者步骤
* 1.建立连接
* 2.建立信道
* 3.声明队列
* 4.发布消息
* 5.关闭信道
* 6.关闭连接
*/
public function send()
{
//连接服务器
$connection = new AMQPStreamConnection('106.54.211.184', 5672, 'guest', 'guest');
//连接信道。 $channel_id 信道id,不传则获取$channel[“”]信道,再无则循环$this->channle数组,下标从1到最大信道数找第一个不是AMQPChannel对象的下标,实例化并返回AMQPChannel对象,无则抛出异常No free channel ids
$channel = $connection->channel();
//队列声明,第一个参数:路由键,第三个参数:是否持久化,第四个参数:是否排外,第五个参数:是否自动删除
$channel->queue_declare('hello',false, false, false, false);
//$data string类型 要发送的消息
//$properties array类型 设置的属性,比如设置该消息持久化[‘delivery_mode’=>2]
//$msg = new AMQPMessage($data,$properties)
$count = self::$count++;
dump($count);
$data = array('a'=>1,'b'=>2,'c'=>3);
$data = json_encode($data);
$msg = new AMQPMessage($data);
//第一个值:AMQPMessage对象.第二个值:交换机名字。第三个值:路由键 如果交换机类型
$channel->basic_publish($msg, '', 'hello');
$channel->close();
$connection->close();
}
/*
* 消费者步骤
* 1.建立连接
* 2.建立信道
* 3.声明交换器
* 4.声明队列
* 5.绑定交换器和队列
* 6.消费队列
* 7.关闭信道
* 8.关闭连接
*/
public function out()
{
$connection = new AMQPStreamConnection('106.54.211.184', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
echo ' [*] Waiting for messages. To exit press CTRL++C', "\n";
$callback = function($msg) {
$data = json_decode($msg->body,true);
dump($data);
Db::name('demo')->insert(['mes'=>$data]);
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
}
}
运行send函数,即可添加队列信息。这里只是简单demo
注:5672端口要在安全组放行。此处链接5672端口
这时在cli运行启动文件的out方法,既订阅了队列,只要有消息进入队列,就会自动进行消费
注:如果开启两个cli订阅频道,队列进来的消息会平均发送给所有订阅该频道的消费者。
更多推荐
所有评论(0)