wangzhengquan
2021-01-13 973692652774a5ffe98478ee287b40af529d0b39
update
12个文件已修改
381 ■■■■ 已修改文件
include/usgcommon/sem_util.h 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
lib/libusgcommon.a 补丁 | 查看 | 原始文档 | blame | 历史
lib/libusgcommon.so 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/lock_free_queue.h 84 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/hashtable.cpp 204 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket.cpp 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket.h 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket_wrapper.cpp 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.cpp 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/heart_beat.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/test_bus_stop.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/test_net_mod_socket.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
include/usgcommon/sem_util.h
@@ -1,5 +1,5 @@
#ifndef PCSEM_H
#define PCSEM_H
#ifndef _SEM_UTIL_H
#define _SEM_UTIL_H
#include "usg_common.h"
#include "usg_typedef.h"
@@ -14,9 +14,12 @@
    int zero_nowait(int semId);
    int zero_timeout(const int semId, const struct timespec * timeout);
    int inc(int semId);
    int set(int semId, int val);
    void remove(int semid);
    void set(int semId, int val);
}
lib/libusgcommon.a
Binary files differ
lib/libusgcommon.so
Binary files differ
src/queue/lock_free_queue.h
@@ -8,6 +8,7 @@
#include "logger_factory.h"
#include "shm_allocator.h"
#include "px_sem_util.h"
#include "bus_error.h"
// default Queue size
#define LOCK_FREE_Q_DEFAULT_SIZE 16
@@ -119,17 +120,17 @@
    /// structures to be inserted in the queue you should think of instantiate the template
    /// of the queue as a pointer to that large structure
    /// @return true if the element was inserted in the queue. False if the queue was full
    bool push(const ELEM_T &a_data);
    bool push_nowait(const ELEM_T &a_data);
    bool push_timeout(const ELEM_T &a_data, const struct timespec * timeout);
    int push(const ELEM_T &a_data);
    int push_nowait(const ELEM_T &a_data);
    int push_timeout(const ELEM_T &a_data, const struct timespec * timeout);
    /// @brief pop the element at the head of the queue
    /// @param a reference where the element in the head of the queue will be saved to
    /// Note that the a_data parameter might contain rubbish if the function returns false
    /// @return true if the element was successfully extracted from the queue. False if the queue was empty
    bool pop(ELEM_T &a_data);
    bool pop_nowait(ELEM_T &a_data);
    bool pop_timeout(ELEM_T &a_data, struct timespec * timeout);
    int pop(ELEM_T &a_data);
    int pop_nowait(ELEM_T &a_data);
    int pop_timeout(ELEM_T &a_data, struct timespec * timeout);
    void *operator new(size_t size);
@@ -213,20 +214,20 @@
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data)
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data)
{
// LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");   
    if (sem_wait(&slots) == -1) {
        err_msg(errno, "LockFreeQueue push");
        return false;
        return errno;
    }
    
    if ( m_qImpl.push(a_data) ) {
        sem_post(&items);   
// LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n");   
        return true;
        return 0;
    }
    return false;
    return -1;
    
}
@@ -234,23 +235,23 @@
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data)
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data)
{
    if (sem_trywait(&slots) == -1) {
        if (errno == EAGAIN)
            return false;
            return EAGAIN;
        else {
            err_msg(errno, "LockFreeQueue push_nowait");
            return false;
            return errno;
        }
    }
    if ( m_qImpl.push(a_data)) {
        sem_post(&items);     
        return true;
        return 0;
    }
    return false;
    return -1;
    
}
@@ -258,16 +259,9 @@
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts)
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts)
{
    // int tmp_sec;
    // struct timespec timeout;
    // if (clock_gettime(CLOCK_REALTIME, &timeout) == -1)
    //     err_exit(errno, "clock_gettime");
    // timeout.tv_nsec += ts->tv_nsec;
    // tmp_sec =  timeout.tv_nsec / 10e9;
    // timeout.tv_nsec =  timeout.tv_nsec - tmp_sec * 10e9;
    // timeout.tv_sec += ts->tv_sec + tmp_sec;
    struct timespec timeout = PXSemUtil::calc_sem_timeout(ts);
  // LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before tv_sec=%d, tv_nsec=%ld", 
@@ -278,21 +272,21 @@
    // timeout.tv_sec, timeout.tv_nsec, ETIMEDOUT, errno);
        if(errno == ETIMEDOUT)
            return false;
            return EBUS_TIMEOUT;
        else if(errno == EINTR)
            continue;
        else {
           // LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
            return false;
           LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
           return errno;
        }
    }
    if (m_qImpl.push(a_data)){
        sem_post(&items);   
// LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n");    
        return true;
        return 0;
    }
    return false;
    return -1;
    
}
@@ -303,43 +297,43 @@
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data)
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data)
{
// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n");
    if (sem_wait(&items) == -1) {
        LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop");
        return false;
        return errno;
    }
    if (m_qImpl.pop(a_data)) {
        sem_post(&slots);
// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n");      
        return true;
        return 0;
    }
    return false;
    return -1;
}
template <
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data)
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data)
{
    if (sem_trywait(&items) == -1) {
        if (errno == EAGAIN)
            return false;
            return errno;
        else {
            LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_nowait");
            return false;
            return errno;
        }
    }
    if (m_qImpl.pop(a_data)) {
        sem_post(&slots);     
        return true;
        return 0;
    }
    return false;
    return -1;
}
 
