From c1f1446058dbedd9be9b9561e6ba435e0cd15bbc Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期五, 17 七月 2020 11:01:01 +0800
Subject: [PATCH] update

---
 queue/libshm_queue.a            |    0 
 test/communication.c            |   12 +++--
 queue/include/socket.h          |    5 +-
 queue/include/lock_free_queue.h |    4 +-
 queue/socket.c                  |   75 +++++++++++++++++++++++++++++++++----
 test/communication              |    0 
 6 files changed, 78 insertions(+), 18 deletions(-)

diff --git a/queue/include/lock_free_queue.h b/queue/include/lock_free_queue.h
index f9a4667..f34079f 100644
--- a/queue/include/lock_free_queue.h
+++ b/queue/include/lock_free_queue.h
@@ -78,7 +78,7 @@
     int items;
    
 public:
-    int mutex;
+    // int mutex;
     LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
     
     /// @brief destructor of the class. 
@@ -151,7 +151,7 @@
 // std::cout << "LockFreeQueue init reference=" << reference << std::endl;
     slots = SemUtil::get(IPC_PRIVATE, qsize);
     items = SemUtil::get(IPC_PRIVATE, 0);
-    mutex = SemUtil::get(IPC_PRIVATE, 1);
+    // mutex = SemUtil::get(IPC_PRIVATE, 1);
 }
 
 template <
diff --git a/queue/include/socket.h b/queue/include/socket.h
index f51c021..b7e2807 100644
--- a/queue/include/socket.h
+++ b/queue/include/socket.h
@@ -28,8 +28,9 @@
 enum shm_msg_type_t
 {
 	SHM_SOCKET_OPEN = 1,
-	SHM_SOCKET_CLOSE = 2,
-	SHM_COMMON_MSG = 3
+	SHM_SOCKET_OPEN_REPLY = 2,
+	SHM_SOCKET_CLOSE = 3,
+	SHM_COMMON_MSG = 4
 	
 };
 
diff --git a/queue/libshm_queue.a b/queue/libshm_queue.a
index 553db5a..f1b3525 100644
--- a/queue/libshm_queue.a
+++ b/queue/libshm_queue.a
Binary files differ
diff --git a/queue/socket.c b/queue/socket.c
index a52d532..29b94d4 100644
--- a/queue/socket.c
+++ b/queue/socket.c
@@ -6,7 +6,7 @@
 
 
 void print_msg(char *head, shm_msg_t& msg) {
-	err_msg(0, "%s: port=%d, type=%d\n", head, msg.port, msg.type);
+	//err_msg(0, "%s: port=%d, type=%d\n", head, msg.port, msg.type);
 }
 
 void * _server_run_msg_rev(void* _socket);
@@ -158,7 +158,9 @@
 	shm_msg_t src;
 	shm_socket_t *client_socket;
 	std::map<int, shm_socket_t* >::iterator iter;
+
     while(socket->queue->pop(src)) {
+
     	switch (src.type) {
 			case SHM_SOCKET_OPEN : 
 				socket->acceptQueue->push_timeout(src, &timeout);
@@ -167,10 +169,14 @@
 				_server_close_conn_to_client(socket, src.port);
 				break;
 			case SHM_COMMON_MSG :
+
 				iter = socket->clientSocketMap->find(src.port);
+	print_msg("_server_run_msg_rev find before", src);
 				if( iter !=  socket->clientSocketMap->end()) {
 					client_socket= iter->second;
+	print_msg("_server_run_msg_rev push before", src);
 					client_socket->messageQueue->push_timeout(src, &timeout);
+	print_msg("_server_run_msg_rev push after", src);
 				}
 				
 				break;
@@ -186,6 +192,10 @@
 
 
 
+/**
+ * 鎺ュ彈瀹㈡埛绔缓绔嬫柊杩炴帴鐨勮姹�
+ *
+*/
 
 shm_socket_t* shm_accept(shm_socket_t* socket) {
 	hashtable_t *hashtable = mm_get_hashtable();
@@ -207,10 +217,29 @@
 
 		socket->clientSocketMap->insert({client_port, client_socket});
 
-		return client_socket;
+		/*
+         * shm_accept 鐢ㄦ埛鎵ц鐨勬柟娉� 涓巁server_run_msg_rev鍦ㄤ袱涓笉鍚岀殑闄愬埗宸ヤ綔,accept瑕佷繚璇佸湪瀹㈡埛鐨勫彂閫佹秷鎭箣鍓嶅畬鎴愯祫婧愮殑鍑嗗宸ヤ綔锛屼互閬垮厤鍑虹幇绔炴�侀棶棰�
+		*/
+		//鍙戦�乷pen_reply
+		struct timespec timeout = {1, 0};
+		shm_msg_t msg;
+		msg.port = socket->port;
+		msg.size = 0;
+		msg.type = SHM_SOCKET_OPEN_REPLY;
+
+		if (client_socket->remoteQueue->push_timeout(msg, &timeout) )
+		{
+			return client_socket;
+		} else {
+			err_msg(0, "shm_accept: 鍙戦�乷pen_reply澶辫触");
+			return NULL;
+		}
+
+		
 	} else {
 		err_exit(errno, "shm_accept");
 	}
+	return NULL;
 	
 }
 
@@ -230,17 +259,32 @@
 	}
 
 	socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
