From c1f1446058dbedd9be9b9561e6ba435e0cd15bbc Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期五, 17 七月 2020 11:01:01 +0800 Subject: [PATCH] update --- queue/libshm_queue.a | 0 test/communication.c | 12 +++-- queue/include/socket.h | 5 +- queue/include/lock_free_queue.h | 4 +- queue/socket.c | 75 +++++++++++++++++++++++++++++++++---- test/communication | 0 6 files changed, 78 insertions(+), 18 deletions(-) diff --git a/queue/include/lock_free_queue.h b/queue/include/lock_free_queue.h index f9a4667..f34079f 100644 --- a/queue/include/lock_free_queue.h +++ b/queue/include/lock_free_queue.h @@ -78,7 +78,7 @@ int items; public: - int mutex; + // int mutex; LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); /// @brief destructor of the class. @@ -151,7 +151,7 @@ // std::cout << "LockFreeQueue init reference=" << reference << std::endl; slots = SemUtil::get(IPC_PRIVATE, qsize); items = SemUtil::get(IPC_PRIVATE, 0); - mutex = SemUtil::get(IPC_PRIVATE, 1); + // mutex = SemUtil::get(IPC_PRIVATE, 1); } template < diff --git a/queue/include/socket.h b/queue/include/socket.h index f51c021..b7e2807 100644 --- a/queue/include/socket.h +++ b/queue/include/socket.h @@ -28,8 +28,9 @@ enum shm_msg_type_t { SHM_SOCKET_OPEN = 1, - SHM_SOCKET_CLOSE = 2, - SHM_COMMON_MSG = 3 + SHM_SOCKET_OPEN_REPLY = 2, + SHM_SOCKET_CLOSE = 3, + SHM_COMMON_MSG = 4 }; diff --git a/queue/libshm_queue.a b/queue/libshm_queue.a index 553db5a..f1b3525 100644 --- a/queue/libshm_queue.a +++ b/queue/libshm_queue.a Binary files differ diff --git a/queue/socket.c b/queue/socket.c index a52d532..29b94d4 100644 --- a/queue/socket.c +++ b/queue/socket.c @@ -6,7 +6,7 @@ void print_msg(char *head, shm_msg_t& msg) { - err_msg(0, "%s: port=%d, type=%d\n", head, msg.port, msg.type); + //err_msg(0, "%s: port=%d, type=%d\n", head, msg.port, msg.type); } void * _server_run_msg_rev(void* _socket); @@ -158,7 +158,9 @@ shm_msg_t src; shm_socket_t *client_socket; std::map<int, shm_socket_t* >::iterator iter; + while(socket->queue->pop(src)) { + switch (src.type) { case SHM_SOCKET_OPEN : socket->acceptQueue->push_timeout(src, &timeout); @@ -167,10 +169,14 @@ _server_close_conn_to_client(socket, src.port); break; case SHM_COMMON_MSG : + iter = socket->clientSocketMap->find(src.port); + print_msg("_server_run_msg_rev find before", src); if( iter != socket->clientSocketMap->end()) { client_socket= iter->second; + print_msg("_server_run_msg_rev push before", src); client_socket->messageQueue->push_timeout(src, &timeout); + print_msg("_server_run_msg_rev push after", src); } break; @@ -186,6 +192,10 @@ +/** + * 鎺ュ彈瀹㈡埛绔缓绔嬫柊杩炴帴鐨勮姹� + * +*/ shm_socket_t* shm_accept(shm_socket_t* socket) { hashtable_t *hashtable = mm_get_hashtable(); @@ -207,10 +217,29 @@ socket->clientSocketMap->insert({client_port, client_socket}); - return client_socket; + /* + * shm_accept 鐢ㄦ埛鎵ц鐨勬柟娉� 涓巁server_run_msg_rev鍦ㄤ袱涓笉鍚岀殑闄愬埗宸ヤ綔,accept瑕佷繚璇佸湪瀹㈡埛鐨勫彂閫佹秷鎭箣鍓嶅畬鎴愯祫婧愮殑鍑嗗宸ヤ綔锛屼互閬垮厤鍑虹幇绔炴�侀棶棰� + */ + //鍙戦�乷pen_reply + struct timespec timeout = {1, 0}; + shm_msg_t msg; + msg.port = socket->port; + msg.size = 0; + msg.type = SHM_SOCKET_OPEN_REPLY; + + if (client_socket->remoteQueue->push_timeout(msg, &timeout) ) + { + return client_socket; + } else { + err_msg(0, "shm_accept: 鍙戦�乷pen_reply澶辫触"); + return NULL; + } + + } else { err_exit(errno, "shm_accept"); } + return NULL; } @@ -230,17 +259,32 @@ } socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); - socket->remoteQueue = new SHMQueue<shm_msg_t>(port, 0); + socket->remoteQueue = _attach_remote_queue(port); socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); + + + //鍙戦�乷pen璇锋眰 struct timespec timeout = {1, 0}; + shm_msg_t msg; + msg.port = socket->port; + msg.size = 0; + msg.type=SHM_SOCKET_OPEN; + socket->remoteQueue->push_timeout(msg, &timeout); - shm_msg_t open_msg; - open_msg.port = socket->port; - open_msg.size = 0; - open_msg.type=SHM_SOCKET_OPEN; - socket->remoteQueue->push_timeout(open_msg, &timeout); + //鎺ュ彈open reply + if(socket->queue->pop(msg)) { + // 鍦ㄨ繖閲宻erver绔凡缁忓噯澶囧ソ鎺ュ彈瀹㈡埛绔彂閫佽姹備簡 + if(msg.type == SHM_SOCKET_OPEN_REPLY) { - pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket); + pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket); + } else { + err_exit(0, "shm_connect: 涓嶅尮閰嶇殑搴旂瓟淇℃伅!"); + } + + } else { + err_exit(0, "connect failted!"); + } + return 0; } @@ -288,7 +332,14 @@ dest.size = size; dest.buf = mm_malloc(size); memcpy(dest.buf, buf, size); + + // struct timeval time; + // gettimeofday(&time, NULL); +//err_msg(0, "%d %d=======>push befor %d", time.tv_sec, time.tv_usec, socket->port); if(socket->remoteQueue->push(dest)) { + + //gettimeofday(&time, NULL); +//err_msg(0, "%d %d=======>push after %d", time.tv_sec, time.tv_usec, socket->port); return 0; } else { err_msg(errno, "connection has been closed!"); @@ -300,7 +351,13 @@ int shm_recv(shm_socket_t* socket, void **buf, int *size) { shm_msg_t src; + +// struct timeval time; +// gettimeofday(&time, NULL); +// err_msg(0, "%d %d=======>pop befor %d", time.tv_sec, time.tv_usec, socket->port); if (socket->messageQueue->pop(src)) { +// gettimeofday(&time, NULL); +// err_msg(0, "%d %d=======>pop after %d", time.tv_sec, time.tv_usec, socket->port); void * _buf = malloc(src.size); memcpy(_buf, src.buf, src.size); *buf = _buf; diff --git a/test/communication b/test/communication index 3a0e6b1..1f9adb7 100755 --- a/test/communication +++ b/test/communication Binary files differ diff --git a/test/communication.c b/test/communication.c index 16a9b05..a46adc2 100644 --- a/test/communication.c +++ b/test/communication.c @@ -27,7 +27,7 @@ shm_socket_t *client_socket; while(true) { client_socket = shm_accept(socket); -printf("server messageQueue = %p\n", client_socket->messageQueue); +// printf("server messageQueue = %p\n", client_socket->messageQueue); pthread_create(&tid, NULL, precess_client , (void *)client_socket); } @@ -56,6 +56,7 @@ int size; void *recvbuf; + printf("requst:%s\n", sendbuf); shm_send(socket, sendbuf, strlen(sendbuf)+1) ; shm_recv(socket, &recvbuf, &size); printf("reply: %s\n", (char *)recvbuf); @@ -103,10 +104,10 @@ int scale = 100000; int i; shm_socket_t *socket = shm_open_socket(); - shm_connect(socket, port); for( i = 0; i<scale; i++) { - sprintf(sendbuf, "processor(%d) %d", targ->id, i); + sprintf(sendbuf, "thread(%d) %d", targ->id, i); + client_send(socket, sendbuf); } shm_close_socket(socket); @@ -115,7 +116,7 @@ void multyThreadClient(int port) { - int status, i = 0, processors = 2; + int status, i = 0, processors = 4; void *res[processors]; Targ *targs= (Targ*)calloc(processors, sizeof(Targ)); pthread_t tids[processors]; @@ -157,7 +158,8 @@ if (strcmp("mclient", argv[1]) == 0) multyThreadClient(port); - shm_destroy(); + + shm_destroy(); // fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", "server", "client"); return 0; } \ No newline at end of file -- Gitblit v1.8.0