在 php 中简单调用 kafka
一、确保 kafka 已被安装在 Linux 服务器中若未安装,查看此博客快速安装:https://blog.csdn.net/panguangyuu/article/details/88408320二、启动 kakfa 服务# 启动zookeeperbin/zookeeper-server-start.sh config/zookeeper.properties &...
·
推荐优先使用 rdkakfa :
GitHub - arnaud-lb/php-rdkafka: Production-ready, stable Kafka client for PHP
以下是基于 nmred/kafka-php 的介绍
一、确保 kafka 已被安装在 Linux 服务器中
若未安装,查看此博客快速安装:
二、启动 kakfa 服务
# 启动zookeeper
bin/zookeeper-server-start.sh config/zookeeper.properties &
# 启动kafka服务
bin/kafka-server-start.sh config/server.properties &
三、创建一个 Topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
# 指定复制集个数、分区大小等。
四、安装 Kafka-php
我这里使用的是 nmred/kafka-php :GitHub - weiboad/kafka-php: kafka php client
安装方式:确保 composer 已被安装在Linux服务器中
创建一个目录 kafka,在目录下创建一个 composer.json :
{
"require": {
"nmred/kafka-php": "0.2.*"
}
}
保存后,输入命令:
# 使用国内 composer 代理,速度较快
composer config repo.packagist composer https://packagist.phpcomposer.com
# 安装 kafka-php
composer require nmred/kafka-php
安装完毕可见该目录下有一个 vendor 目录。
五、在 kafka 目录下新建一个生产者 Producer.php
<?php
require './vendor/autoload.php';
date_default_timezone_set('PRC');
/* 创建一个配置实例 */
$config = \Kafka\ProducerConfig::getInstance();
/* Topic的元信息刷新的间隔 */
$config->setMetadataRefreshIntervalMs(10000);
/* 设置broker的地址 */
$config->setMetadataBrokerList('127.0.0.1:9092');
/* 设置broker的代理版本 */
$config->setBrokerVersion('1.0.0');
/* 只需要leader确认消息 */
$config->setRequiredAck(1);
/* 选择异步 */
$config->setIsAsyn(false);
/* 每500毫秒发送消息 */
$config->setProduceInterval(500);
/* 创建一个生产者实例 */
$producer = new \Kafka\Producer();
for($i = 0; $i < 100; $i++ ) {
$producer->send([
[
'topic' => 'test',
'value' => 'test'.$i,
],
]);
}
/* 更多配置项参考:https://github.com/weiboad/kafka-php/blob/master/docs/Configure.md */
六、在 kafka 目录下新建一个消费者 Consumer.php
<?php
require './vendor/autoload.php';
date_default_timezone_set('PRC');
$config = \Kafka\ConsumerConfig::getInstance();
$config->setMetadataRefreshIntervalMs(10000);
$config->setMetadataBrokerList('127.0.0.1:9092');
$config->setGroupId('test');
$config->setBrokerVersion('1.0.0');
$config->setTopics(['test']);
$consumer = new \Kafka\Consumer();
$consumer->start(function($topic, $part, $message) {
var_dump($message);
});
七、启动生产者消费者
php Producer.php
php Consumer.php
可见生产者生产的数据,全部被消费者消费掉并输出到屏幕
更多推荐
已为社区贡献2条内容
所有评论(0)