From c813f2bf58edb8b3760f776052a5f708a952ba52 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期四, 28 一月 2021 17:25:59 +0800
Subject: [PATCH] update
---
src/queue/lock_free_queue.h | 152 +++++++++++++++++++-------------------
test_net_socket/test_net_mod_socket.cpp | 79 +++++++++++++------
test_net_socket/net_mod_socket.sh | 2
3 files changed, 131 insertions(+), 102 deletions(-)
diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h
index 56eac66..ef6a893 100644
--- a/src/queue/lock_free_queue.h
+++ b/src/queue/lock_free_queue.h
@@ -72,9 +72,9 @@
/// ArrayLockFreeQueue are supported (single producer
/// by default)
template<
- typename ELEM_T,
- typename Allocator = SHM_Allocator,
- template<typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue
+ typename ELEM_T,
+ typename Allocator = SHM_Allocator,
+ template<typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue
>
class LockFreeQueue {
@@ -148,67 +148,67 @@
template<
- typename ELEM_T,
- typename Allocator,
- template<typename T, typename AT> class Q_TYPE>
+ typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize) {
- // std::cout << "LockFreeQueue init reference=" << reference << std::endl;
- if (sem_init(&slots, 1, qsize) == -1)
- err_exit(errno, "LockFreeQueue sem_init");
- if (sem_init(&items, 1, 0) == -1)
- err_exit(errno, "LockFreeQueue sem_init");
- if (sem_init(&mutex, 1, 1) == -1)
- err_exit(errno, "LockFreeQueue sem_init");
+ // std::cout << "LockFreeQueue init reference=" << reference << std::endl;
+ if (sem_init(&slots, 1, qsize) == -1)
+ err_exit(errno, "LockFreeQueue sem_init");
+ if (sem_init(&items, 1, 0) == -1)
+ err_exit(errno, "LockFreeQueue sem_init");
+ if (sem_init(&mutex, 1, 1) == -1)
+ err_exit(errno, "LockFreeQueue sem_init");
}
template<
- typename ELEM_T,
- typename Allocator,
- template<typename T, typename AT> class Q_TYPE>
+ typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() {
- // LoggerFactory::getLogger()->debug("LockFreeQueue desctroy");
- if (sem_destroy(&slots) == -1) {
- err_exit(errno, "LockFreeQueue sem_destroy");
- }
- if (sem_destroy(&items) == -1) {
- err_exit(errno, "LockFreeQueue sem_destroy");
- }
- if (sem_destroy(&mutex) == -1) {
- err_exit(errno, "LockFreeQueue sem_destroy");
- }
+ // LoggerFactory::getLogger()->debug("LockFreeQueue desctroy");
+ if (sem_destroy(&slots) == -1) {
+ err_exit(errno, "LockFreeQueue sem_destroy");
+ }
+ if (sem_destroy(&items) == -1) {
+ err_exit(errno, "LockFreeQueue sem_destroy");
+ }
+ if (sem_destroy(&mutex) == -1) {
+ err_exit(errno, "LockFreeQueue sem_destroy");
+ }
}
template<
- typename ELEM_T,
- typename Allocator,
- template<typename T, typename AT> class Q_TYPE>
+ typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size() {
- return m_qImpl.size();
+ return m_qImpl.size();
}
template<
- typename ELEM_T,
- typename Allocator,
- template<typename T, typename AT> class Q_TYPE>
+ typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full() {
- return m_qImpl.full();
+ return m_qImpl.full();
}
template<
- typename ELEM_T,
- typename Allocator,
- template<typename T, typename AT> class Q_TYPE>
+ typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty() {
- return m_qImpl.empty();
+ return m_qImpl.empty();
}
template<typename ELEM_T,
- typename Allocator,
- template<typename T, typename AT> class Q_TYPE>
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) {
// LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");
// sigset_t mask_all, pre;
@@ -217,17 +217,17 @@
// sigprocmask(SIG_BLOCK, &mask_all, &pre);
if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
- if (psem_trywait(&slots) == -1) {
- goto LABEL_FAILTURE;
- }
+ if (psem_trywait(&slots) == -1) {
+ goto LABEL_FAILTURE;
+ }
} else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
- if (psem_timedwait(&slots, timeout) == -1) {
- goto LABEL_FAILTURE;
- }
+ if (psem_timedwait(&slots, timeout) == -1) {
+ goto LABEL_FAILTURE;
+ }
} else {
- if (psem_wait(&slots) == -1) {
- goto LABEL_FAILTURE;
- }
+ if (psem_wait(&slots) == -1) {
+ goto LABEL_FAILTURE;
+ }
}
@@ -237,15 +237,15 @@
LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n");
return 0;
}
-
-LABEL_FAILTURE:
+
+ LABEL_FAILTURE:
// sigprocmask(SIG_SETMASK, &pre, NULL);
return errno;
}
template<typename ELEM_T,
- typename Allocator,
- template<typename T, typename AT> class Q_TYPE>
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag) {
// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before....");
@@ -255,53 +255,53 @@
// sigprocmask(SIG_BLOCK, &mask_all, &pre);
if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
- if (psem_trywait(&items) == -1) {
- goto LABEL_FAILTURE;
- }
+ if (psem_trywait(&items) == -1) {
+ goto LABEL_FAILTURE;
+ }
} else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before. flag=%d , %d\n", flag, timeout->tv_sec);
- if (psem_timedwait(&items, timeout) == -1) {
- goto LABEL_FAILTURE;
- }
+ if (psem_timedwait(&items, timeout) == -1) {
+ goto LABEL_FAILTURE;
+ }
} else {
- if (psem_wait(&items) == -1) {
- goto LABEL_FAILTURE;
- }
+ if (psem_wait(&items) == -1) {
+ goto LABEL_FAILTURE;
+ }
}
if (m_qImpl.pop(a_data)) {
- psem_post(&slots);
- // sigprocmask(SIG_SETMASK, &pre, NULL);
- // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n");
- return 0;
+ psem_post(&slots);
+ // sigprocmask(SIG_SETMASK, &pre, NULL);
+ // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n");
+ return 0;
}
-LABEL_FAILTURE:
+ LABEL_FAILTURE:
// sigprocmask(SIG_SETMASK, &pre, NULL);
return errno;
}
template<typename ELEM_T,
- typename Allocator,
- template<typename T, typename AT> class Q_TYPE>
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
ELEM_T &LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) {
- return m_qImpl.operator[](i);
+ return m_qImpl.operator[](i);
}
template<typename ELEM_T,
- typename Allocator,
- template<typename T, typename AT> class Q_TYPE>
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
void *LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size) {
- return Allocator::allocate(size);
+ return Allocator::allocate(size);
}
template<typename ELEM_T,
- typename Allocator,
- template<typename T, typename AT> class Q_TYPE>
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator delete(void *p) {
- return Allocator::deallocate(p);
+ return Allocator::deallocate(p);
}
// include implementation files
diff --git a/test_net_socket/net_mod_socket.sh b/test_net_socket/net_mod_socket.sh
index 39b54cd..4b41abc 100755
--- a/test_net_socket/net_mod_socket.sh
+++ b/test_net_socket/net_mod_socket.sh
@@ -27,7 +27,7 @@
# 鏃犻檺寰幆send
function send() {
./test_net_mod_socket --fun="test_net_sendandrecv" \
- --sendlist=" :5000:100, :5000:100"
+ --sendlist="localhost:5000:100, localhost:5000:100"
}
# 澶氱嚎绋媠end
diff --git a/test_net_socket/test_net_mod_socket.cpp b/test_net_socket/test_net_mod_socket.cpp
index 7672213..74b9258 100644
--- a/test_net_socket/test_net_mod_socket.cpp
+++ b/test_net_socket/test_net_mod_socket.cpp
@@ -1,3 +1,4 @@
+#include <assert.h>
#include "net_mod_server_socket_wrapper.h"
#include "net_mod_socket_wrapper.h"
#include "bus_server_socket_wrapper.h"
@@ -11,7 +12,7 @@
typedef struct Targ {
char *nodelist;
- int id;
+ long id;
}Targ;
@@ -142,7 +143,7 @@
int remote_port;
while ( (rv = net_mod_socket_recvfrom(client, &recvbuf, &size, &remote_port) ) == 0) {
// printf( "server: RECEIVED REQUEST FROM PORT %d NAME %s\n", remote_port, recvbuf);
- sprintf(sendbuf, "RECEIVED锛� %s", recvbuf);
+ sprintf(sendbuf, "%s", recvbuf);
net_mod_socket_sendto(client, sendbuf, strlen(sendbuf) + 1, remote_port);
free(recvbuf);
}
@@ -246,9 +247,9 @@
void *_run_sendandrecv_(void *arg) {
Targ *targ = (Targ *)arg;
- char sendbuf[512];
+ char sendbuf[128];
- int i,j, n;
+ int j, n;
int recv_arr_size;
net_mod_recv_msg_t *recv_arr;
int total = 0;
@@ -257,6 +258,10 @@
net_node_t *node_arr;
int node_arr_size = parse_node_list(targ->nodelist, &node_arr);
+ long rtid;
+ unsigned int l = 0 , rl;
+ const char *hello_format = "%ld say Hello %d";
+
char filename[512];
sprintf(filename, "test%d.tmp", targ->id);
@@ -266,19 +271,27 @@
int recvsize;
void *recvbuf;
- for (i = 0; i < SCALE; i++) {
- sprintf(sendbuf, "thread(%d) %d", targ->id, i);
- fprintf(fp, "requst:%s\n", sendbuf);
+ for (l = 0; l < SCALE; l++) {
+ sprintf(sendbuf, hello_format, targ->id, l);
+ // fprintf(fp, "requst:%s\n", sendbuf);
// n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size);
n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1000);
- printf("%d: send %d nodes\n", i, n);
+ printf("%d: send %d nodes\n", l, n);
for(j=0; j < recv_arr_size; j++) {
- fprintf(fp, "reply from (host:%s, port: %d, key:%d) >> %s\n",
- recv_arr[j].host,
- recv_arr[j].port,
- recv_arr[j].key,
- recv_arr[j].content
- );
+
+ fprintf(fp, "%ld send '%s'. received '%s' from (host:%s, port: %d, key:%d) \n",
+ targ->id,
+ sendbuf,
+ recv_arr[j].content,
+ recv_arr[j].host,
+ recv_arr[j].port,
+ recv_arr[j].key
+
+ );
+
+ assert(sscanf((const char *)recv_arr[j].content, hello_format, &rtid, &rl) == 2);
+ assert(rtid == targ->id);
+ assert(rl == l);
}
// 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀
net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
@@ -332,30 +345,46 @@
// 鏃犻檺寰幆send
void test_net_sendandrecv(char *nodelist) {
- int n, i;
+ int n, j;
void * client;
int recv_arr_size;
net_mod_recv_msg_t *recv_arr;
net_node_t *node_arr;
int node_arr_size = parse_node_list(nodelist, &node_arr);
- char content[128];
+ char buf[128];
+ pid_t pid, rpid ;
+ unsigned int l , rl;
+ const char *hello_format = "%ld say Hello %u";
- sprintf(content, "pid:%ld say Hello!!", (long)getpid());
+ pid = getpid();
+ l = 0;
+
client = net_mod_socket_open();
while(true) {
- n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size, 1000);
+ sprintf(buf, hello_format, (long)pid, l);
+ n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, buf, strlen(buf)+1,
+ &recv_arr, &recv_arr_size, 1000);
printf(" %d nodes reply\n", n);
- for(i=0; i<recv_arr_size; i++) {
- LoggerFactory::getLogger()->debug("%ld received reply from (host:%s, port: %d, key:%d) >> %s\n", (long)getpid(),
- recv_arr[i].host,
- recv_arr[i].port,
- recv_arr[i].key,
- recv_arr[i].content
+ for(j = 0; j < recv_arr_size; j++) {
+
+ LoggerFactory::getLogger()->debug("%ld send '%s'. received '%s' from (host:%s, port: %d, key:%d) \n",
+ (long)pid,
+ buf,
+ recv_arr[j].content,
+ recv_arr[j].host,
+ recv_arr[j].port,
+ recv_arr[j].key
+
);
+
+ assert(sscanf((const char *)recv_arr[j].content, hello_format, &rpid, &rl) == 2);
+ assert(rpid == pid);
+ assert(rl == l);
}
// 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀
net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
+ l++;
}
net_mod_socket_close(client);
@@ -381,7 +410,7 @@
for (i = 0; i < SCALE; i++) {
- sprintf(sendbuf, "thread(%d) %d", targ->id, i);
+ sprintf(sendbuf, "thread(%ld) %d", targ->id, i);
n = net_mod_socket_pub(client, node_arr, node_arr_size, topic, strlen(topic)+1, sendbuf, strlen(sendbuf)+1);
// n = net_mod_socket_pub(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size);
--
Gitblit v1.8.0