From 607ac3ae8bfc017e10a7907e69dcbc3ab2a0fb63 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期五, 05 二月 2021 13:54:56 +0800
Subject: [PATCH] add stop method

---
 src/socket/shm_socket.cpp |  102 +++++++++++++++++++++++++++++++++-----------------
 1 files changed, 67 insertions(+), 35 deletions(-)

diff --git a/src/socket/shm_socket.cpp b/src/socket/shm_socket.cpp
index 9ce94da..fc410bf 100644
--- a/src/socket/shm_socket.cpp
+++ b/src/socket/shm_socket.cpp
@@ -112,23 +112,31 @@
 
 int shm_close_socket(shm_socket_t *sockt) {
   
-  int s;
-  
+  int rv;
   logger->debug("shm_close_socket\n");
   if(sockt->queue != NULL) {
     delete sockt->queue;
     sockt->queue = NULL;
   }
 
-  s =  pthread_mutex_destroy(&(sockt->mutex) );
-  if(s != 0) {
-    err_exit(s, "shm_close_socket");
+  rv =  pthread_mutex_destroy(&(sockt->mutex) );
+  if(rv != 0) {
+    err_exit(rv, "shm_close_socket");
   }
 
   free(sockt);
   return 0;
 }
 
+
+int shm_socket_stop(shm_socket_t *sockt) {
+  struct timespec timeout = {5, 0};
+  shm_packet_t sendpak = {0};
+  sendpak.key = sockt->key;
+  sendpak.action = BUS_ACTION_STOP;
+  sendpak.size = 0;
+  return shm_sendpakto(sockt, &sendpak, sockt->key, &timeout, BUS_TIMEOUT_FLAG);
+}
 
 int shm_socket_bind(shm_socket_t *sockt, int key) {
   sockt->key = key;
@@ -153,11 +161,14 @@
 
   int rv;
  
-  shm_packet_t sendpak;
+  shm_packet_t sendpak = {0};
   sendpak.key = sockt->key;
   sendpak.size = size;
-  sendpak.buf = mm_malloc(size);
-  memcpy(sendpak.buf, buf, size);
+  if(buf != NULL) {
+    sendpak.buf = mm_malloc(size);
+    memcpy(sendpak.buf, buf, size);
+  }
+ 
   rv = shm_sendpakto(sockt, &sendpak, key, timeout, flag);
   return rv;
 }
@@ -172,12 +183,15 @@
   shm_packet_t sendpak;
   shm_packet_t recvpak;
   std::map<std::string, shm_packet_t>::iterator recvbufIter;
+
   std::string uuid = sole::uuid4().str();
   
   sendpak.key = sockt->key;
   sendpak.size = send_size;
-  sendpak.buf = mm_malloc(send_size);
-  memcpy(sendpak.buf, send_buf, send_size);
+  if(send_buf != NULL) {
+    sendpak.buf = mm_malloc(send_size);
+    memcpy(sendpak.buf, send_buf, send_size);
+  }
   memcpy(sendpak.uuid, uuid.c_str(), uuid.length() + 1);
   // uuid.copy(sendpak.uuid, sizeof sendpak.uuid);
   rv = shm_sendpakto(sockt, &sendpak, key, timeout, flags);
@@ -186,7 +200,7 @@
     return rv;
   }
 
-  while(true) {
+  while(tryn > 0) {
     tryn--;
     recvbufIter = sockt->recvbuf.find(uuid);
     if(recvbufIter != sockt->recvbuf.end()) {
@@ -194,36 +208,37 @@
 logger->debug("get from recvbuf: %s", uuid.c_str());
       recvpak = recvbufIter->second;
       sockt->recvbuf.erase(recvbufIter);
-      break;
+      goto LABLE_SUC;
     }
 
     rv = shm_recvpakfrom(sockt, &recvpak, timeout, flags);
 
     if (rv != 0) {
 
-      if(rv == ETIMEDOUT)
+      if(rv == ETIMEDOUT) {
         return EBUS_TIMEOUT;
+      }
 
       logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv));
       return rv;
     } 
 
 logger->debug("send uuid:%s, recv uuid: %s", uuid.c_str(), recvpak.uuid);
