NATS-Server(JetStream)和NATS Streaming Server对比

在我吐槽了无数次之后,NATS JetStream终于结束了beta阶段正式进入RC阶段。终于官方也在最近刚刚正式回复了我正式版本在处理几个问题之后就会正式发布。那么在这个比较重要的NATS-Server特性发布之际聊一下NATS产品本身区别和新特性的使用,还有更多的潜在的区别。

概念区分:NATS-Server / NATS Streaming Server / NATS JetStream

NATS-Server

NATS-Server(或者叫nats)是一个开源的、云原生的、高性能的消息传递系统,是NATS的最基础的产品。它的核心是一个发布/订阅(Pub/Sub)系统,客户端可以在不同集群中的服务间nats进行通讯,而不需要关注具体的消息在哪个服务上。换而言之,客户端可以在任意一个集群的服务端上发布消息,同时在任意集群客户端上尝试读取消息。在官方与其他同类消息队列产品功能对比中,我们也可以管窥一下产品的功能列表。nats支持多流多服务进行pub/sub,负载均衡,保障消息最多/最少一次送达,多租户和用户认证等功能。虽然看上去优点很多,但是nats不是一个应用很广的消息队列的重要原因是,它缺少了一些对消息队列而言很最重要的产品特性,比如持久化支持,比如消息确保一次送达。这意味着当你的消息发送出去之后,你的消息是在处理过程中可能丢失的,甚至是可能送达不到的。

NATS Streaming Server

NATS Streaming Server(或者叫stan)是用于尝试解决上面提到的nats的已存在问题的。stan添加了持久化功能和消息送达策略支持。stan中自带了nats服务端,但是在使用过程中,natsstan不能进行混用。在官方文档中,是这么描述stannats之间的关系的:

NATS客户端和NATS Streaming Server客户端之间不能相互交换数据。也就是说,如果一个NATS Streaming Server客户端在foo上发布消息,在同一主题上订阅的NATS客户端将不会收到消息。NATS Streaming Server消息是由protobuf组成的NATS消息。NATS Streaming Server要向生产者发送ACK,并接收消费者的ACK。如果与NATS客户端自由交换消息,就会引起问题。

stan的具体架构如下图:

