NSQ组件

nsqd

nsqd 是一个守护进程,负责接收,排队,投递消息给客户端。

它可以独立运行,不过通常它是由 nsqlookupd 实例所在集群配置的(它在这能声明 topics 和 channels,以便大家能找到)。

它在 2 个 TCP 端口监听,一个给客户端,另一个是 HTTP API。同时,它也能在第三个端口监听 HTTPS。

命令行选项

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
-auth-http-address=: <addr>:<port> 查询授权服务器 (可能会给多次)
-broadcast-address="": 通过 lookupd 注册的地址(默认名是 OS)
-config="": 配置文件路径
-data-path="": 缓存消息的磁盘路径
-deflate=true: 运行协商压缩特性(客户端压缩)
-e2e-processing-latency-percentile=: 消息处理时间的百分比(通过逗号可以多次指定,默认为 none)
-e2e-processing-latency-window-time=10m0s: 计算这段时间里,点对点时间延迟(例如,60s 仅计算过去 60 秒)
-http-address="0.0.0.0:4151": 为 HTTP 客户端监听 <addr>:<port>
-https-address="": 为 HTTPS 客户端 监听 <addr>:<port>
-lookupd-tcp-address=: 解析 TCP 地址名字 (可能会给多次)
-max-body-size=5123840: 单个命令体的最大尺寸
-max-bytes-per-file=104857600: 每个磁盘队列文件的字节数
-max-deflate-level=6: 最大的压缩比率等级(> values == > nsqd CPU usage)
-max-heartbeat-interval=1m0s: 在客户端心跳间,最大的客户端配置时间间隔
-max-message-size=1024768: (弃用 --max-msg-size) 单个消息体的最大字节数
-max-msg-size=1024768: 单个消息体的最大字节数
-max-msg-timeout=15m0s: 消息超时的最大时间间隔
-max-output-buffer-size=65536: 最大客户端输出缓存可配置大小(字节)
-max-output-buffer-timeout=1s: 在 flushing 到客户端前,最长的配置时间间隔。
-max-rdy-count=2500: 客户端最大的 RDY 数量
-max-req-timeout=1h0m0s: 消息重新排队的超时时间
-mem-queue-size=10000: 内存里的消息数(per topic/channel)
-msg-timeout="60s": 自动重新队列消息前需要等待的时间
-snappy=true: 打开快速选项 (客户端压缩)
-statsd-address="": 统计进程的 UDP <addr>:<port>
-statsd-interval="60s": 从推送到统计的时间间隔
-statsd-mem-stats=true: 切换发送内存和 GC 统计数据
-statsd-prefix="nsq.%s": 发送给统计keys 的前缀(%s for host replacement)
-sync-every=2500: 磁盘队列 fsync 的消息数
-sync-timeout=2s: 每个磁盘队列 fsync 平均耗时
-tcp-address="0.0.0.0:4150": TCP 客户端 监听的 <addr>:<port>
-tls-cert="": 证书文件路径
-tls-client-auth-policy="": 客户端证书授权策略 ('require' or 'require-verify')
-tls-key="": 私钥路径文件
-tls-required=false: 客户端连接需求 TLS
-tls-root-ca-file="": 私钥证书授权 PEM 路径
-verbose=false: 打开日志
-version=false: 打印版本
-worker-id=0: 进程的唯一码(默认是主机名的哈希值)

HTTP API

v1 命名空间 (as of nsqd v0.2.29+):

以抛弃的命名空间:

NOTE: 这些结束点返回 "wrapped" JSON:

1
{"status_code":200, "status_text":"OK", "data":{...}}

发送 Accept: application/vnd.nsq; version=1.0 头将会协商使用未封装的 JSON 响应格式 (as of nsqd v0.2.29+)。

/pub

发布一个消息

参数:

1
2
3
4
topic - the topic to publish to

POST body - the raw message bytes
$ curl -d "<message>" http://127.0.0.1:4151/pub?topic=message_topic`

/mpub

一个往返发布多个消息

参数:

1
2
3
4
topic - 发布到的话题(topic)
binary - bool ('true' or 'false') 允许二进制模式

POST body - `\n` 分离原始消息字节

注意:默认的 /mpub 希望消息使用 \n 切割,使用 ?binary=true 查询参数来允许二进制模式,希望发送的消息体能成为以下的格式(HTTP 'Content-Length' 头必须是将要发送的消息体的总大小):

