源码阅读 libevent - 事件主循环
在 libevent
中,事件主循环的作用就是执行一个循环,在循环中监听事件以及超时的事件并且将这些激活的事件进行处理。libevent
提供了对用户开放了两种执行事件主循环的函数:
int event_base_dispatch(struct event_base *); int event_base_loop(struct event_base *, int);
事件主循环
源码阅读 libevent - 结构体:event中提到了
event
结构体的生命周期:
事件主循环做的工作就是该流程图的下半部分:
pending 状态:等待事件发生 active 状态:事件已经发生,等待调用事件回调
总体流程分为三部分
流程 | 调用相关函数 | event 状态 |
---|---|---|
事件发生 | event_active() | pending --> avtive |
单次事件处理 | callback done | avtive --> non-pending |
持久事件处理 | callback done | avtive --> pending |
事件主循环函数 event_base_dispatch()
其实就是调用 event_base_loop()
:
int event_base_dispatch(struct event_base *event_base) { return (event_base_loop(event_base, 0)); }
event_base_loop()
函数实现如下:
int event_base_loop(struct event_base *base, int flags) { const struct eventop *evsel = base->evsel; struct timeval tv; struct timeval *tv_p; int res, done, retval = 0; /* 加锁. 在调用 evsel->dispatch 时会进行解锁, 在调用用户回调函数时也会解锁 */ EVBASE_ACQUIRE_LOCK(base, th_base_lock); if (base->running_loop) { event_warnx("%s: reentrant invocation. Only one event_base_loop can run on each event_base at once.", __func__); EVBASE_RELEASE_LOCK(base, th_base_lock); return -1; } base->running_loop = 1; /* 只允许一个事件主循环 */ clear_time_cache(base); /* 清空缓存的超时 */ if (base->sig.ev_signal_added && base->sig.ev_n_signals_added) evsig_set_base_(base); done = 0; /* 用来确定是否退出循环 */ #ifndef EVENT__DISABLE_THREAD_SUPPORT base->th_owner_id = EVTHREAD_GET_ID(); /* 保存当前线程 ID */ #endif base->event_gotterm = base->event_break = 0; while (!done) { base->event_continue = 0; base->n_deferreds_queued = 0; /* 中止循环的两个条件 */ if (base->event_gotterm) break; if (base->event_break) break; tv_p = &tv; /* 如果当前 event_base 里没有已经激活的事件,就将时间最小堆,堆顶的超时值取出来,作为下一轮 dispatch 的超时值 ** 否则就将超时时间置为 0,evsel->dispatch 会立马超时返回,激活的事件得以处理 */ if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) timeout_next(base, &tv_p); else evutil_timerclear(&tv); /* If we have no events, we just exit */ if (0 == (flags & EVLOOP_NO_EXIT_ON_EMPTY) && !event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) { event_debug(("%s: no events registered.", __func__)); retval = 1; goto done; } event_queue_make_later_events_active(base); clear_time_cache(base); res = evsel->dispatch(base, tv_p); if (res == -1) { event_debug(("%s: dispatch returned unsuccessfully.", __func__)); retval = -1; goto done; } update_time_cache(base); /* 将 base 的 min_heap 中所有超时的事件以超时激活类型添加到激活队列中 */ timeout_process(base); if (N_ACTIVE_CALLBACKS(base)) { /* 如果激活队列中有事件 */ /* 执行激活队列中的 event 相应的回调函数,返回的 n 是成功执行的非内部事件数目 */ int n = event_process_active(base); /* 如果设置了 EVLOOP_ONCE,并且所有激活的事件都处理完了,那么就退出 event_loop */ if ((flags & EVLOOP_ONCE) && N_ACTIVE_CALLBACKS(base) == 0 && n != 0) done = 1; } else if (flags & EVLOOP_NONBLOCK) /* 如果设置了 EVLOOP_NONBLOCK 那么也会退出 event_loop 循环 */ done = 1; } event_debug(("%s: asked to terminate loop.", __func__)); done: clear_time_cache(base); base->running_loop = 0; EVBASE_RELEASE_LOCK(base, th_base_lock); return (retval); }
直接阅读 event_base_loop()
较为困难,我们根据上述流程进行分析
- 事件发生
- 如何监听事件发生
- 事件发生后如何激活事件
- 事件处理
- 如何处理已激活事件
- 如何持久化事件
事件发生
如何监听事件发生
监听方式
libevent 提供了多种监听事件的方案,如单次监听、循环监听等,监听方案由 event_base_loop()
参数决定:
/* 阻塞, 直到一个 event 变成 active. 在 active 状态的 event 的 Callback 函数执行后,就退出。 */ #define EVLOOP_ONCE 0x01 /* 不会阻塞,它仅仅是查看是否已经有 event ready. 有则运行其 callback. 然后退出 */ #define EVLOOP_NONBLOCK 0x02 /* 哪怕 event_base 中没有 active 或者 pending 的 event 了。也不退出。直到调用 event_base_loopbreak() or event_base_loopexit(). */ #define EVLOOP_NO_EXIT_ON_EMPTY 0x04 /* 当 flags 什么都不设置时,则 loop 一直运行,检查到 event 变为 active 时,运行其 callback 函数。 ** 当没有 active 或 pending 的 event 后,退出 loop。 ** 有人调用了 event_base_loopbreak() or event_base_loopexit(),也退出 loop. */
有关 event_base_loopbreak () 和 event_base_loopexit () 参见:Post not found: 源码阅读 libevent - 事件主循环的停止
监听超时时间的计算
监听 IO 事件和 Signal 事件
libevent
调用 evsel->dispatch
回调函数监听 IO
事件和 Signal
事件的发生:
以 select.c
为例:
select
static int select_dispatch(struct event_base *base, struct timeval *tv) { int res=0, i, j, nfds; struct selectop *selectop = base->evbase; if (selectop->resize_out_sets) { fd_set *readset_out = NULL, *writeset_out = NULL; size_t sz = selectop->event_fdsz; if (!(readset_out = mm_realloc(selectop->event_readset_out, sz))) return (-1); selectop->event_readset_out = readset_out; if (!(writeset_out = mm_realloc(selectop->event_writeset_out, sz))) { /* We don't free readset_out here, since it was already successfully reallocated. ** The next time we call select_dispatch, the realloc will be a no-op. */ return (-1); } selectop->event_writeset_out = writeset_out; selectop->resize_out_sets = 0; } memcpy(selectop->event_readset_out, selectop->event_readset_in, selectop->event_fdsz); memcpy(selectop->event_writeset_out, selectop->event_writeset_in, selectop->event_fdsz); nfds = selectop->event_fds + 1; EVBASE_RELEASE_LOCK(base, th_base_lock); res = select(nfds, selectop->event_readset_out, selectop->event_writeset_out, NULL, tv); EVBASE_ACQUIRE_LOCK(base, th_base_lock); if (res == -1) { if (errno != EINTR) return (-1); return (0); } event_debug(("%s: select reports %d", __func__, res)); i = evutil_weakrand_range_(&base->weakrand_seed, nfds); for (j = 0; j < nfds; ++j) { if (++i>= nfds) i = 0; res = 0; if (FD_ISSET(i, selectop->event_readset_out)) res |= EV_READ; if (FD_ISSET(i, selectop->event_writeset_out)) res |= EV_WRITE; if (res == 0) continue; evmap_io_active_(base, i, res); } return (0); }
在事件发生后调用 evmap_io_active_()
将发生的事件加入到激活事件队列中,参见 事件发生后如何激活事件。
事件发生后如何激活事件
三种事件的激活方式如下图所示:

激活 IO 事件
evmap_io_active_
从 select_dispatch
函数中可以看出,对于每一个事件发生的 fd
,均会调用一次 evmap_io_active_
函数。
void evmap_io_active_(struct event_base *base, evutil_socket_t fd, short events) { struct event_io_map *io = &base->io; struct evmap_io *ctx; struct event *ev; if (fd < 0 || fd>= io->nentries) return; /* 在 IO 事件的哈希表中获取该 fd 已添加的事件 */ GET_IO_SLOT(ctx, io, fd, evmap_io); if (NULL == ctx) return; /* 遍历哈希表中所有该 fd 已添加的事件 */ LIST_FOREACH(ev, &ctx->events, ev_io_next) { /* 比对 dispatch 的事件和已添加的事件 */ if (ev->ev_events & events) s(ev, ev->ev_events & events, 1); } }
event_active_nolock_
evmap_io_active_
对 dispatch
的所有事件进行过滤后,对于所有已添加事件,就需要调用 event_active_nolock_
进行激活。
void event_active_nolock_(struct event *ev, int res, short ncalls) { struct event_base *base; base = ev->ev_base; EVENT_BASE_ASSERT_LOCKED(base); /* #define ev_flags ev_evcallback.evcb_flags */ if (ev->ev_flags & EVLIST_FINALIZING) return; /* event 是否在激活链表和下一次激活链表中 */ switch ((ev->ev_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER))) { default: case EVLIST_ACTIVE|EVLIST_ACTIVE_LATER: EVUTIL_ASSERT(0); break; case EVLIST_ACTIVE: /* We get different kinds of events, add them together */ ev->ev_res |= res; return; case EVLIST_ACTIVE_LATER: ev->ev_res |= res; break; case 0: ev->ev_res = res; break; } if (ev->ev_pri < base->event_running_priority) base->event_continue = 1; /* 对于 signal 事件,需要对 IO 事件触发次数进行计数 */ if (ev->ev_events & EV_SIGNAL) { ev->ev_ncalls = ncalls; ev->ev_pncalls = NULL; } event_callback_activate_nolock_(base, event_to_event_callback(ev)); }
event_callback_activate_nolock_
int event_callback_activate_nolock_(struct event_base *base, struct event_callback *evcb) { int r = 1; if (evcb->evcb_flags & EVLIST_FINALIZING) return 0; /* event 是否在激活链表和下一次激活链表中 */ switch (evcb->evcb_flags & (EVLIST_ACTIVE|EVLIST_ACTIVE_LATER)) { default: EVUTIL_ASSERT(0); EVUTIL_FALLTHROUGH; case EVLIST_ACTIVE_LATER: event_queue_remove_active_later(base, evcb); /* 删除下一次激活列表中的该 event */ r = 0; break; case EVLIST_ACTIVE: return 0; case 0: break; } /* 没有在激活列表和在下一次激活列表中的 event 会走到这里 */ event_queue_insert_active(base, evcb); /* 加入激活队列 */ if (EVBASE_NEED_NOTIFY(base))evthread_notify_base(base); return r; }
event_queue_insert_active
static void event_queue_insert_active(struct event_base *base, struct event_callback *evcb) { EVENT_BASE_ASSERT_LOCKED(base); /* Double insertion is possible for active events */ if (evcb->evcb_flags & EVLIST_ACTIVE) return; INCR_EVENT_COUNT(base, evcb->evcb_flags); /* 非内部事件计数自增 increase */ evcb->evcb_flags |= EVLIST_ACTIVE; /* event 事件增加激活标志位 */ base->event_count_active++; /* 激活事件计数自增 */ MAX_EVENT_COUNT(base->event_count_active_max, base->event_count_active); /* #define ev_pri ev_evcallback.evcb_pri 事件优先级 */ EVUTIL_ASSERT(evcb->evcb_pri < base->nactivequeues); /* 插入到对应优先级的激活队列尾部 */ TAILQ_INSERT_TAIL(&base->activequeues[evcb->evcb_pri], evcb, evcb_active_next); }
激活 Signal 事件
对于 Signal
事件,libevent
将其统一转换为了 IO
事件,在 IO
事件中,有一个特殊的事件专门用来接收信号事件,该特殊事件的回调函数会调用 evmap_signal_active_
将发生的 Signal
事件添加到激活事件列表中。
evmap_signal_active_
代码如下,其最终也是调用了 event_active_nolock_
进行事件的激活。
void evmap_signal_active_(struct event_base *base, evutil_socket_t sig, int ncalls) { struct event_signal_map *map = &base->sigmap; struct evmap_signal *ctx; struct event *ev; if (sig < 0 || sig>= map->nentries) return; /* 在 Signal 事件的数组中获取该 Signal 已添加的事件 */ GET_SIGNAL_SLOT(ctx, map, sig, evmap_signal); if (!ctx) return; /* 将该 Signal 已添加的事件全部加入到激活事件列表中 */ LIST_FOREACH(ev, &ctx->events, ev_signal_next) event_active_nolock_(ev, EV_SIGNAL, ncalls); }
激活超时事件
遍历检查小根堆中每个事件是否超时,如果超时,则将其加入到激活队列中,激活事件调用的函数也为 event_active_nolock_
。
static void timeout_process(struct event_base *base) { /* Caller must hold lock. */ struct timeval now; struct event *ev; if (min_heap_empty_(&base->timeheap)) return; gettime(base, &now); while ((ev = min_heap_top_(&base->timeheap))) { /* 根据小根堆的特性,如果顶部的事件没有超时,其他事件就不用再遍历了 */ if (evutil_timercmp(&ev->ev_timeout, &now, >)) break; /* delete this event from the I/O queues */ event_del_nolock_(ev, EVENT_DEL_NOBLOCK); event_debug(("timeout_process: event: %p, call %p", ev, ev->ev_callback)); event_active_nolock_(ev, EV_TIMEOUT, 1); } }
总结
IO
事件的激活是在libevent
调用多路IO
复用函数后,然后将发生的事件添加入激活队列Signal
事件的激活在libevent
处理IO
事件的回调中- 超时事件的激活在
libevent
调用完多路IO
复用函数后,检查小根堆里的超时情况时
单次事件处理
如何处理已激活事件
event_process_active
static int event_process_active(struct event_base *base) { /* Caller must hold th_base_lock */ struct evcallback_list *activeq = NULL; int i, c = 0; const struct timeval *endtime; struct timeval tv; const int maxcb = base->max_dispatch_callbacks; const int limit_after_prio = base->limit_callbacks_after_prio; if (base->max_dispatch_time.tv_sec >= 0) { update_time_cache(base); gettime(base, &tv); evutil_timeradd(&base->max_dispatch_time, &tv, &tv); endtime = &tv; } else endtime = NULL; /* 按优先级遍历激活队列中的事件 */ for (i = 0; i < base->nactivequeues; ++i) { if (TAILQ_FIRST(&base->activequeues[i]) != NULL) { /* 同一个优先级下可以有多个事件 */ base->event_running_priority = i; /* 设置当前的优先级 */ activeq = &base->activequeues[i]; /* 获取优先级 i 下的所有 event 组成的链表 */ /* 遍历 activeq 链表,调用其中每个 event 的回调函数 */ if (i < limit_after_prio) c = event_process_active_single_queue(base, activeq, INT_MAX, NULL); else c = event_process_active_single_queue(base, activeq, maxcb, endtime); if (c < 0) goto done; /* c 是执行的非内部事件数目 */ else if (c> 0) /* Processed a real event; do not consider lower-priority events */ break; /* If we get here, all of the events we processed were internal. Continue. */ } } done: base->event_running_priority = -1; return c; }
event_process_active_single_queue
static int event_process_active_single_queue(struct event_base *base, struct evcallback_list *activeq, int max_to_process, const struct timeval *endtime) { struct event_callback *evcb; int count = 0; EVUTIL_ASSERT(activeq != NULL); /* activeq 为某一优先级的激活队列,该处遍历该优先级的激活队列中的所有事件 */ /* 从遍历结束的结束条件和遍历下一个事件的方式就可知,每次遍历会在激活队列中删除当前事件 */ for (evcb = TAILQ_FIRST(activeq); evcb; evcb = TAILQ_FIRST(activeq)) { struct event *ev = NULL; /* 在激活队列中删除当前事件 */ if (evcb->evcb_flags & EVLIST_INIT) { /* 激活队列中仅插入了 event 结构体中的 event_callback 结构体,需要获取 event 所在地址 */ ev = event_callback_to_event(evcb); if (ev->ev_events & EV_PERSIST || ev->ev_flags & EVLIST_FINALIZING) event_queue_remove_active(base, evcb); /* 永久事件从激活队列中删除 */ else event_del_nolock_(ev, EVENT_DEL_NOBLOCK); /* 非永久事件从所有队列中删除 */ } else event_queue_remove_active(base, evcb); if (!(evcb->evcb_flags & EVLIST_INTERNAL)) ++count; /* 非内部事件回调次数计数 */ base->current_event = evcb; switch (evcb->evcb_closure) { /* 在调用回调函数是否进行其他行为 */ case EV_CLOSURE_EVENT_SIGNAL: EVUTIL_ASSERT(ev != NULL); event_signal_closure(base, ev); break; case EV_CLOSURE_EVENT_PERSIST: /* 对于永久事件,在调用回调函数之前会重新调用 event_add 来添加该事件到对应队列中 */ EVUTIL_ASSERT(ev != NULL); event_persist_closure(base, ev); break; case EV_CLOSURE_EVENT: /* 对于一般事件,直接调用回调函数 */ void (*evcb_callback)(evutil_socket_t, short, void *); short res; EVUTIL_ASSERT(ev != NULL); evcb_callback = *ev->ev_callback; res = ev->ev_res; EVBASE_RELEASE_LOCK(base, th_base_lock); evcb_callback(ev->ev_fd, res, ev->ev_arg); break; case EV_CLOSURE_CB_SELF: void (*evcb_selfcb)(struct event_callback *, void *) = evcb->evcb_cb_union.evcb_selfcb; EVBASE_RELEASE_LOCK(base, th_base_lock); evcb_selfcb(evcb, evcb->evcb_arg); break; case EV_CLOSURE_EVENT_FINALIZE: case EV_CLOSURE_EVENT_FINALIZE_FREE: void (*evcb_evfinalize)(struct event *, void *); int evcb_closure = evcb->evcb_closure; EVUTIL_ASSERT(ev != NULL); base->current_event = NULL; evcb_evfinalize = ev->ev_evcallback.evcb_cb_union.evcb_evfinalize; EVUTIL_ASSERT((evcb->evcb_flags & EVLIST_FINALIZING)); EVBASE_RELEASE_LOCK(base, th_base_lock); evcb_evfinalize(ev, ev->ev_arg); event_debug_note_teardown_(ev); if (evcb_closure == EV_CLOSURE_EVENT_FINALIZE_FREE) mm_free(ev); break; case EV_CLOSURE_CB_FINALIZE: void (*evcb_cbfinalize)(struct event_callback *, void *) = evcb->evcb_cb_union.evcb_cbfinalize; base->current_event = NULL; EVUTIL_ASSERT((evcb->evcb_flags & EVLIST_FINALIZING)); EVBASE_RELEASE_LOCK(base, th_base_lock); evcb_cbfinalize(evcb, evcb->evcb_arg); break; default: EVUTIL_ASSERT(0); } EVBASE_ACQUIRE_LOCK(base, th_base_lock); base->current_event = NULL; if (base->event_break) return -1; if (count>= max_to_process) return count; if (count && endtime) { struct timeval now; update_time_cache(base); gettime(base, &now); if (evutil_timercmp(&now, endtime,>=)) return count; } if (base->event_continue) break; } return count; }
如何持久化事件
libevent
持久化事件是在调用事件的回调函数之前,调用 event_add_nolock_
重新将事件添加到事件列表中:
/* Closure function invoked when we're activating a persistent event. */ static inline void event_persist_closure(struct event_base *base, struct event *ev) { void (*evcb_callback)(evutil_socket_t, short, void *); // Other fields of *ev that must be stored before executing evutil_socket_t evcb_fd; short evcb_res; void *evcb_arg; /* reschedule the persistent event if we have a timeout. */ if (ev->ev_io_timeout.tv_sec || ev->ev_io_timeout.tv_usec) { /* If there was a timeout, we want it to run at an interval of ev_io_timeout after the last time ** it was _scheduled_ for, not ev_io_timeout after _now_. If it fired for another reason, ** though,the timeout ought to start ticking _now_. */ struct timeval run_at, relative_to, delay, now; ev_uint32_t usec_mask = 0; EVUTIL_ASSERT(is_same_common_timeout(&ev->ev_timeout, &ev->ev_io_timeout)); gettime(base, &now); if (is_common_timeout(&ev->ev_timeout, base)) { delay = ev->ev_io_timeout; usec_mask = delay.tv_usec & ~MICROSECONDS_MASK; delay.tv_usec &= MICROSECONDS_MASK; if (ev->ev_res & EV_TIMEOUT) { relative_to = ev->ev_timeout; relative_to.tv_usec &= MICROSECONDS_MASK; } else relative_to = now; } else { delay = ev->ev_io_timeout; if (ev->ev_res & EV_TIMEOUT) relative_to = ev->ev_timeout; else relative_to = now; } evutil_timeradd(&relative_to, &delay, &run_at); if (evutil_timercmp(&run_at, &now, <)) { /* Looks like we missed at least one invocation due to a clock jump, not running the event ** loop for a while, really slow callbacks, or something. Reschedule relative to now.*/ evutil_timeradd(&now, &delay, &run_at); } run_at.tv_usec |= usec_mask; event_add_nolock_(ev, &run_at, 1); } // Save our callback before we release the lock evcb_callback = ev->ev_callback; evcb_fd = ev->ev_fd; evcb_res = ev->ev_res; evcb_arg = ev->ev_arg; // Release the lock EVBASE_RELEASE_LOCK(base, th_base_lock); // Execute the callback (evcb_callback)(evcb_fd, evcb_res, evcb_arg); }