基于NATS JetStream构建分布式事件流系统

【译文】原文地址
本文我将简要介绍NATS JetStream项目,它是一个来自NATS生态的分布式事件流系统。简言之,NATS JetStream是一个以发布/订阅作为消息传递模式的分布式流媒体系统。当我在咨询工作中向我的客户介绍NATS时,对NATS、NATS Streaming和新推出的NATS JetStream的不同感到困惑。我将简要介绍这三个产品,然后通过一个非常简单的示例深入介绍NATS JetStream。

从NATS服务到NATS JetStream来理解NATS生态

NAT云原生消息分发系统提供了一个基于“最多一次”交付模型的简单NATS服务,该服务将已发布的消息转发给消费者,而对已发布的消息不进行任何持久性处理,其性能处于行业领先地位。对于某些类型的应用程序,如果不需要事件流平台,那么基本的NATS平台就足够了并且还能获得高性能。使用“最多一次”发布模型,如果消息服务在转发数据给订阅者过程中,订阅者系统故障的话,消息将会丢失,因此无法保证已发布的消息的传递。

后来NATS生态添加了第二个消息发布可选方案:NATS流,这是一个基于NATS生态构建的高性能、轻量级、可靠的流平台。当使用NATS流发布消息时,发布的消息被持久化到一个可定制的存储中,由于具备发布者和订阅者提供ACK消息、发布者速率限制和每个订阅者速率匹配/限制的能力,这样我们可以重用发布的消息为消费者提供“At-least-once-delivery”模式。基础NATS和NATS流都是用Go编写的。虽然NATS Streaming非常高性能和轻量化,但在某些能力和成熟度方面,它没有像Kafka这样的分布式流系统那么强大。与此同时,随着NATS 2.0的出现,NATS的生态系统也得到了很大的发展,NATS 2.0提供了分布式安全、去中心化管理、多租户、更大的网络、全球可扩展以及安全的数据共享。但NATS流在适应NATS 2.0方面存在很多限制,而且流系统还没有发展到能够应对下一代物联网和边缘计算的挑战。

NATS 2.0的下一代NATS流被称为NATS JetStream。因此,如果你想使用流系统来构建分布式系统、物联网和边缘计算,那么最好基于NATS使用NATS JetStream。官方对NATS流的支持将于2023年6月结束。因此,如果不想持久化发布的消息,请选择基本的NATS;如果希望将消息持久化到持久化存储中,并通过ACK、速率匹配/限制等方式重复来自持久化存储的消息,请使用NATS JetStream。NATS JetStream是用Go编写的。

NATS JetStream

NATS JetStream是NATS生态系统的下一代流系统,为NATS 2.0打造,具有分布式安全、多租户和水平扩展能力。

什么是分布式事件流系统?

流系统允许您从分布式系统和微服务、物联网传感器、边缘设备和软件应用程序中捕获事件流,并将这些流持久化到存储中。因为将这些事件流持久化到持久化存储中,就可以使用事件驱动的结构回放这些事件流,以便检索、处理和响应这些事件流。使用NATS JetStream,流将由生产者系统发布,通过持久化重放消息并通过消费系统发布。简而言之,NATS JetStream是一个以发布/订阅作为消息传递模式的分布式流系统。

尽管现有的流技术很多,其中一些流技术在某些方面比较擅长,但大多数技术还没有经过很大的发展,无法应对未来的计算场景,尤其是物联网和边缘计算。例如,当您使用物联网和边缘计算发布和订阅消息时,您可能需要更好的安全模型。您可能需要多租户和对多个部署模型的支持。NATS JetStream就是为解决这些问题而设计的,这些问题我们在今天的流技术堆栈中已经看到了。

设计目标

