Apache Pulsar

诞生背景及追求

诞生背景

  1. 解决一些现有开源消息系统的痛点
  • 多租户:一个系统就可以支撑多个产品线(业务部门)的需求 多个业务部门共用一个集群就行,而kafka的话需要部署多个集群

在这里插入图片描述

  • 百万级topics: kafka无法支撑100w级的topics; kafka单broker超过1000个分区会出现性能会变得很差
  • 跨地域复制:巨头互联网有多个数据中心,数据中心间的迁移需求(yahoo全球8+数据中心),用Pulsar迁移成本很低
  • 队列模式+流模式都需要用的场景
  • 举例:智联…
    在这里插入图片描述
  • 问题:如何保证发往两个MQ数据的一致性
  • 解决:在这里插入图片描述
  1. 解决一些现有开源消息系统的other痛点
  • 分区模型紧耦合存储和计算,不是云原生(Cloud Native)的设计,云原生如下:

  • 容器化封装

  • 动态管理

  • 面向微服务
    在这里插入图片描述

  • 存储模型过于简单,强依赖文件系统,业务量上去,必须增加topics数量,造成文件数量也会上去,带来的随机IO的问题,性能下降很大(除非全是SSD);而pulsar是分片存储的,磁盘不够用了,只需要横向加bookie节点就行,会有机架感知、资源感知的,新的segment会往这个新加的bookie上放的

  • IO不隔离:消费者在清除Backlog(积压日志)的时候会影响

  • kafka物理分区的模型造成运维很痛苦(服务扩容、替换机器会有重新均衡数据的过程)

  • 基础架构从物理机向容器时代转型的趋势

  • pulsar面向容器编排设计的新生系统

发展历程

  1. 2012 Yahoo内部启动
  2. 2016.9 开源
  3. 2017.6 捐给Apache
  4. 2018.9 毕业成为顶级项目
  5. 目前40多家公司在生产上使用

追求、愿景

打造实时数据处理时 集消息+计算+存储 三位一体的数据中台

安装部署

安装参考

  1. stanalone模式参考:http://pulsar.apache.org/docs/en/standalone/
  2. 参考0:http://pulsar.apache.org/docs/en/io-quickstart/
  3. 参考1:https://www.jianshu.com/p/728d07918f49

相关知识介绍

  1. topic的名称
{persistent|non-persistent}://tenant/namespace/topic
# 解释:持久化|不持久化://租户/命名空间/主题名
Topic aspectDefault
topic typepersistent
tenantpublic
namespacedefault
Input topic nameTranslated topic name
my-topicpersistent://public/default/my-topic
my-tenant/my-namespace/my-topicpersistent://my-tenant/my-namespace/my-topic
  1. Pulsar package contains
DirectoryContains
binPulsar命令行工具,例如 pulsar 、 pulsar-admin、pulsar-daemon、bookkeeper
conf配置文件,包括broker、bookeeper、zookeeper等的
examplesexample Pulsar Functions
libThe JAR files used by Pulsar
licenses许可文件
data数据存放目录
instancesArtifacts created for Pulsar Functions
logs日志存放目录

消息消费模型

  1. MQ相关思考
  • 消息如何生产消费——如何发送和消费消息
  • 消息怎么确认(ack)——如何确认消息
  • 消息怎么保存——消息保留多长时间,触发消息删除的原因以及怎样删除

生产(发布)

1.发布策略:

  • Single partitioning:Producer随机选择一个Partition并将所有消息写入到这个分区
  • Round robin partitioning :采用Round robin的方式,轮训所有分区进行消息写入
  • Hash partitioning:这种模式每条消息有一个Key,Producer根据消息的Key的哈希值进行分区的选择(Key相同的消息可以保证顺序)。
  • Custom partitioning:用户自定义路由策略

消费模型

