epoll的基本原理和简单使用

epoll

epoll作为目前linux处理网络io的“唯一”选择,几乎在所有的互联网服务器项目中被使用,各种不同的知名项目对epoll模型的设计都有一些区别,我们的项目也是epoll,记录一下,加深记忆。


首先epoll的基本操作很简单,只有几个函数和一个结构体:

头文件:
#include <sys/epoll.h>

创建epoll对象:

int epoll_create(int size);

epoll_create() 可以创建一个epoll实例。在linux 内核版本大于2.6.8 后,这个size 参数就被弃用了,但是为了兼容老版本,传入的值必须大于0,在 epoll_create() 的最初实现版本时, size参数的作用是创建epoll实例时候告诉内核需要使用多少个文件描述符。内核会使用 size 的大小去申请对应的内存(如果在使用的时候超过了给定的size, 内核会申请更多的空间)。epoll_create() 会返回新的epoll对象的文件描述符。这个文件描述符用于后续的epoll操作。如果不需要使用这个描述符,请使用close关闭。

设置epoll事件:

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll_ctl() 可以控制通过epoll_create()创建的epfd,通过op增删改文件描述符(比如socket)fd上的事件。

op值:
EPOLL_CTL_ADD 在epfd中注册指定的fd文件描述符并能把event和fd关联起来。
EPOLL_CTL_MOD 改变fd和event之间的联系。
EPOLL_CTL_DEL 从指定的epfd中删除fd文件描述符。在这种操作中event是被忽略的,并且为可以等于NULL。

event这个参数是用于关联制定的fd文件描述符的。它的定义如下:

typedef union epoll_data {
    void        *ptr;
    int          fd;
    uint32_t     u32;
    uint64_t     u64;
} epoll_data_t;

struct epoll_event {
    uint32_t     events;      /* Epoll events */
    epoll_data_t data;        /* User data variable */
};

events是传入参数,是一个字节的掩码构成的。下面是可以用的事件:

`EPOLLIN` - 当关联的文件可以执行 read ()操作时。
`EPOLLOUT` - 当关联的文件可以执行 write ()操作时。
`EPOLLET` - 设置指定的文件描述符模式为边缘触发,默认的模式是水平触发。

LT(level-triggered)水平触发是缺省的工作方式,并且同时支持block和no-block socket。在这种模式下,内核告诉你一个fd是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你,直到这个fd的数据被读完。

ET(edge-triggered)边缘触发是高速工作方式,只支持no-block socket。在这种模式下,当fd从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(直到你读取了全部数据或者fd遇到错误)。

上面这些事件是经常用到的,下面这几个事件目前理解还不够深,先放着吧。。。

EPOLLRDHUP - (从 linux 2.6.17 开始)当socket关闭的时候,或者半关闭写段的(当使用边缘触发的时候,这个标识在写一些测试代码去检测关闭的时候特别好用)
EPOLLPRI - 当 read ()能够读取紧急数据的时候。
EPOLLERR - 当关联的文件发生错误的时候,epoll_wait() 总是会等待这个事件,并不是需要必须设置的标识。
EPOLLHUP - 当指定的文件描述符被挂起的时候。epoll_wait() 总是会等待这个事件,并不是需要必须设置的标识。当socket从某一个地方读取数据的时候(管道或者socket),这个事件只是标识出这个已经读取到最后了(EOF)。所有的有效数据已经被读取完毕了,之后任何的读取都会返回0(EOF)。
EPOLLONESHOT - (从 linux 2.6.17 开始)设置指定文件描述符为单次模式。这意味着,在设置后只会有一次从epoll_wait() 中捕获到事件,之后你必须要重新调用 epoll_ctl() 重新设置。

data是传入参数,主要使用int fd;来标记文件描述符,也可以使用void *ptr;标记自定义的数据格式,包含文件描述符和自定义的数据。

等待epoll事件:

int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);

