From 68d23225a38a35f1325eb39fa4ed5a005d5de473 Mon Sep 17 00:00:00 2001
From: fujuntang <fujuntang@aiot.com>
Date: 星期三, 11 八月 2021 09:50:20 +0800
Subject: [PATCH] fix from 3.1 first commit
---
doc/product-consume-model.png | 0
src/shm/hashtable.cpp.2 | 0
demo/pub_sub | 0
src/shm/hashtable.h | 4
shm_util/shm_util.cpp | 953 ++++++++++++++++++++++++++++++++++++++++++++++++++
doc/malloc_node.png | 0
src/socket/shm_mod_socket.cpp | 10
test_socket/heart_beat.sh | 0
doc/lock_free_queue_paper/Implementing Lock-Free Queues.pdf | 0
doc/network_req_rep.png | 0
src/shm/hashtable.cpp | 31 +
src/shm/mm.cpp | 1
build.sh | 0
src/socket/shm_socket.cpp | 77 ++-
test_queue/test_lostdata.sh | 0
/dev/null | 1
src/socket/shm_mod_socket.h | 2
doc/bus_service.png | 0
src/queue/shm_queue.h | 4
src/socket/bus_server_socket.cpp | 26
src/shm/shm_mm.h | 8
demo/dgram_mod_req_rep.sh | 0
doc/malloc_list.png | 0
test_net_socket/net_mod_socket.sh | 0
systype.sh | 0
25 files changed, 1,044 insertions(+), 73 deletions(-)
diff --git a/build.sh b/build.sh
old mode 100755
new mode 100644
diff --git a/demo/dgram_mod_req_rep.sh b/demo/dgram_mod_req_rep.sh
old mode 100755
new mode 100644
diff --git a/demo/pub_sub b/demo/pub_sub
old mode 100755
new mode 100644
Binary files differ
diff --git a/doc/bus_service.png b/doc/bus_service.png
old mode 100755
new mode 100644
Binary files differ
diff --git a/doc/lock_free_queue_paper/Implementing Lock-Free Queues.pdf b/doc/lock_free_queue_paper/Implementing Lock-Free Queues.pdf
old mode 100755
new mode 100644
Binary files differ
diff --git a/doc/malloc_list.png b/doc/malloc_list.png
old mode 100755
new mode 100644
Binary files differ
diff --git a/doc/malloc_node.png b/doc/malloc_node.png
old mode 100755
new mode 100644
Binary files differ
diff --git a/doc/network_req_rep.png b/doc/network_req_rep.png
old mode 100755
new mode 100644
Binary files differ
diff --git a/doc/product-consume-model.png b/doc/product-consume-model.png
old mode 100755
new mode 100644
Binary files differ
diff --git a/shm_util/shm_util.cpp b/shm_util/shm_util.cpp
deleted file mode 120000
index 5878b94..0000000
--- a/shm_util/shm_util.cpp
+++ /dev/null
@@ -1 +0,0 @@
-../test_net_socket/shm_util.cpp
\ No newline at end of file
diff --git a/shm_util/shm_util.cpp b/shm_util/shm_util.cpp
new file mode 100644
index 0000000..4eb03f8
--- /dev/null
+++ b/shm_util/shm_util.cpp
@@ -0,0 +1,953 @@
+#include <assert.h>
+#include "net_mod_server_socket_wrapper.h"
+#include "net_mod_socket_wrapper.h"
+#include "bus_server_socket_wrapper.h"
+
+#include "shm_mm_wrapper.h"
+#include "usg_common.h"
+#include <getopt.h>
+#include "logger_factory.h"
+
+#define SCALE 100000
+
+static Logger *logger = LoggerFactory::getLogger();
+
+typedef struct Targ {
+ net_node_t *node;
+ char *nodelist;
+ long id;
+
+}Targ;
+
+struct argument_t {
+ bool interactive;
+ bool force;
+ int bind;
+ int port;
+ int key;
+ char *sendlist;
+ char *publist;
+ char **cmd_arr;
+ int cmd_arr_len;
+};
+
+argument_t parse_args (int argc, char *argv[]);
+void usage(char *name);
+int parse_node_list(const char *str, net_node_t *node_arr_addr[]) ;
+void print_node_list(net_node_t *node_arr, int len);
+
+
+
+void * client;
+
+void *proxy_server_handler(void *sockt) {
+ pthread_detach(pthread_self());
+
+ char action[512];
+ while ( true ) {
+ printf("Input action: Close?\n");
+ if(scanf("%s",action) < 1) {
+ printf("Invalide action\n");
+ continue;
+ }
+
+ if(strcmp(action, "close") == 0) {
+ net_mod_server_socket_close(sockt);
+ shm_mm_wrapper_destroy();
+ break;
+ } else {
+ printf("Invalide action\n");
+ }
+ }
+}
+
+void start_net_proxy(argument_t &arg) {
+ pthread_t tid;
+ printf("Start net proxy\n");
+ void *serverSocket = net_mod_server_socket_open(arg.port);
+
+ // 鍒涘缓涓�涓嚎绋�,鍙互鍏抽棴server
+ if(arg.interactive) {
+ pthread_create(&tid, NULL, proxy_server_handler, serverSocket);
+ }
+
+ if(net_mod_server_socket_start(serverSocket) != 0) {
+ err_exit(errno, "net_mod_server_socket_start");
+ }
+}
+
+void start_resycle() {
+ shm_mm_wrapper_start_resycle();
+}
+
+
+// 鎵撳嵃鎺ュ彈鍒扮殑璁㈤槄娑堟伅
+void *print_sub_msg(void *sockt) {
+ pthread_detach(pthread_self());
+ void *recvbuf;
+ int size;
+ int key;
+ int rv;
+ while ((rv = net_mod_socket_recvfrom( sockt, &recvbuf, &size, &key) ) == 0) {
+ printf("鏀跺埌璁㈤槄娑堟伅:%s\n", (char *)recvbuf);
+ free(recvbuf);
+ }
+
+ printf("print_sub_msg return . rv = %d\n", rv);
+
+}
+
+
+void * bus_server;
+
+static void stop_bus_handler(int sig) {
+ bus_server_socket_wrapper_stop(bus_server);
+}
+
+
+void start_bus_server(argument_t &arg) {
+ printf("Start bus server\n");
+ bus_server = bus_server_socket_wrapper_open();
+
+ signal(SIGINT, stop_bus_handler);
+ signal(SIGTERM, stop_bus_handler);
+
+ if(bus_server_socket_wrapper_start_bus(bus_server) != 0) {
+ printf("start bus failed\n");
+ exit(1);
+ }
+
+ bus_server_socket_wrapper_close(bus_server);
+}
+
+void *serverSockt;
+
+
+static void _recvandsend_callback_(void *recvbuf, int recvsize, int key, void **sendbuf_ptr, int *sendsize_ptr, void * user_data) {
+ char sendbuf[512];
+ printf( "server: RECEIVED REQUEST FROM %d : %s\n", key, (char *)recvbuf);
+ sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(serverSockt), (char *)recvbuf);
+ // buf 鍜� size鏄繑鍥炲��
+ *sendbuf_ptr = sendbuf;
+ *sendsize_ptr = strlen(sendbuf) + 1;
+ //recvbuf鏄垎閰嶅埌鍫嗛噷鐨勶紝浣跨敤瀹屽悗涓嶈蹇樿閲婃斁鎺�
+ free(recvbuf);
+ return;
+}
+
+bool stop = false;
+
+static void stop_replyserver_handler(int sig) {
+ printf("stop_handler\n");
+
+ int rv = net_mod_socket_stop(serverSockt);
+ if(rv ==0) {
+ logger->debug("send stop suc");
+ return;
+ } else {
+ logger->debug("send stop fail.%s\n", bus_strerror(rv));
+ }
+}
+
+void start_recvfrom(int mkey, bool force) {
+ logger->debug("start reply\n");
+ signal(SIGINT, stop_replyserver_handler);
+ signal(SIGTERM, stop_replyserver_handler);
+
+ serverSockt = net_mod_socket_open();
+ if(force) {
+ net_mod_socket_force_bind(serverSockt, mkey);
+ } else {
+ net_mod_socket_bind(serverSockt, mkey);
+ }
+
+
+ int rv = 0 ;
+ while( true) {
+ rv = net_mod_socket_recvandsend(serverSockt, _recvandsend_callback_ , NULL );
+ if (rv == 0)
+ continue;
+ if(rv == EBUS_STOPED) {
+ logger->debug("Stopping\n");
+ break;
+ } else if(rv == EBUS_KEY_INUSED){
+ printf("key宸茬粡琚崰鐢╘n");
+ exit(1);
+ }
+ logger->debug("net_mod_socket_recvandsend error.%s\n", bus_strerror(rv));
+
+ }
+
+ // rv = net_mod_socket_recvandsend_timeout(serverSockt, _recvandsend_callback_ , 0, 2000000, NULL );
+ net_mod_socket_close(serverSockt);
+ logger->debug("stopted\n");
+
+ // while ( (rv = net_mod_socket_recvfrom(ser, &recvbuf, &size, &key) ) == 0) {
+ // // printf( "server: RECEIVED REQUEST FROM %d NAME %s\n", key, recvbuf);
+ // sprintf(sendbuf, "%d RECEIVED %s", net_mod_socket_get_key(ser), (char *)recvbuf);
+ // net_mod_socket_sendto(ser, sendbuf, strlen(sendbuf) + 1, key);
+ // free(recvbuf);
+ // }
+}
+
+// 浜や簰寮忓鎴风
+void start_net_client(char *sendlist, char*publist ){
+ client = net_mod_socket_open();
+ char content[MAXLINE];
+ char action[512];
+ char topic[512];
+ int buskey;
+
+ int recv_arr_size, i, n;
+ net_mod_recv_msg_t *recv_arr;
+
+ pthread_t tid;
+ // 鍒涘缓涓�涓嚎绋嬫帴鍙楄闃呮秷鎭�
+ pthread_create(&tid, NULL, print_sub_msg, client);
+
+ //192.168.5.10:5000:11, 192.168.5.22:5000:11, 192.168.5.104:5000:11
+ net_node_t *node_arr;
+ int node_arr_size = parse_node_list(sendlist, &node_arr);
+ print_node_list(node_arr, node_arr_size);
+
+ //192.168.5.10:5000:8, 192.168.5.22:5000:8, 192.168.5.104:5000:8
+ net_node_t *pub_node_arr;
+ int pub_node_arr_size = parse_node_list(publist, &pub_node_arr);
+ print_node_list(pub_node_arr, pub_node_arr_size);
+
+ while (true) {
+ //printf("Usage: pub <topic> [content] or sub <topic>\n");
+ printf("Can I help you? pub,sub,desub,send or quit\n");
+ scanf("%s",action);
+
+ if(strcmp(action, "pub") == 0) {
+ printf("Please input topic and content\n");
+ scanf("%s %s", topic, content);
+
+ n = net_mod_socket_pub(client, pub_node_arr, pub_node_arr_size, topic, strlen(topic)+1, content, strlen(content)+1);
+ printf("pub %d nodes\n", n);
+ }
+ else if(strcmp(action, "send") == 0) {
+ getc(stdin);
+ printf("Please input content\n");
+
+ if (fgets(content, MAXLINE, stdin) != NULL) {
+ // 鏀跺埌娑堟伅鐨勮妭鐐瑰嵆浣挎病鏈夊搴旂殑淇℃伅锛� 涔熻鍥炲涓�涓〃绀烘棤鐨勬秷鎭�,鍚﹀垯浼氫竴鐩寸瓑寰�
+ // n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size);
+ n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size, 1);
+ printf(" %d nodes reply\n", n);
+ for(i=0; i<recv_arr_size; i++) {
+ printf("reply from (host:%s, port: %d, key:%d) >> %s\n",
+ recv_arr[i].host,
+ recv_arr[i].port,
+ recv_arr[i].key,
+ (char *)recv_arr[i].content
+ );
+ }
+
+ // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀
+ net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
+ }
+ }
+ else if(strcmp(action, "desub") == 0) {
+ printf("Please input topic!\n");
+
+ scanf("%s", topic);
+ if (net_mod_socket_desub(client, topic, strlen(topic)) == 0) {
+ printf("%d Desub success!\n", net_mod_socket_get_key(client));
+ } else {
+ printf("Desub failture!\n");
+ exit(0);
+ }
+
+ }
+ else if(strcmp(action, "sub") == 0) {
+ printf("Please input topic!\n");
+ scanf("%s",topic);
+
+ if (net_mod_socket_sub(client, topic, strlen(topic)) == 0) {
+ printf("%d Sub success!\n", net_mod_socket_get_key(client));
+ } else {
+ printf("Sub failture!\n");
+ exit(0);
+ }
+
+ }
+ else if(strcmp(action, "quit") == 0) {
+ break;
+ } else {
+ printf("error input argument\n");
+ continue;
+ }
+
+ }
+ net_mod_socket_close(client);
+
+
+}
+
+void *_run_one_sendto_many_(void *arg) {
+ Targ *targ = (Targ *)arg;
+ char sendbuf[128];
+
+ int j, n;
+ int recv_arr_size;
+ net_mod_recv_msg_t *recv_arr;
+ int total = 0;
+
+ int rkey, lkey;
+ unsigned int l = 0 , rl;
+ const char *hello_format = "%d say Hello %d";
+ const char *reply_format = "%d RECEIVED %d say Hello %d";
+
+ char filename[128];
+ sprintf(filename, "test%d.tmp", targ->node->key);
+ FILE *fp = NULL;
+ fp = fopen(filename, "w+");
+ // fp = stdout;
+
+ int recvsize;
+ void *recvbuf;
+ for (l = 0; l < SCALE; l++) {
+ sprintf(sendbuf, hello_format, net_mod_socket_get_key(client), l);
+ // fprintf(fp, "requst:%s\n", sendbuf);
+ // n = net_mod_socket_sendandrecv(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size);
+ n = net_mod_socket_sendandrecv_timeout(client, targ->node, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 1);
+ printf("%d: send %d nodes\n", l, n);
+ for(j=0; j < recv_arr_size; j++) {
+
+ fprintf(stdout, "%d send '%s' to %d. received from (host=%s, port= %d, key=%d) '%s'\n",
+ net_mod_socket_get_key(client),
+ sendbuf,
+ targ->node->key,
+ recv_arr[j].host,
+ recv_arr[j].port,
+ recv_arr[j].key,
+ (char *)recv_arr[j].content
+ );
+
+ printf("key == %d\n", net_mod_socket_get_key(client));
+ assert(sscanf((const char *)recv_arr[j].content, reply_format, &rkey, &lkey, &rl) == 3);
+ assert(targ->node->key == rkey);
+ assert(net_mod_socket_get_key(client) == lkey);
+ assert(rl == l);
+ }
+ // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀
+ net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
+ total += n;
+ }
+ if(fp != NULL)
+ fclose(fp);
+ // net_mod_socket_close(client);
+ return (void *)total;
+}
+
+//澶氱嚎绋媠end
+void one_sendto_many(char *nodelist) {
+
+ int status, i = 0;
+
+ // Targ *targs = (Targ *)calloc(processors, sizeof(Targ));
+
+ char sendbuf[512];
+ struct timeval start, end;
+ long total = 0;
+
+ client = net_mod_socket_open();
+ net_mod_socket_bind(client, shm_mm_wrapper_alloc_key());
+
+ net_node_t *node_arr;
+ int node_arr_size = parse_node_list(nodelist, &node_arr);
+ Targ targs[node_arr_size];
+ pthread_t tids[node_arr_size];
+ void *res[node_arr_size];
+
+ printf("寮�濮嬫祴璇�...\n");
+ gettimeofday(&start, NULL);
+ for (i = 0; i < node_arr_size; i++) {
+ targs[i].node = node_arr + i;
+ targs[i].id = i;
+ pthread_create(&tids[i], NULL, _run_one_sendto_many_, (void *)&targs[i]);
+ }
+
+ for (i = 0; i < node_arr_size; i++) {
+ if (pthread_join(tids[i], &res[i]) != 0) {
+ perror("multyThreadClient pthread_join");
+ } else {
+ total += (long)res[i];
+ //fprintf(stderr, "client(%d) 鍐欏叆 %ld 鏉℃暟鎹甛n", i, (long)res[i]);
+ }
+ }
+
+ gettimeofday(&end, NULL);
+
+ double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec);
+ long diffsec = (long) (difftime/1000000);
+ long diffusec = difftime - diffsec*1000000;
+ fprintf(stderr,"鍙戦�佹暟鐩�:%ld, 鎴愬姛鏁扮洰: %ld, 鐢ㄦ椂: (%ld sec %ld usec), 骞冲潎: %f\n",
+ SCALE*node_arr_size, total, diffsec, diffusec, difftime/total );
+ // fflush(stdout);
+
+}
+
+// 鏃犻檺寰幆send
+void test_net_sendandrecv(char *nodelist) {
+
+ int n, j;
+ void * client;
+ int recv_arr_size;
+ net_mod_recv_msg_t *recv_arr;
+ net_node_t *node_arr;
+ int node_arr_size = parse_node_list(nodelist, &node_arr);
+ char buf[128];
+ pid_t pid, retPid ;
+ unsigned int l , retl;
+ int remoteKey;
+ const char *hello_format = "%d say Hello %u ";
+ const char *reply_format = "%d RECEIVED %d say Hello %d";
+
+ pid = getpid();
+ l = 0;
+
+ client = net_mod_socket_open();
+ while(true) {
+ sprintf(buf, hello_format, pid, l);
+ n = net_mod_socket_sendandrecv_timeout(client, node_arr, node_arr_size, buf, strlen(buf)+1,
+ &recv_arr, &recv_arr_size, 1000);
+ printf(" %d nodes reply\n", n);
+ for(j = 0; j < recv_arr_size; j++) {
+
+ printf("%ld send '%s' . received '%s' from (host:%s, port: %d, key:%d) \n",
+ (long)pid,
+ buf,
+ (char *)recv_arr[j].content,
+ recv_arr[j].host,
+ recv_arr[j].port,
+ recv_arr[j].key
+
+ );
+
+
+
+ assert(sscanf((const char *)recv_arr[j].content, reply_format, &remoteKey, &retPid, &retl) == 3);
+ assert(retPid == pid);
+ assert(retl == l);
+ assert(remoteKey == recv_arr[j].key);
+ }
+
+ // 浣跨敤瀹屽悗锛屼笉瑕佸繕璁伴噴鏀炬帀
+ net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
+ l++;
+ }
+
+ net_mod_socket_close(client);
+
+}
+
+void *_run_pub_(void *arg) {
+ Targ *targ = (Targ *)arg;
+ char sendbuf[128];
+
+ int i,j, n;
+ int total = 0;
+
+ net_node_t *node_arr;
+ int node_arr_size = parse_node_list(targ->nodelist, &node_arr);
+
+ const char *topic = "news";
+ // char filename[512];
+ // sprintf(filename, "test%d.tmp", targ->id);
+ // FILE *fp = NULL;
+ // fp = fopen(filename, "w+");
+ // fp = stdout;
+
+
+ for (i = 0; i < SCALE; i++) {
+ sprintf(sendbuf, "thread(%ld) %d", targ->id, i);
+
+ n = net_mod_socket_pub(client, node_arr, node_arr_size, topic, strlen(topic)+1, sendbuf, strlen(sendbuf)+1);
+ // n = net_mod_socket_pub(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size);
+ LoggerFactory::getLogger()->debug( "pub:%s to %d nodes\n", sendbuf, n);
+ total += n;
+ }
+ // fclose(fp);
+
+ return (void *)total;
+}
+
+//澶氱嚎绋媝ub
+void test_net_pub_threads(char *nodelist) {
+
+ int status, i = 0, processors = 4;
+ void *res[processors];
+ // Targ *targs = (Targ *)calloc(processors, sizeof(Targ));
+ Targ targs[processors];
+ pthread_t tids[processors];
+ char sendbuf[512];
+ struct timeval start, end;
+ long total = 0;
+ client = net_mod_socket_open();
+
+printf("寮�濮嬫祴璇�...\n");
+ gettimeofday(&start, NULL);
+ for (i = 0; i < processors; i++) {
+ targs[i].nodelist = nodelist;
+ targs[i].id = i;
+ pthread_create(&tids[i], NULL, _run_pub_, (void *)&targs[i]);
+ }
+
+ for (i = 0; i < processors; i++) {
+ if (pthread_join(tids[i], &res[i]) != 0) {
+ perror("multyThreadClient pthread_join");
+ } else {
+ total += (long)res[i];
+ //fprintf(stderr, "client(%d) 鍐欏叆 %ld 鏉℃暟鎹甛n", i, (long)res[i]);
+ }
+ }
+
+ gettimeofday(&end, NULL);
+
+ double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec);
+ long diffsec = (long) (difftime/1000000);
+ long diffusec = difftime - diffsec*1000000;
+ fprintf(stderr,"鍙戦�佹暟鐩�: %ld, 鐢ㄦ椂: (%ld sec %ld usec), 骞冲潎: %f\n", total, diffsec, diffusec, difftime/total );
+ // fflush(stdout);
+ net_mod_socket_close(client);
+}
+
+// 鏃犻檺寰幆pub
+void test_net_pub(char *nodelist) {
+
+ int n;
+ char sendbuf[512];
+ net_node_t *node_arr;
+ int node_arr_size = parse_node_list(nodelist, &node_arr);
+
+ char *topic = "news";
+ sprintf(sendbuf, "pid:%ld: Good news!!", (long)getpid());
+
+ void * client = net_mod_socket_open();
+ while (true) {
+ n = net_mod_socket_pub(client, node_arr, node_arr_size, topic, strlen(topic)+1, sendbuf, strlen(sendbuf)+1);
+ // n = net_mod_socket_pub(client, node_arr, node_arr_size, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size);
+ LoggerFactory::getLogger()->debug( "pub to %d nodes\n", n);
+ }
+ net_mod_socket_close(client);
+}
+
+void list () {
+ LockFreeQueue<shm_packet_t> * mqueue;
+ hashtable_t *hashtable = mm_get_hashtable();
+ printf("%10s \t %-10s \t %10s\n", "KEY", "LENGTH", "STATUS");
+ hashtable_foreach(hashtable, [&](int key, void * value){
+ if(key >= 100 ) {
+ mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, key);
+ if((long)mqueue == 0x1) {
+ printf("%10d \t %-10s\n", key, "Not In Used");
+ } else {
+ printf("%10d \t %-10d\n", key, mqueue->size());
+ }
+
+ } else {
+ printf("%10d\n", key);
+ }
+
+ });
+}
+
+void info(int key) {
+ LockFreeQueue<shm_packet_t> * mqueue;
+ hashtable_t *hashtable = mm_get_hashtable();
+ mqueue = (LockFreeQueue<shm_packet_t> *) hashtable_get(hashtable, key);
+ printf("%10s: %-10p\n", "PTR", mqueue);
+ printf("%10s: %-10d\n", "KEY", key);
+ printf("%10s: %-10d\n", "LENGTH", mqueue->size());
+
+
+}
+
+
+void remove(int key) {
+ hashtable_t *hashtable = mm_get_hashtable();
+
+ LockFreeQueue<shm_packet_t> * mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, key);
+ if(mqueue != NULL) {
+ delete mqueue;
+ hashtable_remove(hashtable, key);
+ }
+}
+
+void do_sendandrecv(int key, char *sendbuf) {
+ int n, j;
+ int recv_arr_size;
+ net_mod_recv_msg_t *recv_arr;
+
+ net_node_t node_arr[] = {NULL, 0, key};
+
+ void * client = net_mod_socket_open();
+ n = net_mod_socket_sendandrecv_timeout(client, node_arr, 1, sendbuf, strlen(sendbuf) + 1, &recv_arr, &recv_arr_size, 5000);
+ if(n == 0) {
+ printf("send failed\n");
+ return;
+ }
+ printf(" %d nodes reply\n", n);
+ for(j=0; j < recv_arr_size; j++) {
+
+ fprintf(stdout, "%d send '%s' to %d. received from (host=%s, port= %d, key=%d) '%s'\n\n",
+ net_mod_socket_get_key(client),
+ sendbuf,
+ key,
+ recv_arr[j].host,
+ recv_arr[j].port,
+ recv_arr[j].key,
+ (char *)recv_arr[j].content
+ );
+ }
+
+ net_mod_socket_close(client);
+}
+
+
+
+void usage(char *name)
+{
+ #define fpe(str) fprintf(stderr, " %s", str);
+
+ fprintf(stderr, "Usage: %s {function} [OPTIONS] [ARG...]\n\n", name);
+ fprintf(stderr, "Test shmsocket\n\n");
+
+ fprintf(stderr, "Options:\n\n");
+ fpe("-p, --port TCP/IP Port\n");
+ fpe("-k, --key SHM Key\n");
+ fpe("--sendlist format锛�--sendlist=\"192.168.5.10:5000:11, 192.168.5.22:5000:11, 192.168.20.104:5000:11\"\n");
+ fpe("--publist format: --publist=\"192.168.5.10:5000:8, 192.168.5.22:5000:8, 192.168.20.104:5000:8\"\n");
+ fpe("\n");
+
+ fprintf(stderr, "Examples:\n\n");
+ fpe("# sendandrecv to socket which has key 100\n");
+ fpe("./shm_util sendandrecv 100 \"hello\"\n");
+ fpe("# list all key\n");
+ fpe("./shm_util list\n");
+ fpe("# remove key 1001\n");
+ fpe("./shm_util rm 1001\n");
+ fpe("./shm_util info 1002\n");
+ fpe("./shm_util recvfrom --bind 1002 [--force]\n")
+ fpe("\n");
+}
+
+
+
+argument_t parse_args (int argc, char *argv[])
+{
+ int c;
+
+ if(argc < 2) {
+ usage(argv[0]);
+ exit(1);
+ }
+
+
+
+ argument_t mopt = {};
+
+ // mopt.volume_list_size = 0;
+ mopt.interactive = false;
+
+ opterr = 0;
+
+
+ static struct option long_options[] =
+ {
+ /* These options set a flag. */
+
+ {"key", required_argument, 0, 'k'},
+ {"port", required_argument, 0, 'p'},
+ {"interactive", no_argument, 0, 'i'},
+ {"force", no_argument, 0, 'f'},
+ {"bind", required_argument, (int *)mopt.bind, 0},
+ {"sendlist", required_argument, (int *)mopt.sendlist, 0},
+ {"publist", required_argument, (int *)mopt.publist, 0},
+ {0, 0, 0, 0}
+ };
+ /* getopt_long stores the option index here. */
+ int option_index = 0;
+ while (1)
+ {
+
+
+ c = getopt_long (argc, argv, "+fk:p:i", long_options, &option_index);
+
+ /* Detect the end of the options. */
+ if (c == -1)
+ break;
+
+ switch (c)
+ {
+ case 0:
+ /* If this option set a flag, do nothing else now. */
+ if (long_options[option_index].flag != 0)
+ break;
+
+ if(strcmp(long_options[option_index].name, "sendlist") == 0) {
+ mopt.sendlist = optarg;
+ }
+ else if(strcmp(long_options[option_index].name, "publist") == 0) {
+ mopt.publist = optarg;
+ }
+ else if(strcmp(long_options[option_index].name, "bind") == 0) {
+ mopt.bind = atoi(optarg);
+ }
+ else {
+ printf ("option %s", long_options[option_index].name);
+ if (optarg)
+ printf (" with arg %s", optarg);
+ printf ("\n");
+ }
+
+ break;
+
+ case 'k':
+ mopt.key = atoi(optarg);
+ break;
+
+ case 'i':
+ mopt.interactive = true;
+ break;
+
+ case 'f':
+ mopt.force = true;
+ break;
+
+ case 'p':
+ // printf ("==name with value `%s'\n", optarg);
+ mopt.port = atoi(optarg);
+ break;
+
+ case '?':
+ printf ("==? optopt=%c, %s, `%s', %d\n", optopt, optarg, argv[optind], optind);
+ /* getopt_long already printed an error message. */
+ usage(argv[0]);
+ exit(1);
+ break;
+
+ default:
+ //printf ("==default optopt=%c, %s, `%s'\n",optopt, optarg, argv[optind]);
+ break;
+ }
+ }
+
+ // printf ("optind = %d, argc=%d \n", optind, argc);
+ /* Print any remaining command line arguments (not options). */
+ if (optind < argc)
+ {
+ mopt.cmd_arr = &argv[optind];
+ mopt.cmd_arr_len = argc - optind;
+ // printf ("non-option ARGV-elements: ");
+ // while (optind < argc)
+ // printf ("%d, %d, %s \n", optind, argc, argv[optind++]);
+ // putchar ('\n');
+ }
+ return mopt;
+
+}
+
+
+/**
+ * @str "192.168.5.10:5000:11, 192.168.5.22:5000:11, 192.168.5.104:5000:11"
+ * @node_arr_addr 杩斿洖澶勭悊鍚庣殑缃戠粶鑺傜偣鏁扮粍
+ * {
+ * {"192.168.5.22", 5000, 11},
+ * {"192.168.20.10", 5000, 11},
+ * {"192.168.20.104", 5000, 11}
+ * }
+ * @return 鏁扮粍鐨勯暱搴�
+ */
+int parse_node_list(const char *str, net_node_t *node_arr_addr[]) {
+ int i, j;
+ char **property_arr;
+ int property_arr_len;
+ char **entry_arr;
+ int entry_arr_len = str_split(str, ",", &entry_arr);
+
+ net_node_t *node_arr = (net_node_t *) calloc(entry_arr_len, sizeof(net_node_t));
+ for(i = 0; i < entry_arr_len; i++) {
+ property_arr_len = str_split(entry_arr[i], ":", &property_arr);
+ printf("=====%s, %s, %s\n", property_arr[0], property_arr[1], property_arr[2]);
+
+ node_arr[i] = {trim(property_arr[0], 0), atoi(property_arr[1]), 0};
+
+ free(property_arr[1]);
+ if(property_arr_len == 3) {
+ node_arr[i].key = atoi(property_arr[2]);
+ free(property_arr[2]);
+ }
+ free(entry_arr[i]);
+
+ }
+ *node_arr_addr = node_arr;
+
+
+ return entry_arr_len;
+}
+
+void print_node_list(net_node_t *node_arr, int len) {
+ printf("============node list begin==========\n");
+ for(int i = 0; i < len; i++) {
+ printf("host=%s, port=%d, key=%d \n", node_arr[i].host, node_arr[i].port, node_arr[i].key);
+ }
+ printf("============node list end==========\n");
+}
+
+
+
+
+int main(int argc, char *argv[]) {
+ int i;
+ char *prog;
+ char * fun;
+ argument_t opt = {};
+
+ shm_mm_wrapper_init(512);
+
+ if(argc < 2) {
+ usage(argv[0]);
+ exit(1);
+ }
+ prog = argv[0];
+ fun = argv[1];
+ argc--;
+ argv++;
+
+
+ if (strcmp("help", fun) == 0 ) {
+ usage(prog);
+ }
+ else if (strcmp("list", fun) == 0 ) {
+ list();
+ }
+ else if (strcmp("info", fun) == 0 ) {
+ if(argc < 2) {
+
+ usage(prog);
+
+ } else {
+ for(i = 1; i < argc; i++) {
+ int key = atoi(argv[i]);
+ info(key);
+ }
+ }
+ }
+ else if (strcmp("rm", fun) == 0 ) {
+ if(argc < 2) {
+ usage(prog);
+
+ } else {
+ for(i = 1; i < argc; i++) {
+ int key = atoi(argv[i]);
+ remove(key);
+ }
+ }
+
+ }
+ else if (strcmp("sendandrecv", fun) == 0 ) {
+ if(argc < 3) {
+ usage(prog);
+ exit(1);
+ }
+ int key = atoi(argv[1]);
+ char *content = argv[2];
+ do_sendandrecv(key, content);
+ }
+ else if (strcmp("start_bus_server", fun) == 0) {
+
+ start_bus_server(opt);
+ }
+ else if (strcmp("start_resycle", fun) == 0) {
+
+ start_resycle();
+ }
+
+ else if (strcmp("start_net_proxy", fun) == 0 ) {
+ opt = parse_args(argc, argv);
+ if(opt.port == 0) {
+ usage(prog);
+ exit(1);
+ }
+ start_net_proxy(opt);
+
+ }
+
+ else if (strcmp("recvfrom", fun) == 0) {
+ opt = parse_args(argc, argv);
+ if(opt.bind == 0) {
+ usage(argv[0]);
+ } else {
+ start_recvfrom(opt.bind, opt.force);
+ }
+
+ }
+ else if (strcmp("start_net_client", fun) == 0) {
+ opt = parse_args(argc, argv);
+ if(opt.sendlist == 0) {
+ fprintf(stderr, "Missing sendlist .\n");
+ usage(argv[0]);
+ exit(1);
+ }
+ if(opt.publist == 0) {
+ fprintf(stderr, "Missing publist.\n");
+ usage(argv[0]);
+ exit(1);
+ }
+ start_net_client(opt.sendlist, opt.publist);
+ }
+ else if (strcmp("one_sendto_many", fun) == 0) {
+ opt = parse_args(argc, argv);
+ if(opt.sendlist == 0) {
+ fprintf(stderr, "Missing sendlist .\n");
+ usage(argv[0]);
+ exit(1);
+ }
+
+ one_sendto_many(opt.sendlist);
+ }
+ else if (strcmp("test_net_sendandrecv", fun) == 0) {
+ opt = parse_args(argc, argv);
+ if(opt.sendlist == 0) {
+ fprintf(stderr, "Missing sendlist .\n");
+ usage(argv[0]);
+ exit(1);
+ }
+
+ test_net_sendandrecv(opt.sendlist);
+ }
+ else if (strcmp("test_net_pub_threads", fun) == 0) {
+ opt = parse_args(argc, argv);
+ if(opt.publist == 0) {
+ fprintf(stderr, "Missing publist .\n");
+ usage(argv[0]);
+ exit(1);
+ }
+
+ test_net_pub_threads(opt.publist);
+ }
+ else if (strcmp("test_net_pub", fun) == 0) {
+ opt = parse_args(argc, argv);
+ if(opt.publist == 0) {
+ fprintf(stderr, "Missing publist .\n");
+ usage(argv[0]);
+ exit(1);
+ }
+
+ test_net_pub(opt.publist);
+ }
+
+ else {
+ printf("%Invalid funciton name\n");
+ usage(argv[0]);
+ exit(1);
+
+ }
+
+ shm_mm_wrapper_destroy();
+
+}
diff --git a/src/queue/shm_queue.h b/src/queue/shm_queue.h
index 0921af3..74b9b33 100644
--- a/src/queue/shm_queue.h
+++ b/src/queue/shm_queue.h
@@ -87,19 +87,15 @@
template <typename ELEM_T>
bool SHMQueue<ELEM_T>::bind(int key, bool force) {
-
- hashtable_lock(hashtable);
void *tmp_ptr = hashtable_get(hashtable, key);
if (tmp_ptr == NULL || tmp_ptr == (void *)1 || force) {
queue = new LockFreeQueue<ELEM_T, SHM_Allocator>(mqsize);
hashtable_put(hashtable, key, (void *)queue);
mkey = key;
owner = true;
- hashtable_unlock(hashtable);
return true;
}
- hashtable_unlock(hashtable);
return false;
}
diff --git a/src/shm/hashtable.cpp b/src/shm/hashtable.cpp
old mode 100755
new mode 100644
index e435172..a223c0c
--- a/src/shm/hashtable.cpp
+++ b/src/shm/hashtable.cpp
@@ -138,13 +138,32 @@
}
void *hashtable_get(hashtable_t *hashtable, int key) {
+ int rv;
+
+ if((rv = svsem_wait(hashtable->mutex)) != 0) {
+ LoggerFactory::getLogger()->error(errno, "hashtable_get\n");
+ }
void * res = _hashtable_get(hashtable, key);
+
+ if((rv = svsem_post(hashtable->mutex)) != 0) {
+ LoggerFactory::getLogger()->error(errno, "hashtable_get\n");
+ }
return res;
}
void hashtable_put(hashtable_t *hashtable, int key, void *value) {
+ int rv;
+
+ if((rv = svsem_wait(hashtable->mutex)) != 0) {
+ LoggerFactory::getLogger()->error(errno, "hashtable_put\n");
+ }
_hashtable_put(hashtable, key, value);
hashtable->queueCount++;
+
+ if((rv = svsem_post(hashtable->mutex)) != 0) {
+ LoggerFactory::getLogger()->error(errno, "hashtable_put\n");
+ }
+
}
bool hashtable_check_put(hashtable_t *hashtable, int key, void *value, bool overwrite) {
@@ -249,17 +268,6 @@
return keyset;
}
-
-
-int hashtable_lock(hashtable_t *hashtable) {
- return svsem_wait(hashtable->mutex);
-}
-
-int hashtable_unlock(hashtable_t *hashtable) {
- return svsem_post(hashtable->mutex);
-}
-
-
void hashtable_removeall(hashtable_t *hashtable)
{
tailq_entry_t *item;
@@ -294,7 +302,6 @@
{
return key % MAPSIZE;
- /*printf("hashfun = %ld\n", code);*/
}
/**
diff --git a/src/shm/hashtable.cpp.2 b/src/shm/hashtable.cpp.2
old mode 100755
new mode 100644
diff --git a/src/shm/hashtable.h b/src/shm/hashtable.h
old mode 100755
new mode 100644
index 90043c3..6c3cd27
--- a/src/shm/hashtable.h
+++ b/src/shm/hashtable.h
@@ -33,10 +33,6 @@
void *hashtable_remove(hashtable_t *hashtable, int key);
void hashtable_removeall(hashtable_t *hashtable);
-
-int hashtable_lock(hashtable_t *hashtable);
-int hashtable_unlock(hashtable_t *hashtable);
-
int hashtable_get_queue_count(hashtable_t *hashtable) ;
/**
* 閬嶅巻hash_table
diff --git a/src/shm/mm.cpp b/src/shm/mm.cpp
index 8fff5c2..e55192c 100644
--- a/src/shm/mm.cpp
+++ b/src/shm/mm.cpp
@@ -113,7 +113,6 @@
newsize = ALIGN(size + (SIZE_T_SIZE << 1) + (PTR_SIZE << 1) );
- //fprintf(stderr, "mm_malloc : size=%u\n", size);
/* Search the free list for a fit */
SemUtil::dec(mutex);
if ((ptr = find_fit(newsize)) != NULL)
diff --git a/src/shm/shm_mm.h b/src/shm/shm_mm.h
index 18c5370..63a9e06 100644
--- a/src/shm/shm_mm.h
+++ b/src/shm/shm_mm.h
@@ -30,16 +30,14 @@
template <typename T>
T* shm_mm_attach(int key) {
- void *ptr;
- // T* tptr;
- hashtable_t *hashtable = mm_get_hashtable();
+ void *ptr;
+ // T* tptr;
+ hashtable_t *hashtable = mm_get_hashtable();
ptr = hashtable_get(hashtable, key);
-// printf("shm_mm_malloc_by_key malloc before %d, %p\n", key, ptr);
if(ptr == NULL || ptr == (void *)1 ) {
ptr = mm_malloc(sizeof(T));
hashtable_put(hashtable, key, ptr);
new(ptr) T;
-// printf("shm_mm_malloc_by_key use new %d, %p\n", key, ptr);
}
return (T*)ptr;
}
diff --git a/src/socket/bus_server_socket.cpp b/src/socket/bus_server_socket.cpp
index 657941b..7a45696 100644
--- a/src/socket/bus_server_socket.cpp
+++ b/src/socket/bus_server_socket.cpp
@@ -39,7 +39,6 @@
subscripter_set = map_iter->second;
if(subscripter_set != NULL && (set_iter = subscripter_set->find(key) ) != subscripter_set->end()) {
subscripter_set->erase(set_iter);
-// printf("remove_subscripter %s, %d\n", map_iter->first, key);
count++;
}
}
@@ -201,7 +200,6 @@
subscripter_set = map_iter->second;
for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
send_key = *set_iter;
-// logger->debug("_proxy_pub send before %d \n", send_key);
rv = shm_sendto(shm_socket, buf, size, send_key, &timeout, BUS_TIMEOUT_FLAG);
if(rv == 0) {
continue;
@@ -232,26 +230,28 @@
char resp_buf[128];
bus_head_t head;
+ int rv;
+ char send_buf[512] = { 0x00 };
+
const char *topic_delim = ",";
-// logger.debug("_run_proxy_ server receive before\n");
+
while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) {
-// logger.debug("_run_proxy_ server recvfrom %d after: %s \n", key, buf);
head = ShmModSocket::decode_bus_head(buf);
topics = buf + BUS_HEAD_SIZE;
action = head.action;
-// logger.debug("_run_proxy_ : %s\n", action);
+
if(strcmp(action, "sub") == 0) {
// 璁㈤槄鏀寔澶氫富棰樿闃�
topic = strtok(topics, topic_delim);
-// logger.debug("_run_proxy_ topic = %s\n", topic);
while(topic) {
+
_proxy_sub(trim(topic, 0), key);
topic = strtok(NULL, topic_delim);
}
}
else if(strcmp(action, "desub") == 0) {
-// logger.debug("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), ""));
+
if(strcmp(trim(topics, 0), "") == 0) {
// 鍙栨秷鎵�鏈夎闃�
_proxy_desub_all(key);
@@ -259,6 +259,7 @@
topic = strtok(topics, topic_delim);
while(topic) {
+
_proxy_desub(trim(topic, 0), key);
topic = strtok(NULL, topic_delim);
}
@@ -270,7 +271,16 @@
_proxy_pub(topics, content, head.content_size, key);
}
- else if(strcmp(action, "stop") == 0) {
+ else if (strncmp(buf, "request", strlen("request")) == 0) {
+ sprintf(send_buf, "%4d", key);
+ strncpy(send_buf + 4, buf, (sizeof(send_buf) - 4) >= (strlen(buf) + 1) ? strlen(buf) : (sizeof(send_buf) - 4));
+
+ rv = shm_sendto(shm_socket, send_buf, strlen(send_buf) + 1, key);
+ if(rv != 0) {
+ logger->error( "BusServerSocket::_run_proxy_ : requst answer fail!\n");
+ }
+ }
+ else if(strcmp(action, "stop") == 0) {
logger->info( "Stopping Bus...");
free(buf);
break;
diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp
index 466d0b5..abd9477 100644
--- a/src/socket/shm_mod_socket.cpp
+++ b/src/socket/shm_mod_socket.cpp
@@ -10,7 +10,6 @@
}
ShmModSocket::~ShmModSocket() {
- // logger->debug("Close ShmModSocket...\n");
struct timespec timeout = {1, 0};
if(bus_set != NULL) {
for(auto bus_iter = bus_set->begin(); bus_iter != bus_set->end(); bus_iter++) {
@@ -216,6 +215,7 @@
int buf_size;
char *buf;
int max_buf_size;
+ void *buf_ptr;
if((buf = (char *) malloc(MAXBUF)) == NULL) {
LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc");
exit(1);
@@ -234,13 +234,15 @@
}
}
- memcpy(buf, ShmModSocket::encode_bus_head(request_head), BUS_HEAD_SIZE);
+ buf_ptr = ShmModSocket::encode_bus_head(request_head);
+ memcpy(buf, buf_ptr, BUS_HEAD_SIZE);
if(topic_size != 0 )
memcpy(buf + BUS_HEAD_SIZE, topic_buf, topic_size);
if(content_size != 0)
memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size);
*retbuf = buf;
+ free(buf_ptr);
return buf_size;
}
@@ -259,7 +261,7 @@
tmp_ptr += sizeof(head.action);
PUT(tmp_ptr, htonl(head.topic_size));
- tmp_ptr += 4;
+ tmp_ptr += sizeof(head.topic_size);
PUT(tmp_ptr, htonl(head.content_size));
return headbs;
@@ -274,7 +276,7 @@
tmp_ptr += sizeof(head.action);
head.topic_size = ntohl(GET(tmp_ptr));
- tmp_ptr += 4;
+ tmp_ptr += sizeof(head.topic_size);
head.content_size = ntohl(GET(tmp_ptr));
return head;
diff --git a/src/socket/shm_mod_socket.h b/src/socket/shm_mod_socket.h
index 0c65f52..9890aef 100644
--- a/src/socket/shm_mod_socket.h
+++ b/src/socket/shm_mod_socket.h
@@ -11,7 +11,7 @@
#include <set>
#include "socket_def.h"
-#define BUS_HEAD_SIZE (64 + 2 * sizeof(uint32_t))
+#define BUS_HEAD_SIZE sizeof(bus_head_t)
class BusServerSocket;
struct bus_head_t
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index 94b3fdd..978eda9 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -41,21 +41,16 @@
static LockFreeQueue<shm_packet_t> * shm_socket_bind_queue(int key, bool force) {
hashtable_t *hashtable = mm_get_hashtable();
LockFreeQueue<shm_packet_t> *queue;
- hashtable_lock(hashtable);
void *tmp_ptr = hashtable_get(hashtable, key);
-
if (tmp_ptr == NULL || tmp_ptr == (void *)1 ) {
- queue = new LockFreeQueue<shm_packet_t>(16);
+ queue = new LockFreeQueue<shm_packet_t>(32);
hashtable_put(hashtable, key, (void *)queue);
- hashtable_unlock(hashtable);
return queue;
} else if(force) {
- hashtable_unlock(hashtable);
return (LockFreeQueue<shm_packet_t> *) tmp_ptr;
}
- hashtable_unlock(hashtable);
return NULL;
}
@@ -67,7 +62,6 @@
hashtable_t *hashtable = mm_get_hashtable();
void *tmp_ptr = hashtable_get(hashtable, key);
if (tmp_ptr == NULL || tmp_ptr == (void *)1) {
- //logger->error("shm_socket._remote_queue_attach锛歝onnet at key %d failed!", key);
return NULL;
}
@@ -112,23 +106,33 @@
static int _shm_socket_close_(shm_socket_t *sockt) {
- int rv;
+ int rv, i;
+ hashtable_t *hashtable = mm_get_hashtable();
logger->debug("shm_socket_close\n");
- // hashtable_remove(hashtable, mkey);
- // if(sockt->queue != NULL) {
- // sockt->queue = NULL;
- // }
-
- if(sockt->key != 0) {
- auto it = shmQueueStMap->find(sockt->key);
- if(it != shmQueueStMap->end()) {
- it->second.status = SHM_QUEUE_ST_CLOSED;
- it->second.closeTime = time(NULL);
- }
- }
-
+ // if(sockt->key != 0) {
+ // auto it = shmQueueStMap->find(sockt->key);
+ // if(it != shmQueueStMap->end()) {
+ // it->second.status = SHM_QUEUE_ST_CLOSED;
+ // it->second.closeTime = time(NULL);
+ // }
+ // }
+
+
+
+ if(sockt->queue != NULL) {
+ sockt->queue->close();
+ for( i = 0; i < sockt->queue->size(); i++) {
+ mm_free((*(sockt->queue))[i].buf);
+ logger->info("======= %d free queue element buf\n", sockt->key);
+ }
+ sleep(1);
+
+ hashtable_remove(hashtable, sockt->key);
+ // sockt->queue = NULL;
+ }
+
pthread_mutex_destroy(&(sockt->mutex) );
free(sockt);
return 0;
@@ -168,8 +172,6 @@
int shm_socket_get_key(shm_socket_t *sockt){
return sockt->key;
}
-
-
// 鐭繛鎺ユ柟寮忓彂閫�
int shm_sendto(shm_socket_t *sockt, const void *buf, const int size,
@@ -234,7 +236,7 @@
shm_packet_t sendpak = {0};
sendpak.key = sockt->key;
sendpak.size = sendsize;
- memcpy(sendpak.uuid, recvpak.uuid, sizeof sendpak.uuid);
+ memcpy(sendpak.uuid, recvpak.uuid, sizeof(sendpak.uuid));
if(sendbuf !=NULL && sendsize > 0) {
sendpak.buf = mm_malloc(sendsize);
memcpy(sendpak.buf, sendbuf, sendsize);
@@ -264,11 +266,14 @@
}
-
- if(buf != NULL && recvpak.buf != NULL) {
- void *_buf = malloc(recvpak.size);
- memcpy(_buf, recvpak.buf, recvpak.size);
- *buf = _buf;
+ if(recvpak.buf != NULL) {
+ if (buf == NULL) {
+ logger->warn("!!!Alert: buf should be not NULL!\n");
+ } else {
+ void *_buf = malloc(recvpak.size);
+ memcpy(_buf, recvpak.buf, recvpak.size);
+ *buf = _buf;
+ }
}
if(size != NULL)
@@ -372,7 +377,7 @@
logger->debug("send uuid:%s, recv uuid: %s", uuid.c_str(), recvpak.uuid);
if(strlen(recvpak.uuid) == 0) {
continue;
- } else if (strncmp(uuid.c_str(), recvpak.uuid, sizeof recvpak.uuid) == 0) {
+ } else if (strncmp(uuid.c_str(), recvpak.uuid, sizeof(recvpak.uuid)) == 0) {
// 鍙戦�佷笌鎺ュ彈鐨刄UID鍖归厤鎴愬姛
goto LABLE_SUC;
} else {
@@ -406,6 +411,8 @@
int rv = 0, tryn = 16;
+ static int Counter_suc = 0;
+ static int Counter_fail = 0;
shm_packet_t sendpak;
shm_packet_t recvpak;
std::map<int, shm_packet_t>::iterator recvbufIter;
@@ -422,7 +429,6 @@
if (tmp_socket == NULL)
{
/* If first call from this thread, allocate buffer for thread, and save its location */
- logger->debug("%lu create threadlocal socket\n", (long)pthread_self() );
tmp_socket = shm_socket_open(SHM_SOCKET_DGRAM);
rv = pthread_setspecific(_localthread_socket_key_, tmp_socket);
@@ -449,7 +455,6 @@
recvbufIter = tmp_socket->recvbuf2.find(key);
if(recvbufIter != tmp_socket->recvbuf2.end()) {
// 鍦ㄧ紦瀛橀噷鏌ュ埌浜唊ey鍖归厤鎴愬姛鐨�
- // logger->info("get from recvbuf: %d", key);
recvpak = recvbufIter->second;
tmp_socket->recvbuf2.erase(recvbufIter);
goto LABLE_SUC;
@@ -462,7 +467,7 @@
return rv;
}
- if (key == recvpak.key) {
+ if (key == recvpak.key) {
// 鍙戦�佷笌鎺ュ彈鐨刄UID鍖归厤鎴愬姛
goto LABLE_SUC;
} else {
@@ -596,10 +601,16 @@
if (remoteQueue == NULL ) {
goto ERR_CLOSED;
+ } else if(remoteQueue->isClosed()) {
+ goto ERR_CLOSED;
}
sendpak->key = sockt->key;
rv = remoteQueue->push(*sendpak, timeout, flag);
+
+ if(rv != 0) {
+ mm_free(sendpak->buf);
+ }
if(rv == ETIMEDOUT) {
return EBUS_TIMEOUT;
}
diff --git a/systype.sh b/systype.sh
old mode 100755
new mode 100644
diff --git a/test_net_socket/net_mod_socket.sh b/test_net_socket/net_mod_socket.sh
old mode 100755
new mode 100644
diff --git a/test_queue/test_lostdata.sh b/test_queue/test_lostdata.sh
old mode 100755
new mode 100644
diff --git a/test_socket/heart_beat.sh b/test_socket/heart_beat.sh
old mode 100755
new mode 100644
--
Gitblit v1.8.0