逐步实现TCP服务端Step02-1:基于非阻塞socket的处理逻辑

把socket设置成非阻塞,其目的就是不让程序逻辑在网络I/O处阻塞不前。整个程序都将基于这个“不阻塞”的大前提来进行设计。基于非阻塞socket调用recv和send时不会阻挠逻辑向前进行,那么,其包装函数RecvOneMessage和SendOneMessage也要做到不阻挠才合适。否则,若使用这组包装函数去替换原始函数,程序的行为就有可能会偏离我们的预期。

RecvOneMessage函数内部是循环操作,达到目标有可能要进行多次循环。同时,对于阻塞式socket来说,一次循环中的recv调用又有可能发生阻塞。这么看来基于阻塞式socket的RecvOneMessage函数是有可能阻挠逻辑前进的。

那么,基于非阻塞socket的情况呢?就算是基于非阻塞socket,这种循环的方式,也有可能导致函数调用者进入等待。这跟socket是阻塞还是非阻塞没有太大关系,关键在于何时能达到那个“目标”,达不到目标,函数就一直不返回,对于调用者来说,就是一直等待。

对于非阻塞socket,如果把RecvOneMessage或SendOneMessage调用做成立即返回的话,就可能与这两个函数的本意产生矛盾。因为这二者的存在就是为了保证处理消息的完整性,只有接收或者发出一个完整消息后才可返回,否则,不返回。一调用就返回,且保证处理到了一个完整的消息,这种情况是存在的,只不过不是每次都存在。谁能保证recv的时候,socket缓冲区中恰好就有足够的字节量,而send的时候,就刚好有足够的socket缓冲可用呢。

先看一下RecvOneMessage函数,这个函数内部其实是做了两件事:1. recv字节;2. 还原消息。它保证第一步可以recv到足够一个消息的字节,即,达到它的“目标”,而后顺利完成第二步,最后返回。我们要做的改变是:仍然让其循环recv,但不设定“目标”,能收到多少就收多少。其循环结束条件变为:recv到0个字节(对端关闭)或者recv出错。这样做的话,第二步就没法进行了。因为,recv的时候没有“目标”,一次RecvOneMessage调用可能会recv到不足一个消息长度的字节量,也可能超过一个消息。那么,这些零散的字节该怎么处理呢?它们都已经被recv出来了,必须找个地方存放才行,不然就没法再次获得了。

其实,应该把这两步分开来做,一个只负责recv字节,一个只负责还原消息: RecvBytes将recv到的字节存放到一个区域,GetOneMessage则从这个区域中提取出一个消息。若本次提取不成功(字节不足)则返回失败,可以等下次RecvBytes调用过后,再次进行GetOneMessage看能不能提出一个完整的消息。至于这个“区域”,它其实是一个字节队列,用一个字节数组就可以了。

对于SendOneMessage函数来说,在阻塞式socket的情况下,是不需要做循环处理的,send本身就可以将要求的字节全部投递出去。在使用非阻塞socket时,send会立即返回,这个时候就需要把未投递出去的字节找个地方存一下,以后每次调用SendOneMessage的时候,其内部都要先考虑将这些残留的字节投递出去,然后再投递本次调用所要求字节。因此,至少需要再实现两个函数:ReserveBytes和SendReservedBytes,这两个函数供SendOneMessage调用。至于这个“区域”,也是用字节数组。

这里继续保留RecvOneMessage和SendOneMessage的当前版本,新版命名为SendOneMessageEx,代码如下:

base.h

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
#ifndef __BASE_H__
#define __BASE_H__

#include <unistd.h>
#include <fcntl.h>
#include <cstring>
#include <cerrno>

#define MSG_PREFIX_LEN      (4)
#define MAX_RECV_BUF_LEN    (128)
#define MAX_POST_BUF_LEN    (128)

unsigned char g_recv_buf[MAX_RECV_BUF_LEN];
unsigned char g_post_buf[MAX_POST_BUF_LEN];
int g_read_begin = 0;
int g_read_end   = 0;
int g_post_begin = 0;
int g_post_end   = 0;

// retval:
//  0 - success
// -1 - sock invalid 
// -2 - recv buf is full
// -3 - client close
// -4 - send error
int RecvBytes(int sock)
{
    if (sock <= 0) 
        return -1;
    if (g_read_begin == g_read_end) {
        g_read_begin = 0;
        g_read_end   = 0;
    }
    int recvd_bytes_cnt = 0;
    int retval = 0;
    do {
        if (g_read_end >= MAX_RECV_BUF_LEN) {
            retval = -2;
            break;
        }
        recvd_bytes_cnt = recv(sock, 
            &g_recv_buf[g_read_end], 
            MAX_RECV_BUF_LEN - g_read_end, 0);
        if (recvd_bytes_cnt > 0) {
            g_read_end += recvd_bytes_cnt;
        } else if (0 == recvd_bytes_cnt) {
            retval = -3;
            break;
        } else if (EAGAIN != errno) {
            retval = -4;
            break;
        }
    } while (recvd_bytes_cnt > 0);
    return retval;
}

