NATS和NATS Streaming

NATS

源码安装

1
2
3
4
5
# Go client
go get github.com/nats-io/nats.go/

# Server
go get github.com/nats-io/nats-server

二进制:https://nats.io/download/nats-io/gnatsd/

When using or transitioning to Go modules support:

1
2
3
4
5
6
7
8
9
# Go client latest or explicit version
go get github.com/nats-io/nats.go/@latest
go get github.com/nats-io/nats.go/@v1.9.1

# For latest NATS Server, add /v2 at the end
go get github.com/nats-io/nats-server/v2

# NATS Server v1 is installed otherwise
# go get github.com/nats-io/nats-server

基本用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import nats "github.com/nats-io/nats.go"

// Connect to a server
nc, _ := nats.Connect(nats.DefaultURL)

// Simple Publisher
nc.Publish("foo", []byte("Hello World"))

// Simple Async Subscriber
nc.Subscribe("foo", func(m *nats.Msg) {
fmt.Printf("Received a message: %s\n", string(m.Data))
})

// Responding to a request message
nc.Subscribe("request", func(m *nats.Msg) {
m.Respond([]byte("answer is 42"))
})

// Simple Sync Subscriber
sub, err := nc.SubscribeSync("foo")
m, err := sub.NextMsg(timeout)

// Channel Subscriber
ch := make(chan *nats.Msg, 64)
sub, err := nc.ChanSubscribe("foo", ch)
msg := <- ch

// Unsubscribe
sub.Unsubscribe()

// Drain
sub.Drain()

// Requests
msg, err := nc.Request("help", []byte("help me"), 10*time.Millisecond)

// Replies
nc.Subscribe("help", func(m *nats.Msg) {
nc.Publish(m.Reply, []byte("I can help!"))
})

// Drain connection (Preferred for responders)
// Close() not needed if this is called.
nc.Drain()

// Close connection
nc.Close()

编码连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
nc, _ := nats.Connect(nats.DefaultURL)
c, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
defer c.Close()

// Simple Publisher
c.Publish("foo", "Hello World")

// Simple Async Subscriber
c.Subscribe("foo", func(s string) {
fmt.Printf("Received a message: %s\n", s)
})

// EncodedConn can Publish any raw Go type using the registered Encoder
type person struct {
Name string
Address string
Age int
}

// Go type Subscriber
c.Subscribe("hello", func(p *person) {
fmt.Printf("Received a person: %+v\n", p)
})

me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street, San Francisco, CA"}

// Go type Publisher
c.Publish("hello", me)

// Unsubscribe
sub, err := c.Subscribe("foo", nil)
// ...
sub.Unsubscribe()

// Requests
var response string
err = c.Request("help", "help me", &response, 10*time.Millisecond)
if err != nil {
fmt.Printf("Request failed: %v\n", err)
}

// Replying
c.Subscribe("help", func(subj, reply string, msg string) {
c.Publish(reply, "I can help!")
})

// Close connection
c.Close();

新身份验证 (Nkeys 和用户凭据)

这需要服务器的版本> =2.0.0

NATS 服务器有一个新的安全和身份验证机制来使用用户凭据和 Nkeys 认证。 最简单的形式是使用方法 userciles(credsFilepath)。

1
nc, err := nats.Connect(url, nats.UserCredentials("user.creds"))

The helper methods creates two callback handlers to present the user JWT and sign the nonce challenge from the server. The core client library never has direct access to your private key and simply performs the callback for signing the server challenge. The helper will load and wipe and erase memory it uses for each connect or reconnect.

The helper also can take two entries, one for the JWT and one for the NKey seed file.

1
nc, err := nats.Connect(url, nats.UserCredentials("user.jwt", "user.nk"))

You can also set the callback handlers directly and manage challenge signing directly.

1
nc, err := nats.Connect(url, nats.UserJWT(jwtCB, sigCB))

Bare Nkeys are also supported. The nkey seed should be in a read only file, e.g. seed.txt

1
2
3
> cat seed.txt
# This is my seed nkey!
SUAGMJH5XLGZKQQWAWKRZJIGMOU4HPFUYLXJMXOO5NLFEO2OOQJ5LPRDPM

This is a helper function which will load and decode and do the proper signing for the server nonce. It will clear memory in between invocations. You can choose to use the low level option and provide the public key and a signature callback on your own.