epoll_wait()这个系统调用是用来等待epfd中的事件。events是传入参数,一组空的epoll_event对象,用来接收epoll操作返回数据。maxevents是epoll_event数组的大小,必须要大于0(这里的epoll_event数组大小可以理解为一次epoll_wait调用操作的最大fd数量)。

timeout这个参数是用来制定epoll_wait会阻塞多少毫秒,会一直阻塞到下面几种情况:

1.一个文件描述符触发了事件。
2.被一个信号处理函数打断,或者timeout超时。

当timeout等于-1的时候这个函数会无限期的阻塞下去,当timeout等于0的时候,就算没有任何事件,也会立刻返回。

一个精简版的epoll伪代码:

#define MAX_EVENTS 10
struct epoll_event  ev, events[MAX_EVENTS];
int         listen_sock, conn_sock, nfds, epollfd;


/* Code to set up listening socket, 'listen_sock',
* (socket(), bind(), listen()) omitted */

epollfd = epoll_create1( 0 );
if ( epollfd == -1 )
{
    perror( "epoll_create1" );
    exit( EXIT_FAILURE );
}

ev.events   = EPOLLIN;
ev.data.fd  = listen_sock;
if ( epoll_ctl( epollfd, EPOLL_CTL_ADD, listen_sock, &ev ) == -1 )
{
    perror( "epoll_ctl: listen_sock" );
    exit( EXIT_FAILURE );
}

for (;; )
{
    nfds = epoll_wait( epollfd, events, MAX_EVENTS, -1 );
    if ( nfds == -1 )
    {
        perror( "epoll_wait" );
        exit( EXIT_FAILURE );
    }

    for ( n = 0; n < nfds; ++n )
    {
        if ( events[n].data.fd == listen_sock )
        {
            conn_sock = accept( listen_sock,
                        (struct sockaddr *) &local, &addrlen );
            if ( conn_sock == -1 )
            {
                perror( "accept" );
                exit( EXIT_FAILURE );
            }
            setnonblocking( conn_sock );
            ev.events   = EPOLLIN | EPOLLET;
            ev.data.fd  = conn_sock;
            if ( epoll_ctl( epollfd, EPOLL_CTL_ADD, conn_sock,
                    &ev ) == -1 )
            {
                perror( "epoll_ctl: conn_sock" );
                exit( EXIT_FAILURE );
            }
        } else {
            do_use_fd( events[n].data.fd );
        }
    }
}

这段代理里省略了服务器socket的创建过程和接收到客户端socket读数据事件处理的do_use_fd函数,比较精简。

下面是一个完整可运行的简单epoll例子:

#include <stdio.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <netdb.h>
#include <errno.h>

#define MAX_EVENT 20
#define READ_BUF_LEN 256

/**
* 设置 file describe 为非阻塞模式
* @param fd 文件描述
* @return 返回0成功,返回-1失败
*/
static int make_socket_non_blocking (int fd) {
    int flags, s;
    // 获取当前flag
    flags = fcntl(fd, F_GETFL, 0);
    if (-1 == flags) {
        perror("Get fd status");
        return -1;
    }

    flags |= O_NONBLOCK;

    // 设置flag
    s = fcntl(fd, F_SETFL, flags);
    if (-1 == s) {
        perror("Set fd status");
        return -1;
    }
    return 0;
}

