一、前言

1.1 基本介绍

NNG/nanomsg 是最近项目上使用到的一个通信库,用来实现进程间过程调用和线程间通信,很是方便。

NNG 是 nanomsg 的继任版本,而 nanomsg 则是流行的 ZMQ (一个简单好用的传输层,像框架一样的一个 socket library)的 C 重写版。

NNG 将通信使用的协议和传输分离,同一个协议可以工作在不同的传输层上,类似与 TCP/IP 的应用层和传输层的分层,同时接口上屏蔽了底层细节,统一用字符串 URL 来描述传输模式。这样当使用场景修改时,可以通过简单修改 URL 来实现适应,极具灵活性。

同时如 NNG 描述所言 “light-weight brokerless messaging”,NNG 中的通信各方是不需要第三方程序介入的,这与 MQTT/Redis 通信需要服务器不同。这样很适合作为通信库来使用而没有其他依赖。

1.2 通讯协议

  • PAIR 一对一双向通信。
  • PIPELINE(PUSH/PULL) 单向通信,类似与生产者消费者模型的消息队列。
  • PUB/SUB 单向广播。
  • REQ/REP 请求-应答模式,类似与 RPC 模式。
  • BUS 网状连接通信,每个加入节点都可以发送/接受广播消息。
  • SURVEY 用于多节点表决或者服务发现。

1.3 传输模式

  • inproc 进程内线程间传输
  • ipc 主机内进程间传输
  • tcp 网络内主机间传输

1.4 通讯模式

通信协议里除了 PAIR 之外,基本都是一对多的通信模式,这点需要注意,以 PIPELINE 和 PUB/SUB 为例:

  1. PIPELINE 的 PUSH 端是 client,一个 PUSH 可以连接多个 PULL 端,发送数据时会选择其中一个可用的发送;PULL 端是 server,一个 PULL 可以接收多个 PUSH 连接和数据。
  2. PUB/SUB 的 SUB 端是 client,一个 SUB 可以连接多个不同的 PUB 端,接收多个 PUB 端广播的数据;PUB 端是 server,一个 PUB 可以接收多个 SUB 连接并广播数据。

基于以上,多个程序是没办法共用一个 PUB/SUB 通道来广播数据的,这与 ROS 里的 topic 和 LCM 中的 channel 模式不同。如果要实现类似功能,则可以使用 PIPELINE + PUB/SUB 来处理:

  • 独立一个话题发布的程序,拥有一个 PULL 和 PUB。
  • PULL 约定一个 URL,所有需要发布该话题的程序都 PUSH 数据到该 URL 上。
  • PUB 约定一个 URL,所有需要获取该话题的程序都 SUB 到该 URL 上。
  • 程序内部循环将 PULL 读取的数据发送到 PUB 上。

以上则可以模拟出 ROS topic 数据合并 或者 LCM 中 channel 的类似功能。

整体上看,NNG 的 API 很简约,主要是 4 个,open/recv/send/close,open 根据协议不同使用的函数会不同。配置则是 setopt/getopt,与 UNIX API 类似。API 中没有上下文环境(context-less)依赖,只需要一个 nng_socket,这种设计和实现方法值得去学习一下(初步揣测应该是使用指针值作为handle,如果要强制编译器做类型检测,则会套上一层 struct,如 typedef struct { _nng_xxx_socket * p } nng_socket;)。

NNG 协议基本上囊括了常见的通信需求,一些特殊的需求,也可以通过组合协议来实现,比如上面的模拟 ROS topic 或者 LCM channel 的方法。这样一来,如果在程序中使用 NNG,不管是多进程,还是多线程,通过设计,可以进一步增强模块化,同时不乏灵活性。如果环境变化,程序不管是由多进程改成多线程,还是由多线程改成多主机,都很容易实现。

常见模块/进程/线程间通信,可以依据具体需求来使用 PIPELINE(消息队列) 还是 REQ/REP(过程调用),而不是锁+全局变量,每个模块单元只需要做单一相关的具体事务,无需知晓全局状态。

1.5 代码结构

nng.h:

nng对外暴露的 api 接口

transport.h:

通信层定义,主要是为了暴露给用户以实现扩展,但目前包含了utils下的相关头文件,其中inproc.h/ipc.h/tcp.h是对应的transport

protocol.h:

协议层定义,也是为了暴露给用户以实现扩展,其中reqrep.h/pubsub.h/bus.h/pair.h/pipeline.h/survey.h是对应的protocol

utils/:

