前言

最近项目中需要使用到一个消息队列,主要用来将原来一些操作异步化。根据自己的使用场景和熟悉程度,选择了NATS Streaming。之所以,选择NATS Streaming。一,因为我选型一些中间件,我会优先选取一些自己熟悉的语言编写的,这样方便排查问题和进一步的深究。二,因为自己一直做k8s等云原生这块,偏向于cncf基金会管理的项目,毕竟这些项目从一开始就考虑了如何部署在k8s当中。三,是评估项目在不断发展过程中,引入的组件是否能够依旧满足需求。

消息队列的使用场景

如果问为什么这么做,需要说一下消息队列的使用场景。之前看知乎的时候,看到一些回答比较认同,暂时拿过来,更能形象表达。感谢ScienJus同学的精彩解答。

消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。同时由于使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系,也不需要受对方的影响,即解耦和。

使用场景的话,举个例子:

假设用户在你的软件中注册,服务端收到用户的注册请求后,它会做这些操作:

  • 校验用户名等信息,如果没问题会在数据库中添加一个用户记录
  • 如果是用邮箱注册会给你发送一封注册成功的邮件,手机注册则会发送一条短信
  • 分析用户的个人信息,以便将来向他推荐一些志同道合的人,或向那些人推荐他
  • 发送给用户一个包含操作指南的系统通知等等……

但是对于用户来说,注册功能实际只需要第一步,只要服务端将他的账户信息存到数据库中他便可以登录上去做他想做的事情了。至于其他的事情,非要在这一次请求中全部完成么?值得用户浪费时间等你处理这些对他来说无关紧要的事情么?所以实际当第一步做完后,服务端就可以把其他的操作放入对应的消息队列中然后马上返回用户结果,由消息队列异步的进行这些操作。

或者还有一种情况,同时有大量用户注册你的软件,再高并发情况下注册请求开始出现一些问题,例如邮件接口承受不住,或是分析信息时的大量计算使cpu满载,这将会出现虽然用户数据记录很快的添加到数据库中了,但是却卡在发邮件或分析信息时的情况,导致请求的响应时间大幅增长,甚至出现超时,这就有点不划算了。面对这种情况一般也是将这些操作放入消息队列(生产者消费者模型),消息队列慢慢的进行处理,同时可以很快的完成注册请求,不会影响用户使用其他功能。

所以在软件的正常功能开发中,并不需要去刻意的寻找消息队列的使用场景,而是当出现性能瓶颈时,去查看业务逻辑是否存在可以异步处理的耗时操作,如果存在的话便可以引入消息队列来解决。否则盲目的使用消息队列可能会增加维护和开发的成本却无法得到可观的性能提升,那就得不偿失了。

其实,总结一下消息队列的作用

  • 削峰,形象点的话,可以比喻为蓄水池。比如elk日志收集系统中的kafka,主要在日志高峰期的时候,在牺牲实时性的同时,保证了整个系统的安全。
  • 同步系统异构化。原先一个同步操作里的诸多步骤,可以考虑将一些不影响主线发展的步骤,通过消息队列异步处理。比如,电商行业,一个订单完成之后,一般除了直接返回给客户购买成功的消息,还要通知账户组进行扣费,通知处理库存变化,通知物流进行派送等,通知一些用户组做一些增加会员积分等操作等。

NATS Streaming 简介

NATS Streaming是一个由NATS驱动的数据流系统,用Go编程语言编写。 NATS Streaming服务器的可执行文件名是nats-streaming-server。 NATS Streaming与核心NATS平台无缝嵌入,扩展和互操作。 NATS Streaming服务器作为Apache-2.0许可下的开源软件提供。 Synadia积极维护和支持NATS Streaming服务器。

图片描述

特点

除了核心NATS平台的功能外,NATS Streaming还提供以下功能:

  • 增强消息协议