1
2
3
4
[ 4-byte num messages ]
[ 4-byte message #1 size ][ N-byte binary data ]
... (repeated <num_messages> times)
$ curl -d "<message>\n<message>\n<message>" http://127.0.0.1:4151/mpub?topic=message_topic`

/topic/create

已经抛弃的别名 /create_topic

创建一个话题(topic)

参数:

1
话题(topic) - 将要创建的话题(topic)

/topic/delete

已经抛弃的别名 : /delete_topic

删除一个已经存在的话题(topic) (和所有的通道(channel))

参数:

1
topic - 现有的话题(topic) to delete

/channel/create

已抛弃的别名: /create_channel

为现有的话题(topic) 创建一个通道(channel)

参数:

1
2
topic - 现有的话题(topic)
channel - the channel to create

/channel/delete

已抛弃的别名: /delete_channel

删除现有的话题(topic) 一个的通道(channel)

参数:

1
2
topic - 现有的话题(topic)
channel - 待删除的通道(channel)

/topic/empty

已抛弃的别名: /empty_topic

清空现有话题(topic) 队列中所有的消息(内存和磁盘中)

参数:

1
topic - 待清空的话题(topic)

/channel/empty

已抛弃的别名: /empty_channel

清空现有通道(channel) 队列中所有的消息(内存和磁盘中)

参数:

1
2
topic - 现有的话题(topic)
channel - 待清空的通道(channel)

/topic/pause

已抛弃的别名: /pause_topic

暂停已有话题(topic) 的所有通道(channel)的消息(消息将会在话题(topic) 里排队)

参数:

1
topic - 现有的话题(topic)

/topic/unpause

已抛弃的别名: /unpause_topic

为现有的话题(topic) 的通道(channel) 重启消息流

参数:

1
topic - 现有的话题(topic)

/channel/pause

已抛弃的别名: /channel_pause

暂停发送已有的通道(channel) 给消费者(消息将会队列)

参数:

1
2
topic - 现有的话题(topic)
channel - 已有的通道(channel)将会被暂停

/channel/unpause

已抛弃的别名: /unpause_channel

重新发送通道(channel) 里的消息给消费者

参数:

1
2
topic - 现有的话题(topic)
channel - 将要暂停的通道(channel)

/stats

返回内部统计数据

参数

1
format - (可选) `text` or `json` (默认 = `text`)

/ping

监控结束点,必须返回 OK。如果有问题返回 500。同时,如果写消息到磁盘失败将会返回错误状态。

/info

返回版本信息

/debug/pprof

可用的调试节点的页码

/debug/pprof/profile

开始 30秒的 pprof CPU 配置,并通过请求返回。

注意,因为它在运行时的性能和时间,这个结束点并没在 /debug/pprof 页面列表中。

/debug/pprof/goroutine

为所有运行的 goroutines 返回栈记录。

/debug/pprof/heap

返回堆和内存配置信息(前面的内容可作为 pprof 配置信息)

/debug/pprof/block

返回 goroutine 块配置信息

/debug/pprof/threadcreate

返回 goroutine 栈记录

Debugging and Profiling

nsqd 提供一套节点的配置信息,直接通过 Go 的 pprof 工具。如果你有 go 工具套装,只要运行:

1
2
3
4
5
# memory profiling
$ go 工具 pprof http://localhost:4151/debug/pprof/heap

# cpu profiling
$ go 工具 pprof http://localhost:4151/debug/pprof/profile

TLS

为了加强安全性,可以通过 --tls-cert--tls-key 客户端配置 nsqd,升级他们的链接为 TLS。

另外,你可以要求客户端使用 --tls-required (nsqd v0.2.28+)协商 TLS。

你可以通过--tls-client-auth-policy (requirerequire-verify)配置一个 nsqd 客户端证书:

  • require - 客户端必须提供一个证书,否则将会被拒绝
  • require-verify - 客户端必须提供一个有效的证书,根据 --tls-root-ca-file 指定的链接或者默认的 CA,否则将会被拒绝。

可以当做客户端授权的表单(nsqd v0.2.28+)。

如果你想生成一个 password-less,自签名证书,用:

1
$ openssl req -x509 -newkey rsa:2048 -keyout key.pem -out cert.pem -days 365 -nodes

AUTH

注意: 在 nsqd v0.2.29+ 可用

通过使用一个遵从 Auth HTTP 协议的授权服务器,指定 -auth-http-address=host:port 标志,你可以配置 nsqd

注意: 希望当仅有 nsqd TCP 协议暴露给外部客户端时使用授权,而不是 HTTP(S) 节点。参见底下说明:

Auth 服务器必须接受 HTTP 请求:

1
/auth?remote_ip=...&tls=...&auth_secret=...

返回结果格式如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"ttl": 3600,
"identity": "username",
"identity_url": "http://....",
"authorizations": [
{
"permissions": [
"subscribe",
"publish"
],
"topic": ".*",
"channels": [
".*"
]
}
]
}

