个人主页:平凡的小苏
学习格言:命运给你一个低的起点,是想看你精彩的翻盘,而不是让你自甘堕落,脚下的路虽然难走,但我还能走,比起向阳而生,我更想尝试逆风翻盘
C++项目实战C++项目实战
> 家人们更新不易,你们的点赞和⭐关注⭐真的对我真重要,各位路 过的友友麻烦多多点赞关注。 欢迎你们的私信提问,感谢你们的转发! 关注我,关注我,关注我,你们将会看到更多的优质内容!!

一、Http服务器认识

概念

HTTP(Hyper Text Transfer Protocol),超⽂本传输协议是应⽤层协议,是⼀种简单的请求-响应协议(客⼾端根据⾃⼰的需要向服务器发送请求,服务器针对请求提供服务,完毕后通信结束)。

实现⼀个HTTP服务器很简单,但是实现⼀个⾼性能的服务器并不简单,这个单元中将讲解基于Reactor模式的⾼性能服务器实现。

当然准确来说,因为我们要实现的服务器本⾝并不存在业务,咱们要实现的应该算是⼀个⾼性能服务器基础库,是⼀个基础组件。

二、Reactor模型

概念

Reactor 模式,是指通过⼀个或多个输⼊同时传递给服务器进⾏请求处理时的事件驱动处理模式

服务端程序处理传⼊多路请求,并将它们同步分派给请求对应的处理线程,Reactor 模式也叫Dispatcher 模式

简单理解就是使⽤ I/O多路复⽤ 统⼀监听事件,收到事件后分发给处理进程或线程,是编写⾼性能⽹络服务器的必备技术之⼀。

单Reactor单线程:单I/O多路复⽤+业务处理

  • 通过IO多路复⽤模型进⾏客⼾端请求监控

  • 触发事件后,进⾏事件处理

    • 如果是新建连接请求,则获取新建连接,并添加⾄多路复⽤模型进⾏事件监控。

    • 如果是数据通信请求,则进⾏对应数据处理(接收数据,处理数据,发送响应)。

优点:所有操作均在同⼀线程中完成,思想流程较为简单,不涉及进程/线程间通信及资源争抢问题。

缺点:⽆法有效利⽤CPU多核资源,很容易达到性能瓶颈。

适⽤场景:适⽤于客⼾端数量较少,且处理速度较为快速的场景。(处理较慢或活跃连接较多,会导致串⾏处理的情况下,后处理的连接⻓时间⽆法得到响应).

单Reactor多线程:单I/O多路复⽤+线程池(业务处理)

  • Reactor线程通过I/O多路复⽤模型进⾏客⼾端请求监控

  • 触发事件后,进⾏事件处理

    • 如果是新建连接请求,则获取新建连接,并添加⾄多路复⽤模型进⾏事件监控。
    • 如果是数据通信请求,则接收数据后分发给Worker线程池进⾏业务处理。
    • ⼯作线程处理完毕后,将响应交给Reactor线程进⾏数据响应

优点:充分利⽤CPU多核资源

缺点:多线程间的数据共享访问控制较为复杂,单个Reactor 承担所有事件的监听和响应,在单线程中运⾏,⾼并发场景下容易成为性能瓶颈。

多Reactor多线程:多I/O多路复⽤+线程池(业务处理)

  • 在主Reactor中处理新连接请求事件,有新连接到来则分发到⼦Reactor中监控

  • 在⼦Reactor中进⾏客⼾端通信监控,有事件触发,则接收数据分发给Worker线程池

  • Worker线程池分配独⽴的线程进⾏具体的业务处理

    • ⼯作线程处理完毕后,将响应交给⼦Reactor线程进⾏数据响应

优点:充分利⽤CPU多核资源,主从Reactor各司其职

⽬标定位:主从Reactor模型⾼并发服务器

我要实现的是主从Reactor模型服务器,也就是主Reactor线程仅仅监控监听描述符,获取新建连接,保证获取新连接的⾼效性,提⾼服务器的并发性能。

主Reactor获取到新连接后分发给⼦Reactor进⾏通信事件监控。⽽⼦Reactor线程监控各⾃的描述符的读写事件进⾏数据读写以及业务处理。(该项目从Reactor主要作用:IO事件监控+IO操作+业务处理)(比较轻量)。

当前实现中,因为并不确定组件使⽤者的使⽤意向,因此并不提供业务层⼯作线程池的实现,只实现主从Reactor,⽽Worker⼯作线程池,可由组件库的使⽤者的需要⾃⾏决定是否使⽤和实现。

三、功能模块划分

我要实现的是⼀个带有协议⽀持的Reactor模型⾼性能服务器,因此将整个项⽬的实现划分为两个⼤的模块:

  • SERVER模块:实现Reactor模型的TCP服务器;

  • 协议模块:对当前的Reactor模型服务器提供应⽤层协议⽀持。

SERVER模块

SERVER模块就是对所有的连接以及线程进⾏管理,让它们各司其职,在合适的时候做合适的事,最终

完成⾼性能服务器组件的实现。

⽽具体的管理也分为三个⽅⾯:

• 监听连接管理:对监听连接进⾏管理。

• 通信连接管理:对通信连接进⾏管理。

• 超时连接管理:对超时连接进⾏管理。

基于以上的管理思想,将这个模块进⾏细致的划分⼜可以划分为以下多个⼦模块:

Buffer子模块

Buffer模块是⼀个缓冲区模块,⽤于实现通信中⽤⼾态的接收缓冲区和发送缓冲区功能

意义:1、防止接收到的数据不是一条完整的数据,因此对接收的数据进行缓冲

​ 2、对于客户端响应的数据,应该是在套接字可写的情况下进行发送

功能设计
1、向缓冲区中添加数据
2、从缓冲区中取出数据

Socket子模块

Socket模块是对套接字操作封装的⼀个模块,主要实现的socket的各项操作。

意义:程序中对于套接字的各项操作更加简便

功能设计

  1. ​ 创建套接字
  2. ​ 绑定地址信息
  3. ​ 开始监听
  4. ​ 向服务器发起连接
  5. ​ 获取新连接
  6. ​ 接收数据
  7. ​ 发送数据
  8. ​ 关闭套接字
  9. ​ 创建一个监听连接
  10. ​ 创建一个客户端连接

Channel模块

Channel模块是对⼀个描述符需要进⾏的IO事件管理的模块,实现对描述符可读,可写,错误…事件的管理操作,以及Poller模块对描述符进⾏IO事件监控就绪后,根据不同的事件,回调不同的处理函数功能。

意义:对于描述符的监控事件在用户态更容易维护,以及触发事件后的操作流程更加的清晰

功能设计

  1. 对监控事件的管理
    描述符是否可读
    描述符是否可写
    对描述符监控可读
    对描述符监控可写
    解除可读事件监控
    解除可写事件监控
    解除所有事件监控
  2. 对监控事件触发后的处理
    设置对于不同事件的回调处理函数,明确触发了某个事件之后应该怎么处理

Connection模块

功能

  • ​ 这是一个对于通信连接进行整体管理的一个模块,对一个连接的操作都是通过这个模块进行的
  • ​ Connection模块,一个连接有任何的事件该怎么处理都是由这个模块来进行处理的,因为组件的设计也不知道使用者要如何处理事 件,因此只能是提供一些事件回调函数由使用者设置

• Connection模块内部包含有三个由组件使⽤者传⼊的回调函数:连接建⽴完成回调,事件回调,新数据回调,关闭回调。

• Connection模块内部包含有两个组件使⽤者提供的接⼝:数据发送接⼝,连接关闭接⼝

• Connection模块内部包含有两个⽤⼾态缓冲区:⽤⼾态接收缓冲区,⽤⼾态发送缓冲区

• Connection模块内部包含有⼀个Socket对象:完成描述符⾯向系统的IO操作

• Connection模块内部包含有⼀个Channel对象:完成描述符IO事件就绪的处理

具体处理流程如下

  1. 实现向Channel提供可读,可写,错误等不同事件的IO事件回调函数,然后将Channel和对应的描述符添加到Poller事件监控中。

  2. 当描述符在Poller模块中就绪了IO可读事件,则调⽤描述符对应Channel中保存的读事件处理函数,进⾏数据读取,将socket接收缓冲区全部读取到Connection管理的⽤⼾态接收缓冲区中。然后调⽤由组件使⽤者传⼊的新数据到来回调函数进⾏处理。

  3. 组件使⽤者进⾏数据的业务处理完毕后,通过Connection向使⽤者提供的数据发送接⼝,将数据写⼊Connection的发送缓冲区中。

  4. 启动描述符在Poll模块中的IO写事件监控,就绪后,调⽤Channel中保存的写事件处理函数,将发送缓冲区中的数据通过Socket进⾏⾯向系统的实际数据发送。

Acceptor模块:

Acceptor模块是对Socket模块,Channel模块的⼀个整体封装,实现了对⼀个监听套接字的整体的管理。

• Acceptor模块内部包含有⼀个Socket对象:实现监听套接字的操作

• Acceptor模块内部包含有⼀个Channel对象:实现监听套接字IO事件就绪的处理

具体处理流程如下

  1. 实现向Channel提供可读事件的IO事件处理回调函数,函数的功能其实也就是获取新连接
  2. 为新连接构建⼀个Connection对象出来。

意义
当获取了一个新建连接的描述符之后,需要为这个通信连接,封装一个Connection对象,设置各种不同回调

注意

​ 因为Acceptor模块本身并不知道一个连接产生了某个事件该如何处理,因此获取一个通信连接后,Connection的封装,以及事件回调的设置都应该由服务器模块来进行

TimerQueue模块

TimerQueue模块是实现固定时间定时任务的模块,可以理解就是要给定时任务管理器,向定时任务管理器中添加⼀个任务,任务将在固定时间后被执⾏,同时也可以通过刷新定时任务来延迟任务的执⾏

这个模块主要是对Connection对象的⽣命周期管理,对⾮活跃连接进⾏超时后的释放功能。

TimerQueue模块内部包含有⼀个timerfd:linux系统提供的定时器。

TimerQueue模块内部包含有⼀个Channel对象:实现对timerfd的IO时间就绪回调处理

功能设计:添加定时任务、刷新定时任务、希望一个定时任务重新开始计时、取消定时任务

Poller模块

Poller模块是对epoll进⾏封装的⼀个模块,主要实现epoll的IO事件添加,修改,移除,获取活跃连接功能。

意义:对epoll进行的封装,让对描述符进行事件监控的操作更加简单

功能接口:添加事件监控、Channel模块、修改事件监控、移除事件监控

EventLoop模块

EventLoop模块可以理解就是我们上边所说的Reactor模块,它是对Poller模块,TimerQueue模块,Socket模块的⼀个整体封装,进⾏所有描述符的事件监控。

意义:对于服务器中的所有的事件都是由EventLoop模块来完成
每一个Connection连接,都会绑定一个EventLoop模块和线程,因为外界对于连接的所有操作,都是要放到同一个线程中进行的

思想:1、对所有的连接进行事件监控

​ 2、连接触发事件后调用回调进行处理

​ 3、对于连接的所有操作,都要放到eventloop模块来完成

功能设计:将对连接的操作任务添加到任务队列、定时任务的添加、定时任务的刷新、定时任务的取消

具体操作流程

​ 1、通过Poller模块对当前模块管理内的所有描述符进⾏IO事件监控,有描述符事件就绪后,通过描述符对应的Channel进⾏事件 处理。

​ 2、所有就绪的描述符IO事件处理完毕后,对任务队列中的所有操作顺序进⾏执⾏。

​ 3、 由于epoll的事件监控,有可能会因为没有事件到来⽽持续阻塞,导致任务队列中的任务不能及时得到执⾏,因此创建了 eventfd,添加到Poller的事件监控中,⽤于实现每次向任务队列添加任务的时候,通过向eventfd写⼊数据来唤醒epoll的阻塞

TcpServer模块

这个模块是⼀个整体Tcp服务器模块的封装,内部封装了Acceptor模块,EventLoopThreadPool模块。

​ • TcpServer中包含有⼀个EventLoop对象:在超轻量使⽤场景中不需要EventLoop线程池,只需要在主线程中完成所有操作的情况。

​ • TcpServer模块内部包含有⼀个EventLoopThreadPool对象:其实就是EventLoop线程池,也就是⼦Reactor线程池

​ • TcpServer模块内部包含有⼀个Acceptor对象:⼀个TcpServer服务器,必然对应有⼀个监听套接字,能够完成获取客⼾端新连接,并处理的任务。

​ • TcpServer模块内部包含有⼀个std::shared_ptr的hash表:保存了所有的新建连接对应的Connection,注意,所有的Connection使⽤shared_ptr进⾏管理,这样能够保证在hash表中删除了Connection信息后,在shared_ptr计数器为0的情况下完成对Connection资源的释放操作。

意义:让组件使用者可以更加轻便的完成一个服务器的搭建

功能:对于监听连接的管理,获取一个新连接之后如何处理,由Server模块设置

​ 对于通信连接的管理,连接产生的某个事件如何处理,由Server模块设置
​ 对于超时连接的管理,连接非活跃超时是否关闭,由Server模块设置
​ 对于事件监控的管理,启动多少个线程,有多少个EventLoop,由Server设置
​ 事件回调函数的设置,一个连接产生了一个事件,对于这个事件如何处理,只有组件使用者知道,因此一个事件的处理回调,一定 是组件使用者,设置给TcpServer,TcpServer设置给各个Connection连接

具体流程如下

  1. 在实例化TcpServer对象过程中,完成BaseLoop的设置,Acceptor对象的实例化,以及EventLoop线程池的实例化,以及std::shared_ptr的hash表的实例化。

  2. 为Acceptor对象设置回调函数:获取到新连接后,为新连接构建Connection对象,设置Connection的各项回调,并使⽤shared_ptr进⾏管理,并添加到hash表中进⾏管理,并为Connection选择⼀个EventLoop线程,为Connection添加⼀个定时销毁任务,为Connection添加事件监控,

  3. 启动BaseLoop。

通信连接管理模块关系图

四、Server代码实现

1、前置技术知识

C++11中的bind

bind (Fn&& fn, Args&&... args);

我们可以将bind接⼝看作是⼀个通⽤的函数适配器,它接受⼀个函数对象,以及函数的各项参数,然后返回⼀个新的函数对象,但是这个函数对象的参数已经被绑定为设置的参数。运⾏的时候相当于总是调⽤传⼊固定参数的原函数。

但是如果进⾏绑定的时候,给与的参数为 std::placeholders::_1, _2... 则相当于为新适配⽣成的函数对象的调⽤预留⼀个参数进⾏传递。

