epoll实现原理分析(转载)

epoll实现原理分析(转载)

epoll 是 Linux 平台下的一种特有的多路复用 IO 实现方式,与传统的 select、poll 相比,epoll 在性能上有很大的提升。

epoll 可以说是和 poll 非常相似的一种 I/O 多路复用技术,有些朋友将 epoll 归为异步 I/O,我觉得这是不正确的。本质上 epoll 还是一种 I/O 多路复用技术, epoll 通过监控注册的多个描述字,来进行 I/O 事件的分发处理。不同于 poll 的是,epoll 不仅提供了默认的 level-triggered(水平触发)机制,还提供了性能更为强劲的 edge-triggered(边缘触发)机制。

一、epoll 的用法

使用 epoll 进行网络程序的编写,需要三个步骤,分别是 epoll_create,epoll_ctl 和 epoll_wait。

epoll_create

1
2
3
int epoll_create(int size);
int epoll_create1(int flags);
返回值: 若成功返回一个大于0的值,表示 epoll 实例;若返回-1表示出错

epoll_create() 方法创建了一个 epoll 实例,参数 size 是由于历史原因遗留下来的,现在不起作用。从 Linux 2.6.8 开始,参数 size 被自动忽略,但是该值仍需要一个大于 0 的整数。这个 epoll 实例被用来调用 epoll_ctl 和 epoll_wait,如果这个 epoll 实例不再需要,比如服务器正常关机,需要调用 close() 方法释放 epoll 实例,这样系统内核可以回收 epoll 实例所分配使用的内核资源。

epoll_create1() 的用法和 epoll_create() 基本一致,如果 epoll_create1() 的输入 flags 为 0,则和 epoll_create() 一样,内核自动忽略。可以增加如 EPOLL_CLOEXEC 的额外选项。

epoll_ctl

1
2
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
返回值: 若成功返回0;若返回-1表示出错

在创建完 epoll 实例之后,可以通过调用 epoll_ctl 往这个 epoll 实例增加或删除对应文件描述符监控的事件。函数 epll_ctl 有 4 个入口参数。

第一个参数 epfd 是刚刚调用 epoll_create 创建的 epoll 实例描述字,可以简单理解成是 epoll 句柄。

第二个参数表示增加,删除还是修改一个监控事件,它有三个选项可供选择:

  • EPOLL_CTL_ADD:向 epoll 实例注册文件描述符对应的事件;
  • EPOLL_CTL_DEL:向 epoll 实例删除文件描述符对应的事件;
  • EPOLL_CTL_MOD:修改文件描述符对应的事件。

第三个参数是注册的事件的文件描述符,比如一个监听套接字。

第四个参数表示的是注册的事件类型,并且可以在这个结构体里设置用户需要的数据,其中最为常见的是使用联合结构里的 fd 字段,表示事件所对应的文件描述符。

1
2
3
4
5
6
7
8
9
10
11
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;

struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};

这里 epoll 仍旧使用了基于 mask 的事件类型,我们重点看一下这几种事件类型:

  • EPOLLIN:表示对应的文件描述字可以读;
  • EPOLLOUT:表示对应的文件描述字可以写;
  • EPOLLRDHUP:表示套接字的一端已经关闭,或者半关闭;
  • EPOLLHUP:表示对应的文件描述字被挂起;
  • EPOLLET:设置为 edge-triggered(边缘触发),默认为 level-triggered(水平触发)。

epoll_wait

1
2
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
返回值: 成功返回的是一个大于0的数,表示事件的个数;返回0表示的是超时时间到;若出错返回-1.

epoll_wait() 函数类似之前的 poll 和 select 函数,调用者进程被挂起,在等待内核 I/O 事件的分发。

这个函数的第一个参数是 epoll 实例描述字,也就是 epoll 句柄。

第二个参数返回给用户空间需要处理的 I/O 事件,这是一个数组,数组的大小由 epoll_wait 的返回值决定,这个数组的每个元素都是一个需要待处理的 I/O 事件,其中 epoll_event.events 表示具体的事件类型,事件类型取值和 epoll_ctl 可设置的值一样,这个 epoll_event 结构体里的 data 值就是在 epoll_ctl 那里设置的 data,也就是用户空间和内核空间调用时需要的数据。

第三个参数是一个大于 0 的整数,表示 epoll_wait 可以返回的最大事件值。

第四个参数是 epoll_wait 阻塞调用的超时值,如果这个值设置为 -1,表示不超时;如果设置为 0 则立即返回,即使没有任何 I/O 事件发生。

二、epoll 实例

实例代码解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93

#include "lib/common.h"

#define MAXEVENTS 128

// 旋转字符
char rot13_char(char c) {
if ((c >= 'a' && c <= 'm') || (c >= 'A' && c <= 'M'))
return c + 13;
else if ((c >= 'n' && c <= 'z') || (c >= 'N' && c <= 'Z'))
return c - 13;
else
return c;
}

