NATS Streaming集群

支持的持久化

为了以群集模式运行NATS Streaming Server,您需要指定一个持久性存储。 目前,您可以在“ FILE”和“ SQL”之间进行选择。NATS Streaming将服务器元信息,消息和订阅存储到使用--store选项配置的存储中。

但是,在群集模式下,我们使用RAFT进行领导者选举。RAFT使用其自己的存储,这些存储当前必须基于文件。 RAFT存储的位置默认为以群集ID命名的子目录下的当前目录,或者您可以使用--cluster_log_path对其进行配置。

这意味着,即使您选择了一个SQL存储,仍然需要在文件系统上存储数据。

配置

我们可以通过使用-cluster_peers标志提供集群拓扑来组建NATS流集群。 这只是参与集群的一组节点ID。 请注意,一旦建立领导者,我们便可以启动后续服务器而无需提供此配置,因为它们将自动加入领导者。 如果服务器正在恢复,它将使用恢复的群集配置。

这是在集群中启动三台服务器的示例。 对于此示例,我们运行流服务器连接到的单独的NATS服务器。

1
2
3
4
5
nats-streaming-server -store file -dir store-a -clustered -cluster_node_id a -cluster_peers b,c -nats_server nats://localhost:4222

nats-streaming-server -store file -dir store-b -clustered -cluster_node_id b -cluster_peers a,c -nats_server nats://localhost:4222

nats-streaming-server -store file -dir store-c -clustered -cluster_node_id c -cluster_peers a,b -nats_server nats://localhost:4222

需要注意的是,一旦一个领导人选举,随后服务器可以不提供群集配置开始。 他们将自动加入集群。类似地,集群节点ID不需要被设置,它将被自动分配。 只要使用了文件存储,此ID就会在重新启动时恢复。

1
nats-streaming-server -store file -dir store-d -clustered -nats_server nats://localhost:4222

可以在“ cluster”组下的配置文件中指定等效的集群配置。有关更多信息,请参见Configuring部分。

这是使用以下配置文件的3个节点的群集的示例。这些节点分别在“ host1”,“ host2”和“ host3”上运行。 注意如果您已经有一个NATS集群,并且想在其上运行NATS流集群,请参阅本节末尾的详细信息。

在“ host1”上,此配置指示服务器将在端口4222上接受客户端连接。它将在端口6222上接受路由连接。它将创建2条路由,分别指向“ host2”和“ host3”集群端口。

它定义了NATS Streaming群集名称为“ mycluster”,使用指向“ store”目录的存储文件。 streaming内部的cluster部分使NATS Streaming服务器以集群模式运行。此配置显式定义每个节点ID(“ host1”的“ a”)并列出其对等节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# NATS specific configuration
port: 4222
cluster {
listen: 0.0.0.0:6222
routes: ["nats://host2:6222", "nats://host3:6222"]
}

# NATS Streaming specific configuration
streaming {
id: mycluster
store: file
dir: store
cluster {
node_id: "a"
peers: ["b", "c"]
}
}

以下是在“ host2”上运行的服务器的配置。 注意路由现在如何到达“ host1”和“ host3”。 更改的另一件事是将节点标识设置为“ b”,并且对等节点也相应更新为“ a”和“ c”。

注意,“ dir”配置也是“ store”,但是它们是本地目录,并且不共享(实际上不能共享)。 每个节点将拥有自己的数据存储副本。 如果需要,您可以使每个配置的dir值不同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# NATS specific configuration
port: 4222
cluster {
listen: 0.0.0.0:6222
routes: ["nats://host1:6222", "nats://host3:6222"]
}

# NATS Streaming specific configuration
streaming {
id: mycluster
store: file
dir: store
cluster {
node_id: "b"
peers: ["a", "c"]
}
}

对于host3而言,路由现在分别指向host1和host2,节点ID为c,而对等节点为a和b。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# NATS specific configuration
port: 4222
cluster {
listen: 0.0.0.0:6222
routes: ["nats://host1:6222", "nats://host2:6222"]
}

# NATS Streaming specific configuration
streaming {
id: mycluster
store: file
dir: store
cluster {
node_id: "c"
peers: ["a", "b"]
}
}

在上面的示例中,配置假定不存在现有的NATS群集,因此在每个节点之间配置NATS路由。 如果要使用现有的NATS群集,请不要包括“ NATS特定配置”部分,而应在“ streaming”部分中添加“ nats_server_url”以指向所需的NATS服务器。