NATS Streaming使用谷歌协议缓冲区实现自己的增强型消息格式。这些消息通过二进制数据流在NATS核心平台进行传播,因此不需要改变NATS的基本协议。NATS Streaming信息包含以下字段:

  - 序列 - 一个全局顺序序列号为主题的通道
  - 主题 - 是NATS Streaming 交付对象
  - 答复内容 - 对应"reply-to"对应的对象内容
  - 数据 - 真是数据内容
  - 时间戳 - 接收的时间戳,单位是纳秒
  - 重复发送 - 标志这条数据是否需要服务再次发送
  - CRC32 - 一个循环冗余数据校验选项,在数据存储和数据通讯领域里,为了保证数据的正确性所采用的检错手段,这里使用的是 IEEE CRC32 算法

 - 消息/事件的持久性
  NATS Streaming提供了可配置的消息持久化,持久目的地可以为内存或者文件。另外,对应的存储子系统使用了一个公共接口允许我们开发自己自定义实现来持久化对应的消息

 - 至少一次的发送
  NATS Streaming提供了发布者和服务器之间的消息确认(发布操作) 和订阅者和服务器之间的消息确认(确认消息发送)。其中消息被保存在服务器端内存或者辅助存储(或其他外部存储器)用来为需要重新接受消息的订阅者进行重发消息。

 - 发布者发送速率限定
  NATS Streaming提供了一个连接选项叫 MaxPubAcksInFlight,它能有效的限制一个发布者可能随意的在任何时候发送的未被确认的消息。当达到这个配置的最大数量时,异步发送调用接口将会被阻塞,直到未确认消息降到指定数量之下。

- 每个订阅者的速率匹配/限制
  NATS Streaming运行指定的订阅中设置一个参数为 MaxInFlight,它用来指定已确认但未消费的最大数据量,当达到这个限制时,NATS Streaming 将暂停发送消息给订阅者,直到未确认的数据量小于设定的量为止

  • 以主题重发的历史数据

  新订阅的可以在已经存储起来的订阅的主题频道指定起始位置消息流。通过使用这个选项,消息就可以开始发送传递了:

  1. 订阅的主题存储的最早的信息
  2. 与当前订阅主题之前的最近存储的数据,这通常被认为是 "最后的值" 或 "初值" 对应的缓存
  3. 一个以纳秒为基准的 日期/时间
  4. 一个历史的起始位置相对当前服务的 日期/时间,例如:最后30秒
  5. 一个特定的消息序列号
  • 持久订阅

  订阅也可以指定一个“持久化的名称”可以在客户端重启时不受影响。持久订阅会使得对应服务跟踪客户端最后确认消息的序列号和持久名称。当这个客户端重启或者重新订阅的时候,使用相同的客户端ID 和 持久化的名称,对应的服务将会从最早的未被确认的消息处恢复。

docker 运行NATS Streaming

在运行之前,前面已经讲过NATS Streaming 相比nats,多了持久化的一个future。所以我们在接下来的demo演示中,会重点说这点。

运行基于memory的持久化示例:

docker run -ti -p 4222:4222 -p 8222:8222  nats-streaming:0.12.0

你将会看到如下的输出:

[1] 2019/02/26 08:13:01.769734 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.12.0
[1] 2019/02/26 08:13:01.769811 [INF] STREAM: ServerID: arfYGWPtu7Cn8Ojcb1yko3
[1] 2019/02/26 08:13:01.769826 [INF] STREAM: Go version: go1.11.5
[1] 2019/02/26 08:13:01.770363 [INF] Starting nats-server version 1.4.1
[1] 2019/02/26 08:13:01.770398 [INF] Git commit [not set]
[4] 2019/02/26 08:13:01.770492 [INF] Starting http monitor on 0.0.0.0:8222
[1] 2019/02/26 08:13:01.770555 [INF] Listening for client connections on 0.0.0.0:4222
[1] 2019/02/26 08:13:01.770581 [INF] Server is ready
[1] 2019/02/26 08:13:01.799435 [INF] STREAM: Recovering the state...
[1] 2019/02/26 08:13:01.799461 [INF] STREAM: No recovered state
[1] 2019/02/26 08:13:02.052460 [INF] STREAM: Message store is MEMORY
[1] 2019/02/26 08:13:02.052552 [INF] STREAM: ---------- Store Limits ----------
[1] 2019/02/26 08:13:02.052574 [INF] STREAM: Channels:                  100 *
[1] 2019/02/26 08:13:02.052586 [INF] STREAM: --------- Channels Limits --------
[1] 2019/02/26 08:13:02.052601 [INF] STREAM:   Subscriptions:          1000 *
[1] 2019/02/26 08:13:02.052613 [INF] STREAM:   Messages     :       1000000 *
[1] 2019/02/26 08:13:02.052624 [INF] STREAM:   Bytes        :     976.56 MB *
[1] 2019/02/26 08:13:02.052635 [INF] STREAM:   Age          :     unlimited *
[1] 2019/02/26 08:13:02.052649 [INF] STREAM:   Inactivity   :     unlimited *
[1] 2019/02/26 08:13:02.052697 [INF] STREAM: ----------------------------------

