From 8b0a8c644f19e97606dfb06a865f56dbad15f95e Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期一, 21 十二月 2020 14:44:45 +0800 Subject: [PATCH] update --- src/key_def.h | 7 src/socket/net_mod_socket.c | 9 src/socket/shm_socket.c | 8 test_net_socket/test_net_mod_socket.c | 40 + src/socket/net_mod_socket.h | 17 Makefile | 2 src/socket/shm_socket.h | 37 + src/socket/shm_mod_socket.c | 312 ---------------- /dev/null | 127 ------ src/socket/shm_mod_socket.h | 44 -- src/socket/bus_server_socket_wrapper.h | 2 src/socket/bus_server_socket.c | 353 ++++++++++++++++++ src/socket/bus_server_socket_wrapper.c | 8 src/socket/socket_def.h | 37 + test_net_socket/net_mod_socket.sh | 6 src/socket/bus_server_socket.h | 85 ++++ src/socket/net_mod_server_socket.c | 3 17 files changed, 558 insertions(+), 539 deletions(-) diff --git a/Makefile b/Makefile index 74a5919..df8a65d 100755 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ # debug "make --just-print" -DIRS = src test_net_socket test_socket +DIRS = src test_net_socket TAR_NAME = shm_queue.tar.gz all: diff --git a/src/key_def.h b/src/key_def.h index 80ef7e7..1807b6a 100644 --- a/src/key_def.h +++ b/src/key_def.h @@ -1,2 +1,7 @@ +#ifndef _KEY_DEF_H_ +#define _KEY_DEF_H_ + #define BUS_MAP_KEY 1 -#define BUS_KEY 8 \ No newline at end of file +#define BUS_KEY 8 + +#endif \ No newline at end of file diff --git a/src/socket/bus_server_socket.c b/src/socket/bus_server_socket.c new file mode 100644 index 0000000..eac784f --- /dev/null +++ b/src/socket/bus_server_socket.c @@ -0,0 +1,353 @@ + +#include "bus_server_socket.h" + +static Logger *logger = LoggerFactory::getLogger(); + +void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)> cb) { + SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY); + SHMKeySet *subscripter_set; + SHMKeySet::iterator set_iter; + SHMTopicSubMap::iterator map_iter; + + if(topic_sub_map != NULL) { + for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { + subscripter_set = map_iter->second; + if(subscripter_set != NULL) { + for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) { + cb(subscripter_set, *set_iter); + } + } + } + } +} + +// bool BusServerSocket::include_in_keys(int key, int keys[], size_t length) { +// if(length == 0) { +// return false; +// } +// for(int i = 0; i < length; i++) { +// if(keys[i] == key) +// return true; +// } +// return false; +// } + +size_t BusServerSocket::remove_subscripters(int keys[], size_t length) { + size_t count = 0; + int key; + for(int i = 0; i < length; i++) { + key = keys[i]; + SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY); + SHMKeySet *subscripter_set; + SHMKeySet::iterator set_iter; + SHMTopicSubMap::iterator map_iter; + + if(topic_sub_map != NULL) { + for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { + subscripter_set = map_iter->second; + if(subscripter_set != NULL && (set_iter = subscripter_set->find(key) ) != subscripter_set->end()) { + subscripter_set->erase(set_iter); +// printf("remove_subscripter %s, %d\n", map_iter->first, key); + count++; + } + } + } + } + return count; + +} + + +BusServerSocket::BusServerSocket() { + shm_socket = shm_open_socket(SHM_SOCKET_DGRAM); + topic_sub_map = NULL; +} + +BusServerSocket::~BusServerSocket() { +// printf("BusServerSocket destory 1\n"); + SHMKeySet *subscripter_set; + SHMTopicSubMap::iterator map_iter; + + stop(); + sleep(2); +// printf("BusServerSocket destory 2\n"); + if(topic_sub_map != NULL) { + for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { + subscripter_set = map_iter->second; +// printf("BusServerSocket destory 2-1\n"); + if(subscripter_set != NULL) { +// printf("BusServerSocket destory 2-2\n"); + subscripter_set->clear(); +// printf("BusServerSocket destory 2-3\n"); + mm_free((void *)subscripter_set); +// printf("BusServerSocket destory 2-4\n"); + } + + } + topic_sub_map->clear(); + mem_pool_free_by_key(BUS_MAP_KEY); + } +// printf("BusServerSocket destory 3\n"); + // printf("=============close socket\n"); + shm_close_socket(shm_socket); +// printf("BusServerSocket destory 4\n"); +} + + + +int BusServerSocket::bind(int key) { + return shm_socket_bind(shm_socket, key); +} + +/** + * 寮哄埗缁戝畾绔彛鍒皊ocket, 閫傜敤浜庣▼搴忛潪姝e父鍏抽棴鐨勬儏鍐典笅锛岄噸鍚▼搴忕粦瀹氬師鏉ヨ繕娌¢噴鏀剧殑key + * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 +*/ +int BusServerSocket::force_bind(int key) { + return shm_socket_force_bind(shm_socket, key); +} +/** + * 鍚姩bus + * + * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 +*/ +int BusServerSocket::start(){ + topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY); + + run_pubsub_proxy(); + // pthread_t tid; + // pthread_create(&tid, NULL, run_accept_sub_request, _socket); + return 0; +} + + +int BusServerSocket::stop(){ + char buf[128]; + + if( shm_socket->key <= 0) { + return -1; + } + snprintf(buf, 128, "%sstop%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER); + return shm_sendto(shm_socket, buf, strlen(buf), shm_socket->key, NULL, 0); +} + +/* + * 澶勭悊璁㈤槄 +*/ +void BusServerSocket::_proxy_sub( char *topic, int key) { + SHMKeySet *subscripter_set; + + SHMTopicSubMap::iterator map_iter; + SHMKeySet::iterator set_iter; +//printf("_proxy_sub topic = %s\n", topic); + if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { + subscripter_set = map_iter->second; + } else { + void *set_ptr = mm_malloc(sizeof(SHMKeySet)); + subscripter_set = new(set_ptr) SHMKeySet; + topic_sub_map->insert({topic, subscripter_set}); + } + subscripter_set->insert(key); +} + +/* + * 澶勭悊鍙栨秷璁㈤槄 +*/ +void BusServerSocket::_proxy_desub( char *topic, int key) { + SHMKeySet *subscripter_set; + + SHMTopicSubMap::iterator map_iter; + // SHMKeySet::iterator set_iter; + + if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { + subscripter_set = map_iter->second; + + subscripter_set->erase(key); + } +} + +/* + * 澶勭悊鍙栨秷鎵�鏈夎闃� +*/ +void BusServerSocket::_proxy_desub_all(int key) { + SHMKeySet *subscripter_set; + + SHMTopicSubMap::iterator map_iter; + // SHMKeySet::iterator set_iter; + for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { + subscripter_set = map_iter->second; + subscripter_set->erase(key); + } +} + +/* + * 澶勭悊鍙戝竷锛屼唬鐞嗚浆鍙� +*/ +void BusServerSocket::_proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int key) { + SHMKeySet *subscripter_set; + + SHMTopicSubMap::iterator map_iter; + SHMKeySet::iterator set_iter; + + std::vector<int> subscripter_to_del; + std::vector<int>::iterator vector_iter; + + int send_key; + struct timespec timeout = {1,0}; + + if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { + subscripter_set = map_iter->second; + for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) { + send_key = *set_iter; + // printf("_proxy_pub send before %d \n", send_key); + if (shm_sendto(shm_socket, buf+head_len, size-head_len, send_key, &timeout) == SHM_SOCKET_ECONNFAILED ) { + //瀵规柟宸插叧闂殑杩炴帴鏀惧埌寰呭垹闄ら槦鍒楅噷銆傚鏋滅洿鎺ュ垹闄や細璁﹊ter鎸囬拡鍑虹幇閿欎贡 + subscripter_to_del.push_back(send_key); + } else { +// printf("_proxy_pub send after: %d \n", send_key); + } + + + } + + // 鍒犻櫎宸插叧闂殑绔� + for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); vector_iter++) { + if((set_iter = subscripter_set->find(*vector_iter)) != subscripter_set->end()) { + subscripter_set->erase(set_iter); + logger->debug("remove closed subscripter %d \n", send_key); + } + } + subscripter_to_del.clear(); + + } +} + +void * BusServerSocket::run_pubsub_proxy() { + // pthread_detach(pthread_self()); + int size; + int key; + char * action, *topic, *topics, *buf; + size_t head_len; + + const char *topic_delim = ","; +// printf("run_pubsub_proxy server receive before\n"); + while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) { +//printf("run_pubsub_proxy server recv after: %s \n", buf); + if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) { + // printf("run_pubsub_proxy %s %s \n", action, topics); + if(strcmp(action, "sub") == 0) { + // 璁㈤槄鏀寔澶氫富棰樿闃� + topic = strtok(topics, topic_delim); +//printf("run_pubsub_proxy topic = %s\n", topic); + while(topic) { + _proxy_sub(trim(topic, 0), key); + topic = strtok(NULL, topic_delim); + } + + } else if(strcmp(action, "desub") == 0) { +// printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), "")); + if(strcmp(trim(topics, 0), "") == 0) { + // 鍙栨秷鎵�鏈夎闃� + _proxy_desub_all(key); + } else { + + topic = strtok(topics, topic_delim); + while(topic) { + _proxy_desub(trim(topic, 0), key); + topic = strtok(NULL, topic_delim); + } + } + + } else if(strcmp(action, "pub") == 0) { + _proxy_pub(topics, head_len, buf, size, key); + } else if(strcmp(action, "stop") == 0) { + logger->info( "Stopping Bus..."); + // snprintf(buf, 128, "%sstop_finished%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER); + // shm_sendto(shm_socket, const void *buf, const int size, key); + free(action); + free(topics); + free(buf); + break; + } else { + logger->error( "BusServerSocket::run_pubsub_proxy : unrecognized action"); + } + + free(action); + free(topics); + } else { + logger->error( "BusServerSocket::run_pubsub_proxy : incorrect format msg"); + } + free(buf); + } + return NULL; +} + + + +/** + * @str "<**sub**>{缁忔祹}" + */ + +int BusServerSocket::parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ) { + char *ptr = str; + char *str_end_ptr = str + size; + char *action_start_ptr; + char *action_end_ptr; + size_t action_len = 0; + + char *topic_start_ptr; + char *topic_end_ptr; + size_t topic_len = 0; + + // if (strlen(identifier) > strlen(str)) { + // return 0; + // } + + if (strncmp(ptr, ACTION_LIDENTIFIER, strlen(ACTION_LIDENTIFIER)) == 0) { + ptr += strlen(ACTION_LIDENTIFIER); + action_start_ptr = ptr; + while(strncmp(++ptr, ACTION_RIDENTIFIER, strlen(ACTION_RIDENTIFIER)) != 0) { + if(ptr >= str_end_ptr) { + return 0; + } + } +// printf("%s\n", ptr); + action_end_ptr = ptr; + action_len = action_end_ptr - action_start_ptr; + ptr += strlen(ACTION_RIDENTIFIER); +// printf("%s\n", ptr); +// printf("%s\n", str_end_ptr-1); + if(strncmp(ptr, TOPIC_LIDENTIFIER, strlen(TOPIC_LIDENTIFIER)) == 0 ) { + topic_start_ptr = ptr+strlen(TOPIC_LIDENTIFIER); + + + while(strncmp(++ptr, TOPIC_RIDENTIFIER, strlen(TOPIC_RIDENTIFIER)) != 0) { + if(ptr >= str_end_ptr) { + return 0; + } + } + topic_end_ptr = ptr; + topic_len = topic_end_ptr - topic_start_ptr; + + ptr += strlen(TOPIC_RIDENTIFIER); + + } else { + return 0; + } + } else { + return 0; + } + + char *topic = (char *)malloc(topic_len+1); + strncpy(topic, topic_start_ptr, topic_len); + *(topic+topic_len) = '\0'; + *_topic = topic; + + char *action = (char *)malloc(action_len+1); + strncpy(action, action_start_ptr, action_len); + *(action+action_len) = '\0'; + *_action = action; + *head_len = ptr-str; + + return 1; +} \ No newline at end of file diff --git a/src/socket/bus_server_socket.h b/src/socket/bus_server_socket.h new file mode 100644 index 0000000..1a83d0e --- /dev/null +++ b/src/socket/bus_server_socket.h @@ -0,0 +1,85 @@ +#ifndef _BUS_SERVER_SOCKET_H_ +#define _BUS_SERVER_SOCKET_H_ +#include "usg_common.h" +#include "shm_socket.h" +#include "shm_allocator.h" +#include "mem_pool.h" +#include "hashtable.h" +#include "sem_util.h" +#include "logger_factory.h" +#include "key_def.h" +#include "socket_def.h" +#include <set> + + + + +//typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > SHMString; +typedef std::set<int, std::less<int>, SHM_STL_Allocator<int> > SHMKeySet; +typedef std::map<SHMString, SHMKeySet *, std::less<SHMString>, SHM_STL_Allocator<std::pair<const SHMString, SHMKeySet *> > > SHMTopicSubMap; + + +class BusServerSocket { +private: + shm_socket_t *shm_socket; + // pthread_t recv_thread; + // <涓婚锛� 璁㈤槄鑰�> + SHMTopicSubMap *topic_sub_map; + +private: + void _proxy_sub( char *topic, int key); + void _proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int key); + void *run_pubsub_proxy(); + int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ); + + void _proxy_desub( char *topic, int key); + void _proxy_desub_all(int key); + + static void foreach_subscripters(std::function<void(SHMKeySet *, int)> cb); + // static bool include_in_keys(int key, int keys[], size_t length); + +public: + static size_t remove_subscripters(int keys[], size_t length) ; + +public: + BusServerSocket(); + ~BusServerSocket(); + + /** + * 缁戝畾绔彛鍒皊ocket, 濡傛灉涓嶇粦瀹氬垯绯荤粺鑷姩鍒嗛厤涓�涓� + * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 + */ + int bind(int key); + + /** + * 寮哄埗缁戝畾绔彛鍒皊ocket, 閫傜敤浜庣▼搴忛潪姝e父鍏抽棴鐨勬儏鍐典笅锛岄噸鍚▼搴忕粦瀹氬師鏉ヨ繕娌¢噴鏀剧殑key + * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 + */ + int force_bind(int key); + + + /** + * 鍚姩bus + * + * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 + */ + int start(); + + /** + * 鍋滄bus + * + * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 + */ + int stop(); + + + + /** + * 鑾峰彇soket key + */ + int get_key() ; + + +}; + +#endif diff --git a/src/socket/bus_server_socket_wrapper.c b/src/socket/bus_server_socket_wrapper.c index 64a4970..ecc4816 100644 --- a/src/socket/bus_server_socket_wrapper.c +++ b/src/socket/bus_server_socket_wrapper.c @@ -8,7 +8,7 @@ */ void * bus_server_socket_wrapper_open() { - NetModSocket *sockt = new NetModSocket; + BusServerSocket *sockt = new BusServerSocket; return (void *)sockt; } @@ -16,7 +16,7 @@ * 鍏抽棴 */ void bus_server_socket_wrapper_close(void *_socket) { - NetModSocket *sockt = (NetModSocket *)_socket; + BusServerSocket *sockt = (BusServerSocket *)_socket; delete sockt; } @@ -27,10 +27,10 @@ */ int bus_server_socket_wrapper_start_bus(void * _socket) { int ret; - NetModSocket *sockt = (NetModSocket *)_socket; + BusServerSocket *sockt = (BusServerSocket *)_socket; if( (ret = sockt->bind(BUS_KEY)) == 0) { - return sockt->start_bus(); + return sockt->start(); } else { logger->error("start bus failed"); return -1; diff --git a/src/socket/bus_server_socket_wrapper.h b/src/socket/bus_server_socket_wrapper.h index 7a12fe1..cd625dd 100644 --- a/src/socket/bus_server_socket_wrapper.h +++ b/src/socket/bus_server_socket_wrapper.h @@ -1,7 +1,7 @@ #ifndef _BUS_SERVER_SOCKET_WRAPPER_H_ #define _BUS_SERVER_SOCKET_WRAPPER_H_ -#include "net_mod_socket.h" +#include "bus_server_socket.h" #ifdef __cplusplus extern "C" { diff --git a/src/socket/dgram_mod_socket.c b/src/socket/dgram_mod_socket.c deleted file mode 100644 index cd560b2..0000000 --- a/src/socket/dgram_mod_socket.c +++ /dev/null @@ -1,214 +0,0 @@ -#include "dgram_mod_socket.h" -#include "shm_mod_socket.h" - -typedef struct dgram_mod_socket_t { - ShmModSocket *m_socket; - -} dgram_mod_socket_t; - - -int dgram_mod_remove_keys(int keys[], int length){ - return ShmModSocket::remove_keys(keys, length); -} - -int dgram_mod_remove_key(int key){ - int keys[] = {key}; - return ShmModSocket::remove_keys(keys, 1); -} -/** - * 鍒涘缓socket - * @return socket鍦板潃 -*/ -void *dgram_mod_open_socket() { - dgram_mod_socket_t * socket = (dgram_mod_socket_t *)calloc(1, sizeof(dgram_mod_socket_t)); - // socket->mod = (socket_mod_t)mod; - socket->m_socket = new ShmModSocket; - return (void *)socket; -} - -/** - * 鍏抽棴socket - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int dgram_mod_close_socket(void * _socket) { - dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - delete socket->m_socket; - free(_socket); - return 0; -} - -/** - * 缁戝畾绔彛鍒皊ocket, 濡傛灉涓嶇粦瀹氬垯绯荤粺鑷姩鍒嗛厤涓�涓� - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int dgram_mod_bind(void * _socket, int key) { - dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - return socket->m_socket->bind(key); -} - -/** - * 寮哄埗缁戝畾绔彛鍒皊ocket, 閫傜敤浜庣▼搴忛潪姝e父鍏抽棴鐨勬儏鍐典笅锛岄噸鍚▼搴忕粦瀹氬師鏉ヨ繕娌¢噴鏀剧殑key - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int dgram_mod_force_bind(void * _socket, int key) { - dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - return socket->m_socket->force_bind(key); -} -/** - * 鍙戦�佷俊鎭� - * @key 鍙戦�佺粰璋� - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 - */ -int dgram_mod_sendto(void *_socket, const void *buf, const int size, const int key){ - dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - return socket->m_socket->sendto(buf, size,key); -} - -// 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇 -int dgram_mod_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec){ - dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - struct timespec timeout = {sec, nsec}; - return socket->m_socket->sendto_timeout(buf, size, key, &timeout); -} - -// 鍙戦�佷俊鎭珛鍒昏繑鍥炪�� -int dgram_mod_sendto_nowait(void *_socket, const void *buf, const int size, const int key){ - dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - return socket->m_socket->sendto_nowait(buf, size,key); -} - -/** - * 鎺ユ敹淇℃伅 - * @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭� - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int dgram_mod_recvfrom(void *_socket, void **buf, int *size, int *key) { - dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - return socket->m_socket->recvfrom(buf, size, key); -} -// 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int dgram_mod_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec){ - dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - struct timespec timeout = {sec, nsec}; - return socket->m_socket->recvfrom_timeout(buf, size, key, &timeout); -} - -int dgram_mod_recvfrom_nowait(void *_socket, void **buf, int *size, int *key){ - dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - return socket->m_socket->recvfrom_nowait(buf, size, key); -} - -/** - * 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟 - * @key 鍙戦�佺粰璋� - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size){ - dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - return socket->m_socket->sendandrecv(send_buf, send_size, key, recv_buf, recv_size); -} -// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int dgram_mod_sendandrecv_timeout(void * _socket, const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, int sec, int nsec){ - dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - struct timespec timeout = {sec, nsec}; - return socket->m_socket->sendandrecv_timeout(send_buf, send_size, key, recv_buf, recv_size, &timeout); -} -int dgram_mod_sendandrecv_nowait(void * _socket, const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) { - dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - return socket->m_socket->sendandrecv_nowait(send_buf, send_size, key, recv_buf, recv_size); -} - - -/** - * 鍚姩bus - * - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int dgram_mod_start_bus(void * _socket) { - dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - return socket->m_socket->start_bus(); -} - -/** - * 璁㈤槄鎸囧畾涓婚 - * @topic 涓婚 - * @size 涓婚闀垮害 - * @key 鎬荤嚎绔彛 - */ -int dgram_mod_sub(void * _socket, void *topic, int size, int key){ - dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - return socket->m_socket->sub((char *)topic, size, key); -} -// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int dgram_mod_sub_timeout(void * _socket, void *topic, int size, int key, int sec, int nsec){ - dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - struct timespec timeout = {sec, nsec}; - return socket->m_socket->sub_timeout((char *)topic, size, key, &timeout); -} -int dgram_mod_sub_nowait(void * _socket, void *topic, int size, int key){ - dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - return socket->m_socket->sub_nowait((char *)topic, size, key); -} - - - -/** - * 鍙栨秷璁㈤槄鎸囧畾涓婚 - * @topic 涓婚 - * @size 涓婚闀垮害 - * @key 鎬荤嚎绔彛 - */ -int dgram_mod_desub(void * _socket, void *topic, int size, int key){ - dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - return socket->m_socket->desub((char *)topic, size, key); -} -// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int dgram_mod_desub_timeout(void * _socket, void *topic, int size, int key, int sec, int nsec){ - dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - struct timespec timeout = {sec, nsec}; - return socket->m_socket->desub_timeout((char *)topic, size, key, &timeout); -} -int dgram_mod_desub_nowait(void * _socket, void *topic, int size, int key){ - dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - return socket->m_socket->desub_nowait((char *)topic, size, key); -} - - - -/** - * 鍙戝竷涓婚 - * @topic 涓婚 - * @content 涓婚鍐呭 - * @key 鎬荤嚎绔彛 - */ -int dgram_mod_pub(void * _socket, void *topic, int topic_size, void *content, int content_size, int key){ - dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - return socket->m_socket->pub((char *)topic, topic_size, content, content_size, key); -} -// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int dgram_mod_pub_timeout(void * _socket, void *topic, int topic_size, void *content, int content_size, int key, int sec, int nsec){ - dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - struct timespec timeout = {sec, nsec}; - return socket->m_socket->pub_timeout((char *)topic, topic_size, content, content_size, key, &timeout); -} -int dgram_mod_pub_nowait(void * _socket, void *topic, int topic_size, void *content, int content_size, int key){ - dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - return socket->m_socket->pub_nowait((char *)topic, topic_size, content, content_size, key); -} - - -/** - * 鑾峰彇soket key - */ -int dgram_mod_get_port(void * _socket) { - dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - return socket->m_socket->get_key(); -} - - -/** - * 閲婃斁瀛樺偍鎺ユ敹淇℃伅鐨刡uf - */ -void dgram_mod_free(void *buf) { - free(buf); -} \ No newline at end of file diff --git a/src/socket/dgram_mod_socket.h b/src/socket/dgram_mod_socket.h deleted file mode 100644 index dbfd6e2..0000000 --- a/src/socket/dgram_mod_socket.h +++ /dev/null @@ -1,127 +0,0 @@ -#ifndef __DGRAM_MOD_SOCKET_H__ -#define __DGRAM_MOD_SOCKET_H__ - - - - -/** - * 鍒犻櫎key瀵瑰簲鐨勫叡浜槦鍒楋紝骞跺湪bus閲屽垹闄よkey鐨勮闃� - */ -int dgram_mod_remove_key(int key); - -/** - * 鎵归噺鍒犻櫎key瀵瑰簲鐨勫叡浜槦鍒楋紝骞跺湪bus閲屽垹闄よkey鐨勮闃� - */ -int dgram_mod_remove_keys(int keys[], int length); - - -/** - * 鍒涘缓socket - * @return socket鍦板潃 -*/ -void *dgram_mod_open_socket(); - -/** - * 鍏抽棴socket - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int dgram_mod_close_socket(void * _socket); - -/** - * 缁戝畾绔彛鍒皊ocket, 濡傛灉涓嶇粦瀹氬垯绯荤粺鑷姩鍒嗛厤涓�涓� - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int dgram_mod_bind(void * _socket, int key); - -/** - * 寮哄埗缁戝畾绔彛鍒皊ocket, 閫傜敤浜庣▼搴忛潪姝e父鍏抽棴鐨勬儏鍐典笅锛岄噸鍚▼搴忕粦瀹氬師鏉ヨ繕娌¢噴鏀剧殑key - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int dgram_mod_force_bind(void * _socket, int key); -/** - * 鍙戦�佷俊鎭� - * @key 鍙戦�佺粰璋� - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 - */ -int dgram_mod_sendto(void *_socket, const void *buf, const int size, const int key); -// 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇 -int dgram_mod_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec); -// 鍙戦�佷俊鎭珛鍒昏繑鍥炪�� -int dgram_mod_sendto_nowait(void *_socket, const void *buf, const int size, const int key); - -/** - * 鎺ユ敹淇℃伅 - * @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭� - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int dgram_mod_recvfrom(void *_socket, void **buf, int *size, int *key); -// 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int dgram_mod_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec); -int dgram_mod_recvfrom_nowait(void *_socket, void **buf, int *size, int *key); - -/** - * 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟 - * @key 鍙戦�佺粰璋� - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size) ; -// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int dgram_mod_sendandrecv_timeout(void * _socket, const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, int sec, int nsec) ; -int dgram_mod_sendandrecv_nowait(void * _socket, const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size) ; - - -/** - * 鍚姩bus - * - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int dgram_mod_start_bus(void * _socket); - -/** - * 璁㈤槄鎸囧畾涓婚 - * @topic 涓婚 - * @size 涓婚闀垮害 - * @key 鎬荤嚎绔彛 - */ -int dgram_mod_sub(void * _socket, void *topic, int size, int key); -// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int dgram_mod_sub_timeout(void * _socket, void *topic, int size, int key, int sec, int nsec); -int dgram_mod_sub_nowait(void * _socket, void *topic, int size, int key); - - -/** - * 鍙栨秷璁㈤槄鎸囧畾涓婚 - * @topic 涓婚,涓婚涓虹┖鏃跺彇娑堝叏閮ㄨ闃� - * @size 涓婚闀垮害 - * @key 鎬荤嚎绔彛 - */ -int dgram_mod_desub(void * _socket, void *topic, int size, int key); -// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int dgram_mod_desub_timeout(void * _socket, void *topic, int size, int key, int sec, int nsec); -int dgram_mod_desub_nowait(void * _socket, void *topic, int size, int key); - -/** - * 鍙戝竷涓婚 - * @topic 涓婚 - * @content 涓婚鍐呭 - * @key 鎬荤嚎绔彛 - */ -int dgram_mod_pub(void * _socket, void *topic, int topic_size, void *content, int content_size, int key); -// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int dgram_mod_pub_timeout(void * _socket, void *topic, int topic_size, void *content, int content_size, int key, int sec, int nsec); -int dgram_mod_pub_nowait(void * _socket, void *topic, int topic_size, void *content, int content_size, int key); - - -/** - * 鑾峰彇soket key - */ -int dgram_mod_get_port(void * _socket) ; - - -/** - * 閲婃斁瀛樺偍鎺ユ敹淇℃伅鐨刡uf - */ -void dgram_mod_free(void *buf) ; - - -#endif \ No newline at end of file diff --git a/src/socket/net_mod_server_socket.c b/src/socket/net_mod_server_socket.c index 8959231..56f5907 100644 --- a/src/socket/net_mod_server_socket.c +++ b/src/socket/net_mod_server_socket.c @@ -215,6 +215,7 @@ } else if(request_head.mod == BUS) { + if(request_head.topic_length > max_topic_buf) { if( (topic_buf = realloc(topic_buf, request_head.topic_length)) == NULL ) { LoggerFactory::getLogger()->error(errno, "NetModServerSocket::process_client realloc topic_buf"); @@ -243,7 +244,7 @@ else if(request_head.timeout == -1) { ret = shmModSocket.pub((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, BUS_KEY); } - +printf("bus server pub ret=%d\n", ret); response_head.code = ret; response_head.content_length = 0; if( rio_writen(connfd, NetModSocket::encode_response_head(response_head), NET_MODE_RESPONSE_HEAD_LENGTH) != NET_MODE_RESPONSE_HEAD_LENGTH ) diff --git a/src/socket/net_mod_socket.c b/src/socket/net_mod_socket.c index 3eb7077..675c80b 100644 --- a/src/socket/net_mod_socket.c +++ b/src/socket/net_mod_socket.c @@ -560,15 +560,6 @@ /** - * 鍚姩bus - * - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int NetModSocket::start_bus() { - return shmModSocket.start_bus(); -} - -/** * 璁㈤槄鎸囧畾涓婚 * @topic 涓婚 * @size 涓婚闀垮害 diff --git a/src/socket/net_mod_socket.h b/src/socket/net_mod_socket.h index 55739d2..f3c0a54 100644 --- a/src/socket/net_mod_socket.h +++ b/src/socket/net_mod_socket.h @@ -4,13 +4,8 @@ #include "shm_mod_socket.h" #include "socket_io.h" #include <poll.h> +#include "socket_def.h" - -#define GET(p) (*(uint32_t *)(p)) -#define PUT(p, val) (*(uint32_t *)(p) = (val)) - -#define GET_INT32(p) (*(int32_t *)(p)) -#define PUT_INT32(p, val) (*(int32_t *)(p) = (val)) class NetModServerSocket; @@ -178,15 +173,7 @@ int sendandrecv_timeout( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, int sec, int nsec) ; int sendandrecv_nowait( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size) ; - - /** - * 鍚姩bus - * - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 - */ - int start_bus(); - - + /** * 鍚憂ode_arr 涓殑鎵�鏈夌綉缁滆妭鐐瑰彂甯冩秷鎭� * @node_arr 缃戠粶鑺傜偣缁�, @node_arr_len璇ユ暟缁勯暱搴� diff --git a/src/socket/shm_mod_socket.c b/src/socket/shm_mod_socket.c index 9563e6b..6685087 100644 --- a/src/socket/shm_mod_socket.c +++ b/src/socket/shm_mod_socket.c @@ -1,65 +1,10 @@ #include "shm_mod_socket.h" - +#include "bus_server_socket.h" static Logger *logger = LoggerFactory::getLogger(); -void ShmModSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)> cb) { - SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY); - SHMKeySet *subscripter_set; - SHMKeySet::iterator set_iter; - SHMTopicSubMap::iterator map_iter; - - if(topic_sub_map != NULL) { - for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { - subscripter_set = map_iter->second; - if(subscripter_set != NULL) { - for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) { - cb(subscripter_set, *set_iter); - } - } - } - } -} - -bool ShmModSocket::include_in_keys(int key, int keys[], size_t length) { - if(length == 0) { - return false; - } - for(int i = 0; i < length; i++) { - if(keys[i] == key) - return true; - } - return false; -} - -size_t ShmModSocket::remove_subscripters(int keys[], size_t length) { - size_t count = 0; - int key; - for(int i = 0; i < length; i++) { - key = keys[i]; - SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY); - SHMKeySet *subscripter_set; - SHMKeySet::iterator set_iter; - SHMTopicSubMap::iterator map_iter; - - if(topic_sub_map != NULL) { - for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { - subscripter_set = map_iter->second; - if(subscripter_set != NULL && (set_iter = subscripter_set->find(key) ) != subscripter_set->end()) { - subscripter_set->erase(set_iter); -// printf("remove_subscripter %s, %d\n", map_iter->first, key); - count++; - } - } - } - } - return count; - -} - - size_t ShmModSocket::remove_keys(int keys[], size_t length) { - remove_subscripters(keys, length); + BusServerSocket::remove_subscripters(keys, length); return shm_socket_remove_keys(keys, length); } @@ -67,42 +12,18 @@ mod = (socket_mod_t)0; shm_socket = shm_open_socket(SHM_SOCKET_DGRAM); bus_set = new std::set<int>; - topic_sub_map = NULL; } ShmModSocket::~ShmModSocket() { -// printf("ShmModSocket destory 1\n"); - SHMKeySet *subscripter_set; - SHMTopicSubMap::iterator map_iter; + logger->debug("Destory ShmModSocket...\n"); struct timespec timeout = {1, 0}; if(bus_set != NULL) { for(auto bus_iter = bus_set->begin(); bus_iter != bus_set->end(); bus_iter++) { -// printf("ShmModSocket desub_timeout before"); desub_timeout(NULL, 0, *bus_iter, &timeout); -// printf("ShmModSocket desub_timeout after %d\n", *bus_iter); } delete bus_set; } -// printf("ShmModSocket destory 2\n"); - if(topic_sub_map != NULL) { - for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { - subscripter_set = map_iter->second; -// printf("ShmModSocket destory 2-1\n"); - if(subscripter_set != NULL) { -// printf("ShmModSocket destory 2-2\n"); - subscripter_set->clear(); -// printf("ShmModSocket destory 2-3\n"); - mm_free((void *)subscripter_set); -// printf("ShmModSocket destory 2-4\n"); - } - - } - topic_sub_map->clear(); - mem_pool_free_by_key(BUS_MAP_KEY); - } -// printf("ShmModSocket destory 3\n"); - // printf("=============close socket\n"); shm_close_socket(shm_socket); // printf("ShmModSocket destory 4\n"); } @@ -110,8 +31,6 @@ int ShmModSocket::bind(int key) { return shm_socket_bind(shm_socket, key); } - - /** * 寮哄埗缁戝畾绔彛鍒皊ocket, 閫傜敤浜庣▼搴忛潪姝e父鍏抽棴鐨勬儏鍐典笅锛岄噸鍚▼搴忕粦瀹氬師鏉ヨ繕娌¢噴鏀剧殑key @@ -198,20 +117,6 @@ -/** - * 鍚姩bus - * - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int ShmModSocket::start_bus(){ - mod = BUS; - topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY); - - run_pubsub_proxy(); - // pthread_t tid; - // pthread_create(&tid, NULL, run_accept_sub_request, _socket); - return 0; -} /** * 璁㈤槄鎸囧畾涓婚 @@ -310,7 +215,9 @@ /** * @key 鎬荤嚎绔彛 + * @str "<**pub**>{缁忔祹}" */ + int ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int key, struct timespec *timeout, int flags) { int head_len; @@ -321,216 +228,7 @@ return shm_sendto(shm_socket, buf, head_len+content_size, key, timeout, flags); } -/* - * 澶勭悊璁㈤槄 -*/ -void ShmModSocket::_proxy_sub( char *topic, int key) { - SHMKeySet *subscripter_set; - - SHMTopicSubMap::iterator map_iter; - SHMKeySet::iterator set_iter; -//printf("_proxy_sub topic = %s\n", topic); - if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { - subscripter_set = map_iter->second; - } else { - void *set_ptr = mm_malloc(sizeof(SHMKeySet)); - subscripter_set = new(set_ptr) SHMKeySet; - topic_sub_map->insert({topic, subscripter_set}); - } - subscripter_set->insert(key); -} - -/* - * 澶勭悊鍙栨秷璁㈤槄 -*/ -void ShmModSocket::_proxy_desub( char *topic, int key) { - SHMKeySet *subscripter_set; - - SHMTopicSubMap::iterator map_iter; - // SHMKeySet::iterator set_iter; - - if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { - subscripter_set = map_iter->second; - - subscripter_set->erase(key); - } -} - -/* - * 澶勭悊鍙栨秷鎵�鏈夎闃� -*/ -void ShmModSocket::_proxy_desub_all(int key) { - SHMKeySet *subscripter_set; - - SHMTopicSubMap::iterator map_iter; - // SHMKeySet::iterator set_iter; - for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { - subscripter_set = map_iter->second; - subscripter_set->erase(key); - } -} - -/* - * 澶勭悊鍙戝竷锛屼唬鐞嗚浆鍙� -*/ -void ShmModSocket::_proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int key) { - SHMKeySet *subscripter_set; - - SHMTopicSubMap::iterator map_iter; - SHMKeySet::iterator set_iter; - - std::vector<int> subscripter_to_del; - std::vector<int>::iterator vector_iter; - - int send_key; - struct timespec timeout = {1,0}; - - if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { - subscripter_set = map_iter->second; - for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) { - send_key = *set_iter; - // printf("_proxy_pub send before %d \n", send_key); - if (shm_sendto(shm_socket, buf+head_len, size-head_len, send_key, &timeout) == SHM_SOCKET_ECONNFAILED ) { - //瀵规柟宸插叧闂殑杩炴帴鏀惧埌寰呭垹闄ら槦鍒楅噷銆傚鏋滅洿鎺ュ垹闄や細璁﹊ter鎸囬拡鍑虹幇閿欎贡 - subscripter_to_del.push_back(send_key); - } else { -// printf("_proxy_pub send after: %d \n", send_key); - } - - - } - - // 鍒犻櫎宸插叧闂殑绔� - for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); vector_iter++) { - if((set_iter = subscripter_set->find(*vector_iter)) != subscripter_set->end()) { - subscripter_set->erase(set_iter); - logger->debug("remove closed subscripter %d \n", send_key); - } - } - subscripter_to_del.clear(); - - } -} - -void * ShmModSocket::run_pubsub_proxy() { - // pthread_detach(pthread_self()); - int size; - int key; - char * action, *topic, *topics, *buf; - size_t head_len; - - const char *topic_delim = ","; -// printf("run_pubsub_proxy server receive before\n"); - while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) { -//printf("run_pubsub_proxy server recv after: %s \n", buf); - if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) { -// printf("run_pubsub_proxy %s %s \n", action, topics); - if(strcmp(action, "sub") == 0) { - // 璁㈤槄鏀寔澶氫富棰樿闃� - topic = strtok(topics, topic_delim); -//printf("run_pubsub_proxy topic = %s\n", topic); - while(topic) { - _proxy_sub(trim(topic, 0), key); - topic = strtok(NULL, topic_delim); - } - - } else if(strcmp(action, "desub") == 0) { -// printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), "")); - if(strcmp(trim(topics, 0), "") == 0) { - // 鍙栨秷鎵�鏈夎闃� - _proxy_desub_all(key); - } else { - - topic = strtok(topics, topic_delim); - while(topic) { - _proxy_desub(trim(topic, 0), key); - topic = strtok(NULL, topic_delim); - } - } - - - - } else if(strcmp(action, "pub") == 0) { - _proxy_pub(topics, head_len, buf, size, key); - } - - free(action); - free(topics); - } else { - logger->error( "ShmModSocket::run_pubsub_proxy : incorrect format msg"); - } - free(buf); - } - return NULL; -} -/** - * @str "<**sub**>{缁忔祹}" - */ - -int ShmModSocket::parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ) { - char *ptr = str; - char *str_end_ptr = str + size; - char *action_start_ptr; - char *action_end_ptr; - size_t action_len = 0; - - char *topic_start_ptr; - char *topic_end_ptr; - size_t topic_len = 0; - - // if (strlen(identifier) > strlen(str)) { - // return 0; - // } - - if (strncmp(ptr, ACTION_LIDENTIFIER, strlen(ACTION_LIDENTIFIER)) == 0) { - ptr += strlen(ACTION_LIDENTIFIER); - action_start_ptr = ptr; - while(strncmp(++ptr, ACTION_RIDENTIFIER, strlen(ACTION_RIDENTIFIER)) != 0) { - if(ptr >= str_end_ptr) { - return 0; - } - } -// printf("%s\n", ptr); - action_end_ptr = ptr; - action_len = action_end_ptr - action_start_ptr; - ptr += strlen(ACTION_RIDENTIFIER); -// printf("%s\n", ptr); -// printf("%s\n", str_end_ptr-1); - if(strncmp(ptr, TOPIC_LIDENTIFIER, strlen(TOPIC_LIDENTIFIER)) == 0 ) { - topic_start_ptr = ptr+strlen(TOPIC_LIDENTIFIER); - - - while(strncmp(++ptr, TOPIC_RIDENTIFIER, strlen(TOPIC_RIDENTIFIER)) != 0) { - if(ptr >= str_end_ptr) { - return 0; - } - } - topic_end_ptr = ptr; - topic_len = topic_end_ptr - topic_start_ptr; - - ptr += strlen(TOPIC_RIDENTIFIER); - - } else { - return 0; - } - } else { - return 0; - } - - char *topic = (char *)malloc(topic_len+1); - strncpy(topic, topic_start_ptr, topic_len); - *(topic+topic_len) = '\0'; - *_topic = topic; - - char *action = (char *)malloc(action_len+1); - strncpy(action, action_start_ptr, action_len); - *(action+action_len) = '\0'; - *_action = action; - *head_len = ptr-str; - - return 1; -} diff --git a/src/socket/shm_mod_socket.h b/src/socket/shm_mod_socket.h index bca34c7..5c66fbb 100644 --- a/src/socket/shm_mod_socket.h +++ b/src/socket/shm_mod_socket.h @@ -9,53 +9,23 @@ #include "logger_factory.h" #include "key_def.h" #include <set> - -#define ACTION_LIDENTIFIER "<**" -#define ACTION_RIDENTIFIER "**>" -#define TOPIC_LIDENTIFIER "{" -#define TOPIC_RIDENTIFIER "}" - - -//typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > SHMString; -typedef std::set<int, std::less<int>, SHM_STL_Allocator<int> > SHMKeySet; -typedef std::map<SHMString, SHMKeySet *, std::less<SHMString>, SHM_STL_Allocator<std::pair<const SHMString, SHMKeySet *> > > SHMTopicSubMap; - -enum socket_mod_t -{ - PULL_PUSH = 1, - REQ_REP = 2, - PAIR = 3, - PUB_SUB = 4, - SURVEY = 5, - BUS = 6 - -}; +#include "socket_def.h" class ShmModSocket { private: shm_socket_t *shm_socket; socket_mod_t mod; - // pthread_t recv_thread; - // <涓婚锛� 璁㈤槄鑰�> - SHMTopicSubMap *topic_sub_map; + std::set<int> *bus_set; private: inline int _recvfrom_(void **buf, int *size, int *key, struct timespec *timeout, int flags); - void _proxy_sub( char *topic, int key); - void _proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int key); - void *run_pubsub_proxy(); - int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ); + int _sub_( char *topic, int size, int key, struct timespec *timeout, int flags); int _pub_( char *topic, int topic_size, void *content, int content_size, int key, struct timespec *timeout, int flags); - void _proxy_desub( char *topic, int key); - void _proxy_desub_all(int key); int _desub_( char *topic, int size, int key, struct timespec *timeout, int flags); - static void foreach_subscripters(std::function<void(SHMKeySet *, int)> cb); - static bool include_in_keys(int key, int keys[], size_t length); - static size_t remove_subscripters(int keys[], size_t length) ; public: static size_t remove_keys(int keys[], size_t length); public: @@ -109,14 +79,6 @@ // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 int sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, struct timespec *timeout) ; int sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ; - - - /** - * 鍚姩bus - * - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 - */ - int start_bus(); /** * 璁㈤槄鎸囧畾涓婚 diff --git a/src/socket/shm_socket.c b/src/socket/shm_socket.c index 0ac71cb..9581b69 100644 --- a/src/socket/shm_socket.c +++ b/src/socket/shm_socket.c @@ -298,10 +298,10 @@ } SemUtil::inc(socket->mutex); - if (key == socket->key) { - logger->error( "can not send to your self!"); - return -1; - } + // if (key == socket->key) { + // logger->error( "can not send to your self!"); + // return -1; + // } SHMQueue<shm_msg_t> *remoteQueue; if ((remoteQueue = _attach_remote_queue(key)) == NULL) { diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h index 65fb899..cf6737f 100644 --- a/src/socket/shm_socket.h +++ b/src/socket/shm_socket.h @@ -5,29 +5,13 @@ #include "usg_typedef.h" #include "shm_queue.h" - - -enum shm_msg_type_t -{ - SHM_SOCKET_OPEN = 1, - SHM_SOCKET_OPEN_REPLY = 2, - SHM_SOCKET_CLOSE = 3, - SHM_COMMON_MSG = 4 - -}; - enum shm_socket_flag_t { SHM_MSG_TIMEOUT = 1, SHM_MSG_NOWAIT = 2 }; -enum shm_socket_type_t -{ - SHM_SOCKET_STREAM = 1, - SHM_SOCKET_DGRAM = 2 - -}; + enum shm_socket_error_type_t { SHM_SOCKET_ECONNFAILED = 1, @@ -38,6 +22,22 @@ SHM_CONN_CLOSED=1, SHM_CONN_LISTEN=2, SHM_CONN_ESTABLISHED=3 +}; + +enum shm_socket_type_t +{ + SHM_SOCKET_STREAM = 1, + SHM_SOCKET_DGRAM = 2 + +}; + +enum shm_msg_type_t +{ + SHM_SOCKET_OPEN = 1, + SHM_SOCKET_OPEN_REPLY = 2, + SHM_SOCKET_CLOSE = 3, + SHM_COMMON_MSG = 4 + }; typedef struct shm_msg_t { @@ -93,6 +93,9 @@ int shm_recv(shm_socket_t * socket, void **buf, int *size) ; +/** + * @flags : SHM_MSG_NOWAIT + */ int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int key, const struct timespec * timeout = NULL, const int flags=0); int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key, struct timespec * timeout = NULL, int flags=0); diff --git a/src/socket/socket_def.h b/src/socket/socket_def.h new file mode 100644 index 0000000..56cca6d --- /dev/null +++ b/src/socket/socket_def.h @@ -0,0 +1,37 @@ +#ifndef _SOCKET_DEF_H_ +#define _SOCKET_DEF_H_ + + +#define GET(p) (*(uint32_t *)(p)) +#define PUT(p, val) (*(uint32_t *)(p) = (val)) + +#define GET_INT32(p) (*(int32_t *)(p)) +#define PUT_INT32(p, val) (*(int32_t *)(p) = (val)) + + +enum socket_mod_t +{ + PULL_PUSH = 1, + REQ_REP = 2, + PAIR = 3, + PUB_SUB = 4, + SURVEY = 5, + BUS = 6 + +}; + + + + +// typedef struct shm_bus_msg_t { +// void *topic; +// int topic_length; + +// } shm_bus_msg_t; + +#define ACTION_LIDENTIFIER "<**" +#define ACTION_RIDENTIFIER "**>" +#define TOPIC_LIDENTIFIER "{" +#define TOPIC_RIDENTIFIER "}" + +#endif \ No newline at end of file diff --git a/test_net_socket/net_mod_socket.sh b/test_net_socket/net_mod_socket.sh index 68d4ff1..cd4c809 100755 --- a/test_net_socket/net_mod_socket.sh +++ b/test_net_socket/net_mod_socket.sh @@ -1,7 +1,7 @@ function server() { # 寮�鍚痓us - ./test_net_mod_socket --fun="start_bus_server" --key=8 & server_pid=$! && echo "pid: ${server_pid}" + ./test_net_mod_socket --fun="start_bus_server" & server_pid=$! && echo "pid: ${server_pid}" # 寮�鍚綉缁滆浆鍙戜唬鐞� ./test_net_mod_socket --fun="start_net_proxy" --port=5000 & server_pid=$! && echo "pid: ${server_pid}" @@ -49,6 +49,7 @@ case ${1} in "server") close + sleep 2 server ;; "client") @@ -65,12 +66,13 @@ ;; "") close + sleep 2 server client ;; *) - echo "error input" + echo "argument input error" exit 1 ;; esac diff --git a/test_net_socket/test_net_mod_socket.c b/test_net_socket/test_net_mod_socket.c index dbebba5..1b6cf33 100644 --- a/test_net_socket/test_net_mod_socket.c +++ b/test_net_socket/test_net_mod_socket.c @@ -55,9 +55,35 @@ } +void *bus_handler(void *sockt) { + pthread_detach(pthread_self()); + + char action[512]; + while ( true) { + printf("Input action: Close?\n"); + if(scanf("%s",action) < 1) { + printf("Invalide action\n"); + continue; + } + + if(strcmp(action, "close") == 0) { + bus_server_socket_wrapper_close(sockt); + break; + } else { + printf("Invalide action\n"); + } + } + +} + + + void start_bus_server() { printf("Start bus server\n"); void * server_socket = bus_server_socket_wrapper_open(); + pthread_t tid; + // 鍒涘缓涓�涓嚎绋�,鍙互鍏抽棴bus + pthread_create(&tid, NULL, bus_handler, server_socket); if(bus_server_socket_wrapper_start_bus(server_socket) != 0) { printf("start bus failed\n"); exit(1); @@ -177,11 +203,6 @@ } - - - - - void *_run_sendandrecv_(void *arg) { Targ *targ = (Targ *)arg; @@ -349,6 +370,10 @@ // port = atoi(argv[2]); + if(opt.fun == NULL) { + usage(argv[0]); + exit(1); + } if (strcmp("start_net_proxy", opt.fun) == 0 ) { if(opt.port == 0) { @@ -359,10 +384,7 @@ } else if (strcmp("start_bus_server", opt.fun) == 0) { - if(opt.key == 0) { - usage(argv[0]); - exit(1); - } + start_bus_server(); } else if (strcmp("start_reply", opt.fun) == 0) { -- Gitblit v1.8.0