image

  1. 灵活的订阅策略
    image

  2. 相关解释

  • producer-topic-subscription-consumer 消费模型的抽象
  • Topic:用于发送消息的通道,一个Topic对应一个bookeeper的分布式日志;该消息会被复制后存储到多个节点上;topic中的消息可以根据需求被多次使用
  • 订阅:subscription相当于一个Consumer Group;以及丰富灵活的订阅模式
  1. 队列(Queue)模型
  • 共享模式(Shared)
    在这里插入图片描述
  • 特点
  • 无序、共享的方式来消费
  • user可以创建多个consumer从单个管道receive消息(一条消息只会被一个consumer接收,接收之后单条ACK),所以可以横向加consumer来并行消费
  • consumer数量是可以多于producer的(kafka、rocketMQ多的consumer会空跑)
  • 结合无状态应用程序(用户不关心排序)
  • 代表:RabbitMQ 和 RocketMQ
  1. 流(Stream)模型
  • 特点

  • 严格有序的、一个pipeline,只有一个consumer使用,和kafka的单分区有序一样

  • 有状态程序

  • 独占模式(Exclusive)
    image

  • 一个订阅中只能有一个consumer来消费,严格有序

  • 灾备模式(Failover)
    image

  • 一个订阅中只能有一个master consumer来消费,严格有序

  • 其他consumer作为standby,故障切换时用,所有未确认(ack)的消息都将传递给新的主消费者;类似于kafka的 Consumer partition rebalance

ACK机制

  1. ACK的意义:分布式系统发生故障时的容错,比如:消息消费过程中,consumer和broker都可能发生错误;消息ACK机制就是保证在出现上述错误的时候,consumer能够从上一次停止的地方恢复消费(既不丢,也不重)
  2. 类比kafka
  • kafka中的恢复点叫offset,pulsar更新offset的操作叫提交offset(消息确认)
  • pulsar中的恢复点叫cursor(游标),每次ack后cursor都会更新
  1. pulsar的ack方式
  • 单条ack(一条):Individual Ack

  • 累计ack(一批):Cumulative Ack

  • 差异如下:
    在这里插入图片描述

  • 不同订阅模式的ack差异

  • 流模式:可以进行单条ack或者累计ack

  • 队列模式:只能单条

  • 单条的好处:在一些consumer故障的时候, 处理一条消息时间可能很长或者代价很大,防止重新传送就显得很重要

  • ACK的专门的数据结构Cursor,由broker管理,bookeeper的ledger来存储

消息的保留策略

  1. 消息只有被所有的订阅都ack后,才能被删除;不过也可以保留更长的时间(忽略前面的情况)
  2. 只会删除M0-M5,but如果配置了消息保留期,则消息 M0 到 M5 将在配置的时间段内保持不变,即使 A 和 B 已经确认消费了它们
    在这里插入图片描述
  3. 除此之外,还可以配置消息生存时间(TTL),如果消息未在配置的 TTL 时间段内被任何消费者使用,则消息将自动标记为已确认
  4. 消息保留期消息 TTL 的区别
    在这里插入图片描述
  • 前者作用于已经ACK后,被打上Deleted标记的数据
  • 后者作用于未ACK的消息

对比Kafka

  1. 模型概念
  • Kafka: Producer - topic - consumer group - consumer
  • Pulsar:Producer - topic - subscription - consumer
  1. 消费模式
  • Kafka: 主要集中在流(Stream)模式,对单个 partition 是独占消费,没有共享(Queue)的消费模式

  • Pulsar:提供了统一的消息模型和 API。流(Stream)模式 – 独占和故障切换订阅方式;队列(Queue)模式 – 共享订阅的方式

  1. 消息(ACK)
  • Kafka: 使用偏移 Offset

  • Pulsar:使用专门的 Cursor 管理。累积确认和 Kafka 效果一样;提供单条或选择性确认

  1. 消息保留
  • Kafka:根据设置的保留期来删除消息。有可能消息没被消费,过期后被删除。 不支持 TTL。

  • Pulsar:消息只有被所有订阅消费后才会删除,不会丢失数据。也允许设置保留期,保留被消费的数据。支持 TTL

  1. kafka的物理分区(以Partition为存储中心)和pulsar的逻辑分片(以Segment为存储中心)
    在这里插入图片描述

  2. 总结:Pulsar统一了传统消息队列(RabiitMQ)和流队列(Kafka、RocketMQ)

