wangzhengquan
2021-01-13 940bb9e9238488025bf41eb2b2d3df077274004f
update
5个文件已修改
167 ■■■■ 已修改文件
src/queue/lock_free_queue.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/hashtable.cpp 27 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket.cpp 109 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket.h 13 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.cpp 16 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/lock_free_queue.h
@@ -354,7 +354,7 @@
     struct timespec timeout = PXSemUtil::calc_sem_timeout(ts);
    while (sem_timedwait(&items, &timeout) == -1) {
        if (errno == EAGAIN)
        if (errno == ETIMEDOUT)
            return false;
        else if(errno == EINTR)
            continue;
src/shm/hashtable.cpp
@@ -189,11 +189,7 @@
void *hashtable_get(hashtable_t *hashtable, int key) {
  
   if (SemUtil::dec_timeout(hashtable->mutex, &TIMEOUT) != 0) {
    SemUtil::inc(hashtable->mutex);
    SemUtil::dec(hashtable->mutex);
   }
   SemUtil::dec(hashtable->mutex);
   hashtable->readcnt++;
   if (hashtable->readcnt == 1) {
    //获取读写锁
@@ -221,30 +217,21 @@
}
void hashtable_put(hashtable_t *hashtable, int key, void *value) {
  struct timespec timeout = {2, 0};
  if (SemUtil::dec_timeout(hashtable->mutex, &timeout) != 0) {
    SemUtil::inc(hashtable->mutex);
    SemUtil::dec(hashtable->mutex);
  }
  SemUtil::dec(hashtable->mutex);
  // 设置读优先级高
  while (hashtable->readcnt > 0)
  {
    SemUtil::set(hashtable->cond, 0);
    SemUtil::inc(hashtable->mutex);
    //等待写通知
    if (SemUtil::dec_timeout(hashtable->cond, &timeout) != 0) {
      hashtable->readcnt = 0;
      SemUtil::inc(hashtable->cond);
      SemUtil::dec(hashtable->cond);
    }
    SemUtil::dec(hashtable->cond);
    SemUtil::dec(hashtable->mutex);
  }
  SemUtil::inc(hashtable->mutex);
  //获取读写锁
  SemUtil::dec(hashtable->wlock);
 // err_msg(0, "hashtable_put dec %d\n", --hashtable->tmp);
@@ -321,11 +308,7 @@
int hashtable_alloc_key(hashtable_t *hashtable) {
  int key = START_KEY;
  if (SemUtil::dec_timeout(hashtable->wlock, &TIMEOUT) != 0) {
    SemUtil::inc(hashtable->wlock);
    SemUtil::dec(hashtable->wlock);
  }
  SemUtil::dec(hashtable->wlock);
  while(_hashtable_get(hashtable, key) != NULL) {
    key++;
src/socket/net_mod_socket.cpp
@@ -1,7 +1,7 @@
#include "net_mod_socket.h"
#include "socket_io.h"
#include "net_mod_socket_io.h"
#include "net_conn_pool.h"
#include <sys/types.h>          /* See NOTES */
#include <sys/socket.h>
#include <pthread.h>
@@ -15,13 +15,37 @@
NetModSocket::NetModSocket() 
{
  int s;
  if (Signal(SIGPIPE, SIG_IGN) == SIG_ERR)
      logger->error(errno, "NetModSocket::NetModSocket signal");
  gpool = new NetConnPool();
  pthread_mutexattr_t mtxAttr;
  s = pthread_mutexattr_init(&mtxAttr);
  if (s != 0)
    err_exit(s, "pthread_mutexattr_init");
  s = pthread_mutexattr_settype(&mtxAttr, PTHREAD_MUTEX_ERRORCHECK);
  if (s != 0)
    err_exit(s, "pthread_mutexattr_settype");
  s = pthread_mutex_init(&sendMutex, &mtxAttr);
  if (s != 0)
    err_exit(s, "pthread_mutex_init");
  s = pthread_mutexattr_destroy(&mtxAttr);
  if (s != 0)
    err_exit(s, "pthread_mutexattr_destroy");
}
NetModSocket::~NetModSocket() {
  int s;
  delete gpool;
  s =  pthread_mutex_destroy(&sendMutex);
  if(s != 0) {
    err_exit(s, "shm_close_socket");
  }
}
@@ -80,23 +104,10 @@
  }
}
int NetModSocket::_sendandrecv_(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int  msec ) {
NetConnPool* NetModSocket::_get_threadlocal_pool() {
  int i, n, recv_size, connfd;
  net_node_t *node;
  void *recv_buf = NULL;
  struct timespec timeout;
  int ret;
  int n_req = 0, n_recv_suc = 0, n_resp =0;
  net_mod_request_head_t request_head = {};
  net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t));
  NetConnPool *mpool;
  /* Make first caller allocate key for thread-specific data */
  ret = pthread_once(&once, _createConnPoolKey_);
  if (ret != 0) {
@@ -115,16 +126,50 @@
      exit(1);
    }
    ret = pthread_setspecific(poolKey, mpool);
    if (ret != 0) {
      LoggerFactory::getLogger()->error(errno, "NetModSocket::_sendandrecv_ pthread_setspecific");
      exit(1);
    }
  }
  return mpool;
}
NetConnPool* NetModSocket::_get_pool() {
  return gpool;
}
int NetModSocket::_sendandrecv_(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int  msec ) {
  int s, rv;
  if ((s = pthread_mutex_lock(&sendMutex)) != 0)
    err_exit(s, "NetModSocket : pthread_mutex_lock");
  rv = _sendandrecv_unsafe(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size,   msec );
  if ((s = pthread_mutex_unlock(&sendMutex)) != 0)
    err_exit(s, "NetModSocket : pthread_mutex_lock");
  return rv;
}
int NetModSocket::_sendandrecv_unsafe(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int  msec ) {
  int i, n, recv_size, connfd;
  net_node_t *node;
  void *recv_buf = NULL;
  struct timespec timeout;
  int ret;
  int n_req = 0, n_recv_suc = 0, n_resp =0;
  
  net_mod_request_head_t request_head = {};
  net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t));
 
  NetConnPool *mpool = _get_pool();
  for (i = 0; i< arrlen; i++) {
@@ -142,7 +187,7 @@
        ret = shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size);
      }
      if( ret == 0) {
        strcpy( ret_arr[n_recv_suc].host,"");
        strcpy( ret_arr[n_recv_suc].host, "");
        ret_arr[n_recv_suc].port = 0;
        ret_arr[n_recv_suc].key = node->key;
        ret_arr[n_recv_suc].content = recv_buf;
@@ -290,31 +335,7 @@
  int n_req = 0, n_pub_suc = 0, n_resp = 0;
  int ret;
  NetConnPool *mpool;
  /* Make first caller allocate key for thread-specific data */
  ret = pthread_once(&once, _createConnPoolKey_);
  if (ret != 0) {
    LoggerFactory::getLogger()->error(errno, "NetModSocket::_sendandrecv_ pthread_once");
    exit(1);
  }
  mpool = (NetConnPool *)pthread_getspecific(poolKey);
  if (mpool == NULL)
  {
    /* If first call from this thread, allocte buffer for thread, and save its location */
    mpool = new NetConnPool();
    if (mpool == NULL) {
      LoggerFactory::getLogger()->error(errno, "NetModSocket::_sendandrecv_ malloc");
      exit(1);
    }
    ret = pthread_setspecific(poolKey, mpool);
    if (ret != 0) {
      LoggerFactory::getLogger()->error(errno, "NetModSocket::_sendandrecv_ pthread_setspecific");
      exit(1);
    }
  }
  NetConnPool *mpool = _get_pool();
  // 本地发送
  if(node_arr == NULL || arrlen == 0) {
src/socket/net_mod_socket.h
@@ -5,7 +5,7 @@
#include "socket_io.h"
#include <poll.h>
#include "socket_def.h"
#include "net_conn_pool.h"
class NetModServerSocket;
@@ -63,7 +63,9 @@
   
  ShmModSocket shmModSocket;
  // pool req_resp_pool;
  NetConnPool *gpool;
  pthread_mutex_t sendMutex;
  // request header 编码为网络传输的字节
  static void * encode_request_head(net_mod_request_head_t & request);
@@ -78,6 +80,10 @@
  // 创建thread local key
  static void _createConnPoolKey_(void);
  NetConnPool* _get_threadlocal_pool();
  NetConnPool* _get_pool();
  //读取返回信息
  int read_response(int clientfd, net_mod_recv_msg_t *recv_msg);
  // 发送请求信息
@@ -86,6 +92,9 @@
  int _sendandrecv_(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size, 
    net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int timeout);
  int _sendandrecv_unsafe(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size,
    net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int timeout);
  int _pub_(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size, int timeout) ;
  
src/socket/shm_socket.cpp
@@ -49,7 +49,7 @@
  int s, type;
  pthread_mutexattr_t mtxAttr;
  // logger->debug("shm_open_socket\n");
  logger->debug("shm_open_socket\n");
  shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t));
  socket->socket_type = socket_type;
  socket->key = -1;
