From 5410446ade40493d17f7e2d7f0d687b0998acc6a Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期三, 27 一月 2021 11:56:54 +0800
Subject: [PATCH] timeout wait 合一

---
 src/socket/shm_mod_socket.cpp |  190 ++++++++++++++---------------------------------
 1 files changed, 57 insertions(+), 133 deletions(-)

diff --git a/src/socket/shm_mod_socket.cpp b/src/socket/shm_mod_socket.cpp
index 058c081..a012e80 100644
--- a/src/socket/shm_mod_socket.cpp
+++ b/src/socket/shm_mod_socket.cpp
@@ -18,7 +18,7 @@
 	struct timespec timeout = {1, 0};
 	if(bus_set != NULL) {
 		for(auto bus_iter = bus_set->begin(); bus_iter != bus_set->end(); bus_iter++) {
-			desub_timeout(NULL, 0, *bus_iter, &timeout);
+			desub(NULL, 0, *bus_iter, &timeout, BUS_TIMEOUT_FLAG);
 		}
 		delete bus_set;
 	}
@@ -37,154 +37,65 @@
 int ShmModSocket::force_bind(int key) {
 	return shm_socket_force_bind(shm_socket, key);
 }
+
 /**
  * 鍙戦�佷俊鎭�
  * @key 鍙戦�佺粰璋�
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
  */
-int ShmModSocket::sendto(const void *buf, const int size, const int key) {
-		return shm_sendto(shm_socket, buf, size, key, NULL, 0);
+int ShmModSocket::sendto(const void *buf, const int size, const int key, const struct timespec *timeout, int flag) {
+	int rv = shm_sendto(shm_socket, buf, size, key, timeout, flag);
+  if(rv == 0) {
+	  logger->debug("ShmModSocket::sendto: %d sendto %d success.\n", get_key(), key);
+	  return 0;
+  }
+
+  logger->debug("ShmModSocket::sendto : %d sendto  %d failed %s", get_key(),  key, bus_strerror(rv));
+  return rv;
 }
-// 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇
-int ShmModSocket::sendto_timeout(const void *buf, const int size, const int key, const struct timespec *timeout) {
-	return shm_sendto(shm_socket, buf, size, key, timeout, BUS_TIMEOUT_FLAG);
-}
-// 鍙戦�佷俊鎭珛鍒昏繑鍥炪��
-int ShmModSocket::sendto_nowait( const void *buf, const int size, const int key){
-	return shm_sendto(shm_socket, buf, size, key, NULL, BUS_NOWAIT_FLAG);
-}
- 
 
 /**
  * 鎺ユ敹淇℃伅
  * @key 浠庤皝鍝噷鏀跺埌鐨勪俊鎭�
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
 */
-int ShmModSocket::recvfrom(void **buf, int *size, int *key) {
-	int rv =  shm_recvfrom(shm_socket, buf, size, key, NULL, 0);
-   
+int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag) {
+	int rv =  shm_recvfrom(shm_socket, buf, size, key, timeout, flag);
+
+	if(rv == 0) {
+    logger->debug("ShmModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key);
+    return 0;
+  }
+
+  logger->debug("ShmModSocket::recvfrom: socket %d recvfrom failed %s", get_key(), bus_strerror(rv));
   return rv;
 }
-
-
-int ShmModSocket::recvfrom_timeout( void **buf, int *size, int *key, const struct timespec *timeout) {
-	int rv =  shm_recvfrom(shm_socket, buf, size, key, timeout, BUS_TIMEOUT_FLAG);
-
- 	return rv;
-}
-
-int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *key){
-	int rv =  shm_recvfrom(shm_socket, buf, size, key, NULL, BUS_NOWAIT_FLAG);
-	// logger->error(rv, "ShmModSocket::recvfrom_nowait failed!");
-  return rv;
-}
+ 
 
 /**
  * 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟
  * @key 鍙戦�佺粰璋�
  * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
 */
-int ShmModSocket::sendandrecv( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
-	return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0);
+int ShmModSocket::sendandrecv(const void *send_buf, const int send_size, const int send_key, 
+	void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){
+	return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag);
 }
+ 
 // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int ShmModSocket::sendandrecv_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, const struct timespec *timeout){
-	return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, BUS_TIMEOUT_FLAG);
+int ShmModSocket::sendandrecv_unsafe(const void *send_buf, const int send_size, const int send_key, 
+	void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){
+	return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag);
 }
-int ShmModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
-	return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, BUS_NOWAIT_FLAG);
-}
-
-
-
-
-int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
-	return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0);
-}
-// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int ShmModSocket::sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, const struct timespec *timeout){
-	return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, BUS_TIMEOUT_FLAG);
-}
-int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
-	return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, BUS_NOWAIT_FLAG);
-}
-
-
-
-
+ 
 /**
  * 璁㈤槄鎸囧畾涓婚
  * @topic 涓婚
  * @size 涓婚闀垮害
  * @key 鎬荤嚎绔彛
  */
