From 1a7a9bdc976e4496d739bc57053e613f993bd85b Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期四, 16 七月 2020 10:03:11 +0800
Subject: [PATCH] update

---
 queue/libshm_queue.a              |    0 
 queue/include/shm_queue.h         |    2 
 queue/include/socket.h            |   61 +++++++
 queue/include/shm_queue_wrapper.h |   12 -
 queue/socket.c                    |  234 +++++++++++++++++++++++++++++
 test2/Makefile                    |    2 
 queue/hashtable.c                 |    8 
 /dev/null                         |    0 
 queue/Makefile                    |    6 
 test2/req_rep.c                   |   59 +++++++
 test2/pub_sub.c                   |   60 +++++++
 queue/hashtable.h                 |    2 
 queue/shm_queue_wrapper.c         |    7 
 test2/req_rep                     |    0 
 14 files changed, 426 insertions(+), 27 deletions(-)

diff --git a/queue/Makefile b/queue/Makefile
index e26b381..83a16dd 100644
--- a/queue/Makefile
+++ b/queue/Makefile
@@ -22,6 +22,12 @@
 
 MYLIBS = $(LIBSQUEUE) $(DLIBSQUEUE)
 
+ifeq ($(DEBUG),y)
+  MYLIBS = $(LIBSQUEUE)
+else
+  MYLIBS = $(LIBSQUEUE) $(DLIBSQUEUE)
+endif
+
 all: build
  
 
diff --git a/queue/hashtable.c b/queue/hashtable.c
index 629d2af..43c59ce 100755
--- a/queue/hashtable.c
+++ b/queue/hashtable.c
@@ -15,8 +15,6 @@
   TAILQ_ENTRY(tailq_entry_t) joint;
 } tailq_entry_t;
 
-static int hashtable_mutex;
-
 #define START_KEY 1000
 
 typedef  TAILQ_HEAD(tailq_header_t, tailq_entry_t) tailq_header_t;
@@ -56,7 +54,7 @@
 
 }
 
-void* _hashtable_put(hashtable_t *hashtable, int key, void *value)
+void * _hashtable_put(hashtable_t *hashtable, int key, void *value)
 {
   size_t code = hashcode(key);
   void *oldvalue;
@@ -209,7 +207,7 @@
    return res;
 }
 
-void* hashtable_put(hashtable_t *hashtable, int key, void *value) {
+void hashtable_put(hashtable_t *hashtable, int key, void *value) {
   SemUtil::dec(hashtable->mutex);
   while (hashtable->readcnt > 0)
   {
@@ -262,4 +260,4 @@
     }
   }
   return keyset;
-}
\ No newline at end of file
+}
diff --git a/queue/hashtable.h b/queue/hashtable.h
index 2cdfb4d..726a5bc 100755
--- a/queue/hashtable.h
+++ b/queue/hashtable.h
@@ -19,7 +19,7 @@
 
 void hashtable_init(hashtable_t *hashtable);
 void *hashtable_get(hashtable_t *hashtable, int key);
-void* hashtable_put(hashtable_t *hashtable, int key, void *value);
+void hashtable_put(hashtable_t *hashtable, int key, void *value);
 void *hashtable_remove(hashtable_t *hashtable, int key);
 void hashtable_removeall(hashtable_t *hashtable);
 
diff --git a/queue/include/shm_queue.h b/queue/include/shm_queue.h
index 3a506c9..a62a6f0 100644
--- a/queue/include/shm_queue.h
+++ b/queue/include/shm_queue.h
@@ -64,7 +64,7 @@
     bool found;
     for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
         found = false;
