From 91ec036cace39fd5b5f04644f6bced1f477005e0 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期二, 21 七月 2020 19:33:28 +0800 Subject: [PATCH] update --- src/socket/shm_socket.c | 678 ++++++++++++++++++++++++++++++++------------------------ 1 files changed, 388 insertions(+), 290 deletions(-) diff --git a/src/socket/shm_socket.c b/src/socket/shm_socket.c index 4fb90cf..260cdc2 100644 --- a/src/socket/shm_socket.c +++ b/src/socket/shm_socket.c @@ -1,4 +1,5 @@ #include "shm_socket.h" +#include "hashtable.h" #include "logger_factory.h" #include <map> @@ -14,11 +15,16 @@ void * _client_run_msg_rev(void* _socket); +int _shm_close_dgram_socket(shm_socket_t *socket); + + +int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote); + SHMQueue<shm_msg_t> * _attach_remote_queue(int port) ; -shm_socket_t *shm_open_socket() { +shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) { shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t)); - + socket->socket_type = socket_type; socket->port = -1; socket->dispatch_thread = 0; socket->status=SHM_CONN_CLOSED; @@ -26,8 +32,381 @@ return socket; } +int shm_close_socket(shm_socket_t *socket) { + switch(socket->socket_type) { + case SHM_SOCKET_STREAM: + return _shm_close_stream_socket(socket, true); + case SHM_SOCKET_DGRAM: + return _shm_close_dgram_socket(socket); + default: + return -1; + } + return -1; + +} -int _shm_close_socket(shm_socket_t *socket, bool notifyRemote) { +int shm_socket_bind(shm_socket_t * socket, int port) { + socket -> port = port; + return 0; +} + +int shm_listen(shm_socket_t* socket) { + + if(socket->socket_type != SHM_SOCKET_STREAM) { + err_exit(0, "can not invoke shm_listen method with a socket which is not a SHM_SOCKET_STREAM socket"); + } + + int port; + hashtable_t *hashtable = mm_get_hashtable(); + if(socket -> port == -1) { + port = hashtable_alloc_key(hashtable); + socket -> port = port; + } else { + + if(hashtable_get(hashtable, socket->port)!= NULL) { + err_exit(0, "key %d has already been in used!", socket->port); + } + } + + socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); + socket->acceptQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); + socket->clientSocketMap = new std::map<int, shm_socket_t* >; + socket->status = SHM_CONN_LISTEN; + pthread_create(&(socket->dispatch_thread), NULL, _server_run_msg_rev , (void *)socket); + + + return 0; +} + + +/** + * 鎺ュ彈瀹㈡埛绔缓绔嬫柊杩炴帴鐨勮姹� + * +*/ +shm_socket_t* shm_accept(shm_socket_t* socket) { + if(socket->socket_type != SHM_SOCKET_STREAM) { + err_exit(0, "can not invoke shm_accept method with a socket which is not a SHM_SOCKET_STREAM socket"); + } + hashtable_t *hashtable = mm_get_hashtable(); + int client_port; + shm_socket_t *client_socket; + shm_msg_t src; + + if (socket->acceptQueue->pop(src) ) { + +// print_msg("===accept:", src); + client_port = src.port; + client_socket = (shm_socket_t *)malloc(sizeof(shm_socket_t)); + client_socket->port = socket->port; + // client_socket->queue= socket->queue; + //鍒濆鍖栨秷鎭痲ueue + client_socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); + //杩炴帴鍒板鏂筿ueue + client_socket->remoteQueue = _attach_remote_queue(client_port); + + socket->clientSocketMap->insert({client_port, client_socket}); + + /* + * shm_accept 鐢ㄦ埛鎵ц鐨勬柟娉� 涓巁server_run_msg_rev鍦ㄤ袱涓笉鍚岀殑闄愬埗宸ヤ綔,accept瑕佷繚璇佸湪瀹㈡埛鐨勫彂閫佹秷鎭箣鍓嶅畬鎴愯祫婧愮殑鍑嗗宸ヤ綔锛屼互閬垮厤鍑虹幇绔炴�侀棶棰� + */ + //鍙戦�乷pen_reply,鍥炲簲瀹㈡埛绔殑connect璇锋眰 + struct timespec timeout = {1, 0}; + shm_msg_t msg; + msg.port = socket->port; + msg.size = 0; + msg.type = SHM_SOCKET_OPEN_REPLY; + + if (client_socket->remoteQueue->push_timeout(msg, &timeout) ) + { + client_socket->status = SHM_CONN_ESTABLISHED; + return client_socket; + } else { + err_msg(0, "shm_accept: 鍙戦�乷pen_reply澶辫触"); + return NULL; + } + + + } else { + err_exit(errno, "shm_accept"); + } + return NULL; + +} + + +int shm_connect(shm_socket_t* socket, int port) { + if(socket->socket_type != SHM_SOCKET_STREAM) { + err_exit(0, "can not invoke shm_connect method with a socket which is not a SHM_SOCKET_STREAM socket"); + } + hashtable_t *hashtable = mm_get_hashtable(); + if(hashtable_get(hashtable, port)== NULL) { + err_exit(0, "shm_connect锛歝onnect at port %d failed!", port); + } + + if(socket->port == -1) { + socket->port = hashtable_alloc_key(hashtable); + } else { + + if(hashtable_get(hashtable, socket->port)!= NULL) { + err_exit(0, "key %d has already been in used!", socket->port); + } + } + + socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); + socket->remoteQueue = _attach_remote_queue(port); + socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); + + + //鍙戦�乷pen璇锋眰 + struct timespec timeout = {1, 0}; + shm_msg_t msg; + msg.port = socket->port; + msg.size = 0; + msg.type=SHM_SOCKET_OPEN; + socket->remoteQueue->push_timeout(msg, &timeout); + + //鎺ュ彈open reply + if(socket->queue->pop(msg)) { + // 鍦ㄨ繖閲宻erver绔凡缁忓噯澶囧ソ鎺ュ彈瀹㈡埛绔彂閫佽姹備簡,瀹屾垚涓庢湇鍔$鐨勮繛鎺� + if(msg.type == SHM_SOCKET_OPEN_REPLY) { + socket->status = SHM_CONN_ESTABLISHED; + pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket); + } else { + err_exit(0, "shm_connect: 涓嶅尮閰嶇殑搴旂瓟淇℃伅!"); + } + + } else { + err_exit(0, "connect failted!"); + } + + return 0; +} + + +int shm_send(shm_socket_t *socket, const void *buf, const int size) { + if(socket->socket_type != SHM_SOCKET_STREAM) { + err_exit(0, "can not invoke shm_send method with a socket which is not a SHM_SOCKET_STREAM socket"); + } + // hashtable_t *hashtable = mm_get_hashtable(); + // if(socket->remoteQueue == NULL) { + // err_msg(errno, "褰撳墠瀹㈡埛绔棤杩炴帴!"); + // return -1; + // } + shm_msg_t dest; + dest.type=SHM_COMMON_MSG; + dest.port = socket->port; + dest.size = size; + dest.buf = mm_malloc(size); + memcpy(dest.buf, buf, size); + + + if(socket->remoteQueue->push(dest)) { + return 0; + } else { + err_msg(errno, "connection has been closed!"); + return -1; + } +} + + +int shm_recv(shm_socket_t* socket, void **buf, int *size) { + if(socket->socket_type != SHM_SOCKET_STREAM) { + err_exit(0, "can not invoke shm_recv method with a socket which is not a SHM_SOCKET_STREAM socket"); + } + shm_msg_t src; + + if (socket->messageQueue->pop(src)) { + void * _buf = malloc(src.size); + memcpy(_buf, src.buf, src.size); + *buf = _buf; + *size = src.size; + mm_free(src.buf); + return 0; + } else { + return -1; + } + +} + + + +// 鐭繛鎺ユ柟寮忓彂閫� +int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port) { + hashtable_t *hashtable = mm_get_hashtable(); + + if(socket->queue == NULL) { + if(socket->port == -1) { + socket->port = hashtable_alloc_key(hashtable); + } else { + + if(hashtable_get(hashtable, socket->port)!= NULL) { + err_exit(0, "key %d has already been in used!", socket->port); + } + } + + socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); + } + if (port == socket->port) { + err_msg(0, "can not send to your self!"); + return -1; + } + + shm_msg_t dest; + dest.type=SHM_COMMON_MSG; + dest.port = socket->port; + dest.size = size; + dest.buf = mm_malloc(size); + memcpy(dest.buf, buf, size); + + SHMQueue<shm_msg_t> *remoteQueue = _attach_remote_queue(port); + if(remoteQueue->push(dest)) { + delete remoteQueue; + return 0; + } else { + delete remoteQueue; + err_msg(errno, "sendto port %d failed!", port); + return -1; + } +} + + +// 鐭繛鎺ユ柟寮忔帴鍙� +int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port){ + hashtable_t *hashtable = mm_get_hashtable(); + if(socket->queue == NULL) { + if(socket->port == -1) { + socket->port = hashtable_alloc_key(hashtable); + } else { + + if(hashtable_get(hashtable, socket->port)!= NULL) { + err_exit(0, "key %d has already been in used!", socket->port); + } + } + + socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); + } + + shm_msg_t src; +//logger.debug("shm_recvfrom pop before"); + if (socket->queue->pop(src)) { + void * _buf = malloc(src.size); + memcpy(_buf, src.buf, src.size); + *buf = _buf; + *size = src.size; + *port = src.port; + mm_free(src.buf); +//logger.debug("shm_recvfrom pop after"); + return 0; + } else { + return -1; + } +} + + +/** + * 缁戝畾key鍒伴槦鍒楋紝浣嗘槸骞朵笉浼氬垱寤洪槦鍒椼�傚鏋滄病鏈夊搴旀寚瀹歬ey鐨勯槦鍒楁彁绀洪敊璇苟閫�鍑� + */ +SHMQueue<shm_msg_t> * _attach_remote_queue(int port) { + hashtable_t *hashtable = mm_get_hashtable(); + if(hashtable_get(hashtable, port)== NULL) { + err_exit(0, "_remote_queue_attach锛歝onnet at port %d failed!", port); + return NULL; + } + + SHMQueue<shm_msg_t> *queue = new SHMQueue<shm_msg_t>(port, 0); + return queue; +} + + + + + +void _server_close_conn_to_client(shm_socket_t* socket, int port) { + shm_socket_t *client_socket; + auto iter = socket->clientSocketMap->find(port); + if( iter != socket->clientSocketMap->end() ) { + socket->clientSocketMap->erase(iter); + } + //free((void *)client_socket); + +} + +/** + * server绔悇绉嶇被鍨嬫秷鎭紙锛夊湪杩欓噷杩涚▼鍒嗘嫞 + */ +void * _server_run_msg_rev(void* _socket) { + pthread_detach(pthread_self()); + shm_socket_t* socket = (shm_socket_t*) _socket; + struct timespec timeout = {1, 0}; + shm_msg_t src; + shm_socket_t *client_socket; + std::map<int, shm_socket_t* >::iterator iter; + + while(socket->queue->pop(src)) { + + switch (src.type) { + case SHM_SOCKET_OPEN : + socket->acceptQueue->push_timeout(src, &timeout); + break; + case SHM_SOCKET_CLOSE : + _server_close_conn_to_client(socket, src.port); + break; + case SHM_COMMON_MSG : + + iter = socket->clientSocketMap->find(src.port); + if( iter != socket->clientSocketMap->end()) { + client_socket= iter->second; + // print_msg("_server_run_msg_rev push before", src); + client_socket->messageQueue->push_timeout(src, &timeout); + // print_msg("_server_run_msg_rev push after", src); + } + + break; + + default: + err_msg(0, "socket.__shm_rev__: undefined message type."); + } + } + + return NULL; +} + + + +void _client_close_conn_to_server(shm_socket_t* socket) { + + _shm_close_stream_socket(socket, false); +} + + +/** + * client绔殑鍚勭绫诲瀷娑堟伅锛堬級鍦ㄨ繖閲岃繘绋嬪垎鎷� + */ +void * _client_run_msg_rev(void* _socket) { + pthread_detach(pthread_self()); + shm_socket_t* socket = (shm_socket_t*) _socket; + struct timespec timeout = {1, 0}; + shm_msg_t src; + + while(socket->queue->pop(src)) { + switch (src.type) { + + case SHM_SOCKET_CLOSE : + _client_close_conn_to_server(socket); + break; + case SHM_COMMON_MSG : + socket->messageQueue->push_timeout(src, &timeout); + break; + default: + err_msg(0, "socket.__shm_rev__: undefined message type."); + } + } + + return NULL; +} + + +int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote) { socket->status = SHM_CONN_CLOSED; //缁欏鏂瑰彂閫佷竴涓叧闂繛鎺ョ殑娑堟伅 struct timespec timeout = {1, 0}; @@ -86,295 +465,14 @@ } -int shm_close_socket(shm_socket_t *socket) { - return _shm_close_socket(socket, true); -} - -int shm_socket_bind(shm_socket_t * socket, int port) { - shm_socket_t * _socket = (shm_socket_t *) socket; - _socket -> port = port; +int _shm_close_dgram_socket(shm_socket_t *socket){ + if(socket->queue != NULL) { + delete socket->queue; + socket->queue = NULL; + } + free(socket); return 0; } - -int shm_listen(shm_socket_t* socket) { - int port; - hashtable_t *hashtable = mm_get_hashtable(); - if(socket -> port == -1) { - port = hashtable_alloc_key(hashtable); - socket -> port = port; - } else { - - if(hashtable_get(hashtable, socket->port)!= NULL) { - err_exit(0, "key %d has already been in used!", socket->port); - } - } - - socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); - socket->acceptQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); - socket->clientSocketMap = new std::map<int, shm_socket_t* >; - socket->status = SHM_CONN_LISTEN; - pthread_create(&(socket->dispatch_thread), NULL, _server_run_msg_rev , (void *)socket); - - - return 0; -} - -void _server_close_conn_to_client(shm_socket_t* socket, int port) { - shm_socket_t *client_socket; - auto iter = socket->clientSocketMap->find(port); - if( iter != socket->clientSocketMap->end() ) { - socket->clientSocketMap->erase(iter); - } - //free((void *)client_socket); - -} - -/** - * server绔悇绉嶇被鍨嬫秷鎭紙锛夊湪杩欓噷杩涚▼鍒嗘嫞 - */ -void * _server_run_msg_rev(void* _socket) { - pthread_detach(pthread_self()); - shm_socket_t* socket = (shm_socket_t*) _socket; - struct timespec timeout = {1, 0}; - shm_msg_t src; - shm_socket_t *client_socket; - std::map<int, shm_socket_t* >::iterator iter; - - while(socket->queue->pop(src)) { - - switch (src.type) { - case SHM_SOCKET_OPEN : - socket->acceptQueue->push_timeout(src, &timeout); - break; - case SHM_SOCKET_CLOSE : - _server_close_conn_to_client(socket, src.port); - break; - case SHM_COMMON_MSG : - - iter = socket->clientSocketMap->find(src.port); - if( iter != socket->clientSocketMap->end()) { - client_socket= iter->second; - // print_msg("_server_run_msg_rev push before", src); - client_socket->messageQueue->push_timeout(src, &timeout); - // print_msg("_server_run_msg_rev push after", src); - } - - break; - - default: - err_msg(0, "socket.__shm_rev__: undefined message type."); - } - } - - return NULL; -} - - - - -/** - * 鎺ュ彈瀹㈡埛绔缓绔嬫柊杩炴帴鐨勮姹� - * -*/ - -shm_socket_t* shm_accept(shm_socket_t* socket) { - hashtable_t *hashtable = mm_get_hashtable(); - int client_port; - shm_socket_t *client_socket; - shm_msg_t src; - - if (socket->acceptQueue->pop(src) ) { - -// print_msg("===accept:", src); - client_port = src.port; - client_socket = (shm_socket_t *)malloc(sizeof(shm_socket_t)); - client_socket->port = socket->port; - // client_socket->queue= socket->queue; - //鍒濆鍖栨秷鎭痲ueue - client_socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); - //杩炴帴鍒板鏂筿ueue - client_socket->remoteQueue = _attach_remote_queue(client_port); - - socket->clientSocketMap->insert({client_port, client_socket}); - - /* - * shm_accept 鐢ㄦ埛鎵ц鐨勬柟娉� 涓巁server_run_msg_rev鍦ㄤ袱涓笉鍚岀殑闄愬埗宸ヤ綔,accept瑕佷繚璇佸湪瀹㈡埛鐨勫彂閫佹秷鎭箣鍓嶅畬鎴愯祫婧愮殑鍑嗗宸ヤ綔锛屼互閬垮厤鍑虹幇绔炴�侀棶棰� - */ - //鍙戦�乷pen_reply,鍥炲簲瀹㈡埛绔殑connect璇锋眰 - struct timespec timeout = {1, 0}; - shm_msg_t msg; - msg.port = socket->port; - msg.size = 0; - msg.type = SHM_SOCKET_OPEN_REPLY; - - if (client_socket->remoteQueue->push_timeout(msg, &timeout) ) - { - client_socket->status = SHM_CONN_ESTABLISHED; - return client_socket; - } else { - err_msg(0, "shm_accept: 鍙戦�乷pen_reply澶辫触"); - return NULL; - } - - - } else { - err_exit(errno, "shm_accept"); - } - return NULL; - -} - - -int shm_connect(shm_socket_t* socket, int port) { - hashtable_t *hashtable = mm_get_hashtable(); - if(hashtable_get(hashtable, port)== NULL) { - err_exit(0, "shm_connect锛歝onnect at port %d failed!", port); - } - if(socket->port == -1) { - socket->port = hashtable_alloc_key(hashtable); - } else { - - if(hashtable_get(hashtable, socket->port)!= NULL) { - err_exit(0, "key %d has already been in used!", socket->port); - } - } - - socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); - socket->remoteQueue = _attach_remote_queue(port); - socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); - - - //鍙戦�乷pen璇锋眰 - struct timespec timeout = {1, 0}; - shm_msg_t msg; - msg.port = socket->port; - msg.size = 0; - msg.type=SHM_SOCKET_OPEN; - socket->remoteQueue->push_timeout(msg, &timeout); - - //鎺ュ彈open reply - if(socket->queue->pop(msg)) { - // 鍦ㄨ繖閲宻erver绔凡缁忓噯澶囧ソ鎺ュ彈瀹㈡埛绔彂閫佽姹備簡,瀹屾垚涓庢湇鍔$鐨勮繛鎺� - if(msg.type == SHM_SOCKET_OPEN_REPLY) { - socket->status = SHM_CONN_ESTABLISHED; - pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket); - } else { - err_exit(0, "shm_connect: 涓嶅尮閰嶇殑搴旂瓟淇℃伅!"); - } - - } else { - err_exit(0, "connect failted!"); - } - - return 0; -} - -void _client_close_conn_to_server(shm_socket_t* socket) { - - _shm_close_socket(socket, false); -} - - -/** - * client绔殑鍚勭绫诲瀷娑堟伅锛堬級鍦ㄨ繖閲岃繘绋嬪垎鎷� - */ -void * _client_run_msg_rev(void* _socket) { - pthread_detach(pthread_self()); - shm_socket_t* socket = (shm_socket_t*) _socket; - struct timespec timeout = {1, 0}; - shm_msg_t src; - - while(socket->queue->pop(src)) { - switch (src.type) { - - case SHM_SOCKET_CLOSE : - _client_close_conn_to_server(socket); - break; - case SHM_COMMON_MSG : - socket->messageQueue->push_timeout(src, &timeout); - break; - default: - err_msg(0, "socket.__shm_rev__: undefined message type."); - } - } - - return NULL; -} - - - - - -int shm_send(shm_socket_t *socket, const void *buf, const int size) { - // hashtable_t *hashtable = mm_get_hashtable(); - // if(socket->remoteQueue == NULL) { - // err_msg(errno, "褰撳墠瀹㈡埛绔棤杩炴帴!"); - // return -1; - // } - shm_msg_t dest; - dest.type=SHM_COMMON_MSG; - dest.port = socket->port; - dest.size = size; - dest.buf = mm_malloc(size); - memcpy(dest.buf, buf, size); - - // struct timeval time; - // gettimeofday(&time, NULL); -//err_msg(0, "%d %d=======>push befor %d", time.tv_sec, time.tv_usec, socket->port); - if(socket->remoteQueue->push(dest)) { - - //gettimeofday(&time, NULL); -//err_msg(0, "%d %d=======>push after %d", time.tv_sec, time.tv_usec, socket->port); - return 0; - } else { - err_msg(errno, "connection has been closed!"); - return -1; - } - - -} - -int shm_recv(shm_socket_t* socket, void **buf, int *size) { - shm_msg_t src; - -// struct timeval time; -// gettimeofday(&time, NULL); -// err_msg(0, "%d %d=======>pop befor %d", time.tv_sec, time.tv_usec, socket->port); - if (socket->messageQueue->pop(src)) { -// gettimeofday(&time, NULL); -// err_msg(0, "%d %d=======>pop after %d", time.tv_sec, time.tv_usec, socket->port); - void * _buf = malloc(src.size); - memcpy(_buf, src.buf, src.size); - *buf = _buf; - *size = src.size; - mm_free(src.buf); - return 0; - } else { - return -1; - } - - - -} - - -/** - * 缁戝畾key鍒伴槦鍒楋紝浣嗘槸骞朵笉浼氬垱寤洪槦鍒椼�傚鏋滄病鏈夊搴旀寚瀹歬ey鐨勯槦鍒楁彁绀洪敊璇苟閫�鍑� - */ -SHMQueue<shm_msg_t> * _attach_remote_queue(int port) { - hashtable_t *hashtable = mm_get_hashtable(); - if(hashtable_get(hashtable, port)== NULL) { - err_exit(0, "_remote_queue_attach锛歝onnet at port %d failed!", port); - return NULL; - } - - SHMQueue<shm_msg_t> *queue = new SHMQueue<shm_msg_t>(port, 0); - return queue; -} - - - - -- Gitblit v1.8.0