逐步实现TCP服务端Step02-6:两个进程协作

communicator与processor分属两个进程,这个时候需要额外的机制来完成这两者的通信。

这里使用共享内存的方式来实现:

与使用线程协作的主要区别在于,g_c2s_code_queue和g_s2c_code_queue这两个队列占用的是共享内存的空间,需要对共享内存进行管理。

此前的communicator和processor共存于一个进程s中,现在把它俩分开,在两个进程中实现。可以理解为将communicator从s中拆分了出来,做成了单独的进程,把这个进程的名字定为”netio”,它专注于通信。

除了基于共享内存进行交流之外,netio和s再没有其他关系,二者是分别实现的,没有fork操作。

从此以后,服务端由单一进程s,转变为netio和s组合。

以前总结过SystemV共享内存的创建,详见:《封装一个SystemV共享内存创建函数》,需要对CreateShareMemory的代码稍作改动,调用者需要知道CreateShareMemory是创建了全新的共享内存段,还是attach到了现有的内存段。基于这个信息来决定是否要Init构建于该共享内存中的对象的状态,比如队列。

netio与s进程的启动一定是有先后的,假设netio先启动,此时有client连进来并发来了数据,此时,构建于共享内存段中的队列就会有等待s进程处理的数据。若s启动后,不管是何种情况,就直接将该队列Init掉,已有的数据就遗失了,这是不合理的。新版CreateShareMemory增加了is_new_shm参数来告知调用方下一步是否要Init队列。

修改后的CreateShareMemory,is_new_shm其实是做返回值用的,调用者传入的值不起任何作用

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
void* CreateShareMemory(unsigned int key, size_t size, bool& is_new_shm)
{
    is_new_shm = true;
    int shm_id = shmget(key, size, IPC_CREAT | IPC_EXCL | 0666);
    if (shm_id < 0) {
        // 创建失败,可能是已有共享内存了
        if (errno != EEXIST) {
            std::clog << "Alloc share memory failed, " 
                << strerror(errno) << std::endl;
            return NULL;
        }
        shm_id = shmget(key, size, 0666);
        if (shm_id < 0) {
            // 如attach失败,可能是size比之前的大
            std::clog << "Attach to share memory " 
                << shm_id << " failed, " << strerror(errno) 
                << ", try to touch it."<< std::endl;
            shm_id = shmget(key, 0, 0666);
            if (shm_id < 0) {
                std::clog << "Fatal error, touch to share memory failed, " 
                    << strerror(errno) << std::endl;
                return NULL;
            } else {
                std::clog << "Remove the exist share memory " 
                    << shm_id << std::endl;
                if (shmctl(shm_id, IPC_RMID, NULL)) {
                    std::clog << "Remove share memory failed, " 
                        << strerror(errno) << std::endl;
                    return NULL;
                }
                shm_id = shmget(key, size, IPC_CREAT | IPC_EXCL | 0666);
                if (shm_id < 0) {
                    std::clog << "Fatal error, alloc share memory failed, " 
                        << strerror(errno) << std::endl;
                    return NULL;
                }
            }
        } else {
            is_new_shm = false;
        }
    }
    unsigned char* cur_shm = (unsigned char*)shmat(shm_id, NULL, 0);
    if (!cur_shm) return NULL;
    return cur_shm;
}

另外,base.h文件中对CodeQueue结构的定义,InitCodeQueue函数均需要修改:

将CodeQueue中的buf改为数组

1
2
3
4
5
6
struct CodeQueueEx {
    unsigned char buf[MAX_CODE_BUF_LEN];
    int size;
    int begin;
    int end;
};

InitCodeQueue函数不需要再new新的空间,整个code queue应该构建在CreateShareMemory时所创建的共享内存中,InitCodeQueue接受code queue的指针,仅对内存中的内容做初始化即可。需要对g_c2s_code_queueex或g_s2c_code_queueex进行操作的函数都要做相应的改动,代码不在文中展示,详见base.h文件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
CodeQueueEx* g_c2s_code_queueex = NULL;
CodeQueueEx* g_s2c_code_queueex = NULL;

int InitCodeQueueEx(CodeQueueEx* code_queue, int size)
{
    if (!code_queue) {
        return -1;
    }
    if (size > MAX_CODE_BUF_LEN) {
        return -2;
    }
    code_queue->end   = 0;
    code_queue->begin = 0;
    code_queue->size  = size;
    memset(&code_queue->buf, 
        0, sizeof(code_queue->buf));
    return 0;
}

下面是s和netio的具体实现,由于netio实际是从老版的s中拆分出来的,这二者的代码都是基于老版s做的修改,大结构没变:

s.cc

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
#include <cstring>
#include <cstdlib>
#include <iostream>
#include <sys/types.h>  
#include <errno.h>
#include "base.h"