@@ -78,7 +78,7 @@
  
  int ret, s;
  
  // logger->debug("shm_close_socket\n");
  logger->debug("shm_close_socket\n");
  switch (socket->socket_type) {
    case SHM_SOCKET_STREAM:
      ret =  _shm_close_stream_socket(socket, true);
@@ -386,9 +386,9 @@
  } else {
    delete remoteQueue;
    mm_free(dest.buf);
    if(errno == EAGAIN) {
    if(errno == ETIMEDOUT) {
      bus_errno = EBUS_TIMEOUT;
      logger->error("sendto key %d failed, %s", key, bus_strerror(bus_errno));
      logger->error(errno, "sendto key %d failed, %s", key, bus_strerror(EBUS_TIMEOUT));
      return EBUS_TIMEOUT;
    } else {
      logger->error(errno, "sendto key %d failed!", key);
@@ -498,8 +498,8 @@
}
int shm_sendandrecv_safe2(shm_socket_t *socket, const void *send_buf,
// use thread local
int _shm_sendandrecv_thread_local(shm_socket_t *socket, const void *send_buf,
                    const int send_size, const int send_key, void **recv_buf,
                    int *recv_size,  struct timespec *timeout,  int flags) {
  int recv_key;
@@ -547,7 +547,7 @@
  return -1;
}
int shm_sendandrecv_safe(shm_socket_t *socket, const void *send_buf,
int _shm_sendandrecv_alloc_new(shm_socket_t *socket, const void *send_buf,
                    const int send_size, const int send_key, void **recv_buf,
                    int *recv_size,  struct timespec *timeout,  int flags) {
  int recv_key;
@@ -601,7 +601,7 @@
int shm_sendandrecv(shm_socket_t *socket, const void *send_buf,
                    const int send_size, const int send_key, void **recv_buf,
                    int *recv_size,  struct timespec *timeout,  int flags) {
  return  shm_sendandrecv_safe(socket, send_buf, send_size, send_key,recv_buf, recv_size, timeout,  flags);
  return  shm_sendandrecv_unsafe(socket, send_buf, send_size, send_key,recv_buf, recv_size, timeout,  flags);
}