zl程序教程

您现在的位置是:首页 >  Java

当前栏目

白话IO多路复用技术

2023-02-18 16:23:08 时间

阿巩

日拱一卒,我们开始吧!

这里引述知乎大佬对于IO多路复用的机场空管的比喻:

假设你是一个机场的空管,你需要管理到你机场的所有的航线,包括进港、出港,有些航班需要放到停机坪等待,有些航班需要去登机口接乘客。你会怎么做呢?

首先最简单的做法,就是你去招一大批空管员,然后每人盯一架飞机, 从进港,接客,排位,出港,航线监控,直至交接给下一个空港,全程监控。那么问题就来了:

  • 很快你就发现空管塔里面聚集起来一大票的空管员,交通稍微繁忙一点,新的空管员就已经挤不进来了。
  • 空管员之间需要协调,屋子里面就1, 2个人的时候还好,几十号人以后 ,基本上就成菜市场了。
  • 空管员经常需要更新一些公用的东西,比如起飞显示屏,比如下一个小时后的出港排期,最后你会很惊奇的发现,每个人的时间最后都花在了抢这些资源上。

解决方法是他们用flight progress strip ,其中每一个块代表一个航班,不同的槽代表不同的状态,然后一个空管员可以管理一组这样的块(一组航班),而他的工作,就是在航班信息有新的更新的时候,把对应的块放到不同的槽子里面。

如果我们把每一个航线当成一个socket(I/O 流), 空管当成服务端socket管理代码的话:第一种方法就是最传统的多进程并发模型,每进来一个新的I/O流会分配一个新的进程管理。

第二种方法就是IO多路复用:

I/O多路复用 (单个线程,通过记录跟踪每个I/O流(socket)的状态,来同时管理多个I/O流 )。在Nginx中会有很多链接进来, epoll会把他们都监视起来,然后像拨开关一样,谁有数据就拨向谁,然后调用相应的代码处理。

注:每个socket就是一个I/O流,服务端只会监听一个端口,每次来了新的请求,都会创建一个新的socket和客户端通信。

当多个客户端与服务器通信时,若服务器阻塞在其中一个客户的read(sockfd1,…),当另一个客户数据到达sockfd2时,服务器无法及时处理,此时需要用到IO多路复用。即同时监听n个客户,当其中有一个发来消息时就从select的阻塞中返回,然后调用read读取收到消息的sockfd,然后又循环回select阻塞。这样就解决了阻塞在一个消息而无法处理其它的。即用来解决对多个I/O监听时,一个I/O阻塞影响其他I/O的问题。

那IO多路复用如何实现呢?select, poll, epoll 都是I/O多路复用的具体的实现。

select特点:

  • 单个进程所打开的FD是有限制的,通过FD_SETSIZE设置,默认1024。
  • 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大。
  • 对socket扫描时是线性扫描,采用轮询的方法,效率较低(高并发时)。
  • select 不是线程安全的。

poll特点:

  • poll和select是非常相似的,poll相对于select的优化仅仅在于解决了文件描述符不能超过1024个的限制。
  • select和poll都会随着监控的文件描述符增加而出现性能下降,因此不适合高并发场景。

epoll 特点:

epoll 修复了poll 和select绝大部分问题, 比如:

  • epoll 现在是线程安全的。
  • epoll 现在不仅告诉你socket组里面数据,还会告诉你具体哪个socket有数据,你不用自己去找了。
  • 不过缺点是epoll只能工作在linux下

三者之间的区别如下:

select

poll

epoll

数据结构

bitmap

数组

红黑树

最大连接数

1024

无上限

无上限

fd拷贝

每次调用select拷贝

每次调用poll拷贝

fd首次调用epoll_ctl拷贝,每次调用epoll_wait不拷贝

工作效率

轮询:O(n)

轮询:O(n)

回调:O(1)

上面的比较分析,都是建立在大并发下面,如果你的并发数太少,用哪个其实没有区别。

epoll应用在Redis和Nginx中,这里实现一个简单的epoll函数接口:

#include <sys/epoll.h>
 
// 数据结构
// 每一个epoll对象都有一个独立的eventpoll结构体
// 用于存放通过epoll_ctl方法向epoll对象中添加进来的事件
// epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可
struct eventpoll {
    /*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
    struct rb_root  rbr;
    /*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
    struct list_head rdlist;
};
 
// API
 
int epoll_create(int size); // 内核中间加一个 ep 对象,把所有需要监听的 socket 都放到 ep 对象中
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); // epoll_ctl 负责把 socket 增加、删除到内核红黑树
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);// epoll_wait 负责检测可读队列,没有可读 socket 则阻塞进程
int main(int argc, char* argv[])
{
   /*
   * 在这里进行一些初始化的操作,
   * 比如初始化数据和socket等。
   */
    // 内核中创建ep对象
    epfd=epoll_create(256);
    // 需要监听的socket放到ep中
    epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);
 
    while(1) {
      // 阻塞获取
      nfds = epoll_wait(epfd,events,20,0);
      for(i=0;i<nfds;++i) {
          if(events[i].data.fd==listenfd) {
              // 这里处理accept事件
              connfd = accept(listenfd);
              // 接收新连接写到内核对象中
              epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev);
          } else if (events[i].events&EPOLLIN) {
              // 这里处理read事件
              read(sockfd, BUF, MAXLINE);
              //读完后准备写
              epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);
          } else if(events[i].events&EPOLLOUT) {
              // 这里处理write事件
              write(sockfd, BUF, n);
              //写完后准备读
              epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);
          }
      }
    }
    return 0;
}

参考:

知乎 https://www.zhihu.com/question/32163005/answer/55772739

公众号文章 https://mp.weixin.qq.com/s/PzOF9lFYVacPIRx0JakTjA