NATS Streaming

NATS Streaming 概念

NATS Streaming是一个由NATS驱动的数据流系统,用Go编程语言编写。 NATS Streaming服务器的可执行文件名是nats-streaming-server。 NATS Streaming与核心NATS平台无缝嵌入,扩展和互操作。 NATS Streaming服务器作为Apache-2.0许可下的开源软件提供。 Synadia积极维护和支持NATS Streaming服务器。

特点

除了核心NATS平台的功能外,NATS Streaming还提供以下功能:

  • 增强的消息协议 - NATS Streaming使用谷歌协议缓冲区实现自己的增强型消息格式。这些消息通过二进制数据流在NATS核心平台进行传播,因此不需要改变NATS的基本协议。NATS Streaming信息包含以下字段:
    • 序列 - 一个全局顺序序列号为主题的通道
        - 主题 - 是NATS Streaming 交付对象
        - 答复内容 - 对应"reply-to"对应的对象内容
        - 数据 - 真是数据内容
        - 时间戳 - 接收的时间戳,单位是纳秒
        - 重复发送 - 标志这条数据是否需要服务再次发送
        - CRC32 - 一个循环冗余数据校验选项,在数据存储和数据通讯领域里,为了保证数据的正确性所采用的检错手段,这里使用的是 IEEE CRC32 算法
  • 消息/事件持久性 - NATS Streaming提供了可配置的消息持久化,持久目的地可以为内存或者文件。另外,对应的存储子系统使用了一个公共接口允许我们开发自己自定义实现来持久化对应的消息。
  • 至少一次传送 - NATS Streaming提供了发布者和服务器之间的消息确认(发布操作) 和订阅者和服务器之间的消息确认(确认消息发送)。其中消息被保存在服务器端内存或者辅助存储(或其他外部存储器)用来为需要重新接受消息的订阅者进行重发消息。
  • 发布者发送速率限定 - NATS Streaming提供了一个连接选项叫 MaxPubAcksInFlight,它能有效的限制一个发布者可能随意的在任何时候发送的未被确认的消息。当达到这个配置的最大数量时,异步发送调用接口将会被阻塞,直到未确认消息降到指定数量之下。
  • 每个订户的速率匹配/限制 - NATS Streaming运行指定的订阅中设置一个参数为 MaxInFlight,它用来指定已确认但未消费的最大数据量,当达到这个限制时,NATS Streaming 将暂停发送消息给订阅者,直到未确认的数据量小于设定的量为止。
  • 以主题重发的历史数据 - 新订阅的可以在已经存储起来的订阅的主题频道指定起始位置消息流。通过使用这个选项,消息就可以开始发送传递了:
    • 订阅的主题存储的最早的信息
    • 与当前订阅主题之前的最近存储的数据,这通常被认为是 "最后的值" 或 "初值" 对应的缓存
    • 一个以纳秒为基准的 日期/时间
    • 一个历史的起始位置相对当前服务的 日期/时间,例如:最后30秒
    • 一个特定的消息序列号
  • 持久订阅 - 订阅也可以指定一个“持久化的名称”可以在客户端重启时不受影响。持久订阅会使得对应服务跟踪客户端最后确认消息的序列号和持久名称。当这个客户端重启或者重新订阅的时候,使用相同的客户端ID 和 持久化的名称,对应的服务将会从最早的未被确认的消息处恢复。

安装

NATS为Linux,Mac和Windows提供服务器二进制文件。您可以在您选择的任何平台上从源安装服务器。

用法,配置和管理

NATS Streaming提供了一组丰富的命令和参数来配置服务器的所有方面。有关使用,配置和管理的更多信息,请参阅自述文件

安装NATS Streaming Server

安装并运行NATS Streaming Server

在本教程中,您将安装并运行NATS Streaming服务器(nats-streaming-server)。您可以在任何时候运行NATS Streaming服务器时遵循相同的过程。

安装NATS Streaming服务器

有许多方法可以安装NATS Streaming服务器。
GitHub发布

GitHub版本页面上始终提供最新的官方发行版二进制文件。可以使用以下平台:

  • Linux (x86, x86_64, ARM)
  • Windows (x86, x86_64)
  • macOS