注意话题(topic) 和通道(channel) 字符串必须用 nsqd 的正则表达式来申请授权。nsqd 将会为 TTL 间隔,并会在这个间隔时间里重新请求授权。

通常情况,将会使用 TLS 来加强安全性。nsqd 和 授权服务器间通过信任的网络通信(并没被加密)。如果一个授权服务器通过远程 IP 信息来授权,客户端可以使用占位符(比如 .),作为 AUTH 命令(Auth 服务器忽略)。

授权服务器例子 pynsqauthd

帮助服务器暴露 nsqlookupdnsqd /stats 数据给客户端,从授权服务器通过权限过滤,在以下可以找到 nsqauthfilter

当使用命令行工具,可以通过使用 --reader-opt 标志来授权。

1
$ nsq_tail ... -reader-opt="tls_v1,true" -reader-opt="auth_secret,$SECRET"

点对点处理延迟

你可以选择设置 nsqd 来收集和发射点对点信息处理延迟,通过 --e2e-processing-latency-percentile 标志位来配置百分比。

使用概率百分比技术(参见 Effective Computation of Biased Quantiles over Data Streams)来计算值。我们通过 bmizerany 来使用 perks 包,它能实现这个算法。

我们内部维持 2 个通道(channel),每个通道(channel)存储 N/2 分钟的延迟数据。每个 N/2 分钟我们重置了每个通道(channel)(并开始插入新的数据)。

因为我们仅在通道级别收集数据,对于话题我们聚合并合并所有的通道数量的 quantiles。如果数据在同一个 nsqd 实例上时,可以使用这个技术。然而当数据已经精确的通过 nsqd (通过 nsqlookupd),我们为每个 nsqd 取平均值。为了维持统计的精确性,除了平均值,我们也提供最大最小值。

注意: 如果没有消费者连接,不能更新值,尽管消息队列的点对点时间会缓慢增长。这是因为仅在 nsqd 收到从客户端发来 FIN 消息时才会重新计算。当消费者重新连接,这些值将会重新调整。

Statsd / Graphite Integration

当使用 --statsd-address 来为statsd (或类似 statsdaemon)指定 UDP <addr>:<port> 时,nsqd 将会在 --statsd-interval 定期推送数据给 statsd(注意:这个间隔必须始终小于等于 graphite 的刷入间隔)。设置 nsqadmin 可以显示图标。

推荐以下配置(但是这些选择必须建立在你的可用资源和要求上)。同时,statsd 的刷入间隔必须小于或者等于 storage-schemas.conf 的最小值,并且 nsqd 必须通过 --statsd-interval 来确认刷入时间小于等于时间间隔。

1
2
3
4
5
6
7
8
9
10
# storage-schemas.conf
[nsq]
pattern = ^nsq\..*
retentions = 1m:1d,5m:30d,15m:1y

# storage-aggregation.conf
[default_nsq]
pattern = ^nsq\..*
xFilesFactor = 0.2
aggregationMethod = average

nsqd 实例将会推送给以下 statsd 路径:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.backend_depth [gauge]
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.depth [gauge]
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.message_count
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.backend_depth [gauge]
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.clients [gauge]
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.deferred_count [gauge]
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.depth [gauge]
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.in_flight_count [gauge]
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.message_count
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.requeue_count
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.timeout_count

# if --statsd-mem-stats is enabled
nsq.<nsqd_host>_<nsqd_port>.mem.heap_objects [gauge]
nsq.<nsqd_host>_<nsqd_port>.mem.heap_idle_bytes [gauge]
nsq.<nsqd_host>_<nsqd_port>.mem.heap_in_use_bytes [gauge]
nsq.<nsqd_host>_<nsqd_port>.mem.heap_released_bytes [gauge]
nsq.<nsqd_host>_<nsqd_port>.mem.gc_pause_usec_100 [gauge]
nsq.<nsqd_host>_<nsqd_port>.mem.gc_pause_usec_99 [gauge]
nsq.<nsqd_host>_<nsqd_port>.mem.gc_pause_usec_95 [gauge]
nsq.<nsqd_host>_<nsqd_port>.mem.mem.next_gc_bytes [gauge]
nsq.<nsqd_host>_<nsqd_port>.mem.gc_runs

