概述

select,poll,epoll 都是 IO 多路复用的机制。I/O 多路复用就是通过一种机制,可以监视多个描述符(socket, file, tunnel),一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。

select 有 3 个缺点:

  1. 连接数受限,可以通过修改宏定义甚至重新编译内核的方式提升这一限制。
  2. 查找配对速度慢,调用select()会对所有socket进行一次线性扫描。
  3. 数据由内核拷贝到用户态

!! poll改善了第一个缺点
!!!epoll改了三个缺点.

进一步解析

两种触发

select实现

select的调用过程如下所示:

select_activity.png

  1. 使用 copy_from_user 从用户空间拷贝 fd_set 到内核空间
  2. 注册回调函数 __pollwait
  3. 遍历所有 fd,调用其对应的 poll 方法(对于 socket,这个 poll 方法是 sock_pollsock_poll 根据情况会调用到 tcp_poll , udp_poll 或者 datagram_poll
  4. tcp_poll 为例,其核心实现就是 __pollwait,也就是上面注册的回调函数。
  5. __pollwait 的主要工作就是把 current(当前进程)挂到设备的等待队列中,不同的设备有不同的等待队列,对于 tcp_poll 来说,其等待队列是 sk->sk_sleep (注意把进程挂到等待队列中并不代表进程已经睡眠了)。在设备收到一条消息(网络设备)或填写完文件数据(磁盘设备)后,会唤醒设备等待队列上睡眠的进程,这时 current 便被唤醒了。
  6. poll 方法返回时会返回一个描述读写操作是否就绪的 mask 掩码,根据这个 mask 掩码给 fd_set 赋值。
  7. 如果遍历完所有的 fd,还没有返回一个可读写的 mask 掩码,则会调用 schedule_timeout 是调用 select 的进程(也就是 current)进入睡眠。当设备驱动发生自身资源可读写后,会唤醒其等待队列上睡眠的进程。如果超过一定的超时时间(schedule_timeout 指定),还是没人唤醒,则调用 select 的进程会重新被唤醒获得CPU,进而重新遍历 fd,判断有没有就绪的 fd
  8. fd_set 从内核空间拷贝到用户空间。

poll实现

poll 的实现和 select 非常相似,只是描述 fd 集合的方式不同,poll 使用 pollfd 结构而不是 selectfd_set 结构,其他的都差不多。

epoll

相比于 selectpollepoll 的改进可以归结为 2 点:

  1. epollpoll 一样只告知那些就绪的文件描述符,而且当我们调用 epoll_wait() 获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去 epoll 指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销

  2. 另一个本质的改进在于 epoll 采用基于事件的就绪通知方式。在 select/poll 中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而 epoll 事先通过 epoll_ctl() 来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似 callback 的回调机制,迅速激活这个文件描述符,当进程调用 epoll_wait() 时便得到通知

epoll 既然是对 selectpoll 的改进,就应该能避免上述的三个缺点。那 epoll 都是怎么解决的呢?在此之前,我们先看一下 epollselectpoll 的调用接口上的不同,selectpoll 都只提供了一个函数—— select 或者 poll 函数。而epoll 提供了三个函数,epoll_create, epoll_ctlepoll_waitepoll_create 是创建一个 epoll 句柄;epoll_ctl 是注册要监听的事件类型;epoll_wait 则是等待事件的产生。

  1. 对于第一个缺点,epoll 的解决方案在 epoll_ctl 函数中。每次注册新的事件到 epoll 句柄中时(在 epoll_ctl 中指定 EPOLL_CTL_ADD),会把所有的 fd 拷贝进内核,而不是在 epoll_wait 的时候重复拷贝。epoll 保证了每个 fd 在整个过程中只会拷贝一次。

  2. 对于第二个缺点,epoll 的解决方案不像 selectpoll 一样每次都把 current 轮流加入 fd 对应的设备等待队列中,而只在 epoll_ctl 时把 current 挂一遍(这一遍必不可少)并为每个 fd 指定一个回调函数,当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调函数,而这个回调函数会把就绪的fd加入一个就绪链表)。epoll_wait 的工作实际上就是在这个就绪链表中查看有没有就绪的 fd(利用 schedule_timeout()实现睡一会,判断一会的效果,和 select 实现中的第7步是类似的)。

  3. 对于第三个缺点,epoll 没有这个限制,它所支持的 fd 上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左右,具体数目可以 cat /proc/sys/fs/file-max 察看,一般来说这个数目和系统内存关系很大。

