NSQ组件
NSQ组件
nsqd
nsqd
是一个守护进程,负责接收,排队,投递消息给客户端。
它可以独立运行,不过通常它是由 nsqlookupd
实例所在集群配置的(它在这能声明 topics 和 channels,以便大家能找到)。
它在 2 个 TCP 端口监听,一个给客户端,另一个是 HTTP API。同时,它也能在第三个端口监听 HTTPS。
命令行选项
1 | -auth-http-address=: <addr>:<port> 查询授权服务器 (可能会给多次) |
HTTP API
/ping
- 活跃度/info
- 版本/stats
- 检查综合运行/pub
- 发布消息到话题(topic)/mpub
- 发布多个消息到话题(topic)/debug/pprof
- pprof 调试入口/debug/pprof/profile
- 生成 pprof CPU 配置文件/debug/pprof/goroutine
- 生成 pprof 计算配置文件/debug/pprof/heap
- 生成 pprof 堆配置文件/debug/pprof/block
- 生成 pprof 块配置文件/debug/pprof/threadcreate
- 生成 pprof OS 线程配置文件
v1
命名空间 (as of nsqd
v0.2.29+
):
/topic/create
- 创建一个新的话题(topic)/topic/delete
- 删除一个话题(topic)/topic/empty
- 清空话题(topic)/topic/pause
- 暂停话题(topic)的消息流/topic/unpause
- 恢复话题(topic)的消息流/channel/create
- 创建一个新的通道(channel)/channel/delete
- 删除一个通道(channel)/channel/empty
- 清空一个通道(channel)/channel/pause
- 暂停通道(channel)的消息流/channel/unpause
- 恢复通道(channel)的消息流
以抛弃的命名空间:
/create_topic
- 创建一个新的话题(topic)/delete_topic
- 删除一个话题(topic)/empty_topic
- 清空话题(topic)/pause_topic
- 暂停话题(topic)的消息流/unpause_topic
- 恢复话题(topic)的消息流/create_channel
- 创建一个新的通道(channel)/delete_channel
- 删除一个通道(channel)/empty_channel
- 清空一个通道(channel)/pause_channel
- 暂停通道(channel)的消息流/unpause_channel
- 恢复通道(channel)的消息流
NOTE: 这些结束点返回 "wrapped" JSON:
1 | {"status_code":200, "status_text":"OK", "data":{...}} |
发送 Accept: application/vnd.nsq; version=1.0
头将会协商使用未封装的 JSON 响应格式 (as of nsqd
v0.2.29+
)。
/pub
发布一个消息
参数:
1 | topic - the topic to publish to |
/mpub
一个往返发布多个消息
参数:
1 | topic - 发布到的话题(topic) |
注意:默认的 /mpub
希望消息使用 \n
切割,使用 ?binary=true
查询参数来允许二进制模式,希望发送的消息体能成为以下的格式(HTTP 'Content-Length' 头必须是将要发送的消息体的总大小):
1 | [ 4-byte num messages ] |
/topic/create
已经抛弃的别名 /create_topic
创建一个话题(topic)
参数:
1 | 话题(topic) - 将要创建的话题(topic) |
/topic/delete
已经抛弃的别名 : /delete_topic
删除一个已经存在的话题(topic) (和所有的通道(channel))
参数:
1 | topic - 现有的话题(topic) to delete |
/channel/create
已抛弃的别名: /create_channel
为现有的话题(topic) 创建一个通道(channel)
参数:
1 | topic - 现有的话题(topic) |
/channel/delete
已抛弃的别名: /delete_channel
删除现有的话题(topic) 一个的通道(channel)
参数:
1 | topic - 现有的话题(topic) |
/topic/empty
已抛弃的别名: /empty_topic
清空现有话题(topic) 队列中所有的消息(内存和磁盘中)
参数:
1 | topic - 待清空的话题(topic) |
/channel/empty
已抛弃的别名: /empty_channel
清空现有通道(channel) 队列中所有的消息(内存和磁盘中)
参数:
1 | topic - 现有的话题(topic) |
/topic/pause
已抛弃的别名: /pause_topic
暂停已有话题(topic) 的所有通道(channel)的消息(消息将会在话题(topic) 里排队)
参数:
1 | topic - 现有的话题(topic) |
/topic/unpause
已抛弃的别名: /unpause_topic
为现有的话题(topic) 的通道(channel) 重启消息流
参数:
1 | topic - 现有的话题(topic) |
/channel/pause
已抛弃的别名: /channel_pause
暂停发送已有的通道(channel) 给消费者(消息将会队列)
参数:
1 | topic - 现有的话题(topic) |
/channel/unpause
已抛弃的别名: /unpause_channel
重新发送通道(channel) 里的消息给消费者
参数:
1 | topic - 现有的话题(topic) |
/stats
返回内部统计数据
参数
1 | format - (可选) `text` or `json` (默认 = `text`) |
/ping
监控结束点,必须返回 OK
。如果有问题返回 500。同时,如果写消息到磁盘失败将会返回错误状态。
/info
返回版本信息
/debug/pprof
可用的调试节点的页码
/debug/pprof/profile
开始 30秒的 pprof
CPU 配置,并通过请求返回。
注意,因为它在运行时的性能和时间,这个结束点并没在 /debug/pprof 页面列表中。
/debug/pprof/goroutine
为所有运行的 goroutines 返回栈记录。
/debug/pprof/heap
返回堆和内存配置信息(前面的内容可作为 pprof
配置信息)
/debug/pprof/block
返回 goroutine 块配置信息
/debug/pprof/threadcreate
返回 goroutine 栈记录
Debugging and Profiling
nsqd
提供一套节点的配置信息,直接通过 Go 的 pprof 工具。如果你有 go 工具套装,只要运行:
1 | memory profiling |
TLS
为了加强安全性,可以通过 --tls-cert
和 --tls-key
客户端配置 nsqd
,升级他们的链接为 TLS。
另外,你可以要求客户端使用 --tls-required
(nsqd
v0.2.28+
)协商 TLS。
你可以通过--tls-client-auth-policy
(require
或 require-verify
)配置一个 nsqd
客户端证书:
require
- 客户端必须提供一个证书,否则将会被拒绝require-verify
- 客户端必须提供一个有效的证书,根据--tls-root-ca-file
指定的链接或者默认的 CA,否则将会被拒绝。
可以当做客户端授权的表单(nsqd
v0.2.28+
)。
如果你想生成一个 password-less,自签名证书,用:
1 | openssl req -x509 -newkey rsa:2048 -keyout key.pem -out cert.pem -days 365 -nodes |
AUTH
注意: 在 nsqd
v0.2.29+
可用
通过使用一个遵从 Auth HTTP 协议的授权服务器,指定 -auth-http-address=host:port
标志,你可以配置 nsqd
。
注意: 希望当仅有 nsqd
TCP 协议暴露给外部客户端时使用授权,而不是 HTTP(S) 节点。参见底下说明:
Auth 服务器必须接受 HTTP 请求:
1 | /auth?remote_ip=...&tls=...&auth_secret=... |
返回结果格式如下:
1 | { |
注意话题(topic) 和通道(channel) 字符串必须用 nsqd
的正则表达式来申请授权。nsqd
将会为 TTL 间隔,并会在这个间隔时间里重新请求授权。
通常情况,将会使用 TLS 来加强安全性。nsqd
和 授权服务器间通过信任的网络通信(并没被加密)。如果一个授权服务器通过远程 IP 信息来授权,客户端可以使用占位符(比如 .
),作为 AUTH
命令(Auth 服务器忽略)。
授权服务器例子 pynsqauthd。
帮助服务器暴露 nsqlookupd
和 nsqd
/stats
数据给客户端,从授权服务器通过权限过滤,在以下可以找到 nsqauthfilter。
当使用命令行工具,可以通过使用 --reader-opt
标志来授权。
1 | nsq_tail ... -reader-opt="tls_v1,true" -reader-opt="auth_secret,$SECRET" |
点对点处理延迟
你可以选择设置 nsqd
来收集和发射点对点信息处理延迟,通过 --e2e-processing-latency-percentile
标志位来配置百分比。
使用概率百分比技术(参见 Effective Computation of Biased Quantiles over Data Streams)来计算值。我们通过 bmizerany 来使用 perks 包,它能实现这个算法。
我们内部维持 2 个通道(channel),每个通道(channel)存储 N/2
分钟的延迟数据。每个 N/2
分钟我们重置了每个通道(channel)(并开始插入新的数据)。
因为我们仅在通道级别收集数据,对于话题我们聚合并合并所有的通道数量的 quantiles。如果数据在同一个 nsqd
实例上时,可以使用这个技术。然而当数据已经精确的通过 nsqd
(通过 nsqlookupd
),我们为每个 nsqd
取平均值。为了维持统计的精确性,除了平均值,我们也提供最大最小值。
注意: 如果没有消费者连接,不能更新值,尽管消息队列的点对点时间会缓慢增长。这是因为仅在 nsqd
收到从客户端发来 FIN
消息时才会重新计算。当消费者重新连接,这些值将会重新调整。
Statsd / Graphite Integration
当使用 --statsd-address
来为statsd (或类似 statsdaemon)指定 UDP <addr>:<port>
时,nsqd
将会在 --statsd-interval
定期推送数据给 statsd(注意:这个间隔必须始终小于等于 graphite 的刷入间隔)。设置 nsqadmin
可以显示图标。
推荐以下配置(但是这些选择必须建立在你的可用资源和要求上)。同时,statsd 的刷入间隔必须小于或者等于 storage-schemas.conf
的最小值,并且 nsqd
必须通过 --statsd-interval
来确认刷入时间小于等于时间间隔。
1 | storage-schemas.conf |
nsqd
实例将会推送给以下 statsd
路径:
1 | nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.backend_depth [gauge] |
nsqlookupd
nsqlookupd
是守护进程负责管理拓扑信息。客户端通过查询 nsqlookupd
来发现指定话题(topic)的生产者,并且 nsqd
节点广播话题(topic)和通道(channel)信息。
有两个接口:TCP 接口,nsqd
用它来广播。HTTP 接口,客户端用它来发现和管理。
命令行选项
1 | -http-address="0.0.0.0:4161": <addr>:<port> 监听 HTTP 客户端 |
HTTP 接口
/lookup
返回某个话题(topic)的生产者列表。
参数:
1 | topic - the 话题(topic) to list producers for |
/topics
返回所有已知的话题(topic)
/channels
返回已知话题(topic)里的通道(channel)
参数:
1 | topic - the topic to list 通道(channel)s for |
/nodes
返回所有已知的 nsqd
列表
/delete_topic
删除一个已存在的话题(topic)
参数:
1 | topic - 需要删除的话题(topic) |
/delete_channel
删除一个已存在话题(topic)的通道(channel)
参数:
1 | topic - 已经存在的话题(topic) |
/tombstone_topic_producer
逻辑删除(Tombstones)某个话题(topic)的生产者。参见 deletion and tombstones.
参数:
1 | topic - 已经存在的话题(topic) |
/ping
监控端点,必须返回 OK
/info
返回版本信息
删除和逻辑删除(Tombstones)
当一个话题(topic)不再全局生产,相对简单的操作是从集群里清理这个消息。假设所有的应用生产的消息下降,使用 /delete_topic
结束nsqlookupd
实例的,是必须要完成的操作。(内部来说,它将会识别 nsqd
生产者,并对这些节点执行合适的操作)。
全局来看,通道(channel)删除进程都很类似,不同点是你需用 /delete_channel
结束 nsqlookupd
实例,并且你必须保证所有的订阅了通道(channel)得消费者已经下降(downed)。
然而,当话题(topic)不再在节点的子集上生产的时候情况比较复杂。因为消费者查询 nsqlookupd
的方法并且连接到所有生产者,你加入的竞争环境尝试移除集群的信息,消费者发现这些节点并重新连接。(因此推送更新,话题(topic)仍然在节点上生产)。解决办法就是逻辑删除(tombstones)。逻辑删除(tombstones)在 nsqlookupd
上下文是特定的生产者和最后的配置 --tombstone-lifetime
时间。在这个窗口中,生产者不会在 /lookup
查询中列出,允许节点删除话题(topic),扩散这些信息到 nsqlookupd
(接着逻辑删除(tombstoned)生产者),并阻止生产者重新发现这个节点。
nsqadmin
nsqadmin
是一套 WEB UI,用来汇集集群的实时统计,并执行不同的管理任务。
命令行选项
1 | -graphite-url="": URL to graphite HTTP 地址 |
statsd / Graphite Integration
使用 nsqd --statsd-address=...
的时候,你可以指定一个 nsqadmin --graphite-url=http://graphite.yourdomain.com
允许 nsqadmin
上的 graphite 图表。如果使用一个统计克隆 (例如 statsdaemon),它没有前缀的键值,也可以指定 --use-statsd-prefix=false
。
Admin 通知
如果设置了 --notification-http-endpoint
标志,每次 admin 动作执行的时候(例如暂停一个通道(channel)),nsqadmin
将会发送一个 POST 请求到指定(完全限定)端点。
请求的内容包含的动作信息,例如:
1 | { |
如果在请求时用户名可用,user
字段将会填充,如果之前使用 htpasswd 授权,或者google-auth-proxy 之后,否则卫空字符串。当不可用时 channel
字段也会为空。
提示: 通过设置 --notification-http-endpoint
为 http://addr.of.nsqd/put?topic=admin_actions
,你可以创建一个 admin 的动作通知 NSQ 流,话题(topic)名为 admin_actions
。
工具
这些工具辅助到数据流的通用功能和内部检查。
nsq_stat
为所有的话题(topic)和通道(channel)的生产者轮询 /stats
,并显示统计数据:
1 | ---------------depth---------------+--------------metadata--------------- |
命令行参数
1 | -channel="": NSQ 通道(channel) |
nsq_tail
消费指定的话题(topic)/通道(channel),并写到 stdout (和 tail(1) 类似)。
命令行参数
1 | -channel="": NSQ 通道(channel) |
nsq_to_file
消费指定的话题(topic)/通道(channel),并写到文件中,有选择的滚动和/或压缩文件。
命令行参数
1 | -channel="nsq_to_file": nsq 通道(channel) |
nsq_to_http
消费指定的话题(topic)/通道(channel)和执行 HTTP requests (GET/POST) 到指定的端点。
命令行参数
1 | -channel="nsq_to_http": nsq 通道(channel) |
nsq_to_nsq
消费者指定的话题/通道和重发布消息到目的地 nsqd
通过 TCP。
命令行参数
1 | -channel="nsq_to_nsq": nsq 通道(channel) |
to_nsq
采用 stdin 流,并分解到新行(默认),通过 TCP 重新发布到目的地 nsqd
。
命令行参数
1 | -delimiter="\n": 分割字符串(默认'\n') |