IO复用
我们在第一章介绍的accept,send,recv等函数他们都是阻塞函数,即意味着当没有连接请求或者接收请求时,他们会一直卡在那一行无法继续向下运行。这对我们的性能有很大的影响,因为我们无法确定什么时候会发起连接,也无法同时处理两个请求,因此这里我们提出了IO复用的方法。即可以处理多个IO请求。在linux中实现IO复用的方法主要有三个:select,poll,epoll。我们会挨个接受他们。需要注意的是这里的IO虽然解决了阻塞线程的问题,但是accept,recv这些函数仍然是阻塞的,只不过我们通过了一些手段来略过了他们。因为我们所有的操作都是在主线程中来执行的,不必去考虑关心其他的线程,因此系统的开销还是很小的。
select
select是最简单的一种模型。他的原理是:我们给每一个关心的socket(在linux也就是文件操作符)以及对应的事件进行了注册。然后我们每隔一段时间就来看看这些事件被触发了没有。
从上图我们就可以很直观的看出来select的原理.那我们现在就来看看怎么使用select。
int select(int maxfdp,fd_set *readset,fd_set *writeset,fd_set *exceptset,const struct timeval *timeout);
第一个参数为最大文件操作符+1,还记得前面说到过的文件操作符吗。我们的监听socketfd最小都为3,则连接socketfd最小也为4,因此这里的maxfdp最小也为5。
后面的三个参数分别是 可读事件表,可写事件表,错误事件表。我们需要把想要监听的事件添加到表中去。主要有以下几个关键函数
fd_set set;
FD_ZERO(&set); /*将set清零使集合中不含任何fd*/
FD_SET(fd, &set); /*将fd加入set集合*/
FD_CLR(fd, &set); /*将fd从set集合中清除*/
FD_ISSET(fd, &set); /*测试fd是否在set集合中*/
由于年代问题,fd_set只用了一个32位的矢量来表示fd,也就是最多也就存1024个fd。这点可以在头文件<sys/select.h>中定义的常量FD_SETSIZE找到。
最后一个参数 timeout则是一个计时器,我们可以设定时间,在设定的时间中select会阻塞线程,而最后的select则会返回就绪的事件数量
那么我们接下来就来看看最简单的使用。
fd_set read;
fd_set write;
fd_set error;
timeval timeout;
//timeout.tv_sec = 1;
FD_ZERO(&read);
FD_ZERO(&write);
FD_ZERO(&error);
while (true)
{
//重置监听的队列
FD_SET(listenfd,&read);
for (int i = 0; i < conns.size(); i++)
{
FD_SET(conns[i],&read);
FD_SET(conns[i],&error);
}
int ret = select(max_fd+1,&read,&write,&error,NULL);
if(ret == -1){
std::cout<< "Error : select" <<std::endl;
continue;
}
else if(ret == 0){
std::cout << "----"<<std::endl;
continue;
}
if(ret > 0){
//判断是否是连接事件
if(FD_ISSET(listenfd,&read)){
int clientid = accept(listenfd,(sockaddr*)&clientAddr,&clientAddrLen);
if(clientid == -1){
std::cout <<"Error : accept"<<std::endl;
continue;
}
else{
conns.push_back(clientid);
std::cout << "有客户端连接进来:" << inet_ntoa(clientAddr.sin_addr)<< std::endl;
}
max_fd = clientid;
}
for(std::vector<int>::iterator it = conns.begin() ; it != conns.end();it++){
memset(buff,0,sizeof(buff-1));
if(FD_ISSET(*it,&read)){
int ret = recv(*it,buff,sizeof(buff)-1,0);
if(ret < 0){
std::cout << "Error : Recv"<<std::endl;
}else{
std::cout << buff<<std::endl;
send(*it,buff,ret,0);
}
}
}
FD_ZERO(&read);
FD_ZERO(&write);
FD_ZERO(&error);
}
首先是声明好事件表并初始化他们,然后便可以不断循环来处理请求。当我们接收到请求的时候,他会将就绪了的事件置为1,为就绪的置为0,因此我们无法知晓具体是哪一个fd的事件被触发了,只能通过循环来用FD_ISSET遍历判断。因此select的复杂度是线性增长,当连接数量越多所需要的处理时间越多。另外由于我们改变了事件表中的值,事件表也因此被污染了,所以需要重新置零并重新监听。Linux: fd_set用法
epoll
Epoll的提出是在select很多年之后了,是目前最好的一种IO多路复用的模型,大概的原理是:我们先将所有关心的事件注册在一个事件表中。这样就不用像select一样每次通知了以后还要重新初始化和订阅。
然后像调用select函数一样,我们一直轮询的来调用查询函数判断是否有事件被触发。
因为epoll不像是select一样每次都要将事件传输到内核空间,因此我们需要一个文件描述符来控制我们所订阅的所有事件。
int epoll_create(int size);
上面的函数便创建了一个事件表。所返回的值是文件操作符(失败的时候会返回负一),而他的参数size 在现在的版本来说没有意义。属于是对系统的一个建议。 当创建好了表以后。我们就可以来订阅具体的事件了。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
我们会调用上面的函数来订阅一个函数,当成功了以后会返回0,否则返回-1.
第一个参数epfd 便是我们的事件表的fd值。
参数op: 表示动作,用三个宏来表示:
- EPOLL_CTL_ADD:注册新的 fd 到 epfd 中;
- EPOLL_CTL_MOD:修改已经注册的fd的监听事件;
- EPOLL_CTL_DEL:从 epfd 中删除一个 fd;
参数fd则是我们要订阅的文件描述符,如listenfd或者clientfd。
参数event: 告诉内核要监听什么事件,struct epoll_event 结构如:
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;//保存触发事件的某个文件描述符相关的数据
struct epoll_event {
__uint32_t events; /* epoll event */
epoll_data_t data; /* User data variable */
};
- events可以是以下几个宏的集合:
- EPOLLIN :表示对应的文件描述符可以读(包括对端 SOCKET 正常关闭);
- EPOLLOUT:表示对应的文件描述符可以写;
- EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
- EPOLLERR:表示对应的文件描述符发生错误;
- EPOLLHUP:表示对应的文件描述符被挂断;
- EPOLLET :将 EPOLL 设为边缘触发(Edge Trigger)模式,这是相对于水平触发(Level Trigger)来说的。
- EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个 socket 的话,需要再次把这个 socket 加入到 EPOLL 队列里
最后便是我们监听的函数。
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);
第一个参数不用说了,第二个参数则是一个epoll_event类型的数组。第三个参数则是一次所返回的最多时间。第四个参数则是一个定时器。值为-1的时候表示堵塞。当函数成功调用的时候,会返回已经就绪的事件数量,并将这些事件装进 events的数组里面。 如果错误则会返回-1.
最后让我们来看看最简单epoll如何使用。
int main(){
// 创建,绑定,监听socket
epoll_event events[100];
int ret;
char buff[1024];
int bufflen = 0;
epollfd = epoll_create(100);
if (epollfd == -1) {
std::cout << "创建epoll错误" << std::endl;
return false;
}
add_event(socketfd,EPOLLIN);
while (1) {
//返回值为有多少个事件可以处理
ret = epoll_wait(epollfd,events,100,-1);
handle_events(events,ret,buff,bufflen);
}
close(epollfd);
}
void MyEpoll::add_event(int fd, int state) {
epoll_event event;
event.events = state;
event.data.fd = fd;
epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&event);
}
void MyEpoll::handle_events(epoll_event* events, int num, char* buff, int& bufflen)
{
int fd;
for (int i = 0; i < num; i++)
{
fd = events[i].data.fd;
if ((fd == socketfd) && (events[i].events & EPOLLIN)) {
handle_accept();
}
else if (events[i].events & EPOLLIN)
do_read(fd, buff, bufflen);
else if (events[i].events & EPOLLOUT)
do_write(fd, buff, bufflen);
else
close(fd);
}
}
由于events的是位操作(类似于c#的多enum)。因此我们只用将其值和想要判断的类型想与即可。记得在所有的操作完毕以后调用close关闭fd。不然有可能会造成无fd可用的局面。
epoll的LT与ET
epoll对文件描述符的操作方式有两种工作模式:LT模式(Level Trigger,水平触发) 和ET模式(Edge Trigger,边缘触发)。
LT模式:当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序可以不立即处理该事件,这样,当应用程序下一次调用epoll_wait时,epoll_wait还会向应用程序通告此事件,直到该事件被处理。((缓冲区剩余未读尽的数据会导致epoll_wait返回.)
a.用户不读数据,数据一直在缓冲区,epoll会一直通知
b.用户只读了一部分数据,epoll会通知
c.缓冲区的数据读完了,不通知
ET模式:当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序必须立即处理该事件,因为后续的epoll_wait调用将不在向应用程序通告此事件。
a.用户不读数据,数据一致在缓冲区中,epoll下次检测的时候就不通知了
b.用户只读了一部分数据,epoll不通知
c.缓冲区的数据读完了,不通知
epoll的其他事件描述符
待补充
参考
深入浅出理解select、poll、epoll的实现
一文搞懂,4种主要的 I/O 模型(高并发IO的底层原理)
高性能网络编程之 Reactor 网络模型