fujuntang
2021-09-11 dc01e4cbb01e96d19b470a366bbe648d426ed171
Add topics sub and request support.
13个文件已修改
364 ■■■■ 已修改文件
src/bh_api.cpp 32 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/key_def.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket.cpp 40 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket.h 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket_wrapper.cpp 44 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket_wrapper.h 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/proc_def.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket.cpp 53 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.cpp 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.h 19 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.cpp 118 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.cpp
@@ -139,6 +139,12 @@
    gNetmod_socket = net_mod_socket_open();
    hashtable_t *hashtable = mm_get_hashtable();
    key = hashtable_alloc_key(hashtable);
    count = hashtable_alloc_key(hashtable);
    rv = hashtable_alloc_key(hashtable);
    net_mod_socket_int_set(gNetmod_socket, count);
    net_mod_socket_svr_set(gNetmod_socket, rv);
    sprintf(pData.int_info, "%d", count);
    sprintf(pData.svr_info, "%d", rv);
    net_mod_socket_bind(gNetmod_socket, key);
  
    rv = net_mod_socket_reg(gNetmod_socket, &pData, sizeof(ProcInfo), NULL, 0, timeout_ms, PROC_REG);
@@ -932,8 +938,8 @@
  ::bhome_msg::MsgCommonReply mcr;
    mcr.mutable_errmsg()->set_errcode(::bhome_msg::ErrorCode(rv));
    mcr.mutable_errmsg()->set_errstring(errString);
    *reply_len=mcr.ByteSizeLong();
    *reply=malloc(*reply_len);
    *reply_len = mcr.ByteSizeLong();
    *reply = malloc(*reply_len);
    mcr.SerializePartialToArray(*reply,*reply_len);
#endif 
@@ -1207,6 +1213,7 @@
  int val;
  int len;
  int min;
  int data;
  int sec, nsec;
  std::string MsgID;
  int timeout_ms = 3000;
@@ -1309,20 +1316,21 @@
        strncpy(topics_buf + strlen(buf_temp) + 1, _input1.data, strlen(_input1.data));
#endif 
        data = net_mod_socket_svr_get(gNetmod_socket);
        if (timeout_ms > 0) {
          sec = timeout_ms / 1000;
          nsec = (timeout_ms - sec * 1000) * 1000 * 1000;
          
          rv = net_mod_socket_sendto_timeout(gNetmod_socket, topics_buf, len, val, sec, nsec);
          rv = net_mod_socket_sendto_timeout(gNetmod_socket, topics_buf, len, val, sec, nsec, SVR_STR, data);
          
        } else if (timeout_ms == 0) {
          rv = net_mod_socket_sendto_nowait(gNetmod_socket, topics_buf, len, val);
          rv = net_mod_socket_sendto_nowait(gNetmod_socket, topics_buf, len, val, SVR_STR, data);
        } else {
          rv = net_mod_socket_sendto(gNetmod_socket, topics_buf, len, val);
          rv = net_mod_socket_sendto(gNetmod_socket, topics_buf, len, val, SVR_STR, data);
        } 
        free(topics_buf);
@@ -1377,6 +1385,7 @@
  int size;
  int val;
  int min, len;
  int data;
  net_node_t node;
  int node_size;  
  int recv_arr_size;
@@ -1469,6 +1478,7 @@
        len += strlen(_input1.data);
