通过 5 个步骤使用 Kafka、Go、Postgres 和 GraphQL 创建页面视图分析系统
Apache Kafka是一个用于高性能数据管道的开源分布式事件流平台。可用于实时/批量数据处理。典型的 kafka 系统看起来像
[
](https://res.cloudinary.com/practicaldev/image/fetch/s--KCEbNJgq--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https:// res.cloudinary.com/practicaldev/image/fetch/s--KCEbNJgq--/dev-to-uploads.s3.amazonaws.com/uploads/articles/r9a617h6wgmtvi266vwh.png)
为什么我们需要kafka?
注意:如果您已经知道什么是 kafka 以及使用 kafka 的好处,请跳过此部分😅
以不同的方式思考如何管理数据以及如何使用提取、转换和加载 (ETL) 技术。
早些时候,我们曾经拥有可操作的数据库,并且我们必须定期转换数据并加载到数据仓库中,以便进一步使用它。
[
](https://res.cloudinary.com/practicaldev/image/fetch/s--gyy8c3iR--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https ://dev-to-uploads.s3.amazonaws.com/uploads/articles/s298bq6m5zmavdt9sc6s.png)
但是现在数据库被分布式数据系统增强/取代,我们有多个数据库/数据源,如 Mongodb、Casandradb、Hadoop 等,以根据每个系统的要求存储数据。
在分布式系统的情况下,ETL 工具必须处理的不仅仅是数据库和数据仓库。构建 ETL 工具以批量处理数据。它们是资源密集型和耗时的过程。
在这个新时代,应用程序不仅收集操作数据,而且还有许多元数据,例如日志,每个系统收集的分析。
此外,流数据的兴起正在增加,我们需要在旅途中处理数据而不是批量处理。
在这个数据流的新世界中,我们需要能够处理大量和高度多样化的数据。数据通常以事件的形式流动。考虑我们有一个事件中心,它收集来自不同来源的事件并与各种数据源共享
[
](https://res.cloudinary.com/practicaldev/image/fetch/s--K7xIrmZI--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https ://dev-to-uploads.s3.amazonaws.com/uploads/articles/8axq14ch17gq0bq5v3ey.png)
Kafka 扮演 Event Center 的角色,数据在此排队并存储,直到被消费者消费。
使用Kafka的好处
-
在消费者失败的情况下可以重新获得数据
-
降低 ETL 的成本,因为现在消费者自己可以决定如何使用这些数据
-
异步流式传输数据
-
可以在流式传输的同时处理大量和多样化的数据。
有关更多信息,您可以查看 Neha Narkhede 的这个惊人的谈话,了解如何在设计大型应用程序时考虑数据以及如何使用 Kafka。
好的,让我们开始使用 Kafka 构建我们的分析系统。为了简化示例,我们将从网站记录页面事件并将它们保存到 Postgres db。我们的系统设计看起来像
[
](https://res.cloudinary.com/practicaldev/image/fetch/s--TRa7UJmn--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https:// res.cloudinary.com/practicaldev/image/fetch/s--TRa7UJmn--/dev-to-uploads.s3.amazonaws.com/uploads/articles/kdwsf4uttrbro4fbkeoh.png)
第一步:设置Kafka服务器
对于这个演示,我们将使用 docker 来运行 kafka 服务器。但是对于生产,您可以使用Confluent或任何其他托管服务。
-
创建
analytics-system/docker-compose.yaml -
将以下内容粘贴到
docker-compose.yaml中
version: "3"
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- 2181:2181
kafka:
image: wurstmeister/kafka
container_name: kafka
ports:
- 9092:9092
environment:
KAFKA_ADVERTISED_HOST_NAME: localhost
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
进入全屏模式 退出全屏模式
ZooKeeper是一个集中式服务,用于维护配置信息、命名、提供分布式同步和提供组服务。
- 启动kafka服务器:
docker-compose up
第 2 步:引导项目
-
创建回购
mkdir analytics-system -
更改工作目录
cd analytics-system -
创建生产者目录
mkdir producer && cd producer -
初始化生产者项目
go mod init producer -
创建消费者目录
cd .. && mkdir consumer && cd consumer -
初始化消费者项目
go mod init consumer
第三步:创建生产者
使用gqlgen 创建 Graphql 服务器
-
更改为生产者目录
cd analytics-system/producer -
下载依赖:
go get github.com/99designs/gqlgen -
初始化项目:
go run github.com/99designs/gqlgen init
注意:如果您收到验证失败错误,请安装错误中提到的依赖项:示例
go get github.com/vektah/gqlparser/v2@v2.1.0
-
启动并测试graphql server
go build && ./producer -
将初始样板 graphql 文件替换为以下内容
analytics-system/producer/graph/schema.graphqls
scalar Int64
type Event {
id: ID!
eventType: String
path: String
search: String
title: String
url: String
userId: String
}
type PingResponse {
message: String!
}
input RegisterKafkaEventInput {
eventType: String!
userId: String!
path: String!
search: String!
title: String!
url: String!
}
type Mutation {
register_kafka_event(event: RegisterKafkaEventInput!): Event!
}
type Query {
ping: PingResponse!
}
进入全屏模式 退出全屏模式
在这里,我们定义了产生页面查看事件所需的突变和类型。
- 清除
analytics-system/producer/graph/schema.resolvers.go的内容
echo "" > graph/schema.resolvers.go
进入全屏模式 退出全屏模式
- 根据上面定义的 graphql 文件生成新的解析器和查询。
go run github.com/99designs/gqlgen generate
进入全屏模式 退出全屏模式
- 替换
ping查询解析器以返回Hello world或一些字符串。
注意这一步只是为了测试我们的服务器是否正确启动
使用以下内容更新graph/schema.resolvers.go中的Ping解析器
func (r *queryResolver) Ping(ctx context.Context) (*model.PingResponse, error) {
res := &model.PingResponse{
Message: "Hello world",
}
return res, nil
}
进入全屏模式 退出全屏模式
-
构建和测试服务器
go build && ./producer -
在浏览器中点击
http://localhost:8080并测试 ping 查询
query {
ping {
message
}
}
进入全屏模式 退出全屏模式
使用confluent-kakfka-go设置 Kafka 生产者
- 安装依赖项:
go get -u gopkg.in/confluentinc/confluent-kafka-go.v1/kafka
注意:最新版本的 confluent-kafka-go 不需要
librdkafka,但如果您遇到任何错误,请检查以下链接并安装所需的依赖项https://github.com/confluentinc/confluent-kafka-go #安装-librdkafka
- 设置 kafka主题:
在graph/schema.resolvers.go中添加以下实用程序。此功能将确保始终创建该主题
// function to create topic
// sample usage CreateTopic("PAGE_VIEW")
func CreateTopic(topicName string) {
a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
if err != nil {
panic(err)
}
defer a.Close()
maxDur, err := time.ParseDuration("60s")
if err != nil {
panic("ParseDuration(60s)")
}
ctx := context.Background()
results, err := a.CreateTopics(
ctx,
// Multiple topics can be created simultaneously
// by providing more TopicSpecification structs here.
[]kafka.TopicSpecification{{
Topic: topicName,
NumPartitions: 1,
}},
// Admin options
kafka.SetAdminOperationTimeout(maxDur))
if err != nil {
log.Printf("Failed to create topic: %v\n", err)
}
log.Println("results:", results)
}
进入全屏模式 退出全屏模式
主题是存储和发布记录的类别/提要名称
- 生产 Kafka 事件
将graph/schema.resolver.go中的RegisterKafkaEvent解析器功能替换为以下内容
func (r *mutationResolver) RegisterKafkaEvent(ctx context.Context, event model.RegisterKafkaEventInput) (*model.Event, error) {
p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
if err != nil {
panic(err)
}
defer p.Close()
// Delivery report handler for produced messages
go func() {
for e := range p.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
} else {
fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
}
}
}
}()
// Produce messages to topic (asynchronously)
topic := event.EventType
CreateTopic(topic)
currentTimeStamp := fmt.Sprintf("%v", time.Now().Unix())
e := model.Event{
ID: currentTimeStamp,
EventType: &event.EventType,
Path: &event.Path,
Search: &event.Search,
Title: &event.Title,
UserID: &event.UserID,
URL: &event.URL,
}
value, err := json.Marshal(e)
if err != nil {
log.Println("=> error converting event object to bytes:", err)
}
p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(value),
}, nil)
// Wait for message deliveries before shutting down
p.Flush(15 * 1000)
return &e, nil
}
进入全屏模式 退出全屏模式
-
测试产生的事件:
go build && ./producer -
在浏览器中点击 localhost:8080 并测试以下突变
mutation {
register_kafka_event(event: {
eventType: "PAGE_VIEW",
userId: "some_session_id",
path: "/test",
search: "?q=foo"
title: "Kafka Demo",
url: "kafka.demo.com"
}) {
id
eventType
}
}
进入全屏模式 退出全屏模式
欢呼! 🚀我们的制作人准备好了🎉
第四步:创建消费者
我们已经在analytics-system/consumer中设置了消费者项目。在消费者中,我们将在第 3 步中侦听 Kafka 服务器产生的事件并将其保存到 postgres db。
请注意,您可以根据要存储的系统处理和转换此数据。
为了简化流程,我们将为 Golang 使用gorm一个 SQL ORM(对象关系模型)。
设置gorm和事件架构
注意:确保你在消费者目录:
cd analytics-system/consumer
- 安装依赖项:
go get -u gorm.io/gorm
go get -u gorm.io/driver/postgres
进入全屏模式 退出全屏模式
-
创建
main.go文件:touch analytics-system/consumer/main.go -
连接到
main.go中的 db 并设置 Event Schema
注意:对于这个例子,我们使用的是本地 postgres 实例。
analytics-system/consumer/main.go
package main
import (
"log"
"gorm.io/driver/postgres"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"gorm.io/gorm/schema"
)
type Event struct {
ID string `gorm:"primaryKey"`
UserID string
EventType string
Path string
Search string
Title string
URL string
CreatedAt int64 `gorm:"autoCreateTime"` // same as receivedAt
UpdatedAt int64 `gorm:"autoUpdateTime"`
}
func SaveEvent(db *gorm.DB, event Event) (Event, error) {
result := db.Clauses(
clause.OnConflict{
UpdateAll: true,
Columns: []clause.Column{},
}).Create(&event)
if result.Error != nil {
log.Println(result.Error)
return event, result.Error
}
return event, nil
}
func main() {
dbURL :=
`postgres://localhost:5432/postgres`
ormConfig := &gorm.Config{
NamingStrategy: schema.NamingStrategy{
TablePrefix: "kafka_",
},
}
db, err := gorm.Open(postgres.Open(dbURL), ormConfig)
if err != nil {
panic(`Unable to connect to db`)
}
log.Println("=>Connected to successfully:", db)
err = db.AutoMigrate(&Event{})
if err != nil {
log.Println("Error migrating schema:", err)
}
}
进入全屏模式 退出全屏模式
设置kafka 消费码
-
安装依赖项:
go get -u gopkg.in/confluentinc/confluent-kafka-go.v1/kafka -
使用以下内容更新
main.go
package main
import (
"encoding/json"
"fmt"
"log"
"github.com/confluentinc/confluent-kafka-go/kafka"
"gorm.io/driver/postgres"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"gorm.io/gorm/schema"
)
type Event struct {
ID string `gorm:"primaryKey"`
UserID string
EventType string
Path string
Search string
Title string
URL string
CreatedAt int64 `gorm:"autoCreateTime"` // same as receivedAt
UpdatedAt int64 `gorm:"autoUpdateTime"`
}
func SaveEvent(db *gorm.DB, event Event) (Event, error) {
result := db.Clauses(
clause.OnConflict{
UpdateAll: true,
Columns: []clause.Column{},
}).Create(&event)
if result.Error != nil {
log.Println(result.Error)
return event, result.Error
}
return event, nil
}
func main() {
dbURL :=
`postgres://localhost:5432/postgres`
ormConfig := &gorm.Config{
NamingStrategy: schema.NamingStrategy{
TablePrefix: "kafka_",
},
}
db, err := gorm.Open(postgres.Open(dbURL), ormConfig)
if err != nil {
panic(`Unable to connect to db`)
}
log.Println("=> Connected to db successfully", db)
err = db.AutoMigrate(&Event{})
if err != nil {
log.Println("Error migrating schema:", err)
}
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "localhost",
"group.id": "myGroup",
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}
c.SubscribeTopics([]string{"PAGE_VIEW"}, nil)
for {
msg, err := c.ReadMessage(-1)
if err == nil {
fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
var event Event
err := json.Unmarshal(msg.Value, &event)
if err != nil {
log.Println("=> error converting event object:", err)
}
_, err = SaveEvent(db, event)
if err != nil {
log.Println("=> error saving event to db...")
}
} else {
// The client will automatically try to recover from all errors.
fmt.Printf("Consumer error: %v (%v)\n", err, msg)
}
}
}
进入全屏模式 退出全屏模式
- 测试消费者:
go build && ./consumer
第五步:测试流量
-
如果您没有运行 kafka 服务器,请启动它:
docker-compose up -
开始生产者
cd analytics-system/producer && go build && ./producer -
启动消费者
cd analytics-system/consumer && go build && ./consumer -
在浏览器中点击http://localhost:8080
-
触发突变
mutation {
register_kafka_event(event: {
eventType: "PAGE_VIEW",
userId: "some_session_id",
path: "/test",
search: "?q=foo"
title: "Kafka Demo",
url: "kafka.demo.com"
}) {
id
eventType
}
}
进入全屏模式 退出全屏模式
-
检查消费者日志。您应该能够看到保存在 postgres 中的数据的日志。
-
检查 postgres 数据
SELECT * FROM kafka_events;
欢呼! 🚀 这就是我们所有的页面浏览分析事件系统都准备好了。 👏
您可以在github上查看完整的代码库
更多推荐
所有评论(0)