miniob源码 架构概览

整体架构

如下图,简单描述了,observer启动后,建立监听、注册libevent事件,recv后触发各stags的handle_event、处理结果回调、threadpool运行机制等等几个方面对整体线程模型、reactor模型和各组件工作流进行分析。

Reactor模型

  • miniob运行框架是通过libevent实现了对网络事件的监听,当链接建立后,读缓冲区事件触发回调recv函数的处理流程。
event_set(&client_context->read_event, client_context->fd, EV_READ | EV_PERSIST, recv, client_context);

  • 在recv函数的最后将SessionEvent添加到SessionStage的事件队列中,同时将SessionStage添加到threapool的Stage队列中。
SessionEvent *sev = new SessionEvent(client);session_stage_->add_event(sev);
  • thread_pool的stage队列(thread_pool.h)
std::deque<Stage *> run_queue_;//< list of stages with work to do
  • stage_event队列(stage.h)
std::deque<StageEvent *> event_list_;// event queue

线程模型

miniob采用多线程的架构,通过Threadpool类创建线程池,根据etc/observer.ini中的配置创建线程。

# threadpools' name, it will contain the threadpool's sectionThreadPools=SQLThreads,IOThreads,DefaultThreads[SQLThreads]# the thread number of this threadpool, 0 means cpu's cores.# if miss the setting of count, it will use cpu's core number;count=3#count=0[IOThreads]# the thread number of this threadpool, 0 means cpu's cores.# if miss the setting of count, it will use cpu's core number;count=3#count=0[DefaultThreads]# If Stage haven't set threadpool, it will use this threadpool# This threadpool is used for backend operation, such as timer, sedastats and so on.# the thread number of this threadpool, 0 means cpu's cores.# if miss the setting of count, it will use cpu's core number;count=3

ThreadPools 配置了3个线程池:SQLThreads,IOThreads,DefaultThreads,每个线程池可以配置count,即线程的个数。

  • 创建线程池和线程,初始化时通过new ThreadPools,在构造函数中调用add_threads,根据线程个数的配置创建线程。
// attempt to start the requested number of threadsfor (i = 0; i < threads; i++) {int stat = pthread_create(&pthread, &pthread_attrs, Threadpool::run_thread, (void *)this);if (stat != 0) {LOG_WARN("Failed to create one thread\n");break;}char tmp[32] = {};sprintf(tmp,"%s%u",name_.c_str(), i);pthread_setname_np(pthread, tmp);}

注意:8,9,10三行是我后加的代码,目的是在top的时候显示线程名称。

  • 在TimerStage初始化的时候bool TimerStage::initialize()函数也创建了一个线程。
