From c1f1446058dbedd9be9b9561e6ba435e0cd15bbc Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期五, 17 七月 2020 11:01:01 +0800 Subject: [PATCH] update --- queue/socket.c | 75 +++++++++++++++++++++++++++++++++---- 1 files changed, 66 insertions(+), 9 deletions(-) diff --git a/queue/socket.c b/queue/socket.c index a52d532..29b94d4 100644 --- a/queue/socket.c +++ b/queue/socket.c @@ -6,7 +6,7 @@ void print_msg(char *head, shm_msg_t& msg) { - err_msg(0, "%s: port=%d, type=%d\n", head, msg.port, msg.type); + //err_msg(0, "%s: port=%d, type=%d\n", head, msg.port, msg.type); } void * _server_run_msg_rev(void* _socket); @@ -158,7 +158,9 @@ shm_msg_t src; shm_socket_t *client_socket; std::map<int, shm_socket_t* >::iterator iter; + while(socket->queue->pop(src)) { + switch (src.type) { case SHM_SOCKET_OPEN : socket->acceptQueue->push_timeout(src, &timeout); @@ -167,10 +169,14 @@ _server_close_conn_to_client(socket, src.port); break; case SHM_COMMON_MSG : + iter = socket->clientSocketMap->find(src.port); + print_msg("_server_run_msg_rev find before", src); if( iter != socket->clientSocketMap->end()) { client_socket= iter->second; + print_msg("_server_run_msg_rev push before", src); client_socket->messageQueue->push_timeout(src, &timeout); + print_msg("_server_run_msg_rev push after", src); } break; @@ -186,6 +192,10 @@ +/** + * 鎺ュ彈瀹㈡埛绔缓绔嬫柊杩炴帴鐨勮姹� + * +*/ shm_socket_t* shm_accept(shm_socket_t* socket) { hashtable_t *hashtable = mm_get_hashtable(); @@ -207,10 +217,29 @@ socket->clientSocketMap->insert({client_port, client_socket}); - return client_socket; + /* + * shm_accept 鐢ㄦ埛鎵ц鐨勬柟娉� 涓巁server_run_msg_rev鍦ㄤ袱涓笉鍚岀殑闄愬埗宸ヤ綔,accept瑕佷繚璇佸湪瀹㈡埛鐨勫彂閫佹秷鎭箣鍓嶅畬鎴愯祫婧愮殑鍑嗗宸ヤ綔锛屼互閬垮厤鍑虹幇绔炴�侀棶棰� + */ + //鍙戦�乷pen_reply + struct timespec timeout = {1, 0}; + shm_msg_t msg; + msg.port = socket->port; + msg.size = 0; + msg.type = SHM_SOCKET_OPEN_REPLY; + + if (client_socket->remoteQueue->push_timeout(msg, &timeout) ) + { + return client_socket; + } else { + err_msg(0, "shm_accept: 鍙戦�乷pen_reply澶辫触"); + return NULL; + } + + } else { err_exit(errno, "shm_accept"); } + return NULL; } @@ -230,17 +259,32 @@ } socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); - socket->remoteQueue = new SHMQueue<shm_msg_t>(port, 0); + socket->remoteQueue = _attach_remote_queue(port); socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); + + + //鍙戦�乷pen璇锋眰 struct timespec timeout = {1, 0}; + shm_msg_t msg; + msg.port = socket->port; + msg.size = 0; + msg.type=SHM_SOCKET_OPEN; + socket->remoteQueue->push_timeout(msg, &timeout); - shm_msg_t open_msg; - open_msg.port = socket->port; - open_msg.size = 0; - open_msg.type=SHM_SOCKET_OPEN; - socket->remoteQueue->push_timeout(open_msg, &timeout); + //鎺ュ彈open reply + if(socket->queue->pop(msg)) { + // 鍦ㄨ繖閲宻erver绔凡缁忓噯澶囧ソ鎺ュ彈瀹㈡埛绔彂閫佽姹備簡 + if(msg.type == SHM_SOCKET_OPEN_REPLY) { - pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket); + pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket); + } else { + err_exit(0, "shm_connect: 涓嶅尮閰嶇殑搴旂瓟淇℃伅!"); + } + + } else { + err_exit(0, "connect failted!"); + } + return 0; } @@ -288,7 +332,14 @@ dest.size = size; dest.buf = mm_malloc(size); memcpy(dest.buf, buf, size); + + // struct timeval time; + // gettimeofday(&time, NULL); +//err_msg(0, "%d %d=======>push befor %d", time.tv_sec, time.tv_usec, socket->port); if(socket->remoteQueue->push(dest)) { + + //gettimeofday(&time, NULL); +//err_msg(0, "%d %d=======>push after %d", time.tv_sec, time.tv_usec, socket->port); return 0; } else { err_msg(errno, "connection has been closed!"); @@ -300,7 +351,13 @@ int shm_recv(shm_socket_t* socket, void **buf, int *size) { shm_msg_t src; + +// struct timeval time; +// gettimeofday(&time, NULL); +// err_msg(0, "%d %d=======>pop befor %d", time.tv_sec, time.tv_usec, socket->port); if (socket->messageQueue->pop(src)) { +// gettimeofday(&time, NULL); +// err_msg(0, "%d %d=======>pop after %d", time.tv_sec, time.tv_usec, socket->port); void * _buf = malloc(src.size); memcpy(_buf, src.buf, src.size); *buf = _buf; -- Gitblit v1.8.0