系统架构以及设计理念

pulsar的分层架构

部署架构

在这里插入图片描述

在这里插入图片描述

  • 采用ZooKeeper存储元数据,集群配置,作为coordination
  • local zk负责Pulsar Cluster内部的配置等
  • global zk则用于Pulsar Cluster之间的数据复制等
  • 采用Bookie作为存储设备(大多数MQ系统都采用本地磁盘或者DB作为存储设备)
  • Broker负责负载均衡(Load Banlancer)和消息的读取、写入(处理IO请求)等
  • Global replicators负责集群间的数据复制(global zk上的元数据)
存储和服务分离
  1. 无状态的服务层(broker层)
  • 使用Pulsar client(Java,C ++,Python,Go 和 Websockets来写producer&consumer)和Broker交互,进行发布和订阅消息
  • Pulsar 客户端不直接与存储层 Apache BookKeeper 交互。 客户端也没有直接的 BookKeeper 访问权限。这种隔离,为 Pulsar 实现安全的多租户统一身份验证模型提供了基础
  • Pulsar兼容现有的kafka应用程序
  • broker层不存消息,是无状态的,消息是存在bookeeper上的
  • 每个主题分区(Topic Partition)由 Pulsar 分配给某个 Broker,该 Broker 称为该主题分区的所有者。 Pulsar 生产者和消费者连接到主题分区的所有者 Broker,以向所有者代理发送消息并消费消息
  • 如果一个 Broker 失败,Pulsar 会自动将其拥有的主题分区移动到群集中剩余的某一个可用 Broker 中。这里要说的一件事是:由于 Broker 是无状态的,当发生 Topic 的迁移时,Pulsar 只是将所有权从一个 Broker 转移到另一个 Broker,在这个过程中,不会有任何数据复制发生
  • 下图显示了一个拥有 4 个 Broker 的 Pulsar 集群,其中 4 个主题分区分布在 4 个 Broker 中。每个 Broker 拥有并为一个主题分区提供消息服务
    在这里插入图片描述
  1. 持久化存储层(bookeeper层)
  • Apache BookKeeper 是 Apache Pulsar 的持久化存储层。 Apache Pulsar 中的每个主题分区本质上都是存储在 Apache BookKeeper 中的分布式日志
  • 每个分布式日志又被分为 Segment 分段。 每个 Segment 分段作为 Apache BookKeeper 中的一个 Ledger,均匀分布并存储在 BookKeeper 群集中的多个 Bookie(Apache BookKeeper 的存储节点)中
  • Segment 的创建时机包括以下几种:基于配置的 Segment 大小;基于配置的滚动时间;或者当 Segment 的所有者被切换
  • 通过 Segment 分段的方式,主题分区中的消息可以均匀和平衡地分布在群集中的所有 Bookie 中。 这意味着主题分区的大小不仅受一个节点容量的限制; 相反,它可以扩展到整个 BookKeeper 集群的总容量。
  • 下面的图说明了一个分为 x 个 Segment 段的主题分区。 每个 Segment 段存储 3 个副本。 所有 Segment 都分布并存储在 4 个 Bookie 中;即分片存储的概念
    在这里插入图片描述
  • 层级存储(冷热数据)
    在这里插入图片描述
  1. Segment为中心的存储以及故障恢复
  • 存储服务的分层的架构 和 以 Segment 为中心的存储 是 Apache Pulsar(使用 Apache BookKeeper)的两个关键设计理念

  • 好处

  • 独立扩展

  • 灵活容错(broker、bookie的故障恢复,集群的扩展)

  • 快速扩容

  • 运维简单(分片)

  • broker故障恢复
    在这里插入图片描述

  • bookie故障恢复
    在这里插入图片描述

  • bookie节点扩容
    在这里插入图片描述

  1. 分区存储VS分片存储
    在这里插入图片描述
    在这里插入图片描述