int main(int argc, char **argv) {
int listen_fd, socket_fd;
int n, i;
int efd;
struct epoll_event event;
struct epoll_event *events;

listen_fd = tcp_nonblocking_server_listen(SERV_PORT);

efd = epoll_create1(0);
if (efd == -1) {
error(1, errno, "epoll create failed");
}

event.data.fd = listen_fd;
event.events = EPOLLIN | EPOLLET;
if (epoll_ctl(efd, EPOLL_CTL_ADD, listen_fd, &event) == -1) {
error(1, errno, "epoll_ctl add listen fd failed");
}

/* Buffer where events are returned */
events = calloc(MAXEVENTS, sizeof(event));

while (1) {
n = epoll_wait(efd, events, MAXEVENTS, -1);
printf("epoll_wait wakeup\n");
for (i = 0; i < n; i++) {
if ((events[i].events & EPOLLERR) ||
(events[i].events & EPOLLHUP) ||
(!(events[i].events & EPOLLIN))) {
fprintf(stderr, "epoll error\n");
close(events[i].data.fd);
continue;
} else if (listen_fd == events[i].data.fd) {
struct sockaddr_storage ss;
socklen_t slen = sizeof(ss);
int fd = accept(listen_fd, (struct sockaddr *) &ss, &slen);
if (fd < 0) {
error(1, errno, "accept failed");
} else {
make_nonblocking(fd);
event.data.fd = fd;
event.events = EPOLLIN | EPOLLET; //edge-triggered
if (epoll_ctl(efd, EPOLL_CTL_ADD, fd, &event) == -1) {
error(1, errno, "epoll_ctl add connection fd failed");
}
}
continue;
} else {
socket_fd = events[i].data.fd;
printf("get event on socket fd == %d \n", socket_fd);
while (1) {
char buf[512];
if ((n = read(socket_fd, buf, sizeof(buf))) < 0) {
if (errno != EAGAIN) {
error(1, errno, "read error");
close(socket_fd);
}
break;
} else if (n == 0) {
close(socket_fd);
break;
} else {
for (i = 0; i < n; ++i) {
buf[i] = rot13_char(buf[i]);
}
if (write(socket_fd, buf, n) < 0) {
error(1, errno, "write error");
}
}
}
}
}
}

free(events);
close(listen_fd);
}

程序的第 116 行调用 epoll_create1(0) 创建了一个 epoll 实例。

第 121-125 行,调用 epoll_ctl 将监听套接字对应的 I/O 事件进行了注册,这样在有新的连接建立之后,就可以感知到。注意这里使用的是 edge-triggered(边缘触发)

第 128 行为返回的 event 数组使用 calloc 分配了内存。

主循环调用 epoll_wait 函数分发 I/O 事件,当 epoll_wait 成功返回时,通过遍历返回的 event 数组,就直接可以知道发生的 I/O 事件。

第 134-139 行判断了返回的事件各种错误情况。

第 140-154 行是监听套接字上有事件发生的情况下,调用 accept 获取已建立连接,并将该连接设置为非阻塞,再调用 epoll_ctl 把已连接套接字对应的可读事件注册到 epoll 实例中。这里我们使用了 event_data 里面的 fd 字段,将连接套接字存储其中。

第 156-177 行,处理了已连接套接字上的可读事件,读取字节流,编码后再回应给客户端。

实验

启动 epoll 服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
$./epoll01
epoll_wait wakeup
epoll_wait wakeup
epoll_wait wakeup
get event on socket fd == 6
epoll_wait wakeup
get event on socket fd == 5
epoll_wait wakeup
get event on socket fd == 5
epoll_wait wakeup
get event on socket fd == 6
epoll_wait wakeup
get event on socket fd == 6
epoll_wait wakeup
get event on socket fd == 6
epoll_wait wakeup
get event on socket fd == 5

再启动几个 telnet 客户端,可以看到有连接建立情况下,epoll_wait 迅速从挂起状态结束;并且套接字上有数据可读时,epoll_wait 也迅速结束挂起状态,这时候通过 read 可以读取套接字接收缓冲区上的数据。

1
2
3
4
5
6
7
8
9
$telnet 127.0.0.1 43211
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
fasfsafas
snfsfnsnf
^]
telnet> quit
Connection closed.

edge-triggered VS level-triggered

对于 edge-triggered 和 level-triggered,一个是边缘触发,一个是水平触发。也有文章从电子脉冲角度来解读的,总体上,给初学者的带来的感受是理解上有困难。

这里有两个程序,我们用这个程序来说明一下这两者之间的不同。

在这两个程序里,即使已连接套接字上有数据可读,我们也不调用 read 函数去读,只是简单地打印出一句话。

第一个程序我们设置为 edge-triggered,即边缘触发。开启这个服务器程序,用 telnet 连接上,输入一些字符,我们看到,服务器端只从 epoll_wait 中苏醒过一次,就是第一次有数据可读的时候。

1
2
3
4
$./epoll02
epoll_wait wakeup
epoll_wait wakeup
get event on socket fd == 5
1
2
3
4
5
$telnet 127.0.0.1 43211
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
asfafas

第二个程序我们设置为 level-triggered,即水平触发。然后按照同样的步骤来一次,观察服务器端,这一次我们可以看到,服务器端不断地从 epoll_wait 中苏醒,告诉我们有数据需要读取。

1
2
3
4
5
6
7
8
9
10
11
$./epoll03
epoll_wait wakeup
epoll_wait wakeup
get event on socket fd == 5
epoll_wait wakeup
get event on socket fd == 5
epoll_wait wakeup
get event on socket fd == 5
epoll_wait wakeup
get event on socket fd == 5
...

这就是两者的区别,水平触发的意思是只要满足事件的条件,比如有数据需要读,就一直不断地把这个事件传递给用户;而边缘触发的意思是只有第一次满足条件的时候才触发,之后就不会再传递同样的事件了。

一般我们认为,边缘触发的效率比条件触发的效率要高,这一点也是 epoll 的杀手锏之一。

三、底层实现分析

基本的数据结构

在开始研究源代码之前,我们先看一下 epoll 中使用的数据结构,分别是 eventpoll、epitem 和 eppoll_entry。

