uloop_run负责处理如下任务:

  1. 监控指定的fd(epoll实现)
  2. 定时器处理(维护一条定时器链表)
  3. 子进程管理(维护一条子进程链表)

在这里插入图片描述

uloop_fd_add

#define ULOOP_MAX_EVENTS 10
static struct uloop_fd_event cur_fds[ULOOP_MAX_EVENTS];
struct uloop_fd
{
	uloop_fd_handler cb;
	int fd;
	bool eof;
	bool error;
	bool registered;
	uint8_t flags;
};
int uloop_fd_add(struct uloop_fd *sock, unsigned int flags)
{
	unsigned int fl;
	int ret;

	if (!(flags & (ULOOP_READ | ULOOP_WRITE)))
		return uloop_fd_delete(sock);

	if (!sock->registered && !(flags & ULOOP_BLOCKING)) {
		fl = fcntl(sock->fd, F_GETFL, 0);
		fl |= O_NONBLOCK;
		fcntl(sock->fd, F_SETFL, fl);
	}

	ret = register_poll(sock, flags);
	if (ret < 0)
		goto out;

	sock->registered = true;
	sock->eof = false;
	sock->error = false;

out:
	return ret;
}

1.flag必须是ULOOP_READULOOP_WRITE,或者其中之一,否则就从cur_fds[]中删除此sockcur_fds[]是一个struct uloop_fd_event类型的全局数组,用于保存epoll监听就绪的fd
2.如果是非阻塞,需要调用fcntl设置fd的flag为O_NONBLOCK
3.register_poll将需要监听的fd添加到epoll
4.置位sock相关标志位

register_poll

static int register_poll(struct uloop_fd *fd, unsigned int flags)
{
	struct epoll_event ev;
	int op = fd->registered ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;

	memset(&ev, 0, sizeof(struct epoll_event));

	if (flags & ULOOP_READ)
		ev.events |= EPOLLIN | EPOLLRDHUP;

	if (flags & ULOOP_WRITE)
		ev.events |= EPOLLOUT;

	if (flags & ULOOP_EDGE_TRIGGER)
		ev.events |= EPOLLET;

	ev.data.ptr = fd;
	fd->flags = flags;

	return epoll_ctl(poll_fd, op, fd->fd, &ev);
}

1.register_poll会调用epoll_ctlfd进行控制
2.ev.data.ptr = fd传入epoll_ctl的是struct uloop_fd*类型的指针,而不是fd->fd.后面epoll_wait返回后,从就绪的event中取到的也是struct uloop_fd*类型的指针.

ubus_add_uloop

static inline void ubus_add_uloop(struct ubus_context *ctx)
{
	uloop_fd_add(&ctx->sock, ULOOP_BLOCKING | ULOOP_READ);
}

1.ubus_add_uloop是对uloop_fd_add的封装,ubus_add_uloop是添加指定ctx->sockepoll,只关心ULOOP_READ事件,并且是阻塞的。

uloop_run

static inline int uloop_run(void)
{
	return uloop_run_timeout(-1);
}

int uloop_run_timeout(int timeout)
{
	int next_time = 0;
	struct timeval tv;

	uloop_run_depth++;

	uloop_status = 0;
	uloop_cancelled = false;
	while (!uloop_cancelled)
	{
		uloop_gettime(&tv);
		uloop_process_timeouts(&tv);

		if (do_sigchld)
			uloop_handle_processes();

		if (uloop_cancelled)
			break;

		uloop_gettime(&tv);

		next_time = uloop_get_next_timeout(&tv);
		if (timeout >= 0 && timeout < next_time)
			next_time = timeout;
		uloop_run_events(next_time);
	}

	--uloop_run_depth;

	return uloop_status;
}

1.uloop_run调用uloop_run_timeout,参数-1表示永不超时
2.uloop_cancelled是个全局变量,控制uloop_run循环,当uloop_cancelled = true时,uloop_run退出循环,uloop_end()函数就是将此值设为trueuloop结束。
3.uloop_gettime获取当前时间
4.uloop_process_timeouts处理定时器事件,如果超时,会执行对应的回调函数
5.uloop_handle_processes处理sigchld信号事件
6.uloop_gettime再次获取当前时间
7.uloop_get_next_timeout获取下一个定时器的超时时间,如果已经超时,则返回0,同时这个时间会作为epoll_wait的超时时间
8.由于timeout=-1,所以next_time = timeout永远不会执行
9.uloop_run_events处理epoll_wait事件

