From c1f1446058dbedd9be9b9561e6ba435e0cd15bbc Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期五, 17 七月 2020 11:01:01 +0800 Subject: [PATCH] update --- queue/socket.c | 447 ++++++++++++++++++++++++++++++++++++++----------------- 1 files changed, 304 insertions(+), 143 deletions(-) diff --git a/queue/socket.c b/queue/socket.c index cc9a08a..29b94d4 100644 --- a/queue/socket.c +++ b/queue/socket.c @@ -1,43 +1,19 @@ -#include "usg_common.h" -#include "usg_typedef.h" -#include "shm_queue.h" -#include "shm_allocator.h" -#include "mem_pool.h" -#include "hashtable.h" -#include "sem_util.h" #include "socket.h" #include <map> -enum shm_msg_type_t -{ - SHM_SOCKET_OPEN = 1, - SHM_SOKET_CLOSE = 2, - SHM_COMMON_MSG = 3 - -}; - -typedef struct shm_msg_t { - int port; - shm_msg_type_t type; - size_t size; - void * buf; - -} shm_msg_t; - -typedef struct shm_socket_t { - int port; - shm_mod_t mod; - SHMQueue<shm_msg_t> *queue; - std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap; - int slots; - int items; - int is_server; - -} shm_socket_t; +void print_msg(char *head, shm_msg_t& msg) { + //err_msg(0, "%s: port=%d, type=%d\n", head, msg.port, msg.type); +} + +void * _server_run_msg_rev(void* _socket); + +void * _client_run_msg_rev(void* _socket); + +SHMQueue<shm_msg_t> * _attach_remote_queue(int port) ; void shm_init(int size) { mem_pool_init(size); @@ -51,171 +27,337 @@ free(buf); } -void *shm_open_socket(int mod) { - shm_socket_t *socket = (shm_socket_t *)malloc(sizeof(shm_socket_t)); - socket->remoteQueueMap = new std::map<int, SHMQueue<shm_msg_t>* >; - socket->port = -1; - socket->mod = (shm_mod_t)mod; -printf("mod===%d\n", socket->mod); - socket->is_server = 0; - if (mod == REQ_REP) { - socket->slots = SemUtil::get(IPC_PRIVATE, 1); - socket->items = SemUtil::get(IPC_PRIVATE, 0); - } +shm_socket_t *shm_open_socket() { + shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t)); - return (void *)socket; + socket->port = -1; + socket->dispatch_thread = 0; + + return socket; } -int shm_close_socket(void *socket) { - shm_socket_t * _socket = (shm_socket_t *) socket; - delete _socket->queue; +int _shm_close_socket(shm_socket_t *socket, bool notifyRemote) { + //缁欏鏂瑰彂閫佷竴涓叧闂繛鎺ョ殑娑堟伅 + struct timespec timeout = {1, 0}; + shm_msg_t close_msg; - std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap; - for(auto iter = remoteQueueMap->begin(); iter != remoteQueueMap->end(); iter++) { - delete iter->second; - } - delete _socket->remoteQueueMap; - - if (_socket->mod == REQ_REP) { - SemUtil::remove(_socket->slots); - SemUtil::remove(_socket->items); + close_msg.port = socket->port; + close_msg.size = 0; + close_msg.type=SHM_SOCKET_CLOSE; + if(notifyRemote && socket->remoteQueue != NULL) { + socket->remoteQueue->push_timeout(close_msg, &timeout); } + if(socket->queue != NULL) { + delete socket->queue; + socket->queue = NULL; + } + + if(socket->remoteQueue != NULL) { + delete socket->remoteQueue; + socket->remoteQueue = NULL; + } + + if(socket->messageQueue != NULL) { + delete socket->messageQueue; + socket->messageQueue = NULL; + } + + if(socket->acceptQueue != NULL) { + delete socket->acceptQueue; + socket->acceptQueue = NULL; + } + + if(socket->clientSocketMap != NULL) { + shm_socket_t *client_socket; + for(auto iter = socket->clientSocketMap->begin(); iter != socket->clientSocketMap->end(); iter++) { + client_socket= iter->second; + + client_socket->remoteQueue->push_timeout(close_msg, &timeout); + delete client_socket->remoteQueue; + client_socket->remoteQueue=NULL; + delete client_socket->messageQueue; + client_socket->messageQueue=NULL; + socket->clientSocketMap->erase(iter); + free((void *)client_socket); + } + delete socket->clientSocketMap; + } + + + if(socket->dispatch_thread != 0) + pthread_cancel(socket->dispatch_thread); + free(socket); return 0; } +int shm_close_socket(shm_socket_t *socket) { + return _shm_close_socket(socket, true); +} -int shm_bind(void* socket, int port) { + + +int shm_bind(shm_socket_t * socket, int port) { shm_socket_t * _socket = (shm_socket_t *) socket; _socket -> port = port; return 0; } -int shm_listen(void* socket) { - shm_socket_t * _socket = (shm_socket_t *) socket; - _socket->is_server = 1; +int shm_listen(shm_socket_t* socket) { int port; hashtable_t *hashtable = mm_get_hashtable(); - if(_socket -> port == -1) { + if(socket -> port == -1) { port = hashtable_alloc_key(hashtable); - _socket -> port = port; + socket -> port = port; } else { - if(hashtable_get(hashtable, _socket->port)!= NULL) { - err_exit(0, "key %d has already been in used!", _socket->port); + if(hashtable_get(hashtable, socket->port)!= NULL) { + err_exit(0, "key %d has already been in used!", socket->port); } } - _socket->queue = new SHMQueue<shm_msg_t>(_socket->port, 16); + socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); + socket->acceptQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); + socket->clientSocketMap = new std::map<int, shm_socket_t* >; + + pthread_create(&(socket->dispatch_thread), NULL, _server_run_msg_rev , (void *)socket); + return 0; } - -static int __shm_rev(shm_socket_t* _socket, void **buf, int *size) { - shm_msg_t src; - - std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap; - bool rv = _socket->queue->pop(src); - - if (rv) { - if(src.type=="open") - - - - if( _socket->is_server == 1 && remoteQueueMap->find(src.port) == remoteQueueMap->end()) { - if(_socket->mod == REQ_REP) - SemUtil::dec(_socket->slots); - - remoteQueueMap->insert({src.port, new SHMQueue<shm_msg_t>(src.port, 0)}); - - if(_socket->mod == REQ_REP) - SemUtil::inc(_socket->items); - } +void _server_close_conn_to_client(shm_socket_t* socket, int port) { + shm_socket_t *client_socket; + auto iter = socket->clientSocketMap->find(port); + if( iter != socket->clientSocketMap->end() ) { + // client_socket= iter->second; + // if(client_socket->remoteQueue != NULL) { + // delete client_socket->remoteQueue; + // client_socket->remoteQueue = NULL; + // } + // if(client_socket->messageQueue != NULL) { + // delete client_socket->messageQueue; + // client_socket->messageQueue = NULL; + // } - void * _buf = malloc(src.size); - memcpy(_buf, src.buf, src.size); - *buf = _buf; - *size = src.size; - mm_free(src.buf); - return 0; - } else { - return 1; + socket->clientSocketMap->erase(iter); } + //free((void *)client_socket); + } -int shm_connect(void* socket, int port) { - shm_socket_t * _socket = (shm_socket_t *) socket; +/** + * server绔悇绉嶇被鍨嬫秷鎭紙锛夊湪杩欓噷杩涚▼鍒嗘嫞 + */ +void * _server_run_msg_rev(void* _socket) { + pthread_detach(pthread_self()); + shm_socket_t* socket = (shm_socket_t*) _socket; + struct timespec timeout = {1, 0}; + shm_msg_t src; + shm_socket_t *client_socket; + std::map<int, shm_socket_t* >::iterator iter; + + while(socket->queue->pop(src)) { + + switch (src.type) { + case SHM_SOCKET_OPEN : + socket->acceptQueue->push_timeout(src, &timeout); + break; + case SHM_SOCKET_CLOSE : + _server_close_conn_to_client(socket, src.port); + break; + case SHM_COMMON_MSG : + + iter = socket->clientSocketMap->find(src.port); + print_msg("_server_run_msg_rev find before", src); + if( iter != socket->clientSocketMap->end()) { + client_socket= iter->second; + print_msg("_server_run_msg_rev push before", src); + client_socket->messageQueue->push_timeout(src, &timeout); + print_msg("_server_run_msg_rev push after", src); + } + + break; + + default: + err_msg(0, "socket.__shm_rev__: undefined message type."); + } + } + + return NULL; +} + + + + +/** + * 鎺ュ彈瀹㈡埛绔缓绔嬫柊杩炴帴鐨勮姹� + * +*/ + +shm_socket_t* shm_accept(shm_socket_t* socket) { + hashtable_t *hashtable = mm_get_hashtable(); + int client_port; + shm_socket_t *client_socket; + shm_msg_t src; + + if (socket->acceptQueue->pop(src) ) { + +// print_msg("===accept:", src); + client_port = src.port; + client_socket = (shm_socket_t *)malloc(sizeof(shm_socket_t)); + client_socket->port = socket->port; + // client_socket->queue= socket->queue; + //鍒濆鍖栨秷鎭痲ueue + client_socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); + //杩炴帴鍒板鏂筿ueue + client_socket->remoteQueue = _attach_remote_queue(client_port); + + socket->clientSocketMap->insert({client_port, client_socket}); + + /* + * shm_accept 鐢ㄦ埛鎵ц鐨勬柟娉� 涓巁server_run_msg_rev鍦ㄤ袱涓笉鍚岀殑闄愬埗宸ヤ綔,accept瑕佷繚璇佸湪瀹㈡埛鐨勫彂閫佹秷鎭箣鍓嶅畬鎴愯祫婧愮殑鍑嗗宸ヤ綔锛屼互閬垮厤鍑虹幇绔炴�侀棶棰� + */ + //鍙戦�乷pen_reply + struct timespec timeout = {1, 0}; + shm_msg_t msg; + msg.port = socket->port; + msg.size = 0; + msg.type = SHM_SOCKET_OPEN_REPLY; + + if (client_socket->remoteQueue->push_timeout(msg, &timeout) ) + { + return client_socket; + } else { + err_msg(0, "shm_accept: 鍙戦�乷pen_reply澶辫触"); + return NULL; + } + + + } else { + err_exit(errno, "shm_accept"); + } + return NULL; + +} + + +int shm_connect(shm_socket_t* socket, int port) { hashtable_t *hashtable = mm_get_hashtable(); if(hashtable_get(hashtable, port)== NULL) { err_exit(0, "shm_connect锛歝onnect at port %d failed!", port); } - if(_socket -> port == -1) { - _socket -> port = hashtable_alloc_key(hashtable); + if(socket->port == -1) { + socket->port = hashtable_alloc_key(hashtable); } else { - if(hashtable_get(hashtable, _socket->port)!= NULL) { - err_exit(0, "key %d has already been in used!", _socket->port); + if(hashtable_get(hashtable, socket->port)!= NULL) { + err_exit(0, "key %d has already been in used!", socket->port); } } - _socket->queue = new SHMQueue<shm_msg_t>(_socket->port, 16); - _socket->remoteQueueMap->insert({port, new SHMQueue<shm_msg_t>(port, 0)}); + socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); + socket->remoteQueue = _attach_remote_queue(port); + socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); + + + //鍙戦�乷pen璇锋眰 + struct timespec timeout = {1, 0}; + shm_msg_t msg; + msg.port = socket->port; + msg.size = 0; + msg.type=SHM_SOCKET_OPEN; + socket->remoteQueue->push_timeout(msg, &timeout); + + //鎺ュ彈open reply + if(socket->queue->pop(msg)) { + // 鍦ㄨ繖閲宻erver绔凡缁忓噯澶囧ソ鎺ュ彈瀹㈡埛绔彂閫佽姹備簡 + if(msg.type == SHM_SOCKET_OPEN_REPLY) { + + pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket); + } else { + err_exit(0, "shm_connect: 涓嶅尮閰嶇殑搴旂瓟淇℃伅!"); + } + + } else { + err_exit(0, "connect failted!"); + } + return 0; } -int shm_send(void *socket, void *buf, int size) { - shm_socket_t * _socket = (shm_socket_t *) socket; - hashtable_t *hashtable = mm_get_hashtable(); +void _client_close_conn_to_server(shm_socket_t* socket) { + + _shm_close_socket(socket, false); +} + + +/** + * client绔殑鍚勭绫诲瀷娑堟伅锛堬級鍦ㄨ繖閲岃繘绋嬪垎鎷� + */ +void * _client_run_msg_rev(void* _socket) { + pthread_detach(pthread_self()); + shm_socket_t* socket = (shm_socket_t*) _socket; + struct timespec timeout = {1, 0}; + shm_msg_t src; + + while(socket->queue->pop(src)) { + switch (src.type) { + + case SHM_SOCKET_CLOSE : + _client_close_conn_to_server(socket); + break; + case SHM_COMMON_MSG : + socket->messageQueue->push_timeout(src, &timeout); + break; + default: + err_msg(0, "socket.__shm_rev__: undefined message type."); + } + } + + return NULL; +} + + + + + +int shm_send(shm_socket_t *socket, void *buf, int size) { + // hashtable_t *hashtable = mm_get_hashtable(); shm_msg_t dest; - dest.port = _socket->port; + dest.type=SHM_COMMON_MSG; + dest.port = socket->port; dest.size = size; dest.buf = mm_malloc(size); memcpy(dest.buf, buf, size); - std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap; - for(auto iter = remoteQueueMap->begin(); iter != remoteQueueMap->end(); iter++) { - if(hashtable_get(hashtable, iter->first)== NULL) { - err_msg(0, "shm_send锛歝onnect at port %d failed, the other part has been closed!", iter->first); - delete iter->second; - remoteQueueMap->erase(iter); - continue; - } - if(_socket->mod == REQ_REP && _socket->is_server == 1) - SemUtil::dec(_socket->items); + // struct timeval time; + // gettimeofday(&time, NULL); +//err_msg(0, "%d %d=======>push befor %d", time.tv_sec, time.tv_usec, socket->port); + if(socket->remoteQueue->push(dest)) { - iter->second->push(dest); - - if( _socket->mod == REQ_REP && _socket->is_server == 1) { - delete iter->second; - remoteQueueMap->erase(iter); - SemUtil::inc(_socket->slots); - } - - + //gettimeofday(&time, NULL); +//err_msg(0, "%d %d=======>push after %d", time.tv_sec, time.tv_usec, socket->port); + return 0; + } else { + err_msg(errno, "connection has been closed!"); + return -1; } - return 0; + + } -int shm_recv(void* socket, void **buf, int *size) { - shm_socket_t * _socket = (shm_socket_t *) socket; +int shm_recv(shm_socket_t* socket, void **buf, int *size) { shm_msg_t src; - - std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap; - bool rv = _socket->queue->pop(src); - if (rv) { - if( _socket->is_server == 1 && remoteQueueMap->find(src.port) == remoteQueueMap->end()) { - if(_socket->mod == REQ_REP) - SemUtil::dec(_socket->slots); - remoteQueueMap->insert({src.port, new SHMQueue<shm_msg_t>(src.port, 0)}); - - if(_socket->mod == REQ_REP) - SemUtil::inc(_socket->items); - } - +// struct timeval time; +// gettimeofday(&time, NULL); +// err_msg(0, "%d %d=======>pop befor %d", time.tv_sec, time.tv_usec, socket->port); + if (socket->messageQueue->pop(src)) { +// gettimeofday(&time, NULL); +// err_msg(0, "%d %d=======>pop after %d", time.tv_sec, time.tv_usec, socket->port); void * _buf = malloc(src.size); memcpy(_buf, src.buf, src.size); *buf = _buf; @@ -223,11 +365,30 @@ mm_free(src.buf); return 0; } else { - return 1; + return -1; } + + + } +/** + * 缁戝畾key鍒伴槦鍒楋紝浣嗘槸骞朵笉浼氬垱寤洪槦鍒椼�傚鏋滄病鏈夊搴旀寚瀹歬ey鐨勯槦鍒楁彁绀洪敊璇苟閫�鍑� + */ +SHMQueue<shm_msg_t> * _attach_remote_queue(int port) { + hashtable_t *hashtable = mm_get_hashtable(); + if(hashtable_get(hashtable, port)== NULL) { + err_exit(0, "_remote_queue_attach锛歝onnet at port %d failed!", port); + return NULL; + } + + SHMQueue<shm_msg_t> *queue = new SHMQueue<shm_msg_t>(port, 0); + return queue; +} + + + -- Gitblit v1.8.0