From b750d359d742349be27463a5408f352aa042fdec Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期五, 15 一月 2021 15:59:31 +0800
Subject: [PATCH] update

---
 src/queue/lock_free_queue.h           |   11 
 src/socket/net_mod_socket.h           |    4 
 src/socket/shm_socket.h               |    6 
 test/test1.cpp                        |   33 ++++
 CMakeLists.txt                        |    1 
 src/bus_error.cpp                     |   10 
 src/socket/shm_socket.cpp             |   39 ++--
 /dev/null                             |   12 -
 src/queue/shm_queue.h                 |   26 +-
 src/socket/net_mod_socket_wrapper.cpp |   40 ++--
 src/logger_factory.cpp                |   12 +
 src/bus_error.h                       |    8 
 src/socket/shm_mod_socket.cpp         |    2 
 test/CMakeLists.txt                   |   11 +
 src/logger_factory.h                  |    1 
 src/socket/net_mod_socket.cpp         |  218 +++++++++++----------------
 16 files changed, 220 insertions(+), 214 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 7c64d13..00acf78 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -20,4 +20,5 @@
 list(APPEND EXTRA_LIBS ${PROJECT_SOURCE_DIR}/lib/libusgcommon.a pthread)
 
 add_subdirectory(${PROJECT_SOURCE_DIR}/src)
+add_subdirectory(${PROJECT_SOURCE_DIR}/test)
 add_subdirectory(${PROJECT_SOURCE_DIR}/test_net_socket)
\ No newline at end of file
diff --git a/src/bus_error.cpp b/src/bus_error.cpp
index c2dd63a..6cee508 100644
--- a/src/bus_error.cpp
+++ b/src/bus_error.cpp
@@ -41,9 +41,9 @@
 char *
 bus_strerror(int err)
 {
-  int s;
+  int s, eindex;
   char *buf;
-
+  eindex = err - 10000;
   /* Make first caller allocate key for thread-specific data */
 
   s = pthread_once(&once, createKey);
@@ -64,13 +64,13 @@
       err_exit(s, "pthread_setspecific");
   }
 
-  if (err < 0 || err >= _bus_nerr || _bus_errlist[err] == NULL)
+  if (eindex < 0 || eindex >= _bus_nerr || _bus_errlist[eindex] == NULL)
   {
-    snprintf(buf, MAX_ERROR_LEN, "Unknown error %d", err);
+    snprintf(buf, MAX_ERROR_LEN, "Unknown error %d", eindex);
   }
   else
   {
-    strncpy(buf, _bus_errlist[err], MAX_ERROR_LEN - 1);
+    strncpy(buf, _bus_errlist[eindex], MAX_ERROR_LEN - 1);
     buf[MAX_ERROR_LEN - 1] = '\0';          /* Ensure null termination */
   }
 
diff --git a/src/bus_error.h b/src/bus_error.h
index ba4fc59..a951e89 100644
--- a/src/bus_error.h
+++ b/src/bus_error.h
@@ -4,10 +4,10 @@
 
 
 
-
-#define EBUS_TIMEOUT 1
-#define EBUS_CLOSED 2
-#define ESHM_BUS_KEY_INUSED 3
+#define EBUS_BASE 10000
+#define EBUS_TIMEOUT 10001
+#define EBUS_CLOSED 10002
+#define ESHM_BUS_KEY_INUSED 10003
 
 extern int bus_errno;
 
diff --git a/src/logger_factory.cpp b/src/logger_factory.cpp
index 9d445fe..4fa978e 100644
--- a/src/logger_factory.cpp
+++ b/src/logger_factory.cpp
@@ -1,4 +1,5 @@
 #include "logger_factory.h"
+#include "bus_error.h"
 
 Logger * LoggerFactory::logger = NULL;
 
@@ -19,4 +20,15 @@
 	config.console = 1;
 	logger = new Logger(config);
 	return logger;