我们先看一下 eventpoll 这个数据结构,这个数据结构是我们在调用 epoll_create 之后内核侧创建的一个句柄,表示了一个 epoll 实例。后续如果我们再调用 epoll_ctl 和 epoll_wait 等,都是对这个 eventpoll 数据进行操作,这部分数据会被保存在 epoll_create 创建的匿名文件 file 的 private_data 字段中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/*
* This structure is stored inside the "private_data" member of the file
* structure and represents the main data structure for the eventpoll
* interface.
*/
struct eventpoll {
/* Protect the access to this structure */
spinlock_t lock;

/*
* This mutex is used to ensure that files are not removed
* while epoll is using them. This is held during the event
* collection loop, the file cleanup path, the epoll file exit
* code and the ctl operations.
*/

/* 添加、修改或者删除监听 fd 的时候,以及 epoll_wait 返回,向用户空间
* 传递数据时都会持有这个互斥锁,所以在用户空间可以放心的在多个线程
* 中同时执行 epoll 相关的操作,内核级已经做了保护。
*/
struct mutex mtx;

/* Wait queue used by sys_epoll_wait() */
//这个队列里存放的是执行epoll_wait从而等待的进程队列
wait_queue_head_t wq;

/* Wait queue used by file->poll() */
//这个队列里存放的是该eventloop作为poll对象的一个实例,加入到等待的队列
//这是因为eventpoll本身也是一个file, 所以也会有poll操作
wait_queue_head_t poll_wait;

/* List of ready file descriptors */
//这里存放的是事件就绪的fd列表,链表的每个元素是下面的epitem
struct list_head rdllist;

/* RB tree root used to store monitored fd structs */
//这是用来快速查找fd的红黑树
struct rb_root_cached rbr;

/*
* This is a single linked list that chains all the "struct epitem" that
* happened while transferring ready events to userspace w/out
* holding ->lock.
*/
struct epitem *ovflist;

/* wakeup_source used when ep_scan_ready_list is running */
struct wakeup_source *ws;

/* The user that created the eventpoll descriptor */
struct user_struct *user;

//这是eventloop对应的匿名文件,充分体现了Linux下一切皆文件的思想
struct file *file;

/* used to optimize loop detection check */
int visited;
struct list_head visited_list_link;

#ifdef CONFIG_NET_RX_BUSY_POLL
/* used to track busy poll napi_id */
unsigned int napi_id;
#endif
};

下图展示了 eventpoll 对象与被监听的文件关系:

每当我们调用 epoll_ctl 增加一个 fd 时,内核就会为我们创建出一个 epitem 实例,并且把这个实例作为红黑树的一个子节点,增加到 eventpoll 结构体中的红黑树中,对应的字段是 rbr。这之后,查找每一个 fd 上是否有事件发生都是通过红黑树上的 epitem 来操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

/*
* Each file descriptor added to the eventpoll interface will
* have an entry of this type linked to the "rbr" RB tree.
* Avoid increasing the size of this struct, there can be many thousands
* of these on a server and we do not want this to take another cache line.
*/
struct epitem {
union {
/* RB tree node links this structure to the eventpoll RB tree */
/* rb_node,当使用 epoll_ctl() 将一批 fds 加入到某个 epollfd 时,内核会分配
* 一批的 epitem 与 fds 们对应,而且它们以 rb_tree 的形式组织起来,tree 的 root
* 保存在 epollfd,也就是 struct eventpoll 中。
* 在这里使用 rb_tree 的原因我认为是提高查找、插入以及删除的速度。
* rb_tree 对以上3个操作都具有O(logN)的时间复杂度 */
struct rb_node rbn;
/* Used to free the struct epitem */
struct rcu_head rcu;
};

/* List header used to link this structure to the eventpoll ready list */
//将这个epitem连接到eventpoll 里面的rdllist的list指针
struct list_head rdllink;

/*
* Works together "struct eventpoll"->ovflist in keeping the
* single linked chain of items.
*/
struct epitem *next;

/* The file descriptor information this item refers to */
//epoll监听的fd
struct epoll_filefd ffd;

/* Number of active wait queue attached to poll operations */
//一个文件可以被多个epoll实例所监听,这里就记录了当前文件被监听的次数
int nwait;

/* List containing poll wait queues */
struct list_head pwqlist;

/* The "container" of this item */
//当前epollitem所属的eventpoll
struct eventpoll *ep;

/* List header used to link this item to the "struct file" items list */
struct list_head fllink;

/* wakeup_source used when EPOLLWAKEUP is set */
struct wakeup_source __rcu *ws;

/* The structure that describe the interested events and the source fd */
/* 当前的 epitem 关心哪些 events,这个数据是调用 epoll_ctl 时从用户态传递过来 */
struct epoll_event event;
};

每次当一个 fd 关联到一个 epoll 实例,就会有一个 eppoll_entry 产生。eppoll_entry 的结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

/* Wait structure used by the poll hooks */
/* poll 所用到的钩子 */
struct eppoll_entry {
/* List header used to link this structure to the "struct epitem" */
struct list_head llink;

/* The "base" pointer is set to the container "struct epitem" */
struct epitem *base;

/*
* Wait queue item that will be linked to the target file wait
* queue head.
*/
wait_queue_entry_t wait;

/* The wait queue head that linked the "wait" wait queue item */
wait_queue_head_t *whead;
};

epoll_create

我们在使用 epoll 的时候,首先会调用 epoll_create 来创建一个 epoll 实例。这个函数是如何工作的呢?

首先,epoll_create 会对传入的 flags 参数做简单的验证。

1
2
3
4
5
6
/* Check the EPOLL_* constant for consistency.  */
BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);

if (flags & ~EPOLL_CLOEXEC)
return -EINVAL;
/*

接下来,内核申请分配 eventpoll 需要的内存空间。

1
2
3
4
5
/* Create the internal data structure ("struct eventpoll").
*/
error = ep_alloc(&ep);
if (error < 0)
return error;

在接下来,epoll_create 为 epoll 实例分配了匿名文件和文件描述字,其中 fd 是文件描述字,file 是一个匿名文件。这里充分体现了 UNIX 下一切都是文件的思想。注意,eventpoll 的实例会保存一份匿名文件的引用,通过调用 fd_install 函数将匿名文件和文件描述字完成了绑定。

最后,这个文件描述字作为 epoll 的文件句柄,被返回给 epoll_create 的调用者。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/*
* Creates all the items needed to setup an eventpoll file. That is,
* a file structure and a free file descriptor.
*/
fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
if (fd < 0) {
error = fd;
goto out_free_ep;
}