自动配置

我们还可以通过使用-cluster_bootstrap标志启动一个服务器作为种子节点来组建NATS流集群。 该节点将选举自己为领导者,因此避免启动多个服务器作为种子非常重要。 启动种子节点后,其他服务器将自动加入群集。 如果服务器正在恢复,它将使用恢复的群集配置。

以下是通过启动一个服务器作为种子并让其他服务器自动加入来启动集群中的三台服务器的示例:

1
2
3
4
5
nats-streaming-server -store file -dir store-a -clustered -cluster_bootstrap -nats_server nats://localhost:4222

nats-streaming-server -store file -dir store-b -clustered -nats_server nats://localhost:4222

nats-streaming-server -store file -dir store-c -clustered -nats_server nats://localhost:4222

对于给定的集群ID,如果启动多个服务器并将cluster_bootstrap设置为true,则每个具有此参数的服务器将报告配置错误并退出。

引导群集的第一台服务器可以重新启动,但是,操作员必须删除使用bootstrap参数错误启动的其他服务器的数据存储,然后再尝试重新启动它们。 如果以现有状态重新启动它们,即使没有-cluster_bootstrap参数,它们也将再次以领导者身份启动。

连接到NATS Streaming

首先,建议您了解Streaming和核心NATS之间的关系。 您应该熟悉concept

NATS流式传输是NATS之上的一项服务。 要连接到服务,您首先要连接到NATS,然后使用客户端库通过您的NATS连接与服务器进行通信。 大多数库都提供了一种方便的机制,可用于一步连接。 这些便捷方法将采用一些NATS选项,例如群集ID,并首先执行NATS连接,然后运行协议以连接到流服务器。

连接到流服务器需要由服务器配置定义的群集ID和由客户端定义的客户端ID。

客户编号应仅包含字母数字字符“-”或“ _”

连接到在默认端口上本地运行的服务器非常简单:

1
sc, err := stan.Connect(clusterID, clientID)

如果服务器在端口1234上运行:

1
sc, err := stan.Connect(clusterID, clientID, stan.NatsURL(“nats://localhost:1234))

有时,您可能想提供流媒体库connect方法中不可用的NATS设置。 或者,您可能想重用NATS连接而不是创建新的连接。 在这种情况下,库通常提供一种通过现有的NATS连接来连接流的方法:

1
sc, err := stan.Connect(clusterID, clientID, stan.NatsConn(nc))

连接到集群

1
2
3
4
5
6
url = "nats://localhost:4222,nats://localhost:6222"   
nc, err := nats.Connect(url, nats.DontRandomize(), nats.MaxReconnects(5)}
sc, err := stan.Connect(clusterID, clientID, stan.NatsConn(nc),
stan.SetConnectionLostHandler(func(_ stan.Conn, reason error) {
log.Fatalf("Connection lost, reason: %v", reason)
}))

发布到通道

流客户端库可以提供一种用于同步发布的方法。 这些发布方法将阻塞,直到服务器返回ACK。 错误或异常用于指示超时或其他错误。

1
err := sc.Publish("foo", []byte("Hello World"))

流库还可以提供一种异步发布的方法。 需要某种ACK回调。 该库将发布消息,并在ACK或超时时通知回调。 与发送的消息关联的全局ID从发布中返回,以便应用程序可以在回调时识别它。

1
2
3
ackHandler := func(ackedNuid string, err error){ ... }

nuid, err := sc.PublishAsync("foo", []byte("Hello World"), ackHandler)

即使在这种模式下,如果库中有许多已发布的消息而未从服务器接收到ACK,则呼叫仍将阻塞。 创建连接时可以更改默认值。

1
sc, err := sc.Connect(clusterID, clientName, stan.MaxPubAcksInflight(1000))

从通道接收消息

客户按名称订阅频道。不支持通配符。接收消息类似于核心NATS。流中的消息使用协议缓冲区,并且比NATS不透明消息具有更多的结构。客户端消息仍作为原始/不透明二进制数据显示并接受。协议缓冲区的使用是透明的。

订阅有几种形式:

  • 定期
  • 持久
  • 队列
  • 队列/持久

有关各种类型的更多详细信息,请查看concepts部分。

注意:消息回调是按顺序调用的,一次调用一条消息。如果您的应用程序不关心处理顺序,而是希望同时发送消息,则应用程序有责任将其移动到某个内部队列以供线程/执行例程接收。

订阅使用位置或时间设置创建时的起始位置。例如,在Go中,您可以从以下位置开始:

  • 最后收到的消息
1
2
3
sub, err := sc.Subscribe("foo",
func(m *stan.Msg) {...},
stan.StartWithLastReceived())
  • The beginning of the channel
1
2
3
sub, err := sc.Subscribe("foo",
func(m *stan.Msg) {...},
stan.DeliverAllAvailable())
  • 一条特定的消息,索引从1开始
1
2
3
sub, err := sc.Subscribe("foo",
func(m *stan.Msg) {...},
stan.StartAtSequence(22))
  • A specific time the message arrived in the channel
1
2
3
4
5
var startTime time.Time
...
sub, err := sc.Subscribe("foo",
func(m *stan.Msg) {...},
stan.StartAtTime(startTime))

要设置延迟,服务器应在该延迟之后尝试重新传递尚未收到确认的消息:

1
2
3
sub, err := sc.Subscribe("foo",
func(m *stan.Msg) {...},
stan.AckWait(20*time.Second))

当应用程序希望停止接收但希望保持连接打开时,应关闭订阅。 有两种停止订阅的方法,“关闭”或“取消订阅”。 对于非持久订阅,这是等效的,因为订阅将被完全删除。 对于持久订阅,关闭表示服务器将停止传递,但请记住持久订阅。 但是,取消订阅意味着服务器将删除此订阅的状态。

要简单地关闭:

1
err := sub.Close()

取消订阅:

1
err := sub.Unsubscribe()

注意:如果在没有显式关闭订阅的情况下关闭了连接,则订阅将隐式关闭,而不是取消订阅。

持久订阅

常规订阅会记住客户端连接后的位置。 如果客户断开连接,头寸将丢失。 即使客户端断开连接,持久订阅也会记住其位置。

持久订阅使用名称标识自己。 连接和断开连接不会影响该频道中的长期订阅位置。

1
sc.Subscribe("foo", func(m *stan.Msg) {...}, stan.DurableName("my-durable"))

取消订阅将导致服务器完全删除持久订阅。

检查concepts部分以获取更多信息。

NATS流中的队列订阅

队列订阅是与其他订阅一样创建的,只是添加了队列名称。

1
2
3
4
5
qsub1, _ := sc.QueueSubscribe(channelName,
queueName, func(m *stan.Msg) {...})

qsub2, _ := sc.QueueSubscribe(channelName,
queueName, func(m *stan.Msg) {...})

使用相同通道和队列名称的多个预订是同一队列组的成员。 这意味着,如果在该频道上发布了一条消息,则该组中只有一个成员会收到该消息。 其他订阅独立于队列组接收消息,即,将消息传递给所有订阅和每个队列组的一个成员。

要创建持久队列订阅,只需添加一个持久名称:

1
2
3
qsub, err := sc.QueueSubscribe(channelName,
queueName, func(m *stan.Msg) {...},
stan.DurableName("durable-name"))

订阅选项分别适用于每个成员,尤其是“ AckWait”和“ MaxInflight”。 同一队列组的这两个成员使用不同的选项进行重新和最大限度交付。

1
2
3
4
5
6
7
8
9
qsub1, _ := sc.QueueSubscribe(channelName,
queueName, func(m *stan.Msg) {...},
stan.AckWait(5*time.Second),
stan.MaxInflight(5))

qsub2, _ := sc.QueueSubscribe(channelName,
queueName, func(m *stan.Msg) {...},
stan.AckWait(20*time.Second),
stan.MaxInflight(10))

如果队列订阅是持久的,则只有最后一个调用Unsubscribe()的成员才会导致持久队列组从服务器中删除。

检查concepts部分以获取更多信息。

应答

订户可以使用自动确认或手动确认。 自动确认是大多数客户端的默认设置,并在消息回调返回时由库发送。 手动确认可提供更多控制。 订阅选项提供标志以:

  • 将手动设置为true
  • 设置服务器用于此订阅消息的确认等待时间

ack wait是服务器在重新发送消息之前将等待的时间。

1
2
3
4
sub, err := sc.Subscribe("foo",
func(m *stan.Msg) {
m.Ack()
}, stan.SetManualAckMode(), stan.AckWait(aw))

Max In Flight

MaxInflight设置为1可确保按顺序处理每个消息。

1
2
3
sc.Subscribe("foo", func(m *stan.Msg) {...},
stan.SetManualAckMode(),
stan.MaxInflight(25))