可以看出默认的是基于内存的持久化。

运行基于file的持久化示例:

docker run -ti -v /Users/gao/test/mq:/datastore  -p 4222:4222 -p 8222:8222  nats-streaming:0.12.0  -store file --dir /datastore -m 8222

你将会看到如下的输出:

[1] 2019/02/26 08:16:07.641972 [INF] STREAM: Starting nats-streaming-server[test-cluster] version 0.12.0
[1] 2019/02/26 08:16:07.642038 [INF] STREAM: ServerID: 9d4H6GAFPibpZv282KY9QM
[1] 2019/02/26 08:16:07.642099 [INF] STREAM: Go version: go1.11.5
[1] 2019/02/26 08:16:07.643733 [INF] Starting nats-server version 1.4.1
[1] 2019/02/26 08:16:07.643762 [INF] Git commit [not set]
[5] 2019/02/26 08:16:07.643894 [INF] Listening for client connections on 0.0.0.0:4222
[1] 2019/02/26 08:16:07.643932 [INF] Server is ready
[1] 2019/02/26 08:16:07.672145 [INF] STREAM: Recovering the state...
[1] 2019/02/26 08:16:07.679327 [INF] STREAM: No recovered state
[1] 2019/02/26 08:16:07.933519 [INF] STREAM: Message store is FILE
[1] 2019/02/26 08:16:07.933570 [INF] STREAM: Store location: /datastore
[1] 2019/02/26 08:16:07.933633 [INF] STREAM: ---------- Store Limits ----------
[1] 2019/02/26 08:16:07.933679 [INF] STREAM: Channels:                  100 *
[1] 2019/02/26 08:16:07.933697 [INF] STREAM: --------- Channels Limits --------
[1] 2019/02/26 08:16:07.933711 [INF] STREAM:   Subscriptions:          1000 *
[1] 2019/02/26 08:16:07.933749 [INF] STREAM:   Messages     :       1000000 *
[1] 2019/02/26 08:16:07.933793 [INF] STREAM:   Bytes        :     976.56 MB *
[1] 2019/02/26 08:16:07.933837 [INF] STREAM:   Age          :     unlimited *
[1] 2019/02/26 08:16:07.933857 [INF] STREAM:   Inactivity   :     unlimited *
[1] 2019/02/26 08:16:07.933885 [INF] STREAM: ----------------------------------

PS

  • 如果部署在k8s当中,那么就可以采取基于file的持久化,通过挂载一个块存储来保证,数据可靠。比如,aws的ebs或是ceph的rbd。
  • 4222为客户端连接的端口。8222为监控端口。

启动以后访问:localhost:8222,可以看到如下的网页:

图片描述

启动参数解析

Streaming Server Options:
    -cid, --cluster_id  <string>         Cluster ID (default: test-cluster)
    -st,  --store <string>               Store type: MEMORY|FILE|SQL (default: MEMORY)
          --dir <string>                 For FILE store type, this is the root directory
    -mc,  --max_channels <int>           Max number of channels (0 for unlimited)
    -msu, --max_subs <int>               Max number of subscriptions per channel (0 for unlimited)
    -mm,  --max_msgs <int>               Max number of messages per channel (0 for unlimited)
    -mb,  --max_bytes <size>             Max messages total size per channel (0 for unlimited)
    -ma,  --max_age <duration>           Max duration a message can be stored ("0s" for unlimited)
    -mi,  --max_inactivity <duration>    Max inactivity (no new message, no subscription) after which a channel can be garbage collected (0 for unlimited)
    -ns,  --nats_server <string>         Connect to this external NATS Server URL (embedded otherwise)
    -sc,  --stan_config <string>         Streaming server configuration file
    -hbi, --hb_interval <duration>       Interval at which server sends heartbeat to a client
    -hbt, --hb_timeout <duration>        How long server waits for a heartbeat response
    -hbf, --hb_fail_count <int>          Number of failed heartbeats before server closes the client connection
          --ft_group <string>            Name of the FT Group. A group can be 2 or more servers with a single active server and all sharing the same datastore
    -sl,  --signal <signal>[=<pid>]      Send signal to nats-streaming-server process (stop, quit, reopen)
          --encrypt <bool>               Specify if server should use encryption at rest
          --encryption_cipher <string>   Cipher to use for encryption. Currently support AES and CHAHA (ChaChaPoly). Defaults to AES
          --encryption_key <sting>       Encryption Key. It is recommended to specify it through the NATS_STREAMING_ENCRYPTION_KEY environment variable instead