/*
* 这里是创建一个匿名 fd。
* epollfd 本身并不存在一个真正的文件与之对应, 所以内核需要创建一个
* "虚拟"的文件,并为之分配真正的 struct file 结构,而且有真正的 fd。
* 这里2个参数比较关键:
* eventpoll_fops、fops 就是 file operations,就是当你对这个文件(这里是虚拟的)进行操作(比如读)时,
* fops 里面的函数指针指向真正的操作实现,类似 C++ 里面虚函数和子类的概念。
* epoll 只实现了 poll 和 release(就是close) 操作,其它文件系统操作都由 VFS 全权处理了。
* ep、ep 就是 struct epollevent,它会作为一个私有数据保存在 struct file 的 private 指针里面。
* 其实说白了,就是为了能通过 fd 找到 struct file,通过 struct file 能找到 eventpoll 结构。
* 如果懂一点 Linux 下字符设备驱动开发,这里应该是很好理解的,推荐阅读《Linux device driver 3rd》
*/
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));
if (IS_ERR(file)) {
error = PTR_ERR(file);
goto out_free_fd;
}
ep->file = file;
fd_install(fd, file);
return fd;

epoll_ctl

查找 epoll 实例

首先,epoll_ctl 函数通过 epoll 实例句柄来获得对应的匿名文件,这一点很好理解,UNIX 下一切都是文件,epoll 的实例也是一个匿名文件。

1
2
3
4
//获得epoll实例对应的匿名文件
f = fdget(epfd);
if (!f.file)
goto error_return;

接下来,获得添加的套接字对应的文件,这里 tf 表示的是 target file,即待处理的目标文件。

1
2
3
4
5
6
/* Get the "struct file *" for the target file */
/* 我们需要监听的fd, 它当然也有个 struct file 结构, 上下2个不要搞混了 */
//获得真正的文件,如监听套接字、读写套接字
tf = fdget(fd);
if (!tf.file)
goto error_fput;

再接下来,进行了一系列的数据验证,以保证用户传入的参数是合法的,比如 epfd 真的是一个 epoll 实例句柄,而不是一个普通文件描述符。

1
2
3
4
5
6
/* The target file descriptor must support poll */
//如果不支持poll,那么该文件描述字是无效的
error = -EPERM;
if (!tf.file->f_op->poll)
goto error_tgt_fput;
...

如果获得了一个真正的 epoll 实例句柄,就可以通过 private_data 获取之前创建的 eventpoll 实例了。

1
2
3
4
5
/*
* At this point it is safe to assume that the "private_data" contains
* our own data structure.
*/
ep = f.file->private_data;

红黑树查找

接下来 epoll_ctl 通过目标文件和对应描述字,在红黑树中查找是否存在该套接字,这也是 epoll 为什么高效的地方。红黑树(RB-tree)是一种常见的数据结构,这里 eventpoll 通过红黑树跟踪了当前监听的所有文件描述字,而这棵树的根就保存在 eventpoll 数据结构中。

1
2
/* RB tree root used to store monitored fd structs */
struct rb_root_cached rbr;

对于每个被监听的文件描述字,都有一个对应的 epitem 与之对应,epitem 作为红黑树中的节点就保存在红黑树中。

1
2
3
4
5
6
7
8
9
10
11
12
/*
* Try to lookup the file inside our RB tree, Since we grabbed "mtx"
* above, we can be sure to be able to use the item looked up by
* ep_find() till we release the mutex.
*/

/* 对于每一个监听的fd,内核都有分配一个 epitem 结构,
* 而且我们也知道,epoll 是不允许重复添加 fd 的,
* 所以我们首先查找该 fd 是不是已经存在了。
* ep_find() 其实就是 RBTREE 查找,跟 C++STL 的 map 差不多一回事,O(logN)的时间复杂度。
*/
epi = ep_find(ep, tf.file, fd);

红黑树是一棵二叉树,作为二叉树上的节点,epitem 必须提供比较能力,以便可以按大小顺序构建出一棵有序的二叉树。其排序能力是依靠 epoll_filefd 结构体来完成的,epoll_filefd 可以简单理解为需要监听的文件描述字,它对应到二叉树上的节点。

可以看到这个还是比较好理解的,按照文件的地址大小排序。如果两个相同,就按照文件描述字来排序。

1
2
3
4
5
6
7
8
9
10
11
12
struct epoll_filefd {
struct file *file; // pointer to the target file struct corresponding to the fd
int fd; // target file descriptor number
} __packed;

/* Compare RB tree keys */
static inline int ep_cmp_ffd(struct epoll_filefd *p1,
struct epoll_filefd *p2)
{
return (p1->file > p2->file ? +1:
(p1->file < p2->file ? -1 : p1->fd - p2->fd));
}

在进行完红黑树查找之后,如果发现是一个 ADD 操作,并且在树中没有找到对应的二叉树节点,就会调用 ep_insert 进行二叉树节点的增加。

1
2
3
4
5
6
7
8
9
case EPOLL_CTL_ADD:
if (!epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_insert(ep, &epds, tf.file, fd, full_check);
} else
error = -EEXIST;
if (full_check)
clear_tfile_check_list();
break;

ep_insert

ep_insert 首先判断当前监控的文件值是否超过了 /proc/sys/fs/epoll/max_user_watches 的预设最大值,如果超过了则直接返回错误。

1
2
3
user_watches = atomic_long_read(&ep->user->epoll_watches);
if (unlikely(user_watches >= max_user_watches))
return -ENOSPC;

接下来是分配资源和初始化动作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/* 从著名的 slab 中分配一个 epitem */
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
return -ENOMEM;

/* Item initialization follow here ... */
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
ep_set_ffd(&epi->ffd, tfile, fd);
epi->event = *event;
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;

/* Initialize the poll table using the queue callback */
epq.epi = epi;
/* 初始化一个 poll_table
* 其实就是指定调用 poll_wait(注意不是 epoll_wait) 时的回调函数,和我们关心哪些 events,
* ep_ptable_queue_proc() 就是我们的回调,初值是所有 event 都关心。
*/
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

