From 4d5adf4ac44c864e67e8019bb97d89199bb0b4b7 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期四, 06 八月 2020 17:06:27 +0800
Subject: [PATCH] fix survey
---
/dev/null | 229 ---------------------------------------------
src/queue/hashtable.c | 36 ++++++
test_socket/dgram_mod_req_rep.c | 4
src/queue/include/lock_free_queue.h | 12 +-
4 files changed, 42 insertions(+), 239 deletions(-)
diff --git a/src/queue/hashtable.c b/src/queue/hashtable.c
index 110b429..ed3fe34 100755
--- a/src/queue/hashtable.c
+++ b/src/queue/hashtable.c
@@ -20,6 +20,7 @@
typedef TAILQ_HEAD(tailq_header_t, tailq_entry_t) tailq_header_t;
+
static size_t hashcode(int key);
void hashtable_init(hashtable_t *hashtable )
@@ -30,6 +31,7 @@
hashtable->wlock = SemUtil::get(IPC_PRIVATE, 1);
hashtable->cond = SemUtil::get(IPC_PRIVATE, 1);
hashtable->readcnt = 0;
+
}
@@ -181,11 +183,17 @@
void *hashtable_get(hashtable_t *hashtable, int key) {
- SemUtil::dec(hashtable->mutex);
+ struct timespec timeout = {1, 0};
+ if (SemUtil::dec_timeout(hashtable->mutex, &timeout) != 0) {
+ SemUtil::inc(hashtable->mutex);
+ SemUtil::dec(hashtable->mutex);
+ }
+
hashtable->readcnt++;
if (hashtable->readcnt == 1) {
//鑾峰彇璇诲啓閿�
SemUtil::dec(hashtable->wlock);
+// err_msg(0, "hashtable_get dec %d %d\n", --hashtable->tmp);
}
SemUtil::inc(hashtable->mutex);
// ================
@@ -199,6 +207,7 @@
if(hashtable->readcnt == 0) {
//閲婃斁璇诲啓閿�
SemUtil::inc(hashtable->wlock);
+// err_msg(0, "hashtable_get inc %d\n", ++hashtable->tmp);
//閫氱煡鍐�
SemUtil::set(hashtable->cond, 1);
}
@@ -207,14 +216,22 @@
}
void hashtable_put(hashtable_t *hashtable, int key, void *value) {
- SemUtil::dec(hashtable->mutex);
+ struct timespec timeout = {2, 0};
+ if (SemUtil::dec_timeout(hashtable->mutex, &timeout) != 0) {
+ SemUtil::inc(hashtable->mutex);
+ SemUtil::dec(hashtable->mutex);
+ }
// 璁剧疆璇讳紭鍏堢骇楂�
while (hashtable->readcnt > 0)
{
SemUtil::set(hashtable->cond, 0);
SemUtil::inc(hashtable->mutex);
//绛夊緟鍐欓�氱煡
- SemUtil::dec(hashtable->cond);
+ if (SemUtil::dec_timeout(hashtable->cond, &timeout) != 0) {
+ hashtable->readcnt = 0;
+ SemUtil::inc(hashtable->cond);
+ SemUtil::dec(hashtable->cond);
+ }
SemUtil::dec(hashtable->mutex);
@@ -222,11 +239,15 @@
SemUtil::inc(hashtable->mutex);
//鑾峰彇璇诲啓閿�
+
SemUtil::dec(hashtable->wlock);
+ // err_msg(0, "hashtable_put dec %d\n", --hashtable->tmp);
_hashtable_put(hashtable, key, value);
+
//閲婃斁璇诲啓閿�
SemUtil::inc(hashtable->wlock);
+// err_msg(0, "hashtable_put inc %d\n", ++hashtable->tmp);
}
@@ -295,13 +316,20 @@
int hashtable_alloc_key(hashtable_t *hashtable) {
int key = START_KEY;
- SemUtil::dec(hashtable->wlock);
+
+ struct timespec timeout = {1, 0};
+ if (SemUtil::dec_timeout(hashtable->wlock, &timeout) != 0) {
+ SemUtil::inc(hashtable->wlock);
+ SemUtil::dec(hashtable->wlock);
+ }
while(_hashtable_get(hashtable, key) != NULL) {
key++;
}
// 鍗犵敤key
_hashtable_put(hashtable, key, (void *)1);
+
SemUtil::inc(hashtable->wlock);
+// err_msg(0, "hashtable_alloc_key inc %d\n", ++hashtable->tmp);
return key;
}
diff --git a/src/queue/include/lock_free_queue.h b/src/queue/include/lock_free_queue.h
index 25a3392..8a65e81 100644
--- a/src/queue/include/lock_free_queue.h
+++ b/src/queue/include/lock_free_queue.h
@@ -200,7 +200,7 @@
template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data)
{
-// printf("==================LockFreeQueue push before\n");
+ // printf("==================LockFreeQueue push before\n");
if (SemUtil::dec(slots) == -1) {
err_msg(errno, "LockFreeQueue push");
return false;
@@ -209,7 +209,7 @@
if ( m_qImpl.push(a_data) ) {
SemUtil::inc(items);
-// printf("==================LockFreeQueue push after\n");
+ // printf("==================LockFreeQueue push after\n");
return true;
}
return false;
@@ -274,7 +274,7 @@
template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data)
{
-// printf("==================LockFreeQueue pop before\n");
+ // printf("==================LockFreeQueue pop before\n");
if (SemUtil::dec(items) == -1) {
err_msg(errno, "LockFreeQueue pop");
return false;
@@ -282,7 +282,7 @@
if (m_qImpl.pop(a_data)) {
SemUtil::inc(slots);
-// printf("==================LockFreeQueue pop after\n");
+ // printf("==================LockFreeQueue pop after\n");
return true;
}
return false;
@@ -319,6 +319,7 @@
template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * timeout)
{
+// printf("==================LockFreeQueue pop_timeout before\n");
if (SemUtil::dec_timeout(items, timeout) == -1) {
if (errno == EAGAIN)
return false;
@@ -329,7 +330,8 @@
}
if (m_qImpl.pop(a_data)) {
- SemUtil::inc(slots);
+ SemUtil::inc(slots);
+// printf("==================LockFreeQueue pop_timeout after\n");
return true;
}
return false;
diff --git a/src/queue/socket.c.bk b/src/queue/socket.c.bk
deleted file mode 100644
index d240907..0000000
--- a/src/queue/socket.c.bk
+++ /dev/null
@@ -1,229 +0,0 @@
-#include "usg_common.h"
-#include "usg_typedef.h"
-#include "shm_queue.h"
-#include "shm_allocator.h"
-
-#include "mem_pool.h"
-#include "hashtable.h"
-#include "sem_util.h"
-#include "socket.h"
-#include <map>
-
-
-enum shm_msg_type_t
-{
- SHM_SOCKET_OPEN = 1,
- SHM_SOKET_CLOSE = 2,
- SHM_COMMON_MSG = 3
-
-};
-
-typedef struct shm_msg_t {
- int port;
- shm_msg_type_t type;
- size_t size;
- void * buf;
-
-} shm_msg_t;
-
-typedef struct shm_socket_t {
- int port;
- shm_mod_t mod;
- SHMQueue<shm_msg_t> *queue;
- SHMQueue<shm_msg_t> *remoteQueue;
- // std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap;
- int slots;
- int items;
- int is_server;
-
-} shm_socket_t;
-
-
-
-void shm_init(int size) {
- mem_pool_init(size);
-}
-
-void shm_destroy() {
- mem_pool_destroy();
-}
-
-void shm_free(void *buf) {
- free(buf);
-}
-
-void *shm_open_socket(int mod) {
- shm_socket_t *socket = (shm_socket_t *)malloc(sizeof(shm_socket_t));
- socket->remoteQueueMap = new std::map<int, SHMQueue<shm_msg_t>* >;
- socket->port = -1;
- socket->mod = (shm_mod_t)mod;
-printf("mod===%d\n", socket->mod);
- socket->is_server = 0;
- if (mod == REQ_REP) {
- socket->slots = SemUtil::get(IPC_PRIVATE, 1);
- socket->items = SemUtil::get(IPC_PRIVATE, 0);
- }
-
- return (void *)socket;
-}
-
-
-int shm_close_socket(void *socket) {
- shm_socket_t * _socket = (shm_socket_t *) socket;
- delete _socket->queue;
-
- std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap;
- for(auto iter = remoteQueueMap->begin(); iter != remoteQueueMap->end(); iter++) {
- delete iter->second;
- }
- delete _socket->remoteQueueMap;
-
- if (_socket->mod == REQ_REP) {
- SemUtil::remove(_socket->slots);
- SemUtil::remove(_socket->items);
- }
-
- free(socket);
- return 0;
-
-}
-
-
-int shm_bind(void* socket, int port) {
- shm_socket_t * _socket = (shm_socket_t *) socket;
- _socket -> port = port;
- return 0;
-}
-
-int shm_listen(void* socket) {
- shm_socket_t * _socket = (shm_socket_t *) socket;
- _socket->is_server = 1;
- int port;
- hashtable_t *hashtable = mm_get_hashtable();
- if(_socket -> port == -1) {
- port = hashtable_alloc_key(hashtable);
- _socket -> port = port;
- } else {
-
- if(hashtable_get(hashtable, _socket->port)!= NULL) {
- err_exit(0, "key %d has already been in used!", _socket->port);
- }
- }
-
- _socket->queue = new SHMQueue<shm_msg_t>(_socket->port, 16);
- return 0;
-}
-
-
-static int __shm_rev__(shm_socket_t* _socket) {
- shm_msg_t src;
-
- std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap;
- bool rv = _socket->queue->pop(src);
-
- if (rv) {
- if(src.type == SHM_SOCKET_OPEN) {
- _socket->remoteQueue = new SHMQueue<shm_msg_t>(src.port, 0);
- }
-
-
-
-
-
- void * _buf = malloc(src.size);
- memcpy(_buf, src.buf, src.size);
- *buf = _buf;
- *size = src.size;
- mm_free(src.buf);
- return 0;
- } else {
- return 1;
- }
-}
-
-int shm_connect(void* socket, int port) {
- shm_socket_t * _socket = (shm_socket_t *) socket;
- hashtable_t *hashtable = mm_get_hashtable();
- if(hashtable_get(hashtable, port)== NULL) {
- err_exit(0, "shm_connect锛歝onnect at port %d failed!", port);
- }
- if(_socket -> port == -1) {
- _socket -> port = hashtable_alloc_key(hashtable);
- } else {
-
- if(hashtable_get(hashtable, _socket->port)!= NULL) {
- err_exit(0, "key %d has already been in used!", _socket->port);
- }
- }
-
- _socket->queue = new SHMQueue<shm_msg_t>(_socket->port, 16);
- _socket->remoteQueueMap->insert({port, new SHMQueue<shm_msg_t>(port, 0)});
- return 0;
-}
-
-int shm_send(void *socket, void *buf, int size) {
- shm_socket_t * _socket = (shm_socket_t *) socket;
- hashtable_t *hashtable = mm_get_hashtable();
- shm_msg_t dest;
- dest.port = _socket->port;
- dest.size = size;
- dest.buf = mm_malloc(size);
- memcpy(dest.buf, buf, size);
-
- std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap;
- for(auto iter = remoteQueueMap->begin(); iter != remoteQueueMap->end(); iter++) {
- if(hashtable_get(hashtable, iter->first)== NULL) {
- err_msg(0, "shm_send锛歝onnect at port %d failed, the other part has been closed!", iter->first);
- delete iter->second;
- remoteQueueMap->erase(iter);
- continue;
- }
- if(_socket->mod == REQ_REP && _socket->is_server == 1)
- SemUtil::dec(_socket->items);
-
- iter->second->push(dest);
-
- if( _socket->mod == REQ_REP && _socket->is_server == 1) {
- delete iter->second;
- remoteQueueMap->erase(iter);
- SemUtil::inc(_socket->slots);
- }
-
-
- }
- return 0;
-}
-
-int shm_recv(void* socket, void **buf, int *size) {
- shm_socket_t * _socket = (shm_socket_t *) socket;
- shm_msg_t src;
-
- std::map<int, SHMQueue<shm_msg_t>* > *remoteQueueMap = _socket->remoteQueueMap;
- bool rv = _socket->queue->pop(src);
- if (rv) {
- if( _socket->is_server == 1 && remoteQueueMap->find(src.port) == remoteQueueMap->end()) {
- if(_socket->mod == REQ_REP)
- SemUtil::dec(_socket->slots);
-
- remoteQueueMap->insert({src.port, new SHMQueue<shm_msg_t>(src.port, 0)});
-
- if(_socket->mod == REQ_REP)
- SemUtil::inc(_socket->items);
- }
-
- void * _buf = malloc(src.size);
- memcpy(_buf, src.buf, src.size);
- *buf = _buf;
- *size = src.size;
- mm_free(src.buf);
- return 0;
- } else {
- return 1;
- }
-}
-
-
-
-
-
-
diff --git a/test_socket/dgram_mod_req_rep.c b/test_socket/dgram_mod_req_rep.c
index d35c6ea..9a021a1 100644
--- a/test_socket/dgram_mod_req_rep.c
+++ b/test_socket/dgram_mod_req_rep.c
@@ -163,8 +163,10 @@
// int temp = shm_alloc_key();
// printf("tmp=%d\n", temp);
server(port);
- } else if (strcmp("client", argv[1]) == 0) {
+ } else if (strcmp("mclient", argv[1]) == 0) {
startClients(port);
+ } else if (strcmp("client", argv[1]) == 0) {
+ client(port);
} else {
printf("input invalidate arguments\n");
}
--
Gitblit v1.8.0