2015/12/24

如何使用epoll(转)


文章转自:http://www.oschina.net/translate/how-to-use-epoll-a-complete-example-in-c

通常的网络服务器实现,是对每一个连接使用一个单独的线程或进程。对高性能应用而言,由于需要同时处理非常多的客户请求,所以这种方式并不能工作得很好,因为诸如资源使用和上下文切换所需要的时间,影响了在同一时间内对多个客户端进行处理的能力。

另一种可选的途径是在一个单独的线程里采用非阻塞的I/O,这样当可以从一个socket中读取或写入更多数据时,由一些已经准备就绪的通知方式来告诉我们。


这篇文章介绍 Linux 的 epoll 方法,它是Linux上最好的就绪通知方式。我们会用C语言写一个TCP服务器简单程序。假设你已有C编程的经验,知道Linux下编译和运行程序,并且会用 manpages 来查看所使用的C函数。


epoll 是在 Linux 2.6 才引进的,而且它并不适用于其他 Unix-like 系统。它提供了一个与 select 和 poll 函数相似的功能:

select 可以在某一时间监视最大达到 FD_SETSIZE 数量的文件描述符,通常是由在 libc 编译时指定的一个比较小的数字。

poll 在同一时间能够监视的文件描述符数量并没有受到限制,但是我们必须在每一次都要扫描所有传入的描述符来检查其是否存在已就绪通知,它的时间复杂度为 O(n),是缓慢的。

epoll 没有以上所示的限制,并且不用执行线性扫描。因此,它能有更高的执行效率且可以处理大量的事件。


一个 epoll 实例可以通过 epoll_create 或者 epoll_create1 函数来创建。 epoll_ctl 是用来在epoll实例中 添加 / 删除 被监视的文件描述符。epoll_wait 是用来等待 所监听描述符事件,它会阻塞到事件到达。可以在manpages上查看更多信息。

当描述符被添加到epoll实例中,有两种添加模式:level triggered(水平触发)和 edge triggered(边缘触发)。当使用 level triggered 模式并且数据就绪待读,epoll_wait 总会返回就绪事件。如果你没有将数据读取完,并且调用 epoll_wait 在epoll实例上再次监听这个描述符,由于还有数据是可读的,它会再次返回。在 edge triggered 模式时,你只会得到一次就绪通知。如果你没有将数据读完,并且再次在epoll实例上调用 epoll_wait,由于就绪事件已经被发送,所以它会阻塞。


传递到 epoll_ctl 的epoll事件结构如下所示。对每一个被监听的描述符,你可以关联到一个整数或一个作为用户数据的指针。

/* User data associated with a watched descriptor. This is a union, so the
   members share storage and only one of them holds a value at a time. */
typedef union epoll_data
{
  void        *ptr;  /* pointer to caller-defined data */
  int          fd;   /* an integer; the example program stores the descriptor here */
  __uint32_t   u32;
  __uint64_t   u64;
} epoll_data_t;

/* Event record passed to epoll_ctl when registering a descriptor and
   written into the caller's array by epoll_wait. */
struct epoll_event
{
  __uint32_t   events; /* Bit mask of epoll event flags (the example uses EPOLLIN | EPOLLET) */
  epoll_data_t data;   /* User data variable */
};
/* Create an epoll instance. The example program treats a return value of
   -1 from epoll_create1 as failure. */
int epoll_create(int size);

/* Create an epoll instance, taking a flags argument instead of a size. */
int epoll_create1(int flags);

/* Add or remove (selected by `op`) the watched descriptor `fd` on the epoll
   instance `epfd`; `event` carries the event mask and user data. */
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

/* Wait for events on the descriptors watched by `epfd` and store them in
   `events`. It blocks until an event arrives; the example program passes
   -1 as `timeout` and the size of its array as `maxevents`. */
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

例子

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <pthread.h>
#include <errno.h>

/* Size of the events array that main hands to each epoll_wait call. */
#define MAXEVENTS 64

/* Put descriptor `sfd` into non-blocking mode by OR-ing O_NONBLOCK into its
   current file status flags. Returns 0 on success. If either fcntl() call
   fails, reports it with perror() and returns -1. */
static int
make_socket_non_blocking (int sfd) {
    /* Fetch the existing flags first so they are preserved. */
    int current = fcntl (sfd, F_GETFL, 0);

    if (current != -1) {
        if (fcntl (sfd, F_SETFL, current | O_NONBLOCK) != -1)
            return 0;
    }

    /* errno still belongs to whichever fcntl() call failed. */
    perror ("fcntl");
    return -1;
}