/*
* Attach the item to the poll hooks and get current event bits.
* We can safely use the file* here because its usage count has
* been increased by the caller of this function. Note that after
* this operation completes, the poll callback can start hitting
* the new item.
*/
/* 这一步很关键,也比较难懂,完全是内核的 poll 机制导致的...
* 首先,f_op->poll() 一般来说只是个 wrapper,它会调用真正的 poll 实现,
* 拿 UDP 的 socket 来举例,这里就是这样的调用流程: f_op->poll()、sock_poll()、
* udp_poll()、datagram_poll()、sock_poll_wait(),最后调用到我们上面指定的
* ep_ptable_queue_proc() 这个回调函数...(好深的调用路径...)。
* 完成这一步,我们的 epitem 就跟这个 socket 关联起来了,当它有状态变化时,
* 会通过 ep_poll_callback() 来通知.
* 最后,这个函数还会查询当前的 fd 是不是已经有什么 event 已经 ready 了,有的话,会将 event 返回。
*/
revents = tfile->f_op->poll(tfile, &epq.pt);

再接下来的事情非常重要,ep_insert 会为加入的每个文件描述字设置回调函数。这个回调函数是通过函数 ep_ptable_queue_proc 来进行设置的。这个回调函数是干什么的呢?其实,对应的文件描述字上如果有事件发生,就会调用这个函数,比如套接字缓冲区有数据了,就会回调这个函数。这个函数就是 ep_poll_callback。这里你会发现,原来内核设计也是充满了事件回调的原理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/*
* This is the callback that is used to add our wait queue to the
* target file wakeup lists.
*/
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead, poll_table *pt)
{
struct epitem *epi = ep_item_from_epqueue(pt);
struct eppoll_entry *pwq;

if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
/* 初始化等待队列,指定 ep_poll_callback 为唤醒时的回调函数,
* 当我们监听的 fd 发生状态改变时,也就是队列头被唤醒时,
* 指定的回调函数将会被调用。 */
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
pwq->whead = whead;
pwq->base = epi;
if (epi->event.events & EPOLLEXCLUSIVE)
add_wait_queue_exclusive(whead, &pwq->wait);
else
/* 将刚分配的等待队列成员加入到头中, 头是由 fd 持有的 */
add_wait_queue(whead, &pwq->wait);
list_add_tail(&pwq->llink, &epi->pwqlist);
/* nwait 记录了当前 epitem 加入到了多少个等待队列中,
* 我认为这个值最大也只会是1... */
epi->nwait++;
} else {
/* We have to signal that an error occurred */
epi->nwait = -1;
}
}

ep_poll_callback

ep_poll_callback 函数的作用非常重要,它将内核事件真正地和 epoll 对象联系了起来。它又是怎么实现的呢?

首先,通过这个文件的 wait_queue_entry_t 对象找到对应的 epitem 对象,因为 eppoll_entry 对象里保存了 wait_queue_entry_t,根据 wait_queue_entry_t 这个对象的地址就可以简单计算出 eppoll_entry 对象的地址,从而可以获得 epitem 对象的地址。这部分工作在 ep_item_from_wait 函数中完成。一旦获得 epitem 对象,就可以寻迹找到 eventpoll 实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/*
* This is the callback that is passed to the wait queue wakeup
* mechanism. It is called by the stored file descriptors when they
* have events to report.
*/
/*
* 这个是关键性的回调函数,当我们监听的 fd 发生状态改变时,它会被调用。
* 参数 key 被当作一个 unsigned long 整数使用,携带的是events。
*/
static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
{
int pwake = 0;
unsigned long flags;
// 从等待队列获取 epitem。需要知道哪个进程挂载到这个设备
struct epitem *epi = ep_item_from_wait(wait);
struct eventpoll *ep = epi->ep;

接下来,进行一个加锁操作。

1
spin_lock_irqsave(&ep->lock, flags);

下面对发生的事件进行过滤,为什么需要过滤呢?为了性能考虑,ep_insert 向对应监控文件注册的是所有的事件,而实际用户侧订阅的事件未必和内核事件对应。比如,用户向内核订阅了一个套接字的可读事件,在某个时刻套接字的可写事件发生时,并不需要向用户空间传递这个事件。

1
2
3
4
5
6
7
8
/*
* Check the events coming with the callback. At this stage, not
* every device reports the events in the "key" parameter of the
* callback. We need to be able to handle both cases here, hence the
* test for "key" != NULL before the event match test.
*/
if (key && !((unsigned long) key & epi->event.events))
goto out_unlock;

接下来,判断是否需要把该事件传递给用户空间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/*
* 这里看起来可能有点费解,其实干的事情比较简单:
* 如果该 callback 被调用的同时,epoll_wait() 已经返回了,
* 也就是说,此刻应用程序有可能已经在循环获取 events,
* 这种情况下,内核将此刻发生 event 的 epitem 用一个单独的链表
* 链起来,不发给应用程序,也不丢弃,而是在下一次 epoll_wait() 时返回给用户。
*/
if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
if (epi->next == EP_UNACTIVE_PTR) {
epi->next = ep->ovflist;
ep->ovflist = epi;
if (epi->ws) {
/*
* Activate ep->ws since epi->ws may get
* deactivated at any time.
*/
__pm_stay_awake(ep->ws);
}
}
goto out_unlock;
}

如果需要,而且该事件对应的 event_item 不在 eventpoll 对应的已完成队列中,就把它放入该队列,以便将该事件传递给用户空间。

1
2
3
4
5
/* If this file is already in the ready list we exit soon */
if (!ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake_rcu(epi);
}

我们知道,当我们调用 epoll_wait 的时候,调用进程被挂起,在内核看来调用进程陷入休眠。如果该 epoll 实例上对应描述字有事件发生,这个休眠进程应该被唤醒,以便及时处理事件。下面的代码就是起这个作用的,wake_up_locked 函数唤醒当前 eventpoll 上的等待进程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/*
* Wake up ( if active ) both the eventpoll wait list and the ->poll()
* wait list.
*/
if (waitqueue_active(&ep->wq)) {
if ((epi->event.events & EPOLLEXCLUSIVE) &&
!((unsigned long)key & POLLFREE)) {
switch ((unsigned long)key & EPOLLINOUT_BITS) {
case POLLIN:
if (epi->event.events & POLLIN)
ewake = 1;
break;
case POLLOUT:
if (epi->event.events & POLLOUT)
ewake = 1;
break;
case 0:
ewake = 1;
break;
}
}
wake_up_locked(&ep->wq);
}

