storm源代码之tuple是如何发送的
这篇文章我们来看一下storm里面的tuple
·
这篇文章我们来看一下storm里面的tuple到底是如何从一个bolt到另一个bolt上去的
首先bolt在发射一个tuple的时候是调用OutputCollector的emit或者emitDirect方法,而这两个方法最终调用的是clojure代码里面的mk-transfer-fn方法:
; worker.clj
(defn mk-transfer-fn [transfer-queue]
(fn [task ^Tuple tuple]
(.put ^LinkedBlockingQueue
transfer-queue [task tuple])
))
这个方法其实只是往一个LinkedBlockingQueue里面放入一条新纪录(task-id,tuple),然后这个queue里面的内容会被下面这段代码处理
; worker.clj
; 这里面的这个socket到底是什么东西?
(async-loop
(fn [^ArrayList drainer
^KryoTupleSerializer serializer]
; 从transfer-queue里面取出一个任务来
; 这个任务其实就是(task, tuple)
(let [felem (.take transfer-queue)]
(.add drainer felem)
(.drainTo transfer-queue drainer))
(read-locked endpoint-socket-lock
; 获取从node+port到socket的映射
(let [node+port->socket @node+port->socket
; 获取从task-id到node+port的映射
task->node+port @task->node+port]
(doseq [[task ^Tuple tuple] drainer]
; 获取task对应的socket
(let [socket
(node+port->socket
(task->node+port task))
; 序列化这个tuple
ser-tuple (.serialize serializer tuple)]
; 发送这个tuple
(msg/send socket task ser-tuple)
))
))
)
从上面代码可见,tuple最终是被序列化之后由msg/send方法通过socket发送给指定的task的。注意上面代码里面的async-loop表示会创建一个单独的线程来执行这些代码。可以storm会起一个独立线程来专门发送待发送的消息。
我们来看下这个socket到底是个怎样的东西。这个socket是在worker.clj里面被初始化的,代码如下:
; socket(worker.clj)
(swap! node+port->socket
merge
(into {}
(dofor
[[node port :as endpoint] new-connections]
[endpoint
(msg/connect
mq-context
((:node->host assignment) node)
port)
]
)))
最后,storm对于tuple的处理/创建过程:
1.bolt创建一个tuple
2.worker把tuple以及这个tuple要发送的地址(task-id)组成一个对象(task-id,tuple)放进待发送队列。(LinkedBlockingQueue)
3.一个单独的线程会取出发送队列中的每个tuple来处理
a>worker创建从当前task到目的task的netty连接
b>序列化这个tuple并且通过这个netty的连接来发送这个tuple
今天认识一种说法,当你有3台机器,多个topo,每个topo只有一个worker,每次启动一个topo,每次启动worker分布任务后悔重新排序可用的slot,可能会导致其他的机器一直分配不上worker,所有的worker都分布在当前机器上的情况。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
已为社区贡献1条内容
所有评论(0)