From 8df2b63c21d0aabaa894930e3ab1ea63c49d47ff Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期三, 13 一月 2021 17:54:51 +0800
Subject: [PATCH] fix bug invalid argument

---
 src/queue/lock_free_queue.h    |   12 +++--
 src/px_sem_util.cpp            |   15 ++++---
 src/socket/shm_mod_socket.cpp  |   32 +++++++++------
 src/socket/shm_socket.cpp      |   17 ++++++--
 src/socket/net_mod_socket.cpp  |   18 ++++----
 test_net_socket/heart_beat.cpp |   11 +++--
 6 files changed, 64 insertions(+), 41 deletions(-)

diff --git a/src/px_sem_util.cpp b/src/px_sem_util.cpp
index 80008fb..9de9c38 100644
--- a/src/px_sem_util.cpp
+++ b/src/px_sem_util.cpp
@@ -1,13 +1,16 @@
 #include "px_sem_util.h"
 
+#define NANO 1000000000
 struct timespec PXSemUtil::calc_sem_timeout(const struct timespec *ts) {
-	int tmp_sec;
+ 
+	struct timespec res;
   struct timespec timeout;
   if (clock_gettime(CLOCK_REALTIME, &timeout) == -1)
       err_exit(errno, "clock_gettime");
-  timeout.tv_nsec += ts->tv_nsec;
-  tmp_sec =  timeout.tv_nsec / 10e9;
-  timeout.tv_nsec =  timeout.tv_nsec - tmp_sec * 10e9;
-  timeout.tv_sec += ts->tv_sec + tmp_sec;
-  return timeout;
+
+  res.tv_sec = timeout.tv_sec + ts->tv_sec;
+  res.tv_nsec = timeout.tv_nsec + ts->tv_nsec;
+  res.tv_sec = res.tv_sec + floor(res.tv_nsec / NANO);
+  res.tv_nsec = res.tv_nsec % NANO;
+  return res;
 }
\ No newline at end of file
diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h
index 3914b85..e1429eb 100644
--- a/src/queue/lock_free_queue.h
+++ b/src/queue/lock_free_queue.h
@@ -262,12 +262,12 @@
 int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts)
 {
      
-
+    int rv;
     struct timespec timeout = PXSemUtil::calc_sem_timeout(ts);
   // LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before tv_sec=%d, tv_nsec=%ld", 
   //   timeout.tv_sec, timeout.tv_nsec);
 
-    while (sem_timedwait(&slots, &timeout) == -1) {
+    while ( sem_timedwait(&slots, &timeout) == -1) {
     //     LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before tv_sec=%d, tv_nsec=%ld, ETIMEDOUT=%d, errno=%d\n", 
     // timeout.tv_sec, timeout.tv_nsec, ETIMEDOUT, errno);
 
@@ -343,9 +343,11 @@
     template <typename T, typename AT> class Q_TYPE>
 int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts)
 {
-// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout before\n");
+// LoggerFactory::getLogger()->debug("=================ts sec = %d, nsec = %ld \n", ts->tv_sec,  ts->tv_nsec );
 
+    // struct timespec timeout_tmp = {1, 0};
     struct timespec timeout = PXSemUtil::calc_sem_timeout(ts);
+// LoggerFactory::getLogger()->debug("================== timeout before sec = %d, nsec = %ld \n", timeout.tv_sec,  timeout.tv_nsec );
 
     while (sem_timedwait(&items, &timeout) == -1) {
         if (errno == ETIMEDOUT)
@@ -353,8 +355,8 @@
         else if(errno == EINTR)
             continue;
         else {
-          LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_timeout");
-          return -1;
+          // LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_timeout %d", errno);
+          return errno;
         }
     }
 
diff --git a/src/socket/net_mod_socket.cpp b/src/socket/net_mod_socket.cpp
index f7c1242..7eaa906 100644
--- a/src/socket/net_mod_socket.cpp
+++ b/src/socket/net_mod_socket.cpp
@@ -67,15 +67,15 @@
 
 int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
   net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
-  _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1);
+  return _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1);
 }
 int NetModSocket::sendandrecv_timeout(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
   net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int  msec) {
-  _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, msec);
+  return _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, msec);
 }
 int NetModSocket::sendandrecv_nowait(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
   net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
-   _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, 0);
+  return _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, 0);
 
 }
 
@@ -327,13 +327,13 @@
   // 鏈湴鍙戦��
   if(node_arr == NULL || arrlen == 0) {
     if(msec == 0) {
-      ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key);
+      ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY);
     } else if(msec > 0) {
       timeout.tv_sec = msec / 1000;
       timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
-      ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout);
+      ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout);
     } else {
-      ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key);
+      ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY);
     }
     if(ret == 0 ) {
       n_pub_suc++;
@@ -346,13 +346,13 @@
     if(node->host == NULL) {
       // 鏈湴鍙戦��
       if(msec == 0) {
-        ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, node->key);
+        ret = shmModSocket.pub_nowait(topic, topic_size, content, content_size, SHM_BUS_KEY);
       } else if(msec > 0) {
         timeout.tv_sec = msec / 1000;
         timeout.tv_nsec = (msec - timeout.tv_sec * 1000) * 10e6;
-        ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, node->key, &timeout);
+        ret = shmModSocket.pub_timeout(topic, topic_size, content, content_size, SHM_BUS_KEY, &timeout);
       } else {
-        ret = shmModSocket.pub(topic, topic_size, content, content_size, node->key);
+        ret = shmModSocket.pub(topic, topic_size, content, content_size, SHM_BUS_KEY);
       }
 
       if(ret == 0 ) {
diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp
index 6e622a8..8a9133d 100644
--- a/src/socket/shm_mod_socket.cpp
+++ b/src/socket/shm_mod_socket.cpp
@@ -65,26 +65,32 @@
 // printf("dgram_mod_recvfrom  before\n");
 	int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flags);
 // printf("dgram_mod_recvfrom  after\n");
