From 044e10574fa4e007be408d991861d34ecf22622a Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期一, 22 二月 2021 19:17:37 +0800
Subject: [PATCH] update

---
 shm_util/CMakeLists.txt           |    8 ++
 src/shm/shm_mm_wrapper.h          |    4 
 shm_util/shm_map.cpp              |   54 +++++++++++++
 src/net/net_mod_socket.h          |   10 ++
 src/CMakeLists.txt                |    4 
 src/net/net_mod_server_socket.cpp |    1 
 src/net/net_mod_socket.cpp        |   62 +++++++++++----
 src/socket/shm_socket.cpp         |   46 +++++-----
 8 files changed, 145 insertions(+), 44 deletions(-)

diff --git a/shm_util/CMakeLists.txt b/shm_util/CMakeLists.txt
index c604f55..e58ff5b 100644
--- a/shm_util/CMakeLists.txt
+++ b/shm_util/CMakeLists.txt
@@ -3,4 +3,12 @@
 target_include_directories(shm_util PRIVATE
                             "${PROJECT_BINARY_DIR}"
                              ${EXTRA_INCLUDES}
+                            )
+
+
+add_executable(shm_map shm_map.cpp )
+target_link_libraries(shm_map PRIVATE shm_queue ${EXTRA_LIBS} )
+target_include_directories(shm_map PRIVATE
+                            "${PROJECT_BINARY_DIR}"
+                             ${EXTRA_INCLUDES}
                             )
\ No newline at end of file
diff --git a/shm_util/shm_map.cpp b/shm_util/shm_map.cpp
new file mode 100644
index 0000000..d9dffa4
--- /dev/null
+++ b/shm_util/shm_map.cpp
@@ -0,0 +1,54 @@
+/**
+ * 鏌ョ湅涓庣Щ鍑簁ey鐨勫伐鍏�
+ */
+#include <assert.h>
+#include "net_mod_server_socket_wrapper.h"
+#include "net_mod_socket_wrapper.h"
+#include "bus_server_socket_wrapper.h"
+
+#include "shm_mm_wrapper.h"
+#include "usg_common.h"
+#include <getopt.h>
+#include "logger_factory.h"
+
+
+static void usage(const char *name) {
+	printf("Usage: %s { list }\n", name);
+
+}
+
+
+void list () {
+	ShmQueueStMap * shmQueueStMap =  shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY);
+ 
+  for(auto it = shmQueueStMap->begin(); it != shmQueueStMap->end(); ++it ) {
+  	printf("%10d \t %10d\n", it->first, it->second.status);
+    
+  }
+
+}
+
+
+
+int main(int argc, char *argv[]) {
+	shm_mm_wrapper_init(512);
+
+	int key;
+	int i;
+	if(argc < 2) {
+		usage(argv[0]);
+		return 1;
+	}
+
+	if(strcmp(argv[1], "list") == 0) {
+		list();
+	} else {
+		usage(argv[0]);
+	}
+	 
+
+	shm_mm_wrapper_destroy();
+	
+
+
+}
\ No newline at end of file
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index ca0e995..c66f660 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -32,10 +32,10 @@
 if (BUILD_SHARED_LIBS)
   add_library(shm_queue SHARED ${_SOURCES_})
 else()
- add_library(shm_queue STATIC ${_SOURCES_})
+ add_library(shm_queue SHARED ${_SOURCES_})
 endif()
 
-# STATIC
+# STATIC SHARED
 # add_library(shm_queue ${_SOURCES_})
 
 target_include_directories(shm_queue PUBLIC ${EXTRA_INCLUDES} )
diff --git a/src/net/net_mod_server_socket.cpp b/src/net/net_mod_server_socket.cpp
index d7a4c0c..10b0aac 100644
--- a/src/net/net_mod_server_socket.cpp
+++ b/src/net/net_mod_server_socket.cpp
@@ -181,6 +181,7 @@
     }
 
     if( ret != 0) {
+      logger->error("杞彂澶辫触 : NetModServerSocket::process_client sendandrecv to %d , %s", request_head.key,  bus_strerror(ret));
       // 杞彂澶辫触
       response_head.code = ret;
       response_head.content_length = 0;
diff --git a/src/net/net_mod_socket.cpp b/src/net/net_mod_socket.cpp
index e689cea..a863f6d 100644
--- a/src/net/net_mod_socket.cpp
+++ b/src/net/net_mod_socket.cpp
@@ -150,17 +150,18 @@
 int NetModSocket::_sendandrecv_(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
   net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int  msec ) {
 
-  int i, n, recv_size, connfd;
+  int i,  recv_size, connfd;
   net_node_t *node;
   void *recv_buf = NULL;
   struct timespec timeout;
   int ret;
-  int n_req = 0, n_recv_suc = 0, n_resp =0;
+  int n_req = 0, n_recv_suc = 0, n_recv_err = 0, n_resp =0;
   
   net_mod_request_head_t request_head = {};
    
   net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t));
 
