逐步实现TCP服务端Step02:正确处理流

TCP所提供的是一种面向连接的、可靠的字节流服务。它通过一系列控制机制:流量控制、差错控制、连接管理,令数据从一端顺利地流向另一端。

TCP这种流式服务对其使用者(应用进程)来说,麻烦之处在于:当调用其接口进行数据收发时,其处理粒度是字节,而不是消息(应用层约定的某种格式的数据)。也就是说,若调用发送接口欲投递一个应用层面的消息时,TCP有可能只受理前n个字节;调用接收接口提取数据时,有可能拿到的m个字节不足以构成一个应用层消息,或者可以构成多个消息也不一定。

总之,recv或send的调用次数与收到或发出的完整消息的个数并非是对等的。如何发出完整消息以及如何从一堆字节中解析出完整的消息,是应用层面需要关注的问题。

原始阶段,我们没有定义应用层消息,服务程序只是简单地把收到的字节原样发回去而已。此时,应用层的处理粒度与TCP的处理粒度是吻合的,不会存在上述问题。没定义应用层消息,也可以理解为一个字节就是一个应用层消息。显然,消息之间是天然分开的,不论recv还是send都不可能处理不足一个消息/字节的数据。

如果应用层的处理粒度与TCP的处理粒度不吻合呢?即,应用层定义的是长度大于1字节的消息,像这样:

如果消息的长度又不一致呢?像这样:

其实,不论定义了何种格式的消息,只要程序不关注消息的语义,就没必要在乎消息是否完整。如果只是进行数据转发,像原始阶段的代码那样,拿到了字节就投递出去,就不需要关注send或recv消息的完整性。

然而,实际情况是,服务端程序是要关注消息的语义的,不然怎么为客户服务呢?客户就是通过消息将自己的需求告知于服务端,服务端拿到消息,解析语义,而后提供相应的服务。

到目前为止,程序中所使用的是阻塞式socket(socket默认即是阻塞式的)。基于此种socket的send是易于处理的,因为它会将进程要求的字节量全部放到socket缓冲区后才返回。同时,发消息是一个主动行为,发送方肯定是知道自己要发的消息的长度的。调用send时,给出消息长度,等待返回即可。

而接收消息是一个被动的行为,事先不可能知道对方会发来什么消息。不知道是什么消息,也就不知道消息的长度,不知道消息的长度,就没法确定是否已经拿到了一个完整的消息。因此,首要问题是要明确消息的长度。若约定了一致的消息长度,那这个长度就是已知的。若使用的是长度不一的消息,就需要用某种方式让接收方知道当前消息的长度,以便处理。

办法就是在每个消息的前部都附加一个定长的前缀空间来存放该消息的长度,像这样:对于每一个消息,先把蓝色的定长部分还原出来,得到消息长度,然后就可以将整个消息还原出来了。

下面对原始的recv和send进行包装,得到RecvOneMessage和SendOneMessage函数。这两个函数保证了收或发一个消息的完整性。因为socket是阻塞式的,所以在调用RecvOneMessage或SendOneMessage的时候是有可能引起阻塞的。

这里将用于存放消息长度的前缀(上图蓝色部分)的长度设定为4个字节。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#ifndef __BASE_H__
#define __BASE_H__

#define MSG_PREFIX_LEN  (4)

int RecvOneMessage(int sock, unsigned int& len, unsigned char* msg)
{
    if (!len) return -1;
    if (!msg) return -1;
    if (sock <= 0) return -1;
    int recvd_bytes_cnt = 0;
    int total_recvd_bytes_cnt = 0;
    unsigned char prefix[MSG_PREFIX_LEN];
    int max_buf_len = MSG_PREFIX_LEN;
    while (total_recvd_bytes_cnt != MSG_PREFIX_LEN) {
        if ((recvd_bytes_cnt = recv(sock, 
            &prefix[total_recvd_bytes_cnt], 
            max_buf_len - total_recvd_bytes_cnt, 0)) < 0) {
            return -1;
        }
        if (0 == recvd_bytes_cnt) {
            return 0;
        }
        total_recvd_bytes_cnt += recvd_bytes_cnt;
    }
    max_buf_len = len;
    len = ntohl(*((unsigned int*)prefix));
    if (len > max_buf_len) {
        return -1;
    }
    total_recvd_bytes_cnt = 0;
    while (total_recvd_bytes_cnt != len) {
        if ((recvd_bytes_cnt = recv(sock, 
            &msg[total_recvd_bytes_cnt], 
            max_buf_len - total_recvd_bytes_cnt, 0)) < 0) {
            return -1;
        }
        if (0 == recvd_bytes_cnt) {
            return 0;
        }
        total_recvd_bytes_cnt += recvd_bytes_cnt;
    }
    return total_recvd_bytes_cnt + MSG_PREFIX_LEN;
}

