#include #include #include #include #include "ae.h" void redisAeReadEvent(aeEventLoop *el, int fd, void *privdata, int mask) { ((void)el); ((void)fd); ((void)mask); redisAeEvents *e = (redisAeEvents*)privdata; redisAsyncHandleRead(e->context); } void aeStop(aeEventLoop *eventLoop) { eventLoop->stop = 1; } void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { aeProcessEvents(eventLoop, AE_ALL_EVENTS| AE_CALL_BEFORE_SLEEP| AE_CALL_AFTER_SLEEP); } } monotime (*getMonotonicUs)(void) = NULL; /* Process time events */ static int processTimeEvents(aeEventLoop *eventLoop) { int processed = 0; aeTimeEvent *te; long long maxId; te = eventLoop->timeEventHead; maxId = eventLoop->timeEventNextId-1; monotime now = getMonotonicUs(); while(te) { long long id; /* Remove events scheduled for deletion. */ if (te->id == AE_DELETED_EVENT_ID) { aeTimeEvent *next = te->next; /* If a reference exists for this timer event, * don't free it. This is currently incremented * for recursive timerProc calls */ if (te->refcount) { te = next; continue; } if (te->prev) te->prev->next = te->next; else eventLoop->timeEventHead = te->next; if (te->next) te->next->prev = te->prev; if (te->finalizerProc) { te->finalizerProc(eventLoop, te->clientData); now = getMonotonicUs(); } hi_free(te); te = next; continue; } /* Make sure we don't process time events created by time events in * this iteration. Note that this check is currently useless: we always * add new timers on the head, however if we change the implementation * detail, this check may be useful again: we keep it here for future * defense. */ if (te->id > maxId) { te = te->next; continue; } if (te->when <= now) { int retval; id = te->id; te->refcount++; retval = te->timeProc(eventLoop, id, te->clientData); te->refcount--; processed++; now = getMonotonicUs(); if (retval != AE_NOMORE) { te->when = now + retval * 1000; } else { te->id = AE_DELETED_EVENT_ID; } } te = te->next; } return processed; } static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, numevents = 0; retval = epoll_wait(state->epfd,state->events,eventLoop->setsize, tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1); if (retval > 0) { int j; numevents = retval; for (j = 0; j < numevents; j++) { int mask = 0; struct epoll_event *e = state->events+j; if (e->events & EPOLLIN) mask |= AE_READABLE; if (e->events & EPOLLOUT) mask |= AE_WRITABLE; if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE; if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE; eventLoop->fired[j].fd = e->data.fd; eventLoop->fired[j].mask = mask; } } else if (retval == -1 && errno != EINTR) { //panic("aeApiPoll: epoll_wait, %s", strerror(errno)); } return numevents; } static int64_t usUntilEarliestTimer(aeEventLoop *eventLoop) { aeTimeEvent *te = eventLoop->timeEventHead; if (te == NULL) return -1; aeTimeEvent *earliest = NULL; while (te) { if (!earliest || te->when < earliest->when) earliest = te; te = te->next; } monotime now = getMonotonicUs(); return (now >= earliest->when) ? 0 : earliest->when - now; } /* Process every pending time event, then every pending file event * (that may be registered by time event callbacks just processed). * Without special flags the function sleeps until some file event * fires, or when the next time event occurs (if any). * * If flags is 0, the function does nothing and returns. * if flags has AE_ALL_EVENTS set, all the kind of events are processed. * if flags has AE_FILE_EVENTS set, file events are processed. * if flags has AE_TIME_EVENTS set, time events are processed. * if flags has AE_DONT_WAIT set, the function returns ASAP once all * the events that can be handled without a wait are processed. * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called. * if flags has AE_CALL_BEFORE_SLEEP set, the beforesleep callback is called. * * The function returns the number of events processed. */ int aeProcessEvents(aeEventLoop *eventLoop, int flags) { int processed = 0, numevents; /* Nothing to do? return ASAP */ if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; /* Note that we want to call select() even if there are no * file events to process as long as we want to process time * events, in order to sleep until the next time event is ready * to fire. */ if (eventLoop->maxfd != -1 || ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) { int j; struct timeval tv, *tvp; int64_t usUntilTimer = -1; if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT)) usUntilTimer = usUntilEarliestTimer(eventLoop); if (usUntilTimer >= 0) { tv.tv_sec = usUntilTimer / 1000000; tv.tv_usec = usUntilTimer % 1000000; tvp = &tv; } else { /* If we have to check for events but need to return * ASAP because of AE_DONT_WAIT we need to set the timeout * to zero */ if (flags & AE_DONT_WAIT) { tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } else { /* Otherwise we can block */ tvp = NULL; /* wait forever */ } } if (eventLoop->flags & AE_DONT_WAIT) { tv.tv_sec = tv.tv_usec = 0; tvp = &tv; } if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP) eventLoop->beforesleep(eventLoop); /* Call the multiplexing API, will return only on timeout or when * some event fires. */ numevents = aeApiPoll(eventLoop, tvp); /* After sleep callback. */ if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) eventLoop->aftersleep(eventLoop); for (j = 0; j < numevents; j++) { int fd = eventLoop->fired[j].fd; aeFileEvent *fe = &eventLoop->events[fd]; int mask = eventLoop->fired[j].mask; int fired = 0; /* Number of events fired for current fd. */ /* Normally we execute the readable event first, and the writable * event later. This is useful as sometimes we may be able * to serve the reply of a query immediately after processing the * query. * * However if AE_BARRIER is set in the mask, our application is * asking us to do the reverse: never fire the writable event * after the readable. In such a case, we invert the calls. * This is useful when, for instance, we want to do things * in the beforeSleep() hook, like fsyncing a file to disk, * before replying to a client. */ int invert = fe->mask & AE_BARRIER; /* Note the "fe->mask & mask & ..." code: maybe an already * processed event removed an element that fired and we still * didn't processed, so we check if the event is still valid. * * Fire the readable event if the call sequence is not * inverted. */ if (!invert && fe->mask & mask & AE_READABLE) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ } /* Fire the writable event. */ if (fe->mask & mask & AE_WRITABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { fe->wfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } /* If we have to invert the call, fire the readable event now * after the writable one. */ if (invert) { fe = &eventLoop->events[fd]; /* Refresh in case of resize. */ if ((fe->mask & mask & AE_READABLE) && (!fired || fe->wfileProc != fe->rfileProc)) { fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; } } processed++; } } /* Check time events */ if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); return processed; /* return the number of processed file/time events */ } void redisAeWriteEvent(aeEventLoop *el, int fd, void *privdata, int mask) { ((void)el); ((void)fd); ((void)mask); redisAeEvents *e = (redisAeEvents*)privdata; redisAsyncHandleWrite(e->context); } int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee = {0}; /* avoid valgrind warning */ /* If the fd was already monitored for some event, we need a MOD * operation. Otherwise we need an ADD operation. */ int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; ee.events = 0; mask |= eventLoop->events[fd].mask; /* Merge old events */ if (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; ee.data.fd = fd; if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; return 0; } int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { if (fd >= eventLoop->setsize) { errno = ERANGE; return AE_ERR; } aeFileEvent *fe = &eventLoop->events[fd]; if (aeApiAddEvent(eventLoop, fd, mask) == -1) return AE_ERR; fe->mask |= mask; if (mask & AE_READABLE) fe->rfileProc = proc; if (mask & AE_WRITABLE) fe->wfileProc = proc; fe->clientData = clientData; if (fd > eventLoop->maxfd) eventLoop->maxfd = fd; return AE_OK; } void redisAeAddRead(void *privdata) { redisAeEvents *e = (redisAeEvents*)privdata; aeEventLoop *loop = e->loop; if (!e->reading) { e->reading = 1; aeCreateFileEvent(loop,e->fd,AE_READABLE,redisAeReadEvent,e); } } void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) { aeApiState *state = eventLoop->apidata; struct epoll_event ee = {0}; /* avoid valgrind warning */ int mask = eventLoop->events[fd].mask & (~delmask); ee.events = 0; if (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; ee.data.fd = fd; if (mask != AE_NONE) { epoll_ctl(state->epfd,EPOLL_CTL_MOD,fd,&ee); } else { /* Note, Kernel < 2.6.9 requires a non null event pointer even for * EPOLL_CTL_DEL. */ epoll_ctl(state->epfd,EPOLL_CTL_DEL,fd,&ee); } } void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) { if (fd >= eventLoop->setsize) return; aeFileEvent *fe = &eventLoop->events[fd]; if (fe->mask == AE_NONE) return; /* We want to always remove AE_BARRIER if set when AE_WRITABLE * is removed. */ if (mask & AE_WRITABLE) mask |= AE_BARRIER; aeApiDelEvent(eventLoop, fd, mask); fe->mask = fe->mask & (~mask); if (fd == eventLoop->maxfd && fe->mask == AE_NONE) { /* Update the max fd */ int j; for (j = eventLoop->maxfd-1; j >= 0; j--) if (eventLoop->events[j].mask != AE_NONE) break; eventLoop->maxfd = j; } } void redisAeDelRead(void *privdata) { redisAeEvents *e = (redisAeEvents*)privdata; aeEventLoop *loop = e->loop; if (e->reading) { e->reading = 0; aeDeleteFileEvent(loop,e->fd,AE_READABLE); } } void redisAeAddWrite(void *privdata) { redisAeEvents *e = (redisAeEvents*)privdata; aeEventLoop *loop = e->loop; if (!e->writing) { e->writing = 1; aeCreateFileEvent(loop,e->fd,AE_WRITABLE,redisAeWriteEvent,e); } } void redisAeDelWrite(void *privdata) { redisAeEvents *e = (redisAeEvents*)privdata; aeEventLoop *loop = e->loop; if (e->writing) { e->writing = 0; aeDeleteFileEvent(loop,e->fd,AE_WRITABLE); } } void redisAeCleanup(void *privdata) { redisAeEvents *e = (redisAeEvents*)privdata; redisAeDelRead(privdata); redisAeDelWrite(privdata); hi_free(e); } /* Enable the FD_CLOEXEC on the given fd to avoid fd leaks. * This function should be invoked for fd's on specific places * where fork + execve system calls are called. */ int anetCloexec(int fd) { int r; int flags; do { r = fcntl(fd, F_GETFD); } while (r == -1 && errno == EINTR); if (r == -1 || (r & FD_CLOEXEC)) return r; flags = r | FD_CLOEXEC; do { r = fcntl(fd, F_SETFD, flags); } while (r == -1 && errno == EINTR); return r; } int aeApiCreate(aeEventLoop *eventLoop) { aeApiState *state = hi_malloc(sizeof(aeApiState)); if (!state) return -1; state->events = hi_malloc(sizeof(struct epoll_event)*eventLoop->setsize); if (!state->events) { hi_free(state); return -1; } state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */ if (state->epfd == -1) { hi_free(state->events); hi_free(state); return -1; } anetCloexec(state->epfd); eventLoop->apidata = state; return 0; } aeEventLoop *aeCreateEventLoop(int setsize) { aeEventLoop *eventLoop; int i; if ((eventLoop = hi_malloc(sizeof(*eventLoop))) == NULL) goto err; eventLoop->events = hi_malloc(sizeof(aeFileEvent)*setsize); eventLoop->fired = hi_malloc(sizeof(aeFiredEvent)*setsize); if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err; eventLoop->setsize = setsize; eventLoop->timeEventHead = NULL; eventLoop->timeEventNextId = 0; eventLoop->stop = 0; eventLoop->maxfd = -1; eventLoop->beforesleep = NULL; eventLoop->aftersleep = NULL; eventLoop->flags = 0; if (aeApiCreate(eventLoop) == -1) goto err; /* Events with mask == AE_NONE are not set. So let's initialize the * vector with it. */ for (i = 0; i < setsize; i++) eventLoop->events[i].mask = AE_NONE; return eventLoop; err: if (eventLoop) { hi_free(eventLoop->events); hi_free(eventLoop->fired); hi_free(eventLoop); } return NULL; } int redisAeAttach(aeEventLoop *loop, redisAsyncContext *ac) { redisContext *c = &(ac->c); redisAeEvents *e; /* Nothing should be attached when something is already attached */ if (ac->ev.data != NULL) return REDIS_ERR; /* Create container for context and r/w events */ e = (redisAeEvents *) hi_malloc(sizeof(*e)); if (e == NULL) return REDIS_ERR; e->context = ac; e->loop = loop; e->fd = c->fd; e->reading = e->writing = 0; /* Register functions to start/stop listening for events */ ac->ev.addRead = redisAeAddRead; ac->ev.delRead = redisAeDelRead; ac->ev.addWrite = redisAeAddWrite; ac->ev.delWrite = redisAeDelWrite; ac->ev.cleanup = redisAeCleanup; ac->ev.data = e; return REDIS_OK; }