其中BusService和NetProxyService是服务进程,使用者把它起来后就不需要关心了。使用者主要关注的是Pub/Sub Client 和 Request/Reply Client
具体每个方法的及其参数的说明可以参看它们对应的头文件。
使用时,每台主机都需要先把BusService和NetProxyService这两个服务启动起来。因为后面的功能都依赖这两个服务。
NetProxyService的头文件是 net_mod_server_socket_wrapper.h. 下面的代码启动端口是5000的网络代理服务
void *serverSocket = net_mod_server_socket_open(5000);
if(net_mod_server_socket_start(serverSocket) != 0) {
err_exit(errno, "net_mod_server_socket_start");
}
BusService的头文件是bus_server_socket_wrapper.h, 下面是启动Bus服务的代码.
void * server_socket = bus_server_socket_wrapper_open();
if(bus_server_socket_wrapper_start_bus(server_socket) != 0) {
printf("start bus failed\n");
exit(1);
}
Pub/Sub Client 和 Request/Reply Client的头文件都是 net_mod_socket_wrapper.h
现在模拟一个场景,A向B和C发送一个请求,B和C收到请求后分别返回一个响应给A 。假设A的IP是192.168.20.101,B的IP是192.168.20.102, C的IP是192.168.20.103, 它们的key都是100, 代理server的端口是5000。
A 的代码如下:
int recv_arr_size, n;
net_mod_recv_msg_t *recv_arr;
const char* content = "HELLO WORLD!":
net_mod_socket_bind(client, 100);
net_node_t node_arr = {
{"192.168.20.102", 5000, 100},
{"192.168.20.103", 5000, 100}
};
int node_arr_size = 2;
void *client = net_mod_socket_open();
n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size,
content, strlen(content), &recv_arr, &recv_arr_size);
printf(">>> %d nodes reply\n", n);
for(i=0; i<recv_arr_size; i++) {
printf("host:%s, port: %d, key:%d, content: %s\n",
recv_arr[i].host,
recv_arr[i].port,
recv_arr[i].key,
recv_arr[i].content
);
}
// 使用完后,不要忘记释放掉
net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
B 和 C的代码如下:
void *client = net_mod_socket_open();
net_mod_socket_bind(client, 100);
int size;
void *recvbuf;
char sendbuf[512];
int rv;
int remote_port;
while ( (rv = net_mod_socket_recvfrom(client, &recvbuf, &size, &remote_port) ) == 0) {
// printf( "server: RECEIVED REQUEST FROM PORT %d NAME %s\n", remote_port, recvbuf);
sprintf(sendbuf, "RECEIVED PORT %d NAME %s", remote_port, recvbuf);
net_mod_socket_sendto(client, sendbuf, strlen(sendbuf) + 1, remote_port);
free(recvbuf);
}
现在模拟一个场景,B 和 C订阅了主题news, A 发布了该主题相关的内容 。假设A的IP是192.168.20.101,B的IP是192.168.20.102, C的IP是192.168.20.103, 它们的key都是200, 代理server的端口是5000.
A的代码如下:
int main() {
int n;
const char* topic = "news":
const char* content = "HELLO WORLD!":
void *client = net_mod_socket_open();
net_mod_socket_bind(client, 200);
net_node_t node_arr = {
{"192.168.20.102", 5000},
{"192.168.20.103", 5000}
};
int node_arr_size = 2;
void *client = net_mod_socket_open();
net_mod_socket_bind(client, 200);
n = net_mod_socket_pub(client, pub_node_arr,
pub_node_arr_size, topic, strlen(topic)+1, content, strlen(content)+1);
printf("pub %d nodes\n", n);
}
B 和 C 的代码如下
```
// 打印接受到的订阅消息
void *print_sub_msg(void *sockt) {
pthread_detach(pthread_self());
void *recvbuf;
int size;
int key;
while (net_mod_socket_recvfrom( sockt, &recvbuf, &size, &key) == 0) {
printf("收到订阅消息:%s\n", recvbuf);
free(recvbuf);
}
}
int main() {
pthread_t tid;
// 创建一个线程接受订阅消息
pthread_create(&tid, NULL, print_sub_msg, client);
const char* topic = "news":
void *client = net_mod_socket_open();
net_mod_socket_bind(client, 200);
// 订阅感兴趣的主题
if (net_mod_socket_sub(client, topic, strlen(topic)) == 0) {
printf("%d Sub success!\n", net_mod_socket_get_key(client));
}
}
```
更具体的实例代码请参看test_net_mod_socket.c
使用的时候,大家会对BusService和NetProxyService这两个需要额外启动服务感到疑惑。下面重点队这两个服务做做一些说明。
上面这张示意图是说订阅news的Client有A和B,订阅sports的有B和C,这些都在总线里记录着。当A向总线pub和主题sports相关的内容时,B和C会通过总线收到这个主题的消息。
上面这张图跨机器请求应答的示意图。这张示意图时说当节点A向节点B的key 1001队列和key 1002队列发送消息时,它会首先发送到节点B的网络代理server上。网络代理server会把请求消息转发到相应的队列上,并接受应答返回给节点A。
同理跨机器的发布订阅也是通过这样的方式交互的。