From 1b94589dcb8d497d2d8a208efd61a54631f6b84e Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期三, 23 十二月 2020 16:08:33 +0800
Subject: [PATCH] update
---
src/socket/net_mod_socket.c | 60 +++++++++--
src/socket/shm_socket.c | 45 ++++++---
test_net_socket/test_net_mod_socket.c | 6
src/queue/lock_free_queue.h | 22 ++-
src/socket/bus_server_socket.c | 36 +++---
src/socket/bus_server_socket_wrapper.c | 9 +
test_net_socket/test_bus_stop.c | 54 ++++++++++
test_net_socket/Makefile | 3
src/socket/net_mod_server_socket.c | 2
test_net_socket/heart_beat.c | 41 +++++--
10 files changed, 202 insertions(+), 76 deletions(-)
diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h
index ee11da6..84c885c 100644
--- a/src/queue/lock_free_queue.h
+++ b/src/queue/lock_free_queue.h
@@ -11,6 +11,7 @@
// default Queue size
#define LOCK_FREE_Q_DEFAULT_SIZE 16
+// static Logger *logger = LoggerFactory::getLogger();
// define this macro if calls to "size" must return the real size of the
// queue. If it is undefined that function will try to take a snapshot of
// the queue, but returned value might be bogus
@@ -200,7 +201,7 @@
template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data)
{
- // printf("==================LockFreeQueue push before\n");
+LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");
if (SemUtil::dec(slots) == -1) {
err_msg(errno, "LockFreeQueue push");
return false;
@@ -209,7 +210,7 @@
if ( m_qImpl.push(a_data) ) {
SemUtil::inc(items);
- // printf("==================LockFreeQueue push after\n");
+LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n");
return true;
}
return false;
@@ -247,18 +248,19 @@
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * timeout)
{
-
+LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before\n");
if (SemUtil::dec_timeout(slots, timeout) == -1) {
if (errno == EAGAIN)
return false;
else {
- // err_msg(errno, "LockFreeQueue push_timeout");
+ err_msg(errno, "LockFreeQueue push_timeout");
return false;
}
}
if (m_qImpl.push(a_data)){
- SemUtil::inc(items);
+ SemUtil::inc(items);
+LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n");
return true;
}
return false;
@@ -274,7 +276,8 @@
template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data)
{
- // printf("==================LockFreeQueue pop before\n");
+
+LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n");
if (SemUtil::dec(items) == -1) {
err_msg(errno, "LockFreeQueue pop");
return false;
@@ -282,7 +285,7 @@
if (m_qImpl.pop(a_data)) {
SemUtil::inc(slots);
- // printf("==================LockFreeQueue pop after\n");
+LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n");
return true;
}
return false;
@@ -319,7 +322,7 @@
template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * timeout)
{
-// printf("==================LockFreeQueue pop_timeout before\n");
+LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout before\n");
if (SemUtil::dec_timeout(items, timeout) == -1) {
if (errno == EAGAIN)
return false;
@@ -331,7 +334,7 @@
if (m_qImpl.pop(a_data)) {
SemUtil::inc(slots);
-// printf("==================LockFreeQueue pop_timeout after\n");
+LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout after\n");
return true;
}
return false;
@@ -346,6 +349,7 @@
return m_qImpl.operator[](i);
}
+
template <
typename ELEM_T,
typename Allocator,
diff --git a/src/socket/bus_server_socket.c b/src/socket/bus_server_socket.c
index 8c50d37..0f4e52e 100644
--- a/src/socket/bus_server_socket.c
+++ b/src/socket/bus_server_socket.c
@@ -60,15 +60,19 @@
BusServerSocket::BusServerSocket() {
+ logger->debug("BusServerSocket Init");
shm_socket = shm_open_socket(SHM_SOCKET_DGRAM);
topic_sub_map = NULL;
+
}
BusServerSocket::~BusServerSocket() {
SHMKeySet *subscripter_set;
SHMTopicSubMap::iterator map_iter;
+ logger->debug("BusServerSocket destory 1");
stop();
+ logger->debug("BusServerSocket destory 2");
if(topic_sub_map != NULL) {
for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
@@ -83,6 +87,7 @@
mem_pool_free_by_key(BUS_MAP_KEY);
}
shm_close_socket(shm_socket);
+ logger->debug("BusServerSocket destory 3");
}
@@ -109,14 +114,13 @@
run_pubsub_proxy();
// 杩涚▼鍋滄鐨勬椂鍊欙紝棰勭暀3绉掕祫婧愬洖鏀剁殑鏃堕棿銆傚惁鍒欙紝浼氬彂鐢熻皟鐢╟lose鐨勬椂鍊欙紝鍏变韩鍐呭瓨鐨勮祫婧愯繕娌℃潵寰楀強鍥炴敹杩涚▼灏遍��鍑轰簡
- sleep(3);
return 0;
}
int BusServerSocket::stop(){
int ret;
-
+ logger->debug("====>stopping");
if( shm_socket->key <= 0) {
return -1;
}
@@ -127,15 +131,11 @@
head.topic_size = 0;
head.content_size = 0;
- void *recv_buf;
- int recv_size;
-
void *buf;
int size = ShmModSocket::get_bus_sendbuf(head, NULL, 0, NULL, 0, &buf);
if(size > 0) {
- ret = shm_sendandrecv(shm_socket, buf, size, shm_socket->key, &recv_buf, &recv_size);
+ ret = shm_sendandrecv_unsafe(shm_socket, buf, size, shm_socket->key, NULL, NULL);
free(buf);
- free(recv_buf);
return ret;
} else {
return -1;
@@ -260,7 +260,8 @@
topic = strtok(NULL, topic_delim);
}
- } else if(strcmp(action, "desub") == 0) {
+ }
+ else if(strcmp(action, "desub") == 0) {
// printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), ""));
if(strcmp(trim(topics, 0), "") == 0) {
// 鍙栨秷鎵�鏈夎闃�
@@ -274,27 +275,26 @@
}
}
- } else if(strcmp(action, "pub") == 0) {
+ }
+ else if(strcmp(action, "pub") == 0) {
content = topics + head.topic_size;
_proxy_pub(topics, content, head.content_size, key);
- } else if(strcmp(action, "stop") == 0) {
- logger->info( "Stopping Bus...");
- // snprintf(resp_buf, 128, "%sstop_finished%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER);
- shm_sendto(shm_socket, "stop_finished", strlen( "stop_finished") +1, key);
+ }
+ else if(strcmp(action, "stop") == 0) {
+
free(buf);
break;
} else {
logger->error( "BusServerSocket::run_pubsub_proxy : unrecognized action %s", action);
}
- // free(action);
- // free(topics);
- // } else {
- // logger->error( "BusServerSocket::run_pubsub_proxy : incorrect format msg");
- // }
free(buf);
}
+
+ logger->info( "Stopping Bus...");
+ shm_sendto(shm_socket, "stop_finished", strlen( "stop_finished") +1, key);
+
return NULL;
}
diff --git a/src/socket/bus_server_socket_wrapper.c b/src/socket/bus_server_socket_wrapper.c
index 5c793c1..220461e 100644
--- a/src/socket/bus_server_socket_wrapper.c
+++ b/src/socket/bus_server_socket_wrapper.c
@@ -7,7 +7,7 @@
* 鍒涘缓
*/
void * bus_server_socket_wrapper_open() {
- printf("===bus_server_socket_wrapper_open\n");
+ logger->debug("===bus_server_socket_wrapper_open\n");
BusServerSocket *sockt = new BusServerSocket;
return (void *)sockt;
}
@@ -16,9 +16,10 @@
* 鍏抽棴
*/
void bus_server_socket_wrapper_close(void *_socket) {
- printf("===bus_server_socket_wrapper_close\n");
- BusServerSocket *sockt = (BusServerSocket *)_socket;
- delete sockt;
+
+ // BusServerSocket *sockt = (BusServerSocket *)_socket;
+ //delete sockt;
+ logger->debug("===bus_server_socket_wrapper_close\n");
}
/**
diff --git a/src/socket/net_mod_server_socket.c b/src/socket/net_mod_server_socket.c
index 2fb70e8..46c3bce 100644
--- a/src/socket/net_mod_server_socket.c
+++ b/src/socket/net_mod_server_socket.c
@@ -168,9 +168,7 @@
if(request_head.timeout > 0) {
timeout.tv_sec = request_head.timeout / 1000;
timeout.tv_nsec = (request_head.timeout - timeout.tv_sec * 1000) * 10e6;
-
// printf(" timeout.tv_sec = %d, timeout.tv_nsec=%ld\n", timeout.tv_sec, timeout.tv_nsec );
-
ret = shmModSocket.sendandrecv_unsafe_timeout(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size, &timeout);
}
else if(request_head.timeout == 0) {
diff --git a/src/socket/net_mod_socket.c b/src/socket/net_mod_socket.c
index 48b0e7c..fb5003e 100644
--- a/src/socket/net_mod_socket.c
+++ b/src/socket/net_mod_socket.c
@@ -86,15 +86,15 @@
int i, n, recv_size, connfd;
net_node_t *node;
void *recv_buf = NULL;
+ struct timespec timeout;
+ int ret;
+ int n_req = 0, n_recv_suc = 0, n_resp =0;
net_mod_request_head_t request_head = {};
-
- int n_req = 0, n_recv_suc = 0, n_resp =0;
-
net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t));
- int ret;
+
NetConnPool *mpool;
/* Make first caller allocate key for thread-specific data */
@@ -131,7 +131,17 @@
node = &node_arr[i];
if(node->host == NULL || strcmp(node->host, "") == 0 ) {
// 鏈湴鍙戦��
- if(shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size) == 0) {
+
+ if(msec == 0) {
+ ret = shmModSocket.sendandrecv_nowait(send_buf, send_size, node->key, &recv_buf, &recv_size);
+ } else if(msec > 0){
+ timeout.tv_sec = msec / 1000;
+ timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
+ ret = shmModSocket.sendandrecv_timeout(send_buf, send_size, node->key, &recv_buf, &recv_size, &timeout);
+ } else {
+ ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size);
+ }
+ if( ret == 0) {
strcpy( ret_arr[n_recv_suc].host,"");
ret_arr[n_recv_suc].port = 0;
ret_arr[n_recv_suc].key = node->key;
@@ -229,7 +239,12 @@
mpool->maxi = -1;
- *recv_arr = ret_arr;
+ if(recv_arr != NULL) {
+ *recv_arr = ret_arr;
+ } else {
+ free_recv_msg_arr(ret_arr, n_recv_suc);
+ }
+
if(recv_arr_size != NULL) {
*recv_arr_size = n_recv_suc;
}
@@ -264,9 +279,10 @@
// int pub(char *topic, int topic_size, void *content, int content_size, int port);
int NetModSocket::_pub_(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content,
- int content_size, int timeout) {
+ int content_size, int msec) {
int i, connfd;
net_node_t *node;
+ struct timespec timeout;
net_mod_request_head_t request_head;
net_mod_recv_msg_t recv_msg;
@@ -302,7 +318,16 @@
// 鏈湴鍙戦��
if(node_arr == NULL || arrlen == 0) {
- if(shmModSocket.pub(topic, topic_size, content, content_size, node->key) == 0 ) {
+ if(msec == 0) {
+ ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key);
+ } else if(msec > 0) {
+ timeout.tv_sec = msec / 1000;
+ timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
+ ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout);
+ } else {
+ ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key);
+ }
+ if(ret == 0 ) {
n_pub_suc++;
}
}
@@ -312,9 +337,20 @@
node = &node_arr[i];
if(node->host == NULL) {
// 鏈湴鍙戦��
- if(shmModSocket.pub(topic, topic_size, content, content_size, node->key) == 0 ) {
- n_pub_suc++;
+ if(msec == 0) {
+ ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key);
+ } else if(msec > 0) {
+ timeout.tv_sec = msec / 1000;
+ timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
+ ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout);
+ } else {
+ ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key);
}
+
+ if(ret == 0 ) {
+ n_pub_suc++;
+ }
+
} else {
sprintf(portstr, "%d", node->port);
@@ -326,7 +362,7 @@
request_head.key = node->key;
request_head.content_length = content_size;
request_head.topic_length = strlen(topic) + 1;
- request_head.timeout = timeout;
+ request_head.timeout = msec;
if(write_request(connfd, request_head, content, content_size, topic, request_head.topic_length) != 0) {
LoggerFactory::getLogger()->error(" NetModSocket::_pub_ write_request failture %s:%d\n", node->host, node->port);
@@ -341,7 +377,7 @@
while(n_resp < n_req)
{
/* Wait for listening/connected descriptor(s) to become ready */
- if( (mpool->nready = poll(mpool->conns, mpool->maxi + 1, timeout) ) <= 0) {
+ if( (mpool->nready = poll(mpool->conns, mpool->maxi + 1, msec) ) <= 0) {
// wirite_set 鍜� read_set 鍦ㄦ寚瀹氭椂闂村唴閮芥病鍑嗗濂�
break;
}
diff --git a/src/socket/shm_socket.c b/src/socket/shm_socket.c
index efb3ef7..c1ac3c8 100644
--- a/src/socket/shm_socket.c
+++ b/src/socket/shm_socket.c
@@ -45,6 +45,7 @@
shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) {
+ logger->debug("shm_open_socket\n");
shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t));
socket->socket_type = socket_type;
socket->key = -1;
@@ -52,11 +53,11 @@
socket->dispatch_thread = 0;
socket->status = SHM_CONN_CLOSED;
socket->mutex = SemUtil::get(IPC_PRIVATE, 1);
- logger->debug("shm_open_socket\n");
+
return socket;
}
-static int _shm_close_socket(shm_socket_t *socket) {
+int shm_close_socket(shm_socket_t *socket) {
int ret;
@@ -76,12 +77,12 @@
return ret;
}
-int shm_close_socket(shm_socket_t *socket) {
+// int shm_close_socket(shm_socket_t *socket) {
- // _destrory_tmp_recv_socket_((shm_socket_t *)pthread_getspecific(_tmp_recv_socket_key_));
+// // _destrory_tmp_recv_socket_((shm_socket_t *)pthread_getspecific(_tmp_recv_socket_key_));
- return _shm_close_socket(socket);;
-}
+// return shm_close_socket(socket);;
+// }
int shm_socket_bind(shm_socket_t *socket, int key) {
socket->key = key;
@@ -391,11 +392,18 @@
}
if (rv) {
- void *_buf = malloc(src.size);
- memcpy(_buf, src.buf, src.size);
- *buf = _buf;
- *size = src.size;
- *key = src.key;
+ if(buf != NULL) {
+ void *_buf = malloc(src.size);
+ memcpy(_buf, src.buf, src.size);
+ *buf = _buf;
+ }
+
+ if(size != NULL)
+ *size = src.size;
+
+ if(key != NULL)
+ *key = src.key;
+
mm_free(src.buf);
// printf("shm_recvfrom pop after\n");
return 0;
@@ -411,12 +419,13 @@
int rv;
if(tmp_socket == NULL)
return;
+
logger->debug("%d destroy tmp socket\n", pthread_self());
- _shm_close_socket((shm_socket_t *)tmp_socket);
+ shm_close_socket((shm_socket_t *)tmp_socket);
rv = pthread_setspecific(_tmp_recv_socket_key_, NULL);
if ( rv != 0) {
- logger->error(rv, "shm_sendandrecv : pthread_setspecific");
- exit(1);
+ logger->error(rv, "shm_sendandrecv : pthread_setspecific");
+ exit(1);
}
}
@@ -438,7 +447,7 @@
-int shm_sendandrecv(shm_socket_t *socket, const void *send_buf,
+int shm_sendandrecv_safe(shm_socket_t *socket, const void *send_buf,
const int send_size, const int send_key, void **recv_buf,
int *recv_size, struct timespec *timeout, int flags) {
int recv_key;
@@ -508,6 +517,12 @@
return -1;
}
+int shm_sendandrecv(shm_socket_t *socket, const void *send_buf,
+ const int send_size, const int send_key, void **recv_buf,
+ int *recv_size, struct timespec *timeout, int flags) {
+ return shm_sendandrecv_unsafe(socket, send_buf, send_size, send_key,recv_buf, recv_size, timeout, flags);
+}
+
// ============================================================================================================
/**
diff --git a/test_net_socket/Makefile b/test_net_socket/Makefile
index 0d8505f..832a130 100644
--- a/test_net_socket/Makefile
+++ b/test_net_socket/Makefile
@@ -14,8 +14,7 @@
#-I$(ROOT)/include/usgcommon
INCLUDES += -I${ROOT}/src -I${ROOT}/src/queue -I${ROOT}/src/socket -I${ROOT}/src/common/include -I${ROOT}/include/usgcommon
-
-PROGS = ${DEST}/test_net_mod_socket
+PROGS = ${DEST}/test_net_mod_socket ${DEST}/test_bus_stop ${DEST}/heart_beat
DEPENDENCES = $(patsubst %, %.d, $(PROGS))
diff --git a/test_socket/dgram_mod_survey.c b/test_net_socket/heart_beat.c
similarity index 69%
rename from test_socket/dgram_mod_survey.c
rename to test_net_socket/heart_beat.c
index da3260f..562cb23 100644
--- a/test_socket/dgram_mod_survey.c
+++ b/test_net_socket/heart_beat.c
@@ -1,6 +1,10 @@
-#include "dgram_mod_socket.h"
+#include "net_mod_server_socket_wrapper.h"
+#include "net_mod_socket_wrapper.h"
+#include "bus_server_socket_wrapper.h"
+
#include "shm_mm_wraper.h"
#include "usg_common.h"
+#include <getopt.h>
typedef struct Targ {
@@ -10,43 +14,50 @@
}Targ;
void sigint_handler(int sig) {
- //dgram_mod_close_socket(server_socket);
+ // net_mod_socket_close(server_socket);
printf("===Catch sigint======================\n");
shm_mm_wrapper_destroy();
exit(0);
}
void server(int port) {
- void *socket = dgram_mod_open_socket();
- dgram_mod_bind(socket, port);
+ void *serv = net_mod_socket_open();
+ net_mod_socket_bind(serv, port);
int size;
void *recvbuf;
char sendbuf[512];
int rv;
int remote_port;
while (true) {
- if ((rv = dgram_mod_recvfrom_timeout(socket, &recvbuf, &size, &remote_port, 15, 0) ) == 0) {
+ if ((rv = net_mod_socket_recvfrom(serv, &recvbuf, &size, &remote_port) ) == 0) {
printf( "RECEIVED HREARTBEAT FROM %d: %s\n", remote_port, recvbuf);
+ net_mod_socket_sendto(serv, "suc", strlen("suc")+1, remote_port);
free(recvbuf);
}
}
- dgram_mod_close_socket(socket);
+ net_mod_socket_close(serv);
}
void client(int port) {
- void *socket = dgram_mod_open_socket();
+ int rv;
+ void *client = net_mod_socket_open();
int size;
char sendbuf[512];
long i = 0;
+ net_node_t node_arr[] = {"", 0, 100};
+ int node_arr_size = 1;
+
+ int recv_arr_size;
+ net_mod_recv_msg_t *recv_arr;
while (true) {
sprintf(sendbuf, "%d", i);
printf("SEND HEART:%s\n", sendbuf);
- dgram_mod_sendto(socket, sendbuf, strlen(sendbuf) + 1, port);
+ rv = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL);
// sleep(1);
i++;
}
- dgram_mod_close_socket(socket);
+ net_mod_socket_close(client);
}
@@ -54,20 +65,26 @@
signal(SIGINT, sigint_handler);
Targ *targ = (Targ *)arg;
int port = targ->port;
- void *socket = dgram_mod_open_socket();
+ void *socket = net_mod_socket_open();
int size;
char sendbuf[512];
long scale = 10;
long i = 0;
+ net_node_t node_arr[] = {"", 0, 100};
+ int node_arr_size = 1;
+
+ int recv_arr_size;
+ net_mod_recv_msg_t *recv_arr;
+
while (i < scale) {
sprintf(sendbuf, "%d", i);
printf("%d SEND HEART:%s\n", targ->id, sendbuf);
- dgram_mod_sendto(socket, sendbuf, strlen(sendbuf) + 1, port);
+ net_mod_socket_sendto(socket, sendbuf, strlen(sendbuf) + 1, port);
sleep(1);
i++;
}
- dgram_mod_close_socket(socket);
+ net_mod_socket_close(socket);
return (void *)i;
}
diff --git a/test_net_socket/test_bus_stop.c b/test_net_socket/test_bus_stop.c
new file mode 100644
index 0000000..ed2f60f
--- /dev/null
+++ b/test_net_socket/test_bus_stop.c
@@ -0,0 +1,54 @@
+#include "net_mod_server_socket_wrapper.h"
+#include "net_mod_socket_wrapper.h"
+#include "bus_server_socket_wrapper.h"
+
+#include "shm_mm_wraper.h"
+#include "usg_common.h"
+#include <getopt.h>
+
+static void * server_sockt;
+
+static void *_start_bus_(void *arg) {
+ // pthread_detach(pthread_self());
+ printf("Start bus server\n");
+ pthread_t tid;
+
+ server_sockt = bus_server_socket_wrapper_open();
+
+ if(bus_server_socket_wrapper_start_bus(server_sockt) != 0) {
+ printf("start bus failed\n");
+ }
+}
+
+int main() {
+
+
+ pthread_t tid;
+ char action[512];
+
+ shm_mm_wrapper_init(512);
+ pthread_create(&tid, NULL, _start_bus_, NULL);
+
+
+ while (true) {
+ printf("Input action: Close?\n");
+ if(scanf("%s", action) < 1) {
+ printf("Invalide action\n");
+ continue;
+ }
+
+ if(strcmp(action, "close") == 0) {
+ bus_server_socket_wrapper_close(server_sockt);
+ break;
+ } else {
+ printf("Invalide action\n");
+ }
+ }
+
+ if (pthread_join(tid, NULL) != 0) {
+ perror(" pthread_join");
+ }
+
+
+ shm_mm_wrapper_destroy();
+}
\ No newline at end of file
diff --git a/test_net_socket/test_net_mod_socket.c b/test_net_socket/test_net_mod_socket.c
index f509e2c..5693cf6 100644
--- a/test_net_socket/test_net_mod_socket.c
+++ b/test_net_socket/test_net_mod_socket.c
@@ -137,6 +137,7 @@
sprintf(sendbuf, "RECEIVED PORT %d NAME %s", remote_port, recvbuf);
net_mod_socket_sendto(client, sendbuf, strlen(sendbuf) + 1, remote_port);
free(recvbuf);
+ sleep(1000);
}
}
@@ -259,8 +260,9 @@
for (i = 0; i < SCALE; i++) {
sprintf(sendbuf, "thread(%d) %d", targ->id, i);
fprintf(fp, "requst:%s\n", sendbuf);
- n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size);
- //printf("send %d nodes\n", n);
+ // n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size);
+ n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1000);
+ printf("send %d nodes\n", n);
for(j=0; j < recv_arr_size; j++) {
fprintf(fp, "reply: host:%s, port: %d, key:%d, content: %s\n",
recv_arr[j].host,
--
Gitblit v1.8.0