From aa2f3b2a9968bb4928463bdae05fb026d16b60bb Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期五, 04 十二月 2020 19:07:01 +0800 Subject: [PATCH] 固定bus key --- src/key_def.h | 2 src/socket/net_mod_socket_wrapper.h | 19 +--- src/socket/shm_mod_socket.h | 3 src/socket/bus_server_socket_wrapper.h | 33 ++++++++ test_net_socket/test_net_mod_socket.c | 40 +++++---- src/socket/bus_server_socket_wrapper.c | 40 ++++++++++ test_net_socket/net_mod_socket.sh | 4 src/socket/net_mod_socket_wrapper.c | 33 +++----- src/socket/net_mod_server_socket.c | 9 +- 9 files changed, 125 insertions(+), 58 deletions(-) diff --git a/src/key_def.h b/src/key_def.h new file mode 100644 index 0000000..80ef7e7 --- /dev/null +++ b/src/key_def.h @@ -0,0 +1,2 @@ +#define BUS_MAP_KEY 1 +#define BUS_KEY 8 \ No newline at end of file diff --git a/src/socket/bus_server_socket_wrapper.c b/src/socket/bus_server_socket_wrapper.c new file mode 100644 index 0000000..22bbe09 --- /dev/null +++ b/src/socket/bus_server_socket_wrapper.c @@ -0,0 +1,40 @@ +#include "bus_server_socket_wrapper.h" +#include "key_def.h" +static Logger *logger = LoggerFactory::getLogger(); + + +/** + * 鍒涘缓 + */ +void * bus_server_socket_wrapper_open() { + + NetModSocket *sockt = new NetModSocket; + return (void *)sockt; +} + +/** + * 鍏抽棴 + */ +void bus_server_socket_wrapper_close(void *_socket) { + NetModSocket *sockt = (NetModSocket *)_socket; + delete sockt; +} + +/** + * 鍚姩bus + * + * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 +*/ +int bus_server_socket_wrapper_start_bus(void * _socket) { + int ret; + NetModSocket *sockt = (NetModSocket *)_socket; + + if( (ret = sockt->bind(BUS_KEY)) == 0) { + sockt->start_bus(); + return 0; + } else { + logger->error("start bus failed"); + return -1; + } + +} \ No newline at end of file diff --git a/src/socket/bus_server_socket_wrapper.h b/src/socket/bus_server_socket_wrapper.h new file mode 100644 index 0000000..7a12fe1 --- /dev/null +++ b/src/socket/bus_server_socket_wrapper.h @@ -0,0 +1,33 @@ +#ifndef _BUS_SERVER_SOCKET_WRAPPER_H_ +#define _BUS_SERVER_SOCKET_WRAPPER_H_ + +#include "net_mod_socket.h" + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * 鍒涘缓 + */ +void * bus_server_socket_wrapper_open(); + +/** + * 鍏抽棴 + */ +void bus_server_socket_wrapper_close(void *_sockt); + +/** + * 鍚姩bus + * + * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 +*/ +int bus_server_socket_wrapper_start_bus(void * _socket); + + + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/src/socket/net_mod_server_socket.c b/src/socket/net_mod_server_socket.c index 7924421..8959231 100644 --- a/src/socket/net_mod_server_socket.c +++ b/src/socket/net_mod_server_socket.c @@ -3,6 +3,7 @@ #include "socket_io.h" #include "net_mod_socket_io.h" #include "net_mod_socket.h" +#include "key_def.h" static Logger * logger = LoggerFactory::getLogger(); @@ -229,18 +230,18 @@ // LoggerFactory::getLogger()->debug("====server pub %s===\n", buf); memcpy(response_head.host, request_head.host, NI_MAXHOST); response_head.port = request_head.port; - response_head.key = request_head.key; + // response_head.key = request_head.key; if(request_head.timeout > 0) { timeout.tv_sec = request_head.timeout / 1000; timeout.tv_nsec = (request_head.timeout - timeout.tv_sec * 1000) * 10e6; - ret = shmModSocket.pub_timeout((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, request_head.key, &timeout); + ret = shmModSocket.pub_timeout((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, BUS_KEY, &timeout); } else if(request_head.timeout == 0) { - ret = shmModSocket.pub_nowait((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, request_head.key); + ret = shmModSocket.pub_nowait((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, BUS_KEY); } else if(request_head.timeout == -1) { - ret = shmModSocket.pub((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, request_head.key); + ret = shmModSocket.pub((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, BUS_KEY); } response_head.code = ret; diff --git a/src/socket/net_mod_socket_wrapper.c b/src/socket/net_mod_socket_wrapper.c index 1434033..f464487 100644 --- a/src/socket/net_mod_socket_wrapper.c +++ b/src/socket/net_mod_socket_wrapper.c @@ -137,15 +137,6 @@ -/** - * 鍚姩bus - * - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int net_mod_socket_start_bus(void * _socket) { - net_mod_socket_t *sockt = (net_mod_socket_t *)_socket; - return sockt->sockt->start_bus(); -} /** * 璁㈤槄鎸囧畾涓婚 @@ -153,18 +144,18 @@ * @size 涓婚闀垮害 * @port 鎬荤嚎绔彛 */ -int net_mod_socket_sub(void * _socket, void *topic, int size, int port) { +int net_mod_socket_sub(void * _socket, void *topic, int size) { net_mod_socket_t *sockt = (net_mod_socket_t *)_socket; - return sockt->sockt->sub((char *)topic, size, port); + return sockt->sockt->sub((char *)topic, size, BUS_KEY); } // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int net_mod_socket_sub_timeout(void * _socket, void *topic, int size, int port, int sec, int nsec){ +int net_mod_socket_sub_timeout(void * _socket, void *topic, int size, int sec, int nsec){ net_mod_socket_t *sockt = (net_mod_socket_t *)_socket; - return sockt->sockt->sub_timeout((char *)topic, size, port, sec, nsec); + return sockt->sockt->sub_timeout((char *)topic, size, BUS_KEY, sec, nsec); } -int net_mod_socket_sub_nowait(void * _socket, void *topic, int size, int port){ +int net_mod_socket_sub_nowait(void * _socket, void *topic, int size){ net_mod_socket_t *sockt = (net_mod_socket_t *)_socket; - return sockt->sockt->sub_nowait((char *)topic, size, port); + return sockt->sockt->sub_nowait((char *)topic, size, BUS_KEY); } @@ -174,18 +165,18 @@ * @size 涓婚闀垮害 * @port 鎬荤嚎绔彛 */ -int net_mod_socket_desub(void * _socket, void *topic, int size, int port) { +int net_mod_socket_desub(void * _socket, void *topic, int size) { net_mod_socket_t *sockt = (net_mod_socket_t *)_socket; - return sockt->sockt->desub((char *)topic, size, port); + return sockt->sockt->desub((char *)topic, size, BUS_KEY); } // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int net_mod_socket_desub_timeout(void * _socket, void *topic, int size, int port, int sec, int nsec) { +int net_mod_socket_desub_timeout(void * _socket, void *topic, int size, int sec, int nsec) { net_mod_socket_t *sockt = (net_mod_socket_t *)_socket; - return sockt->sockt->desub_timeout((char *)topic, size, port, sec, nsec); + return sockt->sockt->desub_timeout((char *)topic, size, BUS_KEY, sec, nsec); } -int net_mod_socket_desub_nowait(void * _socket, void *topic, int size, int port){ +int net_mod_socket_desub_nowait(void * _socket, void *topic, int size){ net_mod_socket_t *sockt = (net_mod_socket_t *)_socket; - return sockt->sockt->desub_nowait((char *)topic, size, port); + return sockt->sockt->desub_nowait((char *)topic, size, BUS_KEY); } diff --git a/src/socket/net_mod_socket_wrapper.h b/src/socket/net_mod_socket_wrapper.h index 421733f..7f37b28 100644 --- a/src/socket/net_mod_socket_wrapper.h +++ b/src/socket/net_mod_socket_wrapper.h @@ -84,13 +84,6 @@ net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) ; -/** - * 鍚姩bus - * - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int net_mod_socket_start_bus(void * _socket); - /** * 鍚憂ode_arr 涓殑鎵�鏈夌綉缁滆妭鐐瑰彂甯冩秷鎭� @@ -112,10 +105,10 @@ * @size 涓婚闀垮害 * @key 鎬荤嚎绔彛 */ -int net_mod_socket_sub(void * _socket, void *topic, int size, int key); +int net_mod_socket_sub(void * _socket, void *topic, int size); // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int net_mod_socket_sub_timeout(void * _socket, void *topic, int size, int key, int sec, int nsec); -int net_mod_socket_sub_nowait(void * _socket, void *topic, int size, int key); +int net_mod_socket_sub_timeout(void * _socket, void *topic, int size, int sec, int nsec); +int net_mod_socket_sub_nowait(void * _socket, void *topic, int size); /** @@ -124,10 +117,10 @@ * @size 涓婚闀垮害 * @key 鎬荤嚎绔彛 */ -int net_mod_socket_desub(void * _socket, void *topic, int size, int key); +int net_mod_socket_desub(void * _socket, void *topic, int size); // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int net_mod_socket_desub_timeout(void * _socket, void *topic, int size, int key, int sec, int nsec); -int net_mod_socket_desub_nowait(void * _socket, void *topic, int size, int key); +int net_mod_socket_desub_timeout(void * _socket, void *topic, int size, int sec, int nsec); +int net_mod_socket_desub_nowait(void * _socket, void *topic, int size); /** diff --git a/src/socket/shm_mod_socket.h b/src/socket/shm_mod_socket.h index aced91d..bca34c7 100644 --- a/src/socket/shm_mod_socket.h +++ b/src/socket/shm_mod_socket.h @@ -7,6 +7,7 @@ #include "hashtable.h" #include "sem_util.h" #include "logger_factory.h" +#include "key_def.h" #include <set> #define ACTION_LIDENTIFIER "<**" @@ -14,7 +15,7 @@ #define TOPIC_LIDENTIFIER "{" #define TOPIC_RIDENTIFIER "}" -#define BUS_MAP_KEY 1 + //typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > SHMString; typedef std::set<int, std::less<int>, SHM_STL_Allocator<int> > SHMKeySet; typedef std::map<SHMString, SHMKeySet *, std::less<SHMString>, SHM_STL_Allocator<std::pair<const SHMString, SHMKeySet *> > > SHMTopicSubMap; diff --git a/test_net_socket/net_mod_socket.sh b/test_net_socket/net_mod_socket.sh index a54fca4..68d4ff1 100755 --- a/test_net_socket/net_mod_socket.sh +++ b/test_net_socket/net_mod_socket.sh @@ -18,7 +18,7 @@ ./test_net_mod_socket --fun="start_net_client" \ --sendlist="localhost:5000:100" \ - --publist="localhost:5000:8" + --publist="localhost:5000" } @@ -31,7 +31,7 @@ function mpub() { ./test_net_mod_socket --fun="test_net_pub_threads" \ - --publist="localhost:5000:8, localhost:5000:8" + --publist="localhost:5000, localhost:5000" } diff --git a/test_net_socket/test_net_mod_socket.c b/test_net_socket/test_net_mod_socket.c index d8dcd0b..08f3589 100644 --- a/test_net_socket/test_net_mod_socket.c +++ b/test_net_socket/test_net_mod_socket.c @@ -1,5 +1,7 @@ #include "net_mod_server_socket_wrapper.h" #include "net_mod_socket_wrapper.h" +#include "bus_server_socket_wrapper.h" + #include "shm_mm_wraper.h" #include "usg_common.h" #include <getopt.h> @@ -53,13 +55,13 @@ } -void start_bus_server(int key) { +void start_bus_server() { printf("Start bus server\n"); - void * server_socket = net_mod_socket_open(); - - net_mod_socket_bind(server_socket, key); - - net_mod_socket_start_bus(server_socket); + void * server_socket = bus_server_socket_wrapper_open(); + if(bus_server_socket_wrapper_start_bus(server_socket) != 0) { + printf("start bus failed\n"); + exit(1); + } } @@ -139,10 +141,10 @@ } } else if(strcmp(action, "desub") == 0) { - printf("Please input buskey and topic!\n"); + printf("Please input topic!\n"); - scanf("%d %s", &buskey, topic); - if (net_mod_socket_desub(client, topic, strlen(topic), buskey) == 0) { + scanf("%s", topic); + if (net_mod_socket_desub(client, topic, strlen(topic)) == 0) { printf("%d Desub success!\n", net_mod_socket_get_key(client)); } else { printf("Desub failture!\n"); @@ -151,11 +153,10 @@ } else if(strcmp(action, "sub") == 0) { - printf("Please input buskey and topic!\n"); - scanf("%d %s",&buskey, topic); + printf("Please input topic!\n"); + scanf("%s",topic); - printf("===%d %s\n",buskey, topic); - if (net_mod_socket_sub(client, topic, strlen(topic), buskey) == 0) { + if (net_mod_socket_sub(client, topic, strlen(topic)) == 0) { printf("%d Sub success!\n", net_mod_socket_get_key(client)); } else { printf("Sub failture!\n"); @@ -361,7 +362,7 @@ usage(argv[0]); exit(1); } - start_bus_server(opt.key); + start_bus_server(); } else if (strcmp("start_reply", opt.fun) == 0) { if(opt.key == 0) { @@ -557,10 +558,15 @@ for(i = 0; i < entry_arr_len; i++) { property_arr_len = str_split(entry_arr[i], ":", &property_arr); // printf("%s, %s, %s\n", property_arr[0], property_arr[1], property_arr[2]); - node_arr[i] = {trim(property_arr[0], 0), atoi(property_arr[1]), atoi(property_arr[2])}; - free(entry_arr[i]); + node_arr[i] = {trim(property_arr[0], 0), atoi(property_arr[1]), 0}; + free(property_arr[1]); - free(property_arr[2]); + if(property_arr_len == 3) { + node_arr[i].key = atoi(property_arr[2]); + free(property_arr[2]); + } + free(entry_arr[i]); + } *node_arr_addr = node_arr; -- Gitblit v1.8.0