wangzhengquan
2021-02-22 45e00aca28504b27f3ad6b4abf364c3d57f34510
src/net/net_mod_socket.cpp
@@ -19,35 +19,39 @@
  if (Signal(SIGPIPE, SIG_IGN) == SIG_ERR)
    logger->error(errno, "NetModSocket::NetModSocket signal");
  gpool = new NetConnPool();
  // gpool = new NetConnPool();
  pthread_mutexattr_t mtxAttr;
  s = pthread_mutexattr_init(&mtxAttr);
  if (s != 0)
    err_exit(s, "pthread_mutexattr_init");
  s = pthread_mutexattr_settype(&mtxAttr, PTHREAD_MUTEX_ERRORCHECK);
  if (s != 0)
    err_exit(s, "pthread_mutexattr_settype");
  s = pthread_mutex_init(&sendMutex, &mtxAttr);
  if (s != 0)
    err_exit(s, "pthread_mutex_init");
  // pthread_mutexattr_t mtxAttr;
  // s = pthread_mutexattr_init(&mtxAttr);
  // if (s != 0)
  //   err_exit(s, "pthread_mutexattr_init");
  // s = pthread_mutexattr_settype(&mtxAttr, PTHREAD_MUTEX_ERRORCHECK);
  // if (s != 0)
  //   err_exit(s, "pthread_mutexattr_settype");
  // s = pthread_mutex_init(&sendMutex, &mtxAttr);
  // if (s != 0)
  //   err_exit(s, "pthread_mutex_init");
  s = pthread_mutexattr_destroy(&mtxAttr);
  if (s != 0)
    err_exit(s, "pthread_mutexattr_destroy");
  // s = pthread_mutexattr_destroy(&mtxAttr);
  // if (s != 0)
  //   err_exit(s, "pthread_mutexattr_destroy");
}
NetModSocket::~NetModSocket() {
  int s;
  delete gpool;
  s =  pthread_mutex_destroy(&sendMutex);
  if(s != 0) {
    err_exit(s, "shm_close_socket");
  }
  // int s;
  // delete gpool;
  // s =  pthread_mutex_destroy(&sendMutex);
  // if(s != 0) {
  //   err_exit(s, "shm_socket_close");
  // }
}
int NetModSocket::stop() {
  return shmModSocket.stop();
}
/**
 * 绑定端口到socket, 如果不绑定则系统自动分配一个
@@ -139,6 +143,8 @@
NetConnPool* NetModSocket::_get_pool() {
  return _get_threadlocal_pool();
}
int NetModSocket::_sendandrecv_(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
@@ -302,22 +308,22 @@
}
int NetModSocket::pub(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size) {
int NetModSocket::pub(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size) {
  return _pub_(node_arr, arrlen, topic, topic_size, content,   content_size, -1);
}
int NetModSocket::pub_nowait(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size) {
int NetModSocket::pub_nowait(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size) {
  return _pub_(node_arr, arrlen, topic, topic_size, content,   content_size, 0);
}
int NetModSocket::pub_timeout(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size, int  msec ) {
int NetModSocket::pub_timeout(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size, int  msec ) {
  return _pub_(node_arr, arrlen, topic, topic_size, content, content_size, msec);
}
// int  pub(char *topic, int topic_size, void *content, int content_size, int port);
int NetModSocket::_pub_(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content,
int NetModSocket::_pub_(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content,
 int content_size, int  msec) {
  int i, connfd;
  net_node_t *node;
@@ -495,6 +501,14 @@
  return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG);
}
int NetModSocket::recvandsend(recvandsend_callback_fn callback,
                              const struct timespec *timeout , int flag, void * user_data ) {
  return shmModSocket.recvandsend(callback, timeout, flag, user_data);
}
/**
 * 发送请求信息并等待接收应答
 * @key 发送给谁
@@ -586,7 +600,7 @@
//======================================================================================
int NetModSocket::write_request(int clientfd, net_mod_request_head_t &request_head, 
  void *content_buf, int content_size, void *topic_buf, int topic_size) {
  const void *content_buf, int content_size, const void *topic_buf, int topic_size) {
 
  int buf_size;
  char *buf;