# if --e2e-processing-latency-percentile is specified, for each percentile
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.e2e_processing_latency_<percent> [gauge]
nsq.<nsqd_host>_<nsqd_port>.topic.<topic_name>.channel.<channel_name>.e2e_processing_latency_<percent> [gauge]

nsqlookupd

nsqlookupd 是守护进程负责管理拓扑信息。客户端通过查询 nsqlookupd 来发现指定话题(topic)的生产者,并且 nsqd 节点广播话题(topic)和通道(channel)信息。

有两个接口:TCP 接口,nsqd 用它来广播。HTTP 接口,客户端用它来发现和管理。

命令行选项

1
2
3
4
5
6
7
-http-address="0.0.0.0:4161": <addr>:<port> 监听 HTTP 客户端
-inactive-producer-timeout=5m0s: 从上次 ping 之后,生产者驻留在活跃列表中的时长
-tcp-address="0.0.0.0:4160": TCP 客户端监听的 <addr>:<port>
-broadcast-address: 这个 lookupd 节点的外部地址, (默认是 OS 主机名)
-tombstone-lifetime=45s: 生产者保持 tombstoned 的时长
-verbose=false: 允许输出日志
-version=false: 打印版本信息

HTTP 接口

/lookup

返回某个话题(topic)的生产者列表。

参数:

1
topic - the 话题(topic) to list producers for

/topics

返回所有已知的话题(topic)

/channels

返回已知话题(topic)里的通道(channel)

参数:

1
topic - the topic to list 通道(channel)s for

/nodes

返回所有已知的 nsqd 列表

/delete_topic

删除一个已存在的话题(topic)

参数:

1
topic - 需要删除的话题(topic)

/delete_channel

删除一个已存在话题(topic)的通道(channel)

参数:

1
2
topic - 已经存在的话题(topic)
channel - 将要删除的通道(channel)

/tombstone_topic_producer

逻辑删除(Tombstones)某个话题(topic)的生产者。参见 deletion and tombstones.

参数:

1
2
topic - 已经存在的话题(topic)
node - 将要逻辑删除(tombstones)的生产者(nsqd) (通过 <broadcast_address>:<http_port> 识别)

/ping

监控端点,必须返回 OK

/info

返回版本信息

删除和逻辑删除(Tombstones)

当一个话题(topic)不再全局生产,相对简单的操作是从集群里清理这个消息。假设所有的应用生产的消息下降,使用 /delete_topic 结束nsqlookupd 实例的,是必须要完成的操作。(内部来说,它将会识别 nsqd 生产者,并对这些节点执行合适的操作)。

全局来看,通道(channel)删除进程都很类似,不同点是你需用 /delete_channel 结束 nsqlookupd 实例,并且你必须保证所有的订阅了通道(channel)得消费者已经下降(downed)。

然而,当话题(topic)不再在节点的子集上生产的时候情况比较复杂。因为消费者查询 nsqlookupd 的方法并且连接到所有生产者,你加入的竞争环境尝试移除集群的信息,消费者发现这些节点并重新连接。(因此推送更新,话题(topic)仍然在节点上生产)。解决办法就是逻辑删除(tombstones)。逻辑删除(tombstones)在 nsqlookupd 上下文是特定的生产者和最后的配置 --tombstone-lifetime 时间。在这个窗口中,生产者不会在 /lookup 查询中列出,允许节点删除话题(topic),扩散这些信息到 nsqlookupd(接着逻辑删除(tombstoned)生产者),并阻止生产者重新发现这个节点。

nsqadmin

nsqadmin 是一套 WEB UI,用来汇集集群的实时统计,并执行不同的管理任务。

命令行选项

1
2
3
4
5
6
7
8
9
-graphite-url="": URL to graphite HTTP 地址
-http-address="0.0.0.0:4171": <addr>:<port> HTTP clients 监听的地址和端口
-lookupd-http-address=[]: lookupd HTTP 地址 (可能会提供多次)
-notification-http-endpoint="": HTTP 端点 (完全限定) ,管理动作将会发送到
-nsqd-http-address=[]: nsqd HTTP 地址 (可能会提供多次)
-proxy-graphite=false: Proxy HTTP requests to graphite
-template-dir="": 临时目录路径
-use-statsd-prefixes=true: expect statsd prefixed keys in graphite (ie: 'stats_counts.')
-version=false: 打印版本信息