+  net_mod_recv_err_t err_arr[arrlen];
  
   NetConnPool *mpool = _get_pool();
 
@@ -179,6 +180,7 @@
       } else {
         ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size);
       }
+
       if( ret == 0) {
         strcpy( ret_arr[n_recv_suc].host, "");
         ret_arr[n_recv_suc].port = 0;
@@ -187,13 +189,11 @@
         ret_arr[n_recv_suc].content_length = recv_size;
         n_recv_suc++;
       } else {
-        if(ret > EBUS_BASE) {
-          // bus_errno = EBUS_TIMEOUT;
-          logger->debug("NetModSocket:: %d _sendandrecv_ to key %d failed, %s", get_key(), node->key, bus_strerror(ret));
-           
-        } else {
-          logger->error(ret, "NetModSocket:: %d _sendandrecv_ to key %d failed", get_key(),  node->key);
-        }
+        err_arr[n_recv_err].port = 0;
+        err_arr[n_recv_err].key = node->key;
+        err_arr[n_recv_err].code = ret;
+        n_recv_err++;
+        logger->error("NetModSocket:: %d _sendandrecv_ to key %d failed, %s", get_key(), node->key, bus_strerror(ret));
       }
 
      
@@ -201,6 +201,11 @@
     }
 
     if( (connfd = mpool->getConn(node->host, node->port)) < 0 ) {
+      memcpy(err_arr[n_recv_err].host, node->host, sizeof(err_arr[n_recv_err].host));
+      err_arr[n_recv_err].port =  node->port;
+      err_arr[n_recv_err].key = node->key;
+      err_arr[n_recv_err].code = EBUS_NET;
+      n_recv_err++;
       continue;
     }
 
