多进/线程的网络服务端
-
为每个客户端连接创建一个进/线程,消耗的资源很多。
-
1核2GB的虚拟机,大概可以创建一百多个进/线程。(现实中服务器配置至少是这个的十倍,也就是能创建1000多个进程/线程,只能处理1000个客户端连接,远不能满足需求。)。
IO多路复用
-
用一个进程/线程处理多个TCP连接,减少系统开销。
-
三种模型:select(1024)、poll(数千,可改)和epoll(百万)。
一、IO多路复用-select模型
1. select模型(上)
网络通讯-读事件 1)已连接队列中有已经准备好的socket(有新的客户端连上来) 2)接收缓存中有数据可以读(对端发送的报文已到达, 3)tcp连接已断开(对端调用close()函数关闭了连接)
网络通讯-写事件 发送缓冲区没有满,可以写入数据(可以向对端发送报文)。
fd_set 本质是32个int型的数组(int[32]),那么32X4X8=1024位,这就是bitmap(位图)。
初始化全为0(没有画出),加入的socket为3 4 6位置将变为1。C语言有四个宏操作位图:
① 用于把socket从位图中删除。
② 判断socket是否在位图中。
③ 用于socket加入位图中。
④ 初始化位图,1024个位置置为0空。
细节:调用select函数有事件发生的时候,会改变bitmap,所以select前需要将bitmap复制一份(备份)tmpfds,对备份进行select。
2. select模型的细节(下)
select模型-写事件
-
如果tcp的发送缓冲区没有满,那么,socket连接是可写的(select函数不阻塞,立即返回写事件socket)。
-
一般来说,发送缓冲区不容易被填满。
-
如果发送的数据量太大,或网络带宽不够,发送缓冲区有填满的可能。
select模型-水平触发
-
select0)监视的socket如果发生了事件,select()会返回(通知应用程序处理事件)。
-
如果事件没有被处理,再次调用select())的时候会立即再通知。
-
如果数据没处理完,那么select会立即触发再次通知。
select模型-性能测试
每个客户端for20w个报文进行send,设置脚本同时启动五个客户端(100w个报文),用了8秒处理完。结论:每秒钟处理12w个业务请求,效率比多进程多线程快很多。
select模型-存在的问题
-
采用轮询方式扫描bitmap,"性能会随着socket数量增多而下降。
-
每次调用 select(),select里面会修改bitmap,需要拷贝bitmap。
-
程序运行在用户态,网络通信在内核,调用select会将bitmap从用户态拷贝到内核态,bitmap被拷贝两次,如果每秒要拷贝很多次没开销也比较大。
-
bitmap的大小(单个进/线程打开的socket数量)由FDSETSIZE宏设置,默认是 1024 个,可以修改,但是,效率将更低。
代码实现:
/** 程序名:tcpselect.cpp,此程序用于演示采用select模型实现网络通讯的服务端。* 作者:张咸武
*/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/fcntl.h>// 初始化服务端的监听端口。
int initserver(int port);int main(int argc,char *argv[])
{if (argc != 2) { printf("usage: ./tcpselect port\n"); return -1; }// 初始化服务端用于监听的socket。int listensock = initserver(atoi(argv[1]));printf("listensock=%d\n",listensock);if (listensock < 0) { printf("initserver() failed.\n"); return -1; }// 读事件:1)已连接队列中有已经准备好的socket(有新的客户端连上来了);// 2)接收缓存中有数据可以读(对端发送的报文已到达);// 3)tcp连接已断开(对端调用close()函数关闭了连接)。// 写事件:发送缓冲区没有满,可以写入数据(可以向对端发送报文)。fd_set readfds; // 需要监视读事件的socket的集合,大小为16字节(1024位)的bitmap。FD_ZERO(&readfds); // 初始化readfds,把bitmap的每一位都置为0。FD_SET(listensock,&readfds); // 把服务端用于监听的socket加入readfds。int maxfd=listensock; // readfds中socket的最大值。while (true) // 事件循环。{// 用于表示超时时间的结构体。struct timeval timeout; timeout.tv_sec=10; // 秒timeout.tv_usec=0; // 微秒。fd_set tmpfds=readfds; // 在select()函数中,会修改bitmap,所以,要把readfds复制一份给tmpfds,再把tmpfds传给select()。// 调用select() 等待事件的发生(监视哪些socket发生了事件)。int infds=select(maxfd+1,&tmpfds,NULL,NULL,0); // 如果infds<0,表示调用select()失败。if (infds<0){perror("select() failed"); break;}// 如果infds==0,表示select()超时。if (infds==0){printf("select() timeout.\n"); continue;}// 如果infds>0,表示有事件发生,infds存放了已发生事件的个数。for (int eventfd=0;eventfd<=maxfd;eventfd++){if (FD_ISSET(eventfd,&tmpfds)==0) continue; // 如果eventfd在bitmap中的标志为0,表示它没有事件,continue// 如果发生事件的是listensock,表示已连接队列中有已经准备好的socket(有新的客户端连上来了)。if (eventfd==listensock){struct sockaddr_in client;socklen_t len = sizeof(client);int clientsock = accept(listensock,(struct sockaddr*)&client,&len);if (clientsock < 0) { perror("accept() failed"); continue; }printf ("accept client(socket=%d) ok.\n",clientsock);FD_SET(clientsock,&readfds); // 把bitmap中新连上来的客户端的标志位置为1。if (maxfd<clientsock) maxfd=clientsock; // 更新maxfd的值。}else{// 如果是客户端连接的socke有事件,表示接收缓存中有数据可以读(对端发送的报文已到达),或者有客户端已断开连接。char buffer[1024]; // 存放从接收缓冲区中读取的数据。memset(buffer,0,sizeof(buffer));if (recv(eventfd,buffer,sizeof(buffer),0)<=0){// 如果客户端的连接已断开。printf("client(eventfd=%d) disconnected.\n",eventfd);close(eventfd); // 关闭客户端的socketFD_CLR(eventfd,&readfds); // 把bitmap中已关闭客户端的标志位清空。if (eventfd == maxfd) // 重新计算maxfd的值,注意,只有当eventfd==maxfd时才需要计算。{for (int ii=maxfd;ii>0;ii--) // 从后面往前找。{if (FD_ISSET(ii,&readfds)){maxfd = ii; break;}}}}else{// 如果客户端有报文发过来。printf("recv(eventfd=%d):%s\n",eventfd,buffer);// 把接收到的报文内容原封不动的发回去。send(eventfd,buffer,strlen(buffer),0);}}}}return 0;
}// 初始化服务端的监听端口。
int initserver(int port)
{int sock = socket(AF_INET,SOCK_STREAM,0);if (sock < 0){perror("socket() failed"); return -1;}int opt = 1; unsigned int len = sizeof(opt);setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&opt,len);struct sockaddr_in servaddr;servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY);servaddr.sin_port = htons(port);if (bind(sock,(struct sockaddr *)&servaddr,sizeof(servaddr)) < 0 ){perror("bind() failed"); close(sock); return -1;}if (listen(sock,5) != 0 ){perror("listen() failed"); close(sock); return -1;}return sock;
}
client.cpp
// 网络通讯的客户端程序。
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <time.h>int main(int argc, char *argv[])
{if (argc != 3){printf("usage:./client ip port\n"); return -1;}int sockfd;struct sockaddr_in servaddr;char buf[1024];if ((sockfd=socket(AF_INET,SOCK_STREAM,0))<0) { printf("socket() failed.\n"); return -1; }memset(&servaddr,0,sizeof(servaddr));servaddr.sin_family=AF_INET;servaddr.sin_port=htons(atoi(argv[2]));servaddr.sin_addr.s_addr=inet_addr(argv[1]);if (connect(sockfd, (struct sockaddr *)&servaddr,sizeof(servaddr)) != 0){printf("connect(%s:%s) failed.\n",argv[1],argv[2]); close(sockfd); return -1;}printf("connect ok.\n");// printf("开始时间:%d",time(0));for (int ii=0;ii<200000;ii++){// 从命令行输入内容。memset(buf,0,sizeof(buf));printf("please input:"); scanf("%s",buf);if (send(sockfd,buf,strlen(buf),0) <=0){ printf("write() failed.\n"); close(sockfd); return -1;}memset(buf,0,sizeof(buf));if (recv(sockfd,buf,sizeof(buf),0) <=0) { printf("read() failed.\n"); close(sockfd); return -1;}printf("recv:%s\n",buf);}// printf("结束时间:%d",time(0));
}
二、IO多路复用-poll模型
pollfd fds[2048] 结构体数组存放需要监视的socket(select模型使用fd_set readfds存放,bitmap大小1024),poll模型监视的范围自己定义,其中pollfd结构体定义如下:
struct pollfd{int fd; /* 需要监听的socket */short int events; /* 需要监听的事件 */short int revents; /* poll返回的事件 */};
对于结构体数组:
结构体数组 | 0 | 1 | 2 | 3 | 4 | 5 | 6 | ... | 2047 |
---|---|---|---|---|---|---|---|---|---|
方案一 | -1 | -1 | -1 | 3 | 4 | -1 | 6 | ... | -1 |
方案二 | 3 | 4 | 6 | -1 | -1 | -1 | -1 | ... | -1 |
方案二对数组的利用率更高,但是方案一写代码更方便,效率也更高,用第一种方法,把socket和数组的下标一一对应。
poll服务端思路:
// 初始化服务端用于监听的socket。
// 初始化数组,把全部的socket设置为-1,如果数组中的socket的值为-1,那么,poll将忽略它。
// 打算让poll监视listensock读事件。while(true)
{// 调用poll() 等待事件的发生(监视哪些socket发生了事件)。// 如果infds<=0,表示调用poll()失败或者超时。// 如果infds>0,表示有事件发生,infds存放了已发生事件的个数:遍历。for (int eventfd=0;eventfd<=maxfd;eventfd++){// 如果发生事件的是listensock,表示已连接队列中有已经准备好的socket(有新的客户端连上来了)。// 将新连接的socket加入poll。// 如果是客户端连接的socke有事件,表示有报文发过来了或者连接已断开。// 如果客户端的连接已断开。// 如果客户端有报文发过来。}
}
poll模型的:写事件、水平触发、性能测试、存在的问题。与select模型是一样的。
poll模型-存在的问题
-
在程序中,poll的数据结构是数组,传入内核后转换成了链表。select用bitmap存放用于监 视的socket。
-
每调用一次select()需要拷贝两次bitmap(把bitmap拷贝成临时的,然后把临时的拷贝到内核态),poll拷贝一次结构体数组。
-
poI监视的连接数没有1024的限制,但是,也是遍历的方法,监视的socket越多,效率越低。
select与poll差别不大,本质上没多大区别。
poll服务端代码:
/** 程序名:tcppoll.cpp,此程序用于演示采用poll模型实现网络通讯的服务端。* 作者:张咸武
*/
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <poll.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/fcntl.h>// 初始化服务端的监听端口。
int initserver(int port);int main(int argc,char *argv[])
{if (argc != 2) { printf("usage: ./tcppoll port\n"); return -1; }// 初始化服务端用于监听的socket。int listensock = initserver(atoi(argv[1]));printf("listensock=%d\n",listensock);if (listensock < 0) { printf("initserver() failed.\n"); return -1; }pollfd fds[2048]; // fds存放需要监视的socket。// 初始化数组,把全部的socket设置为-1,如果数组中的socket的值为-1,那么,poll将忽略它。for (int ii=0;ii<2048;ii++) fds[ii].fd=-1; // 打算让poll监视listensock读事件。fds[listensock].fd=listensock;fds[listensock].events=POLLIN; // POLLIN表示读事件,POLLOUT表示写事件。// fds[listensock].events=POLLIN|POLLOUT;int maxfd=listensock; // fds数组中需要监视的socket的实际大小。while (true) // 事件循环。{// 调用poll() 等待事件的发生(监视哪些socket发生了事件)。int infds=poll(fds,maxfd+1,10000); // 超时时间为10秒。// 如果infds<0,表示调用poll()失败。if (infds < 0){perror("poll() failed"); break;}// 如果infds==0,表示poll()超时。if (infds == 0){printf("poll() timeout.\n"); continue;}// 如果infds>0,表示有事件发生,infds存放了已发生事件的个数。for (int eventfd=0;eventfd<=maxfd;eventfd++){if (fds[eventfd].fd<0) continue; // 如果fd为负,忽略它。if ((fds[eventfd].revents&POLLIN)==0) continue; // 如果没有读事件,continue// 如果发生事件的是listensock,表示已连接队列中有已经准备好的socket(有新的客户端连上来了)。if (eventfd==listensock){struct sockaddr_in client;socklen_t len = sizeof(client);int clientsock = accept(listensock,(struct sockaddr*)&client,&len);if (clientsock < 0) { perror("accept() failed"); continue; }printf ("accept client(socket=%d) ok.\n",clientsock);// 修改fds数组中clientsock位置的元素。fds[clientsock].fd=clientsock;fds[clientsock].events=POLLIN;if (maxfd<clientsock) maxfd=clientsock; // 更新maxfd的值。}else{// 如果是客户端连接的socke有事件,表示有报文发过来了或者连接已断开。char buffer[1024]; // 存放从客户端读取的数据。memset(buffer,0,sizeof(buffer));if (recv(eventfd,buffer,sizeof(buffer),0)<=0){// 如果客户端的连接已断开。printf("client(eventfd=%d) disconnected.\n",eventfd);close(eventfd); // 关闭客户端的socket。fds[eventfd].fd=-1; // 修改fds数组中clientsock位置的元素,置为-1,poll将忽略该元素。// 重新计算maxfd的值,注意,只有当eventfd==maxfd时才需要计算。if (eventfd == maxfd){for (int ii=maxfd;ii>0;ii--) // 从后面往前找。{if (fds[ii].fd!=-1){maxfd = ii; break;}}}}else{// 如果客户端有报文发过来。printf("recv(eventfd=%d):%s\n",eventfd,buffer);send(eventfd,buffer,strlen(buffer),0);}}}}return 0;
}// 初始化服务端的监听端口。
int initserver(int port)
{int sock = socket(AF_INET,SOCK_STREAM,0);if (sock < 0){perror("socket() failed"); return -1;}int opt = 1; unsigned int len = sizeof(opt);setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&opt,len);struct sockaddr_in servaddr;servaddr.sin_family = AF_INET;servaddr.sin_addr.s_addr = htonl(INADDR_ANY);servaddr.sin_port = htons(port);if (bind(sock,(struct sockaddr *)&servaddr,sizeof(servaddr)) < 0 ){perror("bind() failed"); close(sock); return -1;}if (listen(sock,5) != 0 ){perror("listen() failed"); close(sock); return -1;}return sock;
}
客户端与select模型的一样。