-    if (strncmp(uuid.c_str(), recvpak.uuid, sizeof recvpak.uuid) == 0) {
+    if(strlen(recvpak.uuid) == 0) {
+      continue;
+    } else if (strncmp(uuid.c_str(), recvpak.uuid, sizeof recvpak.uuid) == 0) {
       // 鍙戦�佷笌鎺ュ彈鐨刄UID鍖归厤鎴愬姛
-      break;
+      goto LABLE_SUC;
     } else {
       // 绛旈潪鎵�闂紝鏀惧埌缂撳瓨閲�
       sockt->recvbuf.insert({recvpak.uuid, recvpak});
       continue;
     }
-
-    if(tryn == 0) {
-      // 灏濊瘯浜唗ryn娆¢兘娌℃湁鎴愬姛
-      return EBUS_RECVFROM_WRONG_END;
-    }
-
   }
+
+LABLE_FAIL:
+  return EBUS_RECVFROM_WRONG_END;
+  // return rv;
  
 LABLE_SUC:
  if(recv_buf != NULL) {
@@ -239,6 +254,8 @@
 
   return 0;
 
+
+
 }
 
 /**
@@ -251,35 +268,41 @@
   
   int rv;
  
-  void *sendbuf = NULL;
+  void *sendbuf, *recvbuf = NULL;
   int sendsize = 0;
-  shm_packet_t recvpak;
+  shm_packet_t recvpak = {0};
 
   rv = shm_recvpakfrom(sockt , &recvpak, timeout, flag);
 
   if (rv != 0) {
     if(rv == ETIMEDOUT){
-      logger->debug("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(EBUS_TIMEOUT));
+      logger->debug("%d shm_recvandsend failed %s", shm_socket_get_key(sockt), bus_strerror(EBUS_TIMEOUT));
       return EBUS_TIMEOUT;
     }
     
-    logger->error("%d shm_recvfrom failed %s", shm_socket_get_key(sockt), bus_strerror(rv));
+    logger->error("%d shm_recvandsend failed %s", shm_socket_get_key(sockt), bus_strerror(rv));
     return rv;
   }
    
-  
-  void *recvbuf = malloc(recvpak.size);
-  memcpy(recvbuf, recvpak.buf, recvpak.size);
-  mm_free(recvpak.buf);
+  if(recvpak.buf != NULL) {
+    recvbuf = malloc(recvpak.size);
+    memcpy(recvbuf, recvpak.buf, recvpak.size);
+    mm_free(recvpak.buf);
+  }
+ 
   callback(recvbuf, recvpak.size, recvpak.key, &sendbuf, &sendsize, user_data);
 
-  shm_packet_t sendpak;
+  shm_packet_t sendpak = {0};
   sendpak.key = sockt->key;
   sendpak.size = sendsize;
   memcpy(sendpak.uuid, recvpak.uuid, sizeof sendpak.uuid);
   if(sendbuf !=NULL && sendsize > 0) {
     sendpak.buf = mm_malloc(sendsize);
     memcpy(sendpak.buf, sendbuf, sendsize);
+  } else {
+
+    logger->warn("%d shm_recvandsend : sendbuf is null", shm_socket_get_key(sockt));
+    // return -1;
   }
  
   rv = shm_sendpakto(sockt, &sendpak, recvpak.key, timeout, flag);
@@ -307,7 +330,7 @@
   } 
 
 
- if(buf != NULL) {
+ if(buf != NULL && recvpak.buf != NULL) {
     void *_buf = malloc(recvpak.size);
     memcpy(_buf, recvpak.buf, recvpak.size);
     *buf = _buf; 
@@ -493,7 +516,7 @@
   
  
  LABEL_PUSH: 
-  if (key == sockt->key) {
+  if (sendpak->action != BUS_ACTION_STOP && key == sockt->key) {
     logger->error( "can not send to your self!");
     return EBUS_SENDTO_SELF;
   }
@@ -513,10 +536,11 @@
 }
 
 // 鐭繛鎺ユ柟寮忔帴鍙�
-static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *recvpak ,  const struct timespec *timeout,  int flag) {
+static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *_recvpak ,  const struct timespec *timeout,  int flag) {
   int rv;
   
   hashtable_t *hashtable = mm_get_hashtable();
+  shm_packet_t recvpak;
 
   if( sockt->queue != NULL) 
     goto LABEL_POP;
@@ -543,10 +567,18 @@
   
 LABEL_POP:
 
+ // 
+  // printf("%p start recv.....\n", sockt);
  
- 
-  rv = sockt->queue->pop(*recvpak, timeout, flag);
+  rv = sockt->queue->pop(recvpak, timeout, flag);
+  if(rv != 0) 
+    return rv;
 
+  
+  if(recvpak.action == BUS_ACTION_STOP) {
+    return EBUS_STOPED;
+  }
+  *_recvpak = recvpak;
   return rv;
 }
 // int shm_sendandrecv(shm_socket_t *sockt, const void *send_buf,

--
Gitblit v1.8.0