Pulsar的监控和报警

  1. Pulsar 提供丰富的 Prometheus 指标信息输出,我们可以这些指标信息来做好 Pulsar 的监控报警。Pulsar 的客户端也记录了丰富的指标,一般可以通过将 client 的扩展包将 client 的节点信息记录在 Zookeeper 中,由 Prometheus 自动发现,这样 Client 端的指标信息由 Prometheus 采集。

  2. 报警规则设置

  • Client 发送失败次数
  • Backlog 超阈值
  • Rates/Network 超阈值
  • Client > 50ms 延迟
  • Broker > 50ms 延迟
  • Storage Size 超阈值
  • 等等
  1. 配合 Grafana 的监控展示,实时了解集群的状态
  • 集群状态看板 -1
    在这里插入图片描述

  • 集群状态看板 -2
    在这里插入图片描述

  • 示例:分 Namespace 状态展示
    在这里插入图片描述

Pulsar的其他应用

  1. Pulsar Function
  • 轻量化计算
    在这里插入图片描述
  • 可以用来完成一些简单的ETL、数据清洗之类的操作,但是Complex Computing之类(比如由复杂的join、aggregate之类的)的做不了,还是需要集成第三方计算框架来做(Spark、Flink这种)
  • Function的部署方式
  • location,本地
  • Thread/Process Mode模式
  • K8s
  1. Pulsar IO-基于Functions的连接器框架
    在这里插入图片描述
  • Source、Sink、Pulsar内置的IO connector(mysql、mongoDB、oracle、postgreSQL)、用户也可以实现Function接口编写自己想要的IO Connector
  • CDC Connector(Source)
  • 和Debezium and Canal进行了整合
  • Debezium Connector
  • Alibaba Canal Connector
  1. Pulsar SQL
  • 在Pulsar数据上进行交互式SQL查询
  • 基于Presto SQL(Facebook的分布式SQL查询引擎)来实现的
    在这里插入图片描述
  • 可以直接从BK层读取数据,不用访问broker层;并且presto worker可以并发访问同一个分区的不同分片(因为pulsar数据是分片储存的)
  • 可以通过时间索引(publishTime)来快速定位分片
    在这里插入图片描述

Demo

  1. Producer
public class ProducerDemo {
    private static final Logger log = LoggerFactory.getLogger(ProducerDemo.class);
    private static final String SERVER_URL = "pulsar://10.28.12.49:6650";
    private static final Gson gson=new Gson();
    
    public static void main(String[] args) throws Exception {
        send();
    }
    
    //pulsar.cluster.com
    public  static void send() throws Exception {
        // 构造Pulsar Client
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(SERVER_URL)
                .enableTcpNoDelay(true)
                .build();
        
        // 构造生产者
        Producer<String> producer = client.newProducer(Schema.STRING)
                .producerName("my-producer")
                .topic("persistent://public/default/my-topic")
                .batchingMaxMessages(1024)
                .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
                .enableBatching(true)
                .blockIfQueueFull(true)
                .maxPendingMessages(512)
                .sendTimeout(10, TimeUnit.SECONDS)
                .blockIfQueueFull(true)
                .create();

        // 除了只给一个消息value之外,还可以给value增加一些元数据信息
        MessageId metaMessageId = producer.newMessage()
                .key("my-message-key")
                .value("mu-message-value")
                .property("my-key1", "my-value1")
                .property("my-other-key", "my-other-value")
                .send();
        log.info("metaMessage id is {}", metaMessageId);

        //也可以sendAsync()异步
        producer.sendAsync("my-async-message").thenAccept(msgId->{
            log.info("Message whit ID %s successfully send",msgId);
            System.out.printf("Message whit ID %s successfully send",msgId);
        });
        
        

        // 关闭producer的方式有两种:同步和异步
        //producer.close(); 
        producer.closeAsync()
                .thenRun(()-> log.info("producer closed"))
                .exceptionally((ex)-> {
                    log.error("failed to closed producer:" + ex);
                    return null;
                });
                
                
        // 关闭licent的方式有两种,同步和异步
        //client.close();
        client.closeAsync();
        

    }
}
  1. Consumer
