【openwrt】uloop_run过程分析
uloop_run负责处理如下任务:监控指定的fd(epoll实现)定时器处理子进程管理文章目录uloop_runuloop_process_timeoutsuloop_handle_processesuloop_get_next_timeoutuloop_run_eventsuloop_run_events(Ext)uloop_fd_stack_eventuloop_fetch_events参考
uloop_run
负责处理如下任务:
- 监控指定的fd(epoll实现)
- 定时器处理(维护一条定时器链表)
- 子进程管理(维护一条子进程链表)
文章目录
uloop_fd_add
#define ULOOP_MAX_EVENTS 10
static struct uloop_fd_event cur_fds[ULOOP_MAX_EVENTS];
struct uloop_fd
{
uloop_fd_handler cb;
int fd;
bool eof;
bool error;
bool registered;
uint8_t flags;
};
int uloop_fd_add(struct uloop_fd *sock, unsigned int flags)
{
unsigned int fl;
int ret;
if (!(flags & (ULOOP_READ | ULOOP_WRITE)))
return uloop_fd_delete(sock);
if (!sock->registered && !(flags & ULOOP_BLOCKING)) {
fl = fcntl(sock->fd, F_GETFL, 0);
fl |= O_NONBLOCK;
fcntl(sock->fd, F_SETFL, fl);
}
ret = register_poll(sock, flags);
if (ret < 0)
goto out;
sock->registered = true;
sock->eof = false;
sock->error = false;
out:
return ret;
}
1.flag
必须是ULOOP_READ
和ULOOP_WRITE
,或者其中之一,否则就从cur_fds[]
中删除此sock
。cur_fds[]
是一个struct uloop_fd_event
类型的全局数组,用于保存epoll
监听就绪的fd
2.如果是非阻塞,需要调用fcntl
设置fd
的flag为O_NONBLOCK
3.register_poll
将需要监听的fd添加到epoll
4.置位sock
相关标志位
register_poll
static int register_poll(struct uloop_fd *fd, unsigned int flags)
{
struct epoll_event ev;
int op = fd->registered ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
memset(&ev, 0, sizeof(struct epoll_event));
if (flags & ULOOP_READ)
ev.events |= EPOLLIN | EPOLLRDHUP;
if (flags & ULOOP_WRITE)
ev.events |= EPOLLOUT;
if (flags & ULOOP_EDGE_TRIGGER)
ev.events |= EPOLLET;
ev.data.ptr = fd;
fd->flags = flags;
return epoll_ctl(poll_fd, op, fd->fd, &ev);
}
1.register_poll
会调用epoll_ctl
对fd
进行控制
2.ev.data.ptr = fd
传入epoll_ctl
的是struct uloop_fd*
类型的指针,而不是fd->fd
.后面epoll_wait
返回后,从就绪的event
中取到的也是struct uloop_fd*
类型的指针.
ubus_add_uloop
static inline void ubus_add_uloop(struct ubus_context *ctx)
{
uloop_fd_add(&ctx->sock, ULOOP_BLOCKING | ULOOP_READ);
}
1.ubus_add_uloop
是对uloop_fd_add
的封装,ubus_add_uloop
是添加指定ctx->sock
到epoll
,只关心ULOOP_READ
事件,并且是阻塞的。
uloop_run
static inline int uloop_run(void)
{
return uloop_run_timeout(-1);
}
int uloop_run_timeout(int timeout)
{
int next_time = 0;
struct timeval tv;
uloop_run_depth++;
uloop_status = 0;
uloop_cancelled = false;
while (!uloop_cancelled)
{
uloop_gettime(&tv);
uloop_process_timeouts(&tv);
if (do_sigchld)
uloop_handle_processes();
if (uloop_cancelled)
break;
uloop_gettime(&tv);
next_time = uloop_get_next_timeout(&tv);
if (timeout >= 0 && timeout < next_time)
next_time = timeout;
uloop_run_events(next_time);
}
--uloop_run_depth;
return uloop_status;
}
1.uloop_run
调用uloop_run_timeout
,参数-1
表示永不超时
2.uloop_cancelled
是个全局变量,控制uloop_run
循环,当uloop_cancelled = true
时,uloop_run
退出循环,uloop_end()
函数就是将此值设为true
让uloop
结束。
3.uloop_gettime
获取当前时间
4.uloop_process_timeouts
处理定时器事件,如果超时,会执行对应的回调函数
5.uloop_handle_processes
处理sigchld
信号事件
6.uloop_gettime
再次获取当前时间
7.uloop_get_next_timeout
获取下一个定时器的超时时间,如果已经超时,则返回0,同时这个时间会作为epoll_wait
的超时时间
8.由于timeout=-1
,所以next_time = timeout
永远不会执行
9.uloop_run_events
处理epoll_wait
事件
uloop_process_timeouts
static void uloop_process_timeouts(struct timeval *tv)
{
struct uloop_timeout *t;
while (!list_empty(&timeouts)) {
t = list_first_entry(&timeouts, struct uloop_timeout, list);
if (tv_diff(&t->time, tv) > 0)
break;
uloop_timeout_cancel(t);
if (t->cb)
t->cb(t);
}
}
1.list_first_entry
取出定时器链表第一个定时器,判断其是否超时,未超时则直接退出,若已超时,执行其回调函数
uloop_handle_processes
(to be continued…)
uloop_get_next_timeout
static int uloop_get_next_timeout(struct timeval *tv)
{
struct uloop_timeout *timeout;
int diff;
if (list_empty(&timeouts))
return -1;
timeout = list_first_entry(&timeouts, struct uloop_timeout, list);
diff = tv_diff(&timeout->time, tv);
if (diff < 0)
return 0;
return diff;
}
1.list_first_entry
取出定时器链表第一个定时器,判断是否超时,如果已超时则返回0,否则返回超时时间(还有多久会超时),然后将这个时间传入uloop_run_events
,作为epoll_wait
的超时时间
2.如果定时器链表为空,则返回-1,epoll_wait
就会无限阻塞。
uloop_run_events
static struct uloop_fd_event cur_fds[ULOOP_MAX_EVENTS];
static int cur_fd, cur_nfds;
static void uloop_run_events(int timeout)
{
struct uloop_fd_event *cur;
struct uloop_fd *fd;
if (!cur_nfds) {
cur_fd = 0;
cur_nfds = uloop_fetch_events(timeout);
if (cur_nfds < 0)
cur_nfds = 0;
}
while (cur_nfds > 0) {
struct uloop_fd_stack stack_cur;
unsigned int events;
cur = &cur_fds[cur_fd++];
cur_nfds--;
fd = cur->fd;
events = cur->events;
if (!fd)
continue;
if (!fd->cb)
continue;
if (uloop_fd_stack_event(fd, cur->events))
continue;
stack_cur.next = fd_stack;
stack_cur.fd = fd;
fd_stack = &stack_cur;
do {
stack_cur.events = 0;
fd->cb(fd, events);
events = stack_cur.events & ULOOP_EVENT_MASK;
} while (stack_cur.fd && events);
fd_stack = stack_cur.next;
return;
}
}
1.uloop_fetch_events
通过epoll_wait
获取已经就绪的fd,全局变量cur_nfds
是当前就绪(epoll)的fd个数,所有就绪的event
已经存放在cur_fds[]
结构体数组中。
2.全局变量cur_fd
是cur_fds[]
数组成员下标
3.取出每一个就绪的event
,执行对应fd->cb
回调,然后退出
4.这里并不是一次性处理完所有的event
,而是每进入此函数一次,处理一个event
,这样做的目的是uloop_run
还要处理定时器等其他事件。
5.下一次进入uloop_run_events
时,如果cur_nfds
不为0,说明还有未处理完的事件,会接着处理,而不会请求新的就绪事件。
6.stack_cur
和uloop_fd_stack_event
是为了处理递归调用的情况,将放在拓展部分介绍,这里只介绍常规流程。
uloop_run_events(Ext)
以下是libubox
中针对uloop_run_events
的一个patch,它是这样描述的:
uloop: fix corner cases with recursive uloop_run calls
With multiple recursive calls to uloop_run, the callback for the same fd
can be run multiple times from different levels in the stack.
Prevent this by tracking the stack of uloop_fd callbacks and buffering new
incoming events for fds already on the stack.
部分修改如下,完整patch请参考:fix corner cases with recursive uloop_run calls
--- a/uloop.c
+++ b/uloop.c
@@ -43,6 +43,14 @@ struct uloop_fd_event {
unsigned int events;
};
+struct uloop_fd_stack {
+ struct uloop_fd_stack *next;
+ struct uloop_fd *fd;
+ unsigned int events;
+};
+
+static struct uloop_fd_stack *fd_stack = NULL;
+
#define ULOOP_MAX_EVENTS 10
static struct list_head timeouts = LIST_HEAD_INIT(timeouts);
@@ -285,6 +293,32 @@ static int uloop_fetch_events(int timeout)
#endif
+static bool uloop_fd_stack_event(struct uloop_fd *fd, int events)
+{
+ struct uloop_fd_stack *cur;
+
+ /*
+ * Do not buffer events for level-triggered fds, they will keep firing.
+ * Caller needs to take care of recursion issues.
+ */
+ if (!(fd->flags & ULOOP_EDGE_TRIGGER))
+ return false;
+
+ for (cur = fd_stack; cur; cur = cur->next) {
+ if (cur->fd != fd)
+ continue;
+
+ if (events < 0)
+ cur->fd = NULL;
+ else
+ cur->events |= events | ULOOP_EVENT_BUFFERED;
+
+ return true;
+ }
+
+ return false;
+}
+
static void uloop_run_events(int timeout)
{
struct uloop_fd_event *cur;
@@ -298,17 +332,33 @@ static void uloop_run_events(int timeout)
}
while (cur_nfds > 0) {
+ struct uloop_fd_stack stack_cur;
+ unsigned int events;
+
cur = &cur_fds[cur_fd++];
cur_nfds--;
fd = cur->fd;
+ events = cur->events;
if (!fd)
continue;
if (!fd->cb)
continue;
- fd->cb(fd, cur->events);
+ if (uloop_fd_stack_event(fd, cur->events))
+ continue;
+
+ stack_cur.next = fd_stack;
+ stack_cur.fd = fd;
+ fd_stack = &stack_cur;
+ do {
+ stack_cur.events = 0;
+ fd->cb(fd, events);
+ events = stack_cur.events & ULOOP_EVENT_MASK;
+ } while (stack_cur.fd && events);
+ fd_stack = stack_cur.next;
+
return;
}
}
@@ -352,6 +402,7 @@ int uloop_fd_delete(struct uloop_fd *fd)
cur_fds[cur_fd + i].fd = NULL;
}
fd->registered = false;
+ uloop_fd_stack_event(fd, -1);
return __uloop_fd_delete(fd);
}
从patch可以看出,在这条提交之前,uloop_run_events
函数是这样的:
static void uloop_run_events(int timeout)
{
struct uloop_fd_event *cur;
struct uloop_fd *fd;
if (!cur_nfds) {
cur_fd = 0;
cur_nfds = uloop_fetch_events(timeout);
if (cur_nfds < 0)
cur_nfds = 0;
}
while (cur_nfds > 0) {
cur = &cur_fds[cur_fd++];
cur_nfds--;
fd = cur->fd;
if (!fd)
continue;
if (!fd->cb)
continue;
fd->cb(fd, cur->events);
return;
}
}
似乎这更符合我们的正常思维:epoll_wait
成功返回之后,依次执行就绪fd->cb
。
但是可能存在uloop_run_events
被递归调用的情况,那么fd->cb
就会被执行多次,当然这是一种极端情况,一般不会遇到。
uloop_fd_stack_event
struct uloop_fd_stack {
struct uloop_fd_stack *next;
struct uloop_fd *fd;
unsigned int events;
};
static bool uloop_fd_stack_event(struct uloop_fd *fd, int events)
{
struct uloop_fd_stack *cur;
/*
* Do not buffer events for level-triggered fds, they will keep firing.
* Caller needs to take care of recursion issues.
*/
if (!(fd->flags & ULOOP_EDGE_TRIGGER))
return false;
for (cur = fd_stack; cur; cur = cur->next) {
if (cur->fd != fd)
continue;
if (events < 0)
cur->fd = NULL;
else
cur->events |= events | ULOOP_EVENT_BUFFERED;
return true;
}
return false;
}
1.uloop_fd_stack
是存储uloop_fd_event
的栈
2.fd_stack
是全局变量
3.如果fd
是水平触发,返回false
4.遍历fd_stack
,如果和传入的fd不相等,说明是未经处理的fd,进行下一次循环;如果相等,标记此fd对应的events
为ULOOP_EVENT_BUFFERED
状态,表明是缓存状态。
再次回顾最新的uloop_run_events()
struct uloop_fd_stack stack_cur;
unsigned int events;
cur = &cur_fds[cur_fd++];
cur_nfds--;
fd = cur->fd;
events = cur->events;
if (!fd)
continue;
if (!fd->cb)
continue;
if (uloop_fd_stack_event(fd, cur->events))
continue;
stack_cur.next = fd_stack;
stack_cur.fd = fd;
fd_stack = &stack_cur;
do {
stack_cur.events = 0;
fd->cb(fd, events);
events = stack_cur.events & ULOOP_EVENT_MASK;
} while (stack_cur.fd && events);
fd_stack = stack_cur.next;
1.stack_cur
是当前的栈缓存
2.stack_cur.next
指向fd_stack
,此时的fd_stack
是空的,stack_cur.fd=fd
将当前处理的fd保存到stack_cur
,再将fd_stack
指向stack_cur
3.将stack_cur.events = 0
,然后执行fd->cb
,执行结束后stack_cur.events
应该还是0,所以下面的while条件不成立。
4.最后将fd_stack
指向stack_cur.next
目前来看这个stack_cur
和fd_stack
没有任何意义,现在考虑极端情况:
1.现假设fd->cb
中调用了uloop_run_events()
,这时候就会出现递归调用的情况,当再次运行到uloop_fd_stack_event()
时,此时的fd_stack
不是空的,里面保存着上一次未处理完的fd
,如果这一次传入的fd
和上一次的一样,说明同样的fd
已经在处理,不需要再次处理,uloop_fd_stack_event()
会返回true
,循环结束。
2.如果没有这个栈缓存机制,那么当出现递归调用时,相同的fd->cb
会被执行两次。
uloop_fetch_events
static int uloop_fetch_events(int timeout)
{
int n, nfds;
nfds = epoll_wait(poll_fd, events, ARRAY_SIZE(events), timeout);
for (n = 0; n < nfds; ++n) {
struct uloop_fd_event *cur = &cur_fds[n];
struct uloop_fd *u = events[n].data.ptr;
unsigned int ev = 0;
cur->fd = u;
if (!u)
continue;
if (events[n].events & (EPOLLERR|EPOLLHUP)) {
u->error = true;
if (!(u->flags & ULOOP_ERROR_CB))
uloop_fd_delete(u);
}
if(!(events[n].events & (EPOLLRDHUP|EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP))) {
cur->fd = NULL;
continue;
}
if(events[n].events & EPOLLRDHUP)
u->eof = true;
if(events[n].events & EPOLLIN)
ev |= ULOOP_READ;
if(events[n].events & EPOLLOUT)
ev |= ULOOP_WRITE;
cur->events = ev;
}
return nfds;
}
1.epoll_wait
等待监听的fd就绪,timeout
是上文提到的uloop_get_next_timeout
返回值,这样做的目的也是因为uloop_run
还要处理定时器等其他事件。
2.fd
就绪后,epoll_wait
会返回就绪的fd数量,同时从event
数组里取出就绪fd
的私有数据(正常情况下我们取的是就绪fd,这里是私有数据),这个私有数据存的就是struct uloop_fd
类型的结构体
3.cur = &cur_fds[n]
利用指针cur
操作数组
4.将uloop_fd
赋值给cur->fd
5.根据events[n].events
填充cur->events
,EPOLLRDHUP
事件代表对端断开连接,需要置位u->eof = true
6.每一个就绪的fd都对应cur_fds[]
的一个成员,最后返回就绪的fd个数nfds
参考
openwrt libubox
OpenWRT procd 进程初始化分析4 libubox一些细节与问题
libubox
uloop: fix corner cases with recursive uloop_run calls
wiki-epoll
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)