uloop_process_timeouts

static void uloop_process_timeouts(struct timeval *tv)
{
	struct uloop_timeout *t;

	while (!list_empty(&timeouts)) {
		t = list_first_entry(&timeouts, struct uloop_timeout, list);

		if (tv_diff(&t->time, tv) > 0)
			break;

		uloop_timeout_cancel(t);
		if (t->cb)
			t->cb(t);
	}
}

1.list_first_entry取出定时器链表第一个定时器,判断其是否超时,未超时则直接退出,若已超时,执行其回调函数

uloop_handle_processes

(to be continued…)

uloop_get_next_timeout

static int uloop_get_next_timeout(struct timeval *tv)
{
	struct uloop_timeout *timeout;
	int diff;

	if (list_empty(&timeouts))
		return -1;

	timeout = list_first_entry(&timeouts, struct uloop_timeout, list);
	diff = tv_diff(&timeout->time, tv);
	if (diff < 0)
		return 0;

	return diff;
}

1.list_first_entry取出定时器链表第一个定时器,判断是否超时,如果已超时则返回0,否则返回超时时间(还有多久会超时),然后将这个时间传入uloop_run_events,作为epoll_wait的超时时间
2.如果定时器链表为空,则返回-1,epoll_wait就会无限阻塞。

uloop_run_events


static struct uloop_fd_event cur_fds[ULOOP_MAX_EVENTS];
static int cur_fd, cur_nfds;

static void uloop_run_events(int timeout)
{
	struct uloop_fd_event *cur;
	struct uloop_fd *fd;

	if (!cur_nfds) {
		cur_fd = 0;
		cur_nfds = uloop_fetch_events(timeout);
		if (cur_nfds < 0)
			cur_nfds = 0;
	}

	while (cur_nfds > 0) {
		struct uloop_fd_stack stack_cur;
		unsigned int events;

		cur = &cur_fds[cur_fd++];
		cur_nfds--;

		fd = cur->fd;
		events = cur->events;
		if (!fd)
			continue;

		if (!fd->cb)
			continue;

		if (uloop_fd_stack_event(fd, cur->events))
			continue;

		stack_cur.next = fd_stack;
		stack_cur.fd = fd;
		fd_stack = &stack_cur;
		do {
			stack_cur.events = 0;
			fd->cb(fd, events);
			events = stack_cur.events & ULOOP_EVENT_MASK;
		} while (stack_cur.fd && events);
		fd_stack = stack_cur.next;

		return;
	}
}

1.uloop_fetch_events通过epoll_wait获取已经就绪的fd,全局变量cur_nfds是当前就绪(epoll)的fd个数,所有就绪的event已经存放在cur_fds[]结构体数组中。
2.全局变量cur_fdcur_fds[]数组成员下标
3.取出每一个就绪的event,执行对应fd->cb回调,然后退出
4.这里并不是一次性处理完所有的event,而是每进入此函数一次,处理一个event,这样做的目的是uloop_run还要处理定时器等其他事件。
5.下一次进入uloop_run_events时,如果cur_nfds不为0,说明还有未处理完的事件,会接着处理,而不会请求新的就绪事件。
6.stack_curuloop_fd_stack_event是为了处理递归调用的情况,将放在拓展部分介绍,这里只介绍常规流程。

uloop_run_events(Ext)

以下是libubox中针对uloop_run_events的一个patch,它是这样描述的:

uloop: fix corner cases with recursive uloop_run calls
With multiple recursive calls to uloop_run, the callback for the same fd
can be run multiple times from different levels in the stack.
Prevent this by tracking the stack of uloop_fd callbacks and buffering new
incoming events for fds already on the stack.

部分修改如下,完整patch请参考:fix corner cases with recursive uloop_run calls

--- a/uloop.c
+++ b/uloop.c
@@ -43,6 +43,14 @@ struct uloop_fd_event {
        unsigned int events;
 };
 