+}
+
+void  LoggerFactory::error(int s) {
+	Logger* logger = LoggerFactory::getLogger();
+	if(s == EBUS_TIMEOUT) {
+    logger->error("shm_recvfrom  failed, %s", bus_strerror(EBUS_TIMEOUT));
+   
+  } else {
+    logger->error(s, "shm_recvfrom  failed!");
+    
+  }
 }
\ No newline at end of file
diff --git a/src/logger_factory.h b/src/logger_factory.h
index e736e64..37eaefd 100644
--- a/src/logger_factory.h
+++ b/src/logger_factory.h
@@ -10,6 +10,7 @@
 public:
 
 	static Logger* getLogger();
+	static void error(int s);
 };
 
 #endif
diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h
index 4c55f7b..bb0bfb5 100644
--- a/src/queue/lock_free_queue.h
+++ b/src/queue/lock_free_queue.h
@@ -343,19 +343,20 @@
     template <typename T, typename AT> class Q_TYPE>
 int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts)
 {
-// LoggerFactory::getLogger()->debug("=================ts sec = %d, nsec = %ld \n", ts->tv_sec,  ts->tv_nsec );
 
-    LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout before\n");   
+    // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout before\n");   
     struct timespec timeout = PXSemUtil::calc_sem_timeout(ts);
-// LoggerFactory::getLogger()->debug("================== timeout before sec = %d, nsec = %ld \n", timeout.tv_sec,  timeout.tv_nsec );
 
     while (sem_timedwait(&items, &timeout) == -1) {
-        if (errno == ETIMEDOUT)
+        // LoggerFactory::getLogger()->error(errno, "1 LockFreeQueue pop_timeout %d %d", errno, ETIMEDOUT);
+        if (errno == ETIMEDOUT) {
+             // LoggerFactory::getLogger()->error(errno, "2 LockFreeQueue pop_timeout %d %d", errno, EBUS_TIMEOUT);
             return EBUS_TIMEOUT;
+        }
         else if(errno == EINTR)
             continue;
         else {
-          LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_timeout %d", errno);
+          LoggerFactory::getLogger()->error(errno, "3  LockFreeQueue pop_timeout %d", errno);
           return errno;
         }
     }
diff --git a/src/queue/shm_queue.h b/src/queue/shm_queue.h
index 8a23da1..64d4600 100644
--- a/src/queue/shm_queue.h
+++ b/src/queue/shm_queue.h
@@ -28,13 +28,12 @@
   inline bool full();
   inline bool empty();
 
-  inline bool push(const ELEM_T &a_data);
-  inline bool push_nowait(const ELEM_T &a_data);
-  inline bool push_timeout(const ELEM_T &a_data,
-                           const struct timespec *timeout);
-  inline bool pop(ELEM_T &a_data);
-  inline bool pop_nowait(ELEM_T &a_data);
-  inline bool pop_timeout(ELEM_T &a_data, struct timespec *timeout);
+  inline int push(const ELEM_T &a_data);
+  inline int push_nowait(const ELEM_T &a_data);
+  inline int push_timeout(const ELEM_T &a_data, const struct timespec *timeout);
+  inline int pop(ELEM_T &a_data);
+  inline int pop_nowait(ELEM_T &a_data);
+  inline int pop_timeout(ELEM_T &a_data, struct timespec *timeout);
 
   inline ELEM_T &operator[](unsigned i);
 
@@ -167,23 +166,23 @@
 }
 
 template <typename ELEM_T>
-inline bool SHMQueue<ELEM_T>::push(const ELEM_T &a_data) {
+inline int SHMQueue<ELEM_T>::push(const ELEM_T &a_data) {
   return queue->push(a_data);
 }
 
 template <typename ELEM_T>
-inline bool SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) {
+inline int SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) {
   return queue->push_nowait(a_data);
 }
 
 template <typename ELEM_T>
-inline bool SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data,
+inline int SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data,
                                            const struct timespec *timeout) {
 
   return queue->push_timeout(a_data, timeout);
 }
 
