From aa2f3b2a9968bb4928463bdae05fb026d16b60bb Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期五, 04 十二月 2020 19:07:01 +0800 Subject: [PATCH] 固定bus key --- src/socket/shm_mod_socket.c | 184 +++++++++++++++++++++++----------------------- 1 files changed, 92 insertions(+), 92 deletions(-) diff --git a/src/socket/shm_mod_socket.c b/src/socket/shm_mod_socket.c index e890721..9563e6b 100644 --- a/src/socket/shm_mod_socket.c +++ b/src/socket/shm_mod_socket.c @@ -1,5 +1,7 @@ #include "shm_mod_socket.h" +static Logger *logger = LoggerFactory::getLogger(); + void ShmModSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)> cb) { SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY); @@ -105,8 +107,8 @@ // printf("ShmModSocket destory 4\n"); } -int ShmModSocket::bind(int port) { - return shm_socket_bind(shm_socket, port); +int ShmModSocket::bind(int key) { + return shm_socket_bind(shm_socket, key); } @@ -115,82 +117,83 @@ * 寮哄埗缁戝畾绔彛鍒皊ocket, 閫傜敤浜庣▼搴忛潪姝e父鍏抽棴鐨勬儏鍐典笅锛岄噸鍚▼搴忕粦瀹氬師鏉ヨ繕娌¢噴鏀剧殑key * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ -int ShmModSocket::force_bind(int port) { - return shm_socket_force_bind(shm_socket, port); +int ShmModSocket::force_bind(int key) { + return shm_socket_force_bind(shm_socket, key); } /** * 鍙戦�佷俊鎭� - * @port 鍙戦�佺粰璋� + * @key 鍙戦�佺粰璋� * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ -int ShmModSocket::sendto(const void *buf, const int size, const int port) { - return shm_sendto(shm_socket, buf, size, port, NULL, 0); +int ShmModSocket::sendto(const void *buf, const int size, const int key) { + return shm_sendto(shm_socket, buf, size, key, NULL, 0); } // 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇 -int ShmModSocket::sendto_timeout(const void *buf, const int size, const int port, const struct timespec *timeout) { - return shm_sendto(shm_socket, buf, size, port, timeout, 0); +int ShmModSocket::sendto_timeout(const void *buf, const int size, const int key, const struct timespec *timeout) { + return shm_sendto(shm_socket, buf, size, key, timeout, 0); } // 鍙戦�佷俊鎭珛鍒昏繑鍥炪�� -int ShmModSocket::sendto_nowait( const void *buf, const int size, const int port){ - return shm_sendto(shm_socket, buf, size, port, NULL, (int)SHM_MSG_NOWAIT); +int ShmModSocket::sendto_nowait( const void *buf, const int size, const int key){ + return shm_sendto(shm_socket, buf, size, key, NULL, (int)SHM_MSG_NOWAIT); } -inline int ShmModSocket::_recvfrom_(void **buf, int *size, int *port, struct timespec *timeout, int flags) { +inline int ShmModSocket::_recvfrom_(void **buf, int *size, int *key, struct timespec *timeout, int flags) { if(mod == BUS) { - err_exit(0, "Can not use method recvfrom in a Bus"); + logger->error("Can not use method recvfrom in a Bus"); + exit(1); } // printf("dgram_mod_recvfrom before\n"); - int rv = shm_recvfrom(shm_socket, buf, size, port, timeout, flags); + int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flags); // printf("dgram_mod_recvfrom after\n"); return rv; } /** * 鎺ユ敹淇℃伅 - * @port 浠庤皝鍝噷鏀跺埌鐨勪俊鎭� + * @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭� * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ -int ShmModSocket::recvfrom(void **buf, int *size, int *port) { +int ShmModSocket::recvfrom(void **buf, int *size, int *key) { - return _recvfrom_( buf, size, port, NULL, 0); + return _recvfrom_( buf, size, key, NULL, 0); } // 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int ShmModSocket::recvfrom_timeout( void **buf, int *size, int *port, struct timespec *timeout) { - return _recvfrom_(buf, size, port, timeout, 0); +int ShmModSocket::recvfrom_timeout( void **buf, int *size, int *key, struct timespec *timeout) { + return _recvfrom_(buf, size, key, timeout, 0); } -int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *port){ - return _recvfrom_(buf, size, port, NULL, (int)SHM_MSG_NOWAIT); +int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *key){ + return _recvfrom_(buf, size, key, NULL, (int)SHM_MSG_NOWAIT); } /** * 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟 - * @port 鍙戦�佺粰璋� + * @key 鍙戦�佺粰璋� * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 */ -int ShmModSocket::sendandrecv( const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size){ - return shm_sendandrecv(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, NULL, 0); +int ShmModSocket::sendandrecv( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ + return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0); } // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int ShmModSocket::sendandrecv_timeout(const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size, struct timespec *timeout){ - return shm_sendandrecv(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, timeout, 0); +int ShmModSocket::sendandrecv_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, struct timespec *timeout){ + return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0); } -int ShmModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size){ - return shm_sendandrecv(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT); +int ShmModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ + return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT); } -int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size){ - return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, NULL, 0); +int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ + return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0); } // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int ShmModSocket::sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size, struct timespec *timeout){ - return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, timeout, 0); +int ShmModSocket::sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, struct timespec *timeout){ + return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0); } -int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size){ - return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT); +int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ + return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT); } @@ -214,17 +217,17 @@ * 璁㈤槄鎸囧畾涓婚 * @topic 涓婚 * @size 涓婚闀垮害 - * @port 鎬荤嚎绔彛 + * @key 鎬荤嚎绔彛 */ -int ShmModSocket::sub(char *topic, int size, int port){ - return _sub_( topic, size, port, NULL, 0); +int ShmModSocket::sub(char *topic, int size, int key){ + return _sub_( topic, size, key, NULL, 0); } // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int ShmModSocket::sub_timeout(char *topic, int size, int port, struct timespec *timeout){ - return _sub_(topic, size, port, timeout, 0); +int ShmModSocket::sub_timeout(char *topic, int size, int key, struct timespec *timeout){ + return _sub_(topic, size, key, timeout, 0); } -int ShmModSocket::sub_nowait(char *topic, int size, int port) { - return _sub_(topic, size, port, NULL, (int)SHM_MSG_NOWAIT); +int ShmModSocket::sub_nowait(char *topic, int size, int key) { + return _sub_(topic, size, key, NULL, (int)SHM_MSG_NOWAIT); } @@ -233,17 +236,17 @@ * 鍙栨秷璁㈤槄鎸囧畾涓婚 * @topic 涓婚 * @size 涓婚闀垮害 - * @port 鎬荤嚎绔彛 + * @key 鎬荤嚎绔彛 */ -int ShmModSocket::desub(char *topic, int size, int port){ - return _desub_( topic, size, port, NULL, 0); +int ShmModSocket::desub(char *topic, int size, int key){ + return _desub_( topic, size, key, NULL, 0); } // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int ShmModSocket::desub_timeout(char *topic, int size, int port, struct timespec *timeout){ - return _desub_(topic, size, port, timeout, 0); +int ShmModSocket::desub_timeout(char *topic, int size, int key, struct timespec *timeout){ + return _desub_(topic, size, key, timeout, 0); } -int ShmModSocket::desub_nowait(char *topic, int size, int port) { - return _desub_(topic, size, port, NULL, (int)SHM_MSG_NOWAIT); +int ShmModSocket::desub_nowait(char *topic, int size, int key) { + return _desub_(topic, size, key, NULL, (int)SHM_MSG_NOWAIT); } @@ -252,76 +255,76 @@ * 鍙戝竷涓婚 * @topic 涓婚 * @content 涓婚鍐呭 - * @port 鎬荤嚎绔彛 + * @key 鎬荤嚎绔彛 */ -int ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int port){ - return _pub_(topic, topic_size, content, content_size, port, NULL, 0); +int ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int key){ + return _pub_(topic, topic_size, content, content_size, key, NULL, 0); } // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int ShmModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int port, struct timespec * timeout){ - return _pub_( topic, topic_size, content, content_size, port, timeout, 0); +int ShmModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int key, struct timespec * timeout){ + return _pub_( topic, topic_size, content, content_size, key, timeout, 0); } -int ShmModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int port){ - return _pub_(topic, topic_size, content, content_size, port, NULL, (int)SHM_MSG_NOWAIT); +int ShmModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int key){ + return _pub_(topic, topic_size, content, content_size, key, NULL, (int)SHM_MSG_NOWAIT); } /** - * 鑾峰彇soket绔彛鍙� + * 鑾峰彇soket key */ -int ShmModSocket::get_port(){ - return shm_socket->port; +int ShmModSocket::get_key(){ + return shm_socket->key; } // ============================================================================= /** - * @port 鎬荤嚎绔彛 + * @key 鎬荤嚎绔彛 */ -int ShmModSocket::_sub_(char *topic, int size, int port, +int ShmModSocket::_sub_(char *topic, int size, int key, struct timespec *timeout, int flags) { char buf[8192]; int rv; snprintf(buf, 8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER); - rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags); + rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags); if(rv == 0) { - bus_set->insert(port); + bus_set->insert(key); } return rv; } /** - * @port 鎬荤嚎绔彛 + * @key 鎬荤嚎绔彛 */ -int ShmModSocket::_desub_(char *topic, int size, int port, +int ShmModSocket::_desub_(char *topic, int size, int key, struct timespec *timeout, int flags) { char buf[8192]; if(topic == NULL) { topic = ""; } snprintf(buf, 8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER); - return shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags); + return shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags); } /** - * @port 鎬荤嚎绔彛 + * @key 鎬荤嚎绔彛 */ -int ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int port, +int ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int key, struct timespec *timeout, int flags) { int head_len; char buf[8192+content_size]; snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER); head_len = strlen(buf); memcpy(buf+head_len, content, content_size); - return shm_sendto(shm_socket, buf, head_len+content_size, port, timeout, flags); + return shm_sendto(shm_socket, buf, head_len+content_size, key, timeout, flags); } /* * 澶勭悊璁㈤槄 */ -void ShmModSocket::_proxy_sub( char *topic, int port) { +void ShmModSocket::_proxy_sub( char *topic, int key) { SHMKeySet *subscripter_set; SHMTopicSubMap::iterator map_iter; @@ -334,13 +337,13 @@ subscripter_set = new(set_ptr) SHMKeySet; topic_sub_map->insert({topic, subscripter_set}); } - subscripter_set->insert(port); + subscripter_set->insert(key); } /* * 澶勭悊鍙栨秷璁㈤槄 */ -void ShmModSocket::_proxy_desub( char *topic, int port) { +void ShmModSocket::_proxy_desub( char *topic, int key) { SHMKeySet *subscripter_set; SHMTopicSubMap::iterator map_iter; @@ -349,30 +352,28 @@ if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { subscripter_set = map_iter->second; - subscripter_set->erase(port); -printf("============ desub %d\n", port); + subscripter_set->erase(key); } } /* * 澶勭悊鍙栨秷鎵�鏈夎闃� */ -void ShmModSocket::_proxy_desub_all(int port) { +void ShmModSocket::_proxy_desub_all(int key) { SHMKeySet *subscripter_set; SHMTopicSubMap::iterator map_iter; // SHMKeySet::iterator set_iter; for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { subscripter_set = map_iter->second; - subscripter_set->erase(port); -printf("============ desub %d\n", port); + subscripter_set->erase(key); } } /* * 澶勭悊鍙戝竷锛屼唬鐞嗚浆鍙� */ -void ShmModSocket::_proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int port) { +void ShmModSocket::_proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int key) { SHMKeySet *subscripter_set; SHMTopicSubMap::iterator map_iter; @@ -381,19 +382,19 @@ std::vector<int> subscripter_to_del; std::vector<int>::iterator vector_iter; - int send_port; + int send_key; struct timespec timeout = {1,0}; if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { subscripter_set = map_iter->second; for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) { - send_port = *set_iter; - // printf("_proxy_pub send before %d \n", send_port); - if (shm_sendto(shm_socket, buf+head_len, size-head_len, send_port, &timeout) == SHM_SOCKET_ECONNFAILED ) { + send_key = *set_iter; + // printf("_proxy_pub send before %d \n", send_key); + if (shm_sendto(shm_socket, buf+head_len, size-head_len, send_key, &timeout) == SHM_SOCKET_ECONNFAILED ) { //瀵规柟宸插叧闂殑杩炴帴鏀惧埌寰呭垹闄ら槦鍒楅噷銆傚鏋滅洿鎺ュ垹闄や細璁﹊ter鎸囬拡鍑虹幇閿欎贡 - subscripter_to_del.push_back(send_port); + subscripter_to_del.push_back(send_key); } else { -// printf("_proxy_pub send after: %d \n", send_port); +// printf("_proxy_pub send after: %d \n", send_key); } @@ -403,7 +404,7 @@ for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); vector_iter++) { if((set_iter = subscripter_set->find(*vector_iter)) != subscripter_set->end()) { subscripter_set->erase(set_iter); - printf("remove closed subscripter %d \n", send_port); + logger->debug("remove closed subscripter %d \n", send_key); } } subscripter_to_del.clear(); @@ -414,13 +415,13 @@ void * ShmModSocket::run_pubsub_proxy() { // pthread_detach(pthread_self()); int size; - int port; + int key; char * action, *topic, *topics, *buf; size_t head_len; const char *topic_delim = ","; // printf("run_pubsub_proxy server receive before\n"); - while(shm_recvfrom(shm_socket, (void **)&buf, &size, &port) == 0) { + while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) { //printf("run_pubsub_proxy server recv after: %s \n", buf); if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) { // printf("run_pubsub_proxy %s %s \n", action, topics); @@ -429,7 +430,7 @@ topic = strtok(topics, topic_delim); //printf("run_pubsub_proxy topic = %s\n", topic); while(topic) { - _proxy_sub(trim(topic, 0), port); + _proxy_sub(trim(topic, 0), key); topic = strtok(NULL, topic_delim); } @@ -437,13 +438,12 @@ // printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), "")); if(strcmp(trim(topics, 0), "") == 0) { // 鍙栨秷鎵�鏈夎闃� - printf("====鍙栨秷鎵�鏈夎闃匼n"); - _proxy_desub_all(port); + _proxy_desub_all(key); } else { topic = strtok(topics, topic_delim); while(topic) { - _proxy_desub(trim(topic, 0), port); + _proxy_desub(trim(topic, 0), key); topic = strtok(NULL, topic_delim); } } @@ -451,13 +451,13 @@ } else if(strcmp(action, "pub") == 0) { - _proxy_pub(topics, head_len, buf, size, port); + _proxy_pub(topics, head_len, buf, size, key); } free(action); free(topics); } else { - err_msg(0, "incorrect format msg"); + logger->error( "ShmModSocket::run_pubsub_proxy : incorrect format msg"); } free(buf); } -- Gitblit v1.8.0