@@ -215,6 +220,11 @@
  // printf("write_request %s:%d\n", request_head.host, request_head.port);
     if(write_request(connfd, request_head, send_buf, send_size, NULL, 0) != 0) {
       LoggerFactory::getLogger()->error("write_request failture %s:%d\n", node->host, node->port);
+      memcpy(err_arr[n_recv_err].host, node->host, sizeof(err_arr[n_recv_err].host));
+      err_arr[n_recv_err].port =  node->port;
+      err_arr[n_recv_err].key = node->key;
+      err_arr[n_recv_err].code = EBUS_NET;
+      n_recv_err++;
       mpool->closeConn( connfd);
     } else {
       n_req++;
@@ -240,19 +250,25 @@
         {
           mpool->nready--;
 // printf("POLLIN %d\n", connfd);
-          if( (n = read_response(connfd, ret_arr+n_recv_suc)) == 0) {
+          if( (ret = read_response(connfd, ret_arr+n_recv_suc, err_arr + n_recv_err)) == 0) {
             n_recv_suc++;
             // 鎴愬姛鏀跺埌杩斿洖娑堟伅锛屾竻绌鸿鍏ヤ綅
             mpool->conns[i].fd = -1;
             
           }
-          else if(n == EBUS_NET)  {
+          else if(ret == EBUS_NET)  {
             // 缃戠粶閿欒
+
+            logger->error("NetModSocket::_sendandrecv_ read_response key = %d , %s", get_key(),  bus_strerror(ret));
             mpool->closeConn( connfd);
+            n_recv_err++;
             // mpool->conns[i].fd = -1;
           } else {
             // 浠g悊鏈嶅姟娌℃湁杞彂鎴愬姛
-             mpool->conns[i].fd = -1;
+             
+            logger->error("NetModSocket::_sendandrecv_ read_response key = %d , %s", get_key(),  bus_strerror(ret));
+            mpool->conns[i].fd = -1;
+            n_recv_err++;
           }
 
           n_resp++;
@@ -337,6 +353,8 @@
   int ret;
   NetConnPool *mpool = _get_pool();
 
+  net_mod_recv_err_t err_msg;
+
   // 鏈湴鍙戦��
   if(node_arr == NULL || arrlen == 0) {
     if(msec == 0) {
@@ -410,7 +428,7 @@
         {
           mpool->nready--;
 // printf("POLLIN %d\n", connfd);
-          if( (ret = read_response(connfd, &recv_msg)) == 0) {
+          if( (ret = read_response(connfd, &recv_msg, &err_msg)) == 0) {
             
             // 鎴愬姛鏀跺埌杩斿洖娑堟伅锛屾竻绌鸿鍏ヤ綅
             mpool->conns[i].fd = -1;
@@ -641,7 +659,7 @@
  * @return 0 鎴愬姛,   EBUS_NET 缃戠粶閿欒锛� 鍏朵粬鍊� 浠g悊鏈嶅姟娌℃湁杞彂鎴愬姛銆�
  *
  */
-int NetModSocket::read_response(int connfd, net_mod_recv_msg_t *recv_msg) {
+int NetModSocket::read_response(int connfd, net_mod_recv_msg_t *recv_msg, net_mod_recv_err_t *err_arr) {
   int recv_size;
   void *recv_buf;
   char response_head_bs[NET_MODE_RESPONSE_HEAD_LENGTH];
@@ -649,14 +667,21 @@
   net_mod_response_head_t response_head;
   if ( rio_readn(connfd, response_head_bs, NET_MODE_RESPONSE_HEAD_LENGTH) !=  NET_MODE_RESPONSE_HEAD_LENGTH) {
     LoggerFactory::getLogger()->error(errno, "NetModSocket::read_response  rio_readnb response_head");
-    
-    return -1;
+    memcpy(err_arr->host, "unkown", sizeof(err_arr->host));
+    err_arr->port =  0;
+    err_arr->key = 0;
+    err_arr->code = EBUS_NET;
+    return EBUS_NET;
   }
 
   response_head =  NetModSocket::decode_response_head(response_head_bs);
 // printf(">>>> read_response %s\n", response_head.host);
   if(response_head.code != 0) {
     // 浠g悊鏈嶅姟娌¤兘鎴愬姛鍙戦�佺粰瀵瑰簲鐨刱ey
+    memcpy(err_arr->host, response_head.host, sizeof(err_arr->host));
+    err_arr->port =  response_head.port;
+    err_arr->key = response_head.key;
+    err_arr->code = response_head.code;
     return response_head.code;
   }
 
@@ -666,6 +691,11 @@
     exit(1);
   }
   if ( (recv_size = rio_readn(connfd, recv_buf, response_head.content_length) ) !=  response_head.content_length) {
+
+    memcpy(err_arr->host, response_head.host, sizeof(err_arr->host));
+    err_arr->port =  response_head.port;
+    err_arr->key = response_head.key;
+    err_arr->code = EBUS_NET;
     LoggerFactory::getLogger()->error(errno, "NetModSocket::read_response  rio_readnb recv_buf");
     //缃戠粶閿欒
     return EBUS_NET;
diff --git a/src/net/net_mod_socket.h b/src/net/net_mod_socket.h
index 9eb0a23..68f5294 100644
--- a/src/net/net_mod_socket.h
+++ b/src/net/net_mod_socket.h
@@ -55,6 +55,14 @@
   
 };
 
+struct net_mod_recv_err_t
+{
+  char host[NI_MAXHOST];
+  int port;
+  int key;
+  int code;  
+};
+
 class NetModSocket {
  
 
@@ -83,7 +91,7 @@
   NetConnPool* _get_pool();
 
   //璇诲彇杩斿洖淇℃伅
-  int read_response(int clientfd, net_mod_recv_msg_t *recv_msg);
+  int read_response(int clientfd, net_mod_recv_msg_t *recv_msg, net_mod_recv_err_t *err_arr);
   // 鍙戦�佽姹備俊鎭�
   int write_request(int clientfd, net_mod_request_head_t &request_head, const void *send_buf, int send_size, const void *topic_buf, int topic_size);
 
diff --git a/src/shm/shm_mm_wrapper.h b/src/shm/shm_mm_wrapper.h
index 48cfc83..cf353b8 100644
--- a/src/shm/shm_mm_wrapper.h
+++ b/src/shm/shm_mm_wrapper.h
@@ -26,8 +26,8 @@
 void shm_mm_wrapper_destroy();
 
 /**
- * @brief 鍥炴敹鏍囪涓哄垹闄ょ殑闃熷垪
- * @return 閿欒鐮�
+ * @brief 鍥炴敹鏍囪涓哄垹闄ょ殑闃熷垪銆備綔涓轰竴涓崟鐙殑杩涚▼杩愯銆�
+ * @return 鍙湁鍑洪敊鐨勬椂鍊欐墠浼氳繑鍥為敊璇�
  */
 int shm_mm_wrapper_start_resycle() ;
 
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index 7e33cc6..d8304fd 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -25,7 +25,7 @@
 static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *recvpak ,  const struct timespec *timeout,  int flag);
 
    
-static int shm_sendpakto(shm_socket_t *sockt, const shm_packet_t *sendpak,
+static int shm_sendpakto(shm_socket_t *sockt,  shm_packet_t *sendpak,
                const int key, const struct timespec *timeout, const int flag);
 
 
@@ -114,14 +114,11 @@
   
   int rv;
   logger->debug("shm_socket_close\n");
+  // hashtable_remove(hashtable, mkey);
   // if(sockt->queue != NULL) {
-  //   delete sockt->queue;
   //   sockt->queue = NULL;
   // }
 
-  // hashtable_remove(hashtable, mkey);
-
- 
   if(sockt->key != 0) {
     auto it =  shmQueueStMap->find(sockt->key);
     if(it != shmQueueStMap->end()) {
@@ -257,12 +254,8 @@
 
   if (rv != 0) {
 
-    if(rv == ETIMEDOUT)
-      return EBUS_TIMEOUT;
-    else {
-      logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv));
-      return rv;
-    }
+    logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv));
+    return rv;
    
   } 
 
@@ -279,6 +272,9 @@
   if(key != NULL)
     *key = recvpak.key;
 
+  if(recvpak.key == 0) {
+    err_exit(0, "key = %d, pid= %d, recvpak.key == 0",  shm_socket_get_key(sockt), getpid());
+  }
   mm_free(recvpak.buf);
   return 0;
 }
@@ -402,15 +398,13 @@
                     int *recv_size,  const struct timespec *timeout,  int flags) {
   
  
-  int rv, tryn = 6;
+  int rv = 0, tryn = 16;
   shm_packet_t sendpak;
   shm_packet_t recvpak;
   std::map<int, shm_packet_t>::iterator recvbufIter;
   // 鐢╰hread local 淇濊瘉姣忎釜绾跨▼鐢ㄤ竴涓嫭鍗犵殑socket鎺ュ彈瀵规柟杩斿洖鐨勪俊鎭�
   shm_socket_t *tmp_socket;
  
-  /* If first call from this thread, allocate buffer for thread, and save its location */
-  // logger->debug("%d create tmp socket\n", pthread_self() );
   rv = pthread_once(&_once_, _create_socket_key_perthread);
   if (rv != 0) {
     logger->error(rv, "shm_sendandrecv pthread_once");
@@ -431,7 +425,6 @@
     }
   }
  
-
   sendpak.key = tmp_socket->key;
   sendpak.size = send_size;
   if(send_buf != NULL) {
@@ -458,11 +451,6 @@
     rv = shm_recvpakfrom(tmp_socket, &recvpak, timeout, flags);
 
     if (rv != 0) {
-
-      if(rv == ETIMEDOUT) {
-        return EBUS_TIMEOUT;
-      }
-
       logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(tmp_socket), bus_strerror(rv));
       return rv;
     } 
@@ -538,7 +526,7 @@
  
 
    
-static int shm_sendpakto(shm_socket_t *sockt, const shm_packet_t *sendpak,
+static int shm_sendpakto(shm_socket_t *sockt,  shm_packet_t *sendpak,
                const int key, const struct timespec *timeout, const int flag) {
 
   int rv;
@@ -564,6 +552,8 @@
       sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind);
       if(sockt->queue  == NULL ) {
         logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key);
+        if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
+          err_exit(rv, "shm_sendto : pthread_mutex_unlock");
         return EBUS_KEY_INUSED;
       }
 
@@ -601,7 +591,11 @@
     goto ERR_CLOSED;
   }
 
+  sendpak->key = sockt->key;
   rv = remoteQueue->push(*sendpak, timeout, flag);
+  if(rv == ETIMEDOUT) {
+    return EBUS_TIMEOUT;
+  }
   return rv;
 
 ERR_CLOSED:
@@ -635,6 +629,8 @@
     sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind);
     if(sockt->queue  == NULL ) {
       logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key);
+      if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
+        err_exit(rv, "shm_recvfrom : pthread_mutex_unlock");
       return EBUS_KEY_INUSED;
     }
     
@@ -661,14 +657,18 @@
   // }
  
   rv = sockt->queue->pop(recvpak, timeout, flag);
-  if(rv != 0) 
+  if(rv != 0) {
+    if(rv == ETIMEDOUT) {
+      return EBUS_TIMEOUT;
+    }
     return rv;
+  }
 
   
   if(recvpak.action == BUS_ACTION_STOP) {
     return EBUS_STOPED;
   }
   *_recvpak = recvpak;
-  return rv;
+  return 0;
 }
  

--
Gitblit v1.8.0