-template <typename ELEM_T> inline bool SHMQueue<ELEM_T>::pop(ELEM_T &a_data) {
+template <typename ELEM_T> inline int SHMQueue<ELEM_T>::pop(ELEM_T &a_data) {
   // printf("SHMQueue pop before\n");
   int rv = queue->pop(a_data);
   // printf("SHMQueue after before\n");
@@ -191,13 +190,12 @@
 }
 
 template <typename ELEM_T>
-inline bool SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) {
+inline int SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) {
   return queue->pop_nowait(a_data);
 }
 
 template <typename ELEM_T>
-inline bool SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data,
-                                          struct timespec *timeout) {
+inline int SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, struct timespec *timeout) {
   return queue->pop_timeout(a_data, timeout);
 }
 
diff --git a/src/socket/net_mod_socket.cpp b/src/socket/net_mod_socket.cpp
index 7eaa906..455c0a5 100644
--- a/src/socket/net_mod_socket.cpp
+++ b/src/socket/net_mod_socket.cpp
@@ -180,7 +180,16 @@
         ret_arr[n_recv_suc].content = recv_buf;
         ret_arr[n_recv_suc].content_length = recv_size;
         n_recv_suc++;
+      } else {
+        if(ret > EBUS_BASE) {
+          // bus_errno = EBUS_TIMEOUT;
+          logger->debug("NetModSocket:: %d _sendandrecv_ to key %d failed, %s", get_key(), node->key, bus_strerror(ret));
+           
+        } else {
+          logger->error(ret, "NetModSocket:: %d _sendandrecv_ to key %d failed", get_key(),  node->key);
+        }
       }
+
      
       continue;
     }
@@ -443,147 +452,61 @@
   return n_pub_suc;
 }
 
