NSQ概述

快速开始

下面的步骤将通过推送(publishing)、消费(consuming)和归档(archiving)消息到本地磁盘,在本地环境演示一个小型的 NSQ 集群

  1. 根据文档安装安装 NSQ。

  2. 在另外一个 shell 中,运行 nsqlookupd:

    1
    $ nsqlookupd
  3. 再开启一个 shell,运行 nsqd:

    1
    $ nsqd --lookupd-tcp-address=127.0.0.1:4160
  4. 再开启第三个 shell,运行 nsqadmin:

    1
    $ nsqadmin --lookupd-http-address=127.0.0.1:4161
  5. 开启第四个 shell,推送一条初始化数据(并且在集群中创建一个 topic):

    1
    $ curl -d 'hello world 1' 'http://127.0.0.1:4151/put?topic=test'
  6. 最后,开启第五个 shell, 运行 nsq_to_file:

    1
    $ nsq_to_file --topic=test --output-dir=/tmp --lookupd-http-address=127.0.0.1:4161
  7. 推送更多地数据到 nsqd:

    1
    2
    $ curl -d 'hello world 2' 'http://127.0.0.1:4151/put?topic=test'
    $ curl -d 'hello world 3' 'http://127.0.0.1:4151/put?topic=test'
  8. 按照预先设想的,在浏览器中打开 http://127.0.0.1:4171/ 就能查看 nsqadmin 的 UI 界面和队列统计数据。同时,还可以在 /tmp 目录下检查 (test.*.log) 文件.

这个教程中最重要的是:nsq_to_file (客户端)没有明确地指出 test 主题从哪里产生,它从 nsqlookupd 获取信息,即使在消息推送之后才开始连接 nsqd,消息也并没有消失。

特性和担保

NSQ 是分布式实时消息系统。

特性

  • 支持无 SPOF 的分布式拓扑
  • 水平扩展(没有中间件,无缝地添加更多的节点到集群)
  • 低延迟消息传递性能
  • 结合负载均衡和多播消息路由风格
  • 擅长面向流媒体(高通量)和工作(低吞吐量)工作负载
  • 主要是内存中(除了高水位线消息透明地保存在磁盘上)
  • 运行时发现消费者找到生产者服务(nsqlookupd)
  • 传输层安全性 (TLS)
  • 数据格式不可知
  • 一些依赖项(容易部署)和健全的,有界,默认配置
  • 任何语言都有简单 TCP 协议支持客户端库
  • HTTP 接口统计、管理行为和生产者(不需要客户端库发布)
  • 为实时检测集成了statsd
  • 健壮的集群管理界面 (nsqadmin)

担保

对于任何分布式系统来说,都是通过智能权衡来实现目标。通过这些透明的可靠性指标,我们希望能使得 NSQ 在部署到产品上的行为是可达预期的。

消息不可持久化(默认)

虽然系统支持消息持久化存储在磁盘中(通过 --mem-queue-size ),不过默认情况下消息都在内存中.

如果将 --mem-queue-size 设置为 0,所有的消息将会存储到磁盘。我们不用担心消息会丢失,nsq 内部机制保证在程序关闭时将队列中的数据持久化到硬盘,重启后就会恢复。

NSQ 没有内置的复制机制,却有各种各样的方法管理这种权衡,比如部署拓扑结构和技术,在容错的时候从属并持久化内容到磁盘。

消息最少会被投递一次

如上所述,这个假设成立于 nsqd 节点没有错误。

因为各种原因,消息可以被投递多次(客户端超时,连接失效,重新排队,等等)。由客户端负责操作。

接收到的消息是无序的

不要依赖于投递给消费者的消息的顺序。

和投递消息机制类似,它是由重新队列(requeues),内存和磁盘存储的混合导致的,实际上,节点间不会共享任何信息。

它是相对的简单完成疏松队列,(例如,对于某个消费者来说,消息是有次序的,但是不能给你作为一个整体跨集群),通过使用时间窗来接收消息,并在处理前排序(虽然为了维持这个变量,必须抛弃时间窗外的消息)。

消费者最终找出所有话题的生产者

这个服务(nsqlookupd) 被设计成最终一致性。nsqlookupd 节点不会维持状态,也不会回答查询。

网络分区并不会影响可用性,分区的双方仍然能回答查询。部署性拓扑可以显著的减轻这类问题。

常见问题

部署

  • 有什么为 nsqd 推荐的拓扑结构?

    强烈推荐 nsqd生产消息的服务一起运行。

    nsqd 是一个相对轻量的进程,它能很好和其他进程协同运行。

    这个模式有利于结构化消息流为一个消费问题,而不是一个生产问题。

    另一个好处是它能将来自服务端的内容形成有效的独立,分享,简仓(silo)的数据。

    注意: 这并不是必须得要求,它只是能让事情简单些(参见下面的问题)。

  • 为什么不能用 nsqlookupd 来查询生产的内容给谁?

    NSQ 提升了消费端的发现模型,减轻了前期的配置负载(需要告诉所有消费者去那里找他们要的内容)。

    然而,它并没有提供任何方法来解决发布端将内容发布给谁。这是鸡和蛋的问题,在发布前并不存在内容。

    通过使用 nsqd ,你可以避开这个问题(你的服务只是简单的将内容发布给本地的 nsqd),并且允许 NSQ 实时发现系统正常运行。

  • 我只是想在某个节点上将 nsqd 作为一个工作队列来使用,有没有合适的例子?

    是的,nsqd 可以很好的单独运行。

    nsqlookupd 非常有利于大型分布式环境。

  • 我需要运行多少个 nsqlookupd ?

    依赖于集群的大小,nsqd 的节点数量,消费者,和你希望的容错能力。

    3 个或 5 个就可以非常好的服务于百级别的主机和千级的消费者。

    nsqlookupd 节点不需要回答查询。集群里的元数据是最终一致的。