NATS的官方文档列出了NATS JetStream的设计目标:

  • 系统必须易于配置和操作,易于观察。

  • 系统必须是安全的,并且在NATS 2.0安全模型下运行良好。

  • 系统必须能水平扩展,并适用于高摄取率。

  • 系统必须支持多个用例。

  • 系统必须能自愈并高可用。

  • 系统必须有一个接近NATS核心的API。

  • 系统必须允许NATS消息成为流的一部分。

  • 系统必须显示与有效载荷无关的行为。

  • 系统不能有第三方依赖关系。

用于运行NATS服务和NATS JetStream服的简单可执行文件

安装NATS Server v2.2.2,您可以使用相同的可执行文件“nats-server”运行基本的NATS服务(没有持久性)和NATS JetStream服务。当你运行nats-server可执行文件时,它将作为基本的NATS服务运行,当你运行带有-js参数的nats-server可执行文件或配置启用JetStream时,它将运行启用了JetStream子系统的NATS服务。
以下是JetStream的配置文件:
清单1。启用JetStream子系统

1
2
3
4
5
6
7
8
 jetstream {

store_dir: "/data/nats-server"

max_memory_store: 1073741824

max_file_store: 10737418240
}

上面的配置为NATS服务启用了JetStream子系统,并将/data/ nats-server目录配置为持久化流的存储目录。
在这里,我们使用js.conf文件运行带有JetStream子系统的nats-server:
清单2。使用js.conf文件运行NATS服务器

图1所示。NATS JetStream服务正在运行:

使用单个NATS客户端SDK与NATS和NATS JetStream工作

我们使用单独的客户端sdk与NATS服务和NATS流媒体服务一起工作。但Go SDK nats.gov1.11.0版本中,你可以使用相同的库来处理基本的NATS和NATS JetStream。
下面是从NATS连接创建JetStream上下文的代码:
清单3。创建JetStream上下文

1
2
3
import "github.com/nats-io/nats.go"
nc, _ := nats.Connect(nats.DefaultURL)
js, err := nc.JetStream()

JetStreamContext允许JetStream消息和流管理。一旦创建了JetStreamContext,就可以通过使用其发布和订阅api轻松地使用JetStream。

JetStream流

在JetStream中,事件流以Streams存储,其中多个相关的主题存储在一个Stream中。例如,当你在构建一个订单处理系统时,可以考虑“ORDERS”作为一个Stream,在这里可以使用与Stream相关的主题比如“ORDERS.created”、“ORDERS.approved"、“ORDERS.rejected"、“ORDER.payment.debited"和“ORDERS.shiped",来发布和消费事件流。因此所有这些相关的主题都属于“ORDERS”这个Stream。
下图显示了在服务器上以存储文件形式存储的“ORDERS”流。
图2。订单流存储

img

拉式消费者和推式消费者

JetStream提供了两种消费者(订阅者)系统:基于拉的消费者和基于推的消费者。基于Pull的消费者让JetStream从消费者系统中提取消息。基于pull的消费者系统就像工作队列。因为JetStream提供了ACK(确认)机制,所以可以轻松地水平扩展基于Pull的消费者系统,而不会出现消息重复的问题。拉式订阅是NATS生态系统的新功能。基于Push的消费者使JetStream将消息推送到消费者系统,这对于监控系统来说是一个很好的选择。
以下是来自NATS文档的JetStream混合推/拉订单处理架构:

原图链接: https://docs.nats.io/jetstream/concepts

通配符订阅

NATS流的消费者不支持通配符订阅。庆幸的是在JetStream中支持通配符订阅。下面的代码块显示了通配符订阅:

