From 45e00aca28504b27f3ad6b4abf364c3d57f34510 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期一, 22 二月 2021 14:05:28 +0800
Subject: [PATCH] lock free queue

---
 src/net/net_mod_socket.cpp |   64 +++++++++++++++++++------------
 1 files changed, 39 insertions(+), 25 deletions(-)

diff --git a/src/net/net_mod_socket.cpp b/src/net/net_mod_socket.cpp
index 3668b14..e689cea 100644
--- a/src/net/net_mod_socket.cpp
+++ b/src/net/net_mod_socket.cpp
@@ -19,35 +19,39 @@
   if (Signal(SIGPIPE, SIG_IGN) == SIG_ERR)
     logger->error(errno, "NetModSocket::NetModSocket signal");
 
-  gpool = new NetConnPool();
+  // gpool = new NetConnPool();
 
-  pthread_mutexattr_t mtxAttr;
-  s = pthread_mutexattr_init(&mtxAttr);
-  if (s != 0)
-    err_exit(s, "pthread_mutexattr_init");
-  s = pthread_mutexattr_settype(&mtxAttr, PTHREAD_MUTEX_ERRORCHECK);
-  if (s != 0)
-    err_exit(s, "pthread_mutexattr_settype");
-  s = pthread_mutex_init(&sendMutex, &mtxAttr);
-  if (s != 0)
-    err_exit(s, "pthread_mutex_init");
+  // pthread_mutexattr_t mtxAttr;
+  // s = pthread_mutexattr_init(&mtxAttr);
+  // if (s != 0)
+  //   err_exit(s, "pthread_mutexattr_init");
+  // s = pthread_mutexattr_settype(&mtxAttr, PTHREAD_MUTEX_ERRORCHECK);
+  // if (s != 0)
+  //   err_exit(s, "pthread_mutexattr_settype");
+  // s = pthread_mutex_init(&sendMutex, &mtxAttr);
+  // if (s != 0)
+  //   err_exit(s, "pthread_mutex_init");
 
-  s = pthread_mutexattr_destroy(&mtxAttr);
-  if (s != 0)
-    err_exit(s, "pthread_mutexattr_destroy");
+  // s = pthread_mutexattr_destroy(&mtxAttr);
+  // if (s != 0)
+  //   err_exit(s, "pthread_mutexattr_destroy");
 
 }
 
 
 NetModSocket::~NetModSocket() {
-  int s;
-  delete gpool;
-  s =  pthread_mutex_destroy(&sendMutex);
-  if(s != 0) {
-    err_exit(s, "shm_close_socket");
-  }
+  // int s;
+  // delete gpool;
+  // s =  pthread_mutex_destroy(&sendMutex);
+  // if(s != 0) {
+  //   err_exit(s, "shm_socket_close");
+  // }
 }
 
+
+int NetModSocket::stop() {
+  return shmModSocket.stop();
+}
 
 /**
  * 缁戝畾绔彛鍒皊ocket, 濡傛灉涓嶇粦瀹氬垯绯荤粺鑷姩鍒嗛厤涓�涓�
@@ -139,6 +143,8 @@
 NetConnPool* NetModSocket::_get_pool() {
   return _get_threadlocal_pool();
 }
+
+
 
 
 int NetModSocket::_sendandrecv_(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
@@ -302,22 +308,22 @@
 }
 
 
-int NetModSocket::pub(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size) {
+int NetModSocket::pub(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size) {
   return _pub_(node_arr, arrlen, topic, topic_size, content,   content_size, -1);
 }
 
-int NetModSocket::pub_nowait(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size) {
+int NetModSocket::pub_nowait(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size) {
   return _pub_(node_arr, arrlen, topic, topic_size, content,   content_size, 0);
 }
 
-int NetModSocket::pub_timeout(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size, int  msec ) {
+int NetModSocket::pub_timeout(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size, int  msec ) {
   return _pub_(node_arr, arrlen, topic, topic_size, content, content_size, msec);
 }
 
 
 // int  pub(char *topic, int topic_size, void *content, int content_size, int port);
 
-int NetModSocket::_pub_(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content,
+int NetModSocket::_pub_(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content,
  int content_size, int  msec) {
   int i, connfd;
   net_node_t *node;
@@ -495,6 +501,14 @@
   return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG);
 }
 
+
+int NetModSocket::recvandsend(recvandsend_callback_fn callback,
+                              const struct timespec *timeout , int flag, void * user_data ) {
+
+  return shmModSocket.recvandsend(callback, timeout, flag, user_data);
+}
+ 
+
 /**
  * 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟
  * @key 鍙戦�佺粰璋�
@@ -586,7 +600,7 @@
 //======================================================================================
 
 int NetModSocket::write_request(int clientfd, net_mod_request_head_t &request_head, 
-  void *content_buf, int content_size, void *topic_buf, int topic_size) {
+  const void *content_buf, int content_size, const void *topic_buf, int topic_size) {
  
   int buf_size;
   char *buf;

--
Gitblit v1.8.0