-
-int NetModSocket::sendandrecv_safe(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
-  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
-
-  int i,  clientfd;
-  net_node_t *node;
-  void *recv_buf;
-  int recv_size;
-  char response_head_bs[NET_MODE_RESPONSE_HEAD_LENGTH];
-  net_mod_request_head_t request_head = {};
-  net_mod_response_head_t response_head;
- 
-   
-  char portstr[32];
-  char *buf = NULL;
-  int buf_size, max_buf_size;
- 
-  if(buf == NULL) {
-    buf = (char *)malloc(MAXBUF);
-    max_buf_size = MAXBUF;
-    LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv malloc");
-  }
-
-  int nsuc = 0;
-  net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t));
-  
-  for (i = 0; i< arrlen; i++) {
-
-    node = &node_arr[i];
-    if(node->host == NULL) {
-      // 鏈湴鍙戦��
-      shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size);
-      goto LABEL_ARR_PUSH;
-    }
-
-    sprintf(portstr, "%d", node->port);
-    clientfd = open_clientfd(node->host, portstr);
-    if(clientfd < 0) {
-      continue;
-    }
-
-    buf_size = send_size + NET_MODE_REQUEST_HEAD_LENGTH;
-    if(max_buf_size < buf_size) {
-      if((buf = (char *)realloc(buf, buf_size)) == NULL) {
-        LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv_safe realloc buf");
-      } else {
-        max_buf_size = buf_size;
-      }
-      
-    }
-    
-
-    request_head.mod = REQ_REP;
-    request_head.key = node->key;
-    request_head.content_length = send_size;
-    request_head.topic_length = 0;
-
-    // optval = 1;
-    // setsockopt(clientfd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval));
-    memcpy(buf, NetModSocket::encode_request_head(request_head), NET_MODE_REQUEST_HEAD_LENGTH);
-    memcpy(buf + NET_MODE_REQUEST_HEAD_LENGTH, send_buf, send_size);
-
-
-    if(rio_writen(clientfd, buf, buf_size) != buf_size ) {
-      LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv_safe  rio_writen  buf");
-     
-      close(clientfd);
-      continue;
-    }
-    // optval = 0;
-    // setsockopt(clientfd, IPPROTO_TCP, TCP_CORK, &optval, sizeof(optval));
-
-    if ( rio_readn(clientfd, response_head_bs, NET_MODE_RESPONSE_HEAD_LENGTH) !=  NET_MODE_RESPONSE_HEAD_LENGTH) {
-      LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv_safe  rio_readnb response_head_bs");
-     
-      close(clientfd);
-      continue;
-    }
-
-    response_head =  NetModSocket::decode_response_head(response_head_bs);
-    if(response_head.code != 0) {
-      continue;
-    }
-
-    recv_buf = malloc(response_head.content_length);
-    if(recv_buf == NULL) {
-      LoggerFactory::getLogger()->error(errno, "NetModSocket::send malloc");
-      exit(1);
-    }
-    if ( (recv_size = rio_readn(clientfd, recv_buf, response_head.content_length) ) !=  response_head.content_length) {
-      LoggerFactory::getLogger()->error(errno, "NetModSocket::sendandrecv_safe  rio_readnb recv_buf");
-      
-      close(clientfd);
-      continue;
-    }
-
-LABEL_ARR_PUSH:
-    if(node->host != NULL) {
-      strcpy(ret_arr[nsuc].host, node->host);
-    } else {
-      strcpy(ret_arr[nsuc].host, "local");
-    }
-   
-    ret_arr[nsuc].port = node->port;
-    ret_arr[nsuc].key = node->key;
-    ret_arr[nsuc].content = recv_buf;
-    ret_arr[nsuc].content_length = recv_size;
-
-    nsuc++;
-  }
-
-  *recv_arr = ret_arr;
-  if(recv_arr_size != NULL) {
-    *recv_arr_size = nsuc;
-  }
-
-  free(buf);
-  return nsuc;
-     
-}
-
-
-
 /**
  * 鍙戦�佷俊鎭�
  * @key 鍙戦�佺粰璋�
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
  */
 int NetModSocket::sendto(const void *buf, const int size, const int key){
-  return shmModSocket.sendto(buf, size, key);
+  int rv = shmModSocket.sendto(buf, size, key);
+  if(rv == 0) {
+    logger->debug("NetModSocket::sendto: %d sendto %d success.\n", get_key(), key);
+    return 0;
+  }
+
+  if(rv > EBUS_BASE) {
+    // bus_errno = EBUS_TIMEOUT;
+    logger->debug("NetModSocket::sendto: %d sendto  %d failed %s", get_key(), key, bus_strerror(rv));
+  } else {
+    logger->error(rv, "NetModSocket::sendto : %d sendto  %d failed", get_key(), key);
+  }
+  return rv;
 }
 
 // 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇
 int NetModSocket::sendto_timeout(const void *buf, const int size, const int key, int sec, int nsec){
   struct timespec timeout = {sec, nsec};
-  return shmModSocket.sendto_timeout(buf, size, key, &timeout);
+  int rv = shmModSocket.sendto_timeout(buf, size, key, &timeout);
+  if(rv == 0) {
+    logger->debug("NetModSocket::sendto_timeout: %d sendto %d success.\n", get_key(), key);
+    return 0;
+  }
+
+  if(rv > EBUS_BASE) {
+    // bus_errno = EBUS_TIMEOUT;
+    logger->debug("NetModSocket::sendto_timeout : %d sendto  %d failed %s", get_key(),  key, bus_strerror(rv));
+  } else {
+    logger->error(rv, "NetModSocket::sendto_timeout:  %d sendto  %d failed", get_key(),  key);
+  }
+  return rv;
 }
 
 // 鍙戦�佷俊鎭珛鍒昏繑鍥炪��
 int NetModSocket::sendto_nowait(const void *buf, const int size, const int key){
-  return shmModSocket.sendto_nowait(buf, size, key);
+  int rv = shmModSocket.sendto_nowait(buf, size, key);
+  if(rv == 0) {
+    logger->debug("NetModSocket::sendto_nowait: %d sendto %d success.\n", get_key(), key);
+    return 0;
+  }
+
+  if(rv > EBUS_BASE) {
+    // bus_errno = EBUS_TIMEOUT;
+    logger->debug("NetModSocket::sendto_nowait %d sendto  %d failed %s", get_key(), key, bus_strerror(rv));
+     
+  } else {
+    logger->error(rv, "NetModSocket::sendto_nowait %d sendto  %d failed", get_key(), key);
+  }
+  return rv;
 }
 
 /**
@@ -592,16 +515,53 @@
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
 */
 int NetModSocket::recvfrom(void **buf, int *size, int *key) {
-  return shmModSocket.recvfrom(buf, size, key);
+  int rv = shmModSocket.recvfrom(buf, size, key);
+
+  if(rv == 0) {
+    logger->debug("NetModSocket::recvfrom:  %d recvfrom %d success.\n", get_key(), *key);
+    return 0;
+  }
+
+  if(rv > EBUS_BASE) {
+    logger->debug("NetModSocket::recvfrom: socket %d recvfrom failed %s", get_key(), bus_strerror(rv));
+  } else {
+    logger->error(rv, "NetModSocket::recvfrom: socket %d recvfrom failed",  get_key());
+  }
+  return rv;
 }