1
2
3
4
nc, _ := nats.Connect(nats.DefaultURL)
js, _ := nc.JetStream()
js.Subscribe("ORDERS.*", func(msg *nats.Msg) {
}

NATS命令行

NATS生态系统为管理需求提供了一个新的NATS CLI工具。你可以从源代码安装或使用Homebrew tap。

1
2
$ brew tap nats-io/nats-tools
$ brew install nats-io/nats-tools/nats

一个NATS JetStream的示例

让我们是用 nats.goGo客户端SDK编写一个简单的示例来理解如何使用NATS JetStream。
配置最新版本NATS服务和nats.go客户端SDK:

1
2
3
4
5
# Go client latest 
go get github.com/nats-io/nats.go/@latest

# For latest NATS Server, add /v2 at the end
go get github.com/nats-io/nats-server/v2

例子演示

在这个例子中,使用Stream ”ORDERS“作为主题”ORDERS.*的流。为了演示,我们用“ORDERS.created"和"ORDERS.approved"主题来发布消息。一个系统用“ORDERS.created"主题发布消息。一个基于Pull消费者将订阅该主题的消息,并基于这个事件做一些处理之后,进一步在"ORDERS.approved"主题上发布消息。另一个基于Push的消费者使用“ORDERS.*”通配符订阅消息,因此所有“ORDERS”上的流都被订阅。

使用JetStreamContext创建流

我们首先创建一个流“ORDERS”,用于主题“ORDERS.*”。您可以使用管理工具(如NATS CLI)或使用NATS客户端sdk来创建Stream。这里我们使用nats.go创建Stream。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
const (
streamName = "ORDERS"
streamSubjects = "ORDERS.*"
)

func createStream(js nats.JetStreamContext) error {

stream, err := js.StreamInfo(streamName)
if err != nil {
log.Println(err)
}
if stream == nil {
log.Printf("creating stream %q and subjects %q", streamName, streamSubjects)
_, err = js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{streamSubjects},
})
if err != nil {
return err
}
}
return nil
}

在上面的代码块中,如果流不存在,我们就使用JetStreamContext的AddStream方法创建一个新的流。JetStreamContext可以通过调用NATS连接对象的JetStream方法来创建。
从NATS连接创建JetStreamContext,然后调用createStream方法创建一个Stream。

1
2
3
4
 nc, _ := nats.Connect(nats.DefaultURL)

js, err := nc.JetStream()
err = createStream(js)

事件发布流

下面的代码块以“ORDERS”为主题发布消息(事件流),目的是“让消费者系统知道新订单被放到了我们的分布式系统环境中,这样消费者系统就可以通过执行自己的操作和发布其他事件集来响应这些事件。
发布主题为“ORDERS.created”的消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
const (
subjectName ="ORDERS.created"
)

func createOrder(js nats.JetStreamContext) error{
var order model.Order
for i := 1; i <= 10; i++ {
order = model.Order{
OrderID: i,
CustomerID: "Cust-" + strconv.Itoa(i),
Status: "created",
}
orderJSON, _ := json.Marshal(order)
_, err := js.Publish(subjectName, orderJSON)
if err!=nil {
return err
}
log.Printf("Order with OrderID:%d has been published\n",i)
}
return nil
}

下面是在我们的例子中使用的Order结构体:

1
2
3
4
5
type Order struct {
OrderID int
CustomerID string
Status string
}

下面是示例完整代码块,它创建了Stream并在主题“ORDERS.created”上发布消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package main
import (
"encoding/json"
"log"
"strconv"

"github.com/nats-io/nats.go"

"github.com/shijuvar/go-distsys/jsdemo/model"
)

const (
streamName = "ORDERS"
streamSubjects = "ORDERS.*"
subjectName ="ORDERS.created"
)

func main() {

nc, _ := nats.Connect(nats.DefaultURL)

js, err := nc.JetStream()
checkErr(err)

err = createStream(js)
checkErr(err)

err= createOrder(js)
checkErr(err)
}

func createOrder(js nats.JetStreamContext) error{
var order model.Order
for i := 1; i <= 10; i++ {
order = model.Order{
OrderID: i,
CustomerID: "Cust-" + strconv.Itoa(i),
Status: "created",
}
orderJSON, _ := json.Marshal(order)
_, err := js.Publish(subjectName, orderJSON)
if err!=nil {
return err
}
log.Printf("Order with OrderID:%d has been published\n",i)
}
return nil
}