实用工具包,包含基本数据结构(list/queue/hash)、互斥及原子操作(mutex/atomic)等

transports/:

通信层实现,包括(inproc:进程内通信;ipc:进程间通信;tcp:tcp通信)

protocols/:

协议层实现,包括(REQREP:请求响应;PUBSUB:订阅发布等)

core/:

通用代码

aio/:

线程池模拟的异步操作,带状态机的事件驱动等

二 结构介绍

2.1 nng_aio

一个异步 I/O 句柄。这个 aio 结构的细节是 AIO 框架私有的。该结构具有公共名称 (nng_aio),以便我们最大限度地减少公共 API 命名空间中的污染。 AIO 框架之外的任何东西访问这些成员中的任何一个都是一个编码错误——这里提供定义是为了方便内联,但这应该是唯一的用途。

2.2 nni_id_map

我们发现我们经常希望有一个由数字 ID 列出的事物列表,它通常是单调递增的。这通常是管道 ID。为了帮助保持这些事物的集合由它们的 ID(可能从一个非常大的值开始)索引,我们提供了一个哈希表。哈希表使用开放寻址,但我们使用更好的探针(取自 Python)以避免命中相同的位置。我们的哈希算法只是低位,我们使用的表大小是 2 的幂。请注意,散列项必须为非 NULL。该表受内部锁保护。

三、数据传输

3.1 发送数据

 

nng_sendmsg

nng_aio_set_timeout

nng_aio_set_msg

nng_send_aio

nni_aio_get_msg

nni_sock_find

nni_sock_send --> sock_send

nni_sock_rele

nng_aio_wait

nng_aio_result

3.2 接收数据

 

nng_recvmsg

nng_aio_set_timeout

nng_recv_aio

nni_sock_find

nni_sock_recv --> sock_recv

nni_sock_rele

nng_aio_wait

nng_aio_result

nng_aio_free

四、AIO

4.1 AIO 状态

AIO 结构可以携带最多 4 个不同的输入值,最多 4 个不同的输出值,以及最多 4 个不同的“私有状态”值。 输入和输出的含义由被调用的 I/O 函数决定。

 

typedef enum {

NNG_INIT_RECV = 0,

NNG_RECV_RET_SEND,

NNG_SEND_RET_RECV,

NNG_RECV_RET_RECV,

} nng_aio_state_t;

4.2 AIO 介绍

AIO 只能由调用者“完成”,调用者必须调用 nni_aio_finish 。在发生这种情况之前,调用者保证 AIO 有效。调用者必须保证一个 AIO 将“完成”(通过调用 nni_aio_finish )。

请注意,取消例程可能会被框架多次调用。框架(或消费者)保证 AIO 将在这些调用中保持有效,以便提供者可以自由地检查 aio 的列表成员资格等。但是提供者不能多次调用完成。

nni_aio_lk 用于保护 AIO 上的标志以及 AIO 上的过期列表。 如果到期未完成,我们将不允许将 AIO 标记为已完成。

为了与过期同步,我们将 aio 记录为过期,并在销毁它之前等待该记录被清除(或至少不等于 aio)。

aio 框架与 taskq 框架紧密结合。当调用者将 aio 标记为开始(使用 nni_aio_begin)时,我们为 aio“准备”任务,并将任务标记为忙碌。然后,当我们想知道操作本身是否完成时,我们所要做的就是等待任务完成(忙碌标志被清除)。

为了防止在拆卸期间 aio 重用,我们设置了 a_stop 标志。在该点之后为新操作初始化的任何尝试都将失败,并且调用者将获得 NNG_ECANCELED 指示这一点。调用 nni_aio_begin() 的提供者必须检查返回值,如果返回非零值 (NNG_ECANCELED),那么它必须简单地丢弃请求并返回。

调用 nni_aio_wait 等待当前未完成的操作完成,但不会阻止另一个操作在同一个 aio 上启动。要同步停止 aio 并防止在其上启动任何进一步的操作,请调用 nni_aio_stop。为了防止操作开始,而无需等待任何现有操作完成,请调用 nni_aio_close。

在某些地方,我们想检查 aio 是否未使用。从技术上讲,如果这些检查通过,那么它们就不需要用锁来完成,因为调用者应该拥有对它们的唯一引用。然而,竞争检测器不一定知道这个语义,并且可能会抱怨潜在的数据竞争。要抑制误报,请定义 NNG_RACE_DETECTOR。注意这会导致获取额外的锁,影响性能,所以不要在生产中使用它。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