int GetOneMessage(unsigned int& len, unsigned char* msg)
{
    if (!msg) return -1;
    int bytes_cnt = g_read_end - g_read_begin;
    if (bytes_cnt <= MSG_PREFIX_LEN)
        return -2;
    int max_buf_len = len;
    len = ntohl(*((int*)&g_recv_buf[g_read_begin]));
    if (len <= 0) {
        g_read_begin = g_read_end = 0;
        return -3;
    }
    if (max_buf_len < len)
        return -4;
    if (int(bytes_cnt - MSG_PREFIX_LEN) < len) {
        // check if the rest of g_recv_buf is enough.
        if (g_read_begin + len + MSG_PREFIX_LEN 
            >= MAX_RECV_BUF_LEN) {
            memmove(g_recv_buf, 
                &g_recv_buf[g_read_begin], bytes_cnt);
            g_read_begin = 0;
            g_read_end = bytes_cnt;
            if (g_read_begin + len + MSG_PREFIX_LEN 
                > MAX_RECV_BUF_LEN) {
                return -5;
            }
        }
        return -6;
    }
    memcpy(msg, &g_recv_buf[g_read_begin + MSG_PREFIX_LEN], len);
    g_read_begin += MSG_PREFIX_LEN;
    g_read_begin += len;
    if (g_read_begin == g_read_end)
        g_read_begin = g_read_end = 0;
    return 0;
}

int SendReservedBytes(int sock)
{
    if (sock <= 0) return -1;
    int sent_bytes = 0;
    int left_bytes = g_post_end - g_post_begin; 
    if (!left_bytes) return 0;
    unsigned char* tmp_post_buf = &g_post_buf[g_post_begin]; 
    while (1) {
        sent_bytes = send(sock, 
            tmp_post_buf, left_bytes, 0);
        if (sent_bytes > 0) {
            tmp_post_buf += sent_bytes;
            left_bytes -= sent_bytes;
            g_post_begin += sent_bytes;
            if (!left_bytes) {
                g_post_begin = g_post_end = 0;
                return 0;
            }
        } else if (sent_bytes < 0) {
            if (EAGAIN == errno)
                return -2;
            else
                return -3;
        } else 
            return -3;
    }
    return 0;
}

int ReserveBytes(int bytes_cnt, const unsigned char* bytes)
{
    int existing_bytes_cnt = g_post_end - g_post_begin;
    if (existing_bytes_cnt + bytes_cnt < MAX_POST_BUF_LEN) {
        if (g_post_begin > 0) {
            memmove(g_post_buf, &g_post_buf[g_post_begin], 
                    existing_bytes_cnt);
            g_post_begin = 0;
        }
        g_post_end = existing_bytes_cnt;
        memcpy(&g_post_buf[g_post_end], bytes, bytes_cnt);
        g_post_end += bytes_cnt;
    } else return -1;
    return 0;
}

int SendOneMessageEx(int sock, unsigned int len, const unsigned char* msg)
{    
    if (!len) return -1;
    if (!msg) return -1;
    if (sock <= 0) return -1;
    int sent_bytes = 0;
    int left_bytes = MSG_PREFIX_LEN + len;
    unsigned char* buf 
        = new unsigned char[left_bytes];
    unsigned char* tmpbuf = buf;
    if (!buf) return -1;
    *((int*)tmpbuf) = htonl(len);
    memcpy(&tmpbuf[MSG_PREFIX_LEN], msg, len);
    int retval = SendReservedBytes(sock);
    if (retval < 0) {
        if (-2 == retval) {
            if (ReserveBytes(left_bytes, tmpbuf) < 0) {
                delete[] buf;
                return -1;
            }
        } else { 
            delete[] buf;
            return -1;
        }
        delete[] buf;
        return -2;
    }
    while (left_bytes > 0) {
        sent_bytes = send(sock, tmpbuf, left_bytes, 0);
        if (sent_bytes > 0) {
            tmpbuf += sent_bytes;
            left_bytes -= sent_bytes;
        } else if (sent_bytes < 0) {
            if (EAGAIN == errno) {
                if (ReserveBytes(left_bytes, tmpbuf) < 0) {
                    delete[] buf;
                    return -1;
                }
                delete[] buf;
                return -2;
            }
        } else {
            delete[] buf;
            return -1;
        }
    }
    delete[] buf;
    return 0;
}

int SetSockNonBlock(int sock)
{
    if (sock <= 0) return -1;
    int flags = fcntl(sock, F_GETFL, 0);
    if (flags < 0) return -1;
    flags |= O_NONBLOCK;
    return fcntl(sock, F_SETFL, flags);
}

