黃冈建设厅官方网站娄底地seo
C++网络编程(三)IO复用
前言
多进程/多线程网络服务端在创建进程/线程时,CPU和内存开销很大。因为多线程/进程并发模型,为每个socket分配一个线程/进程。而IO复用采用单个的进程/线程就可以管理多个socket。
select
系统调用原型:
#include <sys/select.h>
int select(int nfds,fd_set*readfds, fd_set* writefds, fd_set* exceptfds, struct timeval* timeout);
ndfs
参数指定被监听的文件描述符总数,通常设置为select监听的所有文件描述符中最大值+1,因为文件描述符从0开始编号。
readfds
,writefds
,exceptfds
参数指向可读,可写,异常等事件对应的文件描述符集合。他们将各自感兴趣的文件描述符传入函数,当select调用返回时,内核将修改他们来通知程序哪些文件描述符已经就绪。
#include <typesizes.h>
#define __FD_SETSIZE 1024
#include <sys/select.h>
#define FD_SETSIZE __FD_SETSIZE
typedef long int __fd_mask;
#undef __NFDBITS
#define __NFDBITS (8*(int)sizeof(__fd_mask))
typedef struct
{
#ifdef __USE_XOPEN__fd_mask fds_bits[__FD_SETSIZE/__NFDBITS];#define __FDS_BITS (set)((set)->fds_bits)
#else__fd_mask __fds_bits[__FD_SETSIZE/__NFDBITS];#define __FDS_BITS (set)((set)->__fds_bits)
#endif
}fd_set;
fd_set结构体仅包含一个整型数组,该数组的每个元素的每一位(bit)标记一个文件描述符。fd_set能容纳的文件描述符数量由FD_SETSIZE指定。
对于fd_set结构体中的位操作,有设置对应的宏进行处理:
#include <sys/select.h>
FD_ZERO(fd_set* fdset);/*清除fdset的所有位*/
FD_SET(int fd, fd_set* fdset);/*设置fdset的位fd*/
FD_CLR(int fd, fd_set* fdset);/*清除fdset的位fd*/
int FD_ISSET(int fd, fd_set* fdset);/*测试fdset的位fd是否被设置*/
timeout
参数用来设置select的超时时间。它是一个timeval结构类型的指针,采用指针参数是因为内核将修改它以告诉应用程序select等待了多久。不过我们不能完全信任select调用返回后的timeout值,比如调用失败时timeout值是不确定的。timeval结构体的定义如下:
struct timeval
{long tv_sec;/*秒数*/long tv_usec;/*微秒数*/
};
如果给timeout变量的tv_sec成员和tv_usec成员都传递0,则select将立即返回。如果给timeout传递NULL,则select将一直阻塞,直到某个文件描述符就绪。
select
成功时返回就绪文件描述符的总数。失败返回-1并设置errno。如果是在select等待期间,程序接收到信号,则select立即返回-1,并设置errno为EINTR。
select流程
文件描述符就绪条件
在网络编程中,下列情况下socket可读:
- socket内核接收缓存区中的字节数大于或等于其低水位标记SO_RCVLOWAT。此时我们可以无阻塞地读该socket,并且读操作返回的字节数大于0。
- socket通信的对方关闭连接。此时对该socket的读操作将返回0。
- 监听socket上有新的连接请求。
- socket上有未处理的错误。此时我们可以使用getsockopt来读取和清除该错误。
下列情况下socket可写:
- socket内核发送缓存区中的可用字节数大于或等于其低水位标记SO_SNDLOWAT。此时我们可以无阻塞地写该socket,并且写操作返回的字节数大于0。
- socket的写操作被关闭。对写操作被关闭的socket执行写操作将触发一个SIGPIPE信号。
- socket使用非阻塞connect连接成功或者失败(超时)之后。
- socket上有未处理的错误。此时我们可以使用getsockopt来读取和清除该错误。
网络程序中,select能处理的异常情况只有一种:socket上接收到带外数据。
位图
select中对文件描述符是否有改动是用位图进行标记的,即用一位表示一个文件描述符,当其中某一位发生改变时,说明对应的文件描述符有相应的事件发生。但值得注意的是,这一次改变后,在下一轮的select调用前需要先将其重置。
示例代码
//server.cpp
#include <bits/stdc++.h>
#include <cstring>
#include <errno.h>
#include <netinet/in.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>using namespace std;int main()
{int sock = socket(AF_INET, SOCK_STREAM, 0);if (sock == -1){cout << "创建socket失败:" << strerror(errno) << endl;exit(0);}sockaddr_in addr;bzero(&addr, 0);addr.sin_family = AF_INET;addr.sin_addr.s_addr = htonl(INADDR_ANY); // 自动获取主机ipaddr.sin_port = htons(8000);int ret;ret = bind(sock, (struct sockaddr *)&addr, sizeof(addr));if (ret == -1){cout << "绑定socket失败:" << strerror(errno) << endl;exit(0);}ret = listen(sock, 10);if (ret == -1){cout << "监听socket失败:" << strerror(errno) << endl;exit(0);}cout << "初始化完成" << endl;fd_set readset;char buff[4096];list<int> sock_list;while (1){FD_ZERO(&readset);FD_SET(sock, &readset);//设置监听socketint nfds = sock;for (int it : sock_list) //计算最大的fd值{nfds = max(nfds, it);FD_SET(it, &readset);//}ret = select(nfds + 1, &readset, NULL, NULL, NULL);if (ret == -1){cout << strerror(errno) << endl;break;}else{if (FD_ISSET(sock, &readset)) // 有新的连接请求{int clientfd = accept(sock, NULL, 0);cout << "new connect: " << clientfd << endl;sock_list.push_back(clientfd);}else{for (auto fd : sock_list) // 轮询所有的连接{if (FD_ISSET(fd, &readset)){int len = recv(fd, buff, 4096, 0);if (len <= 0){sock_list.remove(fd);cout << fd << " exit" << endl;}else{buff[len] = 0;cout << "receive message:" << buff << endl;}//break;//注释掉break即一轮select中可能不止一个socket有事件发生,但即是不予注释,数据仍不会丢失,select采用的是电平触发的方式。}}}}}return 0;
}
//client.cpp
#include <arpa/inet.h>
#include <bits/stdc++.h>
#include <cstring>
#include <errno.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>using namespace std;int main()
{int sock = socket(AF_INET, SOCK_STREAM, 0);if (sock == -1){cout << "创建socket失败:" << strerror(errno) << endl;exit(0);}sockaddr_in addr;bzero(&addr, 0);addr.sin_family = AF_INET;addr.sin_addr.s_addr = inet_addr("127.0.0.1");addr.sin_port = htons(8000);if (connect(sock, (sockaddr *)&addr, sizeof(addr)) == -1){cout << "连接失败:" << strerror(errno) << endl;exit(0);}cout << "连接成功" << endl;char buff[4096];while (1){cin >> buff;if (send(sock, buff, strlen(buff), 0) <= 0){cout << "发送失败" << endl;break;}cout << "发送成功" << endl;}close(sock);return 0;
}
poll
poll系统调用和select极其类似,也是在指定时间内轮询一定数量的文件描述符,以测试其中是否有就绪者。poll的原型如下:
#include <poll.h>
int poll(struct pollfd* fds, nfds_t nfds, int timeout);
- fds参数是一个pollfd结构体的数组,指定所有我们感兴趣的文件描述符上发生的可读、可写和异常等事件。pollfd结构体的定义如下:
struct pollfd
{int fd;/*文件描述符*/short events;/*注册的事件*/short revents;/*实际发生的事件,由内核填充*/
};
fd成员指定文件描述符;
events成员告诉poll监听fd上的哪些事件,它是一系列事件的按位或;
revents成员则由内核修改,以通知应用程序fd上实际发生了哪些事件。
- nfds参数指定被监听事件集合fds的大小。其类型nfds_t的定义如下:
typedef unsigned long int nfds_t;
- timeout参数指定poll的超时值,单位是毫秒。当timeout为-1时,poll调用将永远阻塞,直到某个事件发生;当timeout为0时,poll调用将立即返回。
poll系统调用的返回值的含义与select相同。
示例
//server.cpp
#include <bits/stdc++.h>
#include <cstring>
#include <errno.h>
#include <netinet/in.h>
#include <poll.h>
#include <sys/socket.h>
#include <sys/types.h>using namespace std;int main()
{int sock = socket(AF_INET, SOCK_STREAM, 0);if (sock == -1){cout << "创建socket失败:" << strerror(errno) << endl;exit(0);}sockaddr_in addr;bzero(&addr, 0);addr.sin_family = AF_INET;addr.sin_addr.s_addr = htonl(INADDR_ANY); // 自动获取主机ipaddr.sin_port = htons(8000);int ret;ret = bind(sock, (struct sockaddr *)&addr, sizeof(addr));if (ret == -1){cout << "绑定socket失败:" << strerror(errno) << endl;exit(0);}ret = listen(sock, 10);if (ret == -1){cout << "监听socket失败:" << strerror(errno) << endl;exit(0);}cout << "初始化完成" << endl;pollfd pfd[64];int nfds = 1;pfd[0].fd = sock;pfd[0].events = POLLIN;char buff[4096];while (1){int ret = poll(pfd, nfds, -1);if (ret == -1){cout << strerror(errno) << endl;break;}else{if (pfd[0].revents & (POLLIN)) // 有新的连接请求{int clientfd = accept(sock, NULL, 0);if (nfds < 64){cout << "new connect: " << clientfd << endl;pfd[nfds].fd = clientfd;pfd[nfds].events = POLLIN | POLLRDHUP;nfds++;}elseclose(clientfd);}else{for (int i = 1; i < nfds; i++){if (pfd[i].revents & POLLRDHUP) // 断开连接{cout << pfd[i].fd << " exit." << endl;pfd[i] = pfd[nfds - 1];nfds--;break;}else if (pfd[i].revents & POLLIN) // 消息可读{int len = recv(pfd[i].fd, buff, 4096, 0);if (len <= 0){cout << pfd[i].fd << " exit." << endl;pfd[i] = pfd[nfds - 1];nfds--;}else{buff[len] = 0;cout << "receive message:" << buff << endl;}break;}}}}}return 0;
}
epoll
epoll是Linux特有的IO复用函数,与前面提到的select和poll在实现上有较大的差异。
- epoll使用一组函数来完成任务,而非单个函数
- epoll把用户关心的事件放进内核的一个事件表中,不用像select、poll那样每次都要重置,但epoll需要一个额外的文件描述符来标识内核中的事件表
#include <sys/epoll.h>
//创建指向内核事件表的文件描述符,size参数指示需要多大的事件表
int epoll_create(int size);
/*操作epoll的内核事件表
epfd参数: epoll_create返回的内核事件表描述符
fd参数: 要操作的文件描述符
op参数: 指定操作类型
event参数: 指定事件
*/
int epoll_ctl(int epfd,int op,int fd,struct epoll_event*event);
op参数的类型:
- EPOLL_CTL_ADD: 往事件表里注册fd上的事件
- EPOLL_CTL_MOD: 修改fd上的注册事件
- EPOLL_CTL_DEL: 删除fd上的注册事件
event参数类型:
struct epoll_event
{__uint32_t events;/*epoll事件*/epoll_data_t data;/*用户数据*/
};
epoll支持的事件类型和poll基本相同。表示epoll事件类型的宏是在poll对应的宏前加上“E”,比如epoll的数据可读事件是EPOLLIN。但epoll有两个额外的事件类型——EPOLLET和EPOLLONESHOT。
data用于存储用户数据,其定义如下:
typedef union epoll_data
{void* ptr;int fd;uint32_t u32;uint64_t u64;
}epoll_data_t;
epoll_data_t是一个联合体,其4个成员中使用最多的是fd,它指定事件所从属的目标文件描述符。ptr成员可用来指定与fd相关的用户数据。
epoll_ctl成功时返回0,失败则返回-1并设置errno。
epoll系列系统调用的主要接口是epoll_wait
函数。它在一段超时时间内等待一组文件描述符上的事件,其原型如下:
#include <sys/epoll.h>
/*
maxevents参数指定最多监听多少个事件,它必须大于0。
timeout参数的含义与poll接口的timeout参数相同。
*/
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);
该函数成功时返回就绪的文件描述符的个数,失败时返回-1并设置errno.
epoll_wait函数如果检测到事件,就将所有就绪的事件从内核事件表(由epfd参数指定)中复制到它的第二个参数events指向的数组中。这个数组只用于输出epoll_wait检测到的就绪事件,而不像select和poll的数组参数那样既用于传入用户注册的事件,又用于输出内核检测到的就绪事件。这就极大地提高了应用程序索引就绪文件描述符的效率。
EPOLL 的ET和LT模式
epoll对文件描述符的操作有两种模式:LT(Level Trigger,电平触发)模式和ET(Edge Trigger,边沿触发)模式。当然这是与硬件无关的,只是模拟效果。
默认工作模式是LT,与poll和select一致,这种情况下相当于一个更高效的poll。当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序可以不立即处理该事件。这样,当应用程序下一次调用epoll_wait时,epoll_wait还会再次向应用程序通告此事件,直到该事件被处理。
采用ET模式,当epoll_wait检测到其上有事件发生并将此事件通知应用程序后,应用程序必须立即处理该事件,因为后续的epoll_wait调用将不再向应用程序通知这一事件。可见,ET模式在很大程度上降低了同一个epoll事件被重复触发的次数,因此效率要比LT模式高。
注:每个使用ET模式的文件描述符都应该是非阻塞的。如果文件描述符是阻塞的,那么读或写操作将会因为没有后续的事件而一直处于阻塞状态。
EPOLLONESHOT事件
EPOLLONESHOT事件实现的是socket连接在任一时刻都只被一个线程处理。处理方式类似于加锁操作。
对于注册了EPOLLONESHOT事件的文件描述符,操作系统最多触发其上注册的一个可读、可写或者异常事件,且只触发一次,除非我们使用epoll_ctl函数重置该文件描述符上注册的EPOLLONESHOT事件。当一个线程在处理某个socket时,其他线程是不可能有机会操作该socket的。同样的,在线程处理完socket后,应该立即重置其EPOLLONESHOT事件,保证下次socket可读时,其EPOLLIN事件能被触发。
示例
#include <bits/stdc++.h>
#include <cstring>
#include <errno.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>using namespace std;int main()
{int sock = socket(AF_INET, SOCK_STREAM, 0);if (sock == -1){cout << "创建socket失败:" << strerror(errno) << endl;exit(0);}sockaddr_in addr;bzero(&addr, 0);addr.sin_family = AF_INET;addr.sin_addr.s_addr = htonl(INADDR_ANY); // 自动获取主机ipaddr.sin_port = htons(8000);int ret;ret = bind(sock, (struct sockaddr *)&addr, sizeof(addr));if (ret == -1){cout << "绑定socket失败:" << strerror(errno) << endl;exit(0);}ret = listen(sock, 10);if (ret == -1){cout << "监听socket失败:" << strerror(errno) << endl;exit(0);}cout << "初始化完成" << endl;char buff[4096];int epfd = epoll_create(64);epoll_event events[64], tmpevent;tmpevent.events = EPOLLIN;tmpevent.data.fd = sock;epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &tmpevent);//添加监听socket可读事件while (1){int ret = epoll_wait(epfd, events, 64, -1);if (ret == -1){cout << strerror(errno) << endl;exit(0);}else{//遍历就绪的事件for (int i = 0; i < ret; i++){if (events[i].data.fd == sock) //新的连接{int clientfd = accept(sock, NULL, 0);tmpevent.events = EPOLLIN | EPOLLRDHUP | EPOLLET | EPOLLONESHOT;tmpevent.data.fd = clientfd;epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &tmpevent);cout << "new connect: " << clientfd << endl;}else{//断开连接时会同时触发EPOLLRDHUP和EPOLLIN,因此先判断EPOLLRDHUPif (events[i].events & EPOLLRDHUP){cout << events[i].data.fd << " exit" << endl;close(events[i].data.fd);epoll_ctl(epfd, EPOLL_CTL_DEL, events[i].data.fd, NULL);}else if (events[i].events & EPOLLIN){while (1){int len = recv(events[i].data.fd, buff, 4096, MSG_DONTWAIT);if (len <= 0) break;buff[len] = 0;cout << "receive message:" << buff << endl;}//EPOLLONESHOT模式下处理完后必须重置,否则下次不能触发tmpevent.events = EPOLLIN | EPOLLRDHUP | EPOLLET | EPOLLONESHOT;tmpevent.data.fd = events[i].data.fd;epoll_ctl(epfd, EPOLL_CTL_MOD, events[i].data.fd, &tmpevent);}}}}}return 0;
}
应用:非阻塞connect
将socket设置成非阻塞:
int setnonblocking(int fd)
{int old_option = fcntl(fd, F_GETFL);//获取文件状态标记int new_option = old_option | O_NONBLOCK;//加上非阻塞状态fcntl(fd, F_SETFL, new_option);//设置文件状态标记return old_option;
}
使用非阻塞的socket进行connect操作时,如果连接没有立即建立,会返回EINPROGRESS的错误。在这种情况下,我们可以调用select、poll等函数来监听这个连接失败的socket上的可写事件。当select、poll等函数返回后,再利用getsockopt来读取错误码并清除该socket上的错误。如果错误码是0,表示连接成功建立,否则连接失败。
使用非阻塞的socket可以同时对多个socket进行connect操作,然后对没有立即建立连接的socket使用select、poll等函数来监听,提高连接的效率。
不过,该方法仍存在某些移植性问题,比如connect始终失败,select对EINPROGRESS状态下的socket可能不起作用,以及某些版本的getsockopt出现的返回值并不相同。