public class ConsumerDemo {
    private static final Logger log = LoggerFactory.getLogger(ConsumerDemo.class);
    private static final String SERVER_URL = "pulsar://10.28.12.49:6650";

    public static void main(String[] args) throws Exception {
        receive();
    }
    
    //pulsar.cluster.com
    public static void receive() throws Exception {
        // 构造Pulsar Client
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(SERVER_URL)
                .enableTcpNoDelay(true)
                .build();
        Consumer consumer = client.newConsumer()
                .consumerName("my-consumer")
                .topic("persistent://public/default/my-topic")
                .subscriptionName("my-subscription")
                .ackTimeout(10, TimeUnit.SECONDS)
                .maxTotalReceiverQueueSizeAcrossPartitions(10)
                .subscriptionType(SubscriptionType.Exclusive)
                .property("","")
                .subscribe();

        do {
            // 接收消息有两种方式:异步和同步
            // CompletableFuture<Message<String>> message = consumer.receiveAsync();
            Message message = null;
            try {
                message = consumer.receive();
                log.info("Message received from pulsar cluster: %s",new String(message.getData()));
                //Acknowledge the message so that it can be deleted by the message broker
                consumer.acknowledge(message.getMessageId());//也可以异步ack
            } catch (PulsarClientException e) {
                //Message failed to process,redeliver later
                log.error("Message failed to process",e.getMessage());
            }
            log.info("get message from pulsar cluster,{}", message.getMessageId());
        } while (true);
    }
}

Pulsar的应用场景与案例

  1. 应用场景
    在这里插入图片描述
  2. 案例
  • 案例1:Yahoo

  • 使用pulsar作为核心的event data bus

  • 整合不同的技术和集群成统一的解决方案

  • 全球多机房之间的复制(8+) pulsar集群间的复制

  • 每日处理千亿级的消息、多达230w个topics

  • 支撑多个业务线:yahoo finance、yahoo mail 、yahoo Sports等
    在这里插入图片描述

  • 案例2:Twitter

  • 整个消息系统 Event Bus

  • Ads(广告–收入来源)、Search、Stream computing

  • 1000+ bookies per datacenter

  • 17PB/day,1.5 trillion(万亿) records/second

  • 案例3:智联

Apache BookKeeper

  1. BookKeeper是一个提供日志条目流存储持久化的服务框架
  2. 特别适合日志流存储,一个比较经典的应用是作为消息队列Pulsar的持久框架(实时工作负载的存储平台)
  3. 读写分离设计来源于HDFS,Namenode在宕机的时候可以通过这些记录(操作日志)进行恢复,数据复制也是源于HDFS

术语和定义

术语和定义解释
Entry主节点写的每一个日志对象则为一个entry
Ledger一个ledger由entry集合组成,每一个日志段对应一个ledger,相同日志段追加edits即为向相应的ledger追加entry
Bookkeeper client在HDFS中即为namenode
Bookie一个bookkeeper的存储服务,存储了bookkeeper的write ahead日志,及其数据(ledgers)内容
Metadata server由zookeeper充当bookkeeper的元数据服务器,在zk中存储了ledger相关元数据,edits元数据,及其bookie相关元数据
Ensemble即为bookie可用的最小的节点数量;该参数应该大于等于quorums
Quorums法定的bookie数量,即日志写入bk服务端的冗余分数,并且每份副本均成功才算成功,否则通过rr算法,查找下一组quorums重新写日志
ledgerId标志ledger的编号,该编号依次递增
entryId标志entry的编号,该编号依次递增,一个txid就会对应一个entryId
entryLogIdBookie内标志存储entry的log文件编号
startLogSegment开始一个新的日志段,该日志段状态为接收写入日志的状态
finalizeLogSegment将文件由正在写入日志的状态转化为不接收写日志的状态
recoverUnfinalizedSegments主从切换等情况下,恢复没有转换为finalized状态的日志