#endif
        data = net_mod_socket_svr_get(gNetmod_socket);
        topics_buf = (char *)malloc(len);
        if (topics_buf == NULL) {
          
@@ -1620,6 +1630,7 @@
  int key;
  int size;
  int len;
  int data;
  int sec, nsec;
  char buf_temp[MAX_STR_LEN] = { 0x00 };
  char *topics_buf = NULL;
@@ -1642,20 +1653,21 @@
        return false;
  }
  data = net_mod_socket_svr_get(gNetmod_socket);
  if (timeout_ms > 0) {
    sec = timeout_ms / 1000;
    nsec = (timeout_ms - sec * 1000) * 1000 * 1000;
    rv = net_mod_socket_recvfrom_timeout(gNetmod_socket, &buf, &size, &key, sec, nsec);
    rv = net_mod_socket_recvfrom_timeout(gNetmod_socket, &buf, &size, &key, sec, nsec, SVR_STR, data);
  } else if (timeout_ms == 0) {
    rv = net_mod_socket_recvfrom_nowait(gNetmod_socket, &buf, &size, &key);
    rv = net_mod_socket_recvfrom_nowait(gNetmod_socket, &buf, &size, &key, SVR_STR, data);
  
  } else {
    rv = net_mod_socket_recvfrom(gNetmod_socket, &buf, &size, &key);
    rv = net_mod_socket_recvfrom(gNetmod_socket, &buf, &size, &key, SVR_STR, data);
  }
  if (rv == 0) {
@@ -1735,6 +1747,7 @@
int BHSendReply(void *src, const void *reply, const int reply_len)
{
  int rv;
  int data;
  const char *_input;
  
#if defined(PRO_DE_SERIALIZE)
@@ -1777,7 +1790,8 @@
  rv = pthread_mutex_trylock(&mutex);
  if (rv == 0) {
    rv = net_mod_socket_sendto(gNetmod_socket, _input, strlen(_input), *(int *)src);
    data = net_mod_socket_svr_get(gNetmod_socket);
    rv = net_mod_socket_sendto(gNetmod_socket, _input, strlen(_input), *(int *)src, SVR_STR, data);
    memset(errString, 0x00, sizeof(errString));
    strncpy(errString, bus_strerror(rv), sizeof(errString));
src/key_def.h
@@ -9,6 +9,7 @@
// BUS key
#define SHM_BUS_KEY 8
#define SHM_BUS_INT_KEY 9
// 网络代理key
#define SHM_NET_PROXY_KEY 99
src/net/net_mod_socket.cpp
@@ -55,6 +55,22 @@
  return shmModSocket.reg(pData, len, buf, size, timeout_ms, flag);
}
void NetModSocket::int_set(int data) {
  int_val = data;
}
void NetModSocket::svr_set(int data) {
  svr_val = data;
}
int NetModSocket::int_get(void) {
  return int_val;
}
int NetModSocket::svr_get(void) {
  return svr_val;
}
// int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
//   net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) {
//   return _sendandrecv_(node_arr,  arrlen, send_buf,send_size, recv_arr, recv_arr_size, -1);
@@ -493,20 +509,20 @@
 * @key 发送给谁
 * @return 0 成功, 其他值 失败的错误码
 */
int NetModSocket::sendto(const void *buf, const int size, const int key){
  return shmModSocket.sendto(buf, size, key);
int NetModSocket::sendto(const void *buf, const int size, const int key, int reset, int data_set){
  return shmModSocket.sendto(buf, size, key, 0, 0, reset, data_set);
}
// 发送信息超时返回。 @sec 秒 , @nsec 纳秒
int NetModSocket::sendto_timeout(const void *buf, const int size, const int key, int sec, int nsec){
int NetModSocket::sendto_timeout(const void *buf, const int size, const int key, int sec, int nsec, int reset, int data_set){
  struct timespec timeout = {sec, nsec};
  return shmModSocket.sendto(buf, size, key, &timeout, BUS_TIMEOUT_FLAG);
  return shmModSocket.sendto(buf, size, key, &timeout, BUS_TIMEOUT_FLAG, reset, data_set);
   
}
// 发送信息立刻返回。
int NetModSocket::sendto_nowait(const void *buf, const int size, const int key){
  return shmModSocket.sendto(buf, size, key, NULL, BUS_NOWAIT_FLAG);
int NetModSocket::sendto_nowait(const void *buf, const int size, const int key, int reset, int data_set){
  return shmModSocket.sendto(buf, size, key, NULL, BUS_NOWAIT_FLAG, reset, data_set);
  
}
@@ -515,21 +531,21 @@
 * @key 从谁哪里收到的信息
 * @return 0 成功, 其他值 失败的错误码
*/
int NetModSocket::recvfrom(void **buf, int *size, int *key) {
int NetModSocket::recvfrom(void **buf, int *size, int *key, int reset, int data_set) {
  return shmModSocket.recvfrom(buf, size, key);
  return shmModSocket.recvfrom(buf, size, key, 0, 0, reset, data_set);
 
}
// 接受信息超时返回。 @sec 秒 , @nsec 纳秒
int NetModSocket::recvfrom_timeout(void **buf, int *size, int *key, int sec, int nsec){
int NetModSocket::recvfrom_timeout(void **buf, int *size, int *key, int sec, int nsec, int reset, int data_set){
  struct timespec timeout = {sec, nsec};
  return shmModSocket.recvfrom(buf, size, key, &timeout, BUS_TIMEOUT_FLAG);
  return shmModSocket.recvfrom(buf, size, key, &timeout, BUS_TIMEOUT_FLAG, reset, data_set);
  
}
int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){
  return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG);
int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key, int reset, int data_set){
  return shmModSocket.recvfrom(buf, size, key, NULL, BUS_NOWAIT_FLAG, reset, data_set);
}
int NetModSocket::recvandsend(recvandsend_callback_fn callback,
src/net/net_mod_socket.h
@@ -71,7 +71,8 @@
private:
   
  ShmModSocket shmModSocket;
  int int_val;
  int svr_val;
  // pthread_mutex_t sendMutex;
  // request header 编码为网络传输的字节
@@ -136,7 +137,10 @@
    net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, 
    net_mod_err_t ** _err_arr, int *_err_arr_size, int timeout); 
 
  void int_set(int data);
  void svr_set(int data);
  int int_get(void);
  int svr_get(void);
  /**
   * 功能同sendandrecv
   * 优点:线程安全
@@ -146,27 +150,27 @@
  // int sendandrecv_safe(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size, 
  //   net_mod_recv_msg_t ** recv_arr, int *recv_arr_size);
  /**
   * 发送信息
   * @key 发送给谁
   * @return 0 成功, 其他值 失败的错误码
   */
  int sendto( const void *buf, const int size, const int key);
  int sendto( const void *buf, const int size, const int key, int reset = 0, int data_set = 0);
  // 发送信息超时返回。 @sec 秒 , @nsec 纳秒
  int sendto_timeout( const void *buf, const int size, const int key, int sec, int nsec);
  int sendto_timeout( const void *buf, const int size, const int key, int sec, int nsec, int reset = 0, int data_set = 0);
  // 发送信息立刻返回。
  int sendto_nowait( const void *buf, const int size, const int key);
  int sendto_nowait( const void *buf, const int size, const int key, int reset = 0, int data_set = 0);
  /**
   * 接收信息
   * @key 从谁哪里收到的信息
   * @return 0 成功, 其他值 失败的错误码
  */
  int recvfrom( void **buf, int *size, int *key);
  int recvfrom( void **buf, int *size, int *key, int reset = 0, int data_set = 0);
  // 接受信息超时返回。 @sec 秒 , @nsec 纳秒
  int recvfrom_timeout( void **buf, int *size, int *key, int sec, int nsec);
  int recvfrom_nowait( void **buf, int *size, int *key);
  int recvfrom_timeout( void **buf, int *size, int *key, int sec, int nsec, int reset = 0, int data_set = 0);
  int recvfrom_nowait( void **buf, int *size, int *key, int reset = 0, int data_set = 0);
  /**
   * 本地发送请求信息并等待接收应答
   * @key 发送给谁
src/net/net_mod_socket_wrapper.cpp
@@ -57,20 +57,20 @@
 * @key 发送给谁
 * @return 0 成功, 其他值 失败的错误码
 */
int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int key) {
int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int key, int reset, int data_set) {
    NetModSocket *sockt = (NetModSocket *)_socket;
    return sockt->sendto(buf, size, key);
    return sockt->sendto(buf, size, key, reset, data_set);
}
// 发送信息超时返回。 @sec 秒 , @nsec 纳秒
int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec){
int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec, int reset, int data_set){
    NetModSocket *sockt = (NetModSocket *)_socket;
    return sockt->sendto_timeout(buf, size, key, sec, nsec);
    return sockt->sendto_timeout(buf, size, key, sec, nsec, reset, data_set);
    // return sockt->sendto(buf, size, key);
}
// 发送信息立刻返回。
int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key){
int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key, int reset, int data_set){
    NetModSocket *sockt = (NetModSocket *)_socket;
    return sockt->sendto_nowait(buf, size, key);
    return sockt->sendto_nowait(buf, size, key, reset, data_set);
}
/**
@@ -78,23 +78,23 @@
 * @port 从谁哪里收到的信息
 * @return 0 成功, 其他值 失败的错误码
*/
int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key){
int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key, int reset, int data_set){
    int rv;
    NetModSocket *sockt = (NetModSocket *)_socket;
    rv = sockt->recvfrom(buf, size, key);
    rv = sockt->recvfrom(buf, size, key, reset, data_set);
    return rv;
}
// 接受信息超时返回。 @sec 秒 , @nsec 纳秒
int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec){
int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec, int reset, int data_set){
  NetModSocket *sockt = (NetModSocket *)_socket;
  return sockt->recvfrom_timeout(buf, size, key, sec, nsec);
  return sockt->recvfrom_timeout(buf, size, key, sec, nsec, reset, data_set);
}
int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key){
int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key, int reset, int data_set){
  NetModSocket *sockt = (NetModSocket *)_socket;
  return sockt->recvfrom_nowait(buf, size, key);
  return sockt->recvfrom_nowait(buf, size, key, reset, data_set);
}
int net_mod_socket_sendandrecv(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
@@ -108,6 +108,26 @@
  return sockt->bind_proc_id(proc_id, len);
}
void net_mod_socket_int_set(void * _socket, int data) {
  NetModSocket *sockt = (NetModSocket *)_socket;
  sockt->int_set(data);
}
void net_mod_socket_svr_set(void * _socket, int data) {
  NetModSocket *sockt = (NetModSocket *)_socket;
  sockt->svr_set(data);
}
int net_mod_socket_int_get(void * _socket) {
  NetModSocket *sockt = (NetModSocket *)_socket;
  return sockt->int_get();
}
int net_mod_socket_svr_get(void * _socket) {
  NetModSocket *sockt = (NetModSocket *)_socket;
  return sockt->svr_get();
}
/**
 * 如果建立连接的节点没有接受到消息等待timeout的时间后返回
 * @timeout 等待时间,单位是千分之一秒
src/net/net_mod_socket_wrapper.h
@@ -67,7 +67,7 @@
 *
 * @return 0是成功, 其他值是失败的错误码
 */