总结:

  1. selectpoll 实现需要自己不断轮询所有 fd 集合,直到设备就绪,期间可能要睡眠和唤醒多次交替。而 epoll 其实也需要调用 epoll_wait 不断轮询就绪链表,期间也可能多次睡眠和唤醒交替,但是它是设备就绪时,调用回调函数,把就绪 fd 放入就绪链表中,并唤醒在 epoll_wait 中进入睡眠的进程。虽然都要睡眠和交替,但是 selectpoll 在“醒着”的时候要遍历整个 fd 集合,而 epoll 在“醒着”的时候只要判断一下就绪链表是否为空就行了,这节省了大量的CPU时间。这就是回调机制带来的性能提升。

  2. selectpoll 每次调用都要把 fd 集合从用户态往内核态拷贝一次,并且要把 current 往设备等待队列中挂一次,而 epoll 只要一次拷贝,而且把 current 往等待队列上挂也只挂一次(在 epoll_wait 的开始,注意这里的等待队列并不是设备等待队列,只是一个 epoll 内部定义的等待队列)。这也能节省不少的开销。

代码示例

select

这里为了来个实践,就以 python 语言为例,给个示例,因为 selectpoll 差不多,这里就只给 select 的例子了,然后 epoll 也给一个,需要注意的是 epoll 只能在 Linux 的机器上使用,所以,重复实验的时候需要注意到这一点。

select_server.py

  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. import sys
  4. import cPickle
  5. import struct
  6. import socket
  7. import select
  8. import logging
  9. from threading import Event
  10. host = '0.0.0.0'
  11. port = 12007
  12. class SelectServer(object):
  13. def __init__(self):
  14. self.stop = Event()
  15. self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  16. self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  17. self.server.bind((host, port))
  18. self.server.listen(10)
  19. def send(self, channel, *args):
  20. buffer = cPickle.dumps(args)
  21. value = socket.htonl(len(buffer))
  22. size = struct.pack('L', value)
  23. channel.send(size)
  24. channel.send(buffer)
  25. def receive(self, channel):
  26. size = struct.calcsize('L')
  27. size = channel.recv(size)
  28. try:
  29. size = socket.ntohl(struct.unpack("L", size)[0])
  30. except struct.error, e:
  31. logging.error(e)
  32. return ''
  33. buf = ''
  34. while len(buf) < size:
  35. buf += channel.recv(size-len(buf))
  36. return cPickle.loads(buf)[0]
  37. def stop_server(self):
  38. for output in self.outputs:
  39. output.close()
  40. self.stop.set()
  41. def run(self):
  42. inputs = [self.server, sys.stdin]
  43. self.outputs = []
  44. while not self.stop.is_set():
  45. try:
  46. readable, writeable, exceptional = select.select(
  47. inputs, self.outputs, [])
  48. except select.error, e:
  49. logging.error(e)
  50. break
  51. for sock in readable:
  52. if sock == self.server:
  53. client, address = self.server.accept()
  54. text = self.receive(client)
  55. self.send(client, text)
  56. print("receive {} from {}".format(text, address))
  57. self.outputs.append(client)
  58. inputs.append(client)
  59. elif sock == sys.stdin:
  60. junk = sys.stdin.readline()
  61. if junk.strip() == 'stop':
  62. self.stop_server()
  63. else:
  64. try:
  65. data = self.receive(sock)
  66. if data:
  67. print("receive {} from {}".format(data, sock))
  68. self.send(sock, data)
  69. else:
  70. sock.close()
  71. inputs.remove(sock)
  72. self.outputs.remove(sock)
  73. except socket.error, e:
  74. logging.error(e)
  75. inputs.remove(sock)
  76. self.outputs.remove(sock)
  77. self.server.close()
  78. if __name__ == "__main__":
  79. SelectServer().run()