也可以使用以下方法。请注意,这些方法可能无法安装最新发布的版本:
Go

确保设置了Go环境

1
% go get github.com/nats-io/nats-streaming-server

请注意,此方法可能无法安装最新发布的版本。
Docker Hub

Docker Hub上始终提供最新的官方Docker映像

Windows

在Windows上,也可以通过Chocolatey安装NATS Streaming服务器:

1
% choco install nats-streaming-server

macOS

在macOS上,可以通过Homebrew安装NATS Streaming服务器:

1
% brew install nats-streaming-server

启动NATS Streaming服务器

您可以调用NATS Streaming服务器二进制文件,没有选项,也没有配置文件,以启动具有可接受的独立默认值的服务器(无身份验证,无群集)。

1
% nats-streaming-server

当服务器成功启动时,您将看到NATS Streaming服务器在TCP端口4222上侦听客户端连接:

1
2
3
4
5
6
7
8
9
10
11
12
13
[18085] 2016/10/31 13:11:44.059012 [INF] Starting nats-streaming-server[test-cluster] version 0.3.1
[18085] 2016/10/31 13:11:44.059830 [INF] Starting nats-server version 0.9.4
[18085] 2016/10/31 13:11:44.061544 [INF] Listening for client connections on 0.0.0.0:4222
[18085] 2016/10/31 13:11:44.061966 [INF] Server is ready
[18085] 2016/10/31 13:11:44.396819 [INF] STAN: Message store is MEMORY
[18085] 2016/10/31 13:11:44.396832 [INF] STAN: --------- Store Limits ---------
[18085] 2016/10/31 13:11:44.396837 [INF] STAN: Channels: 100 *
[18085] 2016/10/31 13:11:44.396839 [INF] STAN: -------- channels limits -------
[18085] 2016/10/31 13:11:44.396842 [INF] STAN: Subscriptions: 1000 *
[18085] 2016/10/31 13:11:44.396844 [INF] STAN: Messages : 1000000 *
[18085] 2016/10/31 13:11:44.396855 [INF] STAN: Bytes : 976.56 MB *
[18085] 2016/10/31 13:11:44.396858 [INF] STAN: Age : unlimited *
[18085] 2016/10/31 13:11:44.396859 [INF] STAN: --------------------------------

启用NATS监控启动NATS流服务器(可选)

NATS Streaming服务器在端口8222上公开其嵌入式NATS服务器(nats-server)的监视接口。

1
% nats-streaming-server -m 8222

如果在启用监视的情况下运行NATS Streaming服务器,则会看到以下消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[18122] 2016/10/31 13:13:10.048663 [INF] Starting nats-streaming-server[test-cluster] version 0.3.1
[18122] 2016/10/31 13:13:10.048843 [INF] Starting nats-server version 0.9.4
[18122] 2016/10/31 13:13:10.048890 [INF] Starting http monitor on 0.0.0.0:8222
[18122] 2016/10/31 13:13:10.048968 [INF] Listening for client connections on 0.0.0.0:4222
[18122] 2016/10/31 13:13:10.048992 [INF] Server is ready
[18122] 2016/10/31 13:13:10.388282 [INF] STAN: Message store is MEMORY
[18122] 2016/10/31 13:13:10.388301 [INF] STAN: --------- Store Limits ---------
[18122] 2016/10/31 13:13:10.388309 [INF] STAN: Channels: 100 *
[18122] 2016/10/31 13:13:10.388312 [INF] STAN: -------- channels limits -------
[18122] 2016/10/31 13:13:10.388316 [INF] STAN: Subscriptions: 1000 *
[18122] 2016/10/31 13:13:10.388319 [INF] STAN: Messages : 1000000 *
[18122] 2016/10/31 13:13:10.388333 [INF] STAN: Bytes : 976.56 MB *
[18122] 2016/10/31 13:13:10.388338 [INF] STAN: Age : unlimited *
[18122] 2016/10/31 13:13:10.388341 [INF] STAN: --------------------------------

NATS Streaming入门

本教程使用示例Go NATS Streaming客户端演示NATS Streaming。

先决条件

安装

下载并安装NATS Streaming Server