1
2
3
4
5
opt, err := nats.NkeyOptionFromSeed("seed.txt")
nc, err := nats.Connect(serverUrl, opt)

// Direct
nc, err := nats.Connect(serverUrl, nats.Nkey(pubNkey, sigCB))

TLS

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// tls as a scheme will enable secure connections by default. This will also verify the server name.
nc, err := nats.Connect("tls://nats.demo.io:4443")

// If you are using a self-signed certificate, you need to have a tls.Config with RootCAs setup.
// We provide a helper method to make this case easier.
nc, err = nats.Connect("tls://localhost:4443", nats.RootCAs("./configs/certs/ca.pem"))

// If the server requires client certificate, there is an helper function for that too:
cert := nats.ClientCert("./configs/certs/client-cert.pem", "./configs/certs/client-key.pem")
nc, err = nats.Connect("tls://localhost:4443", cert)

// You can also supply a complete tls.Config

certFile := "./configs/certs/client-cert.pem"
keyFile := "./configs/certs/client-key.pem"
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
t.Fatalf("error parsing X509 certificate/key pair: %v", err)
}

config := &tls.Config{
ServerName: opts.Host,
Certificates: []tls.Certificate{cert},
RootCAs: pool,
MinVersion: tls.VersionTLS12,
}

nc, err = nats.Connect("nats://localhost:4443", nats.Secure(config))
if err != nil {
t.Fatalf("Got an error on Connect with Secure Options: %+v\n", err)
}

使用 Go 通道

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
nc, _ := nats.Connect(nats.DefaultURL)
ec, _ := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
defer ec.Close()

type person struct {
Name string
Address string
Age int
}

recvCh := make(chan *person)
ec.BindRecvChan("hello", recvCh)

sendCh := make(chan *person)
ec.BindSendChan("hello", sendCh)

me := &person{Name: "derek", Age: 22, Address: "140 New Montgomery Street"}

// Send via Go channels
sendCh <- me

// Receive via Go channels
who := <- recvCh

通配符订阅

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// "*" matches any token, at any level of the subject.
nc.Subscribe("foo.*.baz", func(m *Msg) {
fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data));
})

nc.Subscribe("foo.bar.*", func(m *Msg) {
fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data));
})

// ">" matches any length of the tail of a subject, and can only be the last token
// E.g. 'foo.>' will match 'foo.bar', 'foo.bar.baz', 'foo.foo.bar.bax.22'
nc.Subscribe("foo.>", func(m *Msg) {
fmt.Printf("Msg received on [%s] : %s\n", m.Subject, string(m.Data));
})

// Matches all of the above
nc.Publish("foo.bar.baz", []byte("Hello World"))

队列组

1
2
3
4
5
6
7
8
// All subscriptions with the same queue name will form a queue group.
// Each message will be delivered to only one subscriber per queue group,
// using queuing semantics. You can have as many queue groups as you wish.
// Normal subscribers will continue to work as expected.

nc.QueueSubscribe("foo", "job_workers", func(_ *Msg) {
received += 1;
})

高级用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Flush connection to server, returns when all messages have been processed.
nc.Flush()
fmt.Println("All clear!")

// FlushTimeout specifies a timeout value as well.
err := nc.FlushTimeout(1*time.Second)
if err != nil {
fmt.Println("All clear!")
} else {
fmt.Println("Flushed timed out!")
}

// Auto-unsubscribe after MAX_WANTED messages received
const MAX_WANTED = 10
sub, err := nc.Subscribe("foo")
sub.AutoUnsubscribe(MAX_WANTED)

// Multiple connections
nc1 := nats.Connect("nats://host1:4222")
nc2 := nats.Connect("nats://host2:4222")

nc1.Subscribe("foo", func(m *Msg) {
fmt.Printf("Received a message: %s\n", string(m.Data))
})

nc2.Publish("foo", []byte("Hello World!"));

集群

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
var servers = "nats://localhost:1222, nats://localhost:1223, nats://localhost:1224"

nc, err := nats.Connect(servers)

// Optionally set ReconnectWait and MaxReconnect attempts.
// This example means 10 seconds total per backend.
nc, err = nats.Connect(servers, nats.MaxReconnects(5), nats.ReconnectWait(2 * time.Second))

// Optionally disable randomization of the server pool
nc, err = nats.Connect(servers, nats.DontRandomize())