Bookeeper部署架构

在这里插入图片描述

  1. 组成部分
  • 一组独立的存储服务器,成为bookies
  • 一个元数据存储系统,提供服务发现以及元数据管理服务—用的Zookeeper
  1. API
  • 可以使用高级DistributedLog API(又称日志流API,与bookeeper集群交互的)

  • 也可以使用底层账簿API来直接与Ledger交互

  • client主要职责:bookeeper client主要负责创建删除Ledgers,并且从Ledgers中读取entries

  • BK Client Demo

public class BKClientDemo {
    private static final Logger log = LoggerFactory.getLogger(BKClientDemo.class);

    public static void main(String[] args) throws Exception {
        testClient();
    }

    public static void testClient() throws Exception {
        BookKeeper bkc = new BookKeeper("10.28.12.43:2181");
        byte[] ledgerPassword = "test".getBytes();
        // Create a new ledger and fetch its identifier
        LedgerHandle lh = bkc.createLedger(BookKeeper.DigestType.MAC, ledgerPassword);
        long ledgerId = lh.getId();
        ByteBuffer entry = ByteBuffer.allocate(4);

        int numberOfEntries = 100;

        // Add entries to the ledger, then close it
        for (int i = 0; i < numberOfEntries; i++) {
            entry.putInt(i);
            entry.position(0);
            lh.addEntry(entry.array());
        }
        lh.close();

        //Open the ledger for reading
        lh = bkc.openLedger(ledgerId, BookKeeper.DigestType.MAC, ledgerPassword);

        // Read all available entries
        Enumeration<LedgerEntry> entries = lh.readEntries(0, numberOfEntries - 1);

        while (entries.hasMoreElements()) {
            ByteBuffer result = ByteBuffer.wrap(entries.nextElement().getEntry());
            int returnEntry = result.getInt();
            log.info(String.format("Result:%s", returnEntry));
        }
        // Close the ledger and the client
        lh.close();
        bkc.close();
    }
}

Result:0
Result:1
Result:2
# etc
  1. 元数据存储
  • 使用Zookeeper来保存元数据
  1. 需求:流存储
  • 客户端必须可以以很低的延时(小于5ms)读写流数据,即使同时还要确保数据的持久性存储
  • 系统必须保证数据存储的持久性、一致的,容错性
  • 系统必须保证数据读写顺序一致
  • 系统必须保证同时提供对历史数据以及实时数据的访问
  1. 特性(分布式日志/流存储)
  • 容错(多副本机制):数据会被复制到不同机器上进行持久化存储(类似HDFS的多副本),kafka的partition的leader、flower也是在不同的机器上

  • BK对数据会有多分拷贝(3份或者5份),副本可以是在同一数据中心或者跨数据中心;but与其他分布式系统的多副本机制(HDFS、Kafka)不一样的是,bookeeper使用的是一种投票并行复制算法(Quorum算法)来确保可预测的低延时的基础上进行数据复制,如下图所示