select_client.py

  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. import time
  4. import logging
  5. import random
  6. import struct
  7. import cPickle
  8. import socket
  9. host = '0.0.0.0'
  10. port = 12007
  11. class SelectClient(object):
  12. def __init__(self):
  13. self.clients = []
  14. self.connections = 10
  15. def send(self, channel, *args):
  16. buffer = cPickle.dumps(args)
  17. value = socket.htonl(len(buffer))
  18. size = struct.pack('L', value)
  19. channel.send(size)
  20. channel.send(buffer)
  21. def receive(self, channel):
  22. size = struct.calcsize('L')
  23. size = channel.recv(size)
  24. try:
  25. size = socket.socket.ntohl(struct.unpack("L", size)[0])
  26. except struct.error, e:
  27. logging.error(e)
  28. return ''
  29. buf = ''
  30. while len(buf) < size:
  31. buf += channel.recv(size-len(buf))
  32. return cPickle.loads(buf)[0]
  33. def run(self):
  34. for i in xrange(self.connections):
  35. client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  36. client.connect((host, port))
  37. self.clients.append(client)
  38. for i in xrange(100):
  39. idx = random.randint(0, 9)
  40. self.send(self.clients[idx], 'message {}'.format(i))
  41. time.sleep(10)
  42. for client in self.clients:
  43. client.close()
  44. if __name__ == '__main__':
  45. SelectClient().run()

这一对 clientserver 比较简单,就是服务器从客户端接收消息,然后再把消息打印出来,而客户端就 10 个客户端随机选择出来发送消息。如果不使用 select 的话,我们以前的通用操作方法可能是使用 threading 来进行多条线程监控多个 client,但是,这里使用 select 的话可以很简单得处理很多 client

epoll_server.py

  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. import socket
  4. import select
  5. from threading import Event
  6. host = "0.0.0.0"
  7. port = 12008
  8. class EpollServer(object):
  9. def __init__(self):
  10. self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  11. self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  12. self.server.bind((host, port))
  13. self.server.listen(1)
  14. self.server.setblocking(0)
  15. self.server.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
  16. self.epoll = select.epoll()
  17. self.epoll.register(self.server.fileno(), select.EPOLLIN)
  18. self.stop = Event()
  19. def run(self):
  20. try:
  21. conns = {}
  22. reqs = {}
  23. resps = {}
  24. while not self.stop.is_set():
  25. events = self.epoll.poll(1)
  26. for fileno, event in events:
  27. if fileno == self.server.fileno():
  28. conn, addr = self.server.accept()
  29. conn.setblocking(0)
  30. self.epoll.register(conn.fileno(), select.EPOLLIN)
  31. conns[conn.fileno()] = conn
  32. reqs[conn.fileno()] = b''
  33. resps[conn.fileno()] = "hello world"
  34. elif event & select.EPOLLIN:
  35. data = conns[fileno].recv(1024)
  36. print "receive data {} from {}".format(data, fileno)
  37. reqs[fileno] += data
  38. if len(data) < 1024:
  39. self.epoll.modify(fileno, select.EPOLLOUT)
  40. elif event & select.EPOLLOUT:
  41. bytes_writen = conns[fileno].send(resps[fileno])
  42. resps[fileno] = resps[fileno][bytes_writen:]
  43. if len(resps[fileno]) == 0:
  44. self.epoll.modify(fileno, select.EPOLLIN)
  45. elif event & select.EPOLLHUP:
  46. self.epoll.unregister(fileno)
  47. conns[fileno].close()
  48. del conns[fileno]
  49. finally:
  50. self.epoll.unregister(self.server.fileno())
  51. self.epoll.close()
  52. self.server.close()
  53. if __name__ == "__main__":
  54. server = EpollServer()
  55. server.run()

epoll_client.py

  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. import time
  4. import random
  5. import socket
  6. host = '0.0.0.0'
  7. port = 12008
  8. class SelectClient(object):
  9. def __init__(self):
  10. self.clients = []
  11. self.connections = 10
  12. def send(self, channel, data):
  13. channel.send(data)
  14. def receive(self, channel):
  15. pass
  16. def run(self):
  17. for i in xrange(self.connections):
  18. client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  19. client.connect((host, port))
  20. self.clients.append(client)
  21. for i in xrange(100):
  22. idx = random.randint(0, 9)
  23. self.send(self.clients[idx], 'message {}'.format(i))
  24. time.sleep(3)
  25. for client in self.clients:
  26. client.close()
  27. if __name__ == '__main__':
  28. SelectClient().run()

这里的 epollselect 的使用方式就不一样了,这里不再是传递一个 fd 列表了,而是给每个 fd register 一个关注的事件,是入事件还是出事件,然后根据 fdevent 来处理。需要注意的是,这里的 Python 代码和前面所说的底层实现有一点差别,例如没有 epoll_create 函数之类的,反之,而是 epoll.registerepoll.poll 之类的。

Reference