From 1a7a9bdc976e4496d739bc57053e613f993bd85b Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期四, 16 七月 2020 10:03:11 +0800 Subject: [PATCH] update --- queue/libshm_queue.a | 0 queue/include/shm_queue.h | 2 queue/include/socket.h | 61 +++++++ queue/include/shm_queue_wrapper.h | 12 - queue/socket.c | 234 +++++++++++++++++++++++++++++ test2/Makefile | 2 queue/hashtable.c | 8 /dev/null | 0 queue/Makefile | 6 test2/req_rep.c | 59 +++++++ test2/pub_sub.c | 60 +++++++ queue/hashtable.h | 2 queue/shm_queue_wrapper.c | 7 test2/req_rep | 0 14 files changed, 426 insertions(+), 27 deletions(-) diff --git a/queue/Makefile b/queue/Makefile index e26b381..83a16dd 100644 --- a/queue/Makefile +++ b/queue/Makefile @@ -22,6 +22,12 @@ MYLIBS = $(LIBSQUEUE) $(DLIBSQUEUE) +ifeq ($(DEBUG),y) + MYLIBS = $(LIBSQUEUE) +else + MYLIBS = $(LIBSQUEUE) $(DLIBSQUEUE) +endif + all: build diff --git a/queue/hashtable.c b/queue/hashtable.c index 629d2af..43c59ce 100755 --- a/queue/hashtable.c +++ b/queue/hashtable.c @@ -15,8 +15,6 @@ TAILQ_ENTRY(tailq_entry_t) joint; } tailq_entry_t; -static int hashtable_mutex; - #define START_KEY 1000 typedef TAILQ_HEAD(tailq_header_t, tailq_entry_t) tailq_header_t; @@ -56,7 +54,7 @@ } -void* _hashtable_put(hashtable_t *hashtable, int key, void *value) +void * _hashtable_put(hashtable_t *hashtable, int key, void *value) { size_t code = hashcode(key); void *oldvalue; @@ -209,7 +207,7 @@ return res; } -void* hashtable_put(hashtable_t *hashtable, int key, void *value) { +void hashtable_put(hashtable_t *hashtable, int key, void *value) { SemUtil::dec(hashtable->mutex); while (hashtable->readcnt > 0) { @@ -262,4 +260,4 @@ } } return keyset; -} \ No newline at end of file +} diff --git a/queue/hashtable.h b/queue/hashtable.h index 2cdfb4d..726a5bc 100755 --- a/queue/hashtable.h +++ b/queue/hashtable.h @@ -19,7 +19,7 @@ void hashtable_init(hashtable_t *hashtable); void *hashtable_get(hashtable_t *hashtable, int key); -void* hashtable_put(hashtable_t *hashtable, int key, void *value); +void hashtable_put(hashtable_t *hashtable, int key, void *value); void *hashtable_remove(hashtable_t *hashtable, int key); void hashtable_removeall(hashtable_t *hashtable); diff --git a/queue/include/shm_queue.h b/queue/include/shm_queue.h index 3a506c9..a62a6f0 100644 --- a/queue/include/shm_queue.h +++ b/queue/include/shm_queue.h @@ -64,7 +64,7 @@ bool found; for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) { found = false; - for(int i = 0; i < length; i++) { + for(size_t i = 0; i < length; i++) { if(*keyItr == keys[i]) { found = true; break; diff --git a/queue/include/shm_queue_wrapper.h b/queue/include/shm_queue_wrapper.h index 6d9013b..33ed54f 100644 --- a/queue/include/shm_queue_wrapper.h +++ b/queue/include/shm_queue_wrapper.h @@ -9,18 +9,6 @@ #ifdef __cplusplus extern "C" { #endif -/** - * 鍒濆鍖栧叡浜唴瀛� - * @size 鍏变韩鍐呭瓨澶у皬 - * - */ -void shm_init(int size); - -/** - * 閿�姣佸叡浜唴瀛� - * 鏁翠釜杩涚▼閫�鍑烘椂闇�瑕佹墽琛岃繖涓柟娉曪紝璇ユ柟娉曢鍏堜細妫�鏌ユ槸鍚﹁繕鏈夊叾浠栬繘绋嬪湪浣跨敤璇ュ叡浜唴瀛橈紝濡傛灉杩樻湁鍏朵粬杩涚▼鍦ㄤ娇鐢ㄥ氨鍙槸detach,濡傛灉娌℃湁鍏朵粬杩涚▼鍦ㄤ娇鐢ㄥ垯閿�姣佹暣鍧楀唴瀛樸�� - */ -void shm_destroy(); diff --git a/queue/include/socket.h b/queue/include/socket.h new file mode 100644 index 0000000..b19a7b4 --- /dev/null +++ b/queue/include/socket.h @@ -0,0 +1,61 @@ +#ifndef __BASIC_SHM_SOCKET_H__ +#define __BASIC_SHM_SOCKET_H__ + +#include "usg_common.h" +#ifdef __cplusplus +extern "C" { +#endif + +enum shm_mod_t +{ + PULL_PUSH = 1, + REQ_REP = 2, + PAIR = 3, + PUB_SUB = 4, + SURVEY = 5, + BUS = 6 + +}; + + +/** + * 鍒濆鍖栧叡浜唴瀛� + * @size 鍏变韩鍐呭瓨澶у皬, 鍗曚綅M + * + */ +void shm_init(int size); + +/** + * 閿�姣佸叡浜唴瀛� + * 鏁翠釜杩涚▼閫�鍑烘椂闇�瑕佹墽琛岃繖涓柟娉曪紝璇ユ柟娉曢鍏堜細妫�鏌ユ槸鍚﹁繕鏈夊叾浠栬繘绋嬪湪浣跨敤璇ュ叡浜唴瀛橈紝濡傛灉杩樻湁鍏朵粬杩涚▼鍦ㄤ娇鐢ㄥ氨鍙槸detach,濡傛灉娌℃湁鍏朵粬杩涚▼鍦ㄤ娇鐢ㄥ垯閿�姣佹暣鍧楀唴瀛樸�� + */ +void shm_destroy(); + +/** + * 閲婃斁recv鏂规硶鍒嗛厤鐨刡uf + */ +void shm_free(void *buf); + +void *shm_open_socket(int mod); + + +int shm_close_socket(void *socket) ; + + +int shm_bind(void* socket, int port) ; + +int shm_listen(void* socket) ; + +int shm_connect(void* socket, int port); + +int shm_send(void *socket, void *buf, int size) ; + +int shm_recv(void* socket, void **buf, int *size) ; + + + +#ifdef __cplusplus +} +#endif + +#endif \ No newline at end of file diff --git a/queue/libshm_queue.a b/queue/libshm_queue.a index 873f630..94b87c9 100644 --- a/queue/libshm_queue.a +++ b/queue/libshm_queue.a Binary files differ diff --git a/queue/libshm_queue.so b/queue/libshm_queue.so deleted file mode 100755 index b0908e5..0000000 --- a/queue/libshm_queue.so +++ /dev/null Binary files differ diff --git a/queue/shm_queue_wrapper.c b/queue/shm_queue_wrapper.c index a6abd17..f680018 100644 --- a/queue/shm_queue_wrapper.c +++ b/queue/shm_queue_wrapper.c @@ -13,13 +13,6 @@ } shmqueue_t; -void shm_init(int size) { - mem_pool_init(size); -} - -void shm_destroy() { - mem_pool_destroy(); -} //绉婚櫎涓嶅寘鍚湪keys涓殑闃熷垪 void shm_remove_queues_exclue(void *keys, int length) { diff --git a/queue/socket.c b/queue/socket.c new file mode 100644 index 0000000..cc9a08a --- /dev/null +++ b/queue/socket.c @@ -0,0 +1,234 @@ +#include "usg_common.h" +#include "usg_typedef.h" +#include "shm_queue.h" +#include "shm_allocator.h" + +#include "mem_pool.h" +#include "hashtable.h" +#include "sem_util.h" +#include "socket.h" +#include <map> + + +enum shm_msg_type_t +{ + SHM_SOCKET_OPEN = 1, + SHM_SOKET_CLOSE = 2, + SHM_COMMON_MSG = 3 + +}; + +typedef struct shm_msg_t { + int port; + shm_msg_type_t type; + size_t size; + void * buf; + +} shm_msg_t; + +typedef struct shm_socket_t { + int port; + shm_mod_t mod; + SHMQueue<shm_msg_t> *queue; + std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap; + int slots; + int items; + int is_server; + +} shm_socket_t; + + + +void shm_init(int size) { + mem_pool_init(size); +} + +void shm_destroy() { + mem_pool_destroy(); +} + +void shm_free(void *buf) { + free(buf); +} + +void *shm_open_socket(int mod) { + shm_socket_t *socket = (shm_socket_t *)malloc(sizeof(shm_socket_t)); + socket->remoteQueueMap = new std::map<int, SHMQueue<shm_msg_t>* >; + socket->port = -1; + socket->mod = (shm_mod_t)mod; +printf("mod===%d\n", socket->mod); + socket->is_server = 0; + if (mod == REQ_REP) { + socket->slots = SemUtil::get(IPC_PRIVATE, 1); + socket->items = SemUtil::get(IPC_PRIVATE, 0); + } + + return (void *)socket; +} + + +int shm_close_socket(void *socket) { + shm_socket_t * _socket = (shm_socket_t *) socket; + delete _socket->queue; + + std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap; + for(auto iter = remoteQueueMap->begin(); iter != remoteQueueMap->end(); iter++) { + delete iter->second; + } + delete _socket->remoteQueueMap; + + if (_socket->mod == REQ_REP) { + SemUtil::remove(_socket->slots); + SemUtil::remove(_socket->items); + } + + free(socket); + return 0; + +} + + +int shm_bind(void* socket, int port) { + shm_socket_t * _socket = (shm_socket_t *) socket; + _socket -> port = port; + return 0; +} + +int shm_listen(void* socket) { + shm_socket_t * _socket = (shm_socket_t *) socket; + _socket->is_server = 1; + int port; + hashtable_t *hashtable = mm_get_hashtable(); + if(_socket -> port == -1) { + port = hashtable_alloc_key(hashtable); + _socket -> port = port; + } else { + + if(hashtable_get(hashtable, _socket->port)!= NULL) { + err_exit(0, "key %d has already been in used!", _socket->port); + } + } + + _socket->queue = new SHMQueue<shm_msg_t>(_socket->port, 16); + return 0; +} + + +static int __shm_rev(shm_socket_t* _socket, void **buf, int *size) { + shm_msg_t src; + + std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap; + bool rv = _socket->queue->pop(src); + + if (rv) { + if(src.type=="open") + + + + if( _socket->is_server == 1 && remoteQueueMap->find(src.port) == remoteQueueMap->end()) { + if(_socket->mod == REQ_REP) + SemUtil::dec(_socket->slots); + + remoteQueueMap->insert({src.port, new SHMQueue<shm_msg_t>(src.port, 0)}); + + if(_socket->mod == REQ_REP) + SemUtil::inc(_socket->items); + } + + void * _buf = malloc(src.size); + memcpy(_buf, src.buf, src.size); + *buf = _buf; + *size = src.size; + mm_free(src.buf); + return 0; + } else { + return 1; + } +} + +int shm_connect(void* socket, int port) { + shm_socket_t * _socket = (shm_socket_t *) socket; + hashtable_t *hashtable = mm_get_hashtable(); + if(hashtable_get(hashtable, port)== NULL) { + err_exit(0, "shm_connect锛歝onnect at port %d failed!", port); + } + if(_socket -> port == -1) { + _socket -> port = hashtable_alloc_key(hashtable); + } else { + + if(hashtable_get(hashtable, _socket->port)!= NULL) { + err_exit(0, "key %d has already been in used!", _socket->port); + } + } + + _socket->queue = new SHMQueue<shm_msg_t>(_socket->port, 16); + _socket->remoteQueueMap->insert({port, new SHMQueue<shm_msg_t>(port, 0)}); + return 0; +} + +int shm_send(void *socket, void *buf, int size) { + shm_socket_t * _socket = (shm_socket_t *) socket; + hashtable_t *hashtable = mm_get_hashtable(); + shm_msg_t dest; + dest.port = _socket->port; + dest.size = size; + dest.buf = mm_malloc(size); + memcpy(dest.buf, buf, size); + + std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap; + for(auto iter = remoteQueueMap->begin(); iter != remoteQueueMap->end(); iter++) { + if(hashtable_get(hashtable, iter->first)== NULL) { + err_msg(0, "shm_send锛歝onnect at port %d failed, the other part has been closed!", iter->first); + delete iter->second; + remoteQueueMap->erase(iter); + continue; + } + if(_socket->mod == REQ_REP && _socket->is_server == 1) + SemUtil::dec(_socket->items); + + iter->second->push(dest); + + if( _socket->mod == REQ_REP && _socket->is_server == 1) { + delete iter->second; + remoteQueueMap->erase(iter); + SemUtil::inc(_socket->slots); + } + + + } + return 0; +} + +int shm_recv(void* socket, void **buf, int *size) { + shm_socket_t * _socket = (shm_socket_t *) socket; + shm_msg_t src; + + std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap; + bool rv = _socket->queue->pop(src); + if (rv) { + if( _socket->is_server == 1 && remoteQueueMap->find(src.port) == remoteQueueMap->end()) { + if(_socket->mod == REQ_REP) + SemUtil::dec(_socket->slots); + + remoteQueueMap->insert({src.port, new SHMQueue<shm_msg_t>(src.port, 0)}); + + if(_socket->mod == REQ_REP) + SemUtil::inc(_socket->items); + } + + void * _buf = malloc(src.size); + memcpy(_buf, src.buf, src.size); + *buf = _buf; + *size = src.size; + mm_free(src.buf); + return 0; + } else { + return 1; + } +} + + + + + + diff --git a/test2/Makefile b/test2/Makefile index 872f05d..e8c4421 100644 --- a/test2/Makefile +++ b/test2/Makefile @@ -14,7 +14,7 @@ include $(ROOT)/Make.defines.$(PLATFORM) -PROGS = test_queue_wrapper server client pub sub +PROGS = req_rep build: $(PROGS) diff --git a/test2/pub_sub.c b/test2/pub_sub.c new file mode 100644 index 0000000..669783f --- /dev/null +++ b/test2/pub_sub.c @@ -0,0 +1,60 @@ +#include "socket.h" + + +void server(int port) { + void *socket = shm_open_socket(PUB_SUB); + shm_bind(socket, port); + shm_listen(socket); + int size; + void *recvbuf; + char sendbuf[512]; + while(true) { + shm_recv(socket, &recvbuf, &size); + sprintf(sendbuf, "pub: %s", recvbuf); + puts(sendbuf); + shm_send(socket, sendbuf, strlen(sendbuf)+1) ; + shm_free(recvbuf); + + } + shm_close_socket(socket); +} + +void client(int port) { + void *socket = shm_open_socket(PUB_SUB); + shm_connect(socket, port); + int size; + void *recvbuf; + char sendbuf[512]; + + sprintf(sendbuf, "sub"); + shm_send(socket, sendbuf, strlen(sendbuf)+1) ; + while(true) { + + shm_recv(socket, &recvbuf, &size); + printf("received sub message: %s\n", (char *)recvbuf); + shm_free(recvbuf); + + } + shm_close_socket(socket); +} + +int main(int argc, char *argv[]) { + shm_init(512); + int port; + if (argc < 3) { + fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client"); + return 1; + } + + port = atoi(argv[2]); + + if (strcmp("server", argv[1]) == 0 ) { + server(port); + } + + if (strcmp("client", argv[1]) == 0) + client(port); + shm_destroy(); + // fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", "server", "client"); + return 0; +} \ No newline at end of file diff --git a/test2/req_rep b/test2/req_rep new file mode 100755 index 0000000..b2f9345 --- /dev/null +++ b/test2/req_rep Binary files differ diff --git a/test2/req_rep.c b/test2/req_rep.c new file mode 100644 index 0000000..e2b4fd9 --- /dev/null +++ b/test2/req_rep.c @@ -0,0 +1,59 @@ +#include "socket.h" + + +void server(int port) { + void *socket = shm_open_socket(REQ_REP); + shm_bind(socket, port); + shm_listen(socket); + int size; + void *recvbuf; + char sendbuf[512]; + while(true) { + shm_recv(socket, &recvbuf, &size); + sprintf(sendbuf, "SERVER RECEIVED: %s", recvbuf); + puts(sendbuf); + shm_send(socket, sendbuf, strlen(sendbuf)+1) ; + shm_free(recvbuf); + + } + shm_close_socket(socket); +} + +void client(int port) { + void *socket = shm_open_socket(REQ_REP); + shm_connect(socket, port); + int size; + void *recvbuf; + char sendbuf[512]; + while(true) { + printf("request: "); + scanf("%s", sendbuf); + shm_send(socket, sendbuf, strlen(sendbuf)+1) ; + shm_recv(socket, &recvbuf, &size); + printf("reply: %s\n", (char *)recvbuf); + shm_free(recvbuf); + + } + shm_close_socket(socket); +} + +int main(int argc, char *argv[]) { + shm_init(512); + int port; + if (argc < 3) { + fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client"); + return 1; + } + + port = atoi(argv[2]); + + if (strcmp("server", argv[1]) == 0 ) { + server(port); + } + + if (strcmp("client", argv[1]) == 0) + client(port); + shm_destroy(); + // fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", "server", "client"); + return 0; +} \ No newline at end of file -- Gitblit v1.8.0