Apache Kafka是一个用于高性能数据管道的开源分布式事件流平台。可用于实时/批量数据处理。典型的 kafka 系统看起来像

[kafka@2x (1)](https://res.cloudinary.com/practicaldev/image/fetch/s--KCEbNJgq--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https:// res.cloudinary.com/practicaldev/image/fetch/s--KCEbNJgq--/dev-to-uploads.s3.amazonaws.com/uploads/articles/r9a617h6wgmtvi266vwh.png)

为什么我们需要kafka?

注意:如果您已经知道什么是 kafka 以及使用 kafka 的好处,请跳过此部分😅

以不同的方式思考如何管理数据以及如何使用提取、转换和加载 (ETL) 技术。

早些时候,我们曾经拥有可操作的数据库,并且我们必须定期转换数据并加载到数据仓库中,以便进一步使用它。

[Untitled-2021-08-11-0001](https://res.cloudinary.com/practicaldev/image/fetch/s--gyy8c3iR--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https ://dev-to-uploads.s3.amazonaws.com/uploads/articles/s298bq6m5zmavdt9sc6s.png)

但是现在数据库被分布式数据系统增强/取代,我们有多个数据库/数据源,如 Mongodb、Casandradb、Hadoop 等,以根据每个系统的要求存储数据。

在分布式系统的情况下,ETL 工具必须处理的不仅仅是数据库和数据仓库。构建 ETL 工具以批量处理数据。它们是资源密集型和耗时的过程。

在这个新时代,应用程序不仅收集操作数据,而且还有许多元数据,例如日志,每个系统收集的分析。

此外,流数据的兴起正在增加,我们需要在旅途中处理数据而不是批量处理。

在这个数据流的新世界中,我们需要能够处理大量和高度多样化的数据。数据通常以事件的形式流动。考虑我们有一个事件中心,它收集来自不同来源的事件并与各种数据源共享

[Untitled-2021-08-11-0001](https://res.cloudinary.com/practicaldev/image/fetch/s--K7xIrmZI--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https ://dev-to-uploads.s3.amazonaws.com/uploads/articles/8axq14ch17gq0bq5v3ey.png)

Kafka 扮演 Event Center 的角色,数据在此排队并存储,直到被消费者消费。

使用Kafka的好处

  • 在消费者失败的情况下可以重新获得数据

  • 降低 ETL 的成本,因为现在消费者自己可以决定如何使用这些数据

  • 异步流式传输数据

  • 可以在流式传输的同时处理大量和多样化的数据。

有关更多信息,您可以查看 Neha Narkhede 的这个惊人的谈话,了解如何在设计大型应用程序时考虑数据以及如何使用 Kafka。

好的,让我们开始使用 Kafka 构建我们的分析系统。为了简化示例,我们将从网站记录页面事件并将它们保存到 Postgres db。我们的系统设计看起来像

[kafka@2x (3)](https://res.cloudinary.com/practicaldev/image/fetch/s--TRa7UJmn--/c_limit%2Cf_auto%2Cfl_progressive%2Cq_auto%2Cw_880/https:// res.cloudinary.com/practicaldev/image/fetch/s--TRa7UJmn--/dev-to-uploads.s3.amazonaws.com/uploads/articles/kdwsf4uttrbro4fbkeoh.png)

第一步:设置Kafka服务器

对于这个演示,我们将使用 docker 来运行 kafka 服务器。但是对于生产,您可以使用Confluent或任何其他托管服务。

  • 创建analytics-system/docker-compose.yaml

  • 将以下内容粘贴到docker-compose.yaml

version: "3"
services:
  zookeeper:
    image: wurstmeister/zookeeper
    container_name: zookeeper
    ports:
    - 2181:2181

  kafka:
    image: wurstmeister/kafka
    container_name: kafka
    ports:
    - 9092:9092
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181

进入全屏模式 退出全屏模式

ZooKeeper是一个集中式服务,用于维护配置信息、命名、提供分布式同步和提供组服务。

  • 启动kafka服务器:docker-compose up

第 2 步:引导项目

  • 创建回购mkdir analytics-system

  • 更改工作目录cd analytics-system

  • 创建生产者目录mkdir producer && cd producer

  • 初始化生产者项目go mod init producer

  • 创建消费者目录cd .. && mkdir consumer && cd consumer

  • 初始化消费者项目go mod init consumer

第三步:创建生产者

使用gqlgen 创建 Graphql 服务器

  • 更改为生产者目录cd analytics-system/producer

  • 下载依赖:go get github.com/99designs/gqlgen

  • 初始化项目:go run github.com/99designs/gqlgen init

注意:如果您收到验证失败错误,请安装错误中提到的依赖项:示例go get github.com/vektah/gqlparser/v2@v2.1.0

  • 启动并测试graphql servergo build && ./producer

  • 将初始样板 graphql 文件替换为以下内容

analytics-system/producer/graph/schema.graphqls

scalar Int64

type Event {
    id: ID!
    eventType: String
    path: String
    search: String
    title: String
    url: String
    userId: String
}

type PingResponse {
    message: String!
}

input RegisterKafkaEventInput {
    eventType: String!
    userId: String!
    path: String!
    search: String!
    title: String!
    url: String!
}

type Mutation {
    register_kafka_event(event: RegisterKafkaEventInput!): Event!
}

type Query {
    ping: PingResponse!
}

进入全屏模式 退出全屏模式

在这里,我们定义了产生页面查看事件所需的突变和类型。

  • 清除analytics-system/producer/graph/schema.resolvers.go的内容
echo "" > graph/schema.resolvers.go

进入全屏模式 退出全屏模式

  • 根据上面定义的 graphql 文件生成新的解析器和查询。
go run github.com/99designs/gqlgen generate

进入全屏模式 退出全屏模式

  • 替换ping查询解析器以返回Hello world或一些字符串。

注意这一步只是为了测试我们的服务器是否正确启动

使用以下内容更新graph/schema.resolvers.go中的Ping解析器

func (r *queryResolver) Ping(ctx context.Context) (*model.PingResponse, error) {
    res := &model.PingResponse{
        Message: "Hello world",
    }
    return res, nil
}

进入全屏模式 退出全屏模式

  • 构建和测试服务器go build && ./producer

  • 在浏览器中点击http://localhost:8080并测试 ping 查询

query {
  ping {
    message
  }
}

进入全屏模式 退出全屏模式

使用confluent-kakfka-go设置 Kafka 生产者

  • 安装依赖项:go get -u gopkg.in/confluentinc/confluent-kafka-go.v1/kafka

注意:最新版本的 confluent-kafka-go 不需要librdkafka,但如果您遇到任何错误,请检查以下链接并安装所需的依赖项https://github.com/confluentinc/confluent-kafka-go #安装-librdkafka

  • 设置 kafka主题:

graph/schema.resolvers.go中添加以下实用程序。此功能将确保始终创建该主题

// function to create topic
// sample usage CreateTopic("PAGE_VIEW")

func CreateTopic(topicName string) {
    a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
    if err != nil {
        panic(err)
    }

    defer a.Close()

    maxDur, err := time.ParseDuration("60s")
    if err != nil {
        panic("ParseDuration(60s)")
    }

    ctx := context.Background()
    results, err := a.CreateTopics(
        ctx,
        // Multiple topics can be created simultaneously
        // by providing more TopicSpecification structs here.
        []kafka.TopicSpecification{{
            Topic:         topicName,
            NumPartitions: 1,
        }},
        // Admin options
        kafka.SetAdminOperationTimeout(maxDur))
    if err != nil {
        log.Printf("Failed to create topic: %v\n", err)
    }

    log.Println("results:", results)
}

进入全屏模式 退出全屏模式

主题是存储和发布记录的类别/提要名称

  • 生产 Kafka 事件

graph/schema.resolver.go中的RegisterKafkaEvent解析器功能替换为以下内容

func (r *mutationResolver) RegisterKafkaEvent(ctx context.Context, event model.RegisterKafkaEventInput) (*model.Event, error) {
    p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost"})
    if err != nil {
        panic(err)
    }

    defer p.Close()

    // Delivery report handler for produced messages
    go func() {
        for e := range p.Events() {
            switch ev := e.(type) {
            case *kafka.Message:
                if ev.TopicPartition.Error != nil {
                    fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
                } else {
                    fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
                }
            }
        }
    }()

    // Produce messages to topic (asynchronously)
    topic := event.EventType
    CreateTopic(topic)
    currentTimeStamp := fmt.Sprintf("%v", time.Now().Unix())

    e := model.Event{
        ID:        currentTimeStamp,
        EventType: &event.EventType,
        Path:      &event.Path,
        Search:    &event.Search,
        Title:     &event.Title,
        UserID:    &event.UserID,
        URL:       &event.URL,
    }
    value, err := json.Marshal(e)
    if err != nil {
        log.Println("=> error converting event object to bytes:", err)
    }
    p.Produce(&kafka.Message{
        TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
        Value:          []byte(value),
    }, nil)

    // Wait for message deliveries before shutting down
    p.Flush(15 * 1000)

    return &e, nil
}

进入全屏模式 退出全屏模式

  • 测试产生的事件:go build && ./producer

  • 在浏览器中点击 localhost:8080 并测试以下突变

mutation {
  register_kafka_event(event: {
    eventType: "PAGE_VIEW",
    userId: "some_session_id",
    path: "/test",
    search: "?q=foo"
    title: "Kafka Demo",
    url: "kafka.demo.com"
  }) {
    id
    eventType
  }
}

进入全屏模式 退出全屏模式

欢呼! 🚀我们的制作人准备好了🎉

第四步:创建消费者

我们已经在analytics-system/consumer中设置了消费者项目。在消费者中,我们将在第 3 步中侦听 Kafka 服务器产生的事件并将其保存到 postgres db。

请注意,您可以根据要存储的系统处理和转换此数据。

为了简化流程,我们将为 Golang 使用gorm一个 SQL ORM(对象关系模型)。

设置gorm和事件架构

注意:确保你在消费者目录:cd analytics-system/consumer

  • 安装依赖项:
go get -u gorm.io/gorm
go get -u gorm.io/driver/postgres 

进入全屏模式 退出全屏模式

  • 创建main.go文件:touch analytics-system/consumer/main.go

  • 连接到main.go中的 db 并设置 Event Schema

注意:对于这个例子,我们使用的是本地 postgres 实例。

analytics-system/consumer/main.go

package main

import (
    "log"

    "gorm.io/driver/postgres"
    "gorm.io/gorm"
    "gorm.io/gorm/clause"
    "gorm.io/gorm/schema"
)

type Event struct {
    ID        string `gorm:"primaryKey"`
    UserID    string
    EventType string
    Path      string
    Search    string
    Title     string
    URL       string
    CreatedAt int64 `gorm:"autoCreateTime"` // same as receivedAt
    UpdatedAt int64 `gorm:"autoUpdateTime"`
}

func SaveEvent(db *gorm.DB, event Event) (Event, error) {
    result := db.Clauses(
        clause.OnConflict{
            UpdateAll: true,
            Columns:   []clause.Column{},
        }).Create(&event)

    if result.Error != nil {
        log.Println(result.Error)
        return event, result.Error
    }
    return event, nil
}

func main() {
    dbURL :=
        `postgres://localhost:5432/postgres`

    ormConfig := &gorm.Config{
        NamingStrategy: schema.NamingStrategy{
            TablePrefix: "kafka_",
        },
    }

    db, err := gorm.Open(postgres.Open(dbURL), ormConfig)
    if err != nil {
        panic(`Unable to connect to db`)
    }
    log.Println("=>Connected to successfully:", db)

        err = db.AutoMigrate(&Event{})
    if err != nil {
        log.Println("Error migrating schema:", err)
    }
}

进入全屏模式 退出全屏模式

设置kafka 消费码

  • 安装依赖项:go get -u gopkg.in/confluentinc/confluent-kafka-go.v1/kafka

  • 使用以下内容更新main.go

package main

import (
    "encoding/json"
    "fmt"
    "log"

    "github.com/confluentinc/confluent-kafka-go/kafka"
    "gorm.io/driver/postgres"
    "gorm.io/gorm"
    "gorm.io/gorm/clause"
    "gorm.io/gorm/schema"
)

type Event struct {
    ID        string `gorm:"primaryKey"`
    UserID    string
    EventType string
    Path      string
    Search    string
    Title     string
    URL       string
    CreatedAt int64 `gorm:"autoCreateTime"` // same as receivedAt
    UpdatedAt int64 `gorm:"autoUpdateTime"`
}

func SaveEvent(db *gorm.DB, event Event) (Event, error) {
    result := db.Clauses(
        clause.OnConflict{
            UpdateAll: true,
            Columns:   []clause.Column{},
        }).Create(&event)

    if result.Error != nil {
        log.Println(result.Error)
        return event, result.Error
    }
    return event, nil
}

func main() {
    dbURL :=
        `postgres://localhost:5432/postgres`

    ormConfig := &gorm.Config{
        NamingStrategy: schema.NamingStrategy{
            TablePrefix: "kafka_",
        },
    }

    db, err := gorm.Open(postgres.Open(dbURL), ormConfig)
    if err != nil {
        panic(`Unable to connect to db`)
    }
    log.Println("=> Connected to db successfully", db)

    err = db.AutoMigrate(&Event{})
    if err != nil {
        log.Println("Error migrating schema:", err)
    }

    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost",
        "group.id":          "myGroup",
        "auto.offset.reset": "earliest",
    })
    if err != nil {
        panic(err)
    }

    c.SubscribeTopics([]string{"PAGE_VIEW"}, nil)

    for {
        msg, err := c.ReadMessage(-1)
        if err == nil {
            fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
            var event Event
            err := json.Unmarshal(msg.Value, &event)
            if err != nil {
                log.Println("=> error converting event object:", err)
            }

            _, err = SaveEvent(db, event)
            if err != nil {
                log.Println("=> error saving event to db...")
            }
        } else {
            // The client will automatically try to recover from all errors.
            fmt.Printf("Consumer error: %v (%v)\n", err, msg)
        }
    }
}

进入全屏模式 退出全屏模式

  • 测试消费者:go build && ./consumer

第五步:测试流量

  • 如果您没有运行 kafka 服务器,请启动它:docker-compose up

  • 开始生产者cd analytics-system/producer && go build && ./producer

  • 启动消费者cd analytics-system/consumer && go build && ./consumer

  • 在浏览器中点击http://localhost:8080

  • 触发突变

mutation {
  register_kafka_event(event: {
    eventType: "PAGE_VIEW",
    userId: "some_session_id",
    path: "/test",
    search: "?q=foo"
    title: "Kafka Demo",
    url: "kafka.demo.com"
  }) {
    id
    eventType
  }
}

进入全屏模式 退出全屏模式

  • 检查消费者日志。您应该能够看到保存在 postgres 中的数据的日志。

  • 检查 postgres 数据SELECT * FROM kafka_events;

欢呼! 🚀 这就是我们所有的页面浏览分析事件系统都准备好了。 👏

您可以在github上查看完整的代码库

Logo

PostgreSQL社区为您提供最前沿的新闻资讯和知识内容

更多推荐