int RecvOneMessage(int sock, unsigned int& len, unsigned char* msg)
{
    if (!len) return -1;
    if (!msg) return -1;
    if (sock <= 0) return -1;
    int recvd_bytes_cnt = 0;
    int total_recvd_bytes_cnt = 0;
    unsigned char prefix[MSG_PREFIX_LEN];
    int max_buf_len = MSG_PREFIX_LEN;
    while (total_recvd_bytes_cnt != MSG_PREFIX_LEN) {
        if ((recvd_bytes_cnt = recv(sock, 
            &prefix[total_recvd_bytes_cnt], 
            max_buf_len - total_recvd_bytes_cnt, 0)) < 0) {
            return -1;
        }
        if (0 == recvd_bytes_cnt) {
            return 0;
        }
        total_recvd_bytes_cnt += recvd_bytes_cnt;
    }
    max_buf_len = len;
    len = ntohl(*((unsigned int*)prefix));
    if (len > max_buf_len) {
        return -1;
    }
    total_recvd_bytes_cnt = 0;
    while (total_recvd_bytes_cnt != len) {
        if ((recvd_bytes_cnt = recv(sock, 
            &msg[total_recvd_bytes_cnt], 
            max_buf_len - total_recvd_bytes_cnt, 0)) < 0) {
            return -1;
        }
        if (0 == recvd_bytes_cnt) {
            return 0;
        }
        total_recvd_bytes_cnt += recvd_bytes_cnt;
    }
    return total_recvd_bytes_cnt + MSG_PREFIX_LEN;
}

int SendOneMessage(int sock, unsigned int len, const unsigned char* msg)
{    
    if (!len) return -1;
    if (!msg) return -1;
    if (sock <= 0) return -1;
    int total_sent_bytes_cnt = 0;
    unsigned char* buf 
        = new unsigned char[MSG_PREFIX_LEN + len];
    unsigned char* tmpbuf = buf;
    if (!buf) return -1;
    *((int*)tmpbuf) = htonl(len);
    memcpy(&tmpbuf[MSG_PREFIX_LEN], msg, len);
    len += MSG_PREFIX_LEN;
    if ((total_sent_bytes_cnt 
        = send(sock, tmpbuf, len, 0)) < 0) {
        delete[] buf;
        return -1;
    }
    delete[] buf;
    return total_sent_bytes_cnt;
}

#endif /* __BASE_H__ */

s.cc

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
#include <cstring>
#include <cstdlib>
#include <iostream>
#include <unistd.h>
#include <sys/types.h>  
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <errno.h>
#include "base.h"

int main(int argc, char**argv)
{
    if (argc != 2) {
        std::clog << "Usage: " 
            << argv[0] << " port" << std::endl;
        return 0;
    }
    int svr_sock, cli_sock;
    sockaddr_in svr_addr, cli_addr;
    if ((svr_sock = socket(AF_INET, 
        SOCK_STREAM, 0)) < 0) {
        return 1;
    }
    memset(&svr_addr, 0, sizeof(svr_addr));
    svr_addr.sin_family = AF_INET;
    svr_addr.sin_port = htons(atoi(argv[1]));
    svr_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bind(svr_sock, 
        (const sockaddr*)&svr_addr, 
        sizeof(svr_addr)) < 0) {
        return 1;
    }
    if (listen(svr_sock, 4) < 0) {
        return 1;
    }
    while (1) {
        memset(&cli_addr, 0, sizeof(cli_addr));
        socklen_t addr_len = sizeof(cli_addr);
        if ((cli_sock = accept(svr_sock, 
            (sockaddr*)&cli_addr, &addr_len)) < 0) {
            return 1;
        }
        if (SetSockNonBlock(cli_sock) < 0) {
            std::cerr << "set sock nonblock failed!" << std::endl;
            return 1;
        }
        unsigned char msg[128] = {0};
        while (1) {
            unsigned int msg_len = sizeof(msg);
            memset(msg, 0, msg_len);
            int retval = RecvBytes(cli_sock);
            if (-2 == retval) {
                std::clog << "recv buf is full." << std::endl;
            } else if (-3 == retval) {
                std::clog << "cli close!" << std::endl;
                break;
            } else if (retval < 0) 
                return 1;
            retval = GetOneMessage(msg_len, msg);
            if (retval < 0) {
                if (-2 == retval || -6 == retval) {
                    continue;
                }
                return 1;
            }
            std::clog << msg << std::endl;
            retval = SendOneMessageEx(cli_sock, msg_len, msg);
            if (retval < 0) {
                if (-2 == retval) {
                    std::clog << "no tcp buf, reserve bytes." 
                        << std::endl;
                    continue;
                }
                return 1;
            }
        }
        close(cli_sock);
        g_read_begin = g_read_end = 0;
    }
    return 0;
}

其中,SetSockNonBlock用来将指定的socket设置为非阻塞。

客户端扔使用阻塞式socket,完成一次消息发送后,只有收到服务端返回的消息,才可再次发送消息。

<==  index  ==>