@@ -347,29 +341,29 @@
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts)
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts)
{
// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout before\n");
     struct timespec timeout = PXSemUtil::calc_sem_timeout(ts);
    struct timespec timeout = PXSemUtil::calc_sem_timeout(ts);
    while (sem_timedwait(&items, &timeout) == -1) {
        if (errno == ETIMEDOUT)
            return false;
            return EBUS_TIMEOUT;
        else if(errno == EINTR)
            continue;
        else {
           // LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_timeout");
            return false;
          LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop_timeout");
          return -1;
        }
    }
    if (m_qImpl.pop(a_data)) {
        sem_post(&slots);  
// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout after\n");     
        return true;
        return 0;
    }
    return false;
    return -1;
    
}
src/shm/hashtable.cpp
@@ -2,6 +2,7 @@
#include "hashtable.h"
#include "mm.h"
#include "sem_util.h"
#include "logger_factory.h"
#include <set>
#include <functional>
@@ -27,7 +28,7 @@
void hashtable_init(hashtable_t *hashtable )
{
  memset(hashtable, 0, sizeof(hashtable_t));
  hashtable->mutex = SemUtil::get(IPC_PRIVATE, 1);
  hashtable->wlock = SemUtil::get(IPC_PRIVATE, 1);
@@ -102,8 +103,11 @@
  size_t code = hashcode(key);
  tailq_entry_t *item;
  void *oldvalue;
  int rv;
  SemUtil::dec(hashtable->wlock);
  if( (rv = SemUtil::dec(hashtable->wlock)) != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_remove\n");
  }
  tailq_header_t *my_tailq_head = hashtable->array[code] ;
  if ( my_tailq_head == NULL)
  {
@@ -127,7 +131,10 @@
      }
    }
  }
  SemUtil::inc(hashtable->wlock);
  if((rv = SemUtil::inc(hashtable->wlock)) != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_remove\n");
  }
  return NULL;
}
@@ -139,7 +146,11 @@
void hashtable_removeall(hashtable_t *hashtable)
{
  tailq_entry_t *item;
  SemUtil::dec(hashtable->wlock);
  int rv;
  rv = SemUtil::dec(hashtable->wlock);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_removeall\n");
  }
  for (int i = 0; i < MAPSIZE; i++)
  {
    tailq_header_t *my_tailq_head = hashtable->array[i] ;
@@ -155,7 +166,10 @@
    mm_free(my_tailq_head);
    hashtable->array[i] = NULL;
  }
  SemUtil::inc(hashtable->wlock);
  rv = SemUtil::inc(hashtable->wlock);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_removeall\n");
  }
}
/**
@@ -182,64 +196,110 @@
static size_t hashcode(int key)
{
  return key % MAPSIZE;
  /*printf("hashfun = %ld\n", code);*/
}
void *hashtable_get(hashtable_t *hashtable, int key) {
   SemUtil::dec(hashtable->mutex);
   hashtable->readcnt++;
   if (hashtable->readcnt == 1) {
  int rv;
  rv = SemUtil::dec(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_get\n");
  }
  hashtable->readcnt++;
  if (hashtable->readcnt == 1) {
    //获取读写锁
    SemUtil::dec(hashtable->wlock);
// err_msg(0, "hashtable_get dec %d %d\n", --hashtable->tmp);
   }
   SemUtil::inc(hashtable->mutex);
   // ================
    rv = SemUtil::dec(hashtable->wlock);
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_get\n");
    }
  }
  rv = SemUtil::inc(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_get\n");
  }
  // ================
   void * res = _hashtable_get(hashtable, key);
  void * res = _hashtable_get(hashtable, key);
   // ==================
  // ==================
   SemUtil::dec(hashtable->mutex);
   hashtable->readcnt--;
   if(hashtable->readcnt == 0) {
  rv = SemUtil::dec(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_get\n");
  }
  hashtable->readcnt--;
  if(hashtable->readcnt == 0) {
    //释放读写锁
    SemUtil::inc(hashtable->wlock);
// err_msg(0, "hashtable_get inc %d\n", ++hashtable->tmp);
  //通知写
    SemUtil::set(hashtable->cond, 1);
   }
   SemUtil::inc(hashtable->mutex);
   return res;
    rv = SemUtil::inc(hashtable->wlock);
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_get\n");
    }
    //通知写
    rv = SemUtil::set(hashtable->cond, 1);
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_get\n");
    }
  }
  rv = SemUtil::inc(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_get\n");
  }
  return res;
}
void hashtable_put(hashtable_t *hashtable, int key, void *value) {
  SemUtil::dec(hashtable->mutex);
  int rv;
  rv = SemUtil::dec(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
  }
  // 设置读优先级高
  while (hashtable->readcnt > 0)
  {
    SemUtil::set(hashtable->cond, 0);
    SemUtil::inc(hashtable->mutex);
    rv = SemUtil::set(hashtable->cond, 0);
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
    }
    rv = SemUtil::inc(hashtable->mutex);
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
    }
    //等待写通知
    SemUtil::dec(hashtable->cond);
    rv = SemUtil::dec(hashtable->cond);
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
    }
    SemUtil::dec(hashtable->mutex);
    rv = SemUtil::dec(hashtable->mutex);
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
    }
  }
  SemUtil::inc(hashtable->mutex);
  rv = SemUtil::inc(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
  }
  //获取读写锁
  SemUtil::dec(hashtable->wlock);
 // err_msg(0, "hashtable_put dec %d\n", --hashtable->tmp);
  rv = SemUtil::dec(hashtable->wlock);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
  }
  _hashtable_put(hashtable, key, value);
  //释放读写锁
  SemUtil::inc(hashtable->wlock);