-	socket->remoteQueue = new SHMQueue<shm_msg_t>(port, 0);
+	socket->remoteQueue = _attach_remote_queue(port);
 	socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); 
+	
+
+	//鍙戦�乷pen璇锋眰
 	struct timespec timeout = {1, 0};
+	shm_msg_t msg;
+	msg.port = socket->port;
+	msg.size = 0;
+	msg.type=SHM_SOCKET_OPEN;
+	socket->remoteQueue->push_timeout(msg, &timeout);
 
-	shm_msg_t open_msg;
-	open_msg.port = socket->port;
-	open_msg.size = 0;
-	open_msg.type=SHM_SOCKET_OPEN;
-	socket->remoteQueue->push_timeout(open_msg, &timeout);
+	//鎺ュ彈open reply
+	if(socket->queue->pop(msg)) {
+		// 鍦ㄨ繖閲宻erver绔凡缁忓噯澶囧ソ鎺ュ彈瀹㈡埛绔彂閫佽姹備簡
+		if(msg.type == SHM_SOCKET_OPEN_REPLY) {
 
-	pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket);
+			pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket);
+		} else {
+			err_exit(0, "shm_connect: 涓嶅尮閰嶇殑搴旂瓟淇℃伅!");
+		}
+		
+	} else {
+		err_exit(0, "connect failted!");
+	}
+	
 	return 0;
 }
 
@@ -288,7 +332,14 @@
 	dest.size = size;
 	dest.buf = mm_malloc(size);
 	memcpy(dest.buf, buf, size);
+
+	// struct timeval time;
+	// gettimeofday(&time, NULL);
+//err_msg(0, "%d %d=======>push befor %d", time.tv_sec, time.tv_usec, socket->port);
 	if(socket->remoteQueue->push(dest)) {
+
+		//gettimeofday(&time, NULL);
+//err_msg(0, "%d %d=======>push after %d", time.tv_sec, time.tv_usec, socket->port);
 		return 0;
 	} else {
 		err_msg(errno, "connection has been closed!");
@@ -300,7 +351,13 @@
 
 int shm_recv(shm_socket_t* socket, void **buf, int *size) {
 	shm_msg_t src;
+
+// 	struct timeval time;
+// 	gettimeofday(&time, NULL);
+// err_msg(0, "%d %d=======>pop befor %d", time.tv_sec, time.tv_usec, socket->port);
 	if (socket->messageQueue->pop(src)) {
+// gettimeofday(&time, NULL);
+// err_msg(0, "%d %d=======>pop after %d", time.tv_sec, time.tv_usec, socket->port);
 		void * _buf = malloc(src.size);
 		memcpy(_buf, src.buf, src.size);
 		*buf = _buf;
diff --git a/test/communication b/test/communication
index 3a0e6b1..1f9adb7 100755
--- a/test/communication
+++ b/test/communication
Binary files differ
diff --git a/test/communication.c b/test/communication.c
index 16a9b05..a46adc2 100644
--- a/test/communication.c
+++ b/test/communication.c
@@ -27,7 +27,7 @@
 	shm_socket_t *client_socket;
 	while(true) {
 		client_socket = shm_accept(socket);
-printf("server messageQueue = %p\n", client_socket->messageQueue);
+// printf("server messageQueue = %p\n", client_socket->messageQueue);
 		pthread_create(&tid, NULL, precess_client , (void *)client_socket);
 	}
 
@@ -56,6 +56,7 @@
 	
 	int size;
 	void *recvbuf;
+	printf("requst:%s\n", sendbuf);
 	shm_send(socket, sendbuf, strlen(sendbuf)+1) ;
 	shm_recv(socket, &recvbuf, &size);
 	printf("reply: %s\n", (char *)recvbuf);
@@ -103,10 +104,10 @@
 	int scale = 100000;
 	int i;
 	shm_socket_t *socket = shm_open_socket();
-
 	shm_connect(socket, port);
 	for( i = 0; i<scale; i++) {
-		sprintf(sendbuf, "processor(%d) %d", targ->id, i);
+		sprintf(sendbuf, "thread(%d) %d", targ->id, i);
+		
 		client_send(socket, sendbuf);
 	}
 	shm_close_socket(socket);
@@ -115,7 +116,7 @@
 
 void multyThreadClient(int port) {
 
-	int status, i = 0, processors = 2;
+	int status, i = 0, processors = 4;
 	void *res[processors];
 	Targ *targs= (Targ*)calloc(processors, sizeof(Targ));
 	pthread_t tids[processors];
@@ -157,7 +158,8 @@
 
  if (strcmp("mclient", argv[1]) == 0)
      multyThreadClient(port);
- shm_destroy();
+
+  shm_destroy();
  // fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", "server", "client");
   return 0;
 }
\ No newline at end of file

--
Gitblit v1.8.0