发布

  • 是否需要客户端库来发布消息?

    不需要!使用 HTTP 节点来发布消息就好(/pub/mpub)。它简单,容易,在任意一个开发环境都可用。

    绝大多数人使用 HTTP 来发布 NSQ 部署。

  • 为什么强制客户端响应 TCP 协议 PUB 和 MPUB 命令?

    我们相信 NSQ 操作的默认模式必须安全优先,并且我们希望协议简单并完整。

  • 什么时候 PUB 或 MPUB 会失败?

    1. 话题(topic)的名字没有正确格式化(长度限制)。参见topic and channel name spec

    2. 消息过大(具体限制参见 nsqd 的参数)。

    3. 中间的话题(topic)被删除。

    4. nsqd 被清除。

    5. 发布的时候客户端产生连接失败

      (1) 和 (2) 是开发错误。(3) 和 (4) 很少见, (5) 是基于 TCP 协议都会遇到的问题。

  • 如何避免之前 (3) 出现的问题?

    删除话题(topic)是少见的操作。如果你想删除一个话题(topic),需要精确计算时间,确保删除后有充足的时间,发布的话题(topic)不会被执行。

设计和理论

  • 如何命名话题(topic)和通道(channel)?

    话题(topic)名需要描述在流中的数据。

    通道(channel)名需要描述消费者的工作类型。

    例如, 好的话题(topic)名 编码(encode), 解码(decode), api_请求(api_request),页面_视图 。好的通道(channel)名归档(archive), 分析_增长(analytics_increment),垃圾_分析(spam_analysis)

  • 一个 nsqd 最多能支持多少个话题(topic)和通道(channel)?

    没有内置的限制。它仅和 nsqd 所在的服务端的内存,CPU 限制有关(每个客户端 CPU 使用率已经大为改进了issue #236)。

  • 如何为集群声明一个新的话题(topic)?

    话题(topic)的第一个 PUBSUB ,将会在 nsqd 上创建一个话题(topic)。话题(topic)的元数据将会传播给 nsqlookupd 的配置。其他的读者将会通过周期性的查询 nsqlookupd 发现这个话题(topic)。

  • NSQ 能操作 RPC 吗?

是的,有这个可能性, 但是 NSQ 并不是为它设计的。

我们想发布一些文档说明它是如何结构化的,如果你感兴趣,可以来帮我们。

特定的 pynsq

  • 为什么强制我使用 Tornado?

    pynsq 初始设计的时候,就聚焦于消费端的库,并且 NSQ 协议和 Python 的异步架构非常类似(尤其和 NSQ 的面向推送协议)。

    Tornado 的 API 非常简单并且执行合理。

  • Tornado IOLoop 是否必须发布?

    不,nsqd 为了发布简单,暴露了 HTPP 端(/pub/mpub) 。

    不必担心 HTTP 的过载。同时,/mpub 通过批量发布,减少了 HTTP 的过载。

  • 那么什么时候使用 Writer?

    当高性能,低负载优先级比较高的时候。

    Writer 使用 TCP 协议里的 PUBMPUB 命令, 它们比 HTTP 负载更低。

  • 如果我就想”启动并忘记“将会发生什么(我能容忍消息丢失!)?

    使用 Writer 并且不给发布的方法指定回调。

    注意: 仅在简单的客户端代码有效, pynsq 场景必须处理 nsqd 的消息(比如,做这些事情不会导致性能提高)。

性能

分布式性能

主仓库包含一段代码(bench/bench.py),它能在 EC2 上自动完成分布式基准。

它引导 N 个节点,一些运行 nsqd,一些运行加载生成工具(PUBSUB),并分析它们的输出来提供聚合。

初始化

下面的代码反应了默认参数6 c3.2xlarge,这个实例支持 1g 比特的连接。3 个节点运行 nsqd 实例,剩下的运行 bench_reader (SUB) 和 bench_writer (PUB) 实例,来生成依赖于基准模式的负载。

1
2
3
4
5
6
7
8
9
10
$ ./bench/bench.py --access-key=... --secret-key=... --ssh-key-name=...
[I 140917 10:58:10 bench:102] launching 6 instances
[I 140917 10:58:12 bench:111] waiting for instances to launch...
...
[I 140917 10:58:37 bench:130] (1) bootstrapping ec2-54-160-145-64.compute-1.amazonaws.com (i-0a018ce1)
[I 140917 10:59:37 bench:130] (2) bootstrapping ec2-54-90-195-149.compute-1.amazonaws.com (i-0f018ce4)
[I 140917 11:00:00 bench:130] (3) bootstrapping ec2-23-22-236-55.compute-1.amazonaws.com (i-0e018ce5)
[I 140917 11:00:41 bench:130] (4) bootstrapping ec2-23-23-40-113.compute-1.amazonaws.com (i-0d018ce6)
[I 140917 11:01:10 bench:130] (5) bootstrapping ec2-54-226-180-44.compute-1.amazonaws.com (i-0c018ce7)
[I 140917 11:01:43 bench:130] (6) bootstrapping ec2-54-90-83-223.compute-1.amazonaws.com (i-10018cfb)

生产者吞吐量

这个基准仅反应了生产者吞吐量。消息体有 100 个字节,并且消息通过 3 个话题(topic)分布。

1
2
3
4
5
$ ./bench/bench.py --access-key=... --secret-key=... --ssh-key-name=... --mode=pub --msg-size=100 run
[I 140917 12:39:37 bench:140] launching nsqd on 3 host(s)
[I 140917 12:39:41 bench:163] launching 9 producer(s) on 3 host(s)
...
[I 140917 12:40:20 bench:248] [bench_writer] 10.002s - 197.463mb/s - 2070549.631ops/s - 4.830us/op

入口处 ~2.07mm msgs/sec,使用了 197mb/s 的带宽。

生产和消费吞吐量

通过服务生产者和消费者,这个基准更加准确的反应了实际情况。这个消息也是 100 个字节,并且通过 3 个话题(topic)分布,每个都包含一个 通道(channel)(每个 通道(channel) 24 个客户端)。

1
2
3
4
5
6
7
$ ./bench/bench.py --access-key=... --secret-key=... --ssh-key-name=... --msg-size=100 run
[I 140917 12:41:11 bench:140] launching nsqd on 3 host(s)
[I 140917 12:41:15 bench:163] launching 9 producer(s) on 3 host(s)
[I 140917 12:41:22 bench:186] launching 9 consumer(s) on 3 host(s)
...
[I 140917 12:41:55 bench:248] [bench_reader] 10.252s - 76.946mb/s - 806838.610ops/s - 12.706us/op
[I 140917 12:41:55 bench:248] [bench_writer] 10.030s - 80.315mb/s - 842149.615ops/s - 11.910us/op

入口处的 ~842k ~806k msgs/s, 合计消费带宽 156mb/s,我们已经尽力提升了 nsqd 节点的 CPU 处理能力。通过引入消费者,nsqd 需要维持每个 通道(channel),因此负载自然会高一点。

消费者的数量略微少于生产者,因为消费者发送2次命令(每个消息都要发送 FIN 命令)。

增加两个节点(一个是 nsqd 另一个是产生负载),达到了 1mm msgs/s:

1
2
3
4
5
6
7
$ ./bench/bench.py --access-key=... --secret-key=... --ssh-key-name=... --msg-size=100 run
[I 140917 13:38:28 bench:140] launching nsqd on 4 host(s)
[I 140917 13:38:32 bench:163] launching 16 producer(s) on 4 host(s)
[I 140917 13:38:43 bench:186] launching 16 consumer(s) on 4 host(s)
...
[I 140917 13:39:12 bench:248] [bench_reader] 10.561s - 100.956mb/s - 1058624.012ops/s - 9.976us/op
[I 140917 13:39:12 bench:248] [bench_writer] 10.023s - 105.898mb/s - 1110408.953ops/s - 9.026us/op

单个节点性能

声明:请牢记 NSQ 设计的初衷是分布式。单个节点的性能非常重要,但这并不是我们所追求的。

  • 2012 MacBook Air i7 2ghz
  • go1.2
  • NSQ v0.2.24
  • 200 byte messages

GOMAXPROCS=1 (单个生产者,单个消费者)

1
2
3
4
$ ./bench.sh 
results...
PUB: 2014/01/12 22:09:08 duration: 2.311925588s - 82.500mb/s - 432539.873ops/s - 2.312us/op
SUB: 2014/01/12 22:09:19 duration: 6.009749983s - 31.738mb/s - 166396.273ops/s - 6.010us/op

GOMAXPROCS=4 (4 个生产者, 4 个消费者)

1
2
3
4
$ ./bench.sh 
results...
PUB: 2014/01/13 16:58:05 duration: 1.411492441s - 135.130mb/s - 708469.965ops/s - 1.411us/op
SUB: 2014/01/13 16:58:16 duration: 5.251380583s - 36.321mb/s - 190426.114ops/s - 5.251us/op

设计

注意:可视化的演示参见 slide deck

NSQ 是继承于 simplequeue(部分的 simplequeue),因此被设计为(排名不分先后)

  • 提供更简单的拓扑方案,达到高可用性和消除单点故障
  • 满足更强的消息可靠传递的保证
  • 限制单个进程的内存占用(通过持久化一些消息到硬盘上)
  • 极大简化了生产者和消费者的配置要求
  • 提供了一个简单的升级路径
  • 提升效率

简化配置和管理

单个 nsqd 实例被设计成可以同时处理多个数据流。流被称为“话题”和话题有 1 个或多个“通道”。每个通道都接收到一个话题中所有消息的拷贝。在实践中,一个通道映射到下行服务消费一个话题.

话题和通道都没有预先配置。话题由第一次发布消息到命名的话题或第一次通过订阅一个命名话题来创建。通道被第一次订阅到指定的通道创建。

话题 和通道的所有缓冲的数据相互独立,防止缓慢消费者造成对其他通道的积压(同样适用于话题级别)。

一个通道一般会有多个客户端连接。假设所有已连接的客户端处于准备接收消息的状态,每个消息将被传递到一个随机的客户端。例如:

nsqd clients

总之,消息从话题->通道是多路传送的(每个通道接收的所有该话题消息的副本),即使均匀分布在通道->消费者之间(每个消费者收到该通道的消息的一部分)。

NSQ 还包括一个辅助应用程序,nsqlookupd,它提供了一个目录服务,消费者可以查找到提供他们感兴趣订阅话题的 nsqd 地址 。在配置方面,把消费者与生产者解耦开(它们都分别只需要知道哪里去连接 nsqlookupd 的共同实例,而不是对方),降低复杂性和维护。

在更底的层面,每个 nsqd 有一个与 nsqlookupd 的长期 TCP 连接,定期推动其状态。这个数据被 nsqlookupd 用于给消费者通知 nsqd 地址。对于消费者来说,一个暴露的 HTTP /lookup 接口用于轮询。

为话题引入一个新的消费者,只需启动一个配置了 nsqlookup 实例地址的 NSQ 客户端。无需为添加任何新的消费者或生产者更改配置,大大降低了开销和复杂性。

注:在将来的版本中,启发式 nsqlookupd 可以基于深度,已连接的客户端数量,或其他“智能”策略来返回地址。当前的实现是简单的返回所有地址。最终的目标是要确保所有深度接近零的生产者被读取。

值得注意的是,重要的是 nsqdnsqlookupd 守护进程被设计成独立运行,没有相互之间的沟通或协调。

我们还认为重要的是有一个方式来聚合查看,监测,并管理集群。我们建立 nsqadmin 做到这一点。它提供了一个 Web UI 来浏览 topics/channels/consumers 和深度检查每一层的关键统计数据。此外,它还支持几个管理命令例如,移除通道和清空通道(这是一个有用的工具,当在一个通道中的信息可以被安全地扔掉,以使深度返回到 0)。

nsqadmin

简单的升级路径

这是我们的高优先级之一。我们的生产系统处理大量的流量,都建立在我们现有的消息工具上,所以我们需要一种方法来慢慢地,有条不紊地升级我们特定部分的基础设施,而不产生任何影响。

首先,在消息生产者方面,我们建立 nsqd 匹配 simplequeue。具体来说,nsqd 暴露了一个 HTTP /PUT 端点,就像 simplequeue,上传二进制数据(需要注意的一点是 endpoint 需要一个额外的查询参数来指定”话题”)。想切换到发布消息到 nsqd 的服务只需要很少的代码变更。

第二,我们建立了兼容已有库功能和语义的 Python 和 Go 库。这使得消息的消费者通过很少的代码改变就可使用。所有的业务逻辑保持不变。

最后,我们建立工具连接起新旧组件。这些都在仓库的示例(examples)目录中:

  • nsq_pubsub - 在 NSQ 集群中以 HTTP 接口的形式暴露的一个 pubsub
  • nsq_to_file - 将一个给定话题的所有消息持久化到文件
  • nsq_to_http - 对一个话题的所有消息的执行 HTTP 请求到(多个)endpoints。

消除单点故障

NSQ被设计以分布的方式被使用。nsqd 客户端(通过 TCP )连接到指定话题的所有生产者实例。没有中间人,没有消息代理,也没有单点故障:

nsq clients

这种拓扑结构消除单链,聚合,反馈。相反,你的消费者直接访问所有生产者。从技术上讲,哪个客户端连接到哪个 NSQ 不重要,只要有足够的消费者连接到所有生产者,以满足大量的消息,保证所有东西最终将被处理。

对于 nsqlookupd,高可用性是通过运行多个实例来实现。他们不直接相互通信和数据被认为是最终一致。消费者轮询所有的配置的 nsqlookupd 实例和合并 response。失败的,无法访问的,或以其他方式故障的节点不会让系统陷于停顿。

消息传递担保

NSQ 保证消息将交付至少一次,虽然消息可能是重复的。消费者应该关注到这一点,删除重复数据或执行idempotent等操作

这个担保是作为协议和工作流的一部分,工作原理如下(假设客户端成功连接并订阅一个话题):

  1. 客户表示他们已经准备好接收消息
  2. NSQ 发送一条消息,并暂时将数据存储在本地(在 re-queue 或 timeout)
  3. 客户端回复 FIN(结束)或 REQ(重新排队)分别指示成功或失败。如果客户端没有回复, NSQ 会在设定的时间超时,自动重新排队消息

这确保了消息丢失唯一可能的情况是不正常结束 nsqd 进程。在这种情况下,这是在内存中的任何信息(或任何缓冲未刷新到磁盘)都将丢失。

如何防止消息丢失是最重要的,即使是这个意外情况可以得到缓解。一种解决方案是构成冗余 nsqd对(在不同的主机上)接收消息的相同部分的副本。因为你实现的消费者是幂等的,以两倍时间处理这些消息不会对下游造成影响,并使得系统能够承受任何单一节点故障而不会丢失信息。

附加的是 NSQ 提供构建基础以支持多种生产用例和持久化的可配置性。

限定内存占用

nsqd 提供一个 --mem-queue-size 配置选项,这将决定一个队列保存在内存中的消息数量。如果队列深度超过此阈值,消息将透明地写入磁盘。nsqd 进程的内存占用被限定于 --mem-queue-size * #of_channels_and_topics

message overflow

此外,一个精明的观察者可能会发现,这是一个方便的方式来获得更高的传递保证:把这个值设置的比较低(如 1 或甚至是 0)。磁盘支持的队列被设计为在不重启的情况下存在(虽然消息可能被传递两次)。

此外,涉及到信息传递保证,干净关机(通过给 nsqd 进程发送 TERM 信号)坚持安全地把消息保存在内存中,传输中,延迟,以及内部的各种缓冲区。

请注意,一个以 #ephemeral 结束的通道名称不会在超过 mem-queue-size 之后刷新到硬盘。这使得消费者并不需要订阅频道的消息担保。这些临时通道将在最后一个客户端断开连接后消失。

效率

NSQ 被设计成一个使用简单 size-prefixed 为前缀的,与“memcached-like”类似的命令协议。所有的消息数据被保持在核心中,包括像尝试次数、时间截等元数据类。这消除了数据从服务器到客户端来回拷贝,当重新排队消息时先前工具链的固有属性。这也简化了客户端,因为他们不再需要负责维护消息的状态。

此外,通过降低配置的复杂性,安装和开发的时间大大缩短(尤其是在有超过 > 1 消费者的话题)。

对于数据的协议,我们做了一个重要的设计决策,通过推送数据到客户端最大限度地提高性能和吞吐量的,而不是等待客户端拉数据。这个概念,我们称之为 RDY 状态,基本上是客户端流量控制的一种形式。

当客户端连接到 nsqd 和并订阅到一个通道时,它被放置在一个 RDY 为 0 状态。这意味着,还没有信息被发送到客户端。当客户端已准备好接收消息发送,更新它的命令 RDY 状态到它准备处理的数量,比如 100。无需任何额外的指令,当 100 条消息可用时,将被传递到客户端(服务器端为那个客户端每次递减 RDY 计数)。

客户端库的被设计成在 RDY 数达到配置 max-in-flight 的 25% 发送一个命令来更新 RDY 计数(并适当考虑连接到多个 nsqd 情况下,适当地分配)。

nsq protocol

这是一个重要的性能控制,使一些下游系统能够更轻松地批量处理信息,并从更高的 max-in-flight 中受益。

值得注意的是,因为它既是基于缓冲和推送来满足需要(通道)流的独立副本的能力,我们已经提供了行为像 simplequeue 和 pubsub 相结合的守护进程。这是简化我们的系统拓扑结构的强大工具,如上述讨论那样我们会维护传统的 toolchain。

Go

我们很早做了一个战略决策,利用 Go 来建立 NSQ 的核心。我们最近的博客上讲述我们在 bitly 如何使用 Go,并提到这个适合的项目-通过浏览那篇文章可能对理解我们如何重视这么语言有所帮助。

关于 NSQ ,Go channels(不要与 NSQ 通道混淆),并且内置并发性功能的语言的非常适合于的 nsqd的内部工作。我们充分利用缓冲的通道来管理我们在内存中的消息队列和无缝把溢出消息放到硬盘。

标准库让我们很容易地编写网络层和客户端代码。只需要付出很少的努力,来整合内置的内存和 CPU 剖析进行优化。我们还发现它易于单独测试组件,模拟类型接口,以迭代方式构建功能。

内幕

NSQ 由 3 个守护进程组成:

  • nsqd 是接收、队列和传送消息到客户端的守护进程。
  • nsqlookupd 是管理的拓扑信息,并提供了最终一致发现服务的守护进程。
  • nsqadmin 是一个 Web UI 来实时监控集群(和执行各种管理任务)。

在 NSQ 数据流建模为一个消息流和消费者的树。一个话题(topic)是一个独特的数据流。一个 通道(channel) 是消费者订阅了某个 话题 的逻辑分组。

topics/channels

单个 nsqd 可以有很多的话题,每个话题可以有多通道。一个通道接收到一个话题中所有消息的副本,启用组播方式的传输,使消息同时在每个通道的所有订阅用户间分发,从而实现负载平衡。

这些原语组成一个强大的框架,用于表示各种简单和复杂的拓扑结构

有关 NSQ 的设计的更多信息请参见设计文档

话题和通道

话题(topic)和通道(channel),NSQ 的核心基础,最能说明如何把 Go 语言的特点无缝地转化为系统设计。

Go 语言中的通道(channel)(为消除歧义以下简称为“go-chan”)是实现队列一种自然的方式,因此一个 NSQ 话题(topic)/通道(channel),其核心,只是一个缓冲的 go-chan Message指针。缓冲区的大小等于 --mem-queue-size 的配置参数。

在懂了读数据后,发布消息到一个话题(topic)的行为涉及到:

  1. 消息结构的初始化(和消息体的内存分配)
  2. 获取 话题(topic) 时的读-锁;
  3. 是否能发布的读-锁;
  4. 发布缓存的 go-chan

从一个话题中的通道获取消息不能依赖于经典的 go-chan 语义,因为多个 goroutines 在一个 go-chan 上接收消息将会分发消息,而最终要的结果是复制每个消息到每一个通道(goroutine)。

替代的是,每个话题维护着 3 个主要的 goroutines。第一个被称为 router,它负责用来从 incoming go-chan 读取最近发布的消息,并把消息保存到队列中(内存或硬盘)。

第二个,称为 messagePump,是负责复制和推送消息到如上所述的通道。

第三个是负责 DiskQueue IO 和将在后面讨论。

通道是一个有点复杂,但共享着 go-chan 单一输入和输出(抽象出来的事实是,在内部,消息可能会在内存或磁盘上):

queue goroutine

此外,每个通道的维护负责 2 个时间排序优先级队列,用来实现传输中(in-flight)消息超时(第 2 个随行 goroutines 用于监视它们)。

并行化的提高是通过每个数据结构管理一个通道,而不是依靠 Go 运行时的全局定时器调度。

注意:在内部,Go 运行时使用一个单一优先级队列和的 goroutine 来管理定时器。这支持(但不局限于)的整个 time package。它通常避免了需要一个用户空间的时间顺序的优先级队列,但要意识到这是一个很重要的一个有着单一锁的数据结构,有可能影响GOMAXPROCS > 1 的表现。

Backend / DiskQueue

NSQ 的设计目标之一就是要限定保持在内存中的消息数。它通过 DiskQueue 透明地将溢出的消息写入到磁盘上(对于一个话题或通道而言,DiskQueue 拥有的第三个主要的 goroutine)。

由于内存队列只是一个 go-chan,把消息放到内存中显得不重要,如果可能的话,则退回到磁盘:

1
2
3
4
5
6
7
8
9
10
for msg := range c.incomingMsgChan {
select {
case c.memoryMsgChan <- msg:
default:
err := WriteMessageToBackend(&msgBuf, msg, c.backend)
if err != nil {
// ... handle errors ...
}
}
}

说到 Go select 语句的优势在于用在短短的几行代码实现这个功能:default 语句只在 memoryMsgChan 已满的情况下执行。

NSQ 还具有的临时通道的概念。临时的通道将丢弃溢出的消息(而不是写入到磁盘),在没有客户端订阅时消失。这是一个完美的 Go’s Interface 案例。话题和通道有一个结构成员声明为一个 Backend interface,而不是一个具体的类型。正常的话题和通道使用 DiskQueue,而临时通道连接在 DummyBackendQueue中,它实现了一个 no-op 的Backend

降低 GC 的压力

在任何垃圾回收环境中,你可能会关注到吞吐量量(做无用功),延迟(响应),并驻留集大小(footprint)。

Go 的1.2版本,GC 采用,mark-and-sweep (parallel), non-generational, non-compacting, stop-the-world 和 mostly precise。这主要是因为剩余的工作未完成(它预定于Go 1.3 实现)。

Go 的 GC 一定会不断改进,但普遍的真理是:你创建的垃圾越少,收集的时间越少

首先,重要的是要了解 GC 在真实的工作负载下是如何表现。为此,nsqdstatsd 格式发布的 GC 统计(伴随着其他的内部指标)。nsqadmin 显示这些度量的图表,让您洞察 GC 的影响,频率和持续时间:

single node view

为了切实减少垃圾,你需要知道它是如何生成的。再次 Go toolchain 提供了答案:

  1. 使用 testing package 和 go test -benchmem 来 benchmark 热点代码路径。它分析每个迭代分配的内存数量(和 benchmark 运行可以用 benchcmp 进行比较)。
  2. 编译时使用 go build -gcflags -m,会输出逃逸分析的结果。

考虑到这一点,下面的优化证明对 nsqd 是有用的:

  1. 避免 []bytestring 的转换
  2. buffers 或 object 的重新利用(并且某一天可能面临 sync.Pool 又名 issue 4720
  3. 预先分配 slices(在 make 时指定容量)并且总是知道其中承载元素的数量和大小
  4. 对各种配置项目使用一些明智的限制(例如消息大小)
  5. 避免装箱(使用 interface{})或一些不必要的包装类型(例如一个多值的”go-chan” 结构体)
  6. 避免在热点代码路径使用 defer (它也消耗内存)

TCP 协议

NSQ 的 TCP 协议 protocol_spec 是一个这些 GC 优化概念发挥了很大作用的的例子。

该协议用含有长度前缀的帧构造,使其可以直接高效的编码和解码:

1
2
3
4
5
[x][x][x][x][x][x][x][x][x][x][x][x]...
| (int32) || (int32) || (binary)
| 4-byte || 4-byte || N-byte
------------------------------------...
size frame ID data

由于提前知道了帧部件的确切类型与大小,我们避免了 encoding/binary 便利 Read()Write() 包装(以及它们外部 interface 的查询与转换),而是直接调用相应的 binary.BigEndian 方法。

为了减少 socket 的 IO 系统调用,客户端 net.Conn 都用 bufio.Readerbufio.Writer 包装。Reader 暴露了 ReadSlice() ,它会重复使用其内部缓冲区。这几乎消除了从 socket 读出数据的内存分配,大大降低 GC 的压力。这可能是因为与大多数命令关联的数据不会被忽视(在边缘情况下,这是不正确的,数据是显示复制的)。

在一个更低的水平,提供一个 MessageID 被声明为 [16]byte,以便能够把它作为一个 map key(slice 不能被用作 map key)。然而,由于从 socket 读取数据存储为 []byte,而不是通过分配字符串键产生垃圾,并避免从 slice 的副本拷贝的数组形式的MessageIDunsafe package 是用来直接把 slice 转换成一个 MessageID

1
id := *(*nsq.MessageID)(unsafe.Pointer(&msgID))

: 这是一个 hack。它将不是必要的,如果编译器优 和 Issue 3512 解决这个问题。另外值得一读通过issue 5376,其中谈到的“const like” byte 类型 与 string 类型可以互换使用,而不需要分配和复制。

同样,Go 标准库只提供了一个数字转换成 string 的方法。为了避免 string 分配,nsqd 使用一个自定义的10进制转换方法在 []byte 直接操作。

这些看似微观优化,但却包含了 TCP 协议中一些最热门的代码路径。总体而言,每秒上万消息的速度,对分配和开销的数目显著影响:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
benchmark                    old ns/op    new ns/op    delta
BenchmarkProtocolV2Data 3575 1963 -45.09%

benchmark old ns/op new ns/op delta
BenchmarkProtocolV2Sub256 57964 14568 -74.87%
BenchmarkProtocolV2Sub512 58212 16193 -72.18%
BenchmarkProtocolV2Sub1k 58549 19490 -66.71%
BenchmarkProtocolV2Sub2k 63430 27840 -56.11%

benchmark old allocs new allocs delta
BenchmarkProtocolV2Sub256 56 39 -30.36%
BenchmarkProtocolV2Sub512 56 39 -30.36%
BenchmarkProtocolV2Sub1k 56 39 -30.36%
BenchmarkProtocolV2Sub2k 58 42 -27.59%

HTTP

NSQ 的 HTTP API 是建立在 Go 的 net/http 包之上。因为它只是 net/http,它可以利用没有特殊的客户端库的几乎所有现代编程环境。

它的简单性掩盖了它的能力,作为 Go 的 HTTP tool-chest 最有趣的方面之一是广泛的调试功能支持。该 net/http/pprof 包直接集成了原生的 HTTP 服务器,暴露获取 CPU,堆,goroutine 和操作系统线程性能的 endpoints。这些可以直接从 go tool 找到:

1
$ go tool pprof http://127.0.0.1:4151/debug/pprof/profile

这对调试和分析一个运行的进程非常有价值!

此外,/stats endpoint 返回的指标以任何 JSON 或良好格式的文本来呈现,很容易使管理员能够实时从命令行监控:

1
$ watch -n 0.5 'curl -s http://127.0.0.1:4151/stats | grep -v connected'

这产生的连续输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[page_views     ] depth: 0     be-depth: 0     msgs: 105525994 e2e%: 6.6s, 6.2s, 6.2s
[page_view_counter ] depth: 0 be-depth: 0 inflt: 432 def: 0 re-q: 34684 timeout: 34038 msgs: 105525994 e2e%: 5.1s, 5.1s, 4.6s
[realtime_score ] depth: 1828 be-depth: 0 inflt: 1368 def: 0 re-q: 25188 timeout: 11336 msgs: 105525994 e2e%: 9.0s, 9.0s, 7.8s
[variants_writer ] depth: 0 be-depth: 0 inflt: 592 def: 0 re-q: 37068 timeout: 37068 msgs: 105525994 e2e%: 8.2s, 8.2s, 8.2s

[poll_requests ] depth: 0 be-depth: 0 msgs: 11485060 e2e%: 167.5ms, 167.5ms, 138.1ms
[social_data_collector ] depth: 0 be-depth: 0 inflt: 2 def: 3 re-q: 7568 timeout: 402 msgs: 11485060 e2e%: 186.6ms, 186.6ms, 138.1ms

[social_data ] depth: 0 be-depth: 0 msgs: 60145188 e2e%: 199.0s, 199.0s, 199.0s
[events_writer ] depth: 0 be-depth: 0 inflt: 226 def: 0 re-q: 32584 timeout: 30542 msgs: 60145188 e2e%: 6.7s, 6.7s, 6.7s
[social_delta_counter ] depth: 17328 be-depth: 7327 inflt: 179 def: 1 re-q: 155843 timeout: 11514 msgs: 60145188 e2e%: 234.1s, 234.1s, 231.8s

[time_on_site_ticks] depth: 0 be-depth: 0 msgs: 35717814 e2e%: 0.0ns, 0.0ns, 0.0ns
[tail821042#ephemeral ] depth: 0 be-depth: 0 inflt: 0 def: 0 re-q: 0 timeout: 0 msgs: 33909699 e2e%: 0.0ns, 0.0ns, 0.0ns

最后,每个 Go release 版本带来可观的 HTTP 性能提升autobench。与 Go 的最新版本重新编译时,它总是很高兴为您提供免费的性能提升!

依赖

对于其它生态系统,Go 依赖关系管理(或缺乏)的哲学需要一点时间去适应。

NSQ 从一个单一的巨大仓库衍化而来的,包含相关的 imports 和小到未分离的内部 packages,完全遵守构建和依赖管理的最佳实践。

有两大流派的思想:

  1. Vendoring: 拷贝正确版本的依赖到你的应用程序的仓库,并修改您的 import 路径来引用本地副本。
  2. Virtual Env: 列出你在构建时所需要的依赖版本,产生一种原生的 GOPATH 环境变量包含这些固定依赖。

注: 这确实只适用于二进制包,因为它没有任何意义的一个导入的包,使中间的决定,如一种依赖使用的版本。

NSQ 使用 gpm 提供如上述2种的支持。

它的工作原理是在 Godeps 文件记录你的依赖,方便日后构建 GOPATH 环境。为了编译,它在环境里包装并执行的标准 Go toolchain。该 Godeps 文件仅仅是 JSON 格式,可以进行手工编辑。

测试

Go 提供了编写测试和基准测试的内建支持,这使用 Go 很容易并发操作进行建模,这是微不足道的建立起来的一个完整的实例 nsqd 到您的测试环境中。

然而,最初实现有可能变成测试问题的一个方面:全局状态。最明显的 offender 是运行时使用该持有 nsqd 的引用实例的全局变量,例如包含配置元数据和到 parent nsqd 的引用。

某些测试会使用短形式的变量赋值,无意中在局部范围掩盖这个全局变量,即 nsqd := NewNSQd(...) 。这意味着,全局引用没有指向了当前正在运行的实例,破坏了测试实例。

要解决这个问题,一个包含配置元数据和到 parent nsqd 的引用上下文结构被传来传去。到全局状态的所有引用都替换为本地的语境,允许 children(话题(topic),通道(channel),协议处理程序等)来安全地访问这些数据,使之更可靠的测试。

健壮性

一个面对不断变化的网络条件或突发事件不健壮的系统,不会是一个在分布式生产环境中表现良好的系统。

NSQ 设计和的方式是使系统能够容忍故障而表现出一致的,可预测的和令人吃惊的方式来实现。

总体理念是快速失败,把错误当作是致命的,并提供了一种方式来调试发生的任何问题。

但是,为了应对,你需要能够检测异常情况。

心跳和超时

NSQ 的 TCP 协议是面向 push 的。在建立连接,握手,和订阅后,消费者被放置在一个为 0 的 RDY 状态。当消费者准备好接收消息,它更新的 RDY 状态到准备接收消息的数量。NSQ 客户端库不断在幕后管理,消息控制流的结果。

每隔一段时间,nsqd 将发送一个心跳线连接。客户端可以配置心跳之间的间隔,但 nsqd 会期待一个回应在它发送下一个心掉之前。

组合应用级别的心跳和 RDY 状态,避免头阻塞现象,也可能使心跳无用(即,如果消费者是在后面的处理消息流的接收缓冲区中,操作系统将被填满,堵心跳)

为了保证进度,所有的网络 IO 时间上限势必与配置的心跳间隔相关联。这意味着,你可以从字面上拔掉之间的网络连接 nsqd 和消费者,它会检测并正确处理错误。

当检测到一个致命错误,客户端连接被强制关闭。在传输中的消息会超时而重新排队等待传递到另一个消费者。最后,错误会被记录并累计到各种内部指标。

管理 Goroutines

非常容易启动 goroutine。不幸的是,不是很容易以协调他们的清理工作。避免死锁也极具挑战性。大多数情况下这可以归结为一个顺序的问题,在上游 goroutine 发送消息到 go-chan 之前,另一个 goroutine 从 go-chan 上接收消息。

为什么要关心这些?这很显然,孤立的 goroutine 是内存泄漏。内存泄露在长期运行的守护进程中是相当糟糕的,尤其当期望的是你的进程能够稳定运行,但其它都失败了。

更复杂的是,一个典型的 nsqd 进程中有许多参与消息传递 goroutines。在内部,消息的“所有权”频繁变化。为了能够完全关闭,统计全部进程内的消息是非常重要的。

虽然目前还没有任何灵丹妙药,下列技术使它变得更轻松管理。

WaitGroups

sync 包提供了 sync.WaitGroup, 可以被用来累计多少个 goroutine 是活跃的(并且意味着一直等待直到它们退出)。

为了减少典型样板,nsqd 使用以下装饰器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
type WaitGroupWrapper struct {
sync.WaitGroup
}

func (w *WaitGroupWrapper) Wrap(cb func()) {
w.Add(1)
go func() {
cb()
w.Done()
}()
}

// can be used as follows:
wg := WaitGroupWrapper{}
wg.Wrap(func() { n.idPump() })
...
wg.Wait()
退出信号

有一个简单的方式在多个 child goroutine 中触发一个事件是提供一个 go-chane,当你准备好时关闭它。所有在那个 go-chan 上挂起的 go-chan 都将会被激活,而不是向每个 goroutine 中发送一个单独的信号。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func work() {
exitChan := make(chan int)
go task1(exitChan)
go task2(exitChan)
time.Sleep(5 * time.Second)
close(exitChan)
}
func task1(exitChan chan int) {
<-exitChan
log.Printf("task1 exiting")
}

func task2(exitChan chan int) {
<-exitChan
log.Printf("task2 exiting")
}
退出时的同步

实现一个可靠的,无死锁的,所有传递中的消息的退出路径是相当困难的。一些提示:

  1. 理想的情况是负责发送到 go-chan 的 goroutine 中也应负责关闭它。
  2. 如果 message 不能丢失,确保相关的 go-chan 被清空(尤其是无缓冲的!),以保证发送者可以取得进展。
  3. 另外,如果消息是不重要的,发送给一个单一的 go-chan 应转换为一个 select 附加一个退出信号(如上所述),以保证取得进展。
  4. 一般的顺序应该是
    1. 停止接受新的连接(close listeners)
    2. 发送退出信号给 c hild goroutines (如上文)
    3. WaitGroup 等待 goroutine 退出(如上文)
    4. 恢复缓冲数据
    5. 刷新所有东西到硬盘
日志

最后,日志是您所获得的记录 goroutine 进入和退出的重要工具!。这使得它相当容易识别造成死锁或泄漏的情况的罪魁祸首。

nsqd 日志行包括 goroutine 与他们的 siblings(and parent)的信息,如客户端的远程地址或话题(topic)/通道(channel)名。

该日志是详细的,但不是详细的日志是压倒性的。有一条细线,但 nsqd 倾向于发生故障时在日志中提供更多的信息,而不是试图减少繁琐的有效性为代价。