/* Create a TCP socket and bind it to `port` on all local interfaces.
 *
 * `port` is handed to getaddrinfo() as the service argument. Every address
 * returned is tried in order until socket() and bind() both succeed.
 *
 * Returns the bound (not yet listening) descriptor, or -1 on failure after
 * printing a diagnostic to stderr. The address list from getaddrinfo() is
 * released on every path, including the "Could not bind" one.
 */
static int
create_and_bind (char *port) {
    struct addrinfo hints;
    struct addrinfo *result, *rp;
    int s, sfd = -1;

    memset (&hints, 0, sizeof hints);
    hints.ai_family = AF_UNSPEC;     /* Return IPv4 and IPv6 choices */
    hints.ai_socktype = SOCK_STREAM; /* We want a TCP socket */
    hints.ai_flags = AI_PASSIVE;     /* All interfaces */

    s = getaddrinfo (NULL, port, &hints, &result);
    if (s != 0) {
        fprintf (stderr, "getaddrinfo: %s\n", gai_strerror (s));
        return -1;
    }

    for (rp = result; rp != NULL; rp = rp->ai_next) {
        sfd = socket (rp->ai_family, rp->ai_socktype, rp->ai_protocol);
        if (sfd == -1)
            continue;

        if (bind (sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
            /* We managed to bind successfully! */
            break;
        }

        /* This candidate failed: drop it and mark that nothing is bound. */
        close (sfd);
        sfd = -1;
    }

    /* The list is not needed any more, whether or not a bind succeeded.
       sfd, not the now-dangling rp, tells us the outcome. */
    freeaddrinfo (result);

    if (sfd == -1) {
        fprintf (stderr, "Could not bind\n");
        return -1;
    }

    return sfd;
}

/* Thread entry point for a small test client.
 *
 * `arg` is the port as a NUL-terminated decimal string. The thread sleeps
 * for two seconds, connects over IPv4 to that port on the loopback address,
 * writes one greeting, sleeps another two seconds, then closes the socket.
 *
 * Always returns NULL. Failures are reported on stdout/stderr and end the
 * thread early without leaking the socket.
 */
static void *
client_socket(void *arg) {
    const char *port = arg;
    const char *msg = "hello server socket";
    struct sockaddr_in addr;
    char *end;
    long portnum;
    ssize_t written;
    int sock, ret;

    sleep(2);

    /* Validate the port instead of letting atoi() silently yield 0. */
    errno = 0;
    portnum = strtol(port, &end, 10);
    if (errno != 0 || end == port || *end != '\0' || portnum < 1 || portnum > 65535) {
        fprintf(stderr, "\nclient: invalid port: %s\n", port);
        return NULL;
    }

    sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock == -1) {
        perror("socket");
        return NULL;
    }

    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    /* Connect to loopback explicitly; connecting to the wildcard address
       0.0.0.0 is not portable. */
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = htons((unsigned short)portnum);

    ret = connect(sock, (struct sockaddr*)&addr, sizeof(addr));
    if (ret != 0) {
        printf("\nsock connect error: %d error:%d\n", ret, errno);
        close(sock);
        return NULL;
    }

    /* Not fatal if this fails: the helper has already reported the error
       and the single write below also works on a blocking socket. */
    make_socket_non_blocking (sock);

    written = write(sock, msg, strlen(msg));
    printf("\nmsg write():%zd\n", written);
    sleep(2);

    close(sock);

    return NULL;
}

/* Accept every connection queued on the non-blocking listening socket `sfd`
   and register each one with the epoll instance `efd` for edge-triggered
   read events. Returns once accept() reports that nothing is pending
   (EAGAIN/EWOULDBLOCK) or fails with another error, which is printed. */
static void
accept_pending_connections (int efd, int sfd) {
    while (1) {
        /* sockaddr_storage is large enough for both IPv4 and IPv6 peers;
           a plain struct sockaddr would truncate IPv6 addresses. */
        struct sockaddr_storage in_addr;
        socklen_t in_len = sizeof in_addr;
        char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
        struct epoll_event event;
        int infd, s;

        infd = accept (sfd, (struct sockaddr *) &in_addr, &in_len);
        if (infd == -1) {
            /* EAGAIN/EWOULDBLOCK: all incoming connections are processed. */
            if ((errno != EAGAIN) && (errno != EWOULDBLOCK))
                perror ("accept");
            return;
        }

        s = getnameinfo ((struct sockaddr *) &in_addr, in_len, hbuf, sizeof hbuf, sbuf, sizeof sbuf, NI_NUMERICHOST | NI_NUMERICSERV);
        if (s == 0) {
            printf("\nAccepted connection on descriptor %d (host=%s, port=%s)\n", infd, hbuf, sbuf);
        }

        /* Make the incoming socket non-blocking and add it to the
           list of fds to monitor. */
        s = make_socket_non_blocking (infd);
        if (s == -1) {
            abort ();
        }

        memset (&event, 0, sizeof event);
        event.data.fd = infd;
        event.events = EPOLLIN | EPOLLET;
        s = epoll_ctl (efd, EPOLL_CTL_ADD, infd, &event);
        if (s == -1) {
            perror ("epoll_ctl");
            abort ();
        }
    }
}