// Setup callbacks to be notified on disconnects, reconnects and connection closed.
nc, err = nats.Connect(servers,
nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
fmt.Printf("Got disconnected! Reason: %q\n", err)
}),
nats.ReconnectHandler(func(nc *nats.Conn) {
fmt.Printf("Got reconnected to %v!\n", nc.ConnectedUrl())
}),
nats.ClosedHandler(func(nc *nats.Conn) {
fmt.Printf("Connection closed. Reason: %q\n", nc.LastError())
})
)

// When connecting to a mesh of servers with auto-discovery capabilities,
// you may need to provide a username/password or token in order to connect
// to any server in that mesh when authentication is required.
// Instead of providing the credentials in the initial URL, you will use
// new option setters:
nc, err = nats.Connect("nats://localhost:4222", nats.UserInfo("foo", "bar"))

// For token based authentication:
nc, err = nats.Connect("nats://localhost:4222", nats.Token("S3cretT0ken"))

// You can even pass the two at the same time in case one of the server
// in the mesh requires token instead of user name and password.
nc, err = nats.Connect("nats://localhost:4222",
nats.UserInfo("foo", "bar"),
nats.Token("S3cretT0ken"))

// Note that if credentials are specified in the initial URLs, they take
// precedence on the credentials specified through the options.
// For instance, in the connect call below, the client library will use
// the user "my" and password "pwd" to connect to localhost:4222, however,
// it will use username "foo" and password "bar" when (re)connecting to
// a different server URL that it got as part of the auto-discovery.
nc, err = nats.Connect("nats://my:pwd@localhost:4222", nats.UserInfo("foo", "bar"))

上下文支持 (+ Go1.7)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

nc, err := nats.Connect(nats.DefaultURL)

// Request with context
msg, err := nc.RequestWithContext(ctx, "foo", []byte("bar"))

// Synchronous subscriber with context
sub, err := nc.SubscribeSync("foo")
msg, err := sub.NextMsgWithContext(ctx)

// Encoded Request with context
c, err := nats.NewEncodedConn(nc, nats.JSON_ENCODER)
type request struct {
Message string `json:"message"`
}
type response struct {
Code int `json:"code"`
}
req := &request{Message: "Hello"}
resp := &response{}
err := c.RequestWithContext(ctx, "foo", req, resp)

NATS Streaming

源码安装

1
2
3
4
5
# Go client
go get github.com/nats-io/stan.go/

# Server
go get github.com/nats-io/nats-streaming-server

二进制:https://nats.io/download/nats-io/nats-streaming-server/

When using or transitioning to Go modules support:

1
2
3
# Go client latest or explicit version
go get github.com/nats-io/stan.go/@latest
go get github.com/nats-io/stan.go/@v0.5.0

基本用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import stan "github.com/nats-io/stan.go"

sc, _ := stan.Connect(clusterID, clientID)

// Simple Synchronous Publisher
sc.Publish("foo", []byte("Hello World")) // does not return until an ack has been received from NATS Streaming

// Simple Async Subscriber
sub, _ := sc.Subscribe("foo", func(m *stan.Msg) {
fmt.Printf("Received a message: %s\n", string(m.Data))
})

// Unsubscribe
sub.Unsubscribe()

// Close connection
sc.Close()

订阅启动 (i.e.重播)选项

NATS Streaming subscriptions are similar to NATS subscriptions, but clients may start their subscription at an earlier point in the message stream, allowing them to receive messages that were published before this client registered interest.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Subscribe starting with most recently published value
sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
fmt.Printf("Received a message: %s\n", string(m.Data))
}, stan.StartWithLastReceived())

// Receive all stored values in order
sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
fmt.Printf("Received a message: %s\n", string(m.Data))
}, stan.DeliverAllAvailable())

// Receive messages starting at a specific sequence number
sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
fmt.Printf("Received a message: %s\n", string(m.Data))
}, stan.StartAtSequence(22))

// Subscribe starting at a specific time
var startTime time.Time
...
sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
fmt.Printf("Received a message: %s\n", string(m.Data))
}, stan.StartAtTime(startTime))

// Subscribe starting a specific amount of time in the past (e.g. 30 seconds ago)
sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
fmt.Printf("Received a message: %s\n", string(m.Data))
}, stan.StartAtTimeDelta(time.ParseDuration("30s")))