int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int key);
int net_mod_socket_sendto(void *_socket, const void *buf, const int size, const int key, int reset = 0, int data_set = 0);
/**
 * @brief  发送信息,在指定时间内没发送完成也返回。 
@@ -80,7 +80,7 @@
 *
 * @return 0是成功, 其他值是失败的错误码
 */
int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec);
int net_mod_socket_sendto_timeout(void *_socket, const void *buf, const int size, const int key, int sec, int nsec, int reset = 0, int data_set = 0);
/**
 * @brief 发送信息,无论是否发送完成立刻返回。
@@ -91,7 +91,7 @@
 *
 * @return 0是成功, 其他值是失败的错误码
 */
int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key);
int net_mod_socket_sendto_nowait(void *_socket, const void *buf, const int size, const int key, int reset = 0, int data_set = 0);
/**
 * @brief 等待接收信息,直到有消息接受到才返回
@@ -102,7 +102,7 @@
 * 
 * @return 0是成功, 其他值是失败的错误码
 */
int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key);
int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key, int reset = 0, int data_set = 0);
/**
 * @brief 等待接收信息,在指定的时间内即使没有接受到消息也要返回
@@ -115,7 +115,7 @@
 * 
 * @return 0是成功, 其他值是失败的错误码
 */
int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec);
int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec, int reset = 0, int data_set = 0);
/**
 * @brief 等待接收信息,直到有消息接受到才返回
@@ -126,10 +126,12 @@
 * 
 * @return 0是成功,其他值是失败的错误码
 */
