源码阅读 libevent - 超时管理:min_heap
libevent
允许创建一个超时 event
,使用 evtimer_new
宏。
#define evtimer_new(b, cb, arg) event_new((b), -1, 0, (cb), (arg))
从宏的实现来看,它一样是用到了一般的 event_new
,并且不使用任何的文件描述符。从超时 event
宏的实现来看,无论是 evtimer
创建的 event
还是一般 event_new
创建的 event
,都能使得 libevent
进行超时监听。
使得 libevent
对一个 event
进行超时监听的原因是:在调用 int event_add(struct event *ev, const struct timeval *tv)
的时候,第二参数不能为 NULL
,要设置一个超时值。如果为 NULL
,那么 libevent
将不会为这个 event
监听超时。下文统一称设置了超时值的 event
为超时 event
。
超时 event 的原理
libevent
对超时进行监听的原理不同于之前讲到的对信号的监听,libevent
对超时的监听的原理是,多路 IO
复用函数都是有一个超时值。如果用户需要 libevent
同时监听多个超时 event
,那么 libevent
就把超时值最小的那个作为多路 IO
复用函数的超时值。自然,当时间一到,就会从多路 IO
复用函数返回。此时对超时 event
进行处理即可。
libevent
运行用户同时监听多个超时 event
,那么就必须要对这个超时值进行管理。libevent
提供了小根堆 min_heap
和通用超时 common timeout
两种管理方式。本文首先分析小根堆 min_heap
超时管理机制。
设置超时值
首先调用 event_add 时要设置一个超时值,这样才能成为一个超时 event。
int event_add(struct event *ev, const struct timeval *tv) {
......
res = event_add_nolock_(ev, tv, 0);
......
}
int event_add_nolock_(struct event *ev, const struct timeval *tv, int tv_is_absolute) {
...... /* common timeout 相关代码没有展示 */
/* tv 不为 NULL, 就说明是一个超时 event, 在小根堆中为其留一个位置 */
if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {
if (min_heap_reserve_(&base->timeheap, 1 + min_heap_size_(&base->timeheap)) == -1)
return (-1); /* ENOMEM == errno */
}
...... /* 将 IO 或者信号 event 插入到对应的队列中 */
if (res != -1 && tv != NULL) {
struct timeval now;
/* 用户把这个 event 设置成 EV_PERSIST。即永久 event 如果没有这样设置的话,那么只会超时一次
** 设置了,那么就可以超时多次。那么就要记录用户设置的超时值。 */
if (ev->ev_closure == EV_CLOSURE_EVENT_PERSIST && !tv_is_absolute)
ev->ev_io_timeout = *tv;
/* 因为可以在次线程调用 event_add。而主线程刚好在执行 event_base_dispatch */
if ((ev->ev_flags & EVLIST_ACTIVE) && (ev->ev_res & EV_TIMEOUT)) {
if (ev->ev_events & EV_SIGNAL) {
if (ev->ev_ncalls && ev->ev_pncalls) { /* Abort loop */
*ev->ev_pncalls = 0;
}
}
/* 从超时队列中删除这个 event。因为下次会再次加入。多次对同一个超时 event 调用 event_add, 那么只能保留最后的那个。 */
event_queue_remove_active(base, event_to_event_callback(ev));
}
gettime(base, &now); /* 获取现在的时间 */
/* 虽然用户在 event_add 时只需用一个相对时间,但实际上在 Libevent 内部还是要把这个时间转换成绝对时间。
** 从存储的角度来说,存绝对时间只需一个变量。而相对时间则需两个,一个存相对值,另一个存参照物。 */
if (tv_is_absolute) ev->ev_timeout = *tv; /* 该参数指明时间是否为一个绝对时间 */
else evutil_timeradd(&now, tv, &ev->ev_timeout); /* 参照时间 + 相对时间 ev_timeout 存的是绝对时间 */
event_queue_insert_timeout(base, ev); /* 将该超时 event 插入到超时队列中 */
struct event* top = NULL;
/* 本次插入的超时值,是所有超时中最小的。那么此时就需要通知主线程。. */
if (min_heap_elt_is_top_(ev)) notify = 1;
else if ((top = min_heap_top_(&base->timeheap)) != NULL && evutil_timercmp(&top->ev_timeout, &now, <))
notify = 1;
}
/* 如果代码逻辑中是需要通知的。并且本线程不是主线程。那么就通知主线程 */
if (res != -1 && notify && EVBASE_NEED_NOTIFY(base)) evthread_notify_base(base);
return (res);
}
对于同一个 event
,如果是 IO event
或者 sigal event
,那么将无法多次添加。但如果是一个超时 event
,那么是可以多次添加的。并且对应超时值会使用最后添加时指明的那个,之前的统统不要,即替换掉之前的超时值。
调用多路 IO 复用函数等待超时
event_base_loop
现在来看一下 event_base_loop
函数,看其是怎么处理超时 event
的。
/* 非超时相关代码没有展示 */
int event_base_loop(struct event_base *base, int flags) {
const struct eventop *evsel = base->evsel;
struct timeval tv;
struct timeval *tv_p;
int res, done, retval = 0;
done = 0;
while (!done) {
tv_p = &tv;
if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
/* 根据 Timer 事件计算 evsel->dispatch 的最大等待时间(超时值最小) */
timeout_next(base, &tv_p);
} else {
/* if we have active events, we just poll new events without waiting. */
evutil_timerclear(&tv);
}
res = evsel->dispatch(base, tv_p);
/* 处理超时事件,将超时事件插入到激活链表中 */
timeout_process(base);
}
done:
return (retval);
}
event_base_loop()
中有关超时管理方面工作不多,最主要的工作有两部分:
- 设置
dispatch
回调的第二个参数tv
,这个参数如果为0
, 则无论是否有事件发生,都会立即返回。如果设置了
EVLOOP_NONBLOCK
标志位,则会调用evutil_timerclear()
将tv
设置为0
/* 不会阻塞,它仅仅是查看是否已经有 event ready. 有则运行其 callback. 然后退出 */ #define EVLOOP_NONBLOCK 0x02 #define evutil_timerclear(tvp) (tvp)->tv_sec = (tvp)->tv_usec = 0
如果不存在激活的
event
,也会调用evutil_timerclear()
将tv
设置为0
如果存在激活的
event
且没有设置EVLOOP_NONBLOCK
标志位,则需要调用timeout_next()
获取最近的超时event
,并作为等待事件的时间传给dispatch
回调函数
- 调用
timeout_process()
函数将超时了的event
加入激活队列。
timeout_next
timeout_next()
用来计算出本次调用多路 IO
复用函数的等待时间:
static int timeout_next(struct event_base *base, struct timeval **tv_p) {
/* Caller must hold th_base_lock */
struct timeval now;
struct event *ev;
struct timeval *tv = *tv_p;
int res = 0;
/* 堆的首元素具有最小的超时值,这个是小根堆的性质。 */
ev = min_heap_top_(&base->timeheap);
if (ev == NULL) { *tv_p = NULL; goto out; } /* 堆中没有元素 */
if (gettime(base, &now) == -1) { res = -1; goto out; } /* /* 获取当前时间 */
/* 如果超时时间 <= 当前时间,不能等待,需要立即返回
** 因为 ev_timeout 这个时间是由 event_add 调用时的绝对时间 + 相对时间。所以 ev_timeout 是绝对时间。
** 可能在调用 event_add 之后,过了一段时间才调用 event_base_diapatch, 所以现在可能都过了用户设置的超时时间。 */
if (evutil_timercmp(&ev->ev_timeout, &now, <=)) { evutil_timerclear(tv); goto out; }
/* 计算等待的时间 = 当前时间 - 最小的超时时间 */
evutil_timersub(&ev->ev_timeout, &now, tv);
event_debug(("timeout_next: event: %p, in %d seconds, %d useconds", ev, (int)tv->tv_sec, (int)tv->tv_usec));
out:
return (res);
}
timeout_process
timeout_process()
函数将超时了的 event
加入激活队列:
static void timeout_process(struct event_base *base) {
struct timeval now;
struct event *ev;
if (min_heap_empty_(&base->timeheap)) return;
gettime(base, &now);
/* 遍历小根堆的元素。之所以不是只取堆顶那一个元素,是因为当主线程调用多路 IO 复用函数进入等待时,次线程可能添加了多个超时值更小的 event */
while ((ev = min_heap_top_(&base->timeheap))) {
/* ev->ev_timeout 存的是绝对时间,超时时间比此刻时间大,说明该 event 还没超时。那么余下的小根堆元素更不用检查了。 */
if (evutil_timercmp(&ev->ev_timeout, &now, >)) break;
/* 下面说到的 del 是等同于调用 event_del. 把 event 从这个 event_base 中 (所有的队列都) 删除。event_base 不再监听之。
** 这里是 timeout_process 函数。所以对于有超时的 event,才会被 del 掉。
** 对于有 EV_PERSIST 选项的 event,在处理激活 event 的时候,会再次添加进 event_base 的。
** 这样做的一个好处就是,再次添加的时候,又可以重新计算该 event 的超时时间 (绝对时间)。 */
event_del_nolock_(ev, EVENT_DEL_NOBLOCK);
/* 把这个 event 加入到 event_base 的激活队列中。event_base 的激活队列又有该 event 了。
** 如果该 event 是 EV_PERSIST 的,可以再次添加进该 event_base */
event_active_nolock_(ev, EV_TIMEOUT, 1);
}
}
当从多路 IO
复用函数返回时,就检查时间小根堆,看有多少个 event
已经超时了。如果超时了,那就把这个 event
加入到 event_base
的激活队列中。并且把这个超时删除掉。