逐步实现TCP服务端Step04-7:select、poll

select是最基本的I/O多路复用函数,Linux及Windows平台均提供支持。pollselect类似,二者的语义都是在给定的时间内对一组文件描述符进行遍历检查,找出其中的就绪者。

  • nfds:要求给出传入集合中最大的文件描述符值+1,内核需要知道这个值已确保其操作能涵盖结合中的所有文件描述符。
  • readfds/writefds/exceptfds:这三个指针分别指向可读、可写和异常事件对应的文件描述符的集合。使用者在调用select的时候,就是通过这三个参数,将一系列文件描述符传入,以监测相应事件是否发生。这三者均为fd_set类型,它是一个结构体,其中包含了一个数组,该数组使用每个元素中的1个bit来标识某个文件描述符的状态变化。那些以FD开头的宏专门用于操作fd_set结构。
    • FD_CLR(int fd, fd_set* set):清除set中对应于fd的那个位。
    • FD_ISSET(int fd, fd_set* set):检查set中对应于fd的位是否已被设置。
    • FD_SET(int fd, fd_set* set):在set中设置对应于fd的位。
    • FD_ZERO(fd_set* set):清除set中的所有位。
  • timeout:该参数用于指定select调用的超时时间。若置为NULL,则select调用将引起阻塞,直到发现传入的文件描述集合中,有至少一个发生了状态变化;若将时间值置为0,它将变为非阻塞函数,调用时立即返回。若无文件描述符出现状态变化,则返回0,有的话返回就绪的文件描述符数;若设置了一个大于0的值,就是明确指定了阻塞的时长,只要超过该时长,不论有无文件描述符出现状态变化,都会返回,当然在时长范围内,一旦有描述符准备就绪,将会结束阻塞。
  • return:出错返回-1,超时返回0,有文件描述准备就绪,返回就绪的个数。

在netio中使用select,改造目标为NControl及其父类CommControl,相关方法为GetClientCode

CommControl作为NControl的父类,用于抽象具备通信功能的Control的特征。这里,它应该负责维护与select相关的一些属性,read_fds_和write_fds_分别为存放读写文件描述符的集合,暂不考虑带外数据的处理,没定义异常集合。wait_time_属性用于指定select的阻塞时长。

实际调用select的地方在NControl::GetClientCode方法中。由于要区分读和写socket,因此在原SocketInfo结构中添加了一个flag字段用于标识,1为RECV_DATA,2为SEND_DATA 。

不过,目前看来,不关注socket的可写状态可能更好一些。也就是说,在我们要发数据的时候没必要使用select进行状态检测。因为,当前的做法是轮询s2c_code_queue,每发现有待发的code,就取出来,然后send,即,取一个发一个。若此处使用select,就意味着,每次要send之前,都要有一次select调用。最好的情况是只调一次select,因为socket没准备就绪;最坏的情况是两次调用,select发现socket准备就绪,然后send就被调用了。在发送“滞留”数据的时候也是类似的情况。

不要忘了select本身也是系统调用,而且还可能引起阻塞。因此,除非有充分的理由,否则不对CheckWaitSendBytes和TCPSocket中的SendReservedBytes方法做处理。