// err_msg(0, "hashtable_put inc %d\n", ++hashtable->tmp);
  rv = SemUtil::inc(hashtable->wlock);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
  }
}
@@ -261,29 +321,51 @@
void hashtable_foreach(hashtable_t *hashtable,  std::function<void(int, void *)>  cb) {
   SemUtil::dec(hashtable->mutex);
   hashtable->readcnt++;
   if (hashtable->readcnt == 1) {
  int rv;
  rv = SemUtil::dec(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_foreach\n");
  }
  hashtable->readcnt++;
  if (hashtable->readcnt == 1) {
    //获取读写锁
    SemUtil::dec(hashtable->wlock);
   }
   SemUtil::inc(hashtable->mutex);
    rv = SemUtil::dec(hashtable->wlock);
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_foreach\n");
    }
  }
  rv = SemUtil::inc(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_foreach\n");
  }
   // ==================
  // ==================
    _hashtable_foreach(hashtable, cb);
  _hashtable_foreach(hashtable, cb);
   // ==================
  // ==================
   SemUtil::dec(hashtable->mutex);
   hashtable->readcnt--;
   if(hashtable->readcnt == 0) {
  rv = SemUtil::dec(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_foreach\n");
  }
  hashtable->readcnt--;
  if(hashtable->readcnt == 0) {
    //释放读写锁
    SemUtil::inc(hashtable->wlock);
  //通知写
    SemUtil::set(hashtable->cond, 1);
   }
   SemUtil::inc(hashtable->mutex);
    rv = SemUtil::inc(hashtable->wlock);
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_foreach\n");
    }
    //通知写
    rv = SemUtil::set(hashtable->cond, 1);
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_foreach\n");
    }
  }
  rv = SemUtil::inc(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_foreach\n");
  }
}
@@ -307,8 +389,12 @@
int hashtable_alloc_key(hashtable_t *hashtable) {
  int rv;
  int key = START_KEY;
  SemUtil::dec(hashtable->wlock);
  rv = SemUtil::dec(hashtable->wlock);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_alloc_key\n");
  }
  while(_hashtable_get(hashtable, key) != NULL) {
    key++;
@@ -316,7 +402,9 @@
  // 占用key
  _hashtable_put(hashtable, key, (void *)1);
  SemUtil::inc(hashtable->wlock);
// err_msg(0, "hashtable_alloc_key inc %d\n", ++hashtable->tmp);
  rv = SemUtil::inc(hashtable->wlock);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_alloc_key\n");
  }
  return key;
}
src/socket/net_mod_socket.cpp
@@ -137,24 +137,11 @@
}
NetConnPool* NetModSocket::_get_pool() {
  return gpool;
  return _get_threadlocal_pool();
}
int NetModSocket::_sendandrecv_(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int  msec ) {
  int s, rv;
  if ((s = pthread_mutex_lock(&sendMutex)) != 0)
    err_exit(s, "NetModSocket : pthread_mutex_lock");
  rv = _sendandrecv_unsafe(node_arr,  arrlen, send_buf,  send_size, recv_arr, recv_arr_size,   msec );
  if ((s = pthread_mutex_unlock(&sendMutex)) != 0)
    err_exit(s, "NetModSocket : pthread_mutex_lock");
  return rv;
}
int NetModSocket::_sendandrecv_unsafe(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int  msec ) {
  int i, n, recv_size, connfd;
src/socket/net_mod_socket.h
@@ -92,9 +92,6 @@
  int _sendandrecv_(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size, 
    net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int timeout);
  int _sendandrecv_unsafe(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size,
    net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, int timeout);
  int _pub_(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size, int timeout) ;
  
src/socket/net_mod_socket_wrapper.cpp
@@ -79,37 +79,6 @@
    return sockt->recvfrom_nowait(buf, size, port);
}
/**
 * 如果建立连接的节点没有接受到消息会一直等待
 * 向node_arr 中的所有网络节点发送请求消息,节点的返回信息汇总并存储在recv_arr中
 * @node_arr 网络节点组, @node_arr_len该数组长度
 * @send_buf 发送的消息,@send_size 该消息体的长度
 * @recv_arr 返回的应答消息组,@recv_arr_size 该数组长度
 * @return 成功发送的节点的个数
 * 优点:1某个节点的故障不会阻塞其他节点。2性能好
 * 缺点:不是线程安全的, 即不能有两个以上的线程同时使用这个对象的方法
 */
