From 4d5adf4ac44c864e67e8019bb97d89199bb0b4b7 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期四, 06 八月 2020 17:06:27 +0800
Subject: [PATCH] fix survey

---
 /dev/null                           |  229 ---------------------------------------------
 src/queue/hashtable.c               |   36 ++++++
 test_socket/dgram_mod_req_rep.c     |    4 
 src/queue/include/lock_free_queue.h |   12 +-
 4 files changed, 42 insertions(+), 239 deletions(-)

diff --git a/src/queue/hashtable.c b/src/queue/hashtable.c
index 110b429..ed3fe34 100755
--- a/src/queue/hashtable.c
+++ b/src/queue/hashtable.c
@@ -20,6 +20,7 @@
 
 typedef  TAILQ_HEAD(tailq_header_t, tailq_entry_t) tailq_header_t;
 
+
 static size_t hashcode(int key);
 
 void hashtable_init(hashtable_t *hashtable )
@@ -30,6 +31,7 @@
   hashtable->wlock = SemUtil::get(IPC_PRIVATE, 1);
   hashtable->cond = SemUtil::get(IPC_PRIVATE, 1);
   hashtable->readcnt = 0;
+
 }
 
 
@@ -181,11 +183,17 @@
 
 
 void *hashtable_get(hashtable_t *hashtable, int key) {
-   SemUtil::dec(hashtable->mutex);
+   struct timespec timeout = {1, 0};
+   if (SemUtil::dec_timeout(hashtable->mutex, &timeout) != 0) {
+    SemUtil::inc(hashtable->mutex);
+    SemUtil::dec(hashtable->mutex);
+   }
+
    hashtable->readcnt++;
    if (hashtable->readcnt == 1) {
     //鑾峰彇璇诲啓閿�
     SemUtil::dec(hashtable->wlock);
+// err_msg(0, "hashtable_get dec %d %d\n", --hashtable->tmp);
    }
    SemUtil::inc(hashtable->mutex);
    // ================
@@ -199,6 +207,7 @@
    if(hashtable->readcnt == 0) {
     //閲婃斁璇诲啓閿�
     SemUtil::inc(hashtable->wlock);
+// err_msg(0, "hashtable_get inc %d\n", ++hashtable->tmp);
   //閫氱煡鍐�
     SemUtil::set(hashtable->cond, 1);
    }
@@ -207,14 +216,22 @@
 }
 
 void hashtable_put(hashtable_t *hashtable, int key, void *value) {
-  SemUtil::dec(hashtable->mutex);
+  struct timespec timeout = {2, 0};
+  if (SemUtil::dec_timeout(hashtable->mutex, &timeout) != 0) {
+    SemUtil::inc(hashtable->mutex);
+    SemUtil::dec(hashtable->mutex);
+  }
   // 璁剧疆璇讳紭鍏堢骇楂�
   while (hashtable->readcnt > 0)
   {
     SemUtil::set(hashtable->cond, 0);
     SemUtil::inc(hashtable->mutex);
     //绛夊緟鍐欓�氱煡
-    SemUtil::dec(hashtable->cond);
+    if (SemUtil::dec_timeout(hashtable->cond, &timeout) != 0) {
+      hashtable->readcnt = 0;
+      SemUtil::inc(hashtable->cond);
+      SemUtil::dec(hashtable->cond);
+    }
 
     SemUtil::dec(hashtable->mutex);
      
@@ -222,11 +239,15 @@
   SemUtil::inc(hashtable->mutex);
 
   //鑾峰彇璇诲啓閿�
+ 
   SemUtil::dec(hashtable->wlock);
+ // err_msg(0, "hashtable_put dec %d\n", --hashtable->tmp);
 
   _hashtable_put(hashtable, key, value);
+
   //閲婃斁璇诲啓閿�
   SemUtil::inc(hashtable->wlock);
+// err_msg(0, "hashtable_put inc %d\n", ++hashtable->tmp);
 }
 
 
@@ -295,13 +316,20 @@
 
 int hashtable_alloc_key(hashtable_t *hashtable) {
   int key = START_KEY;
-  SemUtil::dec(hashtable->wlock);
+
+  struct timespec timeout = {1, 0};
+  if (SemUtil::dec_timeout(hashtable->wlock, &timeout) != 0) {
+    SemUtil::inc(hashtable->wlock);
+    SemUtil::dec(hashtable->wlock);
+  }
 
   while(_hashtable_get(hashtable, key) != NULL) {
     key++;
   }
   // 鍗犵敤key
   _hashtable_put(hashtable, key, (void *)1);
+
   SemUtil::inc(hashtable->wlock);
+// err_msg(0, "hashtable_alloc_key inc %d\n", ++hashtable->tmp);
   return key;
 }
diff --git a/src/queue/include/lock_free_queue.h b/src/queue/include/lock_free_queue.h
index 25a3392..8a65e81 100644
--- a/src/queue/include/lock_free_queue.h
+++ b/src/queue/include/lock_free_queue.h
@@ -200,7 +200,7 @@
     template <typename T, typename AT> class Q_TYPE>
 bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data)
 {
-// printf("==================LockFreeQueue push before\n");   
+ // printf("==================LockFreeQueue push before\n");   
     if (SemUtil::dec(slots) == -1) {
         err_msg(errno, "LockFreeQueue push");
         return false;
@@ -209,7 +209,7 @@
     if ( m_qImpl.push(a_data) ) {
 
         SemUtil::inc(items);   
-// printf("==================LockFreeQueue push after\n");   
+ // printf("==================LockFreeQueue push after\n");   
         return true;
     }
     return false;
@@ -274,7 +274,7 @@
     template <typename T, typename AT> class Q_TYPE>
 bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data)
 {
-// printf("==================LockFreeQueue pop before\n");
+ // printf("==================LockFreeQueue pop before\n");
     if (SemUtil::dec(items) == -1) {
         err_msg(errno, "LockFreeQueue pop");
         return false;
@@ -282,7 +282,7 @@
 
     if (m_qImpl.pop(a_data)) {
         SemUtil::inc(slots);
-// printf("==================LockFreeQueue pop after\n");      
+ // printf("==================LockFreeQueue pop after\n");      
         return true;
     }
     return false;
@@ -319,6 +319,7 @@
     template <typename T, typename AT> class Q_TYPE>
 bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * timeout)
 {
+// printf("==================LockFreeQueue pop_timeout before\n");
     if (SemUtil::dec_timeout(items, timeout) == -1) {
         if (errno == EAGAIN)
             return false;
@@ -329,7 +330,8 @@
     }
 
     if (m_qImpl.pop(a_data)) {
-        SemUtil::inc(slots);       
+        SemUtil::inc(slots);  
+// printf("==================LockFreeQueue pop_timeout after\n");     
         return true;
     }
     return false;
diff --git a/src/queue/socket.c.bk b/src/queue/socket.c.bk
deleted file mode 100644
index d240907..0000000
--- a/src/queue/socket.c.bk
+++ /dev/null
@@ -1,229 +0,0 @@
-#include "usg_common.h"
-#include "usg_typedef.h"
-#include "shm_queue.h"
-#include "shm_allocator.h"
-
-#include "mem_pool.h"
-#include "hashtable.h"
-#include "sem_util.h"
-#include "socket.h"
-#include <map>
-
-
-enum shm_msg_type_t
-{
-	SHM_SOCKET_OPEN = 1,
-	SHM_SOKET_CLOSE = 2,
-	SHM_COMMON_MSG = 3
-	
-};
-
-typedef struct shm_msg_t {
-	int port;
-	shm_msg_type_t type;
-	size_t size;
-	void * buf;
-
-} shm_msg_t;
-
-typedef struct shm_socket_t {
-	int port;
-	shm_mod_t mod;
-	SHMQueue<shm_msg_t> *queue;
-	SHMQueue<shm_msg_t> *remoteQueue;
-	// std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap;
-	int slots;
-	int items;
-	int is_server;
-
-} shm_socket_t;
-
-
-
-void shm_init(int size) {
-	mem_pool_init(size);
-}
-
-void shm_destroy() {
-	mem_pool_destroy();
-}
-
-void shm_free(void *buf) {
-	free(buf);
-}
-
-void *shm_open_socket(int mod) {
-	shm_socket_t *socket = (shm_socket_t *)malloc(sizeof(shm_socket_t));
-	socket->remoteQueueMap = new std::map<int, SHMQueue<shm_msg_t>* >;
-	socket->port = -1;
-	socket->mod = (shm_mod_t)mod;
-printf("mod===%d\n", socket->mod);
-	socket->is_server = 0;
-	if (mod == REQ_REP) {
-		socket->slots = SemUtil::get(IPC_PRIVATE, 1);
-    	socket->items = SemUtil::get(IPC_PRIVATE, 0);
-	}
-	
-	return (void *)socket;
-}
-
-
-int shm_close_socket(void *socket) {
-	shm_socket_t * _socket = (shm_socket_t *) socket;
-	delete _socket->queue;
-
-	std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap;
-	for(auto iter = remoteQueueMap->begin(); iter != remoteQueueMap->end(); iter++) {
-		delete iter->second;
-	}
-	delete _socket->remoteQueueMap;
-
-	if (_socket->mod == REQ_REP) {
-		SemUtil::remove(_socket->slots);
-    SemUtil::remove(_socket->items);
-	}
-	
-	free(socket);
-	return 0;
-
-}
-
-
-int shm_bind(void* socket, int port) {
-	shm_socket_t * _socket = (shm_socket_t *) socket;
-	_socket -> port = port;
-	return 0;
-}
-
-int shm_listen(void* socket) {
-	shm_socket_t * _socket = (shm_socket_t *) socket;
-	_socket->is_server = 1;
-	int  port;
-	hashtable_t *hashtable = mm_get_hashtable();
-	if(_socket -> port == -1) {
-		port = hashtable_alloc_key(hashtable);
-		_socket -> port = port;
-	} else {
-		 
-		if(hashtable_get(hashtable, _socket->port)!= NULL) {
-			err_exit(0, "key %d has already been in used!", _socket->port);
-		}
-	}
-
-	_socket->queue = new SHMQueue<shm_msg_t>(_socket->port, 16);
-	return 0;
-}
-
-
-static int __shm_rev__(shm_socket_t* _socket) {
-	shm_msg_t src;
-	
-	std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap;
-	bool rv = _socket->queue->pop(src);
-
-	if (rv) {
-		if(src.type == SHM_SOCKET_OPEN) {
-			_socket->remoteQueue = new SHMQueue<shm_msg_t>(src.port, 0);
-		} 
-
-
-
-		 
-		
-		void * _buf = malloc(src.size);
-		memcpy(_buf, src.buf, src.size);
-		*buf = _buf;
-		*size = src.size;
-		mm_free(src.buf);
-		return 0;
-	} else {
-		return 1;
-	}
-}
-
-int shm_connect(void* socket, int port) {
-	shm_socket_t * _socket = (shm_socket_t *) socket;
-	hashtable_t *hashtable = mm_get_hashtable();
-	if(hashtable_get(hashtable, port)== NULL) {
-		err_exit(0, "shm_connect锛歝onnect at port %d  failed!", port);
-	}
-	if(_socket -> port == -1) {
-		_socket -> port = hashtable_alloc_key(hashtable);
-	} else {
-		 
-		if(hashtable_get(hashtable, _socket->port)!= NULL) {
-			err_exit(0, "key %d has already been in used!", _socket->port);
-		}
-	}
-
-	_socket->queue = new SHMQueue<shm_msg_t>(_socket->port, 16);
-	_socket->remoteQueueMap->insert({port,  new SHMQueue<shm_msg_t>(port, 0)});
-	return 0;
-}
-
-int shm_send(void *socket, void *buf, int size) {
-	shm_socket_t * _socket = (shm_socket_t *) socket;
-	hashtable_t *hashtable = mm_get_hashtable();
-	shm_msg_t dest;
-	dest.port = _socket->port;
-	dest.size = size;
-	dest.buf = mm_malloc(size);
-	memcpy(dest.buf, buf, size);
-
-	std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap;
-	for(auto iter = remoteQueueMap->begin(); iter != remoteQueueMap->end(); iter++) {
-		if(hashtable_get(hashtable, iter->first)== NULL) {
-			err_msg(0, "shm_send锛歝onnect at port %d  failed, the other part has been closed!", iter->first);
-			delete iter->second;
-			remoteQueueMap->erase(iter);
-			continue;
-		}
-		if(_socket->mod == REQ_REP && _socket->is_server == 1)
-		  	SemUtil::dec(_socket->items);
-
-		iter->second->push(dest);
-
-		if( _socket->mod == REQ_REP && _socket->is_server == 1) {
-			delete iter->second;
-			remoteQueueMap->erase(iter);
-			SemUtil::inc(_socket->slots);
-		}
-
-		  	
-	}
-	return 0;
-}
-
-int shm_recv(void* socket, void **buf, int *size) {
-	shm_socket_t * _socket = (shm_socket_t *) socket;
-	shm_msg_t src;
-	
-	std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap;
-	bool rv = _socket->queue->pop(src);
-	if (rv) {
-		if( _socket->is_server == 1 && remoteQueueMap->find(src.port) == remoteQueueMap->end()) {
-		 if(_socket->mod == REQ_REP)
-		  	SemUtil::dec(_socket->slots);
-
-		  remoteQueueMap->insert({src.port,  new SHMQueue<shm_msg_t>(src.port, 0)});
-
-		  if(_socket->mod == REQ_REP)
-		  	SemUtil::inc(_socket->items);
-		}
-		
-		void * _buf = malloc(src.size);
-		memcpy(_buf, src.buf, src.size);
-		*buf = _buf;
-		*size = src.size;
-		mm_free(src.buf);
-		return 0;
-	} else {
-		return 1;
-	}
-}
-
-
-
-
- 
-
diff --git a/test_socket/dgram_mod_req_rep.c b/test_socket/dgram_mod_req_rep.c
index d35c6ea..9a021a1 100644
--- a/test_socket/dgram_mod_req_rep.c
+++ b/test_socket/dgram_mod_req_rep.c
@@ -163,8 +163,10 @@
     // int temp = shm_alloc_key();
     // printf("tmp=%d\n", temp);
     server(port);
-  } else if (strcmp("client", argv[1]) == 0) {
+  } else if (strcmp("mclient", argv[1]) == 0) {
     startClients(port);
+  } else if (strcmp("client", argv[1]) == 0) {
+    client(port);
   } else {
     printf("input invalidate arguments\n");
   }

--
Gitblit v1.8.0