+
 // 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
 int NetModSocket::recvfrom_timeout(void **buf, int *size, int *key, int sec, int nsec){
   struct timespec timeout = {sec, nsec};
-  return shmModSocket.recvfrom_timeout(buf, size, key, &timeout);
+  int rv = shmModSocket.recvfrom_timeout(buf, size, key, &timeout);
+  if(rv == 0) {
+    logger->debug("NetModSocket::recvfrom:  %d recvfrom %d success.\n", get_key(), *key);
+    return 0;
+  }
+
+  if(rv > EBUS_BASE) {
+    // bus_errno = EBUS_TIMEOUT;
+    logger->debug("NetModSocket::recvfrom_timeout:  %d recvfrom failed %s", get_key(), bus_strerror(rv));
+  } else {
+    logger->error(rv, "NetModSocket::recvfrom_timeout:  %d recvfrom failed",  get_key());
+  }
+  return rv;
 }
 
 int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){
-  return shmModSocket.recvfrom_nowait(buf, size, key);
+  int rv = shmModSocket.recvfrom_nowait(buf, size, key);
+  if(rv == 0) {
+    logger->debug("NetModSocket::recvfrom:  %d recvfrom %d success.\n", get_key(), *key);
+    return 0;
+  }
+
+  if(rv > EBUS_BASE) {
+    // bus_errno = EBUS_TIMEOUT;
+    logger->debug("NetModSocket::recvfrom_nowait:  %d recvfrom failed %s", get_key(), bus_strerror(rv));
+  } else {
+    logger->error(rv, "NetModSocket::recvfrom_nowait:  %d recvfrom failed",  get_key());
+  }
+  return rv;
 }
 
 /**
diff --git a/src/socket/net_mod_socket.h b/src/socket/net_mod_socket.h
index 19bb2b2..612f784 100644
--- a/src/socket/net_mod_socket.h
+++ b/src/socket/net_mod_socket.h
@@ -157,8 +157,8 @@
    * 缂虹偣锛氶樆濉炵殑锛屾�ц兘涓嶅sendandrecv
    * 
    */