-        for(int i = 0; i < length; i++) {
+        for(size_t i = 0; i < length; i++) {
             if(*keyItr == keys[i]) {
                 found = true;
                 break;
diff --git a/queue/include/shm_queue_wrapper.h b/queue/include/shm_queue_wrapper.h
index 6d9013b..33ed54f 100644
--- a/queue/include/shm_queue_wrapper.h
+++ b/queue/include/shm_queue_wrapper.h
@@ -9,18 +9,6 @@
 #ifdef __cplusplus
 extern "C" {
 #endif
-/**
- * 鍒濆鍖栧叡浜唴瀛�
- * @size 鍏变韩鍐呭瓨澶у皬
- * 
- */
-void shm_init(int size);
-
-/**
- * 閿�姣佸叡浜唴瀛�
- * 鏁翠釜杩涚▼閫�鍑烘椂闇�瑕佹墽琛岃繖涓柟娉曪紝璇ユ柟娉曢鍏堜細妫�鏌ユ槸鍚﹁繕鏈夊叾浠栬繘绋嬪湪浣跨敤璇ュ叡浜唴瀛橈紝濡傛灉杩樻湁鍏朵粬杩涚▼鍦ㄤ娇鐢ㄥ氨鍙槸detach,濡傛灉娌℃湁鍏朵粬杩涚▼鍦ㄤ娇鐢ㄥ垯閿�姣佹暣鍧楀唴瀛樸��
- */
-void shm_destroy();
 
  
 
diff --git a/queue/include/socket.h b/queue/include/socket.h
new file mode 100644
index 0000000..b19a7b4
--- /dev/null
+++ b/queue/include/socket.h
@@ -0,0 +1,61 @@
+#ifndef __BASIC_SHM_SOCKET_H__
+#define __BASIC_SHM_SOCKET_H__
+
+#include "usg_common.h"
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+enum shm_mod_t
+{
+	PULL_PUSH = 1,
+	REQ_REP = 2,
+	PAIR = 3,
+	PUB_SUB = 4,
+	SURVEY = 5,
+	BUS = 6
+	
+};
+
+
+/**
+ * 鍒濆鍖栧叡浜唴瀛�
+ * @size 鍏变韩鍐呭瓨澶у皬, 鍗曚綅M
+ * 
+ */
+void shm_init(int size);
+
+/**
+ * 閿�姣佸叡浜唴瀛�
+ * 鏁翠釜杩涚▼閫�鍑烘椂闇�瑕佹墽琛岃繖涓柟娉曪紝璇ユ柟娉曢鍏堜細妫�鏌ユ槸鍚﹁繕鏈夊叾浠栬繘绋嬪湪浣跨敤璇ュ叡浜唴瀛橈紝濡傛灉杩樻湁鍏朵粬杩涚▼鍦ㄤ娇鐢ㄥ氨鍙槸detach,濡傛灉娌℃湁鍏朵粬杩涚▼鍦ㄤ娇鐢ㄥ垯閿�姣佹暣鍧楀唴瀛樸��
+ */
+void shm_destroy();
+
+/**
+ * 閲婃斁recv鏂规硶鍒嗛厤鐨刡uf
+ */
+void shm_free(void *buf);
+
+void *shm_open_socket(int mod);
+
+
+int shm_close_socket(void *socket) ;
+
+
+int shm_bind(void* socket, int port) ;
+
+int shm_listen(void* socket) ;
+
+int shm_connect(void* socket, int port);
+
+int shm_send(void *socket, void *buf, int size) ;
+
+int shm_recv(void* socket, void **buf, int *size) ;
+
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
\ No newline at end of file
diff --git a/queue/libshm_queue.a b/queue/libshm_queue.a
index 873f630..94b87c9 100644
--- a/queue/libshm_queue.a
+++ b/queue/libshm_queue.a
Binary files differ
diff --git a/queue/libshm_queue.so b/queue/libshm_queue.so
deleted file mode 100755
index b0908e5..0000000
--- a/queue/libshm_queue.so
+++ /dev/null
Binary files differ
diff --git a/queue/shm_queue_wrapper.c b/queue/shm_queue_wrapper.c
index a6abd17..f680018 100644
--- a/queue/shm_queue_wrapper.c
+++ b/queue/shm_queue_wrapper.c
@@ -13,13 +13,6 @@
 } shmqueue_t;
 
 
-void shm_init(int size) {
-	mem_pool_init(size);
-}
-
-void shm_destroy() {
-	mem_pool_destroy();
-}
 
 //绉婚櫎涓嶅寘鍚湪keys涓殑闃熷垪
 void shm_remove_queues_exclue(void *keys, int length) {
diff --git a/queue/socket.c b/queue/socket.c
new file mode 100644
index 0000000..cc9a08a
--- /dev/null
+++ b/queue/socket.c
@@ -0,0 +1,234 @@
+#include "usg_common.h"
+#include "usg_typedef.h"
+#include "shm_queue.h"
+#include "shm_allocator.h"
+
+#include "mem_pool.h"
+#include "hashtable.h"
+#include "sem_util.h"
+#include "socket.h"
+#include <map>
+
+
+enum shm_msg_type_t
+{
+	SHM_SOCKET_OPEN = 1,
+	SHM_SOKET_CLOSE = 2,
+	SHM_COMMON_MSG = 3
+	
+};
+
+typedef struct shm_msg_t {
+	int port;
+	shm_msg_type_t type;
+	size_t size;
+	void * buf;
+
+} shm_msg_t;
+
+typedef struct shm_socket_t {
+	int port;
+	shm_mod_t mod;
+	SHMQueue<shm_msg_t> *queue;
+	std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap;
+	int slots;
+	int items;
+	int is_server;
+
+} shm_socket_t;
+
+
+
+void shm_init(int size) {
+	mem_pool_init(size);
+}
+
+void shm_destroy() {
+	mem_pool_destroy();
+}
+
+void shm_free(void *buf) {
+	free(buf);
+}
+
+void *shm_open_socket(int mod) {
+	shm_socket_t *socket = (shm_socket_t *)malloc(sizeof(shm_socket_t));
+	socket->remoteQueueMap = new std::map<int, SHMQueue<shm_msg_t>* >;
+	socket->port = -1;
+	socket->mod = (shm_mod_t)mod;
+printf("mod===%d\n", socket->mod);
+	socket->is_server = 0;
+	if (mod == REQ_REP) {
+		socket->slots = SemUtil::get(IPC_PRIVATE, 1);
+    	socket->items = SemUtil::get(IPC_PRIVATE, 0);
+	}
+	
+	return (void *)socket;
+}
+
+
+int shm_close_socket(void *socket) {
+	shm_socket_t * _socket = (shm_socket_t *) socket;
+	delete _socket->queue;
+
+	std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap;
+	for(auto iter = remoteQueueMap->begin(); iter != remoteQueueMap->end(); iter++) {
+		delete iter->second;
+	}
+	delete _socket->remoteQueueMap;
+
+	if (_socket->mod == REQ_REP) {
+		SemUtil::remove(_socket->slots);
+    	SemUtil::remove(_socket->items);
+	}
+	
+	free(socket);
+	return 0;
+
+}
+
+
+int shm_bind(void* socket, int port) {
+	shm_socket_t * _socket = (shm_socket_t *) socket;
+	_socket -> port = port;
+	return 0;
+}
+
+int shm_listen(void* socket) {
+	shm_socket_t * _socket = (shm_socket_t *) socket;
+	_socket->is_server = 1;
+	int  port;
+	hashtable_t *hashtable = mm_get_hashtable();
+	if(_socket -> port == -1) {
+		port = hashtable_alloc_key(hashtable);
+		_socket -> port = port;
+	} else {
+		 
+		if(hashtable_get(hashtable, _socket->port)!= NULL) {
+			err_exit(0, "key %d has already been in used!", _socket->port);
+		}
+	}
+
+	_socket->queue = new SHMQueue<shm_msg_t>(_socket->port, 16);
+	return 0;
+}
+
+
+static int __shm_rev(shm_socket_t* _socket, void **buf, int *size) {
+	shm_msg_t src;
+	
+	std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap;
+	bool rv = _socket->queue->pop(src);
+
+	if (rv) {
+		if(src.type=="open")
+
+
+
+		if( _socket->is_server == 1 && remoteQueueMap->find(src.port) == remoteQueueMap->end()) {
+		 if(_socket->mod == REQ_REP)
+		  	SemUtil::dec(_socket->slots);
+
+		  remoteQueueMap->insert({src.port,  new SHMQueue<shm_msg_t>(src.port, 0)});
+
+		  if(_socket->mod == REQ_REP)
+		  	SemUtil::inc(_socket->items);
+		}
+		
+		void * _buf = malloc(src.size);
+		memcpy(_buf, src.buf, src.size);
+		*buf = _buf;
+		*size = src.size;
+		mm_free(src.buf);
+		return 0;
+	} else {
+		return 1;
+	}
+}
+
+int shm_connect(void* socket, int port) {
+	shm_socket_t * _socket = (shm_socket_t *) socket;
+	hashtable_t *hashtable = mm_get_hashtable();
+	if(hashtable_get(hashtable, port)== NULL) {
+		err_exit(0, "shm_connect锛歝onnect at port %d  failed!", port);
+	}
+	if(_socket -> port == -1) {
+		_socket -> port = hashtable_alloc_key(hashtable);
+	} else {
+		 
+		if(hashtable_get(hashtable, _socket->port)!= NULL) {
+			err_exit(0, "key %d has already been in used!", _socket->port);
+		}
+	}
+
+	_socket->queue = new SHMQueue<shm_msg_t>(_socket->port, 16);
+	_socket->remoteQueueMap->insert({port,  new SHMQueue<shm_msg_t>(port, 0)});
+	return 0;
+}
+
+int shm_send(void *socket, void *buf, int size) {
+	shm_socket_t * _socket = (shm_socket_t *) socket;
+	hashtable_t *hashtable = mm_get_hashtable();
+	shm_msg_t dest;
+	dest.port = _socket->port;
+	dest.size = size;
+	dest.buf = mm_malloc(size);
+	memcpy(dest.buf, buf, size);
+
+	std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap;
+	for(auto iter = remoteQueueMap->begin(); iter != remoteQueueMap->end(); iter++) {
+		if(hashtable_get(hashtable, iter->first)== NULL) {
+			err_msg(0, "shm_send锛歝onnect at port %d  failed, the other part has been closed!", iter->first);
+			delete iter->second;
+			remoteQueueMap->erase(iter);
+			continue;
+		}
+		if(_socket->mod == REQ_REP && _socket->is_server == 1)
+		  	SemUtil::dec(_socket->items);
+
+		iter->second->push(dest);
+
+		if( _socket->mod == REQ_REP && _socket->is_server == 1) {
+			delete iter->second;
+			remoteQueueMap->erase(iter);
+			SemUtil::inc(_socket->slots);
+		}
+
+		  	
+	}
+	return 0;
+}
+
+int shm_recv(void* socket, void **buf, int *size) {
+	shm_socket_t * _socket = (shm_socket_t *) socket;
+	shm_msg_t src;
+	
+	std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap;
+	bool rv = _socket->queue->pop(src);
+	if (rv) {
+		if( _socket->is_server == 1 && remoteQueueMap->find(src.port) == remoteQueueMap->end()) {
+		 if(_socket->mod == REQ_REP)
+		  	SemUtil::dec(_socket->slots);
+
+		  remoteQueueMap->insert({src.port,  new SHMQueue<shm_msg_t>(src.port, 0)});
+
+		  if(_socket->mod == REQ_REP)
+		  	SemUtil::inc(_socket->items);
+		}
+		
+		void * _buf = malloc(src.size);
+		memcpy(_buf, src.buf, src.size);
+		*buf = _buf;
+		*size = src.size;
+		mm_free(src.buf);
+		return 0;
+	} else {
+		return 1;
+	}
+}
+
+
+
+
+ 
+
diff --git a/test2/Makefile b/test2/Makefile
index 872f05d..e8c4421 100644
--- a/test2/Makefile
+++ b/test2/Makefile
@@ -14,7 +14,7 @@
 include $(ROOT)/Make.defines.$(PLATFORM)
 
 
-PROGS =	test_queue_wrapper server client pub sub
+PROGS =	req_rep
 
 
 build: $(PROGS)
diff --git a/test2/pub_sub.c b/test2/pub_sub.c
new file mode 100644
index 0000000..669783f
--- /dev/null
+++ b/test2/pub_sub.c
@@ -0,0 +1,60 @@
+#include "socket.h"
+
+
+void server(int port) {
+	void *socket = shm_open_socket(PUB_SUB);
+	shm_bind(socket, port);
+	shm_listen(socket);
+	int size;
+	void *recvbuf;
+	char sendbuf[512];
+	while(true) {
+		shm_recv(socket, &recvbuf, &size);
+		sprintf(sendbuf, "pub: %s", recvbuf);
+		puts(sendbuf);
+		shm_send(socket, sendbuf, strlen(sendbuf)+1) ;
+		shm_free(recvbuf);
+
+	}
+	shm_close_socket(socket);
+}
+
+void client(int port) {
+	void *socket = shm_open_socket(PUB_SUB);
+	shm_connect(socket, port);
+	int size;
+	void *recvbuf;
+	char sendbuf[512];
+
+	sprintf(sendbuf, "sub");
+	shm_send(socket, sendbuf, strlen(sendbuf)+1) ;
+	while(true) {
+		
+		shm_recv(socket, &recvbuf, &size);
+		printf("received sub message: %s\n", (char *)recvbuf);
+		shm_free(recvbuf);
+
+	}
+	shm_close_socket(socket);
+}
+
+int main(int argc, char *argv[]) {
+  shm_init(512);
+  int port;
+  if (argc < 3) {
+  	 fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client");
+  	 return 1;
+  }
+
+  port = atoi(argv[2]);
+
+  if (strcmp("server", argv[1]) == 0 ) {
+     server(port);
+  }
+
+  if (strcmp("client", argv[1]) == 0)
+     client(port);
+ shm_destroy();
+ // fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", "server", "client");
+  return 0;
+}
\ No newline at end of file
diff --git a/test2/req_rep b/test2/req_rep
new file mode 100755
index 0000000..b2f9345
--- /dev/null
+++ b/test2/req_rep
Binary files differ
diff --git a/test2/req_rep.c b/test2/req_rep.c
new file mode 100644
index 0000000..e2b4fd9
--- /dev/null
+++ b/test2/req_rep.c
@@ -0,0 +1,59 @@
+#include "socket.h"
+
+
+void server(int port) {
+	void *socket = shm_open_socket(REQ_REP);
+	shm_bind(socket, port);
+	shm_listen(socket);
+	int size;
+	void *recvbuf;
+	char sendbuf[512];
+	while(true) {
+		shm_recv(socket, &recvbuf, &size);
+		sprintf(sendbuf, "SERVER RECEIVED: %s", recvbuf);
+		puts(sendbuf);
+		shm_send(socket, sendbuf, strlen(sendbuf)+1) ;
+		shm_free(recvbuf);
+
+	}
+	shm_close_socket(socket);
+}
+
+void client(int port) {
+	void *socket = shm_open_socket(REQ_REP);
+	shm_connect(socket, port);
+	int size;
+	void *recvbuf;
+	char sendbuf[512];
+	while(true) {
+		printf("request: ");
+		scanf("%s", sendbuf);
+		shm_send(socket, sendbuf, strlen(sendbuf)+1) ;
+		shm_recv(socket, &recvbuf, &size);
+		printf("reply: %s\n", (char *)recvbuf);
+		shm_free(recvbuf);
+
+	}
+	shm_close_socket(socket);
+}
+
+int main(int argc, char *argv[]) {
+  shm_init(512);
+  int port;
+  if (argc < 3) {
+  	 fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client");
+  	 return 1;
+  }
+
+  port = atoi(argv[2]);
+
+  if (strcmp("server", argv[1]) == 0 ) {
+     server(port);
+  }
+
+  if (strcmp("client", argv[1]) == 0)
+     client(port);
+ shm_destroy();
+ // fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", "server", "client");
+  return 0;
+}
\ No newline at end of file

--
Gitblit v1.8.0