func createStream(js nats.JetStreamContext) error {

stream, err := js.StreamInfo(streamName)
if err != nil {
log.Println(err)
}
if stream == nil {
log.Printf("creating stream %q and subjects %q", streamName, streamSubjects)
_, err = js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{streamSubjects},
})
if err != nil {
return err
}
}
return nil
}

func checkErr(err error) {
if err != nil {
log.Fatal(err)
}
}

基于拉的消费者对事件的反应

一个基于Pull的消费者从主题“ORDERS”中订阅消息的例子。最终通过主题“ORDERS.approved”发布消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package main

import (
"context"
"encoding/json"
"log"
"time"

"github.com/nats-io/nats.go"

"github.com/shijuvar/go-distsys/jsdemo/model"
)

const (
subSubjectName ="ORDERS.created"
pubSubjectName ="ORDERS.approved"

)
func main() {

nc, _ := nats.Connect(nats.DefaultURL)
js, err := nc.JetStream()
if err != nil {
log.Fatal(err)
}


sub, _ := js.PullSubscribe(subSubjectName, "order-review", nats.PullMaxWaiting(128))
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

for {
select {
case <-ctx.Done():
return
default:
}
msgs, _ := sub.Fetch(10, nats.Context(ctx))
for _, msg := range msgs {
msg.Ack()
var order model.Order
err := json.Unmarshal(msg.Data, &order)
if err != nil {
log.Fatal(err)
}
log.Println("order-review service")
log.Printf("OrderID:%d, CustomerID: %s, Status:%s\n", order.OrderID, order.CustomerID, order.Status)
reviewOrder(js,order)
}
}
}

func reviewOrder(js nats.JetStreamContext, order model.Order) {

order.Status ="approved"
orderJSON, _ := json.Marshal(order)
_, err := js.Publish(pubSubjectName, orderJSON)
if err != nil {
log.Fatal(err)
}
log.Printf("Order with OrderID:%d has been %s\n",order.OrderID, order.Status)
}

JetStreamContext的PullSubscribe方法创建了一个可以获取消息的订阅。它将返回一个Subscription对象。Subscription的Fetch方法从流中为Pull消费者提取一批消息。msg.Ack()向服务器发出手动确认。

基于Push的消费者使用通配符订阅

在下面示例中,另一个消费者是一个基于Push的订阅者,它使用通配符订阅消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package main

import (
"encoding/json"
"log"
"runtime"

"github.com/nats-io/nats.go"

"github.com/shijuvar/go-distsys/jsdemo/model"
)

func main() {

nc, _ := nats.Connect(nats.DefaultURL)
js, err := nc.JetStream()
if err != nil {
log.Fatal(err)
}

js.Subscribe("ORDERS.*", func(msg *nats.Msg) {
msg.Ack()
var order model.Order
err := json.Unmarshal(msg.Data, &order)
if err != nil {
log.Fatal(err)
}

log.Printf("monitor service subscribes from subject:%s\n", msg.Subject)
log.Printf("OrderID:%d, CustomerID: %s, Status:%s\n", order.OrderID, order.CustomerID, order.Status)
}, nats.Durable("monitor"),nats.ManualAck())

runtime.Goexit()

}

对于已经在NATS和NATS流媒体服务器上工作过的NATS开发人员来说,订阅API是一个非常熟悉的API。

总结

当我们考虑使用分布式流系统作为构建分布式系统、基于微服务的分布式系统、基于物联网的系统、下一代Edge系统的核心架构时,可以考虑使用NATS JetStream。在未来的计算中,处理大量的事件流和消息流将是一个巨大的挑战,特别是边缘计算。 NATS JetStream提供了分布式安全、多租户和水平扩展能力。在NATS JetStream的早期版本中,一些设计目标没有通过集群选项实现。但随着后来的发布和改进,我希望NATS JetStream能够实现所有的设计目标,成为一种极具竞争力的流技术。考虑到简单性和适应性边缘架构,我肯定会选择NATS JetStream而不是Kafka,并等待这个技术更加成熟。