查找 epoll 实例

epoll_wait 函数首先进行一系列的检查,例如传入的 maxevents 应该大于 0。

1
2
3
4
5
6
7
/* The maximum number of event must be greater than zero */
if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
return -EINVAL;

/* Verify that the area passed by the user is writeable */
if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event)))
return -EFAULT;

和前面介绍的 epoll_ctl 一样,通过 epoll 实例找到对应的匿名文件和描述字,并且进行检查和验证。

1
2
3
4
5
6
7
8
9
10
11
12
/* Get the "struct file *" for the eventpoll file */
f = fdget(epfd);
if (!f.file)
return -EBADF;

/*
* We have to check that the file structure underneath the fd
* the user passed to us _is_ an eventpoll file.
*/
error = -EINVAL;
if (!is_file_epoll(f.file))
goto error_fput;

还是通过读取 epoll 实例对应匿名文件的 private_data 得到 eventpoll 实例。

1
2
3
4
5
/*
* At this point it is safe to assume that the "private_data" contains
* our own data structure.
*/
ep = f.file->private_data;

接下来调用 ep_poll 来完成对应的事件收集并传递到用户空间。

1
2
/* Time to fish for events ... */
error = ep_poll(ep, events, maxevents, timeout);

ep_poll

这里 ep_poll 就分别对 timeout 不同值的场景进行了处理。如果大于 0 则产生了一个超时时间,如果等于 0 则立即检查是否有事件发生。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout)
{
int res = 0, eavail, timed_out = 0;
unsigned long flags;
u64 slack = 0;
wait_queue_entry_t wait;
ktime_t expires, *to = NULL;

if (timeout > 0) {
struct timespec64 end_time = ep_set_mstimeout(timeout);
slack = select_estimate_accuracy(&end_time);
to = &expires;
*to = timespec64_to_ktime(end_time);
} else if (timeout == 0) {
/*
* Avoid the unnecessary trip to the wait queue loop, if the
* caller specified a non blocking operation.
*/
timed_out = 1;
spin_lock_irqsave(&ep->lock, flags);
goto check_events;
}

接下来尝试获得 eventpoll 上的锁:

1
spin_lock_irqsave(&ep->lock, flags);

获得这把锁之后,检查当前是否有事件发生,如果没有,就把当前进程加入到 eventpoll 的等待队列 wq 中,这样做的目的是当事件发生时,ep_poll_callback 函数可以把该等待进程唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
if (!ep_events_available(ep)) {
/*
* Busy poll timed out. Drop NAPI ID for now, we can add
* it back in when we have moved a socket with a valid NAPI
* ID onto the ready list.
*/
ep_reset_busy_poll_napi_id(ep);

/*
* We don't have any available event to return to the caller.
* We need to sleep here, and we will be wake up by
* ep_poll_callback() when events will become available.
*/
init_waitqueue_entry(&wait, current);
__add_wait_queue_exclusive(&ep->wq, &wait);

紧接着是一个无限循环,这个循环中通过调用 schedule_hrtimeout_range,将当前进程陷入休眠,CPU 时间被调度器调度给其他进程使用,当然,当前进程可能会被唤醒,唤醒的条件包括有以下四种:

  1. 当前进程超时;
  2. 当前进程收到一个 signal 信号;
  3. 某个描述字上有事件发生;
  4. 当前进程被 CPU 重新调度,进入 for 循环重新判断,如果没有满足前三个条件,就又重新进入休眠

对应的 1、2、3 都会通过 break 跳出循环,直接返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//这个循环里,当前进程可能会被唤醒,唤醒的途径包括
//1.当前进程超时
//2.当前进行收到一个signal信号
//3.某个描述字上有事件发生
//对应的1.2.3都会通过break跳出循环
//第4个可能是当前进程被CPU重新调度,进入for循环的判断,如果没有满足1.2.3的条件,就又重新进入休眠
for (;;) {
/*
* We don't want to sleep if the ep_poll_callback() sends us
* a wakeup in between. That's why we set the task state
* to TASK_INTERRUPTIBLE before doing the checks.
*/
set_current_state(TASK_INTERRUPTIBLE);
/*
* Always short-circuit for fatal signals to allow
* threads to make a timely exit without the chance of
* finding more events available and fetching
* repeatedly.
*/
if (fatal_signal_pending(current)) {
res = -EINTR;
break;
}
if (ep_events_available(ep) || timed_out)
break;
if (signal_pending(current)) {
res = -EINTR;
break;
}

spin_unlock_irqrestore(&ep->lock, flags);

//通过调用schedule_hrtimeout_range,当前进程进入休眠,CPU时间被调度器调度给其他进程使用
if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
timed_out = 1;

spin_lock_irqsave(&ep->lock, flags);
}

如果进程从休眠中返回,则将当前进程从 eventpoll 的等待队列中删除,并且设置当前进程为 TASK_RUNNING 状态。

1
2
3
//从休眠中结束,将当前进程从wait队列中删除,设置状态为TASK_RUNNING,接下来进入check_events,来判断是否是有事件发生
__remove_wait_queue(&ep->wq, &wait);
__set_current_state(TASK_RUNNING);

最后,调用 ep_send_events 将事件拷贝到用户空间。

1
2
3
4
5
6
7
8
9
10
11
12
//ep_send_events将事件拷贝到用户空间
/*
* Try to transfer events to user space. In case we get 0 events and
* there's still timeout left over, we go trying again in search of
* more luck.
*/
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && !timed_out)
goto fetch_events;


return res;

ep_send_events

ep_send_events 这个函数会将 ep_send_events_proc 作为回调函数并调用 ep_scan_ready_list 函数,ep_scan_ready_list 函数调用 ep_send_events_proc 对每个已经就绪的事件循环处理。

ep_send_events_proc 循环处理就绪事件时,会再次调用每个文件描述符的 poll 方法,以便确定确实有事件发生。为什么这样做呢?这是为了确定注册的事件在这个时刻还是有效的。

可以看到,尽管 ep_send_events_proc 已经尽可能的考虑周全,使得用户空间获得的事件通知都是真实有效的,但还是有一定的概率,当 ep_send_events_proc 再次调用文件上的 poll 函数之后,用户空间获得的事件通知已经不再有效,这可能是用户空间已经处理掉了,或者其他什么情形。在这种情况下,如果套接字不是非阻塞的,整个进程将会被阻塞,这也是为什么将非阻塞套接字配合 epoll 使用作为最佳实践的原因。

在进行简单的事件掩码校验之后,ep_send_events_proc 将事件结构体拷贝到用户空间需要的数据结构中。这是通过 __put_user 方法完成的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//这里对一个fd再次进行poll操作,以确认事件
revents = ep_item_poll(epi, &pt);

/*
* If the event mask intersect the caller-requested one,
* deliver the event to userspace. Again, ep_scan_ready_list()
* is holding "mtx", so no operations coming from userspace
* can change the item.
*/
if (revents) {
if (__put_user(revents, &uevent->events) ||
__put_user(epi->event.data, &uevent->data)) {
list_add(&epi->rdllink, head);
ep_pm_stay_awake(epi);
return eventcnt ? eventcnt : -EFAULT;
}
eventcnt++;
uevent++;

ep_poll() 函数主要做以下几件事:

  1. 判断被监听的文件集合中是否有就绪的文件,如果有就返回。
  2. 如果没有就把当前进程添加到 epoll 的等待队列中,并且进入睡眠。
  3. 进程会一直睡眠直到有以下几种情况发生:
    1、被监听的文件集合中有就绪的文件
    2、设置了超时时间并且超时了
    3、接收到信号
  4. 如果有就绪的文件,那么就调用 ep_send_events() 函数把就绪文件复制到 events 参数中。
  5. 返回就绪文件的个数。

最后,我们通过一张图来总结 epoll 的原理:

Level-triggered VS Edge-triggered

从实现角度来看其实非常简单,在 ep_send_events_proc 函数的最后,针对 level-triggered 情况,当前的 epoll_item 对象被重新加到 eventpoll 的就绪列表中,这样在下一次 epoll_wait 调用时,这些 epoll_item 对象就会被重新处理。

在前面我们提到,在最终拷贝到用户空间有效事件列表中之前,会调用对应文件的 poll 方法,以确定这个事件是不是依然有效。所以,如果用户空间程序已经处理掉该事件,就不会被再次通知;如果没有处理,意味着该事件依然有效,就会被再次通知。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//这里是Level-triggered的处理,可以看到,在Level-triggered的情况下,这个事件被重新加回到ready list里面
//这样,下一轮epoll_wait的时候,这个事件会被重新check
else if (!(epi->event.events & EPOLLET)) {
/*
* If this file has been added with Level
* Trigger mode, we need to insert back inside
* the ready list, so that the next call to
* epoll_wait() will check again the events
* availability. At this point, no one can insert
* into ep->rdllist besides us. The epoll_ctl()
* callers are locked out by
* ep_scan_ready_list() holding "mtx" and the
* poll callback will queue them in ep->ovflist.
*/
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
}

epoll VS poll/select

最后,我们从实现角度来说明一下为什么 epoll 的效率要远远高于 poll/select。

首先,poll/select 先将要监听的 fd 从用户空间拷贝到内核空间,然后在内核空间里面进行处理之后,再拷贝给用户空间。这里就涉及到内核空间申请内存,释放内存等等过程,这在大量 fd 情况下,是非常耗时的。而 epoll 维护了一个红黑树,通过对这棵黑红树进行操作,可以避免大量的内存申请和释放的操作,而且查找速度非常快。

下面的代码就是 poll/select 在内核空间申请内存的展示。可以看到 select 是先尝试申请栈上资源,如果需要监听的 fd 比较多, 就会去申请堆空间的资源。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,
fd_set __user *exp, struct timespec64 *end_time)
{
fd_set_bits fds;
void *bits;
int ret, max_fds;
size_t size, alloc_size;
struct fdtable *fdt;
/* Allocate small arguments on the stack to save memory and be faster */
long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];

ret = -EINVAL;
if (n < 0)
goto out_nofds;

/* max_fds can increase, so grab it once to avoid race */
rcu_read_lock();
fdt = files_fdtable(current->files);
max_fds = fdt->max_fds;
rcu_read_unlock();
if (n > max_fds)
n = max_fds;

/*
* We need 6 bitmaps (in/out/ex for both incoming and outgoing),
* since we used fdset we need to allocate memory in units of
* long-words.
*/
size = FDS_BYTES(n);
bits = stack_fds;
if (size > sizeof(stack_fds) / 6) {
/* Not enough space in on-stack array; must use kmalloc */
ret = -ENOMEM;
if (size > (SIZE_MAX / 6))
goto out_nofds;


alloc_size = 6 * size;
bits = kvmalloc(alloc_size, GFP_KERNEL);
if (!bits)
goto out_nofds;
}
fds.in = bits;
fds.out = bits + size;
fds.ex = bits + 2*size;
fds.res_in = bits + 3*size;
fds.res_out = bits + 4*size;
fds.res_ex = bits + 5*size;
...

第二,select/poll 从休眠中被唤醒时,如果监听多个 fd,只要其中有一个 fd 有事件发生,内核就会遍历内部的 list 去检查到底是哪一个事件到达,并没有像 epoll 一样,通过 fd 直接关联 eventpoll 对象,快速地把 fd 直接加入到 eventpoll 的就绪列表中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static int do_select(int n, fd_set_bits *fds, struct timespec64 *end_time)
{
...
retval = 0;
for (;;) {
unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;
bool can_busy_loop = false;

inp = fds->in; outp = fds->out; exp = fds->ex;
rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;

for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
unsigned long in, out, ex, all_bits, bit = 1, mask, j;
unsigned long res_in = 0, res_out = 0, res_ex = 0;

in = *inp++; out = *outp++; ex = *exp++;
all_bits = in | out | ex;
if (all_bits == 0) {
i += BITS_PER_LONG;
continue;
}

if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,
to, slack))
timed_out = 1;
...

四、总结

epoll 流程

1、epoll_create()
从 slab 缓存中创建一个 eventpoll 对象,并且创建一个匿名的 fd 跟 fd 对应的 file 对象,而 eventpoll 对象保存在 struct file 结构的private 指针中,并且返回。该 fd 对应的 file operations 只是实现了 poll 跟 release 操作。

2、创建 eventpoll 对象的初始化操作
获取当前用户信息,是不是 root,最大监听 fd 数目等,并且将这些信息保存到 eventpoll 对象中。然后初始化等待队列,初始化就绪链表,初始化红黑树的头结点。

3、epoll_ctl()
将 epoll_event 结构拷贝到内核空间中,并且判断加入的 fd 是否支持 poll 结构(epoll、poll、select,I/O 多路复用必须支持 poll 操作)。并且从 epfd->file->privatedata 获取 event_poll 对象,根据 op 区分是添加、删除,还是修改,首先在 eventpoll 结构中的红黑树查找是否已经存在了相对应的 fd,没找到就支持插入操作,否则报重复添加的错误。

插入操作时,会创建一个与 fd 对应的 epitem 结构,并且初始化相关成员,比如保存监听的 fd 跟 file 结构之类的。重要的是指定了调用poll_wait 时的回调函数用于数据就绪时唤醒进程。poll_wait 内部初始化设备的等待队列,将该进程注册到等待队列,完成这一步, 我们的 epitem就跟这个 socket 关联起来了,当它有状态变化时,会通过 ep_poll_callback() 来通知。最后调用加入的 fd 的 file operation->poll 函数(最后会调用 poll_wait 操作)用于完成注册操作。最后将 epitem 结构添加到红黑树中。

4、epoll_wait()
计算睡眠时间(如果有),判断 eventpoll 对象的链表是否为空,不为空那就干活不睡眠。并且初始化一个等待队列,把自己挂上去,设置自己的进程状态为可睡眠状态。判断是否有信号到来(有的话直接被中断醒来),如果啥事都没有那就调用 schedule_timeout 进行睡眠,如果超时或者被唤醒,首先从自己初始化的等待队列删除,然后开始拷贝资源给用户空间了。拷贝资源则是先把就绪事件链表转移到中间链表,然后挨个遍历拷贝到用户空间,并且挨个判断其是否为水平触发,是的话再次插入到就绪链表。

epoll 的改进

epoll 维护了一棵红黑树来跟踪所有待检测的文件描述字,红黑树的使用减少了内核和用户空间大量的数据拷贝和内存分配,大大提高了性能。

同时,epoll 维护了一个链表来记录就绪事件,内核在每个文件有事件发生时将自己登记到这个就绪事件列表中,通过内核自身的文件 file-eventpoll 之间的回调和唤醒机制,减少了对内核描述字的遍历,大大加速了事件通知和检测的效率,这也为 level-triggered 和 edge-triggered 的实现带来了便利。

通过对比 poll/select 的实现,我们发现 epoll 确实克服了 poll/select 的种种弊端,不愧是 Linux 下高性能网络编程的皇冠。我们应该感谢 Linux 社区的大神们设计了这么强大的事件分发机制,让我们在 Linux 下可以享受高性能网络服务器带来的种种技术红利。

五、拓展延伸

epoll 实现特点

  1. 等待队列 waitqueue
    队列头 wait_queue_head_t 往往是资源生产者,队列成员 wait_queue_t 往往是资源消费者,当头的资源 ready 后,会逐个执行每个成员指定的回调函数,来通知它们资源已经 ready 了。
  2. 内核的 poll 机制
    1、被 poll 的 fd,必须在实现上支持内核的 poll 技术,比如 fd 是某个字符设备,或者是个 socket,它必须实现 file_operations 中的 poll 操作,给自己分配有一个等待队列头,主动 poll fd 的某个进程必须分配一个等待队列成员, 添加到 fd 的等待待队列里面去,并指定资源 ready 时的回调函数。
    2、用 socket 做例子,它必须要实现一个 poll 操作,这个 poll 是发起轮询的代码必须主动调用的,该函数中必须调用 poll_wait(),poll_wait() 会将发起者作为等待队列成员加入到 socket 的等待队列中去,这样 socket 发生状态变化时可以通过队列头逐个通知所有关心它的进程,这一点必须很清楚的理解, 否则会想不明白 epoll 是如何得知 fd 的状态发生变化的。
  3. epollfd 本身也是个 fd,所以它本身也可以被 epoll,可以猜测一下它是不是可以无限嵌套 epoll 下去…

其他部分内核知识点

  1. fd 我们知道是文件描述符,在内核态,与之对应的是 struct file 结构,可以看作是内核态的文件描述符。
  2. spinlock,自旋锁,必须要非常小心使用的锁,尤其是调用 spin_lock_irqsave() 的时候,中断关闭,不会发生进程调度,被保护的资源其它CPU 也无法访问。这个锁是很强力的,所以只能锁一些非常轻量级的操作。
  3. 引用计数在内核中是非常重要的概念,内核代码里面经常有些 release、free 释放资源的函数几乎不加任何锁,这是因为这些函数往往是在对象的引用计数变成0时被调用,既然没有进程在使用在这些对象,自然也不需要加锁。struct file 是持有引用计数的。

原文链接

31丨性能篇答疑:epoll源码深度剖析

参考文章

1、Epoll 实现原理
2、epoll源码分析(基于linux-5.1.4)
3、Linux内核笔记:epoll实现原理(一)
4、epoll内核源码详解+自己总结的流程

评论

:D 一言句子获取中...

加载中,最新评论有1分钟缓存...