2、简单的秒级定时任务实现

在当前的⾼并发服务器中,我们不得不考虑⼀个问题,那就是连接的超时关闭问题。我们需要避免⼀个连接⻓时间不通信,但是也不关闭,空耗资源的情况。这时候我们就需要⼀个定时任务,定时的将超时过期的连接进⾏释放。

Linux提供给我们的定时器

#include int timerfd_create(int clockid, int flags); clockid: CLOCK_REALTIME-系统实时时间,如果修改了系统时间就会出问题; CLOCK_MONOTONIC-从开机到现在的时间是⼀种相对时间; flags: 0-默认阻塞属性返回值:小于0则为错误int timerfd_settime(int fd, int flags, struct itimerspec *new, structitimerspec *old); fd: timerfd_create返回的⽂件描述符 flags: 0-相对时间, 1-绝对时间;默认设置为0即可. new: ⽤于设置定时器的新超时时间 old: ⽤于接收原来的超时时间struct timespec {time_t tv_sec; /* Seconds */long tv_nsec; /* Nanoseconds */};struct itimerspec {struct timespec it_interval; /* 第⼀次之后的超时间隔时间 */struct timespec it_value; /* 第⼀次超时时间 */};定时器会在每次超时时,⾃动给fd中写⼊8字节的数据,表⽰在上⼀次读取数据到当前读取数据期间超时了多少次。

示例:

#include #include #include #include int main(){int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);if(timerfd < 0){perror("timerfd_create error");return -1;}struct itimerspec itime;itime.it_value.tv_sec = 3;itime.it_value.tv_nsec = 0; // 第一次超时时间为1s后itime.it_interval.tv_sec = 1;itime.it_interval.tv_nsec = 0; //第一次超时后,每次超时的间隔timerfd_settime(timerfd, 0, &itime, nullptr);while(1){uint64_t times;int ret = read(timerfd, & times, 8);if(ret < 0){perror("read error");return -1;}printf("超时了,距离上一次超时了%d次\n",times);}close(timerfd);return 0;}

上边例⼦,是⼀个定时器的使⽤⽰例,是每隔3s钟触发⼀次定时器超时,否则就会阻塞在read读取数据这⾥

时间轮思想

  • 上述的例⼦,存在⼀个很⼤的问题,每次超时都要将所有的连接遍历⼀遍,如果有上万个连接,效率⽆疑是较为低下的。

  • 时间轮的思想来源于钟表,如果我们定了⼀个3点钟的闹铃,则当时针⾛到3的时候,就代表时间到了。

  • 同样的道理,如果我们定义了⼀个数组,并且有⼀个指针,指向数组起始位置,这个指针每秒钟向后⾛动⼀步,⾛到哪⾥,则代表哪⾥的任务该被执⾏了,那么如果我们想要定⼀个3s后的任务,则只需要将任务添加到tick+3位置,则每秒中⾛⼀步,三秒钟后tick⾛到对应位置,这时候执⾏对应位置的任务即可。

  • 但是,同⼀时间可能会有⼤批量的定时任务,因此我们可以给数组对应位置下拉⼀个数组,这样就可

​ 以在同⼀个时刻上添加多个定时任务了。

  • 上述操作也有⼀些缺陷,⽐如我们如果要定义⼀个60s后的任务,则需要将数组的元素个数设置为60才可以,如果设置⼀⼩时后的定时任务,则需要定义3600个元素的数组,这样⽆疑是⽐较⿇烦的。

  • 因此,可以采⽤多层级的时间轮,有秒针轮,分针轮,时针轮,当指针指向了时针轮所对应的位置的时候,那么就会像分针轮进行移动,当指针指向了分针轮所对应的位置的时候,指针就会向秒针轮进行移动。

  • 因为当前我们的应⽤中,倒是不⽤设计的这么⿇烦,因为我们的定时任务通常设置的30s以内,所以简单的单层时间轮就够⽤了。

但是,我们也得考虑⼀个问题,当前的设计是时间到了,则主动去执⾏定时任务,释放连接,那能不能在时间到了后,⾃动执⾏定时任务呢?

作为一个时间轮定时器,本身并不关注任务类型,只要是时间到了就需要被执行。

解决方案:类的析构函数 + 智能指针share_ptr, 通过这两个技术可以实现定时任务的延时

​ 1、使用一个类,对定时任务进行封装,类实例化的每一个对象,就是一个定时任务对象,当对象被销毁的时候,再去执行定时任务

​ (将定时任务的执行,放到析构函数中)

​ 2、shared_ptr用于对new的对象进行空间管理,当shared_ptr对一个对象进行管理的时候,内部有一个计数器,计数器为0的时候,

​ 则释放所管理的对象。

​ int *a = new int;

​ std::shared_ptr pi(a); —a对象只有在pi技术为0的时候,才会被释放

​ std::shared_ptr pi1(pi) –当针对pi又构建了一个shared_ptr对象,则pi和pi1计数器为2

​ 当pi和pi1中任意一个被释放的时候,只是计数器-1,因此他们管理的a对象并没有被释放,

​ 只有当pi和pi1都被释放了,计数器为0了,这时候才会释放管理的a 对象

示例代码

#include #include #include #include #include #include #include using std::cout;using std::endl;using TaskFunc = std::function;using ReleaseFunc = std::function;class TimerTask{public:TimerTask(uint64_t id, uint32_t delay,const TaskFunc &cb):_id(id),_timeout(delay),_task_cb(cb),_canceled(false){}void SetRelease(const ReleaseFunc &cb){_release = cb;}uint32_t DelayTime(){return _timeout;}void Cancel(){_canceled = true;}~TimerTask(){if(_canceled == false) _task_cb();//定时任务没有被取消才会执行_release();}private:uint64_t _id;//定时器任务对象IDuint32_t _timeout;//定时任务的超时时间TaskFunc _task_cb;//定时器对象要执行的定时任务ReleaseFunc _release;//用于删除TimerWheel中保存的定时器对象信息bool _canceled; //false-表示任务没有被取消,true-表示任务被取消};class TimerWheel{public:TimerWheel():_capacity(60),_tick(0),_wheel(_capacity){}void TimerAdd(u_int64_t id, uint32_t delay, const TaskFunc &cb)//添加定时任务{PtrTask pt(new TimerTask(id,delay,cb));pt->SetRelease(std::bind(&TimerWheel::RemoveTimer,this,id));_timers[id] = WeakTask(pt);int pos = (_tick + delay) % _capacity;_wheel[pos].push_back(pt);}//刷新/延迟定时任务void TimerRefresh(u_int64_t id){//通过保存我的定时器对象的weak_ptr构造一个shared_ptr出来,添加到轮子中auto it = _timers.find(id);if(it == _timers.end()){return;//没有找到定时任务,没法刷新,没法延迟}PtrTask pt = it->second.lock();//lock获取weak_ptr管理的对象对应的shared_ptrint delay = pt->DelayTime();int pos = (_tick + delay) % _capacity;_wheel[pos].push_back(pt);}//这个函数应该每秒钟执行一次,相当于秒针向后走了一步void RunTimerTask(){_tick = (_tick + 1) % _capacity;_wheel[_tick].clear();}//取消定时任务void TimerCancel(uint64_t id){auto it = _timers.find(id);if(it == _timers.end()){return;//没有找到定时任务,没法刷新,没法延迟}PtrTask pt = it->second.lock();if(pt) pt->Cancel();}private:void RemoveTimer(uint64_t id){auto it = _timers.find(id);if(it != _timers.end()){_timers.erase(it);}}private:using PtrTask = std::shared_ptr;using WeakTask = std::weak_ptr;int _tick; //当前的秒针,走到哪里释放哪里,就相当于执行哪里的任务 int _capacity; //表盘最大数量---其实就是最大延迟时间std::vector<std::vector> _wheel;std::unordered_map _timers;};class Test{public:Test() {cout << "构造" << endl;}~Test() {cout << "析构" <<endl;}};void DelTest(Test *t){delete t;}int main(){//测试代码TimerWheel tw;Test *t = new Test();tw.TimerAdd(888,5,std::bind(DelTest, t));for(int i = 0; i < 5; i++){sleep(1);tw.TimerRefresh(888);//刷新定时任务tw.RunTimerTask();cout << "刷新了一下定时任务,重新需要5s中后才会销毁" << endl;}tw.TimerCancel(888);//取消定时任务就不会被销毁了while(1){sleep(1);cout << "-----------------" << endl;tw.RunTimerTask();}return 0;}

3、正则表达式的简单使用

正则表达式(regular expression)描述了⼀种字符串匹配的模式(pattern),可以⽤来检查⼀个串是否含有某种⼦串、将匹配的⼦串替换或者从某个串中取出符合某个条件的⼦串等。

std::regex_match(const std::string &src, std::smatch &matches, std::regex &e)src: 原始字符串matches: 正则表达式可以从原始字符串中匹配并提取符合某种规则的数据,提取的数据就放在matches中是一个类似于数组的容器e:正则表达式的匹配规则返回值:用于确定匹配是否成功

示例代码

#include #include #include int main(){std::string str = "/numbers/1234";//需要提取数字字符串//匹配以/number/起始,后边跟了一个或多个数字字符的字符串,并且在匹配过程中提取这个匹配到的数字字符串std::regex e("/numbers/(\\d+)");std::smatch matches;bool ret = std::regex_match(str,matches,e);if(ret == false){std::cout << "匹配失败" << std::endl;}for(auto s : matches){std::cout << s << std::endl;}return 0;}

正则表达式提取HTTP请求方法

示例代码

#include #include #include int main(){//HTTP请求格式: GET /bitejiuyeke/login" />

4、通用类型any类型设计

  • 每⼀个Connection对连接进⾏管理,最终都不可避免需要涉及到应⽤层协议的处理,因此在Connection中需要设置协议处理的上下⽂来控制处理节奏。但是应⽤层协议千千万,为了降低耦合度,这个协议接收解析上下⽂就不能有明显的协议倾向,它可以是任意协议的上下⽂信息,因此就需要⼀个通⽤的类型来保存各种不同的数据结构。

  • 在C语⾔中,通⽤类型可以使⽤void*来管理,但是在C++中,boost库和C++17给我们提供了⼀个通⽤类型any来灵活使⽤,如果考虑增加代码的移植性,尽量减少第三⽅库的依赖,则可以使⽤C++17特性中的any,或者⾃⼰来实现。

1、一个连接必须拥有一个请求接收与解析的上下文

2、上下文的类型或者说结构不能固定,因为服务器支持的协议有可能会不断增多不同的协议,可能都会有不同的上下文结构

结论:必须拥有一个容器,能够保存各种不同的类型结构数据

通用类型:any

解决方案

  • Any类主要是实现⼀个通⽤类型出来,在c++17和boost库中都有现成的可以使⽤,但是这⾥实现⼀下了解其思想,这样也就避免了第三⽅库的使⽤了
  • ⾸先Any类肯定不能是⼀个模板类,否则编译的时候 Any a, Anyb,需要传类型作为模板参数,也就是说在使⽤的时候就要确定其类型
  • 这是⾏不通的,因为保存在Content中的协议上下⽂,我们在定义any对象的时候是不知道他们的协议类型的,因此⽆法传递类型作为模板参数
  • 因此考虑Any内部设计⼀个模板容器holder类,可以保存各种类型数据
  • ⽽因为在Any类中⽆法定义这个holder对象或指针,因为any也不知道这个类要保存什么类型的数据,因此⽆法传递类型参数
  • 所以,定义⼀个基类placehoder,让holder继承于placeholde,⽽Any类保存⽗类指针即可
  • 当需要保存数据时,则new⼀个带有模板参数的⼦类holder对象出来保存数据,然后让Any类中的⽗类指针,指向这个⼦类对象就搞定了

示例代码

#include #include #include using std::cout;using std::endl;class Any{private:class holder{public:virtual ~holder() {}virtual const std::type_info& type() = 0;virtual holder* clone() = 0;};templateclass placeholder : public holder{public:placeholder(const T&val):_val(val) {}virtual ~placeholder() {}//获取子类对象保存的数据类型virtual const std::type_info& type() {return typeid(T);}//针对当前的对象自身,克隆出一个新的子类对象virtual holder* clone() {return new placeholder(_val);}public:T _val;};holder* _content;public:Any():_content(nullptr) {}templateAny(const T &val):_content(new placeholder(val)) {}Any(const Any& other):_content(other._content " />();cout << *pa << endl;a = std::string("nihao");std::string *ps = a.get();cout << *ps << endl;return 0;}

5、缓冲区Buffer类设计

实现思想

​ 1、实现缓冲区得有一块内存空间,采用vector,vector底层其实使用的就是一个线性的内存空间

2、要素

​ (1)默认的空间大小

​ (2)当前的读取数据位置

​ (3)当前的写入数据位置

3、操作

​ (1)写入数据:当前写入位置指向哪里,就从哪里开始写入,如果后续剩余空闲不够了

​ 考虑整体缓冲区空闲空间是否足够(因为都位置也会向后偏移,前边有可能会有空闲空间)

​ 足够:将数据移动到起始位置即可

​ 不够:扩容,从当前写位置开始扩容足够大小

​ 数据一旦写入成功,当前写位置,就要向后偏移

​ (2)读取数据

​ 当前的读取位置指向哪里,就从哪里开始读取,前提是有数据可读

​ 可读数据大小:当前写入位置,减去当前读取位置

4、主要功能

​ (1)获取当前写位置地址

​ (2)确保可写空间足够(移动+扩容)

​ (3)获取前沿空闲空间大小

​ (4)获取后沿空闲空间大小

​ (5)将写位置向后移动指定长度

​ (6)获取当前都位置地址

​ (7)获取可读数据大小

​ (8)将读位置向后移动指定长度

​ (9)写入数据

​ (10)读取数据

​ (11)清理功能

代码

