From c1f1446058dbedd9be9b9561e6ba435e0cd15bbc Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期五, 17 七月 2020 11:01:01 +0800
Subject: [PATCH] update

---
 queue/socket.c |   75 +++++++++++++++++++++++++++++++++----
 1 files changed, 66 insertions(+), 9 deletions(-)

diff --git a/queue/socket.c b/queue/socket.c
index a52d532..29b94d4 100644
--- a/queue/socket.c
+++ b/queue/socket.c
@@ -6,7 +6,7 @@
 
 
 void print_msg(char *head, shm_msg_t& msg) {
-	err_msg(0, "%s: port=%d, type=%d\n", head, msg.port, msg.type);
+	//err_msg(0, "%s: port=%d, type=%d\n", head, msg.port, msg.type);
 }
 
 void * _server_run_msg_rev(void* _socket);
@@ -158,7 +158,9 @@
 	shm_msg_t src;
 	shm_socket_t *client_socket;
 	std::map<int, shm_socket_t* >::iterator iter;
+
     while(socket->queue->pop(src)) {
+
     	switch (src.type) {
 			case SHM_SOCKET_OPEN : 
 				socket->acceptQueue->push_timeout(src, &timeout);
@@ -167,10 +169,14 @@
 				_server_close_conn_to_client(socket, src.port);
 				break;
 			case SHM_COMMON_MSG :
+
 				iter = socket->clientSocketMap->find(src.port);
+	print_msg("_server_run_msg_rev find before", src);
 				if( iter !=  socket->clientSocketMap->end()) {
 					client_socket= iter->second;
+	print_msg("_server_run_msg_rev push before", src);
 					client_socket->messageQueue->push_timeout(src, &timeout);
+	print_msg("_server_run_msg_rev push after", src);
 				}
 				
 				break;
@@ -186,6 +192,10 @@
 
 
 
+/**
+ * 鎺ュ彈瀹㈡埛绔缓绔嬫柊杩炴帴鐨勮姹�
+ *
+*/
 
 shm_socket_t* shm_accept(shm_socket_t* socket) {
 	hashtable_t *hashtable = mm_get_hashtable();
@@ -207,10 +217,29 @@
 
 		socket->clientSocketMap->insert({client_port, client_socket});
 
-		return client_socket;
+		/*
+         * shm_accept 鐢ㄦ埛鎵ц鐨勬柟娉� 涓巁server_run_msg_rev鍦ㄤ袱涓笉鍚岀殑闄愬埗宸ヤ綔,accept瑕佷繚璇佸湪瀹㈡埛鐨勫彂閫佹秷鎭箣鍓嶅畬鎴愯祫婧愮殑鍑嗗宸ヤ綔锛屼互閬垮厤鍑虹幇绔炴�侀棶棰�
+		*/
+		//鍙戦�乷pen_reply
+		struct timespec timeout = {1, 0};
+		shm_msg_t msg;
+		msg.port = socket->port;
+		msg.size = 0;
+		msg.type = SHM_SOCKET_OPEN_REPLY;
+
+		if (client_socket->remoteQueue->push_timeout(msg, &timeout) )
+		{
+			return client_socket;
+		} else {
+			err_msg(0, "shm_accept: 鍙戦�乷pen_reply澶辫触");
+			return NULL;
+		}
+
+		
 	} else {
 		err_exit(errno, "shm_accept");
 	}
+	return NULL;
 	
 }
 
@@ -230,17 +259,32 @@
 	}
 
 	socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
-	socket->remoteQueue = new SHMQueue<shm_msg_t>(port, 0);
+	socket->remoteQueue = _attach_remote_queue(port);
 	socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); 
+	
+
+	//鍙戦�乷pen璇锋眰
 	struct timespec timeout = {1, 0};
+	shm_msg_t msg;
+	msg.port = socket->port;
+	msg.size = 0;
+	msg.type=SHM_SOCKET_OPEN;
+	socket->remoteQueue->push_timeout(msg, &timeout);
 
-	shm_msg_t open_msg;
-	open_msg.port = socket->port;
-	open_msg.size = 0;
-	open_msg.type=SHM_SOCKET_OPEN;
-	socket->remoteQueue->push_timeout(open_msg, &timeout);
+	//鎺ュ彈open reply
+	if(socket->queue->pop(msg)) {
+		// 鍦ㄨ繖閲宻erver绔凡缁忓噯澶囧ソ鎺ュ彈瀹㈡埛绔彂閫佽姹備簡
+		if(msg.type == SHM_SOCKET_OPEN_REPLY) {
 
-	pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket);
+			pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket);
+		} else {
+			err_exit(0, "shm_connect: 涓嶅尮閰嶇殑搴旂瓟淇℃伅!");
+		}
+		
+	} else {
+		err_exit(0, "connect failted!");
+	}
+	
 	return 0;
 }
 
@@ -288,7 +332,14 @@
 	dest.size = size;
 	dest.buf = mm_malloc(size);
 	memcpy(dest.buf, buf, size);
+
+	// struct timeval time;
+	// gettimeofday(&time, NULL);
+//err_msg(0, "%d %d=======>push befor %d", time.tv_sec, time.tv_usec, socket->port);
 	if(socket->remoteQueue->push(dest)) {
+
+		//gettimeofday(&time, NULL);
+//err_msg(0, "%d %d=======>push after %d", time.tv_sec, time.tv_usec, socket->port);
 		return 0;
 	} else {
 		err_msg(errno, "connection has been closed!");
@@ -300,7 +351,13 @@
 
 int shm_recv(shm_socket_t* socket, void **buf, int *size) {
 	shm_msg_t src;
+
+// 	struct timeval time;
+// 	gettimeofday(&time, NULL);
+// err_msg(0, "%d %d=======>pop befor %d", time.tv_sec, time.tv_usec, socket->port);
 	if (socket->messageQueue->pop(src)) {
+// gettimeofday(&time, NULL);
+// err_msg(0, "%d %d=======>pop after %d", time.tv_sec, time.tv_usec, socket->port);
 		void * _buf = malloc(src.size);
 		memcpy(_buf, src.buf, src.size);
 		*buf = _buf;

--
Gitblit v1.8.0