statsd / Graphite Integration

使用 nsqd --statsd-address=...的时候,你可以指定一个 nsqadmin --graphite-url=http://graphite.yourdomain.com 允许 nsqadmin 上的 graphite 图表。如果使用一个统计克隆 (例如 statsdaemon),它没有前缀的键值,也可以指定 --use-statsd-prefix=false

Admin 通知

如果设置了 --notification-http-endpoint 标志,每次 admin 动作执行的时候(例如暂停一个通道(channel)),nsqadmin 将会发送一个 POST 请求到指定(完全限定)端点。

请求的内容包含的动作信息,例如:

1
2
3
4
5
6
7
8
9
{
"action": "unpause_channel",
"channel": "mouth",
"topic": "beer",
"timestamp": 1357683731,
"user": "df",
"user_agent": "Mozilla/5.0 (Macintosh; Iphone 8)"
"remote_ip": "1.2.3.4:5678"
}

如果在请求时用户名可用,user 字段将会填充,如果之前使用 htpasswd 授权,或者google-auth-proxy 之后,否则卫空字符串。当不可用时 channel 字段也会为空。

提示: 通过设置 --notification-http-endpointhttp://addr.of.nsqd/put?topic=admin_actions,你可以创建一个 admin 的动作通知 NSQ 流,话题(topic)名为 admin_actions

工具

这些工具辅助到数据流的通用功能和内部检查。

nsq_stat

为所有的话题(topic)和通道(channel)的生产者轮询 /stats,并显示统计数据:

1
2
3
4
5
---------------depth---------------+--------------metadata---------------
total mem disk inflt def | req t-o msgs clients
24660 24660 0 0 20 | 102688 0 132492418 1
25001 25001 0 0 20 | 102688 0 132493086 1
21132 21132 0 0 21 | 102688 0 132493729 1

命令行参数

1
2
3
4
5
6
-channel="": NSQ 通道(channel)
-lookupd-http-address=: lookupd HTTP 地址 (可能会给多次)
-nsqd-http-address=: nsqd HTTP 地址 (可能会给多次)
-status-every=2s: 轮询/打印输出见的时间间隔
-topic="": NSQ 话题(topic)
-version=false: 打印版本

nsq_tail

消费指定的话题(topic)/通道(channel),并写到 stdout (和 tail(1) 类似)。

命令行参数