Streaming Server Clustering Options:
    --clustered <bool>                   Run the server in a clustered configuration (default: false)
    --cluster_node_id <string>           ID of the node within the cluster if there is no stored ID (default: random UUID)
    --cluster_bootstrap <bool>           Bootstrap the cluster if there is no existing state by electing self as leader (default: false)
    --cluster_peers <string>             List of cluster peer node IDs to bootstrap cluster state.
    --cluster_log_path <string>          Directory to store log replication data
    --cluster_log_cache_size <int>       Number of log entries to cache in memory to reduce disk IO (default: 512)
    --cluster_log_snapshots <int>        Number of log snapshots to retain (default: 2)
    --cluster_trailing_logs <int>        Number of log entries to leave after a snapshot and compaction
    --cluster_sync <bool>                Do a file sync after every write to the replication log and message store
    --cluster_raft_logging <bool>        Enable logging from the Raft library (disabled by default)

Streaming Server File Store Options:
    --file_compact_enabled <bool>        Enable file compaction
    --file_compact_frag <int>            File fragmentation threshold for compaction
    --file_compact_interval <int>        Minimum interval (in seconds) between file compactions
    --file_compact_min_size <size>       Minimum file size for compaction
    --file_buffer_size <size>            File buffer size (in bytes)
    --file_crc <bool>                    Enable file CRC-32 checksum
    --file_crc_poly <int>                Polynomial used to make the table used for CRC-32 checksum
    --file_sync <bool>                   Enable File.Sync on Flush
    --file_slice_max_msgs <int>          Maximum number of messages per file slice (subject to channel limits)
    --file_slice_max_bytes <size>        Maximum file slice size - including index file (subject to channel limits)
    --file_slice_max_age <duration>      Maximum file slice duration starting when the first message is stored (subject to channel limits)
    --file_slice_archive_script <string> Path to script to use if you want to archive a file slice being removed
    --file_fds_limit <int>               Store will try to use no more file descriptors than this given limit
    --file_parallel_recovery <int>       On startup, number of channels that can be recovered in parallel
    --file_truncate_bad_eof <bool>       Truncate files for which there is an unexpected EOF on recovery, dataloss may occur

Streaming Server SQL Store Options:
    --sql_driver <string>            Name of the SQL Driver ("mysql" or "postgres")
    --sql_source <string>            Datasource used when opening an SQL connection to the database
    --sql_no_caching <bool>          Enable/Disable caching for improved performance
    --sql_max_open_conns <int>       Maximum number of opened connections to the database

Streaming Server TLS Options:
    -secure <bool>                   Use a TLS connection to the NATS server without
                                     verification; weaker than specifying certificates.
    -tls_client_key <string>         Client key for the streaming server
    -tls_client_cert <string>        Client certificate for the streaming server
    -tls_client_cacert <string>      Client certificate CA for the streaming server

Streaming Server Logging Options:
    -SD, --stan_debug=<bool>         Enable STAN debugging output
    -SV, --stan_trace=<bool>         Trace the raw STAN protocol
    -SDV                             Debug and trace STAN
         --syslog_name               On Windows, when running several servers as a service, use this name for the event source
    (See additional NATS logging options below)

Embedded NATS Server Options:
    -a, --addr <string>              Bind to host address (default: 0.0.0.0)
    -p, --port <int>                 Use port for clients (default: 4222)
    -P, --pid <string>               File to store PID
    -m, --http_port <int>            Use port for http monitoring
    -ms,--https_port <int>           Use port for https monitoring
    -c, --config <string>            Configuration file

