Using Kafka from PHP with rdkafka — Kubernetes from Beginner to High Concurrency, Part 17
Installing Kafka
Via Docker
Installing ZooKeeper
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
Installing Kafka
docker run -d --name kafka -p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_MESSAGE_MAX_BYTES=20000000 \
-e KAFKA_ZOOKEEPER_CONNECT=192.168.50.131:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.50.131:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-e KAFKA_LOG_RETENTION_HOURS=1 \
-e KAFKA_LOG_RETENTION_BYTES=10737418240 \
-e KAFKA_MAX_REQUEST_SIZE=20582912 \
-e KAFKA_REPLICA_FETCH_MAX_BYTES=20582912 \
-e KAFKA_FETCH_MESSAGE_MAX_BYTES=20485760 \
-t wurstmeister/kafka
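Once both containers are up, you can check that the broker is reachable from PHP by asking it for its topic metadata. This is a minimal sketch, assuming ext-rdkafka is installed and the broker address matches the `KAFKA_ADVERTISED_LISTENERS` value used above:

```php
<?php
// Sketch: list the topics known to the broker to verify connectivity.
// Assumes ext-rdkafka; the broker address is the one advertised in the
// docker run command above.
function list_kafka_topics(string $brokers, int $timeoutMs = 5000): array
{
    $conf = new \RdKafka\Conf();
    $conf->set('metadata.broker.list', $brokers);
    $producer = new \RdKafka\Producer($conf);
    // getMetadata(all_topics, only_topic, timeout_ms)
    $meta = $producer->getMetadata(true, null, $timeoutMs);
    $names = [];
    foreach ($meta->getTopics() as $topic) {
        $names[] = $topic->getTopic();
    }
    return $names;
}

if (extension_loaded('rdkafka')) {
    print_r(list_kafka_topics('192.168.50.131:9092'));
}
```

If the call times out, re-check the advertised listener address: the broker must advertise an address that the PHP client can actually reach.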
Via Kubernetes
Add the bitnami Helm repo
helm repo add bitnami https://charts.bitnami.com/bitnami
Installing ZooKeeper
helm install zookeeper bitnami/zookeeper \
--set replicaCount=1 \
--set auth.enabled=false \
--set allowAnonymousLogin=true \
-n rubyruby
You should see output like the following:
** Please be patient while the chart is being deployed **
ZooKeeper can be accessed via port 2181 on the following DNS name from within your cluster:
zookeeper.rubyruby.svc.cluster.local
To connect to your ZooKeeper server run the following commands:
export POD_NAME=$(kubectl get pods --namespace rubyruby -l "app.kubernetes.io/name=zookeeper,app.kubernetes.io/instance=zookeeper,app.kubernetes.io/component=zookeeper" -o jsonpath="{.items[0].metadata.name}")
kubectl exec -it $POD_NAME -- zkCli.sh
To connect to your ZooKeeper server from outside the cluster execute the following commands:
kubectl port-forward --namespace rubyruby svc/zookeeper 2181:2181 &
zkCli.sh 127.0.0.1:2181
Installing Kafka
helm install kafka bitnami/kafka \
--set zookeeper.enabled=false \
--set replicaCount=1 \
--set externalZookeeper.servers=zookeeper \
-n rubyruby
The output looks like this:
** Please be patient while the chart is being deployed **
Kafka can be accessed by consumers via port 9092 on the following DNS name from within your cluster:
kafka.rubyruby.svc.cluster.local
Each Kafka broker can be accessed by producers via port 9092 on the following DNS name(s) from within your cluster:
kafka-0.kafka-headless.rubyruby.svc.cluster.local:9092
To create a pod that you can use as a Kafka client run the following commands:
kubectl run kafka-client --restart='Never' --image docker.io/bitnami/kafka:3.3.1-debian-11-r1 --namespace rubyruby --command -- sleep infinity
kubectl exec --tty -i kafka-client --namespace rubyruby -- bash

PRODUCER:
kafka-console-producer.sh \
    --broker-list kafka-0.kafka-headless.rubyruby.svc.cluster.local:9092 \
    --topic test

CONSUMER:
kafka-console-consumer.sh \
    --bootstrap-server kafka.rubyruby.svc.cluster.local:9092 \
    --topic test \
    --from-beginning
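The same smoke test can be done from PHP instead of the console scripts. This is a sketch, assuming ext-rdkafka, the in-cluster DNS name shown above, and the `test` topic; `build_envelope` and `produce_test_message` are hypothetical helper names:

```php
<?php
// Pure helper: wrap business data in the headers/payload envelope
// used throughout this article.
function build_envelope(array $headers, array $payload): string
{
    return json_encode(['headers' => $headers, 'payload' => $payload]);
}

// Sketch: produce one message to the broker (assumes ext-rdkafka).
function produce_test_message(string $brokers, string $topicName, string $json): void
{
    $conf = new \RdKafka\Conf();
    $conf->set('metadata.broker.list', $brokers);
    $producer = new \RdKafka\Producer($conf);
    $topic = $producer->newTopic($topicName);
    // RD_KAFKA_PARTITION_UA lets librdkafka pick the partition
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, $json);
    // wait for the delivery queue to drain
    $producer->flush(10000);
}

if (extension_loaded('rdkafka')) {
    $json = build_envelope(['event' => 'smoke.test'], ['msg' => 'hello world']);
    produce_test_message('kafka.rubyruby.svc.cluster.local:9092', 'test', $json);
}
```

With the console consumer from the previous step still running, the message should appear there.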
PHP producer and consumer code
<?php
$conf = new \RdKafka\Conf();
//rdkafka broker list (connection entry point)
$conf->set("metadata.broker.list", "kafka.rubyruby.svc.cluster.local:9092");
$topicConf = new \RdKafka\TopicConf();
//disable auto commit on the topic (we commit manually below)
$topicConf->set('auto.commit.enable', 'false');
$topicConf->set('auto.commit.interval.ms', 1000);
//start consuming from the latest offset
$topicConf->set('auto.offset.reset', 'latest');
//use this topic conf as the default topic configuration
$conf->setDefaultTopicConf($topicConf);
//rdkafka message
$rdKafkaMessage = new \RdKafka\Message();
//set the message topic
$rdKafkaMessage->topic_name = 'ruby1';
$headers = [
'event' => 'live.broadcast.send.msg',
'message_id' => md5(microtime(true) . uniqid('msg', true))
];
$payloads = [
'uid' => "10000001",
'roomid' => "10000002",
'app' => 'rubyruby',
'msg' => "hello world",
'happened_at' => time(),
];
//assemble the envelope: headers plus business payload
$payload = [];
$payload['headers'] = $headers;
$payload['payload'] = $payloads;
//rdkafka message payload
$rdKafkaMessage->payload = json_encode($payload);
$rdKafkaMessage->timestamp = time();
//message key
$rdKafkaMessage->key = md5(microtime(true) . uniqid('key', true));
//producer
$producer = new \RdKafka\Producer($conf);
//get a topic handle
$topic = $producer->newTopic($rdKafkaMessage->topic_name);
//produce the message (RD_KAFKA_PARTITION_UA lets librdkafka pick the partition)
$topic->producev(RD_KAFKA_PARTITION_UA, 0, json_encode($payload), $rdKafkaMessage->key, $headers);
//flush the queue and check the result
$result = $producer->flush(1000);
if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
echo "send success";
}
//consumer group id
$conf->set("group.id", "ruby-live");
//rebalance callback: handle partition assignment and revocation
$conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
switch ($err) {
case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
$kafka->assign($partitions);
break;
case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
$kafka->assign(null);
break;
default:
throw new \Exception("rebalance error: " . $err);
}
});
//Kafka consumer
$consumer = new \RdKafka\KafkaConsumer($conf);
//subscribe: partitions are assigned automatically
$consumer->subscribe([$rdKafkaMessage->topic_name]);
//or assign partitions manually:
// $consumer->assign([
// new \RdKafka\TopicPartition("zzy8", 0),
// new \RdKafka\TopicPartition("zzy8", 1),
// ]);
//fetch one message (1000 ms timeout)
$kafkaMessage = $consumer->consume(1000);
echo $kafkaMessage->err . PHP_EOL;
if ($kafkaMessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) {
print_r(json_decode($kafkaMessage->payload, true));
} else if ($kafkaMessage->err == RD_KAFKA_RESP_ERR__TIMED_OUT) {
echo "consume timed out";
}
//commit (acknowledge) the message
try {
$consumer->commitAsync($kafkaMessage);
// $consumer->commit($kafkaMessage);
} catch (Exception $e) {
throw new \RuntimeException('commit kafka message failed', 0, $e);
}
//stop listening and release the subscription
$consumer->unsubscribe();
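The script above calls consume() once; a real consumer keeps polling. The single call can be turned into the usual poll loop as in this sketch, which reuses the same consumer, commit style, and error handling as the script above (`consume_loop` is a hypothetical helper name):

```php
<?php
// Sketch: poll loop around KafkaConsumer::consume(), assuming ext-rdkafka
// and a consumer configured as in the script above.
function consume_loop(\RdKafka\KafkaConsumer $consumer, int $maxMessages): void
{
    $handled = 0;
    while ($handled < $maxMessages) {
        // block for up to 1 second waiting for a message
        $message = $consumer->consume(1000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                print_r(json_decode($message->payload, true));
                // acknowledge only after the message has been handled
                $consumer->commitAsync($message);
                $handled++;
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                // reached the current end of a partition, keep polling
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                // no message within the timeout, keep polling
                break;
            default:
                throw new \RuntimeException($message->errstr(), $message->err);
        }
    }
}
```

`__PARTITION_EOF` and `__TIMED_OUT` are normal conditions, not failures, which is why the loop simply continues past them instead of exiting.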
Related links
Common dev tools: php_codesniffer code style check & fix, phpstan static analysis, phpunit unit tests
.gitlab-ci.yaml automated image builds && a standardized release workflow for internet companies (Part 1)