bool TimerStage::initialize(){// The TimerStage does not send messages to any other stage.ASSERT(next_stage_list_.size() == 0, "Invalid NextStages list.");// Start the thread to maintain the timerconst pthread_attr_t *thread_attrs = NULL;void *thread_args = (void *)this;int status = pthread_create(&timer_thread_id_, thread_attrs, &TimerStage::start_timer_thread, thread_args);if (status != 0)LOG_ERROR("failed to create timer thread: status=%d\n", status);pthread_setname_np(timer_thread_id_, "TimerStage");return (status == 0);}

注意:低12行是我后加的代码,目的是在top的时候显示线程名称。

  • 启动observer后可以通过top看到各线程情况
top -p `ps -ef | grep observer | grep -v grep | awk '{print $2}'` -H

注意:可以看到observer的线程情况为,主线observer + SQLThreads(3个)+ IOThreads(3个)+ DefaultThreads(3个)+ TimerStage 共11 个线程

  • 初始化stage时,安装配置etc/observer.ini为每个stage分别了不同的线程池
[SessionStage]ThreadId=SQLThreadsNextStages=PlanCacheStage[PlanCacheStage]ThreadId=SQLThreads#NextStages=OptimizeStageNextStages=ParseStage[ParseStage]ThreadId=SQLThreadsNextStages=ResolveStage[ResolveStage]ThreadId=SQLThreadsNextStages=QueryCacheStage[QueryCacheStage]ThreadId=SQLThreadsNextStages=OptimizeStage[OptimizeStage]ThreadId=SQLThreadsNextStages=ExecuteStage[ExecuteStage]ThreadId=SQLThreadsNextStages=DefaultStorageStage,MemStorageStage[DefaultStorageStage]ThreadId=IOThreadsBaseDir=./miniobSystemDb=sys[MemStorageStage]ThreadId=IOThreads[MetricsStage]NextStages=TimerStage
  • bug修复,在分配线程池的时候代码有逻辑错误

注意:低315行,317行为调整好的代码,建issue:stage线程分配问题 #75 ,PR:stage线程分配问题 #75 #76

Stage模型

可以把stage模型看做一个链,每个stage都是链上的节点,节点的入口是handle_event,每个handle_event都会调用下一个stage的handle_event。

NextStages在配置文件中给出,但是实际上代码已经写的比较固化,比如支持配置多个NextStages,但成员变量中已经固定了一个或者两个

handle_event的参数是SQLStageEvent,SQLStageEvent中包括Stmt、Query、sql_等重要的数据结构

private:SessionEvent *session_event_ = nullptr;std::string sql_;Query *query_ = nullptr;Stmt *stmt_ = nullptr;

其中包含SessionEvent的原因是SessionEvent 中带有的回调函数可以向客户端返回应答。

private:ConnectionContext *client_;std::string response_;
  • SessionStage::callback_event回调函数,向客户端返回执行结果
void SessionStage::callback_event(StageEvent *event, CallbackContext *context){LOG_TRACE("Enter\n");SessionEvent *sev = dynamic_cast<SessionEvent *>(event);if (nullptr == sev) {LOG_ERROR("Cannot cat event to sessionEvent");return;}const char *response = sev->get_response();int len = sev->get_response_len();if (len <= 0 || response == nullptr) {response = "No data\n";len = strlen(response) + 1;}Server::send(sev->get_client(), response, len);if ('\0' != response[len - 1]) {// 这里强制性的给发送一个消息终结符,如果需要发送多条消息,需要调整char end = 0;Server::send(sev->get_client(), &end, 1);}// sev->done();LOG_TRACE("Exit\n");return;}

注意:低17行向客户端发送处理结果response。

  • SessionEvent是在ParseStage阶段被调用的
void ParseStage::callback_event(StageEvent *event, CallbackContext *context){LOG_TRACE("Enter\n");SQLStageEvent *sql_event = static_cast<SQLStageEvent *>(event);sql_event->session_event()->done_immediate();sql_event->done_immediate();LOG_TRACE("Exit\n");return;}

注意:第5行为回调SessionEvent,直接将结果返回客户端。

  • 词法解析、语法解析是在ParseStage的handle_event(request)进行的
RC ParseStage::handle_request(StageEvent *event){SQLStageEvent *sql_event = static_cast<SQLStageEvent *>(event);const std::string &sql = sql_event->sql();Query *query_result = query_create();if (nullptr == query_result) {LOG_ERROR("Failed to create query.");return RC::INTERNAL;}RC ret = parse(sql.c_str(), query_result);if (ret != RC::SUCCESS) {// set error information to eventsql_event->session_event()->set_response("Failed to parse sql\n");query_destroy(query_result);return RC::INTERNAL;}

注意:第12行电源parse进行语法、词法解析,生成语法树Query *query_result。

总结

整体架构是基于Reactor事件驱动的异步消息处理模型,使用线程池,通过配置文件编排stage链,完成SQL处理流水线。当链接建立后,libevent的epool监听读缓冲区,收到可读event后,触发recv函数事件,recv将收到的数据和session信息打包成SessionEvent,并以SessionStage形式添加到thread_pool的stage队列中,同时触发SessionStage的handle_event,启动流水线链,然后按照流水线配置进行逐步处理,最后通过SessionStage的回调函数callback_event将处理结果发送至客户端。

整体架构应该比较清晰了,后续将逐步对各组件进行详细分析。