-int  ShmModSocket::sub(char *topic, int size, int key){
-	return _sub_( topic, size, key, NULL, 0);
-}
-// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int  ShmModSocket::sub_timeout(char *topic, int size, int key, const struct timespec *timeout){
-	return _sub_(topic, size, key, timeout, BUS_TIMEOUT_FLAG);
-}
-int  ShmModSocket::sub_nowait(char *topic, int size, int key) {
-	return _sub_(topic, size, key, NULL,  BUS_NOWAIT_FLAG);
-}
-
-
-
-/**
- * 鍙栨秷璁㈤槄鎸囧畾涓婚
- * @topic 涓婚
- * @size 涓婚闀垮害
- * @key 鎬荤嚎绔彛
- */
-int  ShmModSocket::desub(char *topic, int size, int key){
-	return _desub_( topic, size, key, NULL, 0);
-}
-// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int  ShmModSocket::desub_timeout(char *topic, int size, int key, const struct timespec *timeout){
-	return _desub_(topic, size, key, timeout, BUS_TIMEOUT_FLAG);
-}
-int  ShmModSocket::desub_nowait(char *topic, int size, int key) {
-	return _desub_(topic, size, key, NULL,  BUS_NOWAIT_FLAG);
-}
-
-
-
-/**
- * 鍙戝竷涓婚
- * @topic 涓婚
- * @content 涓婚鍐呭
- * @key 鎬荤嚎绔彛
- */
-int  ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int key){
-		return _pub_(topic, topic_size, content, content_size, key, NULL, 0);
-}
-//  瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇
-int  ShmModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec * timeout){
-	return _pub_( topic, topic_size, content, content_size, key, timeout, BUS_TIMEOUT_FLAG);
-}
-int  ShmModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int key){
-	return _pub_(topic, topic_size, content, content_size, key, NULL, BUS_NOWAIT_FLAG);
-}
-
-
-/**
- * 鑾峰彇soket key
- */
-int ShmModSocket::get_key(){
-	return shm_socket->key;
-}
-
-
-
-// =============================================================================
-/**
- * @key 鎬荤嚎绔彛
- */
-int  ShmModSocket::_sub_(char *topic, int topic_size, int key,  
+int  ShmModSocket::sub(char *topic, int topic_size, int key,  
 	const struct timespec *timeout, int flags) {
-	 
-
 	int ret;
 	bus_head_t head = {};
 	memcpy(head.action, "sub", sizeof(head.action));
@@ -206,10 +117,15 @@
 }
 
 
+
+
 /**
+ * 鍙栨秷璁㈤槄鎸囧畾涓婚
+ * @topic 涓婚
+ * @size 涓婚闀垮害
  * @key 鎬荤嚎绔彛
  */
-int  ShmModSocket::_desub_(char *topic, int topic_size, int key, const struct timespec *timeout, int flags) {
+int  ShmModSocket::desub(char *topic, int topic_size, int key, const struct timespec *timeout, int flags) {
 	// char buf[8192];
 	int ret;
 	if(topic == NULL) {
@@ -239,18 +155,15 @@
 
 }
 
-/**
- * @key 鎬荤嚎绔彛
- * @str "<**pub**>{缁忔祹}"
- */
- 
-int  ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec *timeout, int flags) {
-	// int head_len;
-	// char buf[8192+content_size];
-	// snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
-	// head_len = strlen(buf);
-	// memcpy(buf+head_len, content, content_size);
 
+
+/**
+ * 鍙戝竷涓婚
+ * @topic 涓婚
+ * @content 涓婚鍐呭
+ * @key 鎬荤嚎绔彛
+ */
+int  ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int key, const struct timespec *timeout, int flags) {
 	int ret;
 	bus_head_t head = {};
 	memcpy(head.action, "pub", sizeof(head.action));
@@ -267,9 +180,20 @@
 		return -1;
 	}
 
+}
+ 
 
+
+/**
+ * 鑾峰彇soket key
+ */
+int ShmModSocket::get_key(){
+	return shm_socket->key;
 }
 
+
+
+// =============================================================================
 
 int ShmModSocket::get_bus_sendbuf(bus_head_t &request_head, 
   void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf) {
@@ -277,7 +201,7 @@
   int buf_size;
   char *buf;
   int  max_buf_size;
-  if((buf = (char *)malloc(MAXBUF)) == NULL) {
+  if((buf = (char *) malloc(MAXBUF)) == NULL) {
     LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc");
     exit(1);
   } else {
@@ -287,7 +211,7 @@
   buf_size = BUS_HEAD_SIZE + content_size + topic_size  ;
   if(max_buf_size < buf_size) {
     
-    if((buf = (char *)realloc(buf, buf_size)) == NULL) {
+    if((buf = (char *) realloc(buf, buf_size)) == NULL) {
       LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf  realloc buf");
       exit(1);
     } else {

--
Gitblit v1.8.0