From f85c9b875b060681b51f57b15074ba1c7c9f5636 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期一, 20 七月 2020 11:10:02 +0800 Subject: [PATCH] update --- queue/shm_socket.c | 50 +++++++++++++++++--------------------------------- 1 files changed, 17 insertions(+), 33 deletions(-) diff --git a/queue/shm_socket.c b/queue/shm_socket.c index f896e33..4fb90cf 100644 --- a/queue/shm_socket.c +++ b/queue/shm_socket.c @@ -16,29 +16,19 @@ SHMQueue<shm_msg_t> * _attach_remote_queue(int port) ; -void shm_init(int size) { - mem_pool_init(size); -} - -void shm_destroy() { - mem_pool_destroy(); -} - -void shm_free(void *buf) { - free(buf); -} - shm_socket_t *shm_open_socket() { shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t)); socket->port = -1; socket->dispatch_thread = 0; + socket->status=SHM_CONN_CLOSED; return socket; } int _shm_close_socket(shm_socket_t *socket, bool notifyRemote) { + socket->status = SHM_CONN_CLOSED; //缁欏鏂瑰彂閫佷竴涓叧闂繛鎺ョ殑娑堟伅 struct timespec timeout = {1, 0}; shm_msg_t close_msg; @@ -78,6 +68,7 @@ client_socket->remoteQueue->push_timeout(close_msg, &timeout); delete client_socket->remoteQueue; client_socket->remoteQueue=NULL; + delete client_socket->messageQueue; client_socket->messageQueue=NULL; socket->clientSocketMap->erase(iter); @@ -99,9 +90,7 @@ return _shm_close_socket(socket, true); } - - -int shm_bind(shm_socket_t * socket, int port) { +int shm_socket_bind(shm_socket_t * socket, int port) { shm_socket_t * _socket = (shm_socket_t *) socket; _socket -> port = port; return 0; @@ -123,9 +112,10 @@ socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); socket->acceptQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); socket->clientSocketMap = new std::map<int, shm_socket_t* >; - + socket->status = SHM_CONN_LISTEN; pthread_create(&(socket->dispatch_thread), NULL, _server_run_msg_rev , (void *)socket); + return 0; } @@ -133,16 +123,6 @@ shm_socket_t *client_socket; auto iter = socket->clientSocketMap->find(port); if( iter != socket->clientSocketMap->end() ) { - // client_socket= iter->second; - // if(client_socket->remoteQueue != NULL) { - // delete client_socket->remoteQueue; - // client_socket->remoteQueue = NULL; - // } - // if(client_socket->messageQueue != NULL) { - // delete client_socket->messageQueue; - // client_socket->messageQueue = NULL; - // } - socket->clientSocketMap->erase(iter); } //free((void *)client_socket); @@ -172,12 +152,11 @@ case SHM_COMMON_MSG : iter = socket->clientSocketMap->find(src.port); - print_msg("_server_run_msg_rev find before", src); if( iter != socket->clientSocketMap->end()) { client_socket= iter->second; - print_msg("_server_run_msg_rev push before", src); + // print_msg("_server_run_msg_rev push before", src); client_socket->messageQueue->push_timeout(src, &timeout); - print_msg("_server_run_msg_rev push after", src); + // print_msg("_server_run_msg_rev push after", src); } break; @@ -221,7 +200,7 @@ /* * shm_accept 鐢ㄦ埛鎵ц鐨勬柟娉� 涓巁server_run_msg_rev鍦ㄤ袱涓笉鍚岀殑闄愬埗宸ヤ綔,accept瑕佷繚璇佸湪瀹㈡埛鐨勫彂閫佹秷鎭箣鍓嶅畬鎴愯祫婧愮殑鍑嗗宸ヤ綔锛屼互閬垮厤鍑虹幇绔炴�侀棶棰� */ - //鍙戦�乷pen_reply + //鍙戦�乷pen_reply,鍥炲簲瀹㈡埛绔殑connect璇锋眰 struct timespec timeout = {1, 0}; shm_msg_t msg; msg.port = socket->port; @@ -230,6 +209,7 @@ if (client_socket->remoteQueue->push_timeout(msg, &timeout) ) { + client_socket->status = SHM_CONN_ESTABLISHED; return client_socket; } else { err_msg(0, "shm_accept: 鍙戦�乷pen_reply澶辫触"); @@ -274,9 +254,9 @@ //鎺ュ彈open reply if(socket->queue->pop(msg)) { - // 鍦ㄨ繖閲宻erver绔凡缁忓噯澶囧ソ鎺ュ彈瀹㈡埛绔彂閫佽姹備簡 + // 鍦ㄨ繖閲宻erver绔凡缁忓噯澶囧ソ鎺ュ彈瀹㈡埛绔彂閫佽姹備簡,瀹屾垚涓庢湇鍔$鐨勮繛鎺� if(msg.type == SHM_SOCKET_OPEN_REPLY) { - + socket->status = SHM_CONN_ESTABLISHED; pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket); } else { err_exit(0, "shm_connect: 涓嶅尮閰嶇殑搴旂瓟淇℃伅!"); @@ -325,8 +305,12 @@ -int shm_send(shm_socket_t *socket, void *buf, int size) { +int shm_send(shm_socket_t *socket, const void *buf, const int size) { // hashtable_t *hashtable = mm_get_hashtable(); + // if(socket->remoteQueue == NULL) { + // err_msg(errno, "褰撳墠瀹㈡埛绔棤杩炴帴!"); + // return -1; + // } shm_msg_t dest; dest.type=SHM_COMMON_MSG; dest.port = socket->port; -- Gitblit v1.8.0