Logging Options:
    -l, --log <string>               File to redirect log output
    -T, --logtime=<bool>             Timestamp log entries (default: true)
    -s, --syslog <string>            Enable syslog as log method
    -r, --remote_syslog <string>     Syslog server addr (udp://localhost:514)
    -D, --debug=<bool>               Enable debugging output
    -V, --trace=<bool>               Trace the raw protocol
    -DV                              Debug and trace

Authorization Options:
        --user <string>              User required for connections
        --pass <string>              Password required for connections
        --auth <string>              Authorization token required for connections

TLS Options:
        --tls=<bool>                 Enable TLS, do not verify clients (default: false)
        --tlscert <string>           Server certificate file
        --tlskey <string>            Private key for server certificate
        --tlsverify=<bool>           Enable TLS, verify client certificates
        --tlscacert <string>         Client certificate CA for verification

NATS Clustering Options:
        --routes <string, ...>       Routes to solicit and connect
        --cluster <string>           Cluster URL for solicited routes

Common Options:
    -h, --help                       Show this message
    -v, --version                    Show version
        --help_tls                   TLS help.

源码简单分析NATS Streaming 持久化

目前NATS Streaming支持以下4种持久化方式:

  • MEMORY
  • FILE
  • SQL
  • RAFT

其实看源码可以知道:NATS Streaming的store基于接口实现,很容易扩展到更多的持久化方式。具体的接口如下:

// Store is the storage interface for NATS Streaming servers.
//
// If an implementation has a Store constructor with StoreLimits, it should be
// noted that the limits don't apply to any state being recovered, for Store
// implementations supporting recovery.
//
type Store interface {
    // GetExclusiveLock is an advisory lock to prevent concurrent
    // access to the store from multiple instances.
    // This is not to protect individual API calls, instead, it
    // is meant to protect the store for the entire duration the
    // store is being used. This is why there is no `Unlock` API.
    // The lock should be released when the store is closed.
    //
    // If an exclusive lock can be immediately acquired (that is,
    // it should not block waiting for the lock to be acquired),
    // this call will return `true` with no error. Once a store
    // instance has acquired an exclusive lock, calling this
    // function has no effect and `true` with no error will again
    // be returned.
    //
    // If the lock cannot be acquired, this call will return
    // `false` with no error: the caller can try again later.
    //
    // If, however, the lock cannot be acquired due to a fatal
    // error, this call should return `false` and the error.
    //
    // It is important to note that the implementation should
    // make an effort to distinguish error conditions deemed
    // fatal (and therefore trying again would invariably result
    // in the same error) and those deemed transient, in which
    // case no error should be returned to indicate that the
    // caller could try later.
    //
    // Implementations that do not support exclusive locks should
    // return `false` and `ErrNotSupported`.
    GetExclusiveLock() (bool, error)

    // Init can be used to initialize the store with server's information.
    Init(info *spb.ServerInfo) error

    // Name returns the name type of this store (e.g: MEMORY, FILESTORE, etc...).
    Name() string

    // Recover returns the recovered state.
    // Implementations that do not persist state and therefore cannot
    // recover from a previous run MUST return nil, not an error.
    // However, an error must be returned for implementations that are
    // attempting to recover the state but fail to do so.
    Recover() (*RecoveredState, error)

    // SetLimits sets limits for this store. The action is not expected
    // to be retroactive.
    // The store implementation should make a deep copy as to not change
    // the content of the structure passed by the caller.
    // This call may return an error due to limits validation errors.
    SetLimits(limits *StoreLimits) error

    // GetChannelLimits returns the limit for this channel. If the channel
    // does not exist, returns nil.
    GetChannelLimits(name string) *ChannelLimits

    // CreateChannel creates a Channel.
    // Implementations should return ErrAlreadyExists if the channel was
    // already created.
    // Limits defined for this channel in StoreLimits.PeChannel map, if present,
    // will apply. Otherwise, the global limits in StoreLimits will apply.
    CreateChannel(channel string) (*Channel, error)

    // DeleteChannel deletes a Channel.
    // Implementations should make sure that if no error is returned, the
    // channel would not be recovered after a restart, unless CreateChannel()
    // with the same channel is invoked.
    // If processing is expecting to be time consuming, work should be done
    // in the background as long as the above condition is guaranteed.
    // It is also acceptable for an implementation to have CreateChannel()
    // return an error if background deletion is still happening for a
    // channel of the same name.
    DeleteChannel(channel string) error

    // AddClient stores information about the client identified by `clientID`.
    AddClient(info *spb.ClientInfo) (*Client, error)

    // DeleteClient removes the client identified by `clientID` from the store.
    DeleteClient(clientID string) error

    // Close closes this store (including all MsgStore and SubStore).
    // If an exclusive lock was acquired, the lock shall be released.
    Close() error
}

官方也提供了mysql和pgsql两种数据的支持:

postgres.db.sql

CREATE TABLE IF NOT EXISTS ServerInfo (uniquerow INTEGER DEFAULT 1, id VARCHAR(1024), proto BYTEA, version INTEGER, PRIMARY KEY (uniquerow));
CREATE TABLE IF NOT EXISTS Clients (id VARCHAR(1024), hbinbox TEXT, PRIMARY KEY (id));
CREATE TABLE IF NOT EXISTS Channels (id INTEGER, name VARCHAR(1024) NOT NULL, maxseq BIGINT DEFAULT 0, maxmsgs INTEGER DEFAULT 0, maxbytes BIGINT DEFAULT 0, maxage BIGINT DEFAULT 0, deleted BOOL DEFAULT FALSE, PRIMARY KEY (id));
CREATE INDEX Idx_ChannelsName ON Channels (name(256));
CREATE TABLE IF NOT EXISTS Messages (id INTEGER, seq BIGINT, timestamp BIGINT, size INTEGER, data BYTEA, CONSTRAINT PK_MsgKey PRIMARY KEY(id, seq));
CREATE INDEX Idx_MsgsTimestamp ON Messages (timestamp);
CREATE TABLE IF NOT EXISTS Subscriptions (id INTEGER, subid BIGINT, lastsent BIGINT DEFAULT 0, proto BYTEA, deleted BOOL DEFAULT FALSE, CONSTRAINT PK_SubKey PRIMARY KEY(id, subid));
CREATE TABLE IF NOT EXISTS SubsPending (subid BIGINT, row BIGINT, seq BIGINT DEFAULT 0, lastsent BIGINT DEFAULT 0, pending BYTEA, acks BYTEA, CONSTRAINT PK_MsgPendingKey PRIMARY KEY(subid, row));
CREATE INDEX Idx_SubsPendingSeq ON SubsPending (seq);
CREATE TABLE IF NOT EXISTS StoreLock (id VARCHAR(30), tick BIGINT DEFAULT 0);

-- Updates for 0.10.0
ALTER TABLE Clients ADD proto BYTEA;

mysql.db.sql

CREATE TABLE IF NOT EXISTS ServerInfo (uniquerow INT DEFAULT 1, id VARCHAR(1024), proto BLOB, version INTEGER, PRIMARY KEY (uniquerow));
CREATE TABLE IF NOT EXISTS Clients (id VARCHAR(1024), hbinbox TEXT, PRIMARY KEY (id(256)));
CREATE TABLE IF NOT EXISTS Channels (id INTEGER, name VARCHAR(1024) NOT NULL, maxseq BIGINT UNSIGNED DEFAULT 0, maxmsgs INTEGER DEFAULT 0, maxbytes BIGINT DEFAULT 0, maxage BIGINT DEFAULT 0, deleted BOOL DEFAULT FALSE, PRIMARY KEY (id), INDEX Idx_ChannelsName (name(256)));
CREATE TABLE IF NOT EXISTS Messages (id INTEGER, seq BIGINT UNSIGNED, timestamp BIGINT, size INTEGER, data BLOB, CONSTRAINT PK_MsgKey PRIMARY KEY(id, seq), INDEX Idx_MsgsTimestamp (timestamp));
CREATE TABLE IF NOT EXISTS Subscriptions (id INTEGER, subid BIGINT UNSIGNED, lastsent BIGINT UNSIGNED DEFAULT 0, proto BLOB, deleted BOOL DEFAULT FALSE, CONSTRAINT PK_SubKey PRIMARY KEY(id, subid));
CREATE TABLE IF NOT EXISTS SubsPending (subid BIGINT UNSIGNED, `row` BIGINT UNSIGNED, seq BIGINT UNSIGNED DEFAULT 0, lastsent BIGINT UNSIGNED DEFAULT 0, pending BLOB, acks BLOB, CONSTRAINT PK_MsgPendingKey PRIMARY KEY(subid, `row`), INDEX Idx_SubsPendingSeq(seq));
CREATE TABLE IF NOT EXISTS StoreLock (id VARCHAR(30), tick BIGINT UNSIGNED DEFAULT 0);

# Updates for 0.10.0
ALTER TABLE Clients ADD proto BLOB;

总结

后续会详细解读一下代码实现和一些集群部署。当然肯定少不了如何部署高可用的集群在k8s当中。

参阅文章:

NATS Streaming详解

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