From 8ec1d776751f62c43335d36c3427dc1ab2d84a61 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期六, 25 七月 2020 13:52:06 +0800 Subject: [PATCH] commit --- src/socket/include/shm_socket.h | 2 src/libshm_queue.a | 0 src/socket/shm_socket.c | 771 +++++++++++++++++++------------------- src/util/include/sem_util.h | 2 src/queue/include/shm_queue.h | 7 test_socket/dgram_socket_test | 0 src/socket/dgram_mod_socket.c | 197 +++++++++ test_socket/dgram_mod_bus | 0 src/util/sem_util.c | 4 test/test | 0 demo/Makefile | 2 test_socket/Makefile | 2 test_socket/dgram_mod_survey | 0 /dev/null | 0 test/protocle_parse.c | 39 + test/Makefile | 2 test_socket/dgram_mod_bus.c | 86 ++++ test/protocle_parse | 0 test_socket/dgram_mod_survey.c | 6 test/test.c | 12 src/queue/include/lock_free_queue.h | 10 src/socket/include/dgram_mod_socket.h | 15 src/logger_factory.h | 4 23 files changed, 741 insertions(+), 420 deletions(-) diff --git a/demo/Makefile b/demo/Makefile index e339a50..eeb4834 100644 --- a/demo/Makefile +++ b/demo/Makefile @@ -14,7 +14,7 @@ include $(ROOT)/Make.defines.$(PLATFORM) -PROGS = dgram_mod_req_rep dgram_mod_survey +PROGS = build: $(PROGS) diff --git a/demo/dgram_mod_req_rep b/demo/dgram_mod_req_rep deleted file mode 100755 index 96eaae7..0000000 --- a/demo/dgram_mod_req_rep +++ /dev/null Binary files differ diff --git a/demo/dgram_mod_survey b/demo/dgram_mod_survey deleted file mode 100755 index 1202691..0000000 --- a/demo/dgram_mod_survey +++ /dev/null Binary files differ diff --git a/src/libshm_queue.a b/src/libshm_queue.a index 4faef4c..60be46c 100644 --- a/src/libshm_queue.a +++ b/src/libshm_queue.a Binary files differ diff --git a/src/logger_factory.h b/src/logger_factory.h index a766d14..34aef50 100644 --- a/src/logger_factory.h +++ b/src/logger_factory.h @@ -6,8 +6,8 @@ public: static Logger getLogger() { -//ERROR ALL DEBUG INFO - static Logger logger(Logger::ERROR); +//ERROR ALL DEBUG INFO WARN + static Logger logger(Logger::WARN); return logger; } }; diff --git a/src/queue/include/lock_free_queue.h b/src/queue/include/lock_free_queue.h index 0b6c42f..6ba2a00 100644 --- a/src/queue/include/lock_free_queue.h +++ b/src/queue/include/lock_free_queue.h @@ -200,13 +200,16 @@ template <typename T, typename AT> class Q_TYPE> bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data) { +// printf("==================LockFreeQueue push before\n"); if (SemUtil::dec(slots) == -1) { err_msg(errno, "LockFreeQueue push"); return false; } if ( m_qImpl.push(a_data) ) { - SemUtil::inc(items); + + SemUtil::inc(items); +// printf("==================LockFreeQueue push after\n"); return true; } return false; @@ -244,6 +247,7 @@ bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, struct timespec * timeout) { + if (SemUtil::dec_timeout(slots, timeout) == -1) { if (errno == EAGAIN) return false; @@ -270,13 +274,15 @@ template <typename T, typename AT> class Q_TYPE> bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data) { +// printf("==================LockFreeQueue pop before\n"); if (SemUtil::dec(items) == -1) { err_msg(errno, "LockFreeQueue pop"); return false; } if (m_qImpl.pop(a_data)) { - SemUtil::inc(slots); + SemUtil::inc(slots); +// printf("==================LockFreeQueue pop after\n"); return true; } return false; diff --git a/src/queue/include/shm_queue.h b/src/queue/include/shm_queue.h index 30e2e28..26bf019 100644 --- a/src/queue/include/shm_queue.h +++ b/src/queue/include/shm_queue.h @@ -104,7 +104,7 @@ delete queue; hashtable_t *hashtable = mm_get_hashtable(); hashtable_remove(hashtable, KEY); - LoggerFactory::getLogger().debug("SHMQueue destructor delete queue"); +printf("SHMQueue destructor delete queue\n"); } else { SemUtil::inc(queue->mutex); } @@ -159,7 +159,10 @@ template < typename ELEM_T > inline bool SHMQueue<ELEM_T>::pop(ELEM_T &a_data) { - return queue->pop(a_data); +// printf("SHMQueue pop before\n"); + int rv = queue->pop(a_data); +// printf("SHMQueue after before\n"); + return rv; } diff --git a/src/socket/dgram_mod_socket.c b/src/socket/dgram_mod_socket.c index 8f7036f..c5e5a28 100644 --- a/src/socket/dgram_mod_socket.c +++ b/src/socket/dgram_mod_socket.c @@ -6,18 +6,28 @@ #include "hashtable.h" #include "sem_util.h" #include "logger_factory.h" +#include <set> + +#define ACTION_LIDENTIFIER "<**" +#define ACTION_RIDENTIFIER "**>" +#define TOPIC_LIDENTIFIER "{" +#define TOPIC_RIDENTIFIER "}" + +static int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ); +void * run_accept_pubsub_request(void * _socket) ; typedef struct dgram_mod_socket_t { socket_mod_t mod; shm_socket_t *shm_socket; // pthread_t recv_thread; - // std::map<int, LockFreeQueue<shm_msg_t, DM_Allocator> * > *recv_queue_map; + // <涓婚锛� 璁㈤槄鑰�> + std::map<std::string, std::set<int> *> *topic_sub_map; } dgram_mod_socket_t; -void *dgram_mod_open_socket(int mod) { +void *dgram_mod_open_socket() { dgram_mod_socket_t * socket = (dgram_mod_socket_t *)calloc(1, sizeof(dgram_mod_socket_t)); - socket->mod = (socket_mod_t)mod; + // socket->mod = (socket_mod_t)mod; socket->shm_socket = shm_open_socket(SHM_SOCKET_DGRAM); return (void *)socket; } @@ -25,6 +35,19 @@ int dgram_mod_close_socket(void * _socket) { dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; + std::map<std::string, std::set<int> *> *topic_sub_map = socket->topic_sub_map; + std::set<int> *subscripter_set; + std::map<std::string, std::set<int> *>::iterator map_iter; + + if(topic_sub_map != NULL) { + for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { + subscripter_set = map_iter->second; + delete subscripter_set; + } + delete topic_sub_map; + } + + shm_close_socket(socket->shm_socket); free(_socket); } @@ -44,7 +67,10 @@ int dgram_mod_recvfrom(void *_socket, void **buf, int *size, int *port) { dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - return shm_recvfrom(socket->shm_socket, buf, size, port); +// printf("dgram_mod_recvfrom before\n"); + int rv = shm_recvfrom(socket->shm_socket, buf, size, port); +// printf("dgram_mod_recvfrom after\n"); + return rv; } @@ -65,4 +91,167 @@ void dgram_mod_free(void *buf) { free(buf); +} + +int start_bus(void * _socket) { + dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; + socket->topic_sub_map = new std::map<std::string, std::set<int> *>; + run_accept_pubsub_request(_socket); + // pthread_t tid; + // pthread_create(&tid, NULL, run_accept_sub_request, _socket); + return 0; + +} + +/** + * @port 鎬荤嚎绔彛 + */ +int sub(void * _socket, void *topic, int size, int port) { + dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; + char buf[8192]; + snprintf(buf, 8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER); + return shm_sendto(socket->shm_socket, buf, strlen(buf) + 1, port); +} + +/** + * @port 鎬荤嚎绔彛 + */ +int pub(void * _socket, void *topic, int topic_size, void *content, int content_size, int port) { + + dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; + int head_len; + char buf[8192+content_size]; + snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER); + head_len = strlen(buf); + memcpy(buf+head_len, content, content_size); + return shm_sendto(socket->shm_socket, buf, head_len+content_size, port); + +} + + +//========================================================================================================================== + + +void * run_accept_pubsub_request(void * _socket) { + // pthread_detach(pthread_self()); + dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; + int size; + int port; + char * action, *topic, *buf; + + size_t head_len; + // void * send_buf; + int send_port; + struct timespec timeout = {1,0}; + std::map<std::string, std::set<int> *> *topic_sub_map = socket->topic_sub_map; + std::set<int> *subscripter_set; + + std::map<std::string, std::set<int> *>::iterator map_iter; + std::set<int>::iterator set_iter; +//printf("server receive before\n"); + while(shm_recvfrom(socket->shm_socket, (void **)&buf, &size, &port) == 0) { +//printf("server recv after: %s \n", buf); + if(parse_pubsub_topic(buf, size, &action, &topic, &head_len)) { + if(strcmp(action, "sub") == 0) { + if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { + subscripter_set = map_iter->second; + } else { + subscripter_set = new std::set<int>; + topic_sub_map->insert({topic, subscripter_set}); + } + subscripter_set->insert(port); + + } else if(strcmp(action, "pub") == 0) { + if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { + subscripter_set = map_iter->second; + for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) { + send_port = *set_iter; + // send_buf = malloc(size-head_len); + // memcpy(send_buf, buf+head_len, ); +printf("run_accept_sub_request send before %d \n", send_port); + if (shm_sendto(socket->shm_socket, buf+head_len, size-head_len, send_port, &timeout) !=0 ) { +printf("erase %d \n", send_port); + subscripter_set->erase(set_iter); + set_iter++; + } +printf("run_accept_sub_request send after: %d \n", send_port); + + } + } + } + free(buf); + free(action); + free(topic); + } else { + err_msg(0, "incorrect format msg"); + } + } + return NULL; +} + + +/** + * @str "<**sub**>{缁忔祹}" + */ + +static int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ) { + char *ptr = str; + char *str_end_ptr = str + size; + char *action_start_ptr; + char *action_end_ptr; + size_t action_len = 0; + + char *topic_start_ptr; + char *topic_end_ptr; + size_t topic_len = 0; + + // if (strlen(identifier) > strlen(str)) { + // return 0; + // } + + if (strncmp(ptr, ACTION_LIDENTIFIER, strlen(ACTION_LIDENTIFIER)) == 0) { + ptr += strlen(ACTION_LIDENTIFIER); + action_start_ptr = ptr; + while(strncmp(++ptr, ACTION_RIDENTIFIER, strlen(ACTION_RIDENTIFIER)) != 0) { + if(ptr >= str_end_ptr) { + return 0; + } + } +// printf("%s\n", ptr); + action_end_ptr = ptr; + action_len = action_end_ptr - action_start_ptr; + ptr += strlen(ACTION_RIDENTIFIER); +// printf("%s\n", ptr); +// printf("%s\n", str_end_ptr-1); + if(strncmp(ptr, TOPIC_LIDENTIFIER, strlen(TOPIC_LIDENTIFIER)) == 0 ) { + topic_start_ptr = ptr+strlen(TOPIC_LIDENTIFIER); + + + while(strncmp(++ptr, TOPIC_RIDENTIFIER, strlen(TOPIC_RIDENTIFIER)) != 0) { + if(ptr >= str_end_ptr) { + return 0; + } + } + topic_end_ptr = ptr; + topic_len = topic_end_ptr - topic_start_ptr; + + ptr += strlen(TOPIC_RIDENTIFIER); + + } else { + return 0; + } + } else { + return 0; + } + + char *topic = (char *)calloc(1, topic_len+1); + strncpy(topic, topic_start_ptr, topic_len); + *_topic = topic; + + char *action = (char *)calloc(1, action_len+1); + strncpy(action, action_start_ptr, action_len); + *_action = action; + *head_len = ptr-str; + + return 1; } \ No newline at end of file diff --git a/src/socket/include/dgram_mod_socket.h b/src/socket/include/dgram_mod_socket.h index c698f68..5e6c424 100644 --- a/src/socket/include/dgram_mod_socket.h +++ b/src/socket/include/dgram_mod_socket.h @@ -18,7 +18,7 @@ }; -void *dgram_mod_open_socket(int mod); +void *dgram_mod_open_socket(); int dgram_mod_close_socket(void * _socket); @@ -34,6 +34,19 @@ void dgram_mod_free(void *buf) ; +int start_bus(void * _socket); + +/** + * @port 鎬荤嚎绔彛 + */ +int sub(void * _socket, void *topic, int size, int port); + +/** + * @port 鎬荤嚎绔彛 + */ +int pub(void * _socket, void *topic, int topic_size, void *content, int content_size, int port); + + #ifdef __cplusplus } #endif diff --git a/src/socket/include/shm_socket.h b/src/socket/include/shm_socket.h index 822450d..7025f9e 100644 --- a/src/socket/include/shm_socket.h +++ b/src/socket/include/shm_socket.h @@ -77,7 +77,7 @@ int shm_recv(shm_socket_t * socket, void **buf, int *size) ; -int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port); +int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port, const struct timespec * timeout = NULL); int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port); diff --git a/src/socket/shm_socket.c b/src/socket/shm_socket.c index 185ecaf..9d54986 100644 --- a/src/socket/shm_socket.c +++ b/src/socket/shm_socket.c @@ -3,490 +3,497 @@ #include "logger_factory.h" #include <map> - - static Logger logger = LoggerFactory::getLogger(); -void print_msg(char *head, shm_msg_t& msg) { - //err_msg(0, "%s: port=%d, type=%d\n", head, msg.port, msg.type); +void print_msg(char *head, shm_msg_t &msg) { + // err_msg(0, "%s: port=%d, type=%d\n", head, msg.port, msg.type); } -void * _server_run_msg_rev(void* _socket); +void *_server_run_msg_rev(void *_socket); -void * _client_run_msg_rev(void* _socket); +void *_client_run_msg_rev(void *_socket); int _shm_close_dgram_socket(shm_socket_t *socket); - int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote); -SHMQueue<shm_msg_t> * _attach_remote_queue(int port) ; +SHMQueue<shm_msg_t> *_attach_remote_queue(int port); shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) { - shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t)); - socket->socket_type = socket_type; - socket->port = -1; - socket->dispatch_thread = 0; - socket->status=SHM_CONN_CLOSED; - - return socket; + shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t)); + socket->socket_type = socket_type; + socket->port = -1; + socket->dispatch_thread = 0; + socket->status = SHM_CONN_CLOSED; + + return socket; } int shm_close_socket(shm_socket_t *socket) { - switch(socket->socket_type) { - case SHM_SOCKET_STREAM: - return _shm_close_stream_socket(socket, true); - case SHM_SOCKET_DGRAM: - return _shm_close_dgram_socket(socket); - default: - return -1; - } - return -1; - + switch (socket->socket_type) { + case SHM_SOCKET_STREAM: + return _shm_close_stream_socket(socket, true); + case SHM_SOCKET_DGRAM: + return _shm_close_dgram_socket(socket); + default: + return -1; + } + return -1; } -int shm_socket_bind(shm_socket_t * socket, int port) { - socket -> port = port; - return 0; +int shm_socket_bind(shm_socket_t *socket, int port) { + socket->port = port; + return 0; } -int shm_listen(shm_socket_t* socket) { +int shm_listen(shm_socket_t *socket) { - if(socket->socket_type != SHM_SOCKET_STREAM) { - err_exit(0, "can not invoke shm_listen method with a socket which is not a SHM_SOCKET_STREAM socket"); - } + if (socket->socket_type != SHM_SOCKET_STREAM) { + err_exit(0, "can not invoke shm_listen method with a socket which is not a " + "SHM_SOCKET_STREAM socket"); + } - int port; - hashtable_t *hashtable = mm_get_hashtable(); - if(socket -> port == -1) { - port = hashtable_alloc_key(hashtable); - socket -> port = port; - } else { - - if(hashtable_get(hashtable, socket->port)!= NULL) { - err_exit(0, "key %d has already been in used!", socket->port); - } - } + int port; + hashtable_t *hashtable = mm_get_hashtable(); + if (socket->port == -1) { + port = hashtable_alloc_key(hashtable); + socket->port = port; + } else { - socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); - socket->acceptQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); - socket->clientSocketMap = new std::map<int, shm_socket_t* >; - socket->status = SHM_CONN_LISTEN; - pthread_create(&(socket->dispatch_thread), NULL, _server_run_msg_rev , (void *)socket); + if (hashtable_get(hashtable, socket->port) != NULL) { + err_exit(0, "key %d has already been in used!", socket->port); + } + } - - return 0; + socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); + socket->acceptQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); + socket->clientSocketMap = new std::map<int, shm_socket_t *>; + socket->status = SHM_CONN_LISTEN; + pthread_create(&(socket->dispatch_thread), NULL, _server_run_msg_rev, + (void *)socket); + + return 0; } - /** * 鎺ュ彈瀹㈡埛绔缓绔嬫柊杩炴帴鐨勮姹� * */ -shm_socket_t* shm_accept(shm_socket_t* socket) { - if(socket->socket_type != SHM_SOCKET_STREAM) { - err_exit(0, "can not invoke shm_accept method with a socket which is not a SHM_SOCKET_STREAM socket"); - } - hashtable_t *hashtable = mm_get_hashtable(); - int client_port; - shm_socket_t *client_socket; - shm_msg_t src; +shm_socket_t *shm_accept(shm_socket_t *socket) { + if (socket->socket_type != SHM_SOCKET_STREAM) { + err_exit(0, "can not invoke shm_accept method with a socket which is not a " + "SHM_SOCKET_STREAM socket"); + } + hashtable_t *hashtable = mm_get_hashtable(); + int client_port; + shm_socket_t *client_socket; + shm_msg_t src; - if (socket->acceptQueue->pop(src) ) { + if (socket->acceptQueue->pop(src)) { -// print_msg("===accept:", src); - client_port = src.port; - // client_socket = (shm_socket_t *)malloc(sizeof(shm_socket_t)); - client_socket = shm_open_socket(socket->socket_type); - client_socket->port = socket->port; - // client_socket->queue= socket->queue; - //鍒濆鍖栨秷鎭痲ueue - client_socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); - //杩炴帴鍒板鏂筿ueue - client_socket->remoteQueue = _attach_remote_queue(client_port); + // print_msg("===accept:", src); + client_port = src.port; + // client_socket = (shm_socket_t *)malloc(sizeof(shm_socket_t)); + client_socket = shm_open_socket(socket->socket_type); + client_socket->port = socket->port; + // client_socket->queue= socket->queue; + //鍒濆鍖栨秷鎭痲ueue + client_socket->messageQueue = + new LockFreeQueue<shm_msg_t, DM_Allocator>(16); + //杩炴帴鍒板鏂筿ueue + client_socket->remoteQueue = _attach_remote_queue(client_port); - socket->clientSocketMap->insert({client_port, client_socket}); + socket->clientSocketMap->insert({client_port, client_socket}); - /* - * shm_accept 鐢ㄦ埛鎵ц鐨勬柟娉� 涓巁server_run_msg_rev鍦ㄤ袱涓笉鍚岀殑闄愬埗宸ヤ綔,accept瑕佷繚璇佸湪瀹㈡埛鐨勫彂閫佹秷鎭箣鍓嶅畬鎴愯祫婧愮殑鍑嗗宸ヤ綔锛屼互閬垮厤鍑虹幇绔炴�侀棶棰� - */ - //鍙戦�乷pen_reply,鍥炲簲瀹㈡埛绔殑connect璇锋眰 - struct timespec timeout = {1, 0}; - shm_msg_t msg; - msg.port = socket->port; - msg.size = 0; - msg.type = SHM_SOCKET_OPEN_REPLY; + /* +* shm_accept 鐢ㄦ埛鎵ц鐨勬柟娉� +* 涓巁server_run_msg_rev鍦ㄤ袱涓笉鍚岀殑闄愬埗宸ヤ綔,accept瑕佷繚璇佸湪瀹㈡埛鐨勫彂閫佹秷鎭箣鍓嶅畬鎴愯祫婧愮殑鍑嗗宸ヤ綔锛屼互閬垮厤鍑虹幇绔炴�侀棶棰� + */ + //鍙戦�乷pen_reply,鍥炲簲瀹㈡埛绔殑connect璇锋眰 + struct timespec timeout = {1, 0}; + shm_msg_t msg; + msg.port = socket->port; + msg.size = 0; + msg.type = SHM_SOCKET_OPEN_REPLY; - if (client_socket->remoteQueue->push_timeout(msg, &timeout) ) - { - client_socket->status = SHM_CONN_ESTABLISHED; - return client_socket; - } else { - err_msg(0, "shm_accept: 鍙戦�乷pen_reply澶辫触"); - return NULL; - } + if (client_socket->remoteQueue->push_timeout(msg, &timeout)) { + client_socket->status = SHM_CONN_ESTABLISHED; + return client_socket; + } else { + err_msg(0, "shm_accept: 鍙戦�乷pen_reply澶辫触"); + return NULL; + } - - } else { - err_exit(errno, "shm_accept"); - } - return NULL; - + } else { + err_exit(errno, "shm_accept"); + } + return NULL; } +int shm_connect(shm_socket_t *socket, int port) { + if (socket->socket_type != SHM_SOCKET_STREAM) { + err_exit(0, "can not invoke shm_connect method with a socket which is not " + "a SHM_SOCKET_STREAM socket"); + } + hashtable_t *hashtable = mm_get_hashtable(); + if (hashtable_get(hashtable, port) == NULL) { + err_exit(0, "shm_connect锛歝onnect at port %d failed!", port); + } -int shm_connect(shm_socket_t* socket, int port) { - if(socket->socket_type != SHM_SOCKET_STREAM) { - err_exit(0, "can not invoke shm_connect method with a socket which is not a SHM_SOCKET_STREAM socket"); - } - hashtable_t *hashtable = mm_get_hashtable(); - if(hashtable_get(hashtable, port)== NULL) { - err_exit(0, "shm_connect锛歝onnect at port %d failed!", port); - } + if (socket->port == -1) { + socket->port = hashtable_alloc_key(hashtable); + } else { - if(socket->port == -1) { - socket->port = hashtable_alloc_key(hashtable); - } else { - - if(hashtable_get(hashtable, socket->port)!= NULL) { - err_exit(0, "key %d has already been in used!", socket->port); - } - } + if (hashtable_get(hashtable, socket->port) != NULL) { + err_exit(0, "key %d has already been in used!", socket->port); + } + } - socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); - socket->remoteQueue = _attach_remote_queue(port); - socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); - + socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); - //鍙戦�乷pen璇锋眰 - struct timespec timeout = {1, 0}; - shm_msg_t msg; - msg.port = socket->port; - msg.size = 0; - msg.type=SHM_SOCKET_OPEN; - socket->remoteQueue->push_timeout(msg, &timeout); + if ((socket->remoteQueue = _attach_remote_queue(port)) == NULL) { + err_exit(0, "connect to %d failted", port); + } + socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); - //鎺ュ彈open reply - if(socket->queue->pop(msg)) { - // 鍦ㄨ繖閲宻erver绔凡缁忓噯澶囧ソ鎺ュ彈瀹㈡埛绔彂閫佽姹備簡,瀹屾垚涓庢湇鍔$鐨勮繛鎺� - if(msg.type == SHM_SOCKET_OPEN_REPLY) { - socket->status = SHM_CONN_ESTABLISHED; - pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket); - } else { - err_exit(0, "shm_connect: 涓嶅尮閰嶇殑搴旂瓟淇℃伅!"); - } - - } else { - err_exit(0, "connect failted!"); - } - - return 0; + //鍙戦�乷pen璇锋眰 + struct timespec timeout = {1, 0}; + shm_msg_t msg; + msg.port = socket->port; + msg.size = 0; + msg.type = SHM_SOCKET_OPEN; + socket->remoteQueue->push_timeout(msg, &timeout); + + //鎺ュ彈open reply + if (socket->queue->pop(msg)) { + // 鍦ㄨ繖閲宻erver绔凡缁忓噯澶囧ソ鎺ュ彈瀹㈡埛绔彂閫佽姹備簡,瀹屾垚涓庢湇鍔$鐨勮繛鎺� + if (msg.type == SHM_SOCKET_OPEN_REPLY) { + socket->status = SHM_CONN_ESTABLISHED; + pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev, + (void *)socket); + } else { + err_exit(0, "shm_connect: 涓嶅尮閰嶇殑搴旂瓟淇℃伅!"); + } + + } else { + err_exit(0, "connect failted!"); + } + + return 0; } - int shm_send(shm_socket_t *socket, const void *buf, const int size) { - if(socket->socket_type != SHM_SOCKET_STREAM) { - err_exit(0, "can not invoke shm_send method with a socket which is not a SHM_SOCKET_STREAM socket"); - } - // hashtable_t *hashtable = mm_get_hashtable(); - // if(socket->remoteQueue == NULL) { - // err_msg(errno, "褰撳墠瀹㈡埛绔棤杩炴帴!"); - // return -1; - // } - shm_msg_t dest; - dest.type=SHM_COMMON_MSG; - dest.port = socket->port; - dest.size = size; - dest.buf = mm_malloc(size); - memcpy(dest.buf, buf, size); + if (socket->socket_type != SHM_SOCKET_STREAM) { + err_exit(0, "can not invoke shm_send method with a socket which is not a " + "SHM_SOCKET_STREAM socket"); + } + // hashtable_t *hashtable = mm_get_hashtable(); + // if(socket->remoteQueue == NULL) { + // err_msg(errno, "褰撳墠瀹㈡埛绔棤杩炴帴!"); + // return -1; + // } + shm_msg_t dest; + dest.type = SHM_COMMON_MSG; + dest.port = socket->port; + dest.size = size; + dest.buf = mm_malloc(size); + memcpy(dest.buf, buf, size); - - if(socket->remoteQueue->push(dest)) { - return 0; - } else { - err_msg(errno, "connection has been closed!"); - return -1; - } + if (socket->remoteQueue->push(dest)) { + return 0; + } else { + err_msg(errno, "connection has been closed!"); + return -1; + } } +int shm_recv(shm_socket_t *socket, void **buf, int *size) { + if (socket->socket_type != SHM_SOCKET_STREAM) { + err_exit(0, "can not invoke shm_recv method in a %d type socket which is " + "not a SHM_SOCKET_STREAM socket ", + socket->socket_type); + } + shm_msg_t src; -int shm_recv(shm_socket_t* socket, void **buf, int *size) { - if(socket->socket_type != SHM_SOCKET_STREAM) { - err_exit(0, "can not invoke shm_recv method in a %d type socket which is not a SHM_SOCKET_STREAM socket ", socket->socket_type); - } - shm_msg_t src; - - if (socket->messageQueue->pop(src)) { - void * _buf = malloc(src.size); - memcpy(_buf, src.buf, src.size); - *buf = _buf; - *size = src.size; - mm_free(src.buf); - return 0; - } else { - return -1; - } - + if (socket->messageQueue->pop(src)) { + void *_buf = malloc(src.size); + memcpy(_buf, src.buf, src.size); + *buf = _buf; + *size = src.size; + mm_free(src.buf); + return 0; + } else { + return -1; + } } - - // 鐭繛鎺ユ柟寮忓彂閫� -int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port) { - if(socket->socket_type != SHM_SOCKET_DGRAM) { - err_exit(0, "Can't invoke shm_sendto method in a %d type socket which is not a SHM_SOCKET_DGRAM socket ", socket->socket_type); - } - hashtable_t *hashtable = mm_get_hashtable(); +int shm_sendto(shm_socket_t *socket, const void *buf, const int size, + const int port, const struct timespec *timeout) { + if (socket->socket_type != SHM_SOCKET_DGRAM) { + err_exit(0, "Can't invoke shm_sendto method in a %d type socket which is " + "not a SHM_SOCKET_DGRAM socket ", + socket->socket_type); + } + hashtable_t *hashtable = mm_get_hashtable(); - if(socket->queue == NULL) { - if(socket->port == -1) { - socket->port = hashtable_alloc_key(hashtable); - } else { - - if(hashtable_get(hashtable, socket->port)!= NULL) { - err_exit(0, "key %d has already been in used!", socket->port); - } - } + if (socket->queue == NULL) { + if (socket->port == -1) { + socket->port = hashtable_alloc_key(hashtable); + } else { - socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); - } - if (port == socket->port) { - err_msg(0, "can not send to your self!"); - return -1; - } + if (hashtable_get(hashtable, socket->port) != NULL) { + err_exit(0, "key %d has already been in used!", socket->port); + } + } - shm_msg_t dest; - dest.type=SHM_COMMON_MSG; - dest.port = socket->port; - dest.size = size; - dest.buf = mm_malloc(size); - memcpy(dest.buf, buf, size); + socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); + } + if (port == socket->port) { + err_msg(0, "can not send to your self!"); + return -1; + } - SHMQueue<shm_msg_t> *remoteQueue = _attach_remote_queue(port); - if(remoteQueue->push(dest)) { - delete remoteQueue; - return 0; - } else { - delete remoteQueue; - err_msg(errno, "sendto port %d failed!", port); - return -1; - } + shm_msg_t dest; + dest.type = SHM_COMMON_MSG; + dest.port = socket->port; + dest.size = size; + dest.buf = mm_malloc(size); + memcpy(dest.buf, buf, size); + + SHMQueue<shm_msg_t> *remoteQueue; + if ((remoteQueue = _attach_remote_queue(port)) == NULL) { + err_msg(0, "shm_sendto failed, then other end has been closed!"); + return -1; + } + // printf("shm_sendto push before\n"); + bool rv; + if(timeout != NULL) { + rv = remoteQueue->push_timeout(dest, timeout); + } else { + rv = remoteQueue->push(dest); + } + + if (rv) { + // printf("shm_sendto push after\n"); + delete remoteQueue; + return 0; + } else { + delete remoteQueue; + err_msg(errno, "sendto port %d failed!", port); + return -1; + } } - // 鐭繛鎺ユ柟寮忔帴鍙� -int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port){ - if(socket->socket_type != SHM_SOCKET_DGRAM) { - err_exit(0, "Can't invoke shm_recvfrom method in a %d type socket which is not a SHM_SOCKET_DGRAM socket ", socket->socket_type); - } - hashtable_t *hashtable = mm_get_hashtable(); - if(socket->queue == NULL) { - if(socket->port == -1) { - socket->port = hashtable_alloc_key(hashtable); - } else { - - if(hashtable_get(hashtable, socket->port)!= NULL) { - err_exit(0, "key %d has already been in used!", socket->port); - } - } +int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port) { + if (socket->socket_type != SHM_SOCKET_DGRAM) { + err_exit(0, "Can't invoke shm_recvfrom method in a %d type socket which " + "is not a SHM_SOCKET_DGRAM socket ", + socket->socket_type); + } + hashtable_t *hashtable = mm_get_hashtable(); + if (socket->queue == NULL) { + if (socket->port == -1) { + socket->port = hashtable_alloc_key(hashtable); + } else { - socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); - } + if (hashtable_get(hashtable, socket->port) != NULL) { + err_exit(0, "key %d has already been in used!", socket->port); + } + } - shm_msg_t src; -// printf("shm_recvfrom pop before"); - if (socket->queue->pop(src)) { - void * _buf = malloc(src.size); - memcpy(_buf, src.buf, src.size); - *buf = _buf; - *size = src.size; - *port = src.port; - mm_free(src.buf); -// printf("shm_recvfrom pop after"); - return 0; - } else { - return -1; - } + socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); + } + + shm_msg_t src; + // printf("shm_recvfrom pop before\n"); + if (socket->queue->pop(src)) { + void *_buf = malloc(src.size); + memcpy(_buf, src.buf, src.size); + *buf = _buf; + *size = src.size; + *port = src.port; + mm_free(src.buf); + // printf("shm_recvfrom pop after\n"); + return 0; + } else { + return -1; + } } -int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size) { - if(socket->socket_type != SHM_SOCKET_DGRAM) { - err_exit(0, "Can't invoke shm_sendandrecv method in a %d type socket which is not a SHM_SOCKET_DGRAM socket ", socket->socket_type); - } - int recv_port; - int rv; +int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, + const int send_size, const int send_port, void **recv_buf, + int *recv_size) { + if (socket->socket_type != SHM_SOCKET_DGRAM) { + err_exit(0, "Can't invoke shm_sendandrecv method in a %d type socket " + "which is not a SHM_SOCKET_DGRAM socket ", + socket->socket_type); + } + int recv_port; + int rv; - shm_socket_t *tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM); - if (shm_sendto(tmp_socket, send_buf, send_size, send_port) == 0) { - rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_port); - shm_close_socket(tmp_socket); - return rv; - } - return -1; + shm_socket_t *tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM); + if (shm_sendto(tmp_socket, send_buf, send_size, send_port) == 0) { + rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_port); + shm_close_socket(tmp_socket); + return rv; + } + return -1; } - /** * 缁戝畾key鍒伴槦鍒楋紝浣嗘槸骞朵笉浼氬垱寤洪槦鍒椼�傚鏋滄病鏈夊搴旀寚瀹歬ey鐨勯槦鍒楁彁绀洪敊璇苟閫�鍑� */ -SHMQueue<shm_msg_t> * _attach_remote_queue(int port) { - hashtable_t *hashtable = mm_get_hashtable(); - if(hashtable_get(hashtable, port)== NULL) { - err_exit(0, "_remote_queue_attach锛歝onnet at port %d failed!", port); - return NULL; - } - - SHMQueue<shm_msg_t> *queue = new SHMQueue<shm_msg_t>(port, 0); - return queue; +SHMQueue<shm_msg_t> *_attach_remote_queue(int port) { + hashtable_t *hashtable = mm_get_hashtable(); + if (hashtable_get(hashtable, port) == NULL) { + err_msg(0, "_remote_queue_attach锛歝onnet at port %d failed!", port); + return NULL; + } + + SHMQueue<shm_msg_t> *queue = new SHMQueue<shm_msg_t>(port, 0); + return queue; } - - - - -void _server_close_conn_to_client(shm_socket_t* socket, int port) { - shm_socket_t *client_socket; - std::map<int, shm_socket_t* >::iterator iter = socket->clientSocketMap->find(port); - if( iter != socket->clientSocketMap->end() ) { - client_socket = iter->second; - free((void *)client_socket); - socket->clientSocketMap->erase(iter); - } - - +void _server_close_conn_to_client(shm_socket_t *socket, int port) { + shm_socket_t *client_socket; + std::map<int, shm_socket_t *>::iterator iter = + socket->clientSocketMap->find(port); + if (iter != socket->clientSocketMap->end()) { + client_socket = iter->second; + free((void *)client_socket); + socket->clientSocketMap->erase(iter); + } } /** * server绔悇绉嶇被鍨嬫秷鎭紙锛夊湪杩欓噷杩涚▼鍒嗘嫞 */ -void * _server_run_msg_rev(void* _socket) { - pthread_detach(pthread_self()); - shm_socket_t* socket = (shm_socket_t*) _socket; - struct timespec timeout = {1, 0}; - shm_msg_t src; - shm_socket_t *client_socket; - std::map<int, shm_socket_t* >::iterator iter; +void *_server_run_msg_rev(void *_socket) { + pthread_detach(pthread_self()); + shm_socket_t *socket = (shm_socket_t *)_socket; + struct timespec timeout = {1, 0}; + shm_msg_t src; + shm_socket_t *client_socket; + std::map<int, shm_socket_t *>::iterator iter; - while(socket->queue->pop(src)) { + while (socket->queue->pop(src)) { - switch (src.type) { - case SHM_SOCKET_OPEN : - socket->acceptQueue->push_timeout(src, &timeout); - break; - case SHM_SOCKET_CLOSE : - _server_close_conn_to_client(socket, src.port); - break; - case SHM_COMMON_MSG : + switch (src.type) { + case SHM_SOCKET_OPEN: + socket->acceptQueue->push_timeout(src, &timeout); + break; + case SHM_SOCKET_CLOSE: + _server_close_conn_to_client(socket, src.port); + break; + case SHM_COMMON_MSG: - iter = socket->clientSocketMap->find(src.port); - if( iter != socket->clientSocketMap->end()) { - client_socket= iter->second; - // print_msg("_server_run_msg_rev push before", src); - client_socket->messageQueue->push_timeout(src, &timeout); - // print_msg("_server_run_msg_rev push after", src); - } - - break; + iter = socket->clientSocketMap->find(src.port); + if (iter != socket->clientSocketMap->end()) { + client_socket = iter->second; + // print_msg("_server_run_msg_rev push before", src); + client_socket->messageQueue->push_timeout(src, &timeout); + // print_msg("_server_run_msg_rev push after", src); + } - default: - err_msg(0, "socket.__shm_rev__: undefined message type."); - } + break; + + default: + err_msg(0, "socket.__shm_rev__: undefined message type."); } - - return NULL; + } + + return NULL; } +void _client_close_conn_to_server(shm_socket_t *socket) { - -void _client_close_conn_to_server(shm_socket_t* socket) { - - _shm_close_stream_socket(socket, false); + _shm_close_stream_socket(socket, false); } - /** * client绔殑鍚勭绫诲瀷娑堟伅锛堬級鍦ㄨ繖閲岃繘绋嬪垎鎷� */ -void * _client_run_msg_rev(void* _socket) { - pthread_detach(pthread_self()); - shm_socket_t* socket = (shm_socket_t*) _socket; - struct timespec timeout = {1, 0}; - shm_msg_t src; - - while(socket->queue->pop(src)) { - switch (src.type) { - - case SHM_SOCKET_CLOSE : - _client_close_conn_to_server(socket); - break; - case SHM_COMMON_MSG : - socket->messageQueue->push_timeout(src, &timeout); - break; - default: - err_msg(0, "socket.__shm_rev__: undefined message type."); - } +void *_client_run_msg_rev(void *_socket) { + pthread_detach(pthread_self()); + shm_socket_t *socket = (shm_socket_t *)_socket; + struct timespec timeout = {1, 0}; + shm_msg_t src; + + while (socket->queue->pop(src)) { + switch (src.type) { + + case SHM_SOCKET_CLOSE: + _client_close_conn_to_server(socket); + break; + case SHM_COMMON_MSG: + socket->messageQueue->push_timeout(src, &timeout); + break; + default: + err_msg(0, "socket.__shm_rev__: undefined message type."); } - - return NULL; + } + + return NULL; } - int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote) { - socket->status = SHM_CONN_CLOSED; - //缁欏鏂瑰彂閫佷竴涓叧闂繛鎺ョ殑娑堟伅 - struct timespec timeout = {1, 0}; - shm_msg_t close_msg; + socket->status = SHM_CONN_CLOSED; + //缁欏鏂瑰彂閫佷竴涓叧闂繛鎺ョ殑娑堟伅 + struct timespec timeout = {1, 0}; + shm_msg_t close_msg; - close_msg.port = socket->port; - close_msg.size = 0; - close_msg.type=SHM_SOCKET_CLOSE; - if(notifyRemote && socket->remoteQueue != NULL) { - socket->remoteQueue->push_timeout(close_msg, &timeout); - } - - if(socket->queue != NULL) { - delete socket->queue; - socket->queue = NULL; - } + close_msg.port = socket->port; + close_msg.size = 0; + close_msg.type = SHM_SOCKET_CLOSE; + if (notifyRemote && socket->remoteQueue != NULL) { + socket->remoteQueue->push_timeout(close_msg, &timeout); + } - if(socket->remoteQueue != NULL) { - delete socket->remoteQueue; - socket->remoteQueue = NULL; - } + if (socket->queue != NULL) { + delete socket->queue; + socket->queue = NULL; + } - if(socket->messageQueue != NULL) { - delete socket->messageQueue; - socket->messageQueue = NULL; - } + if (socket->remoteQueue != NULL) { + delete socket->remoteQueue; + socket->remoteQueue = NULL; + } - if(socket->acceptQueue != NULL) { - delete socket->acceptQueue; - socket->acceptQueue = NULL; - } + if (socket->messageQueue != NULL) { + delete socket->messageQueue; + socket->messageQueue = NULL; + } - if(socket->clientSocketMap != NULL) { - shm_socket_t *client_socket; - for(auto iter = socket->clientSocketMap->begin(); iter != socket->clientSocketMap->end(); iter++) { - client_socket= iter->second; + if (socket->acceptQueue != NULL) { + delete socket->acceptQueue; + socket->acceptQueue = NULL; + } - client_socket->remoteQueue->push_timeout(close_msg, &timeout); - delete client_socket->remoteQueue; - client_socket->remoteQueue=NULL; + if (socket->clientSocketMap != NULL) { + shm_socket_t *client_socket; + for (auto iter = socket->clientSocketMap->begin(); + iter != socket->clientSocketMap->end(); iter++) { + client_socket = iter->second; - delete client_socket->messageQueue; - client_socket->messageQueue=NULL; - socket->clientSocketMap->erase(iter); - free((void *)client_socket); - } - delete socket->clientSocketMap; - } + client_socket->remoteQueue->push_timeout(close_msg, &timeout); + delete client_socket->remoteQueue; + client_socket->remoteQueue = NULL; + delete client_socket->messageQueue; + client_socket->messageQueue = NULL; + socket->clientSocketMap->erase(iter); + free((void *)client_socket); + } + delete socket->clientSocketMap; + } - if(socket->dispatch_thread != 0) - pthread_cancel(socket->dispatch_thread); - - free(socket); - return 0; + if (socket->dispatch_thread != 0) + pthread_cancel(socket->dispatch_thread); + + free(socket); + return 0; } diff --git a/src/util/include/sem_util.h b/src/util/include/sem_util.h index 0d673de..5d2cf77 100644 --- a/src/util/include/sem_util.h +++ b/src/util/include/sem_util.h @@ -9,7 +9,7 @@ int get(key_t key, unsigned int value); int dec(int semId); int dec_nowait(int semId); - int dec_timeout(int semId, struct timespec * timeout); + int dec_timeout(const int semId, const struct timespec * timeout); int inc(int semId); void remove(int semid); diff --git a/src/util/sem_util.c b/src/util/sem_util.c index 4f294c0..1dcf00d 100644 --- a/src/util/sem_util.c +++ b/src/util/sem_util.c @@ -72,6 +72,7 @@ /* Reserve semaphore - decrement it by 1 */ int SemUtil::dec(int semId) { +logger.debug("%d: SemUtil::dec\n", semId); struct sembuf sops; sops.sem_num = 0; @@ -103,7 +104,7 @@ return 0; } -int SemUtil::dec_timeout(int semId, struct timespec *timeout) { +int SemUtil::dec_timeout(const int semId, const struct timespec *timeout) { struct sembuf sops; sops.sem_num = 0; @@ -121,6 +122,7 @@ /* Release semaphore - increment it by 1 */ int SemUtil::inc(int semId) { +logger.debug("%d: SemUtil::inc\n", semId); struct sembuf sops; sops.sem_num = 0; diff --git a/test/Makefile b/test/Makefile index d6fa6d8..4b888f5 100755 --- a/test/Makefile +++ b/test/Makefile @@ -14,7 +14,7 @@ include $(ROOT)/Make.defines.$(PLATFORM) -PROGS = protocle_parse +PROGS = protocle_parse test build: $(PROGS) diff --git a/test/protocle_parse b/test/protocle_parse index 6b06605..e18682e 100755 --- a/test/protocle_parse +++ b/test/protocle_parse Binary files differ diff --git a/test/protocle_parse.c b/test/protocle_parse.c index 9acef45..0280b7c 100644 --- a/test/protocle_parse.c +++ b/test/protocle_parse.c @@ -3,10 +3,10 @@ #define ACTION_LIDENTIFIER "<**" #define ACTION_RIDENTIFIER "**>" -#define TOPIC_LIDENTIFIER '{' -#define TOPIC_RIDENTIFIER '}' +#define TOPIC_LIDENTIFIER "{" +#define TOPIC_RIDENTIFIER "}" -int parse_pubsub_topic(char *str, char **_action, size_t *_action_len, char **_topic, size_t *_topic_len) { +int parse_pubsub_topic(char *str, char **_action, char **_topic, size_t *head_len ) { char *ptr = str; char *str_end_ptr = str + strlen(str); char *action_start_ptr; @@ -35,14 +35,20 @@ ptr += strlen(ACTION_RIDENTIFIER); // printf("%s\n", ptr); // printf("%s\n", str_end_ptr-1); - if( (*ptr == TOPIC_LIDENTIFIER) && (*(str_end_ptr-1) == TOPIC_RIDENTIFIER) ) { - topic_start_ptr = ptr; - topic_end_ptr = str_end_ptr; - topic_len = topic_end_ptr - topic_start_ptr + 1; - ptr++; - // while(*(++ptr) != '}') { - // length++; - // } + if(strncmp(ptr, TOPIC_LIDENTIFIER, strlen(TOPIC_LIDENTIFIER)) == 0 ) { + topic_start_ptr = ptr+strlen(TOPIC_LIDENTIFIER); + + + while(strncmp(++ptr, TOPIC_RIDENTIFIER, strlen(TOPIC_RIDENTIFIER)) != 0) { + if(ptr >= str_end_ptr) { + return 0; + } + } + topic_end_ptr = ptr; + topic_len = topic_end_ptr - topic_start_ptr; + + ptr += strlen(TOPIC_RIDENTIFIER); + } else { return 0; } @@ -53,13 +59,11 @@ char *topic = (char *)calloc(1, topic_len+1); strncpy(topic, topic_start_ptr, topic_len); *_topic = topic; - *_topic_len = topic_len; char *action = (char *)calloc(1, action_len+1); strncpy(action, action_start_ptr, action_len); *_action = action; - *_action_len = action_len; - + *head_len = ptr-str; return 1; } @@ -68,12 +72,13 @@ char *action; size_t action_len; char *topic; - size_t topic_len; + size_t head_len; - char *str = "<**sub**>{缁忔祹}"; - if(parse_pubsub_topic(str, &action, &action_len, &topic, &topic_len)) { + char *str = "<**pub**>{缁忔祹}abcdef"; + if(parse_pubsub_topic(str, &action, &topic, &head_len)) { printf("action:%s\n", action); printf("topic:%s\n", topic); + printf("content:%s\n", str+head_len); free(action); free(topic); } else { diff --git a/test/test b/test/test new file mode 100755 index 0000000..1c5c801 --- /dev/null +++ b/test/test Binary files differ diff --git a/test/test.c b/test/test.c new file mode 100644 index 0000000..744ac72 --- /dev/null +++ b/test/test.c @@ -0,0 +1,12 @@ +#include "usg_common.h" +#include "usg_typedef.h" + +void test(char *src, int size) { + char dest[size]; + strncpy(dest, src, size); + puts(dest); +} +int main() { + char *str = "hello"; + test(str, strlen(str)); +} \ No newline at end of file diff --git a/test_socket/Makefile b/test_socket/Makefile index f3d0ac8..ae1485e 100644 --- a/test_socket/Makefile +++ b/test_socket/Makefile @@ -14,7 +14,7 @@ include $(ROOT)/Make.defines.$(PLATFORM) -PROGS = dgram_socket_test dgram_mod_req_rep dgram_mod_survey stream_mod_req_rep +PROGS = dgram_mod_bus dgram_mod_survey build: $(PROGS) diff --git a/test_socket/dgram_mod_bus b/test_socket/dgram_mod_bus new file mode 100755 index 0000000..8e5a8c8 --- /dev/null +++ b/test_socket/dgram_mod_bus Binary files differ diff --git a/test_socket/dgram_mod_bus.c b/test_socket/dgram_mod_bus.c new file mode 100644 index 0000000..0c58bb4 --- /dev/null +++ b/test_socket/dgram_mod_bus.c @@ -0,0 +1,86 @@ +#include "dgram_mod_socket.h" +#include "shm_mm.h" +#include "usg_common.h" + +void server(int port) { + void *socket = dgram_mod_open_socket(); + dgram_mod_bind(socket, port); + + start_bus(socket); + +} + + +void *run_recv(void *socket) { + pthread_detach(pthread_self()); + void *recvbuf; + int size; + int port; + while (dgram_mod_recvfrom( socket, &recvbuf, &size, &port) == 0) { + printf("鏀跺埌璁㈤槄娑堟伅:%s\n", recvbuf); + free(recvbuf); + } + +} + +void client(int port) { + void *socket = dgram_mod_open_socket(); + pthread_t tid; + pthread_create(&tid, NULL, run_recv, socket); + int size; + + char action[512]; + char topic[512]; + char content[512]; + long i = 0; + while (true) { + //printf("Usage: pub <topic> [content] or sub <topic>\n"); + printf("Can I help you? sub, pub or quit\n"); + scanf("%s",action); + + if(strcmp(action, "sub") == 0) { + printf("Please input topic!\n"); + scanf("%s", topic); + sub(socket, topic, strlen(topic), port); + printf("Sub success!\n"); + } + else if(strcmp(action, "pub") == 0) { + // printf("%s %s %s\n", action, topic, content); + printf("Please input topic and content\n"); + scanf("%s %s", topic, content); + pub(socket, topic, strlen(topic)+1, content, strlen(content)+1, port); + printf("Pub success!\n"); + } else if(strcmp(action, "quit") == 0) { + break; + } else { + printf("error input\n"); + continue; + } + + } + printf("(%d) quit\n", dgram_mod_get_socket_port(socket)); + dgram_mod_close_socket(socket); +} + + + +int main(int argc, char *argv[]) { + shm_init(512); + int port; + if (argc < 3) { + fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client"); + return 1; + } + + port = atoi(argv[2]); + + if (strcmp("server", argv[1]) == 0) { + server(port); + } + + if (strcmp("client", argv[1]) == 0) + client(port); + + + return 0; +} \ No newline at end of file diff --git a/test_socket/dgram_mod_req_rep b/test_socket/dgram_mod_req_rep deleted file mode 100755 index c9694d9..0000000 --- a/test_socket/dgram_mod_req_rep +++ /dev/null Binary files differ diff --git a/test_socket/dgram_mod_survey b/test_socket/dgram_mod_survey index 3cb69a6..6e179bb 100755 --- a/test_socket/dgram_mod_survey +++ b/test_socket/dgram_mod_survey Binary files differ diff --git a/test_socket/dgram_mod_survey.c b/test_socket/dgram_mod_survey.c index f19b6a1..24a72a5 100644 --- a/test_socket/dgram_mod_survey.c +++ b/test_socket/dgram_mod_survey.c @@ -3,7 +3,7 @@ #include "usg_common.h" void server(int port) { - void *socket = dgram_mod_open_socket(SURVEY); + void *socket = dgram_mod_open_socket(); dgram_mod_bind(socket, port); int size; void *recvbuf; @@ -18,16 +18,14 @@ } void client(int port) { - void *socket = dgram_mod_open_socket(SURVEY); + void *socket = dgram_mod_open_socket(); int size; - void *recvbuf; char sendbuf[512]; long i = 0; while (true) { sprintf(sendbuf, "%d", i); printf("SEND HEART:%s\n", sendbuf); dgram_mod_sendto(socket, sendbuf, strlen(sendbuf) + 1, port); - free(recvbuf); sleep(1); i++; } diff --git a/test_socket/dgram_socket_test b/test_socket/dgram_socket_test index 98b6997..ca33409 100755 --- a/test_socket/dgram_socket_test +++ b/test_socket/dgram_socket_test Binary files differ diff --git a/test_socket/stream_mod_req_rep b/test_socket/stream_mod_req_rep deleted file mode 100755 index 8c5e1ff..0000000 --- a/test_socket/stream_mod_req_rep +++ /dev/null Binary files differ -- Gitblit v1.8.0