持久订阅

Replay of messages offers great flexibility for clients wishing to begin processing at some earlier point in the data stream. However, some clients just need to pick up where they left off from an earlier session, without having to manually track their position in the stream of messages. Durable subscriptions allow clients to assign a durable name to a subscription when it is created. Doing this causes the NATS Streaming server to track the last acknowledged message for that clientID + durable name, so that only messages since the last acknowledged message will be delivered to the client.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
sc, _ := stan.Connect("test-cluster", "client-123")

// Subscribe with durable name
sc.Subscribe("foo", func(m *stan.Msg) {
fmt.Printf("Received a message: %s\n", string(m.Data))
}, stan.DurableName("my-durable"))
...
// client receives message sequence 1-40
...
// client disconnects for an hour
...
// client reconnects with same clientID "client-123"
sc, _ := stan.Connect("test-cluster", "client-123")

// client re-subscribes to "foo" with same durable name "my-durable"
sc.Subscribe("foo", func(m *stan.Msg) {
fmt.Printf("Received a message: %s\n", string(m.Data))
}, stan.DurableName("my-durable"))
...
// client receives messages 41-current

队列组

所有具有相同队列名称的订阅(不管连接是什么) 它们起源于)将形成一个队列组。 每条消息将被传递给每个队列组只有一个订阅者, 使用队列语义。您可以希望有多少队列组。

正常的订户将继续按预期工作。

创建队列组

当第一个队列订阅者是自动创建队列组 创建。如果组已经存在,则将成员添加到组。

1
2
3
4
5
6
7
8
9
10
11
12
sc, _ := stan.Connect("test-cluster", "clientid")

// Create a queue subscriber on "foo" for group "bar"
qsub1, _ := sc.QueueSubscribe("foo", "bar", qcb)

// Add a second member
qsub2, _ := sc.QueueSubscribe("foo", "bar", qcb)

// Notice that you can have a regular subscriber on that subject
sub, _ := sc.Subscribe("foo", cb)

// A message on "foo" will be received by sub and qsub1 or qsub2.
起始位置

注意,一旦形成队列组,成员的起始位置就会被忽略 当加入小组时。它将开始接收消息从最后 在小组中的位置。

假设频道foo存在与存在500存储的消息组bar已经创建有两个成员和最后一个 消息序列发送100.增加了一个新成员。注意它的起始位置:

1
sc.QueueSubscribe("foo", "bar", qcb, stan.StartAtSequence(200))

这不会产生错误,但是开始位置将被忽略。假定的 这个成员将是接收下一个消息的人它将接收消息 序列101.

离开队列组

离开组的方法有两种:关闭订阅者的连接或调用Unsubscribe

1
2
// Have qsub leave the queue group
qsub.Unsubscribe()

如果离开成员有未确认的消息,则重新分配这些消息 致其余的成员。

关闭队列组

这方面没有特别的 API。一旦所有成员都离开了Unsubscribe, 或者他们的连接是关闭的),组从服务器中删除。

下一个要求QueueSubscribe使用相同的组名将创建一个全新的组 也就是说,开始位置将生效,交付将从那里开始。

持久队列组

如上所述,对于非持久队列订阅者,当最后一个成员离开组, 那群人被撤走了。持久队列组允许您让所有成员离开但仍然 保持状态。当一个成员重新加入时,它从该组的最后一个位置开始。

创建持久队列组

持久队列组的创建方式与标准队列组的创建方式类似, 除了DurableName必须使用选项来指定持久性。

1
sc.QueueSubscribe("foo", "bar", qcb, stan.DurableName("dur"))

A group called dur:bar (the concatenation of durable name and group name) is created in the server. This means two things:

  • The character : is not allowed for a queue subscriber's durable name.
  • 具有相同名称的持久和非持久队列组可以共存。
1
2
3
4
5
6
7
// Non durable queue subscriber on group "bar"
qsub, _ := sc.QueueSubscribe("foo", "bar", qcb)

// Durable queue subscriber on group "bar"
durQsub, _ := sc.QueueSubscribe("foo", "bar", qcb, stan.DurableName("mydurablegroup"))

// The same message produced on "foo" would be received by both queue subscribers.
起始位置

非持久队列订阅者的规则适用于持久订阅者。

离开队列组