1
2
3
4
5
6
7
8
9
-channel="": NSQ 通道(channel)
-consumer-opt=: 传递给 nsq.Consumer (可能会给多次, http://godoc.org/github.com/bitly/go-nsq#Config)
-lookupd-http-address=: lookupd HTTP 地址 (可能会给多次)
-max-in-flight=200: 最大的消息数 to allow in flight
-n=0: total messages to show (will wait if starved)
-nsqd-tcp-address=: nsqd TCP 地址 (可能会给多次)
-reader-opt=: (已经抛弃) 使用 --consumer-opt
-topic="": NSQ 话题(topic)
-version=false: 打印版本信息

nsq_to_file

消费指定的话题(topic)/通道(channel),并写到文件中,有选择的滚动和/或压缩文件。

命令行参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
-channel="nsq_to_file": nsq 通道(channel)
-consumer-opt=: 传递给 nsq.Consumer 的参数 (可能会给多次, http://godoc.org/github.com/bitly/go-nsq#Config)
-datetime-format="%Y-%m-%d_%H": strftime,和 filename 里 <DATETIME> 格式兼容
-filename-format="<TOPIC>.<HOST><GZIPREV>.<DATETIME>.log": output 文件名格式 (<TOPIC>, <HOST>, <DATETIME>, <GZIPREV> 重新生成. <GZIPREV> 是当已经存在的 gzip 文件的前缀)
-gzip=false: gzip 输出文件
-gzip-compression=3: (已经抛弃) 使用 --gzip-level, gzip 压缩级别(1 = 速度最佳, 2 = 最近压缩, 3 = 默认压缩)
-gzip-level=6: gzip 压缩级别 (1-9, 1=BestSpeed, 9=BestCompression)
-host-identifier="": 输出到 log 文件,提供主机名。 <SHORT_HOST> 和 <HOSTNAME> 是有效的替换者
-lookupd-http-address=: lookupd HTTP 地址 (可能会给多次)
-max-in-flight=200: 最大的消息数 to allow in flight
-nsqd-tcp-address=: nsqd TCP 地址 (可能会给多次)
-output-dir="/tmp": 输出文件所在的文件夹
-reader-opt=: (已经抛弃) 使用 --consumer-opt
-skip-empty-files=false: 忽略写空文件
-topic=: nsq 话题(topic) (可能会给多次)
-topic-refresh=1m0s: 话题(topic)列表刷新的频率是多少?
-version=false: 打印版本信息

nsq_to_http

消费指定的话题(topic)/通道(channel)和执行 HTTP requests (GET/POST) 到指定的端点。

命令行参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
-channel="nsq_to_http": nsq 通道(channel)
-consumer-opt=: 参数,通过 nsq.Consumer (可能会给多次, http://godoc.org/github.com/bitly/go-nsq#Config)
-content-type="application/octet-stream": the Content-Type 使用d for POST requests
-get=: HTTP 地址 to make a GET request to. '%s' will be printf replaced with data (可能会给多次)
-http-timeout=20s: timeout for HTTP connect/read/write (each)
-http-timeout-ms=20000: (已经抛弃) 使用 --http-timeout=X, timeout for HTTP connect/read/write (each)
-lookupd-http-address=: lookupd HTTP 地址 (可能会给多次)
-max-backoff-duration=2m0s: (已经抛弃) 使用 --consumer-opt=max_backoff_duration,X
-max-in-flight=200: 最大的消息数 to allow in flight
-mode="round-robin": the upstream request mode options: multicast, round-robin, hostpool
-n=100: number of concurrent publishers
-nsqd-tcp-address=: nsqd TCP 地址 (可能会给多次)
-post=: HTTP 地址 to make a POST request to. data will be in the body (可能会给多次)
-reader-opt=: (已经抛弃) 使用 --consumer-opt
-round-robin=false: (已经抛弃) 使用 --mode=round-robin, enable round robin mode
-sample=1: % of messages to publish (float b/w 0 -> 1)
-status-every=250: the # of requests between logging status (per handler), 0 disables
-throttle-fraction=1: (已经抛弃) 使用 --sample=X, publish only a fraction of messages
-topic="": nsq 话题(topic)
-version=false: 打印版本信息

nsq_to_nsq

消费者指定的话题/通道和重发布消息到目的地 nsqd 通过 TCP。

命令行参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
-channel="nsq_to_nsq": nsq 通道(channel)
-consumer-opt=: 参数,通过 nsq.Consumer (可能会给多次, see http://godoc.org/github.com/bitly/go-nsq#Config)
-destination-nsqd-tcp-address=: destination nsqd TCP 地址 (可能会给多次)
-destination-topic="": destination nsq 话题(topic)
-lookupd-http-address=: lookupd HTTP 地址 (可能会给多次)
-max-backoff-duration=2m0s: (已经抛弃) 使用 --consumer-opt=max_backoff_duration,X
-max-in-flight=200: 允许 flight 最大的消息数
-mode="round-robin": 上行请求的参数: round-robin (默认), hostpool
-nsqd-tcp-address=: nsqd TCP 地址 (可能会给多次)
-producer-opt=: 传递到 nsq.Producer (可能会给多次, 参见 http://godoc.org/github.com/bitly/go-nsq#Config)
-reader-opt=: (已经抛弃) 使用 --consumer-opt
-require-json-field="": JSON 消息: 仅传递消息,包含这个参数
-require-json-value="": JSON 消息: 仅传递消息要求参数有这个值
-status-every=250: # 请求日志的状态(每个目的地), 0 不可用
-topic="": nsq 话题(topic)
-version=false: 打印版本信息
-whitelist-json-field=: JSON 消息: 传递这个字段 (可能会给多次)

to_nsq

采用 stdin 流,并分解到新行(默认),通过 TCP 重新发布到目的地 nsqd

命令行参数

1
2
3
4
-delimiter="\n": 分割字符串(默认'\n')
-nsqd-tcp-address=: 目的地 nsqd TCP 地址 (可能会给多次)
-producer-opt=: 参数,通过 nsq.Producer (可能会给多次, http://godoc.org/github.com/bitly/go-nsq#Config)
-topic="": 发布到的 NSQ 话题(topic)