From 7032fedd41386f8a0b779d234620b473d978f889 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期五, 17 七月 2020 17:43:18 +0800 Subject: [PATCH] req_rep finished --- queue/shm_socket.c | 39 ++++++++++----------------------------- 1 files changed, 10 insertions(+), 29 deletions(-) diff --git a/queue/shm_socket.c b/queue/shm_socket.c index f896e33..e0bdac6 100644 --- a/queue/shm_socket.c +++ b/queue/shm_socket.c @@ -16,29 +16,19 @@ SHMQueue<shm_msg_t> * _attach_remote_queue(int port) ; -void shm_init(int size) { - mem_pool_init(size); -} - -void shm_destroy() { - mem_pool_destroy(); -} - -void shm_free(void *buf) { - free(buf); -} - shm_socket_t *shm_open_socket() { shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t)); socket->port = -1; socket->dispatch_thread = 0; + socket->status=SHM_CONN_CLOSED; return socket; } int _shm_close_socket(shm_socket_t *socket, bool notifyRemote) { + socket->status = SHM_CONN_CLOSED; //缁欏鏂瑰彂閫佷竴涓叧闂繛鎺ョ殑娑堟伅 struct timespec timeout = {1, 0}; shm_msg_t close_msg; @@ -101,7 +91,7 @@ -int shm_bind(shm_socket_t * socket, int port) { +int shm_soket_bind(shm_socket_t * socket, int port) { shm_socket_t * _socket = (shm_socket_t *) socket; _socket -> port = port; return 0; @@ -126,6 +116,7 @@ pthread_create(&(socket->dispatch_thread), NULL, _server_run_msg_rev , (void *)socket); + socket->status = SHM_CONN_LISTEN; return 0; } @@ -133,16 +124,6 @@ shm_socket_t *client_socket; auto iter = socket->clientSocketMap->find(port); if( iter != socket->clientSocketMap->end() ) { - // client_socket= iter->second; - // if(client_socket->remoteQueue != NULL) { - // delete client_socket->remoteQueue; - // client_socket->remoteQueue = NULL; - // } - // if(client_socket->messageQueue != NULL) { - // delete client_socket->messageQueue; - // client_socket->messageQueue = NULL; - // } - socket->clientSocketMap->erase(iter); } //free((void *)client_socket); @@ -172,12 +153,11 @@ case SHM_COMMON_MSG : iter = socket->clientSocketMap->find(src.port); - print_msg("_server_run_msg_rev find before", src); if( iter != socket->clientSocketMap->end()) { client_socket= iter->second; - print_msg("_server_run_msg_rev push before", src); + // print_msg("_server_run_msg_rev push before", src); client_socket->messageQueue->push_timeout(src, &timeout); - print_msg("_server_run_msg_rev push after", src); + // print_msg("_server_run_msg_rev push after", src); } break; @@ -221,7 +201,7 @@ /* * shm_accept 鐢ㄦ埛鎵ц鐨勬柟娉� 涓巁server_run_msg_rev鍦ㄤ袱涓笉鍚岀殑闄愬埗宸ヤ綔,accept瑕佷繚璇佸湪瀹㈡埛鐨勫彂閫佹秷鎭箣鍓嶅畬鎴愯祫婧愮殑鍑嗗宸ヤ綔锛屼互閬垮厤鍑虹幇绔炴�侀棶棰� */ - //鍙戦�乷pen_reply + //鍙戦�乷pen_reply,鍥炲簲瀹㈡埛绔殑connect璇锋眰 struct timespec timeout = {1, 0}; shm_msg_t msg; msg.port = socket->port; @@ -230,6 +210,7 @@ if (client_socket->remoteQueue->push_timeout(msg, &timeout) ) { + client_socket->status = SHM_CONN_ESTABLISHED; return client_socket; } else { err_msg(0, "shm_accept: 鍙戦�乷pen_reply澶辫触"); @@ -274,9 +255,9 @@ //鎺ュ彈open reply if(socket->queue->pop(msg)) { - // 鍦ㄨ繖閲宻erver绔凡缁忓噯澶囧ソ鎺ュ彈瀹㈡埛绔彂閫佽姹備簡 + // 鍦ㄨ繖閲宻erver绔凡缁忓噯澶囧ソ鎺ュ彈瀹㈡埛绔彂閫佽姹備簡,瀹屾垚涓庢湇鍔$鐨勮繛鎺� if(msg.type == SHM_SOCKET_OPEN_REPLY) { - + socket->status = SHM_CONN_ESTABLISHED; pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket); } else { err_exit(0, "shm_connect: 涓嶅尮閰嶇殑搴旂瓟淇℃伅!"); -- Gitblit v1.8.0