但是stan虽然提供了持久化和消息传递策略支持,但是在架构设计上却出现了问题,导致在最开始设计时遗留了很多问题,比如当你确定stan集群是固定的不能无限制水平扩容(#999),比如不支持多租户功能(#1122),比如客户端无法主动拉取消息只能被推送等等

NATS JetStream

NATS JetStream(或者叫JetStream)是NATS基于Raft算法实现的最新的架构设计尝试解决上述问题的新方案。在区别于原有的stan功能上,提供了新的持久化功能和消息送达策略,同时支持水平扩容。同时,新的JetStream也为大消息做了一些优化,不再将这特性功能作为nats的客户端存在而是嵌入NATS Server中作为其中的一个功能存在。也就是说,如果在对这几项技术进行选择时,JetStream应该是最应该被选择的方案。更多详细情况具体可以查看官方的指导文档

NATS JetStream使用

理论介绍过了,接下来说说实际使用的事情。现在JetStream还是RC阶段,

编译和启动客户端

下载nats-server源码,解压之后执行:

1
2
3
cd nats-server-master
go build -o nats-server -ldflags="-s -w -buildid=" .
./nats-server -js

这样就可以启动一个支持JetStream功能的服务端了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
[19940] 2022/03/03 16:23:17.473191 [INF] Starting nats-server
[19940] 2022/03/03 16:23:17.494913 [INF] Version: 2.7.2
[19940] 2022/03/03 16:23:17.494913 [INF] Git: [135475b]
[19940] 2022/03/03 16:23:17.494913 [INF] Name: n3-c1
[19940] 2022/03/03 16:23:17.494913 [INF] Node: hxTjz3J4
[19940] 2022/03/03 16:23:17.494913 [INF] ID: NCCKYYLMZBGZZB4RZQYZAO6CZPHXHKJPBESSIEON3T2F5MLGNAHDOQKV
[19940] 2022/03/03 16:23:17.494913 [INF] Using configuration file: ./n3_c1.conf
[19940] 2022/03/03 16:23:17.494913 [INF] Starting JetStream
[19940] 2022/03/03 16:23:17.496910 [INF] _ ___ _____ ___ _____ ___ ___ _ __ __
[19940] 2022/03/03 16:23:17.496910 [INF] _ | | __|_ _/ __|_ _| _ \ __| /_\ | \/ |
[19940] 2022/03/03 16:23:17.496910 [INF] | || | _| | | \__ \ | | | / _| / _ \| |\/| |
[19940] 2022/03/03 16:23:17.496910 [INF] \__/|___| |_| |___/ |_| |_|_\___/_/ \_\_| |_|
[19940] 2022/03/03 16:23:17.496910 [INF]
[19940] 2022/03/03 16:23:17.496910 [INF] https://docs.nats.io/jetstream
[19940] 2022/03/03 16:23:17.496910 [INF]
[19940] 2022/03/03 16:23:17.496910 [INF] ---------------- JETSTREAM ----------------
[19940] 2022/03/03 16:23:17.496910 [INF] Max Memory: 953.67 MB
[19940] 2022/03/03 16:23:17.496910 [INF] Max Storage: 953.67 MB
[19940] 2022/03/03 16:23:17.496910 [INF] Store Directory: "\nats\n3-c1\storage\jetstream"
[19940] 2022/03/03 16:23:17.497907 [INF] -------------------------------------------
[19940] 2022/03/03 16:23:17.498905 [INF] Starting JetStream cluster
[19940] 2022/03/03 16:23:17.498905 [INF] Creating JetStream metadata controller
[19940] 2022/03/03 16:23:17.502423 [INF] JetStream cluster recovering state
[19940] 2022/03/03 16:23:17.505415 [INF] Listening for client connections on 0.0.0.0:4224
[19940] 2022/03/03 16:23:17.543574 [INF] Server is ready

编写JetStream DEMO

接下来我们看一下如何使用JetStream进行消息发布/订阅功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 连接到nats的服务器
conn, err := nats.Connect("nats://127.0.0.1:4222")
if err != nil {
log.Panic(err)
}
defer conn.Close()

// 初始化JetStream功能
js, err := conn.JetStream()
if err != nil {
log.Panic(err)
}

// 判断Stream是否存在,如果不存在,那么需要创建这个Stream,否则会导致pub/sub失败
stream, err := js.StreamInfo(streamName)
if err != nil {
log.Println(err) // 如果不存在,这里会有报错
}
if stream == nil {
log.Printf("creating stream %q and subject %q", streamName, subject)
_, err = js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{subject},
MaxAge: 3 * 24 * time.Hour,
})
if err != nil {
log.Panicln(err)
}
}

// 订阅消息
sub, err := js.Subscribe(subject, cbHandle, nats.AckAll(), nats.DeliverNew())
if err != nil {
log.Panic(err)
return
}
defer sub.Unsubscribe()

// 发送消息
js.Publish(subject, []byte("Hello World! "+time.Now().Format(time.RFC3339)))

time.Sleep(5 * time.Second)
log.Println("Exiting...")

在这个例子中,有个值得注意的功能需要额外强调一下,在Subscribe消息时,我们在这里特别声明了nats.DeliverNew()这个选项。如果不声明,则默认为nats.DeliverAll();除了这两个参数,还有一个nats.DeliverLast()参数,这分别对应了3种开始订阅时的方式:默认方式nats.DeliverAll()是会读取有效生命周期内的所有消息,甚至包含已被处理的消息;nats.DeliverLast()是会包含消息队列中的最后一条消息,即使被处理过的消息;nats.DeliverNew()则只处理订阅之后的新消息。