+struct uloop_fd_stack {
+       struct uloop_fd_stack *next;
+       struct uloop_fd *fd;
+       unsigned int events;
+};
+
+static struct uloop_fd_stack *fd_stack = NULL;
+
 #define ULOOP_MAX_EVENTS 10
 
 static struct list_head timeouts = LIST_HEAD_INIT(timeouts);
@@ -285,6 +293,32 @@ static int uloop_fetch_events(int timeout)
 
 #endif
 
+static bool uloop_fd_stack_event(struct uloop_fd *fd, int events)
+{
+       struct uloop_fd_stack *cur;
+
+       /*
+        * Do not buffer events for level-triggered fds, they will keep firing.
+        * Caller needs to take care of recursion issues.
+        */
+       if (!(fd->flags & ULOOP_EDGE_TRIGGER))
+               return false;
+
+       for (cur = fd_stack; cur; cur = cur->next) {
+               if (cur->fd != fd)
+                       continue;
+
+               if (events < 0)
+                       cur->fd = NULL;
+               else
+                       cur->events |= events | ULOOP_EVENT_BUFFERED;
+
+               return true;
+       }
+
+       return false;
+}
+
 static void uloop_run_events(int timeout)
 {
        struct uloop_fd_event *cur;
@@ -298,17 +332,33 @@ static void uloop_run_events(int timeout)
        }
 
        while (cur_nfds > 0) {
+               struct uloop_fd_stack stack_cur;
+               unsigned int events;
+
                cur = &cur_fds[cur_fd++];
                cur_nfds--;
 
                fd = cur->fd;
+               events = cur->events;
                if (!fd)
                        continue;
 
                if (!fd->cb)
                        continue;
 
-               fd->cb(fd, cur->events);
+               if (uloop_fd_stack_event(fd, cur->events))
+                       continue;
+
+               stack_cur.next = fd_stack;
+               stack_cur.fd = fd;
+               fd_stack = &stack_cur;
+               do {
+                       stack_cur.events = 0;
+                       fd->cb(fd, events);
+                       events = stack_cur.events & ULOOP_EVENT_MASK;
+               } while (stack_cur.fd && events);
+               fd_stack = stack_cur.next;
+
                return;
        }
 }
@@ -352,6 +402,7 @@ int uloop_fd_delete(struct uloop_fd *fd)
                cur_fds[cur_fd + i].fd = NULL;
        }
        fd->registered = false;
+       uloop_fd_stack_event(fd, -1);
        return __uloop_fd_delete(fd);
 }

从patch可以看出,在这条提交之前,uloop_run_events函数是这样的:

static void uloop_run_events(int timeout)
{
	struct uloop_fd_event *cur;
	struct uloop_fd *fd;

	if (!cur_nfds) {
		cur_fd = 0;
		cur_nfds = uloop_fetch_events(timeout);
		if (cur_nfds < 0)
			cur_nfds = 0;
	}

	while (cur_nfds > 0) {
		cur = &cur_fds[cur_fd++];
		cur_nfds--;

		fd = cur->fd;
		if (!fd)
			continue;

		if (!fd->cb)
			continue;

		fd->cb(fd, cur->events);
		return;
	}
}

似乎这更符合我们的正常思维:epoll_wait成功返回之后,依次执行就绪fd->cb
但是可能存在uloop_run_events被递归调用的情况,那么fd->cb就会被执行多次,当然这是一种极端情况,一般不会遇到。

uloop_fd_stack_event

struct uloop_fd_stack {
	struct uloop_fd_stack *next;
	struct uloop_fd *fd;
	unsigned int events;
};

static bool uloop_fd_stack_event(struct uloop_fd *fd, int events)
{
	struct uloop_fd_stack *cur;

	/*
	 * Do not buffer events for level-triggered fds, they will keep firing.
	 * Caller needs to take care of recursion issues.
	 */
	if (!(fd->flags & ULOOP_EDGE_TRIGGER))
		return false;

	for (cur = fd_stack; cur; cur = cur->next) {
		if (cur->fd != fd)
			continue;

		if (events < 0)
			cur->fd = NULL;
		else
			cur->events |= events | ULOOP_EVENT_BUFFERED;

		return true;
	}

	return false;
}