这是改动后的NControl::GetClientCode :

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
int NControl::GetClientCode()
{
    int retval = 0;
    int tmp_fd = 0;
    int max_fd = 0;
    FD_ZERO(&read_fds_);
    SocketInfo* info = NULL;
    if (socket_info_list_.get_list_len() <= 0)
        return 0;
    for (int idx = socket_info_list_.GetHeadNodeIdx(); 
        idx >= 0;) {
        if (socket_info_list_.GetOneItem(idx, &info) < 0) {
            std::cout << "In GetClientCode, no SocketInfo of " 
                << idx << std::endl; 
        } else {
            TCPSocket& tcp_socket = info->socket;
            tmp_fd = tcp_socket.get_socket();
            if (tmp_fd > max_fd) max_fd = tmp_fd;
            if (RECV_DATA == info->flag)
                FD_SET(tmp_fd, &read_fds_);
        }
        idx = socket_info_list_.GetNextNodeIdx(idx);
    }
    wait_time_.tv_sec = 0;
    wait_time_.tv_usec = 10 * 1000;
    retval = select(max_fd + 1, 
        &read_fds_, NULL, NULL, &wait_time_);
    if (retval <= 0) {
        if (EINTR == errno) {
            std::cout << "Call select error, errno " 
                << errno << std::endl;
        }
        return 0;
    }
    for (int idx = socket_info_list_.GetHeadNodeIdx(); 
        idx >= 0;) {
        if (socket_info_list_.GetOneItem(idx, &info) < 0) {
            std::cout << "In GetClientCode, no SocketInfo of " 
                << idx << std::endl; 
        } else {
            TCPSocket& tcp_socket = info->socket;
            tmp_fd = tcp_socket.get_socket();
            if (FD_ISSET(tmp_fd, &read_fds_)) {
                if ((retval = RecvClientBytes(idx)) < 0) {
                    if (ERR_RECV_REMOTE == retval) {
                        tcp_socket.Close();
                        std::cout << info->ip << ":" 
                            << info->port 
                            << " socket_idx(" 
                            << idx << ") close." 
                            << std::endl;
                        int next = socket_info_list_.GetNextNodeIdx(idx);
                        socket_info_list_.DelOneItem(idx);
                        NotifySClientClosed(idx);
                        idx = next;
                        continue;
                    } else {
                        std::cout << info->ip << ":" 
                            << info->port 
                            << " recv error." << std::endl;
                    }
                }
            }
            tcp_socket.SendReservedBytes();
        }
        idx = socket_info_list_.GetNextNodeIdx(idx);
    }
    return 0;
}

pollselect所做的工作完全类似,都是在指定的时限内,对指定数目的文件描述符进行状态检测。二者最大的不同在于,通过select传入的一组文件描述符是由内核提供空间来维护的;而poll则要求用一个用户空间的pollfd数组来承载待检测的一组文件描述符。内核提供的东西,不可无限制的使用,select被设定为最多受理FD_SETSIZE个文件描述符的检测请求。FD_SETSIZE默认为一般为1024,也就是说文件描述符最大值只能到1023,如果要修改的话,需要重新编译内核。

  • fds:它指向一个pollfd结构类型的数组。我们由此传入要检测的那些文件描述符。pollfd结构中有三个变量,fd是文件描述符,events用于指定要关注的事件类型,revents是实际发生在该文件描述符上的事件类型。
  • nfds:用于给出fds数组所包含的元素的个数,这个值与select的第一参数有所不同。注意,它不是数组中含有的最大文件描述符的值+1 。
  • timeout:指定poll的阻塞时间,单位为毫秒。若指定一个负值,将引起阻塞,直到有事件发生;置0将不会引起阻塞,调用后立即返回;指定一个正值,poll调用的阻塞时长最多不会超过该值,期间若有描述符准备就绪,阻塞将结束。
  • return:出错返回-1,超时返回0,成功返回准备就绪文件描述符的个数,也就是pollfd结构体数组中那些revents值不为0的文件描述符的个数。

在netio中使用poll函数,为保留select的代码,用宏来设定当前是使用select还是poll,同时,考虑到以后可能会进行对比测试,有必要把不使用I/O多路复用机制的代码也保留下来,同样,用宏来判定当前是否启用

修改CommControl,添加pollfd类型的结构数组fds_,还有int型等待时间wait_time_属性。

