wangzhengquan
2020-12-03 23a5822a2a4f874b84c2c2a9c2442ac4a5279176
文档
1个文件已删除
3个文件已添加
4个文件已修改
242 ■■■■ 已修改文件
doc/bus_service.png 补丁 | 查看 | 原始文档 | blame | 历史
doc/network_req_rep.png 补丁 | 查看 | 原始文档 | blame | 历史
doc/使用指南.mk 149 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
doc/使用说明.mk 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket_wrapper.h 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/net_mod_socket.sh 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/test_net_mod_socket.c 63 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
doc/bus_service.png
doc/network_req_rep.png
doc/ʹÓÃÖ¸ÄÏ.mk
New file
@@ -0,0 +1,149 @@
# <center>BHomeBus ä½¿ç”¨æŒ‡å—</center>
## 1. BHomeBus包含的角色
- NetProxyService(网络代理服务): è´Ÿè´£è·¨æœºå™¨è½¬å‘发布或请求消息
- BusService( æ€»çº¿æœåŠ¡ï¼‰: å‘布订阅的路由,负责记录订阅者的消息并把发布者发布的消息转发给感兴趣的订阅者。
- Pub/Sub Client(发布订阅客户端): å‘BusService发布消息或者在BusService上订阅感兴趣的消息
- Request/Reply Client ï¼ˆè¯·æ±‚应答客户端):向其他机器/进程发送请求消息或者从其他机器/进程接受应答消息
其中BusService和NetProxyService是服务进程,使用者把它起来后就不需要关心了。使用者主要关注的是Pub/Sub Client å’Œ Request/Reply Client
## 2. ä½¿ç”¨è¯´æ˜Ž
具体每个方法的及其参数的说明可以参看它们对应的头文件。
使用时,每台主机都需要先把BusService和NetProxyService这两个服务启动起来。因为后面的功能都依赖这两个服务。
### 2.1 å¯åЍNetProxyService
NetProxyService的头文件是 net_mod_server_socket_wrapper.h. ä¸‹é¢çš„代码启动端口是5000的网络代理服务
```
void *serverSocket  = net_mod_server_socket_open(5000);
if(net_mod_server_socket_start(serverSocket) != 0) {
    err_exit(errno, "net_mod_server_socket_start");
}
```
### 2.2 å¯åЍBusService
BusService, Pub/Sub Client å’Œ Request/Reply Client的头文件都是 net_mod_socket_wrapper.h, ä¸‹é¢å¯åЍkey是8的Bus服务.
```
void * server_socket = net_mod_socket_open();
net_mod_socket_bind(server_socket, 8);
net_mod_socket_start_bus(server_socket);
```
###  2.3 Request/Reply ç”¨ä¾‹è¯´æ˜Ž
现在模拟一个场景,A向B和C发送一个请求,B和C收到请求后分别返回一个响应给A ã€‚假设A的IP是192.168.20.101,B的IP是192.168.20.102, C的IP是192.168.20.103, å®ƒä»¬çš„key都是100, ä»£ç†server的端口是5000。
A çš„代码如下:
```
int recv_arr_size, n;
net_mod_recv_msg_t *recv_arr;
const char* content = "HELLO WORLD!":
net_mod_socket_bind(client, 100);
net_node_t node_arr = {
    {"192.168.20.102", 5000, 100},
    {"192.168.20.103", 5000, 100}
};
int node_arr_size = 2;
void *client = net_mod_socket_open();
n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size);
printf(">>> %d nodes reply\n", n);
for(i=0; i<recv_arr_size; i++) {
    printf("host:%s, port: %d, key:%d, content: %s\n",
        recv_arr[i].host,
        recv_arr[i].port,
        recv_arr[i].key,
        recv_arr[i].content
    );
}
// ä½¿ç”¨å®ŒåŽï¼Œä¸è¦å¿˜è®°é‡Šæ”¾æŽ‰
net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
```
B å’Œ C的代码如下:
```
void *client = net_mod_socket_open();
net_mod_socket_bind(client, 100);
int size;
void *recvbuf;
char sendbuf[512];
int rv;
int remote_port;
while ( (rv = net_mod_socket_recvfrom(client, &recvbuf, &size, &remote_port) ) == 0) {
    // printf( "server: RECEIVED REQUEST FROM PORT %d NAME %s\n", remote_port, recvbuf);
    sprintf(sendbuf, "RECEIVED  PORT %d NAME %s", remote_port, recvbuf);
    net_mod_socket_sendto(client, sendbuf, strlen(sendbuf) + 1, remote_port);
    free(recvbuf);
}
```
### 2.3 Pub/Sub ç”¨ä¾‹è¯´æ˜Ž
现在模拟一个场景,B å’Œ C订阅了主题news, A å‘布了该主题相关的内容 ã€‚假设A的IP是192.168.20.101,B的IP是192.168.20.102, C的IP是192.168.20.103, å®ƒä»¬çš„key都是200, Bus的key是8, ä»£ç†server的端口是5000.
A的代码如下:
```
int n;
const char* topic = "news":
const char* content = "HELLO WORLD!":
void *client = net_mod_socket_open();
net_mod_socket_bind(client, 200);
net_node_t node_arr = {
    {"192.168.20.102", 5000, 8},
    {"192.168.20.103", 5000, 8}
};
int node_arr_size = 2;
void *client = net_mod_socket_open();
net_mod_socket_bind(client, 200);
n = net_mod_socket_pub(client, pub_node_arr, pub_node_arr_size, topic, strlen(topic)+1, content, strlen(content)+1);
printf("pub %d nodes\n", n);
```
B å’Œ C çš„代码如下
```
const char* topic = "news":
void *client = net_mod_socket_open();
net_mod_socket_bind(client, 200);
while (net_mod_socket_sub(client, topic, strlen(topic),  8) == 0) {
 printf("%d Sub success!\n", net_mod_socket_get_key(client));
}
```
更具体的实例代码请参看`test_net_mod_socket.c`
## 3 è®¾è®¡è¯´æ˜Ž
使用的时候,大家会对BusService和NetProxyService这两个需要额外启动服务感到疑惑。下面重点队这两个服务做做一些说明。
### 3.1 Bus设计
![Bus示意图](./bus_service.png)
上面这张示意图是说订阅news的Client有A和B,订阅sports的有B和C,这些都在总线里记录着。当A向总线pub和主题sports相关的内容时,B和C会通过总线收到这个主题的消息。
### 3.2 NetProxyService设计
![NetProxyService示意图](./network_req_rep.png)
上面这张图跨机器请求应答的示意图。这张示意图时说当节点A向节点B的key 1001队列和key 1002队列发送消息时,它会首先发送到节点B的网络代理server上。网络代理server会把请求消息转发到相应的队列上,并接受应答返回给节点A。
同理跨机器的发布订阅也是通过这样的方式交互的。
doc/ʹÓÃ˵Ã÷.mk
File was deleted
src/socket/net_mod_socket.h
@@ -115,7 +115,7 @@
  /**
   * å¦‚果建立连接的节点没有接受到消息会一直等待
   * å‘node_arr ä¸­çš„æ‰€æœ‰ç½‘络节点发送请求消息,节点的返回信息汇总并存储在recv_arr中
   * @node_arr ç½‘络节点组, @node_arr_len该数组长度
   * @node_arr ç½‘络节点组, @node_arr_len该数组长度.如果IP为空则为本地发送。
   * @send_buf å‘送的消息,@send_size è¯¥æ¶ˆæ¯ä½“的长度
   * @recv_arr è¿”回的应答消息组,@recv_arr_size è¯¥æ•°ç»„长度
   * @return æˆåŠŸå‘é€çš„èŠ‚ç‚¹çš„ä¸ªæ•°
src/socket/net_mod_socket_wrapper.h
@@ -59,15 +59,14 @@
int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec);
int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key);
/**
 * å¦‚果建立连接的节点没有接受到消息会一直等待
 * å‘node_arr ä¸­çš„æ‰€æœ‰ç½‘络节点发送请求消息,节点的返回信息汇总并存储在recv_arr中
 * @node_arr ç½‘络节点组, @node_arr_len该数组长度
 * @node_arr ç½‘络节点组, @node_arr_len该数组长度.如果IP为空则为本地发送。
 * @send_buf å‘送的消息,@send_size è¯¥æ¶ˆæ¯ä½“的长度
 * @recv_arr è¿”回的应答消息组,@recv_arr_size è¯¥æ•°ç»„长度
 * @return æˆåŠŸå‘é€çš„èŠ‚ç‚¹çš„ä¸ªæ•°
 * ä¼˜ç‚¹ï¼š1某个节点的故障不会阻塞其他节点。2性能好
 * ä¼˜ç‚¹ï¼š1某个节点的故障不会阻塞其他节点。2 æ€§èƒ½å¥½ã€‚ 3 é‡‡ç”¨thread local技术即保证了线程安全,又可以使用连接池缓存连接
 */