+
 	return rv;
 }
+
 /**
  * 鎺ユ敹淇℃伅
  * @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭�
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
 */
 int ShmModSocket::recvfrom(void **buf, int *size, int *key) {
-		
-		return  _recvfrom_( buf, size, key, NULL, 0);
+	int rv =  _recvfrom_( buf, size, key, NULL, 0);
+	// logger->error(rv, "ShmModSocket::recvfrom failed!");
+  return rv;
 }
 
 
 // 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
 int ShmModSocket::recvfrom_timeout( void **buf, int *size, int *key, struct timespec *timeout) {
-	return _recvfrom_(buf, size, key, timeout, 0);
+	int rv =  _recvfrom_(buf, size, key, timeout, 0);
+ 	return rv;
 }
 
 int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *key){
-	return _recvfrom_(buf, size, key, NULL, (int)SHM_MSG_NOWAIT);
+	int rv =  _recvfrom_(buf, size, key, NULL, (int)SHM_MSG_NOWAIT);
+	// logger->error(rv, "ShmModSocket::recvfrom_nowait failed!");
+  return rv;
 }
 
 /**
@@ -188,14 +194,7 @@
  */
 int  ShmModSocket::_sub_(char *topic, int topic_size, int key,  
 	struct timespec *timeout, int flags) {
-	// char buf[8192];
-	// int rv;
-	// snprintf(buf,  8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
-	// rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
-	// if(rv == 0) {
-	// 	bus_set->insert(key);
-	// }
-	// return rv;
+	 
 
 	int ret;
 	bus_head_t head = {};
@@ -240,7 +239,14 @@
 	if(size > 0) {
 		ret = shm_sendto(shm_socket, buf, size, key, timeout, flags);
 		free(buf);
-		return ret;
+		if(ret == EBUS_TIMEOUT) {
+	    logger->error(ret, "ShmModSocket::_desub_ key %d failed, %s", key, bus_strerror(EBUS_TIMEOUT));
+	    return EBUS_TIMEOUT;
+	  } else {
+	    logger->error(ret, "ShmModSocket::_desub_ key %d failed!", key);
+	    return ret;
+	  }
+		
 	} else {
 		return -1;
 	}
diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index c0f4c44..d094753 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -386,12 +386,12 @@
     delete remoteQueue;
     mm_free(dest.buf);
     if(rv == EBUS_TIMEOUT) {
-      bus_errno = EBUS_TIMEOUT;
-      logger->error(errno, "sendto key %d failed, %s", key, bus_strerror(EBUS_TIMEOUT));
+      // bus_errno = EBUS_TIMEOUT;
+      logger->error(rv, "sendto key %d failed, %s", key, bus_strerror(EBUS_TIMEOUT));
       return EBUS_TIMEOUT;
     } else {
       //logger->error(errno, "sendto key %d failed!", key);
-      return -1;
+      return rv;
     }
    
    
@@ -457,7 +457,16 @@
     mm_free(src.buf);
     return 0;
   } else {
-    return -1;
+
+    if(rv == EBUS_TIMEOUT) {
+      // bus_errno = EBUS_TIMEOUT;
+      logger->error("shm_recvfrom  failed, %s", bus_strerror(EBUS_TIMEOUT));
+      return EBUS_TIMEOUT;
+    } else {
+      logger->error(rv, "shm_recvfrom  failed!");
+      return rv;
+    }
+
   }
 }
 
diff --git a/test_net_socket/heart_beat.cpp b/test_net_socket/heart_beat.cpp
index 554df68..40ea621 100644
--- a/test_net_socket/heart_beat.cpp
+++ b/test_net_socket/heart_beat.cpp
@@ -28,10 +28,13 @@
   char sendbuf[512];
   int rv;
   int remote_port;
-  while (net_mod_socket_recvfrom(serv, &recvbuf, &size, &remote_port) == 0) {
-    printf( "RECEIVED HREARTBEAT FROM %d: %s\n", remote_port, recvbuf);
-    net_mod_socket_sendto(serv, "suc", strlen("suc")+1, remote_port);
-    free(recvbuf);
+  while (true) {
+    if(net_mod_socket_recvfrom_timeout(serv, &recvbuf, &size, &remote_port, 0, 2000000000)==0) {
+      printf( "RECEIVED HREARTBEAT FROM %d: %s\n", remote_port, recvbuf);
+      net_mod_socket_sendto(serv, "suc", strlen("suc")+1, remote_port);
+      free(recvbuf);
+    }
+    
   }
   // sleep(1000);
   net_mod_socket_close(serv);

--
Gitblit v1.8.0