nsq介绍

nsq 是一个基于 Go 语言的分布式实时消息平台,nsq 可用于大规模系统中的实时消息服务,并且每天能够处理数亿级别的消息,其设计目标是为在分布式环境下运行的去中心化服务提供一个强大的基础架构。

nsq 具有分布式、去中心化的拓扑结构,该结构具有无单点故障、故障容错、高可用性以及能够保证消息的可靠传递的特征。NSQ 非常容易配置和部署,且具有最大的灵活性,支持众多消息协议。另外,官方还提供了拆箱即用 Go 和 Python 库。如果读者兴趣构建自己的客户端的话,还可以参考官方提供的协议规范

nsq 是由四个重要组件构成:

  • nsqd:一个负责接收、排队、转发消息到客户端的守护进程
  • nsqlookupd:管理拓扑信息并提供最终一致性的发现服务的守护进程
  • nsqadmin:一套 Web 用户界面,可实时查看集群的统计数据和执行各种各样的管理任务
  • utilities:常见基础功能、数据流处理工具,如 nsq_stat、nsq_tail、nsq_to_file、nsq_to_http、nsq_to_nsq、to_nsq

nsq 的主要特点如下:

  • 具有分布式且无单点故障的拓扑结构 支持水平扩展,在无中断情况下能够无缝地添加集群节点
  • 低延迟的消息推送,参见官方提供的性能说明文档
  • 具有组合式的负载均衡和多播形式的消息路由
  • 既擅长处理面向流(高吞吐量)的工作负载,也擅长处理面向 Job 的(低吞吐量)工作负载
  • 消息数据既可以存储于内存中,也可以存储在磁盘中
  • 实现了生产者、消费者自动发现和消费者自动连接生产者,参见 nsqlookupd
  • 支持安全传输层协议(TLS),从而确保了消息传递的安全性
  • 具有与数据格式无关的消息结构,支持 JSON、Protocol Buffers、MsgPacek 等消息格式
  • 非常易于部署(几乎没有依赖)和配置(所有参数都可以通过命令行进行配置)
  • 使用了简单的 TCP 协议且具有多种语言的客户端功能库
  • 具有用于信息统计、管理员操作和实现生产者等的 HTTP 接口
  • 为实时检测集成了统计数据收集器 StatsD
  • 具有强大的集群管理界面,参见 nsqadmin

nsqlookupd(中心管理服务)

  1. nsqlookupd 是守护进程负责管理拓扑信息。客户端通过查询 nsqlookupd 来发现指定话题(topic)的生产者,并且 nsqd 节点广播话题(topic)和通道(channel)信息
  2. 简单的说 nsqlookupd 就是中心管理服务,它使用 tcp(默认端口 4160) 管理 nsqd 服务,使用 http(默认端口 4161) 管理 nsqadmin 服务。同时为客户端提供查询功能
  3. nsqlookupd 具有以下功能或特性
  • 唯一性,在一个 Nsq 服务中只有一个 nsqlookupd 服务。当然也可以在集群中部署多个 nsqlookupd,但它们之间是没有关联的
  • 去中心化,即使 nsqlookupd 崩溃,也会不影响正在运行的 nsqd 服务
  • 充当 nsqd 和 naqadmin 信息交互的中间件
  • 提供一个 http 查询服务,给客户端定时更新 nsqd 的地址目录

nsqadmin(展示数据)

  1. 是一套 WEB UI,用来汇集集群的实时统计,并执行不同的管理任务
  2. nsqadmin 具有以下功能或特性
  • 提供一个对 topic 和 channel 统一管理的操作界面以及各种实时监控数据的展示,界面设计的很简洁,操作也很简单
  • 展示所有 message 的数量,恩…. 装 X 利器
  • 能够在后台创建 topic 和 channel,这个应该不常用到
  • nsqadmin 的所有功能都必须依赖于 nsqlookupd,nsqadmin 只是向 nsqlookupd 传递用户操作并展示来自 nsqlookupd 的数据

nsqd (真正干活的)

  1. nsqd 是一个守护进程,负责接收,排队,投递消息给客户端
  2. 简单的说,真正干活的就是这个服务,它主要负责 message 的收发,队列的维护。nsqd 会默认监听一个 tcp 端口 (4150) 和一个 http 端口 (4151) 以及一个可选的 https 端口
  3. nsqd 具有以下功能或特性
  • 对订阅了同一个 topic,同一个 channel 的消费者使用负载均衡策略(不是轮询)
  • 只要 channel 存在,即使没有该 channel 的消费者,也会将生产者的 message 缓存到队列中(注意消息的过期处理)
  • 保证队列中的 message 至少会被消费一次,即使 nsqd 退出,也会将队列中的消息暂存磁盘上 (结束进程等意外情况除外)
  • 限定内存占用,能够配置 nsqd 中每个 channel 队列在内存中缓存的 message 数量,一旦超出,message 将被缓存到磁盘中
  • topic,channel 一旦建立,将会一直存在,要及时在管理台或者用代码清除无效的 topic 和 channel,避免资源的浪费

消费者

消费者有两种方式与 nsqd 建立连接

  • 消费者直连 nsqd,这是最简单的方式,缺点是 nsqd 服务无法实现动态伸缩了 (当然,自己去实现一个也是可以的)
  • 消费者通过 http 查询 nsqlookupd 获取该 nsqlookupd 上所有 nsqd 的连接地址,然后再分别和这些 nsqd 建立连接 (官方推荐的做法),但是客户端会不停的向 nsqlookupd 查询最新的 nsqd 地址目录 (不喜欢用 http 轮询这种方式…)

生产者

生产者必须直连 nsqd 去投递 message(网上说,可以连接到 nsqlookupd,让 nsqlookupd 自动选择一个 nsqd 去完成投递,但是我用 Producer 的 tcp 是连不上 nsqlookupd 的,不知道 http 可不可以…),

这里有一个问题就是如果生产者所连接的 nsqd 炸了,那么 message 就会投递失败,所以在客户端必须自己实现相应的备用方案

  • Producer 断线后不会重连,需要自己手动重连,Consumer 断线后会自动重连
  • Consumer 的重连时间配置项有两个功能 (这个设计必须吐槽一下,分开配置更好一点)
  • Consumer 检测到与 nsqd 的连接断开后,每隔 x 秒向 nsqd 请求重连
  • Consumer 每隔 x 秒,向 nsqlookud 进行 http 轮询,用来更新自己的 nsqd 地址目录
  • Consumer 的重连时间默认是 60s(… 菜都凉了),我改成了 1s
  • Consumer 可以同时接收不同 nsqd node 的同名 topic 数据,为了避免混淆,就必须在客户端进行处理
  • 在 AddConurrentHandlers 和 AddHandler 中设置的接口回调是在另外的 goroutine 中执行的
  • Producer 不能发布 (Publish) 空 message,否则会导致 panic