



  1. 事件任务,即在每次进行网络epoll之前或者完成之后都会立马执行,例如多线程处理网络数据的读写等工作。
  2. 事件驱动处理程序任务,即网络IO的读写事件通过epoll来触发执行,获取可执行的网络事件。
  3. 定时任务,在处理完任务1和任务2之后,才会判断是否有定时任务被触发并执行。






/* Initialize the data structures needed for threaded I/O. */void initThreadedIO(void) {    server.io_threads_active = 0; /* We start with threads not active. */  //设置多线程未初始化完成    /* Don't spawn any thread if the user selected a single thread:     * we'll handle I/O directly from the main thread. */    if (server.io_threads_num == 1) return;    if (server.io_threads_num > IO_THREADS_MAX_NUM) {        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "                             "The maximum number is %d.", IO_THREADS_MAX_NUM); // 如果配置线程太多则报错退出        exit(1);    }    /* Spawn and initialize the I/O threads. */    for (int i = 0; i < server.io_threads_num; i++) {  // 根据多线程数量来进行初始化        /* Things we do for all the threads including the main thread. */        io_threads_list[i] = listCreate();        if (i == 0) continue; /* Thread 0 is the main thread. */        /* Things we do only for the additional threads. */        pthread_t tid;        pthread_mutex_init(&io_threads_mutex[i],NULL);        io_threads_pending[i] = 0;        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) { // 创建线程并传入执行函数IOThreadMain            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");            exit(1);        }        io_threads[i] = tid;  // 报错tid    }}
    /* Register the two hooks on the server's event loop. Presumably these
     * become the beforesleep/aftersleep callbacks that aeProcessEvents()
     * invokes around the poll call -- confirm against the ae.c setters. */
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);



/* Excerpt of the event-loop core; "..." marks code elided from the listing.
 *
 * The part shown here runs the optional beforesleep callback, polls the
 * multiplexing API, then runs the optional aftersleep callback. Each callback
 * only fires when it is set and the matching AE_CALL_* flag is present. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags){
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;
        ...

        /* Before sleep callback: runs right before blocking in the poll. */
        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);
       ...
    }
...
}
...

/* Run the event loop until eventLoop->stop becomes non-zero. Every iteration
 * processes all event types and enables both sleep callbacks. */
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}



