From 940bb9e9238488025bf41eb2b2d3df077274004f Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期三, 13 一月 2021 13:00:50 +0800
Subject: [PATCH] update

---
 src/socket/net_mod_socket.cpp |  109 ++++++++++++++++++++++++++++++++----------------------
 1 files changed, 65 insertions(+), 44 deletions(-)

diff --git a/src/socket/net_mod_socket.cpp b/src/socket/net_mod_socket.cpp
index fb5003e..2ac2dfa 100644
--- a/src/socket/net_mod_socket.cpp
+++ b/src/socket/net_mod_socket.cpp
@@ -1,7 +1,7 @@
 #include "net_mod_socket.h"
 #include "socket_io.h"
 #include "net_mod_socket_io.h"
-#include "net_conn_pool.h"
+
 #include <sys/types.h>          /* See NOTES */
 #include <sys/socket.h>
 #include <pthread.h>
@@ -15,13 +15,37 @@
 
 NetModSocket::NetModSocket() 
 {
+  int s;
   if (Signal(SIGPIPE, SIG_IGN) == SIG_ERR)
       logger->error(errno, "NetModSocket::NetModSocket signal");
+
+  gpool = new NetConnPool();
+
+  pthread_mutexattr_t mtxAttr;
+  s = pthread_mutexattr_init(&mtxAttr);
+  if (s != 0)
+    err_exit(s, "pthread_mutexattr_init");
+  s = pthread_mutexattr_settype(&mtxAttr, PTHREAD_MUTEX_ERRORCHECK);
+  if (s != 0)
+    err_exit(s, "pthread_mutexattr_settype");
+  s = pthread_mutex_init(&sendMutex, &mtxAttr);
+  if (s != 0)
+    err_exit(s, "pthread_mutex_init");
+
+  s = pthread_mutexattr_destroy(&mtxAttr);
+  if (s != 0)
+    err_exit(s, "pthread_mutexattr_destroy");
+
 }
 
 
 NetModSocket::~NetModSocket() {
-  
+  int s;
+  delete gpool;
+  s =  pthread_mutex_destroy(&sendMutex);
+  if(s != 0) {
+    err_exit(s, "shm_close_socket");
+  }
 }
 
 
@@ -80,23 +104,10 @@
   }
 }
 
-int NetModSocket::_sendandrecv_(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
-  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int  msec ) {
+NetConnPool* NetModSocket::_get_threadlocal_pool() {
 
-  int i, n, recv_size, connfd;
-  net_node_t *node;
-  void *recv_buf = NULL;
-  struct timespec timeout;
   int ret;
-  int n_req = 0, n_recv_suc = 0, n_resp =0;
-  
-  net_mod_request_head_t request_head = {};
-   
-  net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t));
-
- 
   NetConnPool *mpool;
-
   /* Make first caller allocate key for thread-specific data */
   ret = pthread_once(&once, _createConnPoolKey_);
   if (ret != 0) {
@@ -115,16 +126,50 @@
       exit(1);
     }
 
-   
-
     ret = pthread_setspecific(poolKey, mpool);
     if (ret != 0) {
       LoggerFactory::getLogger()->error(errno, "NetModSocket::_sendandrecv_ pthread_setspecific");
       exit(1);
     }
   }
+  return mpool;
+
+}
+
+NetConnPool* NetModSocket::_get_pool() {
+  return gpool;
+}
+
+int NetModSocket::_sendandrecv_(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
+  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int  msec ) {
+  int s, rv;
+  if ((s = pthread_mutex_lock(&sendMutex)) != 0)
+    err_exit(s, "NetModSocket : pthread_mutex_lock");
+
+  rv = _sendandrecv_unsafe(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size,   msec );
+
+  if ((s = pthread_mutex_unlock(&sendMutex)) != 0)
+    err_exit(s, "NetModSocket : pthread_mutex_lock");
+
+  return rv;
+}
+
+int NetModSocket::_sendandrecv_unsafe(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
+  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int  msec ) {
+
+  int i, n, recv_size, connfd;
+  net_node_t *node;
+  void *recv_buf = NULL;
+  struct timespec timeout;
+  int ret;
+  int n_req = 0, n_recv_suc = 0, n_resp =0;
   
+  net_mod_request_head_t request_head = {};
+   
+  net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t));
+
  
+  NetConnPool *mpool = _get_pool();
 
   for (i = 0; i< arrlen; i++) {
 
@@ -142,7 +187,7 @@
         ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size);
       }
       if( ret == 0) {
-        strcpy( ret_arr[n_recv_suc].host,"");
+        strcpy( ret_arr[n_recv_suc].host, "");
         ret_arr[n_recv_suc].port = 0;
         ret_arr[n_recv_suc].key = node->key;
         ret_arr[n_recv_suc].content = recv_buf;
@@ -290,31 +335,7 @@
   int n_req = 0, n_pub_suc = 0, n_resp = 0;
 
   int ret;
-  NetConnPool *mpool;
-
-  /* Make first caller allocate key for thread-specific data */
-  ret = pthread_once(&once, _createConnPoolKey_);
-  if (ret != 0) {
-    LoggerFactory::getLogger()->error(errno, "NetModSocket::_sendandrecv_ pthread_once");
-    exit(1);
-  }
-
-  mpool = (NetConnPool *)pthread_getspecific(poolKey);
-  if (mpool == NULL)
-  {
-    /* If first call from this thread, allocte buffer for thread, and save its location */
-    mpool = new NetConnPool();
-    if (mpool == NULL) {
-      LoggerFactory::getLogger()->error(errno, "NetModSocket::_sendandrecv_ malloc");
-      exit(1);
-    }
-
-    ret = pthread_setspecific(poolKey, mpool);
-    if (ret != 0) {
-      LoggerFactory::getLogger()->error(errno, "NetModSocket::_sendandrecv_ pthread_setspecific");
-      exit(1);
-    }
-  }
+  NetConnPool *mpool = _get_pool();
 
   // 鏈湴鍙戦��
   if(node_arr == NULL || arrlen == 0) {

--
Gitblit v1.8.0