int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key);
int net_mod_socket_recvfrom_nowait(void *_socket, void **buf, int *size, int *key, int reset = 0, int data_set = 0);
void net_mod_socket_int_set(void * _socket, int data);
void net_mod_socket_svr_set(void * _socket, int data);
int net_mod_socket_int_get(void * _socket);
int net_mod_socket_svr_get(void * _socket);
/**
 * @brief 跨机器发送消息并接受返回的应答消息,直到发送完成才返回
src/proc_def.h
@@ -35,6 +35,8 @@
  char name[MAX_STR_LEN];
  char public_info[MAX_STR_LEN]; 
  char private_info[MAX_STR_LEN];
  char int_info[MAX_STR_LEN];
  char svr_info[MAX_STR_LEN];
#endif
} ProcInfo;
@@ -65,6 +67,8 @@
}
#endif
#define INT_STR     0x01
#define SVR_STR     0x02
#endif  //end of file
src/socket/bus_server_socket.cpp
@@ -302,6 +302,7 @@
  int count = 0;
  int i = 0;
  int len = 0;
  int data1, data2;
  char *data_ptr;
  ProcInfo Data_stru;
  ProcZone::iterator proc_iter;
@@ -333,6 +334,13 @@
      
      memcpy(Data_stru.private_info, buf + count, strlen(buf + count) + 1);
      count += strlen(buf + count) + 1;
      memcpy(Data_stru.int_info, buf + count, strlen(buf + count) + 1);
      count += strlen(buf + count) + 1;
      memcpy(Data_stru.svr_info, buf + count, strlen(buf + count) + 1);
      count += strlen(buf + count) + 1;
    }
    ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
@@ -362,6 +370,9 @@
      if ((proc_iter = proc->find(key)) != proc->end()) {
        data1 = atoi((proc_iter->second).int_info);
        data2 = atoi((proc_iter->second).svr_info);
        BusServerSocket::_data_remove(data1, data2);
        len = (sizeof(buf_temp) - 1) > strlen((proc_iter->second).proc_id) ? strlen((proc_iter->second).proc_id) : (sizeof(buf_temp) - 1);
        strncpy(buf_temp, (proc_iter->second).proc_id, len);
        proc->erase(proc_iter);
@@ -504,7 +515,9 @@
    free(last_buf);
  } else if (flag == PROC_QUE_STCS) {
    SvrTcs *SvrData = shm_mm_attach<SvrTcs>(SHM_BUS_TCS_MAP_KEY);
    ProcZone *proc = shm_mm_attach<ProcZone>(SHM_BUS_PROC_MAP_KEY);
    strncpy(buf_temp, topic, topic_size > (sizeof(buf_temp) - 1) ? (sizeof(buf_temp) - 1) : topic_size);
    if ((svr_tcs_iter = SvrData->find(buf_temp)) != SvrData->end()) {
@@ -512,6 +525,9 @@
    
      for(svr_proc_iter = SvrSub_ele->begin(); svr_proc_iter != SvrSub_ele->end(); ++svr_proc_iter) { 
        count = *svr_proc_iter;
        if ((proc_iter = proc->find(count)) != proc->end()) {
          count = atoi((proc_iter->second).svr_info);
        }
        break;
      }
@@ -770,3 +786,40 @@
    return rv;
}
void BusServerSocket::_data_remove(int val1, int val2) {
  int i;
  LockFreeQueue<shm_packet_t> *queue = NULL;
  hashtable_t *hashtable = mm_get_hashtable();
  void *data_ptr1 = hashtable_get(hashtable, val1);
  void *data_ptr2 = hashtable_get(hashtable, val2);
  if (data_ptr1 != NULL) {
    if (data_ptr1 != (void *)1) {
      queue = (LockFreeQueue<shm_packet_t> *)data_ptr1;
      queue->close();
      for (i = 0; i < queue->size(); i++) {
        mm_free((*queue)[i].buf);
      }
      sleep(1);
    }
    hashtable_remove(hashtable, val1);
  }
  if (data_ptr2 != NULL) {
    if (data_ptr2 != (void *)1) {
      queue = (LockFreeQueue<shm_packet_t> *)data_ptr2;
      queue->close();
      for (i = 0; i < queue->size(); i++) {
        mm_free((*queue)[i].buf);
      }
      sleep(1);
    }
    hashtable_remove(hashtable, val2);
  }
}
src/socket/bus_server_socket.h
@@ -81,6 +81,7 @@
     */
    int get_key() ;
  void _data_remove(int val1, int val2);
};
src/socket/shm_mod_socket.cpp
@@ -166,8 +166,8 @@
 * @key 发送给谁
 * @return 0 成功, 其他值 失败的错误码
 */