在这里插入图片描述

  • 首先:上图中一组Bookies被从BookKeeper集群中(自动)选出(图中的例子里是Bookie1-5)这些Bookie作为一个整体(ensemble)存储一个特定的账簿

  • 之后,账簿中的数据被条带化地存储在Bookie中。也就是说,每一条记录都被存储在多个副本中,副本数可以在客户端进行配置,被称为最小写入数Write quorum)。在图中,最小写入数是3,也就是说数据会被写入Bookie2、3、4

  • 然后,每当一个客户端写入一笔数据,客户端会等待一定数量的副本响应成功。这个数字就是最小响应数。当一定数量副本响应成功后,客户端即可认为写入成功。在上图中,最小响应数(Ack quorum)是2,也就是说,假如Bookie3/4写入成功,客户端即会被告知写入成功。

  • 当有Bookie发生故障时ensemble结构会发生改变。故障的Bookie会被健康的Bookie所替换,这也许只是暂时性的。比如说,如果Bookie5宕机,BookieX有可能会顶替它。

  • 复制的核心思想

  • 核心思想1:日志流的原子结构是记录而不是字节。也就是说,数据总是以不可分割的记录形式(包括了元数据)存放的,而不是一个个字节组成的数组

  • 核心思想2:日志流中记录的顺序(index文件)与实际记录的实际存储(log文件)是解耦的(这其实和kafka的设计一样的)

  • 以此,提供了选择写入记录Bookie的丰富选项,可以用来保障写入总是能够成功完成,即使ensemble中存在许多故障或是缓慢的Bookie(只要总容量还足以应对)。ensemble结构调整确保了这一点

  • 以此,通过调优最小应答数来降低添加数据的延时。这对BookKeeper如上文所述在确保一致性和持久性的同时确保低延时是至关重要的。

  • 提供了快速的重新复制机制,一种多对多的副本恢复技术(重新复制为那些副本数不足,也就是低于最小写入数的副本创建更多的副本)。所有的Bookie都可以发送或是接收数据副本

  • 持久性:数据进行复制,在所有操作都完成后才ACK,因此在通知客户端写入成功前会强制使用fsync

  • 强一致性:提供的一致性模型(数据一致性、顺序一致性),使用的类raft算法来实现副本数据的一致性

  • 数据的一致性(刷盘之后再ACK给用户);顺序的一致性(每一个entry都有一个从0开始的自增ID)

  • 一致性1-读的一致性(基于LastAddConfirmed(LAC)的协议实现的,类似于kafka的HW(读一致性保障)和LEO,BK保存的lastAddConfirmed,减少对ZK的访问);Fencing的机制避免脑裂(宕机后的broker和重启后的broker),
    在这里插入图片描述

  • 一致性2(类Paxo、Raft的结合,两个Writer选举过程时防止脑裂(Fencing)的,)
    在这里插入图片描述

  • 一致性3(replication时的数据的一致性,每一个entry都有一个自增的id(LAC,写下一个entry的时候会带上上一次的lastAddConfirmed的)
    在这里插入图片描述

  • 高可用性:集群结构调整以及speculative readsspeculative reads被用以改善读写可用性并同时确保一致性和可用性

  • 写高可用-Ensemble 变更(Bookie发生故障时客户端会调整记录的存储位置——记录写入的Bookie。这确保了只要集群中剩余总Bookie容量足够的话写入就能成功)

  • 读高可用-Speculative Reads(任意读取,没有主节点、每个副本都可以提供读、通过Speculative减少长尾时延),(kafka是只能读取leader的数据的)
    在这里插入图片描述

  • 读写的低延时:通过I/O隔离机制来确保低延时性,同时维持一致性和持久性
    在这里插入图片描述
    在这里插入图片描述

  1. CAP理论
  • 在CAP(一致性、可用性、分区容忍)理论的语境中,Bookkeeper是一个CP系统

总结

  1. 需求:多租户、流+队列的模型这两种场景我们可能会有的(跨地域复制、百万级topics时达不到的),可以借鉴

推荐一篇文章

Jay Kreps(kafka、Samza的主要作者): The Log: What every software engineer should know about real-time data’s unifying abstraction

Logo

开源、云原生的融合云平台

更多推荐