From 9e6ceaad059b2aec84df92c8750f6d87eab708c2 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期四, 16 七月 2020 20:46:31 +0800
Subject: [PATCH] udpate

---
 queue/libshm_queue.a            |    0 
 test/communication.c            |  100 +++++++++++++++++++++++-
 queue/include/lock_free_queue.h |   18 +++-
 queue/socket.c                  |   89 ++++++++++++---------
 test/communication              |    0 
 5 files changed, 158 insertions(+), 49 deletions(-)

diff --git a/queue/include/lock_free_queue.h b/queue/include/lock_free_queue.h
index 566c7ed..f9a4667 100644
--- a/queue/include/lock_free_queue.h
+++ b/queue/include/lock_free_queue.h
@@ -78,6 +78,7 @@
     int items;
    
 public:
+    int mutex;
     LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
     
     /// @brief destructor of the class. 
@@ -150,6 +151,7 @@
 // std::cout << "LockFreeQueue init reference=" << reference << std::endl;
     slots = SemUtil::get(IPC_PRIVATE, qsize);
     items = SemUtil::get(IPC_PRIVATE, 0);
+    mutex = SemUtil::get(IPC_PRIVATE, 1);
 }
 
 template <
@@ -198,7 +200,8 @@
 bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data)
 {
     if (SemUtil::dec(slots) == -1) {
-        err_exit(errno, "push");
+        err_msg(errno, "LockFreeQueue push");
+        return false;
     }
 
     if ( m_qImpl.push(a_data) ) {
@@ -218,8 +221,11 @@
     if (SemUtil::dec_nowait(slots) == -1) {
         if (errno == EAGAIN)
             return false;
-        else
-            err_exit(errno, "push_nowait");
+        else {
+            err_msg(errno, "LockFreeQueue push_nowait");
+            return false;
+        }
+
     }
 
     if ( m_qImpl.push(a_data)) {
@@ -240,8 +246,10 @@
     if (SemUtil::dec_timeout(slots, timeout) == -1) {
         if (errno == EAGAIN)
             return false;
-        else 
-            err_exit(errno, "push_timeout");
+        else {
+            err_msg(errno, "LockFreeQueue push_timeout");
+            return false;
+        }
     }
 
     if (m_qImpl.push(a_data)){
diff --git a/queue/libshm_queue.a b/queue/libshm_queue.a
index 65416b5..553db5a 100644
--- a/queue/libshm_queue.a
+++ b/queue/libshm_queue.a
Binary files differ
diff --git a/queue/socket.c b/queue/socket.c
index 1f3ff24..a52d532 100644
--- a/queue/socket.c
+++ b/queue/socket.c
@@ -37,29 +37,37 @@
 }
 
 
-int shm_close_socket(shm_socket_t *socket) {
+int _shm_close_socket(shm_socket_t *socket, bool notifyRemote) {
 	//缁欏鏂瑰彂閫佷竴涓叧闂繛鎺ョ殑娑堟伅
 	struct timespec timeout = {1, 0};
 	shm_msg_t close_msg;
+
 	close_msg.port = socket->port;
 	close_msg.size = 0;
 	close_msg.type=SHM_SOCKET_CLOSE;
-	if(socket->remoteQueue != NULL) {
+	if(notifyRemote && socket->remoteQueue != NULL) {
 		socket->remoteQueue->push_timeout(close_msg, &timeout);
 	}
 	
-
-
-	if(socket->queue != NULL)
+	if(socket->queue != NULL) {
 		delete socket->queue;
-	if(socket->remoteQueue != NULL)
+		socket->queue = NULL;
+	}
+
+	if(socket->remoteQueue != NULL) {
 		delete socket->remoteQueue;
+		socket->remoteQueue = NULL;
+	}
 
-	if(socket->messageQueue != NULL)
+	if(socket->messageQueue != NULL) {
 		delete socket->messageQueue;
+		socket->messageQueue = NULL;
+	}
 
-	if(socket->acceptQueue != NULL)
+	if(socket->acceptQueue != NULL) {
 		delete socket->acceptQueue;
+		socket->acceptQueue = NULL;
+	}
 
 	if(socket->clientSocketMap != NULL) {
 		shm_socket_t *client_socket;
@@ -68,7 +76,9 @@
 
 			client_socket->remoteQueue->push_timeout(close_msg, &timeout);
 			delete client_socket->remoteQueue;
+			client_socket->remoteQueue=NULL;
 			delete client_socket->messageQueue;
+			client_socket->messageQueue=NULL;
 			socket->clientSocketMap->erase(iter);
 			free((void *)client_socket);
 		}
@@ -78,13 +88,16 @@
 
 	if(socket->dispatch_thread != 0)
 		pthread_cancel(socket->dispatch_thread);
-
-
 	 
 	free(socket);
 	return 0;
 
 }
+
+int shm_close_socket(shm_socket_t *socket) {
+	return _shm_close_socket(socket, true);
+}
+
 
 
 int shm_bind(shm_socket_t * socket, int port) {
@@ -119,12 +132,19 @@
 	shm_socket_t *client_socket;
 	auto iter = socket->clientSocketMap->find(port);
 	if( iter !=  socket->clientSocketMap->end() ) {
-		client_socket= iter->second;
-		delete client_socket->remoteQueue;
-		delete client_socket->messageQueue;
+		// client_socket= iter->second;
+		// if(client_socket->remoteQueue != NULL) {
+		// 	delete client_socket->remoteQueue;
+		// 	client_socket->remoteQueue = NULL;
+		// }
+		// if(client_socket->messageQueue != NULL) {
+		// 	delete client_socket->messageQueue;
+		// 	client_socket->messageQueue = NULL;
+		// }
+		
 		socket->clientSocketMap->erase(iter);
 	}
-	free((void *)client_socket);
+	//free((void *)client_socket);
 
 }
 
@@ -139,7 +159,6 @@
 	shm_socket_t *client_socket;
 	std::map<int, shm_socket_t* >::iterator iter;
     while(socket->queue->pop(src)) {
-print_msg("=====_server_run_msg_rev:", src);
     	switch (src.type) {
 			case SHM_SOCKET_OPEN : 
 				socket->acceptQueue->push_timeout(src, &timeout);
@@ -148,11 +167,9 @@
 				_server_close_conn_to_client(socket, src.port);
 				break;
 			case SHM_COMMON_MSG :
-err_msg(0, "===_server_run_msg_rev 1");				
 				iter = socket->clientSocketMap->find(src.port);
 				if( iter !=  socket->clientSocketMap->end()) {
 					client_socket= iter->second;
-err_msg(0, "===_server_run_msg_rev client_socket->messageQueue=%p", client_socket->messageQueue);	
 					client_socket->messageQueue->push_timeout(src, &timeout);
 				}
 				
@@ -178,7 +195,7 @@
 
 	if (socket->acceptQueue->pop(src) ) {
 
-print_msg("===accept:", src);
+// print_msg("===accept:", src);
 		client_port = src.port;
 		client_socket = (shm_socket_t *)malloc(sizeof(shm_socket_t));
 		client_socket->port = socket->port;
@@ -228,20 +245,8 @@
 }
 
 void _client_close_conn_to_server(shm_socket_t* socket) {
-	if(socket->queue != NULL)
-		delete socket->queue;
-	if(socket->remoteQueue != NULL)
-		delete socket->remoteQueue;
-
-	if(socket->messageQueue != NULL)
-		delete socket->messageQueue;
-
-	if(socket->acceptQueue != NULL)
-		delete socket->acceptQueue;
-
-	if(socket->dispatch_thread != 0)
-		pthread_cancel(socket->dispatch_thread);
-
+	 
+	_shm_close_socket(socket, false);
 }
 
 
@@ -283,24 +288,30 @@
 	dest.size = size;
 	dest.buf = mm_malloc(size);
 	memcpy(dest.buf, buf, size);
-
-	socket->remoteQueue->push(dest);
-	return 0;
+	if(socket->remoteQueue->push(dest)) {
+		return 0;
+	} else {
+		err_msg(errno, "connection has been closed!");
+		return -1;
+	}
+	
+	
 }
 
 int shm_recv(shm_socket_t* socket, void **buf, int *size) {
 	shm_msg_t src;
-err_msg(0, "====shm_recv socket ==%p", socket);
-	bool rv = socket->messageQueue->pop(src);
-	if (rv) {
+	if (socket->messageQueue->pop(src)) {
 		void * _buf = malloc(src.size);
 		memcpy(_buf, src.buf, src.size);
 		*buf = _buf;
 		*size = src.size;
 		mm_free(src.buf);
+		return 0;
+	} else {
+		return -1;
 	}
 	
-	return 0;
+	
 	
 }
 
diff --git a/test/communication b/test/communication
index ca89adb..3a0e6b1 100755
--- a/test/communication
+++ b/test/communication
Binary files differ
diff --git a/test/communication.c b/test/communication.c
index a314dcb..16a9b05 100644
--- a/test/communication.c
+++ b/test/communication.c
@@ -1,5 +1,8 @@
 #include "socket.h"
-
+typedef struct Targ {
+	int port;
+	int id;
+}Targ;
 
 void * precess_client(void *_socket) {
 	pthread_detach(pthread_self());
@@ -7,13 +10,11 @@
 	int size;
 	void *recvbuf;
 	char sendbuf[512];
-	while(true) {
-		shm_recv(socket, &recvbuf, &size);
+	while (shm_recv(socket, &recvbuf, &size) == 0 ) {
 		sprintf(sendbuf, "SERVER RECEIVED: %s", recvbuf);
 		puts(sendbuf);
-		shm_send(socket, sendbuf, strlen(sendbuf)+1) ;
+		shm_send(socket, sendbuf, strlen(sendbuf)+1);
 		shm_free(recvbuf);
-
 	}
 	shm_close_socket(socket);
 }
@@ -51,6 +52,92 @@
 	shm_close_socket(socket);
 }
 
+void client_send(shm_socket_t *socket, char *sendbuf) {
+	
+	int size;
+	void *recvbuf;
+	shm_send(socket, sendbuf, strlen(sendbuf)+1) ;
+	shm_recv(socket, &recvbuf, &size);
+	printf("reply: %s\n", (char *)recvbuf);
+	shm_free(recvbuf);
+
+	
+}
+
+
+void multyProcessorsClient(int port) {
+
+	int status, i = 0, processors = 4, scale = 100000;
+	pid_t productors[processors];
+	pid_t pid;
+	char sendbuf[512];
+	for ( i = 0; i < processors; i++) {
+		if ((productors[i] = fork()) == 0)     /* Child runs user job */
+		{
+			shm_socket_t *socket = shm_open_socket();
+			shm_connect(socket, port);
+			while( scale-- > 0) {
+				sprintf(sendbuf, "processor(%d) %d", i, scale);
+				client_send(socket, sendbuf);
+			}
+			shm_close_socket(socket);
+			exit(0);
+		}
+	}
+
+	while ((pid = waitpid(-1, &status, 0)) > 0) {
+		if(WIFEXITED(status)) {
+			//fprintf(stderr, "child %d terminated normally with exit status=%d\n", pid, WEXITSTATUS(status));
+		}else
+			fprintf(stderr, "child %d terminated abnormally\n", pid);
+	}
+
+	if (errno != ECHILD)
+		perror("waitpid error");
+}
+
+void *threadrun(void *arg) {
+	Targ * targ = ( Targ * )arg;
+	int port = targ->port;
+	char sendbuf[512];
+	int scale = 100000;
+	int i;
+	shm_socket_t *socket = shm_open_socket();
+
+	shm_connect(socket, port);
+	for( i = 0; i<scale; i++) {
+		sprintf(sendbuf, "processor(%d) %d", targ->id, i);
+		client_send(socket, sendbuf);
+	}
+	shm_close_socket(socket);
+	return (void*)i;
+}
+
+void multyThreadClient(int port) {
+
+	int status, i = 0, processors = 2;
+	void *res[processors];
+	Targ *targs= (Targ*)calloc(processors, sizeof(Targ));
+	pthread_t tids[processors];
+	char sendbuf[512];
+	for ( i = 0; i < processors; i++) {
+		targs[i].port = port;
+		targs[i].id = i;
+		pthread_create(&tids[i], NULL, threadrun, (void *)&targs[i]);
+		
+	}
+
+	for (i = 0; i< processors; i++) {
+	    if(pthread_join(tids[i], &res[i])!=0) {
+	      perror("multyThreadClient pthread_join");
+	    } else {
+	      fprintf(stderr, "client(%d) 鍐欏叆 %ld 鏉℃暟鎹甛n", i, (long)res[i]);
+	    }
+	  }
+
+	
+}
+
 int main(int argc, char *argv[]) {
   shm_init(512);
   int port;
@@ -67,6 +154,9 @@
 
   if (strcmp("client", argv[1]) == 0)
      client(port);
+
+ if (strcmp("mclient", argv[1]) == 0)
+     multyThreadClient(port);
  shm_destroy();
  // fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", "server", "client");
   return 0;

--
Gitblit v1.8.0