调用poll成功后,需要通过遍历pollfd数组来查询哪些文件描述符已准备就绪。虽然已在pollfd中记录了各文件描述符的值,但我们还需要对应的SocketInfo结构中的信息。而基于文件描述符并不能方便地从socket_info_list_中找出对应的SocketInfo结构体。因此,直接遍历pollfd并不能满足我们的需求,还是要在最外层对socket_info_list_实施遍历,同时,为了能找到当前SocketInfo对应的那个pollfd结构体,在SocketInfo中增加一个pollfd_idx字段用以记录pollfd结构体的索引值。

再次修改后的NControl::GetClientCode :

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
int NControl::GetClientCode()
{
    int retval = 0;
    int tmp_fd = 0;
#if defined(__SELECT__)
    int max_fd = 0;
    FD_ZERO(&read_fds_);
#elif defined(__POLL__)
    int fds_num = 0;
    int tmp_pollfd_idx = 0;
#endif
    SocketInfo* info = NULL;
    if (socket_info_list_.get_list_len() <= 0)
        return 0;
    for (int idx = socket_info_list_.GetHeadNodeIdx(); 
        idx >= 0;) {
        if (socket_info_list_.GetOneItem(idx, &info) < 0) {
            std::cout << "In GetClientCode, no SocketInfo of " 
                << idx << std::endl; 
        } else {
            TCPSocket& tcp_socket = info->socket;
            tmp_fd = tcp_socket.get_socket();
#if defined(__SELECT__)
            if (tmp_fd > max_fd) max_fd = tmp_fd;
            if (RECV_DATA == info->flag)
                FD_SET(tmp_fd, &read_fds_);
#elif defined(__POLL__)
            fds_[fds_num].fd = tmp_fd;
            fds_[fds_num].events = POLLIN;
            info->pollfd_idx = fds_num;
            fds_num ++;
#endif
        }
        idx = socket_info_list_.GetNextNodeIdx(idx);
    }
#if defined(__SELECT__)
    wait_time_.tv_sec = 0;
    wait_time_.tv_usec = 10 * 1000;
    retval = select(max_fd + 1, 
        &read_fds_, NULL, NULL, &wait_time_);
    if (retval <= 0) {
        if (EINTR == errno) {
            std::cout << "Call select error, errno " 
                << errno << std::endl;
        }
        return 0;
    }
#elif defined(__POLL__)
    wait_time_ = 10; 
    retval = poll(fds_, fds_num, wait_time_);
    if (retval <= 0) {
        if (EINTR == errno) {
            std::cout << "Call poll error, errno " 
                << errno << std::endl;
        }
        return 0;
    }
#endif
    for (int idx = socket_info_list_.GetHeadNodeIdx(); 
        idx >= 0;) {
        if (socket_info_list_.GetOneItem(idx, &info) < 0) {
            std::cout << "In GetClientCode, no SocketInfo of " 
                << idx << std::endl; 
        } else {
            TCPSocket& tcp_socket = info->socket;
            tmp_fd = tcp_socket.get_socket();
#if defined(__SELECT__)
            if (FD_ISSET(tmp_fd, &read_fds_)) {
#elif defined(__POLL__)
            if ((tmp_pollfd_idx = info->pollfd_idx) < 0) {
                idx = socket_info_list_.GetNextNodeIdx(idx);
                continue;
            }
            if (fds_[tmp_pollfd_idx].revents & POLLIN) {
#endif
                if ((retval = RecvClientBytes(idx)) < 0) {
                    if (ERR_RECV_REMOTE == retval) {
                        tcp_socket.Close();
                        std::cout << info->ip << ":" 
                            << info->port 
                            << " socket_idx(" 
                            << idx << ") close." 
                            << std::endl;
                        int next = socket_info_list_.GetNextNodeIdx(idx);
                        socket_info_list_.DelOneItem(idx);
                        NotifySClientClosed(idx);
                        idx = next;
                        continue;
                    } else {
                        std::cout << info->ip << ":" 
                            << info->port 
                            << " recv error." << std::endl;
                    }
                }
            }
            tcp_socket.SendReservedBytes();
        }
        idx = socket_info_list_.GetNextNodeIdx(idx);
    }
    return 0;
}

使用I/O多路复用机制以后,出现的问题及改进

使用I/O多路复用以后,之前“盲目”的轮询,将变成“有意义”的轮询,即轮询多路复用接口返回给我们的已准备就绪的文件描述符集合。这其实是把原先的主动行为变为了被驱动的行为,因而,多路复用被视为一种事件(准备就绪)驱动机制。

事件驱动,就意味着,如果没有事件,我们就不会去做响应。反映到目前的程序上来说,就是NControl::RecvClientBytes这个方法不会不停地被执行。这就意味着,其中的NControl::OnRecvdClientBytes不会不停地被调用。

看一下OnRecvdClientBytes方法的实现,它的核心任务就是从当前取到的字节中提取出code,然后将code入队到c2s_code_queue中。问题就出在提取code这一步上,目前是调用一次OnRecvdClientBytes只进行一次code提取。若收到的这些字节中包含多个code且内核接收缓冲中已没有任何数据的话,就意味着OnRecvdClientBytes将不再会被调用,那些未被提取的code将没有机会被取出。

问题根源是:多路复用机制只关注内核接收缓冲中是否有数据就绪,有就“驱动”处理流程OnRecvdClientBytes工作,没有就不会“驱动”。

我们要修改提取code的流程,让每一次OnRecvdClientBytes调用,都尽最大可能将完整的code全部取出。不用关系那些尚不构成code的字节,因为,一旦有后续字节到来,或者内核接收缓冲中仍有残留数据,OnRecvdClientBytes流程都会被“驱动”,它们总有机会被提取出来。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
int NControl::OnRecvdClientBytes(int socket_idx)
{
    int retval = 0;
    SocketInfo* info = NULL;
    if (socket_info_list_.GetOneItem(socket_idx, &info) < 0)
        return -1;
    TCPSocket& comm_socket = info->socket;
    unsigned char code[256];
    memset(code, 0, sizeof(code));
    int nethead_len = sizeof(NetHead);
    unsigned int code_len = sizeof(code) - nethead_len;
    (*((NetHead*)code)).socket_index = info->index;
    while ((retval = comm_socket.GetOneCode(code_len, 
        code + nethead_len)) >= 0) {
        std::cout << "netio:" << info->server_port 
            << " <<<<<<<<<<< " << info->ip << ":" 
            << info->port << " socket_idx(" 
            << socket_idx << ") " << code_len 
            << " bytes." << std::endl;
        code_len += nethead_len;
        retval = c2s_code_queue_->PushCodeBack(code, code_len);
        if (retval < 0) {
            if (-2 == retval) {
                std::cout << "c2s_code_queue is full." 
                    << std::endl;
                break;
            } else {
                std::cout<< "Retval: " << retval << std::endl;
                return 1;
            }
        }
    }
    return 0;
}

不过,这么修改之后,又会有新的问题:c2s_code_queue_可能会比较容易出现“满”的情况,按照现在的处理,code队列满了以后,准备要入队的那个code将被丢弃。如果要避免这个问题,就要尽量将c2s_code_queue_的长度设置的大一些。同时,c2s_code_queue_中入队code的速度变快了,也要求s在从中取code的速度要加快,就是把循环取code的次数调大。而这么一来,就又会使s2c_code_queue_变得更容易“满”,类似地,在增加s2c_code_queue_长度的同时,我们要提高netio从该队列中取code的速度。

由于目前的netio部分,在从s2c_code_queue_中取code时,还是一次取一个,这里加一个外层循环,实现一次调用,多次取code的效果,循环次数由MAX_GET_SERVER_MSG_NUM指定。由于改动很简单,不列出代码。

上面说的这些问题,都可以通过调整数值进行缓解,后续在做系统容量测试的时候,再深入讨论。

相关文件:


<==  index  ==>