//*****************************************************************************
//
//! \brief Write one byte to special register
//!
//! This function is to write one byte to LIS302DL register,one byte will be
//! writen in appointed address.
//!
//! \param node_arr specifies the target register address.
//! \param send_buf is the data written to target register.
//!
//! \return Indicate the status of operation which can be one of the following
//! value \b SUCCESS or  \b FAILURE .
//!
//! \note This function is used by internal, user MUST NOT call it in your
//!  Application.
//
//*****************************************************************************
int net_mod_socket_sendandrecv(void *_socket, net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size){
    NetModSocket *sockt = (NetModSocket *)_socket;
src/socket/shm_socket.cpp
@@ -163,7 +163,7 @@
  shm_socket_t *client_socket;
  shm_msg_t src;
  if (socket->acceptQueue->pop(src)) {
  if (socket->acceptQueue->pop(src) == 0) {
    // print_msg("===accept:", src);
    client_key = src.key;
@@ -190,7 +190,7 @@
    msg.size = 0;
    msg.type = SHM_SOCKET_OPEN_REPLY;
    if (client_socket->remoteQueue->push_timeout(msg, &timeout)) {
    if (client_socket->remoteQueue->push_timeout(msg, &timeout) == 0) {
      client_socket->status = SHM_CONN_ESTABLISHED;
      return client_socket;
    } else {
@@ -246,7 +246,7 @@
  socket->remoteQueue->push_timeout(msg, &timeout);
  //接受open reply
  if (socket->queue->pop(msg)) {
  if (socket->queue->pop(msg) == 0) {
    // 在这里server端已经准备好接受客户端发送请求了,完成与服务端的连接
    if (msg.type == SHM_SOCKET_OPEN_REPLY) {
      socket->status = SHM_CONN_ESTABLISHED;
@@ -283,7 +283,7 @@
  dest.buf = mm_malloc(size);
  memcpy(dest.buf, buf, size);
  if (socket->remoteQueue->push(dest)) {
  if (socket->remoteQueue->push(dest) == 0) {
    return 0;
  } else {
    logger->error(errno, "connection has been closed!");
@@ -300,7 +300,7 @@
  }
  shm_msg_t src;
  if (socket->messageQueue->pop(src)) {
  if (socket->messageQueue->pop(src) == 0) {
    void *_buf = malloc(src.size);
    memcpy(_buf, src.buf, src.size);
    *buf = _buf;
@@ -369,7 +369,6 @@
  dest.buf = mm_malloc(size);
  memcpy(dest.buf, buf, size);
  // printf("shm_sendto push before\n");
 
  if(flags & SHM_MSG_NOWAIT != 0) {
    rv = remoteQueue->push_nowait(dest);
@@ -379,19 +378,19 @@
      rv = remoteQueue->push(dest);
  }
  if (rv) {
  if (rv == 0) {
    // printf("shm_sendto push after\n");
    delete remoteQueue;
    return 0;
  } else {
    delete remoteQueue;
    mm_free(dest.buf);
    if(errno == ETIMEDOUT) {
    if(rv == EBUS_TIMEOUT) {
      bus_errno = EBUS_TIMEOUT;
      logger->error(errno, "sendto key %d failed, %s", key, bus_strerror(EBUS_TIMEOUT));
      return EBUS_TIMEOUT;
    } else {
      logger->error(errno, "sendto key %d failed!", key);
      //logger->error(errno, "sendto key %d failed!", key);
      return -1;
    }
   
@@ -433,7 +432,6 @@
    err_exit(s, "shm_recvfrom : pthread_mutex_unlock");
  shm_msg_t src;
  // printf("shm_recvfrom pop before\n");
 
   if(flags & SHM_MSG_NOWAIT != 0) {
    rv = socket->queue->pop_nowait(src);
@@ -443,7 +441,7 @@
    rv = socket->queue->pop(src);
  }
  if (rv) {
  if (rv == 0) {
    if(buf != NULL) {
      void *_buf = malloc(src.size);
      memcpy(_buf, src.buf, src.size);
@@ -457,7 +455,6 @@
      *key = src.key;
    mm_free(src.buf);
    // printf("shm_recvfrom pop after\n");
    return 0;
  } else {
    return -1;
@@ -601,7 +598,7 @@
int shm_sendandrecv(shm_socket_t *socket, const void *send_buf,
                    const int send_size, const int send_key, void **recv_buf,
                    int *recv_size,  struct timespec *timeout,  int flags) {
  return  shm_sendandrecv_unsafe(socket, send_buf, send_size, send_key,recv_buf, recv_size, timeout,  flags);
  return  _shm_sendandrecv_thread_local(socket, send_buf, send_size, send_key,recv_buf, recv_size, timeout,  flags);
}
@@ -644,7 +641,7 @@
  shm_socket_t *client_socket;
  std::map<int, shm_socket_t *>::iterator iter;
  while (socket->queue->pop(src)) {
  while (socket->queue->pop(src) == 0) {
    switch (src.type) {
    case SHM_SOCKET_OPEN:
@@ -687,7 +684,7 @@
  struct timespec timeout = {1, 0};
  shm_msg_t src;
  while (socket->queue->pop(src)) {
  while (socket->queue->pop(src) == 0) {
    switch (src.type) {
    case SHM_SOCKET_CLOSE:
test_net_socket/heart_beat.cpp
@@ -2,7 +2,7 @@
#include "net_mod_socket_wrapper.h"
#include "bus_server_socket_wrapper.h"
#include "shm_mm_wraper.h"
#include "shm_mm_wrapper.h"
#include "usg_common.h"
#include <getopt.h>
test_net_socket/test_bus_stop.cpp
@@ -2,7 +2,7 @@
#include "net_mod_socket_wrapper.h"
#include "bus_server_socket_wrapper.h"
#include "shm_mm_wraper.h"
#include "shm_mm_wrapper.h"
#include "usg_common.h"
#include <getopt.h>
test_net_socket/test_net_mod_socket.cpp
@@ -2,7 +2,7 @@
#include "net_mod_socket_wrapper.h"
#include "bus_server_socket_wrapper.h"
#include "shm_mm_wraper.h"
#include "shm_mm_wrapper.h"
#include "usg_common.h"
#include <getopt.h>