Apache Pulsar部署及可视化监控部署
一、Apache Pulsar的Local模式构建1.1、启动服务Standalone Local单机本地模式, 是pulsar最简单的安装方式, 此种方式仅适用于测试学习使用, 并无法作为生产环境中使用。下载Apache Pulsar2.10apache-pulsar-2.10.0-bin.tar.gz服务器系统要求:Currently, Pulsar is available for 64-
一、Apache Pulsar的Local模式构建
1.1、启动服务
Standalone Local单机本地模式, 是pulsar最简单的安装方式, 此种方式仅适用于测试学习使用, 并无法作为生产环境中使用。
下载Apache Pulsar2.10
apache-pulsar-2.10.0-bin.tar.gz
服务器系统要求:
Currently, Pulsar is available for 64-bit macOS, Linux, and Windows. To use Pulsar, you need to install 64-bit JRE/JDK 8 or
later versions. (目前,Pulsar可用于64位macOS、Linux和Windows。使用Pulsar需要安装64位JRE/JDK 8或更高版本。)
-
将安装包上传到服务上
-
解压
tar -zxvf apache-pulsar-2.10.0-bin.tar.gz
-
进入bin目录
cd apache-pulsar-2.10.0/bin/
-
启动pulsar
./pulsar standalone
出现以下内容后,则单机版启动成功。
1.2、pulsar-client 测试消息发布订阅
首先我们先通过pulsar-client启动消费者
./pulsar-client consume my-topic -s "first"
我们打开一个新的客户端页面,在启动服务生产者
./pulsar-client produce my-topic --messages "hello pulsar"
生产者发送消息成功,在看下消费者端能否接收到消息。
可以看到消费者端已经消费生产者发送的"hello pulsar"。
二、Pulsar Manager 可视化监控
这里pulsar Manager我们是采用Docker 方式部署,所以我们直接使用Docker命里运行即可。
docker run -it -p 9527:9527 -p 7750:7750 -e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties apachepulsar/pulsar-manager:v0.2.0 -d
运行完成后,我们需要在宿主机上执行以下命令,为pulsar Manager创建用户;
CSRF_TOKEN=$(curl http://localhost:7750/pulsar-manager/csrf-token)
curl -H 'X-XSRF-TOKEN: $CSRF_TOKEN' -H 'Cookie: XSRF-TOKEN=$CSRF_TOKEN;' -H "Content-Type: application/json" -X PUT http://localhost:7750/pulsar-manager/users/superuser -d '{"name": "admin", "password": "apachepulsar", "description": "test", "email": "username@test.org"}'
这里的用户名是"admin",密码:“apachepulsar”,可以根据自己的实际情况配置。
配置完用户名和密码后,我们在浏览器中访问:http://ip:9527/,这里可以根据自己的ip修改。
登录成功后,我们配置下集群的信息:
注意配置服务url时,需配置可以访问到集群的IP,最好不要使用localhost或127.0.0.1,我这里配置的是我的公网IP。
配置完成后,点击localhost,可以进入集群信息内部查看集群内容。
My-top就是我们刚刚测试的队列。
三、Golang实现pulsar生产消费
3.1、引入golang的pulsar包
github.com/apache/pulsar-client-go/pulsar
3.2、pulsar消费者
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://110.40.141.168:6650",
})
我们看下消费者完整代码:
package main
import (
"context"
"fmt"
"github.com/apache/pulsar-client-go/pulsar"
"log"
"time"
)
func main() {
fmt.Println("Pulsar Consumer")
//实例化Pulsar client
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://110.40.141.168:6650",
})
if err != nil {
log.Fatal(err)
}
//使用client对象实例化consumer
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "my-topic",
SubscriptionName: "sub-demo",
Type: pulsar.Shared,
})
if err != nil {
log.Fatal(err)
}
defer consumer.Close()
ctx := context.Background()
//无限循环监听topic
for {
msg, err := consumer.Receive(ctx)
if err != nil {
log.Fatal(err)
} else {
fmt.Printf("Received message : %v %s \n", string(msg.Payload()), time.Now().Format("2006-01-02 15:04:05"))
}
consumer.Ack(msg)
}
}
这里比较简单,只是写个demo。后续可以对消费者代码进行完善。
3.3、生产者代码
package main
import (
"context"
"fmt"
"github.com/apache/pulsar-client-go/pulsar"
"log"
"time"
)
func main() {
fmt.Println("Pulsar Producer")
ctx := context.Background()
//实例化Pulsar client
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: "pulsar://110.40.141.168:6650",
OperationTimeout: 30 * time.Second,
ConnectionTimeout: 30 * time.Second,
})
if err != nil {
log.Fatalf("Could not instantiate Pulsar client:%v", err)
}
// 创建producer
producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "my-topic",
})
if err != nil {
log.Fatalf("Could not instantiate Pulsar producer:%v", err)
}
defer producer.Close()
msg := &pulsar.ProducerMessage{
Payload: []byte("Hello,This is a message from Pulsar Producer!"),
DeliverAfter: 30 * time.Second,//延时队列,30s后消费
}
if err, _ := producer.Send(ctx, msg); err != nil {
log.Fatalf("Producer could not send message:%v", err)
}
}
注意:如需使用延时队列,需在消费者端设置消费模式为"pulsar.Shared",否则无效。
我们看下结果:
看下消费者消费记录
更多内容关注公众号:杰子学编程
更多推荐
所有评论(0)