克隆以下存储库:

  • NATS Streaming Server: git clone https://github.com/nats-io/nats-streaming-server.git
  • NATS Streaming Client: git clone https://github.com/nats-io/go-nats-streaming.git

启动NATS Streaming Server

两种选择:

运行您下载的二进制文件,例如:$ ./nats-streaming-server

或者,从源代码运行:

1
2
% cd $GOPATH/src/github.com/nats-io/nats-streaming-server
% go run nats-streaming-server.go

您应该看到以下内容,表明NATS Streaming Server正在运行:

1
2
3
4
5
6
7
% go run nats-streaming-server.go
[89999] 2016/06/25 08:54:35.399071 [INF] Starting nats-streaming-server[test-cluster] version 0.1.0
[89999] 2016/06/25 08:54:35.399315 [INF] Starting nats-server version 0.9.0.beta
[89999] 2016/06/25 08:54:35.399326 [INF] Listening for client connections on localhost:4222
[89999] 2016/06/25 08:54:35.400721 [INF] Server is ready
[89999] 2016/06/25 08:54:35.737589 [INF] STAN: Message store is MEMORY
[89999] 2016/06/25 08:54:35.737610 [INF] STAN: Maximum of 1000000 will be stored

运行发布者客户端

发布多条消息。对于每个出版物,您应该得到一个结果。

1
2
3
4
5
6
7
% cd $GOPATH/src/github.com/nats-io/go-nats-streaming/examples/stan-pub
% go run main.go foo "msg one"
Published [foo] : 'msg one'
% go run main.go foo "msg two"
Published [foo] : 'msg two'
% go run main.go foo "msg three"
Published [foo] : 'msg three'

运行订户客户端

使用--all标志接收所有已发布的消息。