int net_mod_socket_sendandrecv(void *_sockt, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) ;
test_net_socket/net_mod_socket.sh
@@ -6,7 +6,7 @@
    ./test_net_mod_socket  --fun="start_net_proxy" --port=5000 & server_pid=$! && echo "pid: ${server_pid}" 
    # æ‰“开请求应答测试的接受端
    ./test_net_mod_socket --fun="start_reply" --key=11 & server_pid=$! &&  echo "pid: ${server_pid}"
    ./test_net_mod_socket --fun="start_reply" --key=100 & server_pid=$! &&  echo "pid: ${server_pid}"
}
function client() {
@@ -17,7 +17,7 @@
    ./test_net_mod_socket --fun="start_net_client" \
     --sendlist="localhost:5000:11" \
     --sendlist="localhost:5000:100" \
     --publist="localhost:5000:8"  
     
@@ -25,7 +25,7 @@
function msend() {
    ./test_net_mod_socket --fun="test_net_sendandrecv_threads" \
     --sendlist="localhost:5000:11, localhost:5000:11"
     --sendlist="localhost:5000:100, localhost:5000:100"
     
}
test_net_socket/test_net_mod_socket.c
@@ -52,6 +52,36 @@
  
}
void start_bus_server(int key) {
  printf("Start bus server\n");
  void * server_socket = net_mod_socket_open();
  net_mod_socket_bind(server_socket, key);
  net_mod_socket_start_bus(server_socket);
}
void start_reply(int key) {
  printf("start reply\n");
  void *client = net_mod_socket_open();
  net_mod_socket_bind(client, key);
  int size;
  void *recvbuf;
  char sendbuf[512];
  int rv;
  int remote_port;
  while ( (rv = net_mod_socket_recvfrom(client, &recvbuf, &size, &remote_port) ) == 0) {
   // printf( "server: RECEIVED REQUEST FROM PORT %d NAME %s\n", remote_port, recvbuf);
    sprintf(sendbuf, "RECEIVED  PORT %d NAME %s", remote_port, recvbuf);
    net_mod_socket_sendto(client, sendbuf, strlen(sendbuf) + 1, remote_port);
    free(recvbuf);
  }
}
void start_net_client(char *sendlist, char*publist ){
    client = net_mod_socket_open();
    char content[MAXLINE];
@@ -93,9 +123,8 @@
        
          if (fgets(content, MAXLINE, stdin) != NULL) {
              // æ”¶åˆ°æ¶ˆæ¯çš„节点即使没有对应的信息, ä¹Ÿè¦å›žå¤ä¸€ä¸ªè¡¨ç¤ºæ— çš„æ¶ˆæ¯,否则会一直等待
            n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, content,
          strlen(content), &recv_arr, &recv_arr_size);
            printf("send %d nodes\n", n);
            n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size);
            printf(" %d nodes reply\n", n);
            for(i=0; i<recv_arr_size; i++) {
                printf("host:%s, port: %d, key:%d, content: %s\n", 
                    recv_arr[i].host,
@@ -147,34 +176,6 @@
  
}
void start_bus_server(int key) {
  printf("Start bus server\n");
  void * server_socket = net_mod_socket_open();
  net_mod_socket_bind(server_socket, key);
  net_mod_socket_start_bus(server_socket);
}
void start_reply(int key) {
  printf("start reply\n");
  void *socket = net_mod_socket_open();
  net_mod_socket_bind(socket, key);
  int size;
  void *recvbuf;
  char sendbuf[512];
  int rv;
  int remote_port;
  while ( (rv = net_mod_socket_recvfrom(socket, &recvbuf, &size, &remote_port) ) == 0) {
   // printf( "server: RECEIVED REQUEST FROM PORT %d NAME %s\n", remote_port, recvbuf);
    sprintf(sendbuf, "RECEIVED  PORT %d NAME %s", remote_port, recvbuf);
    net_mod_socket_sendto(socket, sendbuf, strlen(sendbuf) + 1, remote_port);
    free(recvbuf);
  }
}