NSQ客户端
NSQ客户端
TCP 协议规范
NSQ 协议足够简单,用任何语言编译客户端都很容易。我们提供官方的 Go 和 Python 客户端库。
nsqd
进程通过监听配置的 TCP 端口来接受客户端连接。
连接后,客户端必须发送一个 4 字节的 "magic" 标识码,表示通讯协议的版本。
V2
(4 个字节的 ASCII[space][space][V][2]
) 消费用到的推送流协议(和发布用到的请求/响应协议)
认证后,客户端可以发送 IDENTIFY
命令来停供常用的元数据(比如,更多的描述标识码)和协商特性。为了消费消息,客户端必须 SUB
到一个通道(channel)。
订阅的时候,客户端的 RDY
状态为 0。意味着没有消息会被发送到客户端。当客户端已经准备好接受消息时,需要把 RDY
设置为 #。比如设置为 100,不需要任何附加命令,将会有 100 条消息推送到客户端(每次服务端都会相应的减少 RDY
的值)。
V2 版本的协议让客户端拥有心跳功能。每隔 30 秒(默认设置),nsqd
将会发送一个 _heartbeat_
响应,并期待返回。如果客户端空闲,发送 NOP
命令。如果 2 个 _heartbeat_
响应没有被应答, nsqd
将会超时,并且强制关闭客户端连接。IDENTIFY
命令可以用来改变/禁用这个行为。
注意
- 除非 stated,所有的传输的二级制大小/整数都是网络字节顺序。(列如. big endian)
- 有效的*话题(topic)和通道(channel)*名必须是字符
[.a-zA-Z0-9_-]
和数字1 < length <= 64
(在nsqd
0.2.28
版本前最长32
位)
命令
IDENTIFY
更新服务器上的客户端元数据和协商功能。
1 | IDENTIFY\n |
注意: 这个命令包含 JSON 的相关内容,包括:
short_id (
nsqd
v0.2.28+
版本之后已经抛弃,使用 client_id 替换)这个标示符是描述的简易格式(比如,主机名)long_id (
v0.2.28+
版之后已经抛弃,使用hostname
替换)这个标示符是描述的长格式。(比如. 主机名全名)client_id 这个标示符用来消除客户端的歧义 (比如. 一些指定给消费者)
hostname 部署了客户端的主机名
feature_negotiation (
nsqd
v0.2.19+
) bool, 用来标示客户端支持的协商特性。如果服务器接受,将会以 JSON 的形式发送支持的特性和元数据。heartbeat_interval (
nsqd
v0.2.19+
) 心跳的毫秒数.有效范围:
1000 <= heartbeat_interval <= configured_max
(-1
禁用心跳)--max-heartbeat-interval
(nsqd 标志位) 控制最大值默认值
--client-timeout / 2
output_buffer_size (
nsqd
v0.2.21+
) 当 nsqd 写到这个客户端时将会用到的缓存的大小(字节数)。有效范围:
64 <= output_buffer_size <= configured_max
(-1
禁用输出缓存)--max-output-buffer-size
(nsqd 标志位) 控制最大值默认值
16kb
output_buffer_timeout (
nsqd
v0.2.21+
)超时后,nsqd 缓冲的数据都会刷新到此客户端。有效范围:
1ms <= output_buffer_timeout <= configured_max
(-1
禁用 timeouts)--max-output-buffer-timeout
(nsqd 标志位) 控制最大值默认值
250ms
警告: 使用极小值
output_buffer_timeout
(< 25ms
) 配置客户端,将会显著提高nsqd
CPU 的使用率(通常客户端连接时> 50
)。这依赖于 Go 的 timers 的实现,它通过 Go 的优先队列运行时间维护。
tls_v1 (
nsqd
v0.2.22+
) 允许 TLS 来连接--tls-cert
and--tls-key
(nsqd 标志位s) 允许 TLS 并配置服务器证书如果服务器支持 TLS,将会回复
"tls_v1": true
。客户端读取
IDENTIFY
响应后,必须立即开始 TLS 握手。完成 TLS 握手后服务器将会响应
OK
.snappy (
nsqd
v0.2.23+
) 允许 snappy 压缩这次连接--snappy
(nsqd 标志位) 允许服务端支持客户端不允许同时
snappy
和deflate
。deflate (
nsqd
v0.2.23+
) 允许 deflate 压缩这次连接--deflate
(nsqd 标志位) 允许服务端支持客户端不允许同时
snappy
和deflate
。deflate_level (
nsqd
v0.2.23+
) 配置 deflate 压缩这次连接的级别--max-deflate-level
(nsqd 标志位) 配置允许的最大值有效范围:
1 <= deflate_level <= configured_max
值越高压缩率越好,但是 CPU 负载也高。
sample_rate (
nsqd
v0.2.25+
) 投递此次连接的消息接收率。有效范围:
0 <= sample_rate <= 99
(0
禁用)默认值
0
user_agent (
nsqd
v0.2.25+
) 这个客户端的代理字符串默认值:
<client_library_name>/<version>
msg_timeout (
nsqd
v0.2.28+
) 配置服务端发送消息给客户端的超时时间
成功后响应:
1 | OK |
注意: 如果客户端发送了 feature_negotiation
(并且服务端支持),响应体将会是 JSON。
错误后的响应内容:
1 | E_INVALID |
SUB
订阅话题(topic) /通道(channel)
1 | SUB <topic_name> <channel_name>\n |
成功后响应:
1 | OK |
错误后响应:
1 | E_INVALID |
PUB
发布一个消息到 话题(topic):
1 | PUB <topic_name>\n |
成功后响应:
1 | OK |
错误后响应:
1 | E_INVALID |
MPUB
发布多个消息到 话题(topic) (自动):
注意: nsqd
v0.2.16+
有效
1 | MPUB <topic_name>\n |
成功后响应:
1 | OK |
错误后响应:
1 | E_INVALID |
RDY
更新 RDY
状态 (表示你已经准备好接收N
消息)
注意: nsqd
v0.2.20+
使用 --max-rdy-count
表示这个值
1 | RDY <count>\n |
注意: 这个没有成功后响应
错误后响应:
1 | E_INVALID |
FIN
完成一个消息 (表示成功处理)
1 | FIN <message_id>\n |
注意: 这里没有成功后响应
错误后响应:
1 | E_INVALID |
REQ
重新将消息队列(表示处理失败)
这个消息放在队尾,表示已经发布过,但是因为很多实现细节问题,不要严格信赖这个,将来会改进。
简单来说,消息在传播途中,并且超时就表示 REQ
。
1 | REQ <message_id> <timeout>\n |
注意: 这里没有成功后响应
错误后响应:
1 | E_INVALID |
TOUCH
重置传播途中的消息超时时间
注意: 在 nsqd
v0.2.17+
可用
1 | TOUCH <message_id>\n |
注意: 这里没有成功后响应
错误后响应:
1 | E_INVALID |
CLS
清除连接(不再发送消息)
1 | CLS\n |
成功后响应s:
1 | CLOSE_WAIT |
错误后响应:
1 | E_INVALID |
NOP
No-op
1 | NOP\n |
注意: 这里没有 response
AUTH
注意: 在 nsqd
v0.2.29+
可用
如果 IDENTIFY
响应中有 auth_required=true
,客户端必须在 SUB
, PUB
或 MPUB
命令前前发送 AUTH
。否则,客户端不需要认证。
当 nsqd
接收到 AUTH
命令,它通过执行 HTTP 配置 --auth-http-address
,这个请求包括以下查询参数:连接的远程地址,TLS 状态,支持的认证密码。更多细节参见:AUTH
1 | AUTH\n |
成功后响应:
JSON 包含授权给客户端的身份,可选的 URL,和授权过的权限列表。
1 | {"identity":"...", "identity_url":"...", "permission_count":1} |
错误后响应:
1 | E_AUTH_FAILED - An error occurred contacting an auth server |
数据格式
数据异步传输给客户端,并且支持各种回复体,比如
1 | [x][x][x][x][x][x][x][x][x][x][x][x]... |
客户端必须是以下类型之一:
1 | FrameTypeResponse int32 = 0 |
以及消息格式:
1 | [x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x][x]... |
客户端库
下面表里的信息是否过时了?你是否在使用这些客户端库来开发?用 mailing list 或 Twitter @imsnakes / @jehiah 告诉我们。
Name | Language | SUB | PUB | Discovery | Backoff | TLS | Snappy | Sampling | AUTH | Notes |
---|---|---|---|---|---|---|---|---|---|---|
nsqd | HTTP | √ | built-in | |||||||
go-nsq | Go | √ | √ | √ | √ | √ | √ | √ | √ | official |
pynsq | Python | √ | √ | √ | √ | √ | √ | √ | √ | official |
nsqjs | JavaScript (CoffeeScript) | √ | √ | √ | √ | √ | √ | √ | √ | official |
nsq-py | Python | √ | √ | √ | √ | √ | √ | √ | √ | |
gnsq | Python | √ | √ | √ | √ | √ | √ | √ | √ | |
krakow | Ruby | √ | √ | √ | √ | √ | √ | √ | ||
JavaNSQClient | Java | √ | √ | √ | √ | √ | √ | √ | ||
ensq | Erlang | √ | √ | √ | √ | |||||
nsq.js | JavaScript | √ | √ | √ | ||||||
TrendrrNSQClient | Java | √ | √ | √ | ||||||
nsqjava | Java | √ | √ | |||||||
nsqphp | PHP | √ | √ | √ | ||||||
node-nsqueue | JavaScript | √ | √ | |||||||
ruby_nsq | Ruby | √ | √ | √ | ||||||
libnsq | C | √ | official | |||||||
nsq-ruby | Ruby | √ | √ | √ | ||||||
NsqSpinner | Python | √ | √ | √ | √ | √ | √ | √ | ||
nsq-java | Java | √ | √ | √ | ||||||
NSQnet | .NET | √ | √ | √ | ||||||
nsq-client | JavaScript | √ | √ | |||||||
hsnsq | Haskell | √ | √ | |||||||
perl-anyevent-nsq | Perl | √ | √ | √ | ||||||
nsq-clojure | Clojure | |||||||||
nsqie | Scala | √ | √ | |||||||
nodensq | JavaScript | √ | √ |
编译客户端库
NSQ 将一些功能集成到客户端库中,以便维持集群的健壮性和性能。
这篇文章试图列出客户端库通常需要完成的功能。因为发布到 nsqd
非常的琐碎(仅用 HTTP POST /put
节点就可),这个文档主要关注消费者。
通过规范,我们希望各种语言实现的时候都能保持一致性。
配置
从高层看,配置相关的设计理念是希望系统能支持不同的工作负载,使用相同的默认值能立即可用,并且能将拨号数最小化。
消费者通过 TCP 连接到 nsqd
实例,订阅通道(channel)
上的 话题(topic)
。每个连接只能订阅一个话题(topic),因此消费多个话题(topic),必须响应的结构化。
使用 nsqlookupd
来发现是方案之一,所以客户端库必须支持消费者直接连接一个或多个 nsqd
实例,或者它可用轮询一个或多个 nsqlookupd
实例。当消费者轮询 nsqlookupd
的时候,时间间隔必须是可配置的。另外,因为 NSQ 的标准部署是分布式环境,包含很多消费者和生产者,客户端库必须根据配置值得随机性自动添加抖动。更多细节参考发现.
对于消费者来说,在 nsqd
响应前能接收到多少消息是非常重要的指标。这个管道促进缓存,批处理,异步消息处理。这个值称为 max_in_flight
,并且它影响了 RDY
状态。更多细节参见 RDY 状态。
设计系统时通常会考虑优雅处理失败,客户端库希望能实现失败消息的重试,并提供边界参数来处理每个消息尝试次数。更多细节参见消息处理。
当消息处理失败的时候,客户端库能自动将消息重新队列。NSQ 支持使用 REQ
命令发送延迟。客户端库需要能提供延迟的初始化值(第一次失败时),以及重新队列失败该如何改变。更多细节参见 Backoff.
最重要的时,客户端库必须支持消息处理的回调函数配置。这些回调函数必须简单,通常都支持一个参数(消息对象的实例)。
发现
nsqlookupd
是 NSQ 的重要组成部分,它为消费者发现服务提供来定位 nsqd
,它在运行时提供一个指定话题(topic)。
虽然使用 nsqlookupd
能大幅减少配置数目,但是需要维持并放大一个巨大的分布式 NSQ 集群。
当消费者使用 nsqlookupd
来发现时,客户端库必须管理轮询所有 nsqlookupd
实例的进程,最新的 nsqd
组合以问题形式提供了话题(topic),并且管理到这些 nsqd
的连接。
查询一个 nsqlookupd
实例非常的简单。执行一个 HTTP 请求,使用消费者试图发现的话题(topic) 作为查询参数来查找节点(例如/lookup?topic=clicks
). 响应体是 JSON:
1 | { |
broadcast_address
和 tcp_port
必须用来连接 nsqd
。 因为从设计上来说 nsqlookupd
实例不会分享或协调他们的数据,客户端库必须联合它接收到得所有 nsqlookupd
查询列表来建立 nsqd
最终列表。使用 broadcast_address:tcp_port
作为这个联合的唯一 KEY。
必须用周期性的计时器来重复的轮询 nsqlookupd
的配置,这样消费者能自动的发现新的 nsqd
。客户端库必须自动的初始化到所有新发现的实例的连接。
当客户端库开始执行的时候,它必须通过踢开配置 nsqlookupd
实例的一组请求,来引导轮询。
连接处理
一旦消费者有一个 nsqd
可以连接(通过发现或手工配置), 它就必须打开一个 TCP 连接到 broadcast_address:port
。一个单独的 TCP 连接必须能让消费者可以订阅到每个 nsqd
的 话题(topic)。
当连接到一个 nsqd
实例时,客户端库必须发送以下数据,顺序是:
- 魔术标识符
- 一个
IDENTIFY
命令 (和负载) 和读/验证响应 - 一个
SUB
命令 (指定需要的话题(topic)) 和读/验证响应 - 一个初始化
RDY
值 1
(低级别的细节参见 spec)
重新连接
客户端库必须通过以下方法自动重新连接:
- 如果消费者通过特定的
nsqd
列表指定,重新连接必须通过延迟重试来处理。(列如,8s, 16s, 32s, 等等, 到最大值后重试)。 - 如果消费者通过
nsqlookupd
来发现实例,必须通过轮询间隔来自动处理重新连接(例如,如果消费者断开和nsqd
的连接,客户端库仅在随后的nsqlookupd
轮询发现的实例后重新连接)。这能保证消费者了解nsqd
。
特性协商
IDENTIFY
命令可以用来设置 nsqd
端的元数据,修改客户端设置,并特性协商,它满足亮点:
- 某些情况下,客户端可能会修改
nsqd
的交互方式(比如,修改客户端的心跳间隔,并允许压缩,TLS,输出缓存,等等-完整列表参见 spec) nsqd
使用 JSON payload 来响应IDENTIFY
命令,它包含了重要的服务端配置值,客户端和之交互时必须遵守。
连接后,根据用户的配置, 客户端库必须发送一个 IDENTIFY
命令, 它的内容是 JSON payload:
1 | { |
feature_negotiation
位表示客户端可以接受返回值是 JSON payload。 client_id
和 hostname
是随意的文本字段,nsqd
(和 nsqadmin
)会用来区别客户端. heartbeat_interval
配置每个客户端的心跳间隔。
nsqd
必须响应 OK
,如果它不支持特性协商 (nsqd``v0.2.20+
引入), 否则:
1 | { |
数据流和心跳
一旦消费者处于订阅状态,NSQ 协议里的数据流时异步的。对于消费者来说,这就是说如果想建立一个健壮并高效的客户端库,就必须使用异步的网络 IO 循环和/或“线程”(线程表示 OS 级别的线程和用户空间(userland)的进程,比如协同程序(coroutines))。
另外,期望客户端能响应它们连接到的 nsqd
实例的周期性心跳。通常这个周期是 30 秒。客户端可以使用任何命令响应,不过通常方便起见,使用 NOP
响应心跳。更多细节参见 protocol spec。
“进程”必须专注于读取 TCP socket 的数据,解包帧数据,并执行多路逻辑来传输。这也是处理心跳最佳点。从最低级别看,读取协议包括以下步骤:
- 读取 4 字节 big endian uint32 大小
- 读取字节大小数据
- 解包数据
- ...
- profit
- goto 1
一个和错误相关小插曲
根据系统的异步特性,会采用更多的状态来追踪相关协议的由命令产生的错误。我们会采用“快速错误”("fail fast")方法,所以大量协议级别错误处理都是致命的。这意味着如果客户端发送一个无效命令(或者自己是无效状态),通过强制关闭连接(如果可能,发送一个错误给客户端),它连接到的 nsqd
实例将会保护自己(和系统)。和之前提到的连接处理相配合,使得系统更加健壮和稳定。
仅有的几个非致命错误是:
E_FIN_FAILED
-FIN
命令, 无效的消息 IDE_REQ_FAILED
-REQ
命令 无效的消息 IDE_TOUCH_FAILED
-TOUCH
命令 无效的消息 ID
因为这些错误通常和时间有关,所以不当做致命错误。这些错误通常发生在 nsqd
端消息超时,重新队列时,和投递到其他消费者时。原先的接受者不再允许响应这个消息。
消息处理
当 IO 循环解包包含消息的帧数据时,它必须路由这个消息给配置处理函数来处理。
发送 nsqd
,在配置消息超时时希望收到回复(默认:60秒)。可能有以下场景:
- 处理函数表示消息已经成功处理
- 处理函数表示消息正处理成功
- 处理函数表示需要更多的时间来处理消息
- in-flight 超时,并且
nsqd
自动重新队列消息
前 3 个情况,客户端库必须发送合适消费者方面的命令 (FIN
,REQ
,和 TOUCH
)。
FIN
命令最简单。它告诉 nsqd
它能安全的抛弃消息。FIN
也能抛弃那些你不想处理或重试的消息。
REQ
命令告诉 nsqd
,消息必须重新队列(可选参数指定了重试的次数)。如果消费者没有指定可选参数,客户端库必须自动算出相关联的消息处理的时长(通常设置为多倍,这样效率更高)。客户端库必须抛弃超过最多重试次数的消息。当它发生的时候,必须执行用户提供的回调来通知,并运行特定的回调。
如果消息处理函数需要的时间超过配置的超时时间,可以用 TOUCH
命令来重置 nsqd
端的计时器。可以重复这个动作,直到消息 FIN
或 REQ
,或发送 nsqd
的配置属性 max_msg_timeout
。客户端库不能自动 TOUCH
代表消费者。
如果发送 nsqd
实例没有接收到响应,消息将会超时,并会自动重新队列来投递到可用的消费者。
最后,每个消息的属性是尝试次数。客户端库必须比较这个值和配置的最大值,并且抛弃已经超过这个值得消息。当消息已经抛弃的时候,需要触发回调。通常这个回调的实现必须包括写入磁盘,日志等等。用户必须能重写默认的处理函数。
RDY 状态
因为消息是从 nsqd
推送到消费者那,我们必须拥有一个方法来管理数据流,而不仅依赖于低级别的 TCP 语法。消费者的 RDY
状态是 NSQ 的流控制机制。
如配置中列出的内容,通过 max_in_flight
配置消费者。这是并行的并且性能 knob。比如一些 downstream 系统可以更加容易进行消息批处理,并对更高级的 max-in-flight
有利。
当消费者连接到nsqd
(并且订阅) ,RDY
初始化状态为 0
。不会投递任何消息。
客户端库拥有很少的责任:
- 引导并最终分布配置
max_in_flight
到所有的连接。 - 永远不允许汇集所有连接
RDY
的和(total_rdy_count
),为超过max_in_flight
的配置。 - 永远不要超过每个连接
nsqd
配置的max_rdy_count
。 - 暴露一个 API 方法给值得信赖的消息流。
1. 引导和分布
为连接选择 RDY
值,需要考虑的因素很少(最终分布为 max_in_flight
):
- 连接 # 是动态的,通常并不知道次数(例如,当通过
nsqlookupd
发现nsqd
)。 max_in_flight
可能会小于你的连接数
为了开始消息流,客户端库必须发送一个初始的 RDY
值。因为最终的连接数并不知道(通常从 '1' 开始),所以客户端库必能公平对待每个连接。
另外,每个消息处理后,客户端库必须评估什么时候更新 RDY
状态。如果当前值是 '0',或者低于最后发送的值的 25% 必须触发更新。
客户端库必须一直尝试最终分布 RDY
值到所有的连接。
通常来说,它可以通过 max_in_flight / num_conns
实现。
然而,当 max_in_flight < num_conns
这个简单的公式无效的时候。客户端库必须执行一个动态的运行评估,自从通过之前的连接接收到得消息后,连接的 nsqd
'活跃度'的时间。当配置到期后,他必须重新分布,不论 RDY
值是否对于新的 nsqd
有效。这么做,你能保证你可以通过消息找到 nsqd
。这些会有延迟的影响。
2. 维护 max_in_flight
客户端库必须维护指定消费者的消息 in flight 的最大值。尤其,汇集每个连接的 RDY
值永远不能超过配置的 max_in_flight
值。
底下的 Python 代码,它指出 RDY 值是否对于指定的连接有效。
1 | def send_ready(reader, conn, count): |
3. nsqd
最大 RDY 值
每个 nsqd
通过 --max-rdy-count
配置,如果消费者发送的 RDY
值超过了可接受的范围,它的连接将强制关闭。为了向后兼容,这个值必须假设为 2500
,如果 nsqd
实例不能支持特性协商。
4. 消息流 Starvation
最终,客户端库必须提供一个 API 方法,来表示消息流 starvation。对于消费者(消费者处理函数)来说,简单比较 in-flight 的消息数和 max_in_flight
值,来决定是否”批处理“不太合适。有两种情况有问题:
- 当消费者配置
max_in_flight > 1
, 根据变量num_conns
,max_in_flight
除num_conns
除不尽。因为你永远不能超过max_in_flight
,你必须降低,并且在RDY
值少于max_in_flight
时结束。 - 如果仅仅
nsqd
的子集有消息,因为even distribution 的RDY
预期值, 这些活跃nsqd
仅有max_in_flight
的片段。
以上两种情况,消费者实际上永远不会接受消息的 max_in_flight
。因此,客户端库必须暴露一个方法 is_starved
,表示任何连接是否 starved,如下:
1 | def is_starved(conns): |
is_starved
方法必须由消息处理函数使用,来发现什么时候处理批量消息。
Backoff
消息处理失败的时候如何处理是一个非常复杂的问题。消息处理章节介绍了客户端库动作,它会处理和时间相关的失败的消息。其他的问题是是否减少吞吐量。这两个功能对于整个系统的稳定性至关重要。
通过减慢处理的速率,或者 "backing off",消费者允许 downstream 系统回收传输失败。然而这个行为必须是可配置的,因为不是什么时候都能称心如意,这种情况下延迟必须优先处理。
Backoff 必须通过发送 RDY 0
到合适的 nsqd
来实现,停止消息流。这个状态的时长通过重试的失败来计算。处理成功会减少这个时长,直到 reader 不再是 backoff 状态。
当 reader 是 backoff 状态时,超时后,客户端库必须仅发送过 RDY 1
,而不是 max_in_flight
。 在返回完整的 throttle 前,这是有效的 "tests the waters"。另外,backoff 超时时,客户端库必须忽略任何和计算 backoff 时间成功或者失败结果。(比如,每次超时时它仅信任一个结果)
加密/压缩
NSQ 支持加密和/或压缩特性协商,通过IDENTIFY
命令。 TLS 用来加密。 Snappy 和 DEFLATE 都支持压缩。Snappy 可作为第三方库使用,但是基本所有的语言都支持 DEFLATE。
收到 IDENTIFY
响应时,并且你通过 tls_v1
标志位请求 TLS,你得到的东西和以下内容类似:
1 | { |
确认 tls_v1
为 true
后(意味着服务器支持 TLS),在接受和发送任何消息前,你需要初始化 TLS 握手(例如,Python 使用 ssl.wrap_socket
表示完成)。TLS 握手成功后,你必须立即读取一个 NSQ 加密的 OK
响应。
如果你想压缩,可以设置 snappy
或 deflate
为 true
,并且使用合适压缩(解压缩)调用读写。同样的你必须立即读取一个 NSQ 压缩的 OK
响应。
这些压缩特性是互斥的。
你不能阻止缓存直到加密/压缩协商完成,或者确保小心的读取到内存。
汇总
分布式系统非常有意思。
不同的 NSQ 集群部门间交互在一个平台上,它健壮,高性能,并且稳定。希望您能这篇文章里了解到客户端是多么重要。
这些细节的实现,我们将 pynsq 和 go-nsq 作为代码基础。pynsq 可以切割为 2 个部分:
Message
- 高级别的消息对象,它暴露了状态方法,来响应nsqd
(FIN
,REQ
,TOUCH
等等),同时元数据包含目的和时间戳。Connection
- 高级别的封装,包含 TCP 连接到一个指定的nsqd
,它包含 flight 消息,RDY
状态,协商特性,和不同时间。消费者
- 和用户打交道的 API,它处理发现,创建连接(和订阅),引导和管理RDY
状态,解析收到的数据,创建消息对象,和分发消息给处理函数。Producer
-和用户打交道的 API,处理发布。
我们很高兴能帮助任何对编写客户端库有兴趣的人。我们希望大家能加入到社区,扩展目前已经存在的库。社区已经开源很多客户端库。