至于非持久队列订阅者,如果成员的连接已关闭,或如果Unsubscribe它的名称,成员离开组。任何未确认的消息 被转移到剩下的成员。查看关闭小组与非持久队列订户重要的区别 。

关闭队列组

最后的成员调用Unsubscribe将关闭 (那就是摧毁) 组所以如果你想保持持久的集团你不应该 呼叫Unsubscribe.

因此,与非持久队列订阅者不同,维护队列组是可能的 服务器中没有成员。当一个新成员重新加入持久队列组, 它将从团队离开的地方恢复实际上首先接收所有未被承认的信息 可能在最后一个成员离开时留下的消息。

通配符订阅

NATS 流媒体订阅支持通配符。

高级用法

连接配置,如 TLS 等

如果您想要更高级的配置底层 NATS 连接您将需要 创建一个 NATS 连接调用stan.NatsConn()选项并将该连接传递到stan.Connect()

1
2
3
4
5
6
7
8
9
10
11
12
// Create a NATS connection that you can configure the way you want
nc, err = nats.Connect("tls://localhost:4443", nats.ClientCert("mycerts/client-cert.pem", "mycerts/client-key.pem"))
if (err != nil)
...

// Then pass it to the stan.Connect() call.
sc, err = stan.Connect("test-cluster", "me", stan.NatsConn(nc))
if (err != nil)
...

// Note that you will be responsible for closing the NATS Connection after the streaming
// connection has been closed.

连接状态

NATS 流服务器和客户机之间没有直接连接,这对了解客户机是否仍然有效提出了挑战。 当客户端断开时流服务器不被通知因此调用的重要性Close().服务器发送心跳 到客户端的私人收件箱如果它错过了一定数量的响应它将考虑客户端的连接丢失并删除它 从它的状态。

前版本0.4.0,客户端库没有将 PINS 发送到流服务器,以检测连接失败。这是有问题的 特别是如果应用程序从未发送数据(例如只有订阅)。图片的情况下一个客户连接到一个 NATS 服务器有一条通往 NATS 流服务器的路径(连接到独立的 NATS 服务器或它嵌入的服务器)。如果 流服务器和客户端的 NATS 服务器之间的连接中断,客户端的 NATS 连接仍然可以,但是没有 与流服务器通信是可能的。这就是为什么依赖Conn.NatsConn()检查状态是没有帮助的。

开始版本0.4.0这个库和服务器的0.10.0,客户端库现在将定期发送 PING(默认为5秒) 并将关闭流连接后,一定数量的 PINS 已发送没有任何响应(默认是3)。当那时候 碰巧,回调(如果注册了)将被调用,以通知用户连接永久丢失,以及原因 因为失败了。

以下是您将如何指定自己的 PING 值和回调:

1
2
3
4
5
6
7
// Send PINGs every 10 seconds, and fail after 5 PINGs without any response.
sc, err := stan.Connect(clusterName, clientName,
stan.Pings(10, 5),
stan.SetConnectionLostHandler(func(_ stan.Conn, reason error) {
log.Fatalf("Connection lost, reason: %v", reason)
})
)

注意,要通知的唯一方法是设置回调。如果没有设置回调,则仍然发送 PING 和连接 如果需要,将关闭,但应用程序将不知道它是否只有订阅。

当连接丢失时,应用程序将不得不重新创建它和所有订阅(如果有的话)。

当没有 NATS 连接提供给Connect()调用,库创建它自己的 NATS 连接,现在将 将重新连接尝试设置为 “无限”,以前不是这样的。因此图书馆应该有可能 总是重新连接但这并不意味着流连接不会被关闭即使您设置的非常高 PINS 的阈值最大输出值。请记住,当客户端断开时,服务器正在向其发送心跳信号 客户机也是如此,当没有得到任何响应时,它将把客户机从它的状态中删除。当通信恢复时, 发送到服务器的 PINS 将允许检测此条件并向客户机报告连接现在已关闭。

此外,当客户机与服务器 “断开连接” 时,另一个连接到流服务器的应用程序可能会 连接并使用相同的客户端 ID。当检测重复的客户端 ID 时服务器将尝试与第一个客户端联系 要知道它是否应该拒绝第二个客户端的连接请求。由于通信之间的服务器和 第一个客户机被破坏,服务器将得不到响应,因此将用第二个客户机替换第一个客户机。