-  int sendandrecv_safe(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size, 
-    net_mod_recv_msg_t ** recv_arr, int *recv_arr_size);
+  // int sendandrecv_safe(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size, 
+  //   net_mod_recv_msg_t ** recv_arr, int *recv_arr_size);
 
  
   /**
diff --git a/src/socket/net_mod_socket_wrapper.cpp b/src/socket/net_mod_socket_wrapper.cpp
index 4752db4..cfda368 100644
--- a/src/socket/net_mod_socket_wrapper.cpp
+++ b/src/socket/net_mod_socket_wrapper.cpp
@@ -2,6 +2,7 @@
 
 
 
+static Logger *logger = LoggerFactory::getLogger();
 
 /**
  * 鍒涘缓
@@ -27,37 +28,41 @@
  * 缁戝畾绔彛鍒皊ocket, 濡傛灉涓嶇粦瀹氬垯绯荤粺鑷姩鍒嗛厤涓�涓�
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
 */
-int net_mod_socket_bind(void * _socket, int port){
+int net_mod_socket_bind(void * _socket, int key){
 	NetModSocket *sockt = (NetModSocket *)_socket;
-	return sockt->bind(port);
+	return sockt->bind(key);
 }
 
 /**
  * 寮哄埗缁戝畾绔彛鍒皊ocket, 閫傜敤浜庣▼搴忛潪姝e父鍏抽棴鐨勬儏鍐典笅锛岄噸鍚▼搴忕粦瀹氬師鏉ヨ繕娌¢噴鏀剧殑key
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
 */
-int net_mod_socket_force_bind(void * _socket, int port) {
+int net_mod_socket_force_bind(void * _socket, int key) {
 	NetModSocket *sockt = (NetModSocket *)_socket;
-	return sockt->force_bind(port);
+	return sockt->force_bind(key);
 }
+
 /**
  * 鍙戦�佷俊鎭�
- * @port 鍙戦�佺粰璋�
+ * @key 鍙戦�佺粰璋�
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
  */
-int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int port) {
+int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int key) {
 	NetModSocket *sockt = (NetModSocket *)_socket;
-	return sockt->sendto(buf, size, port);
+	logger->debug("net_mod_socket_sendto: %d sendto  %d", net_mod_socket_get_key(_socket), key);
+	return sockt->sendto(buf, size, key);
 }
 // 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇
-int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int port, int sec, int nsec){
+int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec){
 	NetModSocket *sockt = (NetModSocket *)_socket;
-	return sockt->sendto_timeout(buf, size, port, sec, nsec);
+	logger->debug("net_mod_socket_sendto: %d sendto  %d", net_mod_socket_get_key(_socket), key);
+	return sockt->sendto_timeout(buf, size, key, sec, nsec);
 }
 // 鍙戦�佷俊鎭珛鍒昏繑鍥炪��
-int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int port){
+int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key){
 	NetModSocket *sockt = (NetModSocket *)_socket;
-	return sockt->sendto_nowait(buf, size, port);
+	logger->debug("net_mod_socket_sendto: %d sendto  %d", net_mod_socket_get_key(_socket), key);
+	return sockt->sendto_nowait(buf, size, key);
 }
 
 /**
@@ -65,18 +70,19 @@
  * @port 浠庤皝鍝噷鏀跺埌鐨勪俊鎭�
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
 */
-int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *port){
+int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key){
 	NetModSocket *sockt = (NetModSocket *)_socket;
-	return sockt->recvfrom(buf, size, port);
+	return sockt->recvfrom(buf, size, key);
 }
 // 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *port, int sec, int nsec){
+int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec){
 	NetModSocket *sockt = (NetModSocket *)_socket;
-	return sockt->recvfrom_timeout(buf, size, port, sec, nsec);
+	//return sockt->recvfrom(buf, size, key);
+	return sockt->recvfrom_timeout(buf, size, key, sec, nsec);
 }