int SendOneMessage(int sock, unsigned int len, const unsigned char* msg)
{    
    if (!len) return -1;
    if (!msg) return -1;
    if (sock <= 0) return -1;
    int total_sent_bytes_cnt = 0;
    unsigned char* buf 
        = new unsigned char[MSG_PREFIX_LEN + len];
    unsigned char* tmpbuf = buf; 
    if (!buf) return -1;
    *((int*)tmpbuf) = htonl(len);
    memcpy(&tmpbuf[MSG_PREFIX_LEN], msg, len);
    len += MSG_PREFIX_LEN;
    if ((total_sent_bytes_cnt 
        = send(sock, tmpbuf, len, 0)) < 0) {
        delete[] buf;
        return -1;
    }
    delete[] buf;
    return total_sent_bytes_cnt;
}

#endif /* __BASE_H__ */

使用了RecvOneMsg和SendOneMsg的服务端代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
#include <cstring>
#include <cstdlib>
#include <iostream>
#include <unistd.h>
#include <sys/types.h>  
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include "base.h"

int main(int argc, char**argv)
{
    if (argc != 2) {
        std::clog << "Usage: " 
            << argv[0] << " port" << std::endl;
        return 0;
    }
    int svr_sock, cli_sock;
    sockaddr_in svr_addr, cli_addr;
    if ((svr_sock = socket(AF_INET, 
        SOCK_STREAM, 0)) < 0) {
        return 1;
    }
    memset(&svr_addr, 0, sizeof(svr_addr));
    svr_addr.sin_family = AF_INET;
    svr_addr.sin_port = htons(atoi(argv[1]));
    svr_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bind(svr_sock, 
        (const sockaddr*)&svr_addr, 
        sizeof(svr_addr)) < 0) {
        return 1;
    }
    if (listen(svr_sock, 4) < 0) {
        return 1;
    }
    while (1) {
        memset(&cli_addr, 0, sizeof(cli_addr));
        socklen_t addr_len = sizeof(cli_addr);
        if ((cli_sock = accept(svr_sock, 
            (sockaddr*)&cli_addr, &addr_len)) < 0) {
            return 1;
        }
        unsigned char msg[128] = {0};
        int recvd_bytes_cnt = 0;
        while (1) {
            unsigned int msg_len = sizeof(msg);
            memset(msg, 0, msg_len);
            if ((recvd_bytes_cnt = RecvOneMessage(cli_sock, 
                msg_len, msg)) < 0) {
                return 1;
            }
            if (0 == recvd_bytes_cnt) {
                std::clog << "cli close!" << std::endl;
                break;
            }
            if (SendOneMessage(cli_sock, msg_len, msg) < 0) {
                return 1;
            }
        }
        close(cli_sock);
    }
    return 0;
}

因为我们自己定义了一个所谓的“前缀”来存放消息的长度,要想让客户端也能识别出这4个字节,我们就得自己实现客户端。这个客户端同样使用RecvOneMsg和SendOneMsg函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <string>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include "base.h"

int main(int argc, char** argv)
{
    if (argc != 3) {
        std::clog << "Usage: " 
            << argv[0] << " ip port" << std::endl;
        return 0;
    }
    int sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0) {
        return 1;
    }
    sockaddr_in svr_addr;
    memset(&svr_addr, 0, sizeof(svr_addr));
    svr_addr.sin_family = AF_INET;
    svr_addr.sin_port   = htons(atoi(argv[2]));
    svr_addr.sin_addr.s_addr = inet_addr(argv[1]); 
    if (connect(sock, (const sockaddr*)&svr_addr, 
        sizeof(svr_addr)) < 0) {
        return 1;
    }
    std::string input;
    unsigned char buf[128] = {0};
    unsigned int len = 0;
    while (1) {
        std::getline(std::cin, input);
        len = input.length();
        if (SendOneMessage(sock, len, 
            (const unsigned char*)input.c_str()) < 0) {
            return 1;
        }
        len = sizeof(buf);
        memset(buf, 0, len);
        if (RecvOneMessage(sock, 
            len, buf) < 0) {
            return 1;
        }
        std::clog << buf << std::endl;
    }
    return 0;
}

如果改用非阻塞socket情况会怎样呢?下一篇继续研究。

< == index == >