/* Read everything currently available on the non-blocking descriptor `fd`
   and copy it to standard output. In edge-triggered mode the data must be
   read completely because no second notification arrives for it.
   Returns 1 when the connection should be closed (end of file or a read
   error) and 0 when all available data was consumed. */
static int
drain_client (int fd) {
    while (1) {
        char buf[512];
        ssize_t count, done = 0;

        count = read (fd, buf, sizeof buf);
        if (count == -1) {
            if (errno == EINTR)
                continue;
            /* EAGAIN means we have read all data. */
            if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
                return 0;
            perror ("read");
            return 1;
        }
        if (count == 0) {
            /* End of file. The remote has closed the connection. */
            return 1;
        }

        /* Write the buffer to standard output, retrying short writes. */
        while (done < count) {
            ssize_t w = write (STDOUT_FILENO, buf + done, (size_t) (count - done));
            if (w == -1) {
                if (errno == EINTR)
                    continue;
                perror ("write");
                abort ();
            }
            done += w;
        }
    }
}

/* Edge-triggered epoll echo-to-stdout server.
 *
 * Usage: prog port. Binds and listens on `port`, starts a detached test
 * client thread that connects back to it, then runs the event loop:
 * new connections are accepted and registered, and readable clients are
 * drained to standard output until they close. The loop never ends
 * normally; setup failures terminate the process with abort().
 */
int
main (int argc, char *argv[]) {
    int sfd, s;
    int efd;
    struct epoll_event event;
    struct epoll_event events[MAXEVENTS];
    pthread_t client_thread;

    if (argc != 2) {
        fprintf (stderr, "Usage: %s [port]\n", argv[0]);
        exit (EXIT_FAILURE);
    }

    sfd = create_and_bind (argv[1]);
    if (sfd == -1)
        abort ();

    s = make_socket_non_blocking (sfd);
    if (s == -1)
        abort ();

    s = listen (sfd, SOMAXCONN);
    if (s == -1) {
        perror ("listen");
        abort ();
    }

    efd = epoll_create1 (0);
    if (efd == -1) {
        perror ("epoll_create");
        abort ();
    }

    /* Zero the whole record so no uninitialised union bytes reach the kernel. */
    memset (&event, 0, sizeof event);
    event.data.fd = sfd;
    event.events = EPOLLIN | EPOLLET;
    s = epoll_ctl (efd, EPOLL_CTL_ADD, sfd, &event);
    if (s == -1) {
        perror ("epoll_ctl");
        abort ();
    }

    /* pthread_create returns an error number rather than setting errno. */
    s = pthread_create (&client_thread, NULL, client_socket, argv[1]);
    if (s != 0) {
        fprintf (stderr, "pthread_create: %s\n", strerror (s));
        abort ();
    }
    /* The thread is never joined, so detach it to release its resources. */
    pthread_detach (client_thread);

    /* The event loop */
    while (1) {
        int n, i;

        n = epoll_wait (efd, events, MAXEVENTS, -1);
        if (n == -1) {
            /* A signal interrupted the wait: simply retry. */
            if (errno == EINTR)
                continue;
            perror ("epoll_wait");
            abort ();
        }

        for (i = 0; i < n; i++) {
            int fd = events[i].data.fd;

            if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events & EPOLLIN))) {
                /* An error has occurred on this fd, or the socket is not
                   ready for reading (why were we notified then?) */
                fprintf (stderr, "epoll error\n");
                close (fd);
                continue;
            }

            if (fd == sfd) {
                /* We have a notification on the listening socket, which
                   means one or more incoming connections. */
                accept_pending_connections (efd, sfd);
                continue;
            }

            /* We have data on the fd waiting to be read. */
            if (drain_client (fd)) {
                printf ("\nClosed connection on descriptor %d\n", fd);

                /* Closing the descriptor will make epoll remove it
                   from the set of descriptors which are monitored. */
                close (fd);
            }
        }
    }

    /* Not reached. `events` is a stack array and must not be free()d. */
    close (efd);
    close (sfd);
    return EXIT_SUCCESS;
}