1.uloop_fd_stack是存储uloop_fd_event的栈
2.fd_stack是全局变量
3.如果fd是水平触发,返回false
4.遍历fd_stack,如果和传入的fd不相等,说明是未经处理的fd,进行下一次循环;如果相等,标记此fd对应的eventsULOOP_EVENT_BUFFERED状态,表明是缓存状态。

再次回顾最新的uloop_run_events()

	struct uloop_fd_stack stack_cur;
	unsigned int events;
	
	cur = &cur_fds[cur_fd++];
	cur_nfds--;
	
	fd = cur->fd;
	events = cur->events;
	if (!fd)
		continue;
	
	if (!fd->cb)
		continue;
	
	if (uloop_fd_stack_event(fd, cur->events))
		continue;
	
	stack_cur.next = fd_stack;
	stack_cur.fd = fd;
	fd_stack = &stack_cur;
	do {
		stack_cur.events = 0;
		fd->cb(fd, events);
		events = stack_cur.events & ULOOP_EVENT_MASK;
	} while (stack_cur.fd && events);
	fd_stack = stack_cur.next;

1.stack_cur是当前的栈缓存
2.stack_cur.next指向fd_stack,此时的fd_stack是空的,stack_cur.fd=fd将当前处理的fd保存到stack_cur,再将fd_stack指向stack_cur
3.将stack_cur.events = 0,然后执行fd->cb,执行结束后stack_cur.events应该还是0,所以下面的while条件不成立。
4.最后将fd_stack指向stack_cur.next

目前来看这个stack_curfd_stack没有任何意义,现在考虑极端情况:
1.现假设fd->cb中调用了uloop_run_events(),这时候就会出现递归调用的情况,当再次运行到uloop_fd_stack_event()时,此时的fd_stack不是空的,里面保存着上一次未处理完的fd,如果这一次传入的fd和上一次的一样,说明同样的fd已经在处理,不需要再次处理,uloop_fd_stack_event()会返回true,循环结束。
2.如果没有这个栈缓存机制,那么当出现递归调用时,相同的fd->cb会被执行两次。

uloop_fetch_events

static int uloop_fetch_events(int timeout)
{
	int n, nfds;

	nfds = epoll_wait(poll_fd, events, ARRAY_SIZE(events), timeout);
	for (n = 0; n < nfds; ++n) {
		struct uloop_fd_event *cur = &cur_fds[n];
		struct uloop_fd *u = events[n].data.ptr;
		unsigned int ev = 0;

		cur->fd = u;
		if (!u)
			continue;

		if (events[n].events & (EPOLLERR|EPOLLHUP)) {
			u->error = true;
			if (!(u->flags & ULOOP_ERROR_CB))
				uloop_fd_delete(u);
		}

		if(!(events[n].events & (EPOLLRDHUP|EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP))) {
			cur->fd = NULL;
			continue;
		}

		if(events[n].events & EPOLLRDHUP)
			u->eof = true;

		if(events[n].events & EPOLLIN)
			ev |= ULOOP_READ;

		if(events[n].events & EPOLLOUT)
			ev |= ULOOP_WRITE;

		cur->events = ev;
	}

	return nfds;
}

1.epoll_wait等待监听的fd就绪,timeout是上文提到的uloop_get_next_timeout返回值,这样做的目的也是因为uloop_run还要处理定时器等其他事件。
2.fd就绪后,epoll_wait会返回就绪的fd数量,同时从event数组里取出就绪fd的私有数据(正常情况下我们取的是就绪fd,这里是私有数据),这个私有数据存的就是struct uloop_fd类型的结构体
3.cur = &cur_fds[n]利用指针cur操作数组
4.将uloop_fd赋值给cur->fd
5.根据events[n].events填充cur->events,EPOLLRDHUP 事件代表对端断开连接,需要置位u->eof = true
6.每一个就绪的fd都对应cur_fds[]的一个成员,最后返回就绪的fd个数nfds

参考

openwrt libubox
OpenWRT procd 进程初始化分析4 libubox一些细节与问题
libubox
uloop: fix corner cases with recursive uloop_run calls
wiki-epoll

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