在客户之前0.4.0与服务器0.10.0,如果要恢复第一个客户机和服务器之间的通信, 而应用程序将发送消息服务器将接受这些消息因为发布的消息客户端 ID 是 有效,尽管客户端是无效的。与客户在0.4.0+与服务器0.10.0+,每个人都会发送更多的信息 消息,允许服务器拒绝来自已被另一个客户机替换的客户机的消息。

异步发布

基本发布 API (Publish(subject, payload))是同步的;在 NATS 流服务器确认收到消息之前,它不会将控制返回给调用者。为了做到这一点nuid是为创建时的消息生成的,客户端库在服务器返回控制权给调用者之前等待来自服务器的发布确认,这可能是由于服务器问题或授权错误导致操作不成功的错误。

高级用户可能希望手动处理这些发布确认,通过在发布操作期间不等待单个确认来实现更高的发布吞吐量。为此提供了异步发布 API:

1
2
3
4
5
6
7
8
9
10
11
12
ackHandler := func(ackedNuid string, err error) {
if err != nil {
log.Printf("Warning: error publishing msg id %s: %v\n", ackedNuid, err.Error())
} else {
log.Printf("Received ack for msg id %s\n", ackedNuid)
}
}

nuid, err := sc.PublishAsync("foo", []byte("Hello World"), ackHandler) // returns immediately
if err != nil {
log.Printf("Error publishing msg %s: %v\n", nuid, err.Error())
}

消息确认和回复

NATS 流提供了至少一次交付语义,这意味着一旦消息被传递给了合格的订阅者,如果在配置的超时间隔内没有接收到确认,NATS 流将尝试重新发送消息。 此超时间隔由订阅选项指定AckWait,默认为30秒。

默认情况下,在调用订阅服务器的消息处理程序后,NATS 流客户端库自动确认消息。但是,在某些情况下,订阅客户端希望加速或延迟对消息的确认。 要做到这一点,客户端必须在订阅上设置手动确认模式,并调用Ack()关于Msg. 例:

1
2
3
4
5
6
7
// Subscribe with manual ack mode, and set AckWait to 60 seconds
aw, _ := time.ParseDuration("60s")
sub, err := sc.Subscribe("foo", func(m *stan.Msg) {
m.Ack() // ack message before performing I/O intensive operation
///...
fmt.Printf("Received a message: %s\n", string(m.Data))
}, stan.SetManualAckMode(), stan.AckWait(aw))

速率限制/匹配

发布-订阅消息传递的一个经典问题是消息生产者的速率与消息使用者的速率匹配。 消息生成程序通常可以超过使用它们的消息的订阅者的速度。 这种不匹配通常被称为 “快速生产者/慢消费者” 问题,并可能导致底层消息传递系统的资源利用率急剧上升,因为它试图缓冲消息,直到慢消费者能够赶上。

发布者速率限制

NATS 流提供了一个名为连接选项MaxPubAcksInflight这有效地限制了发布者在任何给定时间内可能有的未确认消息的数量。当达到这个最大值时进一步PublishAsync()调用将阻塞,直到未确认消息的数量低于指定的限制。 例:

1
2
3
4
5
6
7
8
9
10
11
12
sc, _ := stan.Connect(clusterID, clientID, MaxPubAcksInflight(25))

ah := func(nuid string, err error) {
// process the ack
...
}

for i := 1; i < 1000; i++ {
// If the server is unable to keep up with the publisher, the number of outstanding acks will eventually
// reach the max and this call will block
guid, _ := sc.PublishAsync("foo", []byte("Hello World"), ah)
}

用户速率限制

使用称为订阅选项的订阅选项也可以在订阅方按每次订阅的方式实现速率限制MaxInflight. 此选项指定 NATS 流允许给定订阅的未完成确认(已交付但未确认的消息)的最大数量。 当达到此限制时,NATS 流将暂停向此订阅交付消息,直到未确认消息的数量低于指定的限制。 例:

1
2
3
4
5
6
7
8
// Subscribe with manual ack mode and a max in-flight limit of 25
sc.Subscribe("foo", func(m *stan.Msg) {
fmt.Printf("Received message #: %s\n", string(m.Data))
...
// Does not ack, or takes a very long time to ack
...
// Message delivery will suspend when the number of unacknowledged messages reaches 25
}, stan.SetManualAckMode(), stan.MaxInflight(25))