int main(int argc, char**argv)
{
    if (InitCodeQueue(g_code_queue, 100) < 0) {
        std::clog << "Init g_code_queue failed." 
            << std::endl;
        return 1;
    }
    bool is_new_shm = false;
    g_c2s_code_queueex 
        = (CodeQueueEx*)CreateShareMemory(0x1110, 
            sizeof(CodeQueueEx), is_new_shm);
    if (!g_c2s_code_queueex) {
        std::clog << "Create share memeory for " 
            << "c2s code queue failed." << std::endl;
        return 1;
    }
    if (is_new_shm) {
        if (InitCodeQueueEx(g_c2s_code_queueex, 100) < 0) {
            std::clog << "Init g_c2s_code_queueex failed." 
                << std::endl;
            return 1;
        }
    }
    g_s2c_code_queueex 
        = (CodeQueueEx*)CreateShareMemory(0x1111, 
            sizeof(CodeQueueEx), is_new_shm);
    if (!g_s2c_code_queueex) {
        std::clog << "Create share memeory for " 
            << "s2c code queue failed." << std::endl;
        return 1;
    }
    if (is_new_shm) {
        if (InitCodeQueueEx(g_s2c_code_queueex, 100) < 0) {
            std::clog << "Init g_s2c_code_queueex failed." 
                << std::endl;
            return 1;
        }
    }
    memset(g_recv_buf, 0, sizeof(g_recv_buf));
    memset(g_post_buf, 0, sizeof(g_post_buf));

    while (1) {
        if (GetC2SCodeEx() < 0)
            break;
        if (DispatchMessages() < 0)
            break;
    }

    return 0;
}

netio.cc

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#include <cstring>
#include <cstdlib>
#include <iostream>
#include <unistd.h>
#include <sys/types.h>  
#include <arpa/inet.h>
#include <netinet/in.h>
#include <errno.h>
#include "base.h"

int main(int argc, char** argv)
{
    if (argc != 2) {
        std::clog << "Usage: " 
            << argv[0] << " port" << std::endl;
        return 0;
    }
    bool is_new_shm = false;
    g_c2s_code_queueex 
        = (CodeQueueEx*)CreateShareMemory(0x1110, 
            sizeof(CodeQueueEx), is_new_shm);
    if (!g_c2s_code_queueex) {
        std::clog << "Create share memeory for " 
            << "c2s code queue failed." << std::endl;
        return 1;
    }
    if (is_new_shm) {
        if (InitCodeQueueEx(g_c2s_code_queueex, 100) < 0) {
            std::clog << "Init g_c2s_code_queueex failed." 
                << std::endl;
            return 1;
        }
    }
is_new_shm = false;
    g_s2c_code_queueex 
        = (CodeQueueEx*)CreateShareMemory(0x1111, 
            sizeof(CodeQueueEx), is_new_shm);
    if (!g_s2c_code_queueex) {
        std::clog << "Create share memeory for " 
            << "s2c code queue failed." << std::endl;
        return 1;
    }
    if (is_new_shm) {
        if (InitCodeQueueEx(g_s2c_code_queueex, 100) < 0) {
            std::clog << "Init g_s2c_code_queueex failed." 
                << std::endl;
            return 1;
        }
    }
    memset(g_recv_buf, 0, sizeof(g_recv_buf));
    memset(g_post_buf, 0, sizeof(g_post_buf));

    int svr_sock, cli_sock;
    sockaddr_in svr_addr, cli_addr;
    if ((svr_sock = socket(AF_INET, 
        SOCK_STREAM, 0)) < 0) {
        return 1;
    }
    memset(&svr_addr, 0, sizeof(svr_addr));
    svr_addr.sin_family = AF_INET;
    svr_addr.sin_port = htons(atoi(argv[1]));
    svr_addr.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bind(svr_sock, 
        (const sockaddr*)&svr_addr, 
        sizeof(svr_addr)) < 0) {
        return 1;
    }
    if (listen(svr_sock, 4) < 0) {
        return 1;
    }
    while (1) {
        memset(&cli_addr, 0, sizeof(cli_addr));
        socklen_t addr_len = sizeof(cli_addr);
        if ((cli_sock = accept(svr_sock, 
            (sockaddr*)&cli_addr, &addr_len)) < 0) {
            return 1;
        }
        if (SetSockNonBlock(cli_sock) < 0) {
            std::clog << "set sock nonblock failed!" << std::endl;
            return 1;
        }
        unsigned char msg[128] = {0};
        while (1) {
            unsigned int msg_len = sizeof(msg);
            memset(msg, 0, msg_len);

            // RecvBytes
            /////////////////////////////////////////////////
            int retval = RecvBytes(cli_sock);
            if (-2 == retval) {
                std::clog << "recv buf is full." << std::endl;
            } else if (-3 == retval) {
                std::clog << "cli close!" << std::endl;
                break;
            } else if (retval < 0) 
                return 1;

            // OnRecvdSocketBytesEx
            /////////////////////////////////////////////////
            retval = OnRecvdSocketBytesEx();
            if (retval < 0) {
                if (retval != -2 && retval != -6 
                    && retval != -7) {
                    return 1;
                }
            }

            // GetS2CCodeEx
            /////////////////////////////////////////////////
            retval = GetS2CCodeEx();
            if (retval < 0)
                return 1;

            // SendBytes 
            /////////////////////////////////////////////////
            retval = SendBytes(cli_sock);
            if (retval < 0) {
                if (-2 == retval) {
                    continue;
                }
                return 1;
            }
        }
        close(cli_sock);
        g_read_begin = g_read_end = 0;
    }
    return 0;
}


<==  index  ==>