From 7a12bed7a2550d037e6e869c1ed0ce115098dbb2 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期六, 13 三月 2021 18:44:51 +0800
Subject: [PATCH] update
---
test_socket/CMakeLists.txt | 7
test_socket/heart_beat.cpp | 167 +++++++++++++++++----------
src/queue/lock_free_queue.h | 55 +++++---
src/shm/mm.cpp | 5
test_net_socket/net_mod_socket.sh | 2
test_socket/heart_beat.sh | 10 +
CMakeLists.txt | 2
src/socket/shm_socket.cpp | 60 ++++-----
8 files changed, 189 insertions(+), 119 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 1ad483f..0b653e7 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -29,6 +29,6 @@
add_subdirectory(${PROJECT_SOURCE_DIR}/src)
add_subdirectory(${PROJECT_SOURCE_DIR}/test)
add_subdirectory(${PROJECT_SOURCE_DIR}/test_net_socket)
-# add_subdirectory(${PROJECT_SOURCE_DIR}/test_socket)
+ add_subdirectory(${PROJECT_SOURCE_DIR}/test_socket)
# add_subdirectory(${PROJECT_SOURCE_DIR}/shm_util)
endif()
diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h
index 31f2cc1..f74f4bc 100644
--- a/src/queue/lock_free_queue.h
+++ b/src/queue/lock_free_queue.h
@@ -88,9 +88,9 @@
sem_t slots;
sem_t items;
- // time_t createTime;
- // time_t closeTime;
- // int status;
+ time_t createTime;
+ time_t closeTime;
+ int status;
public:
@@ -101,7 +101,8 @@
/// template
~LockFreeQueue();
- // inline void close();
+ inline void close();
+ inline bool isClosed();
// std::atomic_uint reference;
/// @brief constructor of the class
@@ -129,17 +130,17 @@
- // time_t getCreateTime() {
- // return createTime;
- // }
+ time_t getCreateTime() {
+ return createTime;
+ }
- // time_t getCloseTime() {
- // return closeTime;
- // }
+ time_t getCloseTime() {
+ return closeTime;
+ }
- // int getStatus() {
- // return status;
- // }
+ int getStatus() {
+ return status;
+ }
/// @brief push an element at the tail of the queue
/// @param the element to insert in the queue
@@ -182,20 +183,28 @@
if (sem_init(&items, 1, 0) == -1)
err_exit(errno, "LockFreeQueue sem_init");
- // createTime = time(NULL);
- // status = LOCK_FREE_Q_ST_OPENED;
+ createTime = time(NULL);
+ status = LOCK_FREE_Q_ST_OPENED;
}
-// template<
-// typename ELEM_T,
-// typename Allocator,
-// template<typename T, typename AT> class Q_TYPE>
-// inline void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::close() {
-// // status = LOCK_FREE_Q_ST_CLOSED;
-// // closeTime = time(NULL);
-// }
+template<
+ typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
+inline void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::close() {
+ status = LOCK_FREE_Q_ST_CLOSED;
+ closeTime = time(NULL);
+}
+
+template<
+ typename ELEM_T,
+ typename Allocator,
+ template<typename T, typename AT> class Q_TYPE>
+inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::isClosed() {
+ return status == LOCK_FREE_Q_ST_CLOSED;
+}
template<
diff --git a/src/shm/mm.cpp b/src/shm/mm.cpp
index 8fff5c2..3cdd3a2 100644
--- a/src/shm/mm.cpp
+++ b/src/shm/mm.cpp
@@ -123,8 +123,11 @@
return aptr;
} else {
SemUtil::inc(mutex);
+ err_msg(0, "mm_malloc : out of memery\n");
+ LoggerFactory::getLogger()->fatal("mm_malloc : out of memery\n");
// abort();
- err_exit(0, "mm_malloc : out of memery\n");
+ exit(1);
+
return NULL;
}
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index 1f5d47b..b5d4d09 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -10,7 +10,7 @@
static Logger *logger = LoggerFactory::getLogger();
-ShmQueueStMap * shmQueueStMap ;
+// ShmQueueStMap * shmQueueStMap ;
static void print_msg(char *head, shm_packet_t &msg) {
// err_msg(0, "%s: key=%d, type=%d\n", head, msg.key, msg.type);
@@ -104,7 +104,7 @@
err_exit(s, "pthread_mutexattr_destroy");
- shmQueueStMap = shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY);
+ // shmQueueStMap = shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY);
return sockt;
}
@@ -113,37 +113,33 @@
static int _shm_socket_close_(shm_socket_t *sockt) {
int rv, i;
+ hashtable_t *hashtable = mm_get_hashtable();
logger->debug("shm_socket_close\n");
- if(sockt->key != 0) {
- auto it = shmQueueStMap->find(sockt->key);
- if(it != shmQueueStMap->end()) {
- it->second.status = SHM_QUEUE_ST_CLOSED;
- it->second.closeTime = time(NULL);
- }
- }
+ // if(sockt->key != 0) {
+ // auto it = shmQueueStMap->find(sockt->key);
+ // if(it != shmQueueStMap->end()) {
+ // it->second.status = SHM_QUEUE_ST_CLOSED;
+ // it->second.closeTime = time(NULL);
+ // }
+ // }
- printf("====sockt->queue addr = %p\n", sockt->queue);
+ // printf("====sockt->queue addr = %p\n", sockt->queue);
if(sockt->queue != NULL) {
+ sockt->queue->close();
for( i = 0; i < sockt->queue->size(); i++) {
mm_free((*(sockt->queue))[i].buf);
logger->info("======= %d free queue element buf\n", sockt->key);
}
+ sleep(1);
- // hashtable_remove(hashtable, mkey);
+ hashtable_remove(hashtable, sockt->key);
// sockt->queue = NULL;
}
-
- // hashtable_remove(hashtable, mkey);
- // if(sockt->queue != NULL) {
- // sockt->queue = NULL;
- // }
-
-
pthread_mutex_destroy(&(sockt->mutex) );
free(sockt);
return 0;
@@ -578,9 +574,9 @@
}
// 鏍囪key瀵瑰簲鐨勭姸鎬� 锛屼负opened
- stRecord.status = SHM_QUEUE_ST_OPENED;
- stRecord.createTime = time(NULL);
- shmQueueStMap->insert({sockt->key, stRecord});
+ // stRecord.status = SHM_QUEUE_ST_OPENED;
+ // stRecord.createTime = time(NULL);
+ // shmQueueStMap->insert({sockt->key, stRecord});
}
@@ -597,17 +593,19 @@
}
// 妫�鏌ey鏍囪鐨勭姸鎬�
- auto it = shmQueueStMap->find(key);
- if(it != shmQueueStMap->end()) {
- if(it->second.status == SHM_QUEUE_ST_CLOSED) {
- // key瀵瑰簲鐨勭姸鎬佹槸鍏抽棴鐨�
- goto ERR_CLOSED;
- }
- }
+ // auto it = shmQueueStMap->find(key);
+ // if(it != shmQueueStMap->end()) {
+ // if(it->second.status == SHM_QUEUE_ST_CLOSED) {
+ // // key瀵瑰簲鐨勭姸鎬佹槸鍏抽棴鐨�
+ // goto ERR_CLOSED;
+ // }
+ // }
remoteQueue = shm_socket_attach_queue(key);
if (remoteQueue == NULL ) {
+ goto ERR_CLOSED;
+ } else if(remoteQueue->isClosed()) {
goto ERR_CLOSED;
}
@@ -659,9 +657,9 @@
}
// 鏍囪key瀵瑰簲鐨勭姸鎬� 锛屼负opened
- stRecord.status = SHM_QUEUE_ST_OPENED;
- stRecord.createTime = time(NULL);
- shmQueueStMap->insert({sockt->key, stRecord});
+ // stRecord.status = SHM_QUEUE_ST_OPENED;
+ // stRecord.createTime = time(NULL);
+ // shmQueueStMap->insert({sockt->key, stRecord});
if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
err_exit(rv, "shm_recvfrom : pthread_mutex_unlock");
diff --git a/test_net_socket/net_mod_socket.sh b/test_net_socket/net_mod_socket.sh
index d8cf241..d79752e 100755
--- a/test_net_socket/net_mod_socket.sh
+++ b/test_net_socket/net_mod_socket.sh
@@ -11,7 +11,7 @@
./shm_util recvfrom --bind=102 & server_pid=$! && echo "pid: ${server_pid}"
# 鎵撳紑鍥為槦鍒楁敹杩涚▼
- ./shm_util start_resycle & server_pid=$! && echo "pid: ${server_pid}"
+ # ./shm_util start_resycle & server_pid=$! && echo "pid: ${server_pid}"
}
# 浜や簰寮忓鎴风
diff --git a/test_socket/CMakeLists.txt b/test_socket/CMakeLists.txt
index 111569d..18bcf4c 100644
--- a/test_socket/CMakeLists.txt
+++ b/test_socket/CMakeLists.txt
@@ -9,10 +9,13 @@
add_custom_command(
- OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/heart_beat.sh
- COMMAND cp ${CMAKE_CURRENT_SOURCE_DIR}/heart_beat.sh ${CMAKE_CURRENT_BINARY_DIR}/heart_beat.sh
+ OUTPUT ${PROJECT_BINARY_DIR}/bin/heart_beat.sh
+ COMMAND cp ${CMAKE_CURRENT_SOURCE_DIR}/heart_beat.sh ${PROJECT_BINARY_DIR}/bin/heart_beat.sh
DEPENDS ${CMAKE_CURRENT_SOURCE_DIR}/heart_beat.sh
)
+
+add_custom_target("heart_beat.sh" ALL DEPENDS ${PROJECT_BINARY_DIR}/bin/heart_beat.sh)
+
add_executable(heart_beat heart_beat.cpp ${CMAKE_CURRENT_BINARY_DIR}/heart_beat.sh)
target_link_libraries(heart_beat PRIVATE shm_queue ${EXTRA_LIBS} )
diff --git a/test_socket/heart_beat.cpp b/test_socket/heart_beat.cpp
index cd21a84..021ca3d 100644
--- a/test_socket/heart_beat.cpp
+++ b/test_socket/heart_beat.cpp
@@ -6,7 +6,7 @@
#include "usg_common.h"
#include <getopt.h>
-
+static Logger *logger = LoggerFactory::getLogger();
typedef struct Targ {
int port;
int id;
@@ -20,24 +20,45 @@
// exit(0);
}
+
+void *serverSockt;
+static void server_stop_handler(int sig) {
+ printf("stop_handler\n");
+
+ int rv = net_mod_socket_stop(serverSockt);
+ if(rv ==0) {
+ logger->debug("send stop suc");
+ return;
+ } else {
+ logger->debug("send stop fail.%s\n", bus_strerror(rv));
+ }
+}
+
void server(int port) {
- void *serv = net_mod_socket_open();
- net_mod_socket_bind(serv, port);
+ serverSockt = net_mod_socket_open();
+ net_mod_socket_bind(serverSockt, port);
int size;
void *recvbuf;
char sendbuf[512];
int rv;
int remote_port;
+
+ signal(SIGTERM, server_stop_handler);
+ signal(SIGINT, server_stop_handler);
while (true) {
- if(net_mod_socket_recvfrom_timeout(serv, &recvbuf, &size, &remote_port, 0, 2000000000)==0) {
+ rv = net_mod_socket_recvfrom_timeout(serverSockt, &recvbuf, &size, &remote_port, 0, 2000000000);
+ if(rv == 0 ) {
printf( "RECEIVED HREARTBEAT FROM %d: %s\n", remote_port, recvbuf);
- net_mod_socket_sendto(serv, "suc", strlen("suc")+1, remote_port);
+ net_mod_socket_sendto(serverSockt, "suc", strlen("suc")+1, remote_port);
free(recvbuf);
+ } else if(rv == EBUS_STOPED) {
+ logger->debug("Stopping\n");
+ break;
}
}
// sleep(1000);
- net_mod_socket_close(serv);
+ net_mod_socket_close(serverSockt);
}
void client(int port) {
@@ -49,14 +70,42 @@
net_node_t node_arr[] = {"", 0, port};
int node_arr_size = 1;
- int recv_arr_size;
+ int recv_arr_size, n;
net_mod_recv_msg_t *recv_arr;
+ net_mod_err_t *errarr;
+ int errarr_size = 0;
+
+ // int recv_arr_size;
+ // net_mod_recv_msg_t *recv_arr;
while (true) {
sprintf(sendbuf, "%d", i);
- rv = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL, 2000);
+ rv = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf),
+ &recv_arr, &recv_arr_size, &errarr, &errarr_size, 1000);
+ // rv = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL, 2000);
//rv = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL);
printf("SEND HEART:%s, suc nodes = %d\n", sendbuf, rv);
+ if(recv_arr_size > 0) {
+ for(i=0; i<recv_arr_size; i++) {
+ printf("reply from (host:%s, port: %d, key:%d) >> %s\n",
+ recv_arr[i].host,
+ recv_arr[i].port,
+ recv_arr[i].key,
+ (char *)recv_arr[i].content
+ );
+ }
+
+ // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀
+ net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
+ }
+
+
+ if(errarr_size > 0) {
+ for(i = 0; i < errarr_size; i++) {
+ printf("error: (host:%s, port: %d, key:%d) : %s\n", errarr[i].host, errarr[i].port, errarr[i].key, bus_strerror(errarr[i].code));
+ }
+ free(errarr);
+ }
// sleep(1);
i++;
}
@@ -64,66 +113,66 @@
}
-void *runclient(void *arg) {
- // signal(SIGINT, sigint_handler);
- Targ *targ = (Targ *)arg;
- int port = targ->port;
- void *client = net_mod_socket_open();
- int size;
- char sendbuf[512];
- long scale = 100000;
- long i = 0;
- net_node_t node_arr[] = {"", 0, 100};
- int node_arr_size = 1;
+// void *runclient(void *arg) {
+// // signal(SIGINT, sigint_handler);
+// Targ *targ = (Targ *)arg;
+// int port = targ->port;
+// void *client = net_mod_socket_open();
+// int size;
+// char sendbuf[512];
+// long scale = 100000;
+// long i = 0;
+// net_node_t node_arr[] = {"", 0, 100};
+// int node_arr_size = 1;
- int recv_arr_size;
- net_mod_recv_msg_t *recv_arr;
+// int recv_arr_size;
+// net_mod_recv_msg_t *recv_arr;
- while (i < scale) {
- sprintf(sendbuf, "%d", i);
- printf("%d SEND HEART:%s\n", targ->id, sendbuf);
- net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL);
- // net_mod_socket_sendto(socket, sendbuf, strlen(sendbuf) + 1, port);
- i++;
- }
+// while (i < scale) {
+// sprintf(sendbuf, "%d", i);
+// printf("%d SEND HEART:%s\n", targ->id, sendbuf);
+// net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf)+1, NULL, NULL);
+// // net_mod_socket_sendto(socket, sendbuf, strlen(sendbuf) + 1, port);
+// i++;
+// }
- net_mod_socket_close(client);
- return (void *)i;
-}
+// net_mod_socket_close(client);
+// return (void *)i;
+// }
-void mclient(int port) {
+// void mclient(int port) {
- int status, i = 0, processors = 4;
- void *res[processors];
- Targ *targs = (Targ *)calloc(processors, sizeof(Targ));
- pthread_t tids[processors];
- char sendbuf[512];
+// int status, i = 0, processors = 4;
+// void *res[processors];
+// Targ *targs = (Targ *)calloc(processors, sizeof(Targ));
+// pthread_t tids[processors];
+// char sendbuf[512];
- struct timeval start;
- gettimeofday(&start, NULL);
- for (i = 0; i < processors; i++) {
- targs[i].port = port;
- targs[i].id = i;
- pthread_create(&tids[i], NULL, runclient, (void *)&targs[i]);
- }
+// struct timeval start;
+// gettimeofday(&start, NULL);
+// for (i = 0; i < processors; i++) {
+// targs[i].port = port;
+// targs[i].id = i;
+// pthread_create(&tids[i], NULL, runclient, (void *)&targs[i]);
+// }
- for (i = 0; i < processors; i++) {
- if (pthread_join(tids[i], &res[i]) != 0) {
- perror("multyThreadClient pthread_join");
- } else {
- fprintf(stderr, "client(%d) 鍙戦�� %ld 鏉℃暟鎹甛n", i, (long)res[i]);
- }
- }
+// for (i = 0; i < processors; i++) {
+// if (pthread_join(tids[i], &res[i]) != 0) {
+// perror("multyThreadClient pthread_join");
+// } else {
+// fprintf(stderr, "client(%d) 鍙戦�� %ld 鏉℃暟鎹甛n", i, (long)res[i]);
+// }
+// }
- struct timeval end;
- gettimeofday(&end, NULL);
+// struct timeval end;
+// gettimeofday(&end, NULL);
- double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec);
- long diffsec = (long) (difftime/1000000);
- long diffmsec = difftime - diffsec*1000000;
- printf("cost: %ld sec: %ld msc\n", diffsec, diffmsec);
-}
+// double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec);
+// long diffsec = (long) (difftime/1000000);
+// long diffmsec = difftime - diffsec*1000000;
+// printf("cost: %ld sec: %ld msc\n", diffsec, diffmsec);
+// }
int main(int argc, char *argv[]) {
shm_mm_wrapper_init(512);
@@ -139,8 +188,6 @@
server(port);
else if (strcmp("client", argv[1]) == 0)
client(port);
- else if (strcmp("mclient", argv[1]) == 0)
- mclient(port);
shm_mm_wrapper_destroy();
return 0;
diff --git a/test_socket/heart_beat.sh b/test_socket/heart_beat.sh
index 92b0867..a284cec 100755
--- a/test_socket/heart_beat.sh
+++ b/test_socket/heart_beat.sh
@@ -50,6 +50,16 @@
close
;;
+ "test2")
+ start_server
+ sleep 1
+ start_clients
+ sleep 5
+ kill -15 server_pid
+ sleep 2
+ close_clients
+ ;;
+
"")
start_server
sleep 1
--
Gitblit v1.8.0