-int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *port){
+int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key){
 	NetModSocket *sockt = (NetModSocket *)_socket;
-	return sockt->recvfrom_nowait(buf, size, port);
+	return sockt->recvfrom_nowait(buf, size, key);
 }
 
 int net_mod_socket_sendandrecv(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp
index 0f55b6b..4340bc4 100644
--- a/src/socket/shm_mod_socket.cpp
+++ b/src/socket/shm_mod_socket.cpp
@@ -62,7 +62,7 @@
 */
 int ShmModSocket::recvfrom(void **buf, int *size, int *key) {
 	int rv =  shm_recvfrom(shm_socket, buf, size, key, NULL, 0);
-	// logger->error(rv, "ShmModSocket::recvfrom failed!");
+   
   return rv;
 }
 
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index 672a9a7..1b7721d 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -31,7 +31,7 @@
    void *tmp_ptr = mm_get_by_key(socket->key);
     if (tmp_ptr!= NULL && tmp_ptr != (void *)1 && !socket->force_bind ) {
       bus_errno = ESHM_BUS_KEY_INUSED;
-      logger->error("%s. key = %d ", bus_strerror(bus_errno), socket->key);
+      logger->error("%s. key = %d ", bus_strerror(ESHM_BUS_KEY_INUSED), socket->key);
       return 0;
     }
     return 1;
@@ -318,7 +318,7 @@
                const int key, const struct timespec *timeout, const int flags) {
 
   int s;
-  bool rv;
+  int rv;
 
   if (socket->socket_type != SHM_SOCKET_DGRAM) {
     logger->error( "shm_socket.shm_sendto: Can't invoke shm_sendto method in a %d type socket  which is "
@@ -350,10 +350,10 @@
   if ((s = pthread_mutex_unlock(&(socket->mutex))) != 0)
     err_exit(s, "shm_sendto : pthread_mutex_unlock");
   
-  // if (key == socket->key) {
-  //   logger->error( "can not send to your self!");
-  //   return -1;
-  // }
+  if (key == socket->key) {
+    logger->error( "can not send to your self!");
+    return -1;
+  }
 
   SHMQueue<shm_msg_t> *remoteQueue;
   if ((remoteQueue = _attach_remote_queue(key)) == NULL) {
@@ -385,14 +385,13 @@
   } else {
     delete remoteQueue;
     mm_free(dest.buf);
-    if(rv == EBUS_TIMEOUT) {
+    if(rv > EBUS_BASE) {
       // bus_errno = EBUS_TIMEOUT;
-      // logger->error("sendto key %d failed, %s", key, bus_strerror(EBUS_TIMEOUT));
-      return EBUS_TIMEOUT;
+      logger->debug("sendto key %d failed %s", key, bus_strerror(rv));
     } else {
-      //logger->error(errno, "sendto key %d failed!", key);
-      return rv;
+      logger->error(rv, "sendto key %d failed", key);
     }
+    return rv;
    
    
   }
@@ -401,7 +400,7 @@
 // 鐭繛鎺ユ柟寮忔帴鍙�
 int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key,  struct timespec *timeout,  int flags) {
   int s;
-  bool rv;
+  int rv;
 
   if (socket->socket_type != SHM_SOCKET_DGRAM) {
     logger->error("shm_socket.shm_recvfrom: Can't invoke shm_recvfrom method in a %d type socket  which "
@@ -437,6 +436,7 @@
     rv = socket->queue->pop_nowait(src);
   } else if(timeout != NULL) {
     rv = socket->queue->pop_timeout(src, timeout);
+// printf("0 shm_recvfrom====%d\n", rv);
   } else {
     rv = socket->queue->pop(src);
   }
@@ -457,14 +457,12 @@
     mm_free(src.buf);
     return 0;
   } else {
-
-    if(rv == EBUS_TIMEOUT) {
-      // logger->error("shm_recvfrom  failed, %s", bus_strerror(EBUS_TIMEOUT));
-      return EBUS_TIMEOUT;
+    if(rv > EBUS_BASE) {
+      logger->debug("shm_recvfrom failed %s", bus_strerror(rv));
     } else {
-      // logger->error(rv, "shm_recvfrom  failed!");
-      return rv;
+      logger->error(rv, "shm_recvfrom failed");
     }
+    return rv;
 
   }
 }
@@ -531,7 +529,7 @@
   if (tmp_socket == NULL)
   {
     /* If first call from this thread, allocate buffer for thread, and save its location */
-    logger->debug("%d create tmp socket\n", pthread_self() );
+    logger->debug("%ld create tmp socket\n", (long)pthread_self() );
     tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM);
 
     rv =  pthread_setspecific(_tmp_recv_socket_key_, tmp_socket);
@@ -543,13 +541,10 @@
 
   if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) {
     rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags);
-    
     return rv;
   } else {
-     
     return rv;
   }
-  return -1;
 }
 
 int _shm_sendandrecv_alloc_new(shm_socket_t *socket, const void *send_buf,
diff --git a/src/socket/shm_socket.h b/src/socket/shm_socket.h
index 164ce2d..0917d00 100644
--- a/src/socket/shm_socket.h
+++ b/src/socket/shm_socket.h
@@ -50,9 +50,9 @@
 	int key;
 	bool force_bind;
 	pthread_mutex_t mutex;
-	shm_connection_status_t status;
-	SHMQueue<shm_msg_t> *queue;
-	SHMQueue<shm_msg_t> *remoteQueue;
+	shm_connection_status_t status; 
+	SHMQueue<shm_msg_t> *queue;  //self queue
+	SHMQueue<shm_msg_t> *remoteQueue; // peer queue
 	LockFreeQueue<shm_msg_t, DM_Allocator> *messageQueue;
 	LockFreeQueue<shm_msg_t, DM_Allocator> *acceptQueue;
 	std::map<int, shm_socket_t* > *clientSocketMap;
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
new file mode 100644
index 0000000..eebe164
--- /dev/null
+++ b/test/CMakeLists.txt
@@ -0,0 +1,11 @@
+# add the executable
+add_executable(test1 test1.cpp )
+target_link_libraries(test1 PUBLIC  ${EXTRA_LIBS} )
+
+target_include_directories(test1 PUBLIC
+                            "${PROJECT_BINARY_DIR}"
+                             ${EXTRA_INCLUDES}
+                            )
+
+# add the install targets
+install(TARGETS test1 DESTINATION bin)
diff --git a/test/test.cpp b/test/test.cpp
deleted file mode 100644
index c575018..0000000
--- a/test/test.cpp
+++ /dev/null
@@ -1,12 +0,0 @@
-#include "usg_common.h"
-static void sig_quit(int);
-#define SIGCLOSE1   (SIGRTMIN +1)
-int
-main(void)
-{
-
- int a = random32();
- printf("%d, %d , %d\n", SIGRTMIN, SIGRTMAX, SIGCLOSE1);
-  /* SIGQUIT here will terminate with core file */
-}
- 
\ No newline at end of file
diff --git a/test/test1.cpp b/test/test1.cpp
new file mode 100644
index 0000000..136fb50
--- /dev/null
+++ b/test/test1.cpp
@@ -0,0 +1,33 @@
+#include "usg_common.h"
+static void sig_quit(int);
+#define SIGCLOSE1   (SIGRTMIN +1)
+
+struct cm_con_data_t
+{
+  uint64_t addr; /* Buffer address */
+  uint32_t rkey; /* Remote key */
+  uint32_t qp_num; /* QP number */
+  uint16_t lid; /* LID of the IB port */
+  uint8_t gid[16]; /* gid */
+} __attribute__ ((packed));
+
+ 
+
+
+struct cm_con_data2_t
+{
+  uint64_t addr; /* Buffer address */
+  uint32_t rkey; /* Remote key */
+  uint32_t qp_num; /* QP number */
+  uint16_t lid; /* LID of the IB port */
+  uint8_t gid[16]; /* gid */
+} ;
+
+int
+main(void)
+{
+
+ printf("===%d, %d \n", sizeof(cm_con_data_t),  sizeof(cm_con_data2_t));
+  /* SIGQUIT here will terminate with core file */
+}
+ 
\ No newline at end of file

--
Gitblit v1.8.0