/* When threaded I/O is also enabled for the reading + parsing side, the * readable handler will just put normal clients into a queue of clients to * process (instead of serving them synchronously). This function runs * the queue using the I/O threads, and process them in order to accumulate * the reads in the buffers, and also parse the first command available * rendering it in the client structures. */int handleClientsWithPendingReadsUsingThreads(void) {    if (!server.io_threads_active || !server.io_threads_do_reads) return 0;    int processed = listLength(server.clients_pending_read);    if (processed == 0) return 0;    if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);    /* Distribute the clients across N different lists. */    listIter li;    listNode *ln;    listRewind(server.clients_pending_read,&li);    int item_id = 0;    while((ln = listNext(&li))) {        client *c = listNodeValue(ln);        int target_id = item_id % server.io_threads_num;        listAddNodeTail(io_threads_list[target_id],c);  // 先将数据放置在每个线程对应的队列上面        item_id++;    }    /* Give the start condition to the waiting threads, by setting the     * start condition atomic var. */    io_threads_op = IO_THREADS_OP_READ;    for (int j = 1; j < server.io_threads_num; j++) {        int count = listLength(io_threads_list[j]);  // 将每个线程对应的count数量设置在线程的队列上,此时设置之后线程检查到数据不为0就开始执行        io_threads_pending[j] = count;    }    /* Also use the main thread to process a slice of clients. */    listRewind(io_threads_list[0],&li);  // 主线程也进行一个队列的执行    while((ln = listNext(&li))) {        client *c = listNodeValue(ln);        readQueryFromClient(c->conn);   // 读取网络数据    }    listEmpty(io_threads_list[0]);    /* Wait for all the other threads to end their work. 
*/    while(1) {        unsigned long pending = 0;        for (int j = 1; j < server.io_threads_num; j++)            pending += io_threads_pending[j];        if (pending == 0) break;   // 等待所有的网络读事件完成    }    if (tio_debug) printf("I/O READ All threads finshed\n");    /* Run the list of clients again to process the new buffers. */    while(listLength(server.clients_pending_read)) { // 此时所有的网络数据读取并解析        ln = listFirst(server.clients_pending_read);        client *c = listNodeValue(ln);        c->flags &= ~CLIENT_PENDING_READ;        listDelNode(server.clients_pending_read,ln);        /* Clients can become paused while executing the queued commands,         * so we need to check in between each command. If a pause was         * executed, we still remove the command and it will get picked up         * later when clients are unpaused and we re-queue all clients. */        if (clientsArePaused()) continue;        if (processPendingCommandsAndResetClient(c) == C_ERR) {  // 挨个执行解析处理的命令            /* If the client is no longer valid, we avoid             * processing the client later. So we just go             * to the next. */            continue;        }        processInputBuffer(c);          /* We may have pending replies if a thread readQueryFromClient() produced         * replies and did not install a write handler (it can't).         */        if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))            clientInstallWriteHandler(c);   // 注册到写事件列表    }    /* Update processed count on server */    server.stat_io_reads_processed += processed;    return processed;}


int handleClientsWithPendingWritesUsingThreads(void) {    int processed = listLength(server.clients_pending_write);  // 查看写列表    if (processed == 0) return 0; /* Return ASAP if there are no clients. */    /* If I/O threads are disabled or we have few clients to serve, don't     * use I/O threads, but thejboring synchronous code. */    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {        return handleClientsWithPendingWrites();    }    /* Start threads if needed. */    if (!server.io_threads_active) startThreadedIO();    if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);    /* Distribute the clients across N different lists. */    listIter li;    listNode *ln;    listRewind(server.clients_pending_write,&li);      int item_id = 0;    while((ln = listNext(&li))) {   // 主线程将待写的列表分发到各个工作列表中        client *c = listNodeValue(ln);        c->flags &= ~CLIENT_PENDING_WRITE;        /* Remove clients from the list of pending writes since         * they are going to be closed ASAP. */        if (c->flags & CLIENT_CLOSE_ASAP) {            listDelNode(server.clients_pending_write, ln);            continue;        }        int target_id = item_id % server.io_threads_num;        listAddNodeTail(io_threads_list[target_id],c);        item_id++;    }    /* Give the start condition to the waiting threads, by setting the     * start condition atomic var. */    io_threads_op = IO_THREADS_OP_WRITE;    for (int j = 1; j < server.io_threads_num; j++) {        int count = listLength(io_threads_list[j]);  // 设置每个队列上面的长度,此时线程开始工作        io_threads_pending[j] = count;    }    /* Also use the main thread to process a slice of clients. */    listRewind(io_threads_list[0],&li);   // 主线程也处理一个队列    while((ln = listNext(&li))) {        client *c = listNodeValue(ln);        writeToClient(c,0);    }    listEmpty(io_threads_list[0]);    /* Wait for all the other threads to end their work. 
*/    while(1) {        unsigned long pending = 0;    // 等待所有的线程执行完成        for (int j = 1; j < server.io_threads_num; j++)            pending += io_threads_pending[j];        if (pending == 0) break;    }    if (tio_debug) printf("I/O WRITE All threads finshed\n");    /* Run the list of clients again to install the write handler where     * needed. */    listRewind(server.clients_pending_write,&li);    while((ln = listNext(&li))) {   // 注册写事件        client *c = listNodeValue(ln);        /* Install the write handler if there are pending writes in some         * of the clients. */        if (clientHasPendingReplies(c) &&                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)        {            freeClientAsync(c);        }    }    listEmpty(server.clients_pending_write);    /* Update processed count on server */    server.stat_io_writes_processed += processed;    return processed;}


void *IOThreadMain(void *myid) {    /* The ID is the thread number (from 0 to server.iothreads_num-1), and is     * used by the thread to just manipulate a single sub-array of clients. */    long id = (unsigned long)myid;    char thdname[16];    snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);    redis_set_thread_title(thdname);    redisSetCpuAffinity(server.server_cpulist);    makeThreadKillable();    while(1) {        /* Wait for start */        for (int j = 0; j < 1000000; j++) {   // 线程开启之后就会在此地循环执行等待队列数量不为0            if (io_threads_pending[id] != 0) break;        }        /* Give the main thread a chance to stop this thread. */        if (io_threads_pending[id] == 0) {   // 可通过锁来控制线程的启停            pthread_mutex_lock(&io_threads_mutex[id]);            pthread_mutex_unlock(&io_threads_mutex[id]);            continue;        }        serverAssert(io_threads_pending[id] != 0);        if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));        /* Process: note that the main thread will never touch our list         * before we drop the pending count to 0. */        listIter li;        listNode *ln;        listRewind(io_threads_list[id],&li);        while((ln = listNext(&li))) {  //获取每个队列中的数据            client *c = listNodeValue(ln);            if (io_threads_op == IO_THREADS_OP_WRITE) {  // 如果是写则写数据到客户端                writeToClient(c,0);            } else if (io_threads_op == IO_THREADS_OP_READ) {  // 如果是读则读取数据并解析                readQueryFromClient(c->conn);            } else {                serverPanic("io_threads_op value is unknown");            }        }        listEmpty(io_threads_list[id]);        io_threads_pending[id] = 0;        if (tio_debug) printf("[%ld] Done\n", id);    }}




void readQueryFromClient(connection *conn) {...    /* Check if we want to read from the client later when exiting from     * the event loop. This is the case if threaded I/O is enabled. */    if (postponeClientRead(c)) return;    ...}/* Return 1 if we want to handle the client read later using threaded I/O. * This is called by the readable handler of the event loop. * As a side effect of calling this function the client is put in the * pending read clients and flagged as such. */int postponeClientRead(client *c) {    if (server.io_threads_active &&        server.io_threads_do_reads &&        !clientsArePaused() &&        !ProcessingEventsWhileBlocked &&        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))    {        c->flags |= CLIENT_PENDING_READ;        listAddNodeHead(server.clients_pending_read,c);  // 如果开启多线程则添加到队列中不立马处理读请求        return 1;    } else {        return 0;    }}...



/* Add the object 'obj' string representation to the client output buffer.
 * Excerpt: "..." marks code elided from the listing. */
void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;
    ...
}
...

/* Decide whether data may be queued in the output buffer of client 'c'.
 *
 * Returns C_OK when the caller is allowed to append to the output buffer and
 * C_ERR when nothing should be written. On the C_OK path for socket clients,
 * the write handler is installed unless the client already has pending
 * replies or is flagged CLIENT_PENDING_READ. */
int prepareClientToWrite(client *c) {
    /* If it's the Lua client we always return ok without installing any
     * handler since there is no socket at all. */
    if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;

    /* If CLIENT_CLOSE_ASAP flag is set, we need not write anything. */
    if (c->flags & CLIENT_CLOSE_ASAP) return C_ERR;

    /* CLIENT REPLY OFF / SKIP handling: don't send replies. */
    if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;

    /* Masters don't receive replies, unless CLIENT_MASTER_FORCE_REPLY flag
     * is set. */
    if ((c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;

    if (!c->conn) return C_ERR; /* Fake client for AOF loading. */

    /* Schedule the client to write the output buffers to the socket, unless
     * it should already be setup to do so (it has already pending data).
     *
     * If CLIENT_PENDING_READ is set, we're in an IO thread and should
     * not install a write handler. Instead, it will be done by
     * handleClientsWithPendingReadsUsingThreads() upon return.
     */
    if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))
            clientInstallWriteHandler(c); /* Add the client to the write queue. */

    /* Authorize the caller to queue in the output buffer of this client. */
    return C_OK;
}



import threading
import time

# Demo of the start/stop scheme used by the I/O threads above: each worker
# spins on its own counter and, when it finds no work, parks on its own lock
# until the main thread releases it.
thread_nums = 3
work_nums = [0 for i in range(thread_nums)]      # pending item count per worker
work_locks = [threading.Lock() for i in range(thread_nums)]
work_queue = []                                  # one work list per worker
for i in range(thread_nums):
    work_queue.append([])


def worker(index):
    """Run forever, draining work_queue[index] whenever work_nums[index] != 0."""
    while True:
        # Spin a bounded number of times waiting for work to show up.
        for i in range(1000000):
            if work_nums[index] != 0:
                break
        if work_nums[index] == 0:
            # Still nothing to do: acquire and release our own lock. This
            # blocks while the main thread holds it, which pauses the worker.
            print(" acquire ", index)
            work_locks[index].acquire()
            print(" release ", index)
            work_locks[index].release()
            continue
        print(" start work  ", index, work_queue[index])
        while len(work_queue[index]):
            work_queue[index].pop()
        # Reset the counter to signal that this batch is finished.
        work_nums[index] = 0


# Hold every lock before the workers start so they park straight away.
for lock in work_locks:
    lock.acquire()

for i in range(thread_nums):
    t = threading.Thread(target=worker, args=(i, ))
    t.start()

while True:
    time.sleep(20)
    # Release the locks so the parked workers can resume.
    for lock in work_locks:
        lock.release()
    # Fill each worker's queue first, then publish the counts that start it.
    for i in range(thread_nums):
        work_queue[i] = ["index {0} value".format(i) for j in range(i+5)]
    for i in range(thread_nums):
        work_nums[i] = len(work_queue[i])
    # Take the locks back so workers park again once they run out of work.
    for lock in work_locks:
        lock.acquire()