1
2
3
4
5
6
7
8
% cd $GOPATH/src/github.com/nats-io/go-nats-streaming/examples/stan-sub
% go run main.go --all -c test-cluster -id myID foo
Connected to nats://localhost:4222 clusterID: [test-cluster] clientID: [myID]
subscribing with DeliverAllAvailable
Listening on [foo], clientID=[myID], qgroup=[] durable=[]
[#1] Received on [foo]: 'sequence:1 subject:"foo" data:"msg one" timestamp:1465962202884478817 '
[#2] Received on [foo]: 'sequence:2 subject:"foo" data:"msg two" timestamp:1465962208545003897 '
[#3] Received on [foo]: 'sequence:3 subject:"foo" data:"msg three" timestamp:1465962215567601196

探索其他订阅选项

1
2
3
4
5
6
--seq <seqno>       Start at seqno
--all Deliver all available messages
--last Deliver starting with last published message
--since <duration> Deliver messages in last interval (e.g. 1s, 1hr, https://golang.org/pkg/time/#ParseDuration)
--durable <name> Durable subscriber name
--unsubscribe Unsubscribe the durable on exit

NATS流媒体服务器安全

验证用户

要从命令行启用用户身份验证,可以使用与NATS服务器(nats-server)相同的机制。您传入-user 和-pass 命令或--auth参数,NATS流服务器将自动使用这些凭据。或者,您可以将配置文件与单个用户或令牌一起使用。

使用具有多用户授权的配置文件时,必须将-user和-pass参数与NATS流服务器一起使用,与配置文件中的用户匹配,以指定NATS流服务器应对其进行身份验证的用户。嵌入式NATS服务器。

例如,如果您向NATS流媒体服务器传递包含多个用户的文件,则必须以配置文件中定义的“Joe”等用户身份运行流服务器。

使用TLS

虽然NATS Streaming服务器有几个与TLS相关的参数,但保护服务器的连接非常简单。但是,请记住,NATS Streaming服务器嵌入NATS服务器,从而产生客户端 - 服务器关系,其中NATS Streaming服务器是其嵌入式NATS服务器的客户端。

这意味着必须使用两组TLS配置参数:嵌入式NATS服务器的TLS服务器参数,以及NATS流服务器本身的TLS客户端参数。

流服务器使用以下三个参数指定它的TLS客户端证书:

  • -tls_client_key流服务器的客户端密钥
  • -tls_client_cert流服务器的客户端证书
  • -tls_client_cacert流媒体服务器的客户端证书CA.

这些可能与您的NATS流媒体客户端使用的证书相同。

嵌入式NATS服务器使用以下命令指定TLS服务器证书:

  • --tlscert 服务器证书文件
  • --tlskey 服务器证书的私钥
  • --tlscacert 用于验证的客户端证书CA.

服务器参数的使用方法与安全典型NATS服务器的方式相同。

正确使用NATS Streaming Server需要使用客户端和服务器参数。

例如:

1
% nats-streaming-server -tls_client_cert client-cert.pem -tls_client_key client-key.pem -tls_client_cacert ca.pem -tlscert server-cert.pem -tlskey server-key.pem -tlscacert ca.pem

进一步的TLS相关功能可以在Securing NATS> TLS中找到。请注意,如果需要指定密码套件,则可以通过-config命令行参数传递嵌入式NATS服务器的配置文件。

NATS Streaming Protocol

The NATS streaming protocol sits atop the core NATS protocol and uses Google’s Protocol Buffers. Protocol buffer messages are marshaled into bytes and published as NATS messages on specific subjects described below. In communicating with the NATS Streaming Server, the NATS request/reply pattern is used for all protocol messages that have a corresponding reply.

NATS streaming protocol conventions

Subject names: Subject names, including reply subject (INBOX) names, are case-sensitive and must be non-empty alphanumeric strings with no embedded whitespace, and optionally token-delimited using the dot character (.), e.g.:

FOO, BAR, foo.bar, foo.BAR, FOO.BAR and FOO.BAR.BAZ are all valid subject names

FOO. BAR, foo. .bar andfoo..bar are *not- valid subject names

Wildcards: NATS streaming does *not- support wildcards in subject subscriptions

Protocol definition: The fields of NATS streaming protocol messages are defined in the go-nats-streaming protocol file.

NATS streaming protocol messages

The following table briefly describes the NATS streaming protocol messages.

Click the name to see more detailed information, including usage:

Message Name Sent By Description
ConnectRequest Client Request to connect to the NATS Streaming Server
ConnectResponse Server Result of a connection request
SubscriptionRequest Client Request sent to subscribe and retrieve data
SubscriptionResponse Server Result of a subscription request
UnsubscribeRequest Client Unsubscribe from a subject
PubMsg Client Publish a message to a subject, with optional reply subject
PubAck Server An acknowledgement that a published message has been processed on the server
MsgProto Server A message from the NATS Streaming Server to a subscribing client
Ack Client Acknowledges that a message has been received
CloseRequest Client Request sent to close the connection to the NATS Streaming Server
CloseResp Server Result of the close request

The following sections explain each protocol message.

ConnectRequest

Description

A connection request is sent when a streaming client connects to the NATS Streaming Server. The connection request contains a unique identifier representing the client, and an inbox subject the client will listen on for incoming heartbeats. The identifier *must- be unique; a connection attempt with an identifier currently in use will fail. The inbox subject is the subject where the client receives incoming heartbeats, and responds by publishing an empty NATS message to the reply subject, indicating it is alive. The NATS Streaming Server will return a ConnectResponse message to the reply subject specified in the NATS request message.

This request is published to a subject comprised of the <discover-prefix>.cluster-id, for example, if a NATS Streaming Server was started with a cluster-id of mycluster, and the default prefix was used, the client publishes to _STAN.discover.mycluster

Message Structure

  • clientID: A unique identifier for a client
  • heartbeatInbox: An inbox to which the NATS Streaming Server will send heartbeats for the client to process

ConnectResponse

Description

After a ConnectRequest is published, the NATS Streaming Server responds with this message on the reply subject of the underlying NATS request. The NATS Streaming Server requires the client to make requests and publish messages on certain subjects (described above), and when a connection is successful, the client saves the information returned to be used in sending other NATS streaming protocol messages. In the event the connection was not successful, an error is returned in the error field.

Message Structure

  • pubPrefix: Prefix to use when publishing
  • subRequests: Subject used for subscription requests
  • unsubRequests: Subject used for unsubscribe requests
  • closeRequests: Subject for closing a connection
  • error: An error string, which will be empty/omitted upon success
  • publicKey: Reserved for future use

SubscriptionRequest

Description

A SubscriptionRequest is published on the subject returned in the subRequests field of a ConnectResponse, and creates a subscription to a subject on the NATS Streaming Server. This will return a SubscriptionResponse message to the reply subject specified in the NATS protocol request message.

Message Structure

  • clientID: Client ID originally provided in the ConnectRequest
  • subject: Formal subject to subscribe to, e.g. foo.bar
  • qGroup: Optional queue group
  • inbox: Inbox subject to deliver messages on
  • maxInFlight: Maximum inflight messages without an acknowledgement allowed
  • ackWaitInSecs: Timeout for receiving an acknowledgement from the client
  • durableName: Optional durable name which survives client restarts
  • startPosition: An enumerated type specifying the point in history to start replaying data
  • startSequence: Optional start sequence number
  • startTimeDelta: Optional start time

StartPosition enumeration

  • NewOnly: Send only new messages
  • LastReceived: Send only the last received message
  • TimeDeltaStart: Send messages from duration specified in the startTimeDelta field.
  • SequenceStart: Send messages starting from the sequence in the startSequence field.
  • First: Send all available messages

SubscriptionResponse

Description

The SubscriptionResponse message is the response from the SubscriptionRequest. After a client has processed an incoming MsgProto message, it must send an acknowledgement to the ackInbox subject provided here.

Message Structure

  • ackInbox: subject the client sends message acknowledgements to the NATS Streaming Server
  • error: error string, empty/omitted if no error

UnsubscribeRequest

Description

The UnsubscribeRequest unsubcribes the connection from the specified subject. The inbox specified is the inbox returned from the NATS Streaming Server in the SubscriptionResponse.

Message Structure

  • clientID: Client ID originally provided in the ConnectRequest
  • subject: Subject for the subscription
  • inbox: Inbox subject to identify subscription
  • durableName: Optional durable name which survives client restarts

PubMsg

Description

The PubMsg protocol message is published from a client to the NATS Streaming Server. The GUID must be unique, and is returned in the PubAck message to correlate the success or failure of storing this particular message.

Message Structure

  • clientID: Client ID originally provided in the ConnectRequest
  • guid: a guid generated for this particular message
  • subject: subject
  • reply: optional reply subject
  • data: payload
  • sha256: optional sha256 of payload data

PubAck

Description

The PubAck message is an acknowledgement from the NATS Streaming Server that a message has been processed. The message arrives on the subject specified on the reply subject of the NATS message the PubMsg was published on. The GUID is the same GUID used in the PubMsg being acknowledged. If an error string is present, the message was not persisted by the NATS Streaming Server and no guarantees regarding persistence are honored. PubAck messages may be handled asynchronously from their corresponding PubMsg in the client.

Message Structure

  • guid: GUID of the message being acknowledged by the NATS Streaming Server
  • error: An error string, empty/omitted if no error

MsgProto

Description

The MsgProto message is received by client from the NATS Streaming Server, containing the payload of messages sent by a publisher. A MsgProto message that is not acknowledged with an Ack message within the duration specified by the ackWaitInSecs field of the subscription request will be redelivered.

Message Structure

  • sequence: Globally ordered sequence number for the subject’s channel
  • subject: Subject
  • reply: Optional reply
  • data: Payload
  • timestamp: Time the message was stored in the server.
  • redelivered: Flag specifying if the message is being redelivered
  • CRC32: Optional IEEE CRC32

Ack

Description

An Ack message is an acknowledgement from the client that a MsgProto message has been considered received. It is published to the ackInbox field of the SubscriptionResponse.

Message Structure

  • subject: Subject of the message being acknowledged
  • sequence: Sequence of the message being acknowledged

CloseRequest

Description

A CloseRequest message is published on the closeRequests subject from the ConnectResponse, and notifies the NATS Streaming Server that the client connection is closing, allowing the server to free up resources. This message should *always- be sent when a client is finished using a connection.

Message Structure

CloseResponse

Description

The CloseResponse is sent by the NATS Streaming Server on the reply subject of the CloseRequest NATS message. This response contains any error that may have occurred with the corresponding close call.

Message Structure

  • error: error string, empty/omitted if no error