int ShmModSocket::sendto(const void *buf, const int size, const int key, const struct timespec *timeout, int flag) {
    int rv = shm_sendto(shm_socket, buf, size, key, timeout, flag);
int ShmModSocket::sendto(const void *buf, const int size, const int key, const struct timespec *timeout, int flag, int reset, int data_set) {
    int rv = shm_sendto(shm_socket, buf, size, key, timeout, flag, reset, data_set);
  if(rv == 0) {
      logger->debug("ShmModSocket::sendto: %d sendto %d success.\n", get_key(), key);
      return 0;
@@ -182,9 +182,9 @@
 * @key 从谁哪里收到的信息
 * @return 0 成功, 其他值 失败的错误码
*/
int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag) {
int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag, int reset, int data_set) {
  int rv =  shm_recvfrom(shm_socket, buf, size, key, timeout, flag);
  int rv =  shm_recvfrom(shm_socket, buf, size, key, timeout, flag, reset, data_set);
    if(rv == 0) {
    logger->debug("ShmModSocket::recvfrom: %d recvfrom %d success.\n", get_key(), *key);
src/socket/shm_mod_socket.h
@@ -64,22 +64,11 @@
  int bind_proc_id(char *buf, int len);
  int reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag);
    /**
     * 发送信息
     * @key 发送给谁
     * @flag BUS_TIMEOUT_FLAG  BUS_NOWAIT_FLAG
     * @return 0 成功, 其他值 失败的错误码
     */
    int sendto(const void *buf, const int size, const int key, const struct timespec *timeout = NULL, int flag = 0);
  int sendto(const void *buf, const int size, const int key, const struct timespec *timeout = NULL, int flag = 0, int reset = 0, int data_set = 0);
    /**
     * 接收信息
     * @key 从谁哪里收到的信息
     * @return 0 成功, 其他值 失败的错误码
    */
    int recvfrom(void **buf, int *size, int *key,  const struct timespec *timeout = NULL, int flag = 0);
  int recvfrom(void **buf, int *size, int *key,  const struct timespec *timeout = NULL, int flag = 0, int reset = 0, int data_set = 0);
    /**
     * 发送请求信息并等待接收应答
     * @key 发送给谁
src/socket/shm_socket.cpp
@@ -23,11 +23,12 @@
static void _destrory_threadlocal_socket_(void *tmp_socket);
static void _create_threadlocal_socket_key_(void);
static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *recvpak ,  const struct timespec *timeout,  int flag);
static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *recvpak ,  const struct timespec *timeout,
                int flag, int reset = 0, int data_set = 0);
   
static int shm_sendpakto(shm_socket_t *sockt,  shm_packet_t *sendpak,
               const int key, const struct timespec *timeout, const int flag);
static int shm_sendpakto(shm_socket_t *sockt,  shm_packet_t *sendpak, const int key, const struct timespec *timeout,
                const int flag, int reset = 0, int data_set = 0);
static int _shm_sendandrecv_uuid(shm_socket_t *sockt, const void *send_buf,
@@ -183,20 +184,24 @@
}
// 短连接方式发送
int shm_sendto(shm_socket_t *sockt, const void *buf, const int size,
               const int key, const struct timespec *timeout, const int flag) {
int shm_sendto(shm_socket_t *sockt, const void *buf, const int size, const int key, const struct timespec *timeout,
                    const int flag, int reset, int data_set) {
  int rv;
 
  shm_packet_t sendpak = {0};
  sendpak.key = sockt->key;
  if (reset == 0) {
    sendpak.key = sockt->key;
  } else {
    sendpak.key = data_set;
  }
  sendpak.size = size;
  if(buf != NULL) {
    sendpak.buf = mm_malloc(size);
    memcpy(sendpak.buf, buf, size);
  }
 
  rv = shm_sendpakto(sockt, &sendpak, key, timeout, flag);
  rv = shm_sendpakto(sockt, &sendpak, key, timeout, flag, reset, data_set);
  return rv;
}
@@ -262,11 +267,11 @@
}
// 短连接方式接受
int shm_recvfrom(shm_socket_t *sockt, void **buf, int *size, int *key,  const struct timespec *timeout,  int flag) {
int shm_recvfrom(shm_socket_t *sockt, void **buf, int *size, int *key,  const struct timespec *timeout,  int flag, int reset, int data_set) {
  int rv;
  
  shm_packet_t recvpak;
  rv = shm_recvpakfrom(sockt , &recvpak, timeout, flag);
  rv = shm_recvpakfrom(sockt , &recvpak, timeout, flag, reset, data_set);
  if (rv != 0) {
@@ -544,15 +549,24 @@
 
   
static int shm_sendpakto(shm_socket_t *sockt,  shm_packet_t *sendpak,
               const int key, const struct timespec *timeout, const int flag) {
static int shm_sendpakto(shm_socket_t *sockt,  shm_packet_t *sendpak, const int key, const struct timespec *timeout,
                          const int flag, int reset, int data_set) {
  int rv;
  shm_queue_status_t stRecord;
  LockFreeQueue<shm_packet_t> *remoteQueue;
  LockFreeQueue<shm_packet_t> *fixedQueue;
  hashtable_t *hashtable = mm_get_hashtable();
  if( sockt->queue != NULL)
  if ((reset != 0) && (data_set == 0)) {
    return EBUS_KEY_INUSED;
  }
  if (reset != 0) {
    fixedQueue = shm_socket_attach_queue(data_set);
  }
  if (((reset == 0) && (sockt->queue != NULL)) || ((reset != 0) && (fixedQueue != NULL)))
    goto LABEL_PUSH;
  // if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) {
@@ -563,7 +577,7 @@
    if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0)
    err_exit(rv, "shm_sendto : pthread_mutex_lock");
    
    if (sockt->queue == NULL) {
    if ((sockt->queue == NULL) && (reset == 0)) {
      if (sockt->key == 0) {
        sockt->key = hashtable_alloc_key(hashtable);
      }
@@ -580,6 +594,16 @@
      // stRecord.createTime = time(NULL);
      // shmQueueStMap->insert({sockt->key, stRecord});
      
    }
    if ((fixedQueue == NULL) && (reset != 0)) {
      fixedQueue = shm_socket_bind_queue(data_set, false);
      if (fixedQueue == NULL ) {
        logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key);
        if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
          err_exit(rv, "shm_sendto : pthread_mutex_unlock");
        return EBUS_KEY_INUSED;
      }
    }
    if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
@@ -611,7 +635,9 @@
    goto ERR_CLOSED;
  }
  sendpak->key = sockt->key;
  if (reset == 0) {
    sendpak->key = sockt->key;
  }
  rv = remoteQueue->push(*sendpak, timeout, flag);
  if(rv != 0) {
@@ -629,13 +655,23 @@
}
// 短连接方式接受
static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *_recvpak ,  const struct timespec *timeout,  int flag) {
static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *_recvpak ,  const struct timespec *timeout,
                            int flag, int reset, int data_set) {
  int rv;
  shm_queue_status_t stRecord;
  LockFreeQueue<shm_packet_t> *fixedQueue;
  hashtable_t *hashtable = mm_get_hashtable();
  shm_packet_t recvpak;
  if( sockt->queue != NULL)
  if ((reset != 0) && (data_set == 0)) {
    return EBUS_KEY_INUSED;
  }
  if (reset != 0) {
    fixedQueue = shm_socket_attach_queue(data_set);
  }
  if (((sockt->queue != NULL) && (reset == 0)) || ((reset != 0) && (fixedQueue != NULL)))
    goto LABEL_POP;
  // if(hashtable_get_queue_count(hashtable) > QUEUE_COUNT_LIMIT) {
@@ -646,21 +682,33 @@
    if ((rv = pthread_mutex_lock(&(sockt->mutex))) != 0)
      err_exit(rv, "shm_recvfrom : pthread_mutex_lock");
 
    if (sockt->key == 0) {
      sockt->key = hashtable_alloc_key(hashtable);
    }
    sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind);
    if(sockt->queue  == NULL ) {
      logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key);
      if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
        err_exit(rv, "shm_recvfrom : pthread_mutex_unlock");
      return EBUS_KEY_INUSED;
    if ((sockt->queue == NULL) && (reset == 0)) {
      if (sockt->key == 0) {
        sockt->key = hashtable_alloc_key(hashtable);
      }
      sockt->queue = shm_socket_bind_queue( sockt->key, sockt->force_bind);
      if(sockt->queue  == NULL ) {
        logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key);
        if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
          err_exit(rv, "shm_recvfrom : pthread_mutex_unlock");
        return EBUS_KEY_INUSED;
      }
      // 标记key对应的状态 ,为opened
      // stRecord.status = SHM_QUEUE_ST_OPENED;
      // stRecord.createTime = time(NULL);
      // shmQueueStMap->insert({sockt->key, stRecord});
    }
    // 标记key对应的状态 ,为opened
    // stRecord.status = SHM_QUEUE_ST_OPENED;
    // stRecord.createTime = time(NULL);
    // shmQueueStMap->insert({sockt->key, stRecord});
    if ((fixedQueue == NULL) && (reset != 0)) {
      fixedQueue = shm_socket_bind_queue(data_set, false);
      if (fixedQueue == NULL ) {
        logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key);
        if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
          err_exit(rv, "shm_sendto : pthread_mutex_unlock");
        return EBUS_KEY_INUSED;
      }
    }
    
    if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0)
      err_exit(rv, "shm_recvfrom : pthread_mutex_unlock");
@@ -669,7 +717,11 @@
  
LABEL_POP:
  rv = sockt->queue->pop(recvpak, timeout, flag);
  if (reset == 0) {
    rv = sockt->queue->pop(recvpak, timeout, flag);
  } else {
    rv = fixedQueue->pop(recvpak, timeout, flag);
  }
  if(rv != 0) {
    if(rv == ETIMEDOUT) {
      return EBUS_TIMEOUT;
@@ -697,6 +749,10 @@
  count += strlen(ptr->public_info) + 1;
  memcpy(dst + count, ptr->private_info, strlen(ptr->private_info) + 1);
  count += strlen(ptr->private_info) + 1;
  memcpy(dst + count, ptr->int_info, strlen(ptr->int_info) + 1);
  count += strlen(ptr->int_info) + 1;
  memcpy(dst + count, ptr->svr_info, strlen(ptr->svr_info) + 1);
  count += strlen(ptr->svr_info) + 1;
  *counter = count;
}
src/socket/shm_socket.h
@@ -66,9 +66,9 @@
/**
 * @flags : BUS_NOWAIT_FLAG
 */
int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int key, const struct timespec * timeout = NULL, const int flags=0);
int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int key, const struct timespec * timeout = NULL, const int flags=0, int reset = 0, int data_set = 0);
int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key,  const struct timespec * timeout = NULL,  int flags=0);
int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key,  const struct timespec * timeout = NULL,  int flags=0, int reset = 0, int data_set = 0);
int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size,  
    const struct timespec * timeout = NULL,  int flags = 0);