常见属性

StreamConfig全属性

属性 描述
MaxAge 流中消息的最大年龄,以微秒为单位
MaxBytes 流的最大存储容量,当合并的流大小超过这个大小时,旧的消息就会被删除
MaxMsgSize 流能接收的最大消息尺寸
MaxMsgs 流中能存储的最大消息条数,当数量超过这个值时,旧的消息就会被删除
MaxConsumers 流最多能有多少个消费者,为-1时无限制
Name 流的名称,不能带有空格,tab或者.
NoAck 禁用流接收的消息的ACK
Replicas 消息在集群中的副本数量,最大值为5
Retention 消息的保留策略,有LimitsPolicy(默认),InterestPolicyWorkQueuePolicy
Discard 当流触及限制后,DiscardNew策略会拒绝新的消息,DiscardOld(默认)会删除旧的消息
Storage 消息的存储方式,有filememory两种
Subjects 待消费的subjects集合,支持通配符
Duplicates 消息去重使用的时间窗

保留策略(Retention)

保留策略 描述
LimitsPolicy 对消息的数量、存储容量和年龄进行限制
WorkQueuePolicy 直到被一个观察者消耗之前,消息都会保存
InterestPolicy 只要有消费者处于活跃状态,消息就会被保存下来

上面提到的MaxMsgsMaxMsgsMaxAge用来对消息进行限制,它们也是LimitsPolicy策略仅能使用的属性。

WorkQueuePolicy策略下,一旦有消费者收到确认,消息就会被立即删除。而启用InterestPolicy时,如果没有消费者在线就会立即将消息删除。

需要注意,在WorkQueuePolicyInterestPolicy下,MaxMsgsMaxMsgsMaxAge这三个属性依然是生效的,并且是作为前置条件存在。

确认模型(Acknowledgement Models)

消费者有三种确认模式:

模式 描述
AckExplicit 要求每个消息都进行手动确认,这也是拉模型下唯一支持的方式
AckAll 这个模式下,如果你确认了第100个消息,那么1-99个消息都会自动确认,适用于批处理任务,以减少确认带来的额外开销
AckNone 不支持任何确认

ConsumerConfig的配置属性列表

属性 描述
AckPolicy 消息的确认方式,支持AckNone``, ``AckAllAckExplicit
AckWait 在消息被重新投递之前,允许消息在多长时间之内处于未确认状态
DeliverPolicy 消费者的起始位置策略,支持DeliverAll, DeliverLast, DeliverNew, DeliverByStartSequenceDeliverByStartTime
DeliverySubject 传递观察到的消息的subject,如果没有设置的话,会创建一个拉模式的消费者
Durable 消费者的名称
FilterSubject 当从一个有许多subject或通配符的流中消费时,只选择一个特定的传入subject,支持通配符。
MaxDeliver 特定消息的最大传递次数,以此来避免使你的系统陷入崩溃的毒药类型的消息(如陷入死循环)
OptStartSeq 当第一次从流中消费消息时,从集合中的这个特定的消息开始读取
ReplayPolicy 消息的发送方式,支持ReplayInstantReplayOriginal
SampleFrequency 确认的消息应占观测样本的百分比,区间为0-100
OptStartTime 当第一次从流中消费消息时,从这个时间或之后的消息开始
RateLimit 以bit/s为单位的消息传递速率
MaxAckPending 未确认的最大消息数量,一旦达到此限制,将暂停发送消息

消费者起始位置(Consumer Starting Position)

配置消费者时可以决定从什么位置开始消费,NATS支持以下DeliverPolicy:

策略 描述
all 传递所有可用的消息
last 传递最近的消息,类似于tail -n 1 -f
new 只传递在订阅之后新到来的消息
by_start_time 传递特定时间之后的消息,需要设置OptStartTime
by_start_sequence 从流中特定序号的消息开始,需要设置OptStartSeq