class Buffer{public:Buffer():_reader_idx(0),_writer_idx(0),_buffer(BUFFER_DEFAULT_SIZE){}//获取起始地址char* Begin() {return &*_buffer.begin();}// 获取当前写位置起始地址char* WritePosition(){return Begin() + _writer_idx;}// 获取当前读位置起始地址char* ReadPosition(){return Begin() + _reader_idx;}// 获取前沿空闲空间大小uint64_t HeadIdleSize(){return _reader_idx;}// 获取后沿空闲空间大小uint64_t TailIdleSize(){return _buffer.size() - _writer_idx;}// 获取可读数据大小uint64_t ReadAbleSize(){return _writer_idx - _reader_idx;}// 确保可写空间足够(移动+扩容)void EnsureWriteSpace(uint64_t len){//末尾空间足够,直接返回if(TailIdleSize() >= len) {return;}//末尾空闲空间不够,则判断加上起始位置的空闲空间按大小是否足够,狗了就将数据移到起始位置if(len <= TailIdleSize() + HeadIdleSize()){//将数据移动到起始位置uint64_t rsz = ReadAbleSize();//把当前数据大小先保存起来std::copy(ReadPosition(),ReadPosition() + rsz, Begin());_reader_idx = 0;_writer_idx = rsz;}else{//总体空间不够,则需要扩容,不移动数据_buffer.resize(_writer_idx + len);}}// 将写位置向后移动指定长度void MoveWriteOffset(uint64_t len){assert(len <= TailIdleSize());_writer_idx += len;} // 将读位置向后移动指定长度void MoveReadOffset(uint64_t len){if(len == 0) {return 0;}assert(len = len);std::copy(ReadPosition(), ReadPosition() + len, (char*)buf);}//读取并移动读取位置void ReadAndPop(void* buf, uint64_t len){Read(buf,len);MoveReadOffset(len);}//读取字符串并移动读取位置std::string ReadStringAndPop(uint64_t len){assert(ReadAbleSize() >= len);std::string str = ReadString(len);MoveReadOffset(len);return str;}//读取字符串不移动位置std::string ReadString(uint64_t len){assert(len <= ReadAbleSize());std::string str;str.resize(len);Read(&str[0],len);return str;}//获取换行符char* FindCRLF(){char* res = (char*)memchr(ReadPosition(), '\n', ReadAbleSize());return (char*)res;}//获取一行数据std::string GetLine(){char* pos = FindCRLF();if(pos == nullptr){return "";}return ReadString(pos - ReadPosition() +1);//+1是为了把换行也读取出来}std::string GetLineAndPop(){std::string str = GetLine();MoveReadOffset(str.size());return str;}// 清理功能void clear(){//归零,进行覆盖式写入_reader_idx = 0;_writer_idx = 0;}private:std::vector _buffer;//使用vector进行内存空间管理uint64_t _reader_idx; //读偏移uint64_t _writer_idx; //写偏移};

测试代码

int main(){Buffer buffer;std::string str = "hello!!";for(int i = 0;i  0){std::string line = buffer.GetLineAndPop();cout << line << endl;}std::string tmp;tmp = buffer.ReadStringAndPop(buffer.ReadAbleSize());cout << tmp << endl;// buffer.WriteStringAndPush(str);// Buffer buffer1;// buffer1.WriteBufferAndPush(buffer);// std::string tmp = buffer1.ReadStringAndPop(buffer1.ReadAbleSize());// cout << tmp << endl;// cout << buffer.ReadAbleSize() << endl;// cout << buffer1.ReadAbleSize() << endl;return 0;}

6、日志宏的编写

这里使用了自己做的日志系统项目,就不编写了。

7、套接字Socket类设计

主要功能:创建套接字、绑定地址信息、开始监听、向服务器发起连接、获取新连接、接收数据、发送数据

关闭套接字、创建一个服务端连接、创建一个客户端连接、设置套接字选项—开启地址端口重用、

设置套接字阻塞属性—设置为非阻塞

setsockopt

//int setsockopt(int fd, int level, int optname, void *val, int vallen)//fd:文件套接字//level: 选项定义的层次。或为特定协议的代码(如IPv4,IPv6,TCP),或为通用套接字代码(SOL_SOCKET)//地址重用setsockopt(_sockfd,SOL_SOCKET,SO_REUSEADDR, (void*)val, sizeof(int));//端口重用setsockopt(_sockfd,SOL_SOCKET,SO_REUSEPORT, (void*)val, sizeof(int));

Socket类代码

#define MAX_LISTEN 1024class Socket{public:Socket():_sockfd(-1){}Socket(int fd):_sockfd(fd){}~Socket(){Close();}int Fd(){return _sockfd;}//创建套接字bool Create(){//int socket(int domain, int type, int protocol);_sockfd = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);if(_sockfd fatal("CREATE SOCKET FAILED!!");return false;}return true;}//绑定地址信息bool Bind(const std::string &ip, uint16_t port){struct sockaddr_in addr;addr.sin_family = AF_INET;//IPV4addr.sin_port = htons(port);addr.sin_addr.s_addr = inet_addr(ip.c_str());socklen_t len = sizeof(struct sockaddr_in);//int bind(int sockfd, const struct sockaddr *addr,socklen_t addrlen);int ret = bind(_sockfd,(struct sockaddr*)&addr, len);if(ret fatal("BIND ADDRESS FAILED");return false;}return true;}//开始监听bool Listen(int backlog = MAX_LISTEN)//同一时间最大并发连接数{//int listen(int sockfd, int backlog);int ret = listen(_sockfd,backlog);if(ret fatal("SOCKET LISTEN FAILED");return false;}return true;}//向服务端发起连接bool Connect(const std::string &ip,uint16_t port){struct sockaddr_in addr;addr.sin_family = AF_INET;//IPV4addr.sin_port = htons(port);addr.sin_addr.s_addr = inet_addr(ip.c_str());socklen_t len = sizeof(struct sockaddr_in);//int connect(int sockfd, struct sockaddr*addr, socklen_len)int ret = connect(_sockfd,(struct sockaddr*)&addr, len);if(ret fatal("CONNECT SERVER FAILED");return false;}return true;}//获取新连接int Accept(){// int accept(int sockfd, struct sockaddr *addr, socklen_t* len);int newfd = accept(_sockfd,nullptr,nullptr);if(newfd fatal("SOCKET ACCEPT FAILED");return -1;}return newfd;}//接收数据ssize_t Recv(void* buf, size_t len, int flag = 0){//ssize_t recv(int sockfd, void *buf, size_t len, int flag);ssize_t ret = recv(_sockfd, buf, len, flag);if(ret fatal("SOCKET RECV FAILED!!");return -1;}return ret;//实际接收数据长度}ssize_t NonBlockRecv(void* buf, size_t len){return Recv(buf,len,MSG_DONTWAIT);//MSG_DONTWAIT 表示当前接收为非阻塞}//发送数据ssize_t Send(const void* buf, ssize_t len, int flag = 0){//ssize_t send(int sockfd, void* data, size_t len, int flag);ssize_t ret = send(_sockfd, buf, len, flag);if(ret fatal("SOCKET SEND FAILED!!");return -1;}return ret;//实际发送长度}ssize_t NonBlockSend(void * buf, size_t len){return Send(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前发送为非阻塞}//关闭套接字void Close(){if(_sockfd != -1){close(_sockfd);_sockfd = -1;}}//创建一个服务端连接bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool flag = false){//1、创建套接字 2、绑定地址 3、开始监听 4、设置非阻塞 5、启动地址重用if(Create() == false) return false;if(flag) NonBlock();if(Bind(ip,port) == false) return false;if(Listen() == false) return false;ReuseAddress();return true;}//创建一个客户端连接bool CreateClient(uint16_t port, const std::string &ip){//1、创建套接字 2、指向连接服务器if(Create() == false ) return false;if(Connect(ip, port) == false) return false;return true;}//设置套接字选项---开启地址端口重用void ReuseAddress(){//int setsockopt(int fd, int level, int optname, void *val, int vallen)int val = 1;//地址重用setsockopt(_sockfd,SOL_SOCKET,SO_REUSEADDR, (void*)val, sizeof(int));val = 1;//端口重用setsockopt(_sockfd,SOL_SOCKET,SO_REUSEPORT, (void*)val, sizeof(int));}//设置套接字阻塞属性---设置为非阻塞void NonBlock(){// int fcntl(int fd, int cmd, ... /* arg */);int flag = fcntl(_sockfd, F_GETFL, 0);fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);//设置为非阻塞函数}private:int _sockfd;};

测试代码

tcp_server.cc

#include "../server.hpp"int main(){Socket lst_socket;lst_socket.CreateServer(8888);while(1){int newfd = lst_socket.Accept();if(newfd < 0){continue;}Socket cli_sock(newfd);char buf[1024] = {0};int ret = cli_sock.Recv(buf,1023);if(ret < 0){cli_sock.Close();continue;}cli_sock.Send(buf,ret);cli_sock.Close();}lst_socket.Close();}

tcp_client.cc

#include "../server.hpp"int main(){Socket cli_sock;cli_sock.CreateClient(8888, "127.0.0.1");std::string str = "hello sqy";cli_sock.Send(str.c_str(),str.size());char buf[1024] = {0};cli_sock.Recv(buf,sizeof buf);std::cout << buf << std::endl;return 0;}

8、Channel类设计

目的:对描述符的监控事件管理

功能

​ 1、事件管理:

​ 描述符是否可读

​ 描述符是否可写

​ 对描述符监控可读

​ 对描述符监控可写

​ 解除可读事件监控

​ 解除可写事件监控

​ 解除所有事件监控

​ 2、事件触发后的处理的管理

​ 需要处理的事件:可读,可写,挂断,错误,任意

​ 事件处理的回调函数

成员:后边使用epoll进行事件监控

EPOLLIN 可读

EPOLLOUT 可写

EPOLLRDHUP 连接断开

EPOLLPRI 优先数据

EPOLLERR 出错

EPOLLHUP 挂断

以上的事件是在uint_32上保存

class Poller;class EventLoop;class Channel{using EventCallback = std::function;public:Channel(EventLoop* loop, int fd):_fd(fd),_events(0),_revents(0),_loop(loop) {}int Fd() {return _fd;}bool ReadAble()//当前是否监控了可读{return (_events & EPOLLIN);}bool WriteAble() //当前是否监控了可写{return (_events & EPOLLOUT);}void EnableRead()//启动读事件监控{_events |= EPOLLIN; Update();}void EnableWrite() //启动写事件监控{_events |= EPOLLOUT; Update();}void DisableRead() //关闭读事件监控{_events &= ~EPOLLIN; Update();}void DisableWrite()//关闭写事件监控{_events &= ~EPOLLOUT; Update();}void DisableAll()//关闭所有事件监控{_events = 0; Update();}void Remove();void Update();void HandleEvent() //事件处理,一旦连接触发了事件,就调用这个函数{if((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)){if(_event_callback)_event_callback();if(_read_callback) _read_callback();}if(_revents & EPOLLOUT){if(_event_callback)_event_callback();//放到事件处理完毕后调用,刷新活跃度if(_write_callback) _write_callback();}else if(_revents & EPOLLERR){if(_event_callback)_event_callback();//不管任何事件,都会调用的回调函数if(_error_callback) _error_callback();}else if(_revents & EPOLLHUP){if(_event_callback)_event_callback();if(_close_callback) _close_callback();}} void SetReadCallback (const EventCallback &cb){_read_callback= cb;}void SetWriteCallback(const EventCallback &cb){_write_callback = cb;}void SetErrorCallback(const EventCallback &cb){_error_callback = cb;}void SetCloseCallback(const EventCallback &cb){_close_callback = cb;}void SetEventCallback(const EventCallback &cb){_event_callback = cb;}void SetREvents(uint32_t events) {_revents = events;}//设置实际就绪的事件uint32_t Events() {return _events;}//获取想要监控的事件private:EventLoop* _loop;int _fd;uint32_t _events; //当前需要监控的事件uint32_t _revents;//当前连接触发的事件EventCallback _read_callback;//可读事件被触发的回调函数EventCallback _write_callback; //可写事件被触发的回调函数EventCallback _error_callback; //错误事件被触发的回调函数EventCallback _close_callback; //连接断开事件被触发的回调函数EventCallback _event_callback; //任意事件被触发的回调函数};

9、Poller模块

功能:对任意的描述符进行IO事件监控

意义:对epoll进行的封装,让对描述符进行事件监控的操作更加简单

功能接口:添加事件监控、修改事件监控、移除事件监控

封装思想

​ 1、必须拥有一个epoll的操作句柄

​ 2、拥有一个struct epoll_event 结构数组,监控时保存所有的活跃事件

​ 3、使用hash表管理描述符与描述符对应事件管理Channel对象

逻辑流程

​ 1、对描述符进行监控,通过Channel才能知道描述符需要监控什么事件

​ 2、当描述符就绪了,通过描述符在hash表中找到对应的Channel(得到了Channel才能知道什么事件如何处理)

​ 当描述符就绪了,返回就绪描述符对应的Channel

#define MAX_EPOLLEVENTS 1024class Poller{public:Poller(){_epfd = epoll_create(1);//这个数字随便给if(_epfd fatal("EPOLL CREATE FAILED");return;}}//添加或修改监控事件void UpdateEvent(Channel* channel){bool ret = HashChannel(channel);if(ret == false){//不存在则添加_channels.insert(std::make_pair(channel->Fd(), channel));Update(channel,EPOLL_CTL_ADD);return;}Update(channel,EPOLL_CTL_MOD);}//移除监控void RemoveEvent(Channel* Channel){auto it = _channels.find(Channel->Fd());if(it != _channels.end()){_channels.erase(it);}Update(Channel,EPOLL_CTL_DEL);}//开始监控,返回活跃连接void Poll(std::vector* active){// int epoll_wait(int epfd, struct epoll_event *evs, int maxevents, int timeout);int nfds = epoll_wait(_epfd,_evs, MAX_EPOLLEVENTS-1 , -1);//-1为阻塞监控if(nfds fatal("EPOLL WAIT ERROR: %s\n",strerror(errno));abort();//退出程序}for(int i = 0; i second->SetREvents(_evs[i].events);//设置实际就绪的事件active->push_back(it->second);}}private://内层封装void Update(Channel* channel, int op){// int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev);int fd = channel->Fd();struct epoll_event ev;ev.data.fd = fd;ev.events = channel->Events();int ret = epoll_ctl(_epfd,op, fd,&ev);if(ret error("EPOLLCTL FAILED");abort();//退出程序}}//判断一个Channel是否已经添加了事件监控bool HashChannel(Channel* channel){auto it = _channels.find(channel->Fd());if(it == _channels.end()){return false;}return true;}private:int _epfd;struct epoll_event _evs[MAX_EPOLLEVENTS];std::unordered_map _channels;};void Channel::Remove(){ _poller->RemoveEvent(this);}//移除监控void Channel::Update(){ _poller->UpdateEvent(this);}

Poller与Channel的联合调试代码

#include "../server.hpp"using std::cout;using std::endl;void HandleClose(Channel* channel){std::cout << "close: " <Fd() <Remove();//移除监控delete channel;}void HandleRead(Channel* channel){int fd = channel->Fd();char buf[1024] = {0};int ret = recv(fd, buf, 1023, 0);if(ret EnableWrite();//启动可写事件cout << buf <Fd();const char *data = "天气还不错";int ret = send(fd, data, strlen(data), 0);if(ret DisableWrite();//关闭写监控}void HandleError(Channel* channel){HandleClose(channel);//出错移除监控}void HandleEvent(Channel* channel){cout << "有了一个事件" <Fd();int newfd = accept(fd,nullptr,nullptr);if(newfdSetReadCallback (std::bind(HandleRead,channel));//为通信套接字设置可读事件的回调函数channel->SetWriteCallback(std::bind(HandleWrite,channel));//可写事件的回调函数channel->SetCloseCallback(std::bind(HandleClose,channel));//关闭事件的回调函数channel->SetErrorCallback(std::bind(HandleError,channel));//错误事件的回调函数channel->SetEventCallback(std::bind(HandleEvent,channel));//任意事件的回调函数channel->EnableRead();}int main(){Poller poller;Socket lst_socket;lst_socket.CreateServer(8888);//为监听套接字,创建一个Channel进行事件的管理,以及事件的处理Channel channel(&poller,lst_socket.Fd());channel.SetReadCallback(std::bind(Acceptor,&poller, &channel));//回调中,获取新连接,为新连接创建Channel并且添加监控channel.EnableRead();while(1){std::vectoractives;poller.Poll(&actives);for(auto &a : actives){a->HandleEvent();}}lst_socket.Close();}

10、EventLoop模块

eventfd: 一种事件通知机制

创建一个描述符用于实现事件通知

eventfd本质在内核里边管理的就是一个计数器,创建eventfd就会在内核中创建一个计数器(结构)

每当向eventfd中写入一个数值–用于表示事件通知次数

可以使用read进行数据的读取,读取到的数据就是通知的次数

例如:假设每次给eventfd中写入一个1,就表示通知了一次,连续写了三次之后,再去read读取出来的数字就是3,读取之后计数清零

eventfd

#include int eventfd(unsigned int initval, int flags);功能:创建一个eventfd对象,实现事件通知参数:initval:计数初值flags:EFD_CLOEXEC -禁止进程复制EFD_NONBLOCK-开启非阻塞属性返回值:返回一个文件描述符用于操作eventfd也是通过read/write/close进行操作的注意:read&write进行IO的时候数据只能是一个8字节的数据

基本使用

#include #include #include #include int main(){int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if(efd < 0){perror("eventfd failed!!");return -1;}uint64_t val = 1;write(efd, &val, sizeof(val));write(efd, &val, sizeof(val));write(efd, &val, sizeof(val));uint64_t res = 0;read(efd, &res, sizeof(res));printf("%ld\n",res);write(efd, &val, sizeof(val));read(efd, &res, sizeof(res));printf("%ld\n",res);close(efd);return 0;}

Eventloop

EventLoop:进行事件监控,以及事件处理模块

​ 关键点:这个模块与线程是一一对应的。

监控了一个连接,而这个连接一旦就绪,就要进行事件处理,但是如果这个描述符,在多个线程中触发了事件,进行处理,

就会存在线程安全问题。因此我们需要将一个连接的事件监控,以及连接事件处理,以及其他操作都放在同一个线程中进行。

如何保证一个连接的所有操作都在eventloop对应的线程中?

解决方案:给eventloop模块中,添加一个任务队列,对连接的所有操作,都进行一次封装,将对

​ 连接的操作并不直接执行,而是当任务添加到任务队列中

eventloop处理流程

​ 1、在线程中对描述符进行事件监控

​ 2、有描述符就绪则对描述符进行事件处理(如何保证处理回调函数中的操作都在线程中)

​ 3、所有的就绪事件处理完了,这时候再去将任务队列中的所有任务一一执行

class EventLoop{using Functor = std::function;public:EventLoop():_thread_id(std::this_thread::get_id()),_event_fd(CreatEventFd()),_event_channel(new Channel(this,_event_fd)),_poller(),_timer_wheel(this){//给eventfd添加可读事件回调函数,读取eventfd事件通知次数_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventFd,this));_event_channel->EnableRead();//启动可读事件监控}void ReadEventFd(){uint64_t res = 0;int ret = read(_event_fd, &res, sizeof(res));if(ret fatal("READ EVENTFD FAILED!!");abort();}}static int CreatEventFd(){int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if(efd fatal("CREATE EVENTFD FAILED!!");abort();}return efd;}void WeakUpEventFd(){uint64_t val = 1;int ret = write(_event_fd,&val, sizeof(val));if(ret fatal("READ EVENTFD FAILED!!");abort();}}//判断将要执行的任务是否处于当前线程中,如果是则执行,不是则压入队列void RunInLoop(const Functor &cb){if(IsInLoop()){return cb();}return QueueInLoop(cb);}void QueueInLoop(const Functor &cb)//将操作压入任务池{{std::unique_lock _lock(_mutex);_tasks.push_back(cb);}//唤醒又肯因为没有事件就绪,而导致的epoll阻塞//其实就是给eventfd写入一个数据,eventfd就会触发可读事件WeakUpEventFd();}//用于判断当前线程是否是EventLoop对应的线程bool IsInLoop(){return _thread_id == std::this_thread::get_id();}void AssertInLoop(){assert(_thread_id == std::this_thread::get_id());}//添加/修改描述符事件监控void UpdateEvent(Channel * channel){_poller.UpdateEvent(channel);}//移除面是否的监控void RemoveEvent(Channel* channel){_poller.RemoveEvent(channel);}void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb){return _timer_wheel.TimerAdd(id, delay,cb);}void TimerRefresh(uint64_t id){return _timer_wheel.TimerRefresh(id);}void TimerCancel(uint64_t id){return _timer_wheel.TimerCancel(id);}void Start()//事件监控-》就绪事件处理-》执行任务{//1、事件监控std::vector actives;_poller.Poll(&actives);//2、事件处理for(auto &channel : actives){channel->HandleEvent();}//3、执行任务// RunAllTask();}//这个接口存在线程安全问题--这个接口不能被外界使用者调用,只能在模块内,在对应的eventloop线程内执行bool HasTimer(uint64_t id){return _timer_wheel.HasTimer(id);}private:void RunAllTask()//执行所有任务池的任务{std::vector functor;{std::unique_lock _lock(_mutex);//加锁保护交换操作,交换操作不上线程安全的_tasks.swap(functor);}for(auto &f : functor){f();}}private:int _event_fd;//eventfd唤醒IO事件监控有可能导致的阻塞std::thread::id _thread_id;//线程IDPoller _poller;std::unique_ptr_event_channel;//在eventloop释放的时候他也要释放,所以用智能指针std::vector _tasks;//任务池std::mutex _mutex;//实现任务池操作的线程安全TimerWheel _timer_wheel;};

11、定时器模块

timefd:实现内核每隔一段事件,给进程一次超时事件(timefd可读)

timewheel:实现每次执行Runtimetask,都可以执行一波到期的定时任务

要实现一个完整的秒级定时器,就需要将这两个功能整合到一起

timefd设置为每秒钟触发一次定时事件,当事件被触发,则运行一次timewheel的runtimetask,执行一下所有的过期定时任务

using TaskFunc = std::function;using ReleaseFunc = std::function;class TimerTask{public:TimerTask(uint64_t id, uint32_t delay,const TaskFunc &cb):_id(id),_timeout(delay),_task_cb(cb),_canceled(false){}void SetRelease(const ReleaseFunc &cb){_release = cb;}uint32_t DelayTime(){return _timeout;}void Cancel(){_canceled = true;}~TimerTask(){if(_canceled == false) _task_cb();//定时任务没有被取消才会执行_release();}private:uint64_t _id;//定时器任务对象IDuint32_t _timeout;//定时任务的超时时间TaskFunc _task_cb;//定时器对象要执行的定时任务ReleaseFunc _release;//用于删除TimerWheel中保存的定时器对象信息bool _canceled; //false-表示任务没有被取消,true-表示任务被取消};class EventLoop;class TimerWheel{public:TimerWheel(EventLoop* loop):_capacity(60),_tick(0),_wheel(_capacity),_timefd(CreateTimerfd()),_timer_channel(new Channel(_loop, _timefd)),_loop(loop){_timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this));_timer_channel->EnableRead();//启动读事件监控}void ReadTimefd(){uint64_t times;int ret = read(_timefd, &times, 8);if(ret fatal("READ TIMERFD FAILED!");abort();}}bool HasTimer(uint64_t id) {auto it = _timers.find(id);if (it == _timers.end()) {return false;}return true;}static int CreateTimerfd(){int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);if(timerfd SetRelease(std::bind(&TimerWheel::RemoveTimer,this,id));_timers[id] = WeakTask(pt);int pos = (_tick + delay) % _capacity;_wheel[pos].push_back(pt);}//刷新/延迟定时任务void TimerRefresh(u_int64_t id);void TimerRefreshInLoop(u_int64_t id){//通过保存我的定时器对象的weak_ptr构造一个shared_ptr出来,添加到轮子中auto it = _timers.find(id);if(it == _timers.end()){return;//没有找到定时任务,没法刷新,没法延迟}PtrTask pt = it->second.lock();//lock获取weak_ptr管理的对象对应的shared_ptrint delay = pt->DelayTime();int pos = (_tick + delay) % _capacity;_wheel[pos].push_back(pt);}//这个函数应该每秒钟执行一次,相当于秒针向后走了一步void RunTimerTask(){_tick = (_tick + 1) % _capacity;_wheel[_tick].clear();}void OnTime(){ReadTimefd();RunTimerTask();}//取消定时任务void TimerCancel(uint64_t id);void TimerCancelInLoop(uint64_t id){auto it = _timers.find(id);if(it == _timers.end()){return;//没有找到定时任务,没法刷新,没法延迟}PtrTask pt = it->second.lock();if(pt) pt->Cancel();}private:void RemoveTimer(uint64_t id){auto it = _timers.find(id);if(it != _timers.end()){_timers.erase(it);}}private:using PtrTask = std::shared_ptr;using WeakTask = std::weak_ptr;int _timefd;//定时器描述符EventLoop* _loop;std::unique_ptr _timer_channel;int _tick; //当前的秒针,走到哪里释放哪里,就相当于执行哪里的任务 int _capacity; //表盘最大数量---其实就是最大延迟时间std::vector<std::vector> _wheel;std::unordered_map _timers;};

12、Connection模块

目的:对连接进行全方位的管理,对通信连接的所有操作都是通过这个模块提供的功能完成

功能设计

​ 1、套接字的管理,能够进行套接字的操作

​ 2、连接事件的管理,可读,可写,错误,挂断,任意

​ 3、缓冲区的管理,便于socket数据的接收和发送

​ 4、协议上下文的管理,记录请求数据的处理过程

​ 5、回调函数的管理

​ 因为连接接收到数据之后该如何处理,由用户决定,因此必须有业务处理回调函数

​ 一个连接建立成功后,该如何处理,由用户决定,因此必须有连接建立成功的回调函数

​ 一个连接关闭前,该如何处理,由用户决定,因此必须有关闭连接回调函数。

​ 任意事件的产生,有没有某些处理,由用户决定,因此必须有任意事件的回调函数

功能

​ 1、发送数据 — 给用户提供的发送数据接口,并不是真正的发送接口,而是把数据放到发送缓冲区,然后启动写事件监控

​ 2、关闭连接 — 给用户提供的关闭连接接口,应该在实际释放连接之前,看看输入输出缓冲区是否有数据待处理

​ 3、启动非活跃连接的超时销毁功能

​ 4、取消非活跃连接的超时销毁功能

​ 5、协议切换 — 一个连接接收数据后如何进行业务处理,取决上下文,以及数据的业务处理回调函数

Connection模块是对连接的管理模块,对于连接的所有操作都是通过这个模块完成的

场景:对连接进行操作的时候,但是连接已经被释放,导致内存访问错误,最终程序崩溃

解决方案:使用只能指针shared_ptr对Connection对象进行管理,这样就能保证任意一个地方对Connection对象进行操作的时候,保 存了一份shared_ptr,因此就算其他地方进行释放操作,也只是对shared_ptr的计数器-1,而不会导致Connection的实际释放

typedef enum{DISCONNECTED,//连接关闭状态CONNECTING,//连接建立成功--待处理状态CONNECTED, //连接建立完成--各种设置已完成,可以通信的状态DISCONNECTING//待关闭状态}ConnStatu;class Connection;using PtrConnection = std::shared_ptr;class Connection : public std::enable_shared_from_this {private:uint64_t _conn_id;// 连接的唯一ID,便于连接的管理和查找//uint64_t _timer_id; //定时器ID,必须是唯一的,这块为了简化操作使用conn_id作为定时器IDint _sockfd;// 连接关联的文件描述符bool _enable_inactive_release;// 连接是否启动非活跃销毁的判断标志,默认为falseEventLoop *_loop; // 连接所关联的一个EventLoopConnStatu _statu; // 连接状态Socket _socket; // 套接字操作管理Channel _channel; // 连接的事件管理Buffer _in_buffer;// 输入缓冲区---存放从socket中读取到的数据Buffer _out_buffer; // 输出缓冲区---存放要发送给对端的数据Any _context; // 请求的接收处理上下文/*这四个回调函数,是让服务器模块来设置的(其实服务器模块的处理回调也是组件使用者设置的)*//*换句话说,这几个回调都是组件使用者使用的*/using ConnectedCallback = std::function;using MessageCallback = std::function;using ClosedCallback = std::function;using AnyEventCallback = std::function;ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;/*组件内的连接关闭回调--组件内设置的,因为服务器组件内会把所有的连接管理起来,一旦某个连接要关闭*//*就应该从管理的地方移除掉自己的信息*/ClosedCallback _server_closed_callback;private:/*五个channel的事件回调函数*///描述符可读事件触发后调用的函数,接收socket数据放到接收缓冲区中,然后调用_message_callbackvoid HandleRead() {//1. 接收socket的数据,放到缓冲区char buf[65536];ssize_t ret = _socket.NonBlockRecv(buf, 65535);if (ret  0) {//shared_from_this--从当前对象自身获取自身的shared_ptr管理对象return _message_callback(shared_from_this(), &_in_buffer);}}//描述符可写事件触发后调用的函数,将发送缓冲区中的数据进行发送void HandleWrite() {//_out_buffer中保存的数据就是要发送的数据ssize_t ret = _socket.NonBlockSend(_out_buffer.ReadPosition(), _out_buffer.ReadAbleSize());if (ret  0) {_message_callback(shared_from_this(), &_in_buffer);}return Release();//这时候就是实际的关闭释放操作了。}_out_buffer.MoveReadOffset(ret);//千万不要忘了,将读偏移向后移动if (_out_buffer.ReadAbleSize() == 0) {_channel.DisableWrite();// 没有数据待发送了,关闭写事件监控//如果当前是连接待关闭状态,则有数据,发送完数据释放连接,没有数据则直接释放if (_statu == DISCONNECTING) {return Release();}}return;}//描述符触发挂断事件void HandleClose() {/*一旦连接挂断了,套接字就什么都干不了了,因此有数据待处理就处理一下,完毕关闭连接*/if (_in_buffer.ReadAbleSize() > 0) {_message_callback(shared_from_this(), &_in_buffer);}return Release();}//描述符触发出错事件void HandleError() {return HandleClose();}//描述符触发任意事件: 1. 刷新连接的活跃度--延迟定时销毁任务;2. 调用组件使用者的任意事件回调void HandleEvent() {if (_enable_inactive_release == true){_loop->TimerRefresh(_conn_id); }if (_event_callback){_event_callback(shared_from_this()); }}//连接获取之后,所处的状态下要进行各种设置(启动读监控,调用回调函数)void EstablishedInLoop() {// 1. 修改连接状态;2. 启动读事件监控;3. 调用回调函数assert(_statu == CONNECTING);//当前的状态必须一定是上层的半连接状态_statu = CONNECTED;//当前函数执行完毕,则连接进入已完成连接状态// 一旦启动读事件监控就有可能会立即触发读事件,如果这时候启动了非活跃连接销毁_channel.EnableRead();if (_connected_callback) _connected_callback(shared_from_this());}//这个接口才是实际的释放接口void ReleaseInLoop() {//1. 修改连接状态,将其置为DISCONNECTED_statu = DISCONNECTED;//2. 移除连接的事件监控_channel.Remove();//3. 关闭描述符_socket.Close();//4. 如果当前定时器队列中还有定时销毁任务,则取消任务if (_loop->HasTimer(_conn_id)) CancelInactiveReleaseInLoop();//5. 调用关闭回调函数,避免先移除服务器管理的连接信息导致Connection被释放,再去处理会出错,因此先调用用户的回调函数if (_closed_callback) _closed_callback(shared_from_this());//移除服务器内部管理的连接信息if (_server_closed_callback) _server_closed_callback(shared_from_this());}//这个接口并不是实际的发送接口,而只是把数据放到了发送缓冲区,启动了可写事件监控void SendInLoop(Buffer &buf) {if (_statu == DISCONNECTED) return ;_out_buffer.WriteBufferAndPush(buf);if (_channel.WriteAble() == false) {_channel.EnableWrite();}}//这个关闭操作并非实际的连接释放操作,需要判断还有没有数据待处理,待发送void ShutdownInLoop() {_statu = DISCONNECTING;// 设置连接为半关闭状态if (_in_buffer.ReadAbleSize() > 0) {if (_message_callback) _message_callback(shared_from_this(), &_in_buffer);}//要么就是写入数据的时候出错关闭,要么就是没有待发送数据,直接关闭if (_out_buffer.ReadAbleSize() > 0) {if (_channel.WriteAble() == false) {_channel.EnableWrite();}}if (_out_buffer.ReadAbleSize() == 0) {Release();}}//启动非活跃连接超时释放规则void EnableInactiveReleaseInLoop(int sec) {//1. 将判断标志 _enable_inactive_release 置为true_enable_inactive_release = true;//2. 如果当前定时销毁任务已经存在,那就刷新延迟一下即可if (_loop->HasTimer(_conn_id)) {return _loop->TimerRefresh(_conn_id);}//3. 如果不存在定时销毁任务,则新增_loop->TimerAdd(_conn_id, sec, std::bind(&Connection::Release, this));}void CancelInactiveReleaseInLoop() {_enable_inactive_release = false;if (_loop->HasTimer(_conn_id)) { _loop->TimerCancel(_conn_id); }}void UpgradeInLoop(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg, const ClosedCallback &closed, const AnyEventCallback &event) {_context = context;_connected_callback = conn;_message_callback = msg;_closed_callback = closed;_event_callback = event;}public:Connection(EventLoop *loop, uint64_t conn_id, int sockfd):_conn_id(conn_id), _sockfd(sockfd),_enable_inactive_release(false), _loop(loop), _statu(CONNECTING), _socket(_sockfd),_channel(loop, _sockfd) {_channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));_channel.SetEventCallback(std::bind(&Connection::HandleEvent, this));_channel.SetReadCallback(std::bind(&Connection::HandleRead, this));_channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));_channel.SetErrorCallback(std::bind(&Connection::HandleError, this));}~Connection() { logger->debug("RELEASE CONNECTION:%p", this); }//获取管理的文件描述符int Fd() { return _sockfd; }//获取连接IDint Id() { return _conn_id; }//是否处于CONNECTED状态bool Connected() { return (_statu == CONNECTED); }//设置上下文--连接建立完成时进行调用void SetContext(const Any &context) { _context = context; }//获取上下文,返回的是指针Any *GetContext() { return &_context; }void SetConnectedCallback(const ConnectedCallback&cb) { _connected_callback = cb; }void SetMessageCallback(const MessageCallback&cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback&cb) { _closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback&cb) { _event_callback = cb; }void SetSrvClosedCallback(const ClosedCallback&cb) { _server_closed_callback = cb; }//连接建立就绪后,进行channel回调设置,启动读监控,调用_connected_callbackvoid Established() {_loop->RunInLoop(std::bind(&Connection::EstablishedInLoop, this));}//发送数据,将数据放到发送缓冲区,启动写事件监控void Send(const char *data, size_t len) {//外界传入的data,可能是个临时的空间,我们现在只是把发送操作压入了任务池,有可能并没有被立即执行//因此有可能执行的时候,data指向的空间有可能已经被释放了。Buffer buf;buf.WriteAndPush(data, len);_loop->RunInLoop(std::bind(&Connection::SendInLoop, this, std::move(buf)));}//提供给组件使用者的关闭接口--并不实际关闭,需要判断有没有数据待处理void Shutdown() {_loop->RunInLoop(std::bind(&Connection::ShutdownInLoop, this));}void Release() {_loop->QueueInLoop(std::bind(&Connection::ReleaseInLoop, this));}//启动非活跃销毁,并定义多长时间无通信就是非活跃,添加定时任务void EnableInactiveRelease(int sec) {_loop->RunInLoop(std::bind(&Connection::EnableInactiveReleaseInLoop, this, sec));}//取消非活跃销毁void CancelInactiveRelease() {_loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop, this));}//切换协议---重置上下文以及阶段性回调处理函数 -- 而是这个接口必须在EventLoop线程中立即执行//防备新的事件触发后,处理的时候,切换任务还没有被执行--会导致数据使用原协议处理了。void Upgrade(const Any &context, const ConnectedCallback &conn, const MessageCallback &msg,const ClosedCallback &closed, const AnyEventCallback &event) {_loop->AssertInLoop();_loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, context, conn, msg, closed, event));}};

13、Acceptor模块

Acceptor模块是对Socket模块,Channel模块的⼀个整体封装,实现了对⼀个监听套接字的整体的管理。

• Acceptor模块内部包含有⼀个Socket对象:实现监听套接字的操作

• Acceptor模块内部包含有⼀个Channel对象:实现监听套接字IO事件就绪的处理

具体处理流程如下

  1. 实现向Channel提供可读事件的IO事件处理回调函数,函数的功能其实也就是获取新连接
  2. 为新连接构建⼀个Connection对象出来。

意义
当获取了一个新建连接的描述符之后,需要为这个通信连接,封装一个Connection对象,设置各种不同回调

注意

​ 因为Acceptor模块本身并不知道一个连接产生了某个事件该如何处理,因此获取一个通信连接后,Connection的封装,以及事件回调的设置都应该由服务器模块来进行

class Acceptor{using AcceptCallback = std::function;public://不能将启动读事件监控,放到构造函数中,必须在设置回调函数后,再去启动//否则有可能造成启动监控后,立即有事件,回调函数还没设置:新连接得不到处理Acceptor(EventLoop* loop, int port):_socket(CreateServer(port)),_loop(loop),_channel(loop,_socket.Fd()){}void SetAcceptCallback(const AcceptCallback& cb){_accept_callback = cb;}void Listen(){_channel.SetReadCallback(std::bind(&Acceptor::HandleRead,this));_channel.EnableRead();}~Acceptor(){if(_socket.Fd() > 0){_socket.Close();}}private://监听套接字的读事件回调处理函数---获取新连接,调用_accept_callback函数进行新连接处理void HandleRead(){int newfd = _socket.Accept();if(newfd < 0){return;}if(_accept_callback) _accept_callback(newfd);}int CreateServer(int port){bool ret = _socket.CreateServer(port);(void)ret;assert(ret == true);return _socket.Fd();}private:AcceptCallback _accept_callback;Socket _socket;//用于创建监听套接字EventLoop* _loop;//用于对监听套接字进行事件监控Channel _channel;//用于对监听套接字进行事件管理};

14、LoopThread模块

目标:将EventLoop模块与线程整合起来

​ EventLoop模块与线程是一一对应的。

EventLoop模块实例化的对象,在构造的时候就会初始化_thread_id,而后边当运行一个操作的时候判断当前是否

运行在eventLoop模块对应的线程中,就是将线程ID与EventLoop模块中的thread_id进行一个比较,相同就表示

在同一个线程,不同就表示当前运行线程不是EventLoop线程

含义:EventLoop模块在实例化对象的时候,必须在线程内部

​ EventLoop实例化对象时会设置自己的thread_id

​ 如果我们先创建了多个EventLoop对象,然后创建了多个线程,将各个线程的id,重新给EventLoop进行设置

存在问题:在构造EventLoop对象,到设置新的thread_id期间将是不可控的

因此我们必须先创建线程,然后在线程的入口函数中,去实例化EventLoop对象

构造一个新的模块:LoopThread

这个模块的功能:将EventLoop与thread整合到一起

思想

​ 1、创建线程

​ 2、在线程中实例化EventLoop对象

功能:可以向外部返回实例化的EventLoop

class LoopThread{public://创建线程,设定线程入口函数LoopThread():_loop(nullptr),_thread(std::thread(&LoopThread::ThreadEntry,this)){}//返回当前线程关联的EventLoop对象指针EventLoop* GetLoop(){EventLoop* loop = nullptr;{std::unique_lock lock(_mutex);//加锁_cond.wait(lock,[&](){return _loop != nullptr;});//loop为空就一直阻塞loop = _loop;}return loop;}private://实例化EventLoop对象,唤醒_cond上有可能阻塞的线程,并且开始运行EventLoop模块的功能void ThreadEntry(){EventLoop loop;{std::unique_lock lock(_mutex);//加锁_loop = &loop;_cond.notify_all();}loop.Start();}private://用于实现_loop获取的同步关系,避免线程创建了,但是_loop还没有实例化之前去获取_loopstd::mutex _mutex;//互斥锁std::condition_variable _cond;//条件变量std::thread _thread;//EventLoop对应的线程EventLoop* _loop;//EventLoop指针变量,这个对象需要在线程内实例化};

15、LoopThreadPool模块

设计一个线程池

​ LoopThreadPool模块:对所有的LoopThread进行管理及分配

功能

​ 1、线程数量可配置

​ 注意事项:在服务器中,主从Reactor模型是主线程只负责连接获取,从属线程负责新连接的事件监控及处理

​ 因此当前的线程池,有可能从属线程会数量为0,也就是实现单Reactor服务器,一个线程及辅助获取连接,也负责连接的处理

​ 2、对所有的线程进行管理,其实就是管理0个或多个LoopThread对象

​ 3、提供线程分配的功能

​ 当主线程获取了一个新连接,需要将新连接挂到从属线程上进行事件监控及处理

​ 假设有0个从属线程,则直接分配给主线程的EventLoop,进行处理

假设有多个从属线程,则采用轮转思想,进行线程的分配(将对应线程的EventLoop获取到,设置给对应的Connection)

class LoopThreadPool{public:LoopThreadPool(EventLoop* baseloop):_thread_count(0),_next_loop_idx(0),_baseloop(baseloop){}//设置线程数量void SetThreadCount(int count){_thread_count = count;}void Create()//创建所有的从属线程{if(_thread_count > 0){_threads.resize(_thread_count);_loops.resize(_thread_count);for(int i = 0; i GetLoop();}}}EventLoop* NextLoop(){if(_thread_count == 0) return _baseloop;_next_loop_idx = (_next_loop_idx + 1) % _thread_count;return _loops[_next_loop_idx];}private:int _thread_count;//线程数量int _next_loop_idx;//索引EventLoop* _baseloop;//住EvnetLoop,运行在主线程,从属线程数量为0,则所有操作都在baseloop中进行std::vector _threads;//保存所有的LoopThread对象std::vector _loops;//从属线程数量大于0则从_loops进行线程EventLoop分配};

16、TcpServer模块

对前边所有子模块的整合模块,是提供给用户用于搭建一个高性能服务器的模块

让组件使用者可以更加轻便的完成一个服务器的搭建

管理

​ 1、Acceptor对象,创建一个监听字

​ 2、EventLoop对象,baseloop对象,实现对监听套接字的事件监控

​ 3、std::unordered_map _conns,实现对所有新建连接的管理

​ 4、LoopThreadPool对象,创建loop线程池,对新建连接进行事件监控及处理

功能

​ 1、设置从属线程池的数量

​ 2、启动服务器

​ 3、设置各种回调函数(连接建立完成,消息,关闭,任意),用户设置给TcpServer,TcpServer设置给获取的新连接

​ 4、是否启动非活跃连接超时销毁功能

​ 5、添加定时任务功能

流程

​ 1、在TcpServer中实例化一个Acceptor对象,以及一个EventLoop对象(baseloop)

​ 2、将Acceptor挂到baseloop上进行事件监控

​ 3、一旦Acceptor挂到baseloop上进行事件监控

​ 4、对新连接,创建一个Connection进行管理

​ 5、对连接对应的ConnEction设置功能回调(连接完成回调,消息回调,关闭回调,任意事件回调)

​ 6、启动Connection的非活跃连接的超时销毁规则

​ 7、将新连接对应的Connection挂到LoopThreadPool中的从属线程对应的EventLoop中进行事件监控

​ 8、一旦Connection对应的连接就绪了可读事件,则这时候执行读事件回调函数,读取数据,

​ 读取完毕后调用TcpServer设置的消息回调

class TcpServer{using ConnectedCallback = std::function;using MessageCallback = std::function;using ClosedCallback = std::function;using AnyEventCallback = std::function;using Functor = std::function;public:TcpServer(int port):_next_id(0),_port(port),_enable_inactive_release(false),_acceptor(&_baseloop,_port),_pool(&_baseloop){_acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection,this,std::placeholders::_1));_acceptor.Listen();//将监听套接字挂到baseloop上开始监听事件}void SetThreadCount(int count){_pool.SetThreadCount(count);}void SetConnectedCallback(const ConnectedCallback&cb) { _connected_callback = cb; }void SetMessageCallback(const MessageCallback&cb) { _message_callback = cb; }void SetClosedCallback(const ClosedCallback&cb) { _closed_callback = cb; }void SetAnyEventCallback(const AnyEventCallback&cb) { _event_callback = cb; }void SetSrvClosedCallback(const ClosedCallback&cb) { _server_closed_callback = cb; }void Start(){_pool.Create(); //创建线程池的从属线程_baseloop.Start();}void EnableInactiveRelease(int timeout){_timeout = timeout;_enable_inactive_release = true;}void RunAfter(const Functor& task, int delay)//用于添加定时任务{_baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop,this,task,delay));}private:void RunAfterInLoop(const Functor& task, int delay){_next_id++;_baseloop.TimerAdd(_next_id,delay,task);}void NewConnection(int fd)//为新连接构造一个Connection进行管理{_next_id++;PtrConnection conn(new Connection(_pool.NextLoop(),_next_id, fd));conn->SetMessageCallback (_message_callback);conn->SetClosedCallback(_closed_callback);conn->SetConnectedCallback(_connected_callback);conn->SetAnyEventCallback(_event_callback);conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection,this,std::placeholders::_1));if(_enable_inactive_release) conn->EnableInactiveRelease(_timeout);//启动非活跃销毁功能conn->Established();_conns.insert(std::make_pair(_next_id, conn));}void RemoveConnectionInLoop(const PtrConnection& conn){int id = conn->Id();auto it = _conns.find(id);if(it != _conns.end()){_conns.erase(it);}}void RemoveConnection(const PtrConnection& conn)//从管理Connection的_conns中移除连接信息{_baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop,this,conn));}private:uint64_t _next_id;//自动增长的连接IDint _port;int _timeout; //非活跃练级的统计事件---多长时间不通信是非活跃连接bool _enable_inactive_release;//是否启动了非活跃连接超时销毁的判断标志Acceptor _acceptor; //这是监听套接字的管理对象EventLoop _baseloop;//主线程的eventloop对象,负责监听事件的处理LoopThreadPool _pool;//从属EventLoop线程池std::unordered_map _conns; // 保存管理所有连接对应的shared_ptr对象ConnectedCallback _connected_callback;MessageCallback _message_callback;ClosedCallback _closed_callback;AnyEventCallback _event_callback;ClosedCallback _server_closed_callback;};//这里是声明与定义的分离函数,因为他们使用了不同的类void Channel::Remove(){ _loop->RemoveEvent(this);}//移除监控void Channel::Update(){ _loop->UpdateEvent(this);}void TimerWheel::TimerAdd(u_int64_t id, uint32_t delay, const TaskFunc &cb)//添加定时任务{_loop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id,delay,cb));}void TimerWheel::TimerRefresh(u_int64_t id){_loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id));}void TimerWheel::TimerCancel(uint64_t id){_loop->RunInLoop(std::bind(&TimerWheel::TimerCancelInLoop, this, id));}class NetWork{public:NetWork(){logger->debug("SIGPIPE INIT");signal(SIGPIPE, SIG_IGN);}};static NetWork nw;

17、echoServer回显服务器

#pragma once#include "../server.hpp"class EchoServer{public:EchoServer(int port):_server(port){_server.SetThreadCount(2);_server.EnableInactiveRelease(10);_server.SetClosedCallback(std::bind(&EchoServer::OnClosed,this,std::placeholders::_1));_server.SetConnectedCallback(std::bind(&EchoServer::OnConnected,this,std::placeholders::_1));_server.SetMessageCallback(std::bind(&EchoServer::OnMessage,this,std::placeholders::_1,std::placeholders::_2));}void Start(){_server.Start();}private:void OnClosed(const PtrConnection& conn){logger->debug("CLOSED CONNECTION:%p",conn.get());}void OnConnected(const PtrConnection& conn){logger->debug("NEW CONNECTION:%p",conn.get());}void OnMessage(const PtrConnection& conn, Buffer *buf){conn->Send(buf->ReadPosition(),buf->ReadAbleSize());buf->MoveReadOffset(buf->ReadAbleSize());}private:TcpServer _server;};

五、HTTP协议模块代码实现

Http协议模块

用于对高并发服务器模块进行协议支持,基于提供的协议支持能够更方便的完成指定协议服务器的搭建。

而Http协议支持模块的实现,可以细分如下模块

Util模块

这个模块是一个根据模块,主要提供HTTP协议模块所用到的一些工具函数,比如url编码,文件读写…等

HttpRequest模块

这个模块是HTTP请求数据模块,用于保存HTTP请求数据被解析后的各项请求元素信息。

HttpResponse模块:

这个模块是HTTP响应数据模块,用于业务处理后设置并保存HTTP响应数据的各项元素信息,最终会被按照HTTP协议响应格式

组织成为响应信息发送给客户端。

HttpContext模块

这个模块是一个HTTP请求接收的上下文模块,主要是为了防止再一次接收的数据中,不是一个完整的HTTP请求,则解析过程并未完成,无法进行完整的请求处理,需要在下次接收到新数据后根据上下文进行解析,最终得到一个HttpRequest请求信息对象,因此在请求数据的接收以及解析部分需要一个上下文来进行控制接收和处理节奏。

HttpServer模块

这个模块是最终给组件使用者提供的HTTP服务器模块了,用于以简单的接口实现HTTP服务器的搭建。

HttpServer模块内容包含一个TcpServer对象:TcpServer对象实现服务器的搭建

HttpServer模块内部包含有两个提供给TcpServer对象的接口:连接建立成功设置上下文接口,数据处理接口

HttpServer模块内部包含有一个hash-map表存储请求与处理函数的映射表:组件使用者向HttpServer设置那些请求

应该使用那些函数进行处理,等TcpServer收到对应的请求就会使用对应的函数进行处理。

1、Util模块代码实现

#pragma once#include "../server.hpp"#include #include class Util{public://字符串分割函数,将src字符串按照sep字符进行分割,得到的各个子串放到arry中,最终返回子串的数量static size_t Split(const std::string &src, const std::string &sep, std::vector *arry){int idx = 0;while(idx push_back(src.substr(idx));return arry->size();}if(pos == idx) {idx = pos + sep.size();continue;//当字串为空,没必要添加}arry->push_back(src.substr(idx, pos - idx));idx = pos + sep.size();}return arry->size();}//读取文件所有内容static bool ReadFile(const std::string& filename, std::string* buf){std::ifstream ifs(filename, std::ios::binary);if(ifs.is_open() == false){logger->error("OPEN %s FILE FAILED!!", filename.c_str());return false;}size_t fsize = 0;ifs.seekg(0,ifs.end);fsize = ifs.tellg();ifs.seekg(0,ifs.beg);buf->resize(fsize);ifs.read(&(*buf)[0],fsize);if(ifs.good() == false){logger->error("READ %s FILE FAILED!!",filename.c_str());ifs.close();return false;}ifs.close();return true;}//向文件写入数据static bool WriteFile(const std::string &filename, const std::string &buf){std::ofstream ofs(filename,std::ios::binary);if(ofs.is_open() == false) {logger->error("OPEN %s FILE FAILED!!",filename.c_str());return false;}ofs.write(buf.c_str(), buf.size());if(ofs.good() == false){logger->error("WRITE %s FILE FAILED!", filename.c_str());ofs.close();return false;}ofs.close();return true;}//URL编码,避免URL中子源路径与查询字符串中的特殊字符与HTTP请求中特殊字符产生歧义//编码格式:将特殊字符的ascii值,转换为两个16禁止字符,前缀%//不编码字符:RFC3986文档规定 . - _ ~以及字母和数字属于绝对不编码字符//W3C标准中规定param中的空格必须被编码为+//RFC2396规定URI中的保留字符需要转换为%HH格式static std::string UrlEncode(const std::string url, bool convert_space_to_plus){std::string res;for(auto& c : url){if(c == '.' || c == '-' || c == '_' || c == '~' || isalnum(c)){res += c;continue;}if(c == ' ' && convert_space_to_plus){res += '+';continue;}//剩下的字符都是需要编码成为%HH格式char tmp[4] = {0};snprintf(tmp, 4, "%%%02X" ,c);res += tmp;}return res;}static char HEXTOI(char c){if(c >= '0' && c = 'a' && c = 'A' && c  2b %2b->2 << 4 + 11for(int i = 0; i < url.size(); i++){if(url[i] == '+' && convert_space_to_plus) {res += ' ';continue;}if(url[i] == '%' && (i + 2) < url.size()){char v1 = HEXTOI(url[i+1]);char v2 = HEXTOI(url[i+2]);char v = v1 * 16 + v2;res += v;i+=2;continue;}res += url[i];}return res;}//响应状态码的描述信息获取static std::string StatDesc(int statu){std::unordered_map _statu_msg = {{100,"Continue"},{101,"Switching Protocol"},{102,"Processing"},{103,"Early Hints"},{200,"OK"},{201,"Created"},{202,"Accepted"},{203,"Non-Authoritative Information"},{204,"No Content"},{205,"Reset Content"},{206,"Partial Content"},{207,"Multi-Status"},{208,"Already Reported"},{226,"IM Used"},{300,"Multiple Choice"},{301,"Moved Permanently"},{302,"Found"},{303,"See Other"},{304,"Not Modified"},{305,"Use Proxy"},{306,"unused"},{307,"Temporary Redirect"},{308,"Permanent Redirect"},{400,"Bad Request"},{401,"Unauthorized"},{402,"Payment Required"},{403,"Forbidden"},{404,"Not Found"},{405,"Method Not Allowed"},{406,"Not Acceptable"},{407,"Proxy Authentication Required"},{408,"Request Timeout"},{409,"Conflict"},{410,"Gone"},{411,"Length Required"},{412,"Precondition Failed"},{413,"Payload Too Large"},{414,"URI Too Long"},{415,"Unsupported Media Type"},{416,"Range Not Satisfiable"},{417,"Expectation Failed"},{418,"I'm a teapot"},{421,"Misdirected Request"},{422,"Unprocessable Entity"},{423,"Locked"},{424,"Failed Dependency"},{425,"Too Early"},{426,"Upgrade Required"},{428,"Precondition Required"},{429,"Too Many Requests"},{431,"Request Header Fields Too Large"},{451,"Unavailable For Legal Reasons"},{501,"Not Implemented"},{502,"Bad Gateway"},{503,"Service Unavailable"},{504,"Gateway Timeout"},{505,"HTTP Version Not Supported"},{506,"Variant Also Negotiates"},{507,"Insufficient Storage"},{508,"Loop Detected"},{510,"Not Extended"},{511,"Network Authentication Required"}};auto it = _statu_msg.find(statu);if(it != _statu_msg.end()){return it->second;}return "Unknow";}//根据文件后缀名获取文件mimestatic std::string ExtMime(const std::string &filename){std::unordered_map _mime_msg = {{".aac","audio/aac"},{".abw","application/x-abiword"},{".arc","application/x-freearc"},{".avi","video/x-msvideo"},{".azw","application/vnd.amazon.ebook"},{".bin","application/octet-stream"},{".bmp","image/bmp"},{".bz", "application/x-bzip"},{".bz2","application/x-bzip2"},{".csh","application/x-csh"},{".css","text/css"},{".csv","text/csv"},{".doc","application/msword"},{".docx", "application/vnd.openxmlformats-officedocument.wordprocessingml.document"},{".eot","application/vnd.ms-fontobject"},{".epub", "application/epub+zip"},{".gif","image/gif"},{".htm","text/html"},{".html", "text/html"},{".ico","image/vnd.microsoft.icon"},{".ics","text/calendar"},{".jar","application/java-archive"},{".jpeg", "image/jpeg"},{".jpg","image/jpeg"},{".js", "text/javascript"},{".json", "application/json"},{".jsonld", "application/ld+json"},{".mid","audio/midi"},{".midi", "audio/x-midi"},{".mjs","text/javascript"},{".mp3","audio/mpeg"},{".mpeg", "video/mpeg"},{".mpkg", "application/vnd.apple.installer+xml"},{".odp","application/vnd.oasis.opendocument.presentation"},{".ods","application/vnd.oasis.opendocument.spreadsheet"},{".odt","application/vnd.oasis.opendocument.text"},{".oga","audio/ogg"},{".ogv","video/ogg"},{".ogx","application/ogg"},{".otf","font/otf"},{".png","image/png"},{".pdf","application/pdf"},{".ppt","application/vnd.ms-powerpoint"},{".pptx", "application/vnd.openxmlformats-officedocument.presentationml.presentation"},{".rar","application/x-rar-compressed"},{".rtf","application/rtf"},{".sh", "application/x-sh"},{".svg","image/svg+xml"},{".swf","application/x-shockwave-flash"},{".tar","application/x-tar"},{".tif","image/tiff"},{".tiff", "image/tiff"},{".ttf","font/ttf"},{".txt","text/plain"},{".vsd","application/vnd.visio"},{".wav","audio/wav"},{".weba", "audio/webm"},{".webm", "video/webm"},{".webp", "image/webp"},{".woff", "font/woff"},{".woff2","font/woff2"},{".xhtml","application/xhtml+xml"},{".xls","application/vnd.ms-excel"},{".xlsx", "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"},{".xml","application/xml"},{".xul","application/vnd.mozilla.xul+xml"},{".zip","application/zip"},{".3gp","video/3gpp"},{".3g2","video/3gpp2"},{".7z", "application/x-7z-compressed"}};size_t pos = filename.find_last_of('.');if(pos != std::string::npos){std::string ext = filename.substr(pos);auto it = _mime_msg.find(ext);if(it == _mime_msg.end()){return "application/octet-stream";}return it->second;}return "application/octet-stream";}//判断一个文件是否是一个目录static bool IsDirectory(const std::string &filename){struct stat st;int ret = stat(filename.c_str(), & st);if(ret < 0){return false;}return S_ISDIR(st.st_mode);}//判断一个文件是否是一个普通文件static bool IsRegular(const std::string &filename){struct stat st;int ret = stat(filename.c_str(), & st);if(ret < 0){return false;}return S_ISREG(st.st_mode);}//http请求的资源路径有效性判断// /index.html ---前边的/叫做相对根目录 映射的是某个服务器的子目录// 想表达的意思就是,客户端只能请求相对根目录中的资源,其他地方的资源不予理会// /../login,这个路径中的..会让路径的查找跑到相对根目录之外,不安全static bool VaidPath(const std::string &path){//思想:按照/进行路径分割,根据有多少子目录,计算目录深度,有多少层,深度不能小于0std::vector subdir;Split(path, "/", &subdir);int level = 0;for(auto& dir : subdir){if(dir == ".."){level--;if(level < 0)return false;continue;}level++;}return true;}};

2、HttpRequest模块

http请求信息模块:存储HTTP请求信息要素,提供简单的功能性接口

请求信息要素

请求行:请求方法,URL,协议版本

URL:资源路径,查询字符串

​ GET /search" />class HttpRequest {public:std::string _method;//请求方法std::string _path;//资源路径std::string _version; //协议版本std::string _body;//请求正文std::smatch _matches; //资源路径的正则提取数据std::unordered_map _headers;//头部字段std::unordered_map _params; //查询字符串public:HttpRequest():_version("HTTP/1.1") {}void ReSet() {_method.clear();_path.clear();_version = "HTTP/1.1";_body.clear();std::smatch match;_matches.swap(match);_headers.clear();_params.clear();}//插入头部字段void SetHeader(const std::string &key, const std::string &val) {_headers.insert(std::make_pair(key, val));}//判断是否存在指定头部字段bool HasHeader(const std::string &key) const {auto it = _headers.find(key);if (it == _headers.end()) {return false;}return true;}//获取指定头部字段的值std::string GetHeader(const std::string &key) const {auto it = _headers.find(key);if (it == _headers.end()) {return "";}return it->second;}//插入查询字符串void SetParam(const std::string &key, const std::string &val) {_params.insert(std::make_pair(key, val));}//判断是否有某个指定的查询字符串bool HasParam(const std::string &key) const {auto it = _params.find(key);if (it == _params.end()) {return false;}return true;}//获取指定的查询字符串std::string GetParam(const std::string &key) const {auto it = _params.find(key);if (it == _params.end()) {return "";}return it->second;}//获取正文长度size_t ContentLength() const {// Content-Length: 1234\r\nbool ret = HasHeader("Content-Length");if (ret == false) {return 0;}std::string clen = GetHeader("Content-Length");return std::stol(clen);}//判断是否是短链接bool Close() const {// 没有Connection字段,或者有Connection但是值是close,则都是短链接,否则就是长连接if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive") {return false;}return true;}};

3、HttpReponse模块

功能:存储HTTP响应信息要素,提供简单的功能性接口

响应信息要素:

​ 1、响应状态码

​ 2、头部字段

​ 3、响应正文

​ 4、重定向信息(是否进行了重定向的标志,重定向的路径)

功能性接口:w欸蓝便于成员的访问,设置为公有成员

​ 1、头部字段的新增

​ 2、正文的设置

​ 3、重定向的设置

​ 4、长短连接的判断

class HttpResponse {public:int _statu;bool _redirect_flag;std::string _body;std::string _redirect_url;std::unordered_map _headers;public:HttpResponse():_redirect_flag(false), _statu(200) {}HttpResponse(int statu):_redirect_flag(false), _statu(statu) {} void ReSet() {_statu = 200;_redirect_flag = false;_body.clear();_redirect_url.clear();_headers.clear();}//插入头部字段void SetHeader(const std::string &key, const std::string &val) {_headers.insert(std::make_pair(key, val));}//判断是否存在指定头部字段bool HasHeader(const std::string &key) {auto it = _headers.find(key);if (it == _headers.end()) {return false;}return true;}//获取指定头部字段的值std::string GetHeader(const std::string &key) {auto it = _headers.find(key);if (it == _headers.end()) {return "";}return it->second;}void SetContent(const std::string &body,const std::string &type = "text/html") {_body = body;SetHeader("Content-Type", type);}void SetRedirect(const std::string &url, int statu = 302) {_statu = statu;_redirect_flag = true;_redirect_url = url;}//判断是否是短链接bool Close() {// 没有Connection字段,或者有Connection但是值是close,则都是短链接,否则就是长连接if (HasHeader("Connection") == true && GetHeader("Connection") == "keep-alive") {return false;}return true;}};

4、HttpContext模块

意义

​ 有可能出现接收的数据并不是一条完整的HTTP请求数据,也就是请求的处理需要在多次收到数据后才能处理完成,因此在每次处理的时候,就需要将处理进度记录起来,以便于下次从当前进度继续向下处理

接收状态
接收请求行,当前处于接收并处理请求行的阶段
接收请求头部,,表示请求头部的接收还没有完毕
接收正文,表示还有正文没有接收完毕
接收数据完毕,这是一个接收完毕,可以对请求进行处理的阶段
接收处理请求出错

typedef enum {RECV_HTTP_ERROR,RECV_HTTP_LINE,RECV_HTTP_HEAD,RECV_HTTP_BODY,RECV_HTTP_OVER}HttpRecvStatu;#define MAX_LINE 8192class HttpContext {private:int _resp_statu; //响应状态码HttpRecvStatu _recv_statu; //当前接收及解析的阶段状态HttpRequest _request;//已经解析得到的请求信息private:bool ParseHttpLine(const std::string &line) {std::smatch matches;std::regex e("(GET|HEAD|POST|PUT|DELETE) ([^?]*)(?:\\?(.*))? (HTTP/1\\.[01])(?:\n|\r\n)?", std::regex::icase);bool ret = std::regex_match(line, matches, e);if (ret == false) {_recv_statu = RECV_HTTP_ERROR;_resp_statu = 400;//BAD REQUESTreturn false;}//0 : GET /bitejiuyeke/login?user=xiaoming&pass=123123 HTTP/1.1//1 : GET//2 : /bitejiuyeke/login//3 : user=xiaoming&pass=123123//4 : HTTP/1.1//请求方法的获取_request._method = matches[1];std::transform(_request._method.begin(), _request._method.end(), _request._method.begin(), ::toupper);//资源路径的获取,需要进行URL解码操作,但是不需要+转空格_request._path = Util::UrlDecode(matches[2], false);//协议版本的获取_request._version = matches[4];//查询字符串的获取与处理std::vector query_string_arry;std::string query_string = matches[3];//查询字符串的格式 key=val&key=val....., 先以 & 符号进行分割,得到各个字串Util::Split(query_string, "&", &query_string_arry);//针对各个字串,以 = 符号进行分割,得到key 和val, 得到之后也需要进行URL解码for (auto &str : query_string_arry) {size_t pos = str.find("=");if (pos == std::string::npos) {_recv_statu = RECV_HTTP_ERROR;_resp_statu = 400;//BAD REQUESTreturn false;}std::string key = Util::UrlDecode(str.substr(0, pos), true);std::string val = Util::UrlDecode(str.substr(pos + 1), true);_request.SetParam(key, val);}return true;}bool RecvHttpLine(Buffer *buf) {if (_recv_statu != RECV_HTTP_LINE) return false;//1. 获取一行数据,带有末尾的换行 std::string line = buf->GetLineAndPop();//2. 需要考虑的一些要素:缓冲区中的数据不足一行, 获取的一行数据超大if (line.size() == 0) {//缓冲区中的数据不足一行,则需要判断缓冲区的可读数据长度,如果很长了都不足一行,这是有问题的if (buf->ReadAbleSize() > MAX_LINE) {_recv_statu = RECV_HTTP_ERROR;_resp_statu = 414;//URI TOO LONGreturn false;}//缓冲区中数据不足一行,但是也不多,就等等新数据的到来return true;}if (line.size() > MAX_LINE) {_recv_statu = RECV_HTTP_ERROR;_resp_statu = 414;//URI TOO LONGreturn false;}bool ret = ParseHttpLine(line);if (ret == false) {return false;}//首行处理完毕,进入头部获取阶段_recv_statu = RECV_HTTP_HEAD;return true;}bool RecvHttpHead(Buffer *buf) {if (_recv_statu != RECV_HTTP_HEAD) return false;//一行一行取出数据,直到遇到空行为止, 头部的格式 key: val\r\nkey: val\r\n....while(1){std::string line = buf->GetLineAndPop();//2. 需要考虑的一些要素:缓冲区中的数据不足一行, 获取的一行数据超大if (line.size() == 0) {//缓冲区中的数据不足一行,则需要判断缓冲区的可读数据长度,如果很长了都不足一行,这是有问题的if (buf->ReadAbleSize() > MAX_LINE) {_recv_statu = RECV_HTTP_ERROR;_resp_statu = 414;//URI TOO LONGreturn false;}//缓冲区中数据不足一行,但是也不多,就等等新数据的到来return true;}if (line.size() > MAX_LINE) {_recv_statu = RECV_HTTP_ERROR;_resp_statu = 414;//URI TOO LONGreturn false;}if (line == "\n" || line == "\r\n") {break;}bool ret = ParseHttpHead(line);if (ret == false) {return false;}}//头部处理完毕,进入正文获取阶段_recv_statu = RECV_HTTP_BODY;return true;}bool ParseHttpHead(std::string &line) {//key: val\r\nkey: val\r\n....if (line.back() == '\n') line.pop_back();//末尾是换行则去掉换行字符if (line.back() == '\r') line.pop_back();//末尾是回车则去掉回车字符size_t pos = line.find(": ");if (pos == std::string::npos) {_recv_statu = RECV_HTTP_ERROR;_resp_statu = 400;//return false;}std::string key = line.substr(0, pos);std::string val = line.substr(pos + 2);_request.SetHeader(key, val);return true;}bool RecvHttpBody(Buffer *buf) {if (_recv_statu != RECV_HTTP_BODY) return false;//1. 获取正文长度size_t content_length = _request.ContentLength();if (content_length == 0) {//没有正文,则请求接收解析完毕_recv_statu = RECV_HTTP_OVER;return true;}//2. 当前已经接收了多少正文,其实就是往_request._body 中放了多少数据了size_t real_len = content_length - _request._body.size();//实际还需要接收的正文长度//3. 接收正文放到body中,但是也要考虑当前缓冲区中的数据,是否是全部的正文//3.1 缓冲区中数据,包含了当前请求的所有正文,则取出所需的数据if (buf->ReadAbleSize() >= real_len) {_request._body.append(buf->ReadPosition(), real_len);buf->MoveReadOffset(real_len);_recv_statu = RECV_HTTP_OVER;return true;}//3.2 缓冲区中数据,无法满足当前正文的需要,数据不足,取出数据,然后等待新数据到来_request._body.append(buf->ReadPosition(), buf->ReadAbleSize());buf->MoveReadOffset(buf->ReadAbleSize());return true;}public:HttpContext():_resp_statu(200), _recv_statu(RECV_HTTP_LINE) {}void ReSet() {_resp_statu = 200;_recv_statu = RECV_HTTP_LINE;_request.ReSet();}int RespStatu() { return _resp_statu; }HttpRecvStatu RecvStatu() { return _recv_statu; }HttpRequest &Request() { return _request; }//接收并解析HTTP请求void RecvHttpRequest(Buffer *buf) {//不同的状态,做不同的事情,但是这里不要break, 因为处理完请求行后,应该立即处理头部,而不是退出等新数据switch(_recv_statu) {case RECV_HTTP_LINE: RecvHttpLine(buf);case RECV_HTTP_HEAD: RecvHttpHead(buf);case RECV_HTTP_BODY: RecvHttpBody(buf);}return;}};

5、HttpServer模块

功能:对于HTTP协议支持所有模块的整合

意义:让HTTP服务器的搭建变得更加简便

设计一张请求路由表:

表中记录了针对哪个请求,应该使用哪个函数来进行业务处理的映射关系

当服务器收到了一个请求,就在请求路由表中,查找有没有对应请求的处理函数,如果有,则执行对应的处理函数即可

什么请求,怎么处理,由用户来设定,服务器收到了请求只需要执行函数即可。

好处:用户只需要实现业务处理函数,然后将请求与处理函数的映射关系,添加到服务器中,而服务器只需要接收数据,解析数据,查找

路由表映射关系,执行业务处理函数

要素

​ 1、GET请求的路由映射表

​ 2、POST请求的路由映射表

​ 3、PUT请求的路由映射表

​ 4、DELETE请求的路由映射表 —路由映射表记录对应请求方法的处理函数映射关系

​ 5、高性能TCP服务器 — 进行连接的IO操作

​ 6、静态资源相对根目录 — 实现静态资源的处理

接口

​ 服务器处理流程:

​ 1、从socket接收数据,放到接收缓冲区

​ 2、调用OnMessage回调函数进行业务处理

​ 3、对请求进行解析,得到一个HttpRequest结构,包含了所有的请求要素

​ 4、进行请求的路由查找 – 找到对应请求的处理方法

​ 1.静态资源请求—一些实体文件资源的请求

​ 将静态资源文件的数据读取处理,填充到HttpResponse结构中

​ 2.功能性请求—在请求路由映射表中查找处理函数,找到了则执行函数

​ 具体的业务处理,并运行HttpResponse结构的数据填充

​ 5、对静态资源请求/功能性请求进行处理完毕后,得到了一个填充了响应信息的HttpResponse对象,组织http格式响应,进行发送

功能

​ 1、添加请求-处理函数映射信息(GET/POST/PUT/DELETE)

​ 2、设置静态资源根目录

​ 3、设置线程池中线程数量

​ 4、设置是否设置超时连接释放

​ 5、启动服务器

​ 6、OnConnected —用于给TcpServer设置协议上下文

​ 7、OnMessage — 用于进行缓冲区数据解析处理

​ 8、请求路由查找:静态资源请求查找和处理,功能性请求的查找和处理

​ 9、组织响应进行回复

class HttpServer {private:using Handler = std::function;using Handlers = std::vector<std::pair>;Handlers _get_route;Handlers _post_route;Handlers _put_route;Handlers _delete_route;std::string _basedir; //静态资源根目录TcpServer _server;private:void ErrorHandler(const HttpRequest &req, HttpResponse *rsp) {//1. 组织一个错误展示页面std::string body;body += "";body += "";body += "";body += "";body += "";body += "

";body += std::to_string(rsp->_statu);body += " ";body += Util::StatuDesc(rsp->_statu);body += "

";body += "";body += "";//2. 将页面数据,当作响应正文,放入rsp中rsp->SetContent(body, "text/html");}//将HttpResponse中的要素按照http协议格式进行组织,发送void WriteReponse(const PtrConnection &conn, const HttpRequest &req, HttpResponse &rsp) {//1. 先完善头部字段if (req.Close() == true) {rsp.SetHeader("Connection", "close");}else {rsp.SetHeader("Connection", "keep-alive");}if (rsp._body.empty() == false && rsp.HasHeader("Content-Length") == false) {rsp.SetHeader("Content-Length", std::to_string(rsp._body.size()));}if (rsp._body.empty() == false && rsp.HasHeader("Content-Type") == false) {rsp.SetHeader("Content-Type", "application/octet-stream");}if (rsp._redirect_flag == true) {rsp.SetHeader("Location", rsp._redirect_url);}//2. 将rsp中的要素,按照http协议格式进行组织std::stringstream rsp_str;rsp_str << req._version << " " << std::to_string(rsp._statu) << " " << Util::StatuDesc(rsp._statu) << "\r\n";for (auto &head : rsp._headers) {rsp_str << head.first << ": " << head.second << "\r\n";}rsp_str << "\r\n";rsp_str <Send(rsp_str.str().c_str(), rsp_str.str().size());}bool IsFileHandler(const HttpRequest &req) {// 1. 必须设置了静态资源根目录if (_basedir.empty()) {return false;}// 2. 请求方法,必须是GET / HEAD请求方法if (req._method != "GET" && req._method != "HEAD") {return false;}// 3. 请求的资源路径必须是一个合法路径if (Util::ValidPath(req._path) == false) {return false;}// 4. 请求的资源必须存在,且是一个普通文件//有一种请求比较特殊 -- 目录:/, /image/, 这种情况给后边默认追加一个 index.html// index.html/image/a.png// 不要忘了前缀的相对根目录,也就是将请求路径转换为实际存在的路径/image/a.png-> ./wwwroot/image/a.pngstd::string req_path = _basedir + req._path;//为了避免直接修改请求的资源路径,因此定义一个临时对象if (req._path.back() == '/'){req_path += "index.html";}if (Util::IsRegular(req_path) == false) {return false;}return true;}//静态资源的请求处理 --- 将静态资源文件的数据读取出来,放到rsp的_body中, 并设置mimevoid FileHandler(const HttpRequest &req, HttpResponse *rsp) {std::string req_path = _basedir + req._path;if (req._path.back() == '/'){req_path += "index.html";}bool ret = Util::ReadFile(req_path, &rsp->_body);if (ret == false) {return;}std::string mime = Util::ExtMime(req_path);rsp->SetHeader("Content-Type", mime);return;}//功能性请求的分类处理void Dispatcher(HttpRequest &req, HttpResponse *rsp, Handlers &handlers) {//在对应请求方法的路由表中,查找是否含有对应资源请求的处理函数,有则调用,没有则发挥404//思想:路由表存储的时键值对 -- 正则表达式 & 处理函数//使用正则表达式,对请求的资源路径进行正则匹配,匹配成功就使用对应函数进行处理///numbers/(\d+) /numbers/12345for (auto &handler : handlers) {const std::regex &re = handler.first;const Handler &functor = handler.second;bool ret = std::regex_match(req._path, req._matches, re);if (ret == false) {continue;}return functor(req, rsp);//传入请求信息,和空的rsp,执行处理函数}rsp->_statu = 404;}void Route(HttpRequest &req, HttpResponse *rsp) {//1. 对请求进行分辨,是一个静态资源请求,还是一个功能性请求// 静态资源请求,则进行静态资源的处理// 功能性请求,则需要通过几个请求路由表来确定是否有处理函数// 既不是静态资源请求,也没有设置对应的功能性请求处理函数,就返回405if (IsFileHandler(req) == true) {//是一个静态资源请求, 则进行静态资源请求的处理return FileHandler(req, rsp);}if (req._method == "GET" || req._method == "HEAD") {return Dispatcher(req, rsp, _get_route);}else if (req._method == "POST") {return Dispatcher(req, rsp, _post_route);}else if (req._method == "PUT") {return Dispatcher(req, rsp, _put_route);}else if (req._method == "DELETE"){return Dispatcher(req, rsp, _delete_route);}rsp->_statu = 405;// Method Not Allowedreturn ;}//设置上下文void OnConnected(const PtrConnection &conn) {conn->SetContext(HttpContext());logger->debug("NEW CONNECTION %p", conn.get());}//缓冲区数据解析+处理void OnMessage(const PtrConnection &conn, Buffer *buffer) {while(buffer->ReadAbleSize() > 0){//1. 获取上下文HttpContext *context = conn->GetContext()->get();//2. 通过上下文对缓冲区数据进行解析,得到HttpRequest对象//1. 如果缓冲区的数据解析出错,就直接回复出错响应//2. 如果解析正常,且请求已经获取完毕,才开始去进行处理context->RecvHttpRequest(buffer);HttpRequest &req = context->Request();HttpResponse rsp(context->RespStatu());if (context->RespStatu() >= 400) {//进行错误响应,关闭连接ErrorHandler(req, &rsp);//填充一个错误显示页面数据到rsp中WriteReponse(conn, req, rsp);//组织响应发送给客户端context->ReSet();buffer->MoveReadOffset(buffer->ReadAbleSize());//出错了就把缓冲区数据清空conn->Shutdown();//关闭连接return;}if (context->RecvStatu() != RECV_HTTP_OVER) {//当前请求还没有接收完整,则退出,等新数据到来再重新继续处理return;}//3. 请求路由 + 业务处理Route(req, &rsp);//4. 对HttpResponse进行组织发送WriteReponse(conn, req, rsp);//5. 重置上下文context->ReSet();//6. 根据长短连接判断是否关闭连接或者继续处理if (rsp.Close() == true) conn->Shutdown();//短链接则直接关闭}return;}public:HttpServer(int port, int timeout = DEFALT_TIMEOUT):_server(port) {_server.EnableInactiveRelease(timeout);_server.SetConnectedCallback(std::bind(&HttpServer::OnConnected, this, std::placeholders::_1));_server.SetMessageCallback(std::bind(&HttpServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2));}void SetBaseDir(const std::string &path) {assert(Util::IsDirectory(path) == true);_basedir = path;}/*设置/添加,请求(请求的正则表达)与处理函数的映射关系*/void Get(const std::string &pattern, const Handler &handler) {_get_route.push_back(std::make_pair(std::regex(pattern), handler));}void Post(const std::string &pattern, const Handler &handler) {_post_route.push_back(std::make_pair(std::regex(pattern), handler));}void Put(const std::string &pattern, const Handler &handler) {_put_route.push_back(std::make_pair(std::regex(pattern), handler));}void Delete(const std::string &pattern, const Handler &handler) {_delete_route.push_back(std::make_pair(std::regex(pattern), handler));}void SetThreadCount(int count) {_server.SetThreadCount(count);}void Listen() {_server.Start();}};

6、HttpServer服务器测试

#include "http.hpp"#define WWWROOT "./wwwroot/"std::string RequestStr(const HttpRequest &req) {std::stringstream ss;ss << req._method << " " << req._path << " " << req._version << "\r\n";for (auto &it : req._params) {ss << it.first << ": " << it.second << "\r\n";}for (auto &it : req._headers) {ss << it.first << ": " << it.second << "\r\n";}ss << "\r\n";ss <SetContent(RequestStr(req), "text/plain");}void Login(const HttpRequest &req, HttpResponse *rsp) {rsp->SetContent(RequestStr(req), "text/plain");}void PutFile(const HttpRequest &req, HttpResponse *rsp) {std::string pathname = WWWROOT + req._path;Util::WriteFile(pathname, req._body);}void DelFile(const HttpRequest &req, HttpResponse *rsp) {rsp->SetContent(RequestStr(req), "text/plain");}int main(){HttpServer server(7777);server.SetThreadCount(3);server.SetBaseDir(WWWROOT);//设置静态资源根目录,告诉服务器有静态资源请求到来,需要到哪里去找资源文件server.Get("/hello", Hello);server.Post("/login", Login);server.Put("/1234.txt", PutFile);server.Delete("/1234.txt", DelFile);server.Listen();return 0;}

六、服务器测试

1、长连接测试

#include "../server.hpp"int main(){Socket cli_sock;cli_sock.CreateClient(7777, "127.0.0.1");std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";while(1) {assert(cli_sock.Send(req.c_str(), req.size()) != -1);char buf[1024] = {0};assert(cli_sock.Recv(buf, 1023));logger->debug("[%s]", buf);sleep(3);}cli_sock.Close();return 0;}

这里验证了30秒,信息一直在发送,所以长连接测试成功。

2、超时连接测试

/*超时连接测试1:创建一个客户端,给服务器发送一次数据后,不动了,查看服务器是否会正常的超时关闭连接*/#include "../server.hpp"int main(){Socket cli_sock;cli_sock.CreateClient(7777, "127.0.0.1");std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";while(1) {assert(cli_sock.Send(req.c_str(), req.size()) != -1);char buf[1024] = {0};assert(cli_sock.Recv(buf, 1023));logger->debug("[%s]", buf);sleep(15);}cli_sock.Close();return 0;}

这里测试的超时连接关闭,我设置的超时时间是10秒,服务端和客户端运行后,客户端发送了一次数据后,睡眠15秒,

10秒后服务器超时释放,而客户端到了15秒发送数据,这时服务端已经释放连接,发送数据失败,所以超时连接测试成功。

3、Http服务器错误请求测试

/*给服务器发送一个数据,告诉服务器要发送1024字节的数据,但是实际发送的数据不足1024,查看服务器处理结果*//*1. 如果数据只发送一次,服务器将得不到完整请求,就不会进行业务处理,客户端也就得不到响应,最终超时关闭连接2. 连着给服务器发送了多次 小的请求,服务器会将后边的请求当作前边请求的正文进行处理,而后便处理的时候有可能就会因为处理错误而关闭连接*/#include "../server.hpp"int main(){Socket cli_sock;cli_sock.CreateClient(7777, "127.0.0.1");std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 100\r\n\r\nbitejiuyeke";while(1) {assert(cli_sock.Send(req.c_str(), req.size()) != -1);assert(cli_sock.Send(req.c_str(), req.size()) != -1);assert(cli_sock.Send(req.c_str(), req.size()) != -1);char buf[1024] = {0};assert(cli_sock.Recv(buf, 1023));logger->debug("[%s]", buf);sleep(3);}cli_sock.Close();return 0;}

注意:这里测试的时候出了一个bug,就是在测试服务器请求出错时,导致缓冲区频繁扩容,

因为在出错的时候,想要调用关闭连接的操作,没有将状态码重新设置,如果状态码没有重新设置,

他就不会从缓存区提取数据,导致数据越来越多,导致缓冲区频繁扩容

4、HTTP服务器业务处理超时测试

#include "../source/server.hpp"int main(){signal(SIGCHLD, SIG_IGN);for (int i = 0; i < 10; i++) {pid_t pid = fork();if (pid < 0) {DBG_LOG("FORK ERROR");return -1;}else if (pid == 0) {Socket cli_sock;cli_sock.CreateClient(8085, "127.0.0.1");std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";while(1) {assert(cli_sock.Send(req.c_str(), req.size()) != -1);char buf[1024] = {0};assert(cli_sock.Recv(buf, 1023));DBG_LOG("[%s]", buf);}cli_sock.Close();exit(0);}}while(1) sleep(1);return 0;}

业务处理超时,查看服务器的处理情况
当服务器达到了一个性能瓶颈,在一次业务处理中花费了太长的时间(超过了服务器设置的非活跃超时时间)

 1. 在一次业务处理中耗费太长时间,导致其他的连接也被连累超时,其他的连接有可能会被拖累超时释放假设现在12345描述符就绪了, 在处理1的时候花费了30s处理完,超时了,导致2345描述符因为长时间没有刷新活跃度1. 如果接下来的2345描述符都是通信连接描述符,如果都就绪了,则并不影响,因为接下来就会进行处理并刷新活跃度 2. 如果接下来的2号描述符是定时器事件描述符,定时器触发超时,执行定时任务,就会将345描述符给释放掉这时候一旦345描述符对应的连接被释放,接下来在处理345事件的时候就会导致程序崩溃(内存访问错误)因此这时候,在本次事件处理中,并不能直接对连接进行释放,而应该将释放操作压入到任务池中,等到事件处理完了执行任务池中的任务的时候,再去释放

5、HTTP服务器同时多条请求测试

/*一次性给服务器发送多条数据,然后查看服务器的处理结果*//*每一条请求都应该得到正常处理*/#include "../server.hpp"int main(){Socket cli_sock;cli_sock.CreateClient(7777, "127.0.0.1");std::string req = "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";req += "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";req += "GET /hello HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length: 0\r\n\r\n";while(1) {assert(cli_sock.Send(req.c_str(), req.size()) != -1);char buf[1024] = {0};assert(cli_sock.Recv(buf, 1023));logger->debug("[%s]", buf);sleep(3);}cli_sock.Close();return 0;}

6、HTTP服务器大文件传输测试

/*大文件传输测试,给服务器上传一个大文件,服务器将文件保存下来,观察处理结果*//*上传的文件,和服务器保存的文件一致*/#include "../server.hpp"#include "../Http/http.hpp"int main(){Socket cli_sock;cli_sock.CreateClient(7777, "127.0.0.1");std::string req = "PUT /1234.txt HTTP/1.1\r\nConnection: keep-alive\r\n";std::string body;Util::ReadFile("./hello.txt", &body);req += "Content-Length: " + std::to_string(body.size()) + "\r\n\r\n";assert(cli_sock.Send(req.c_str(), req.size()) != -1);assert(cli_sock.Send(body.c_str(), body.size()) != -1);char buf[1024] = {0};assert(cli_sock.Recv(buf, 1023));logger->debug("[%s]", buf);sleep(3);cli_sock.Close();return 0;}

7、服务器压力测试说明

性能压力测试:

​ 并发量:可以同时处理多少客户端的请求而不会出现连接失败

​ QPS:每秒钟处理的包的的数量

借助:webbench工具

​ 原理:创建大量的进程,在进程中,创建客户端连接服务器,发送请求,收到响应后关闭连接,开始下一个连接的建立

抛开环境说性能测试都是无知的

​ 测试环境:

​ 服务器是2核2g,3m的云服务器,服务器程序采⽤1主3从reactor模式

​ 使用webbench以5000的并发量,向服务器发送请求,发送60s

​ 得到的结果是:每分钟13万的并发量

虚拟机环境

服务器环境:4核4G虚拟机,服务器程序采⽤1主3从reactor模式

webbench客⼾端环境:同⼀个虚拟机…