int main() {
    // epoll 实例 file describe
    int epfd = 0;
    int listenfd = 0;
    int result = 0;
    struct epoll_event ev, event[MAX_EVENT];
    // 绑定的地址
    const char * const local_addr = "192.168.0.45";
    struct sockaddr_in server_addr = { 0 };

    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (-1 == listenfd) {
        perror("Open listen socket");
        return -1;
    }
    /* Enable address reuse */
    int on = 1;
    // 打开 socket 端口复用, 防止测试的时候出现 Address already in use
    result = setsockopt( listenfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on) );
    if (-1 == result) {
        perror ("Set socket");
        return 0;
    }

    server_addr.sin_family = AF_INET;
    inet_aton (local_addr, &(server_addr.sin_addr));
    server_addr.sin_port = htons(8080);
    result = bind(listenfd, (const struct sockaddr *)&server_addr, sizeof (server_addr));
    if (-1 == result) {
        perror("Bind port");
        return 0;
    }
    result = make_socket_non_blocking(listenfd);
    if (-1 == result) {
        return 0;
    }

    result = listen(listenfd, 200);
    if (-1 == result) {
        perror("Start listen");
        return 0;
    }

    // 创建epoll实例
    epfd = epoll_create1(0);
    if (1 == epfd) {
        perror("Create epoll instance");
        return 0;
    }

    ev.data.fd = listenfd;
    ev.events = EPOLLIN | EPOLLET /* 边缘触发选项。 */;
    // 设置epoll的事件
    result = epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);

    if(-1 == result) {
        perror("Set epoll_ctl");
        return 0;
    }

    for ( ; ; ) {
        int wait_count;
        // 等待事件
        wait_count = epoll_wait(epfd, event, MAX_EVENT, -1);

        for (int i = 0 ; i < wait_count; i++) {
            uint32_t events = event[i].events;
            // IP地址缓存
            char host_buf[NI_MAXHOST];
            // PORT缓存
            char port_buf[NI_MAXSERV];

            int __result;
            // 判断epoll是否发生错误
            if ( events & EPOLLERR || events & EPOLLHUP || (! events & EPOLLIN)) {
                printf("Epoll has error\n");
                close (event[i].data.fd);
                continue;
            } else if (listenfd == event[i].data.fd) {
                // listen的 file describe 事件触发, accpet事件

                for ( ; ; ) { // 由于采用了边缘触发模式,这里需要使用循环
                    struct sockaddr in_addr = { 0 };
                    socklen_t in_addr_len = sizeof (in_addr);
                    int accp_fd = accept(listenfd, &in_addr, &in_addr_len);
                    if (-1 == accp_fd) {
                        perror("Accept");
                        break;
                    }
                    __result = getnameinfo(&in_addr, sizeof (in_addr),
                                        host_buf, sizeof (host_buf) / sizeof (host_buf[0]),
                                        port_buf, sizeof (port_buf) / sizeof (port_buf[0]),
                                        NI_NUMERICHOST | NI_NUMERICSERV);

                    if (! __result) {
                        printf("New connection: host = %s, port = %s\n", host_buf, port_buf);
                    }

                    __result = make_socket_non_blocking(accp_fd);
                    if (-1 == __result) {
                        return 0;
                    }

                    ev.data.fd = accp_fd;
                    ev.events = EPOLLIN | EPOLLET;
                    // 为新accept的 file describe 设置epoll事件
                    __result = epoll_ctl(epfd, EPOLL_CTL_ADD, accp_fd, &ev);

                    if (-1 == __result) {
                        perror("epoll_ctl");
                        return 0;
                    }
                }
                continue;
            } else {
                // 其余事件为 file describe 可以读取
                int done = 0;
                // 因为采用边缘触发,所以这里需要使用循环。如果不使用循环,程序并不能完全读取到缓存区里面的数据。
                for ( ; ;) {
                    ssize_t result_len = 0;
                    char buf[READ_BUF_LEN] = { 0 };

                    result_len = read(event[i].data.fd, buf, sizeof (buf) / sizeof (buf[0]));

                    if (-1 == result_len) {
                        if (EAGAIN != errno) {
                            perror ("Read data");
                            done = 1;
                        }
                        break;
                    } else if (! result_len) {
                        done = 1;
                        break;
                    }

                    write(STDOUT_FILENO, buf, result_len);
                }
                if (done) {
                    printf("Closed connection\n");
                    close (event[i].data.fd);
                }
            }
        }

    }
    close (epfd);
    return 0;
}

总结

先记录一下epoll的基本原理和基本使用方法,后面记录一下不同的epoll框架和我们自己项目中使用的情况。