tmp
wangzhengquan
2021-01-22 4c73fd7179e92bee9cccb65e46823b00f568acb3
tmp
15个文件已修改
3个文件已添加
1个文件已删除
623 ■■■■ 已修改文件
CMakeLists.txt 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
build.sh 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/CMakeLists.txt 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bus_def.h 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_server_socket.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket.cpp 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket_wrapper.cpp 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/array_lock_free_queue.h 53 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/array_lock_free_sem_queue.h 41 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/lock_free_queue.h 156 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/shm_queue.h 23 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.cpp 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.cpp 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.h 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/net_mod_socket.sh 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/test_net_mod_socket.cpp 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/CMakeLists.txt 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/bus_test.cpp 116 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/dgram_mod_bus.cpp 131 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
CMakeLists.txt
@@ -13,7 +13,10 @@
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}")
# set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}")
option(BUILD_SHARED_LIBS "Build using shared libraries" ON)
if (CMAKE_BUILD_TYPE EQUAL "Release")
#option(BUILD_SHARED_LIBS "Build using shared libraries" ON)
endif()
option(BUILD_DOC "Build doc" OFF)
@@ -29,4 +32,5 @@
    add_subdirectory(${PROJECT_SOURCE_DIR}/src)
    add_subdirectory(${PROJECT_SOURCE_DIR}/test)
    add_subdirectory(${PROJECT_SOURCE_DIR}/test_net_socket)
    add_subdirectory(${PROJECT_SOURCE_DIR}/test_socket)
endif()
build.sh
@@ -2,6 +2,7 @@
BUILD_TYPE="Debug"
BUILD_DOC="OFF"
BUILD_SHARED_LIBS="OFF"
function usage() {
    echo "build.sh [release | debug | doc]"
@@ -10,6 +11,7 @@
case ${1} in
  "release")
  BUILD_TYPE="Release"
  BUILD_SHARED_LIBS="ON"
  ;;
  
  "debug")
@@ -48,7 +50,7 @@
# -DBUILD_SHARED_LIBS=ON
# -DCMAKE_INSTALL_PREFIX=$(pwd/../dest)
# -DQCA_MAN_INSTALL_DIR:PATH=/usr/share/man 
cmake -DCMAKE_INSTALL_PREFIX="$(pwd)/../dest" -DCMAKE_BUILD_TYPE=${BUILD_TYPE} -DBUILD_SHARED_LIBS=ON \
cmake -DCMAKE_INSTALL_PREFIX="$(pwd)/../dest" -DCMAKE_BUILD_TYPE=${BUILD_TYPE} -DBUILD_SHARED_LIBS=${BUILD_SHARED_LIBS} \
  -DBUILD_DOC=${BUILD_DOC} -DSUPPORT_RDMA=OFF ..
cmake --build .
src/CMakeLists.txt
@@ -1,5 +1,3 @@
# should we use our own math functions
option(SUPPORT_RDMA "If support rdma" OFF)
@@ -16,9 +14,9 @@
./socket/shm_mod_socket.cpp
./time_util.cpp
./psem.cpp
./svsem.cpp
./bus_error.cpp
./futex_sem.cpp
./svsem.cpp
./net/net_conn_pool.cpp
./net/net_mod_server_socket_wrapper.cpp
./net/net_mod_socket_wrapper.cpp
@@ -28,7 +26,6 @@
./shm/shm_mm_wrapper.cpp
./shm/mm.cpp
./shm/hashtable.cpp
    )
@@ -62,13 +59,14 @@
./time_util.h
./futex_sem.h
./bus_error.h
./svsem.h
./bus_def.h
./logger_factory.h
./queue/linked_lock_free_queue.h
./queue/array_lock_free_queue2.h
./queue/array_lock_free_queue.h
./queue/shm_queue.h
./queue/array_lock_free_sem_queue.h
./queue/lock_free_queue.h
./svsem.h
./net/net_conn_pool.h
./net/net_mod_socket.h
./net/net_mod_server_socket_wrapper.h
@@ -82,6 +80,7 @@
./shm/shm_allocator.h
  DESTINATION include)
install(FILES "${PROJECT_BINARY_DIR}/src/bus_config.h"
src/bus_def.h
New file
@@ -0,0 +1,7 @@
#ifndef _BUS_DEF_H_
#define _BUS_DEF_H_
#define BUS_TIMEOUT_FLAG  1
#define BUS_NOWAIT_FLAG  1 << 1
#endif
src/net/net_mod_server_socket.cpp
@@ -54,7 +54,7 @@
  sprintf(portstr, "%d", port);
  listenfd = open_listenfd(portstr);
  if(listenfd < 0) {
    LoggerFactory::getLogger()->error(errno, "NetModServerSocket::start . errno=%d ", errno);
    LoggerFactory::getLogger()->error(errno, "NetModServerSocket::start. port = %d ", port);
    return -1;
  }
  init_pool(listenfd);
src/net/net_mod_socket.cpp
@@ -513,10 +513,12 @@
 * @return 0 成功, 其他值 失败的错误码
*/
int NetModSocket::recvfrom(void **buf, int *size, int *key) {
  logger->debug(" %d NetModSocket::recvfrom before", get_key());
  int rv = shmModSocket.recvfrom(buf, size, key);
  if(rv == 0) {
    logger->debug("NetModSocket::recvfrom:  %d recvfrom %d success.\n", get_key(), *key);
    logger->debug("NetModSocket::recvfrom: <<<< %d recvfrom %d success.\n", get_key(), *key);
    return 0;
  }
@@ -533,7 +535,7 @@
  struct timespec timeout = {sec, nsec};
  int rv = shmModSocket.recvfrom_timeout(buf, size, key, &timeout);
  if(rv == 0) {
    logger->debug("NetModSocket::recvfrom:  %d recvfrom %d success.\n", get_key(), *key);
    logger->debug("NetModSocket::recvfrom_timeout:  %d recvfrom %d success.\n", get_key(), *key);
    return 0;
  }
@@ -549,7 +551,7 @@
int NetModSocket::recvfrom_nowait(void **buf, int *size, int *key){
  int rv = shmModSocket.recvfrom_nowait(buf, size, key);
  if(rv == 0) {
    logger->debug("NetModSocket::recvfrom:  %d recvfrom %d success.\n", get_key(), *key);
    logger->debug("NetModSocket::recvfrom_nowait:  %d recvfrom %d success.\n", get_key(), *key);
    return 0;
  }
src/net/net_mod_socket_wrapper.cpp
@@ -67,8 +67,13 @@
 * @return 0 成功, 其他值 失败的错误码
*/
int net_mod_socket_recvfrom(void *_socket, void **buf, int *size, int *key){
    int rv;
    NetModSocket *sockt = (NetModSocket *)_socket;
    return sockt->recvfrom(buf, size, key);
    logger->debug(" %d net_mod_socket_recvfrom before", net_mod_socket_get_key(_socket));
    rv = sockt->recvfrom(buf, size, key);
    logger->debug(" %d net_mod_socket_recvfrom after. rv = %d", net_mod_socket_get_key(_socket), rv);
    return rv;
}
// 接受信息超时返回。 @sec 秒 , @nsec 纳秒
int net_mod_socket_recvfrom_timeout(void *_socket, void **buf, int *size, int *key, int sec, int nsec){
src/queue/array_lock_free_queue.h
@@ -1,5 +1,6 @@
#ifndef __ARRAY_LOCK_FREE_QUEUE_H__
#define __ARRAY_LOCK_FREE_QUEUE_H__
#include "atomic_ops.h"
#include <assert.h> // assert()
#include <sched.h>  // sched_yield()
@@ -18,15 +19,15 @@
#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
template <typename ELEM_T, typename Allocator = SHM_Allocator>
class ArrayLockFreeQueue
{
class ArrayLockFreeQueue {
    // ArrayLockFreeQueue will be using this' private members
    template <
        typename ELEM_T_, 
        typename Allocator_,
        template <typename T, typename AT> class Q_TYPE
        >
    friend class LockFreeQueue;
  friend
  class LockFreeQueue;
private:
    /// @brief constructor of the class
@@ -99,8 +100,7 @@
}
template <typename ELEM_T, typename Allocator>
ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue()
{
ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue() {
    // std::cout << "destroy ArrayLockFreeQueue\n";
    Allocator::deallocate(m_theQueue);
    
@@ -108,8 +108,7 @@
template <typename ELEM_T, typename Allocator>
inline 
uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count)
{
uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count) {
    // if Q_SIZE is a power of 2 this statement could be also written as 
    // return (a_count & (Q_SIZE - 1));
    return (a_count % Q_SIZE);
@@ -117,8 +116,7 @@
template <typename ELEM_T, typename Allocator>
inline 
uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size()
{
uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size() {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return m_count;
@@ -152,8 +150,7 @@
template <typename ELEM_T, typename Allocator>
inline 
bool ArrayLockFreeQueue<ELEM_T, Allocator>::full()
{
bool ArrayLockFreeQueue<ELEM_T, Allocator>::full() {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return (m_count == (Q_SIZE));
@@ -177,8 +174,7 @@
template <typename ELEM_T, typename Allocator>
inline 
bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty()
{
bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty() {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return (m_count == 0);
@@ -198,18 +194,12 @@
}
template <typename ELEM_T, typename Allocator>
bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data)
{
bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data) {
    uint32_t currentReadIndex;
    uint32_t currentWriteIndex;
    do
    {
  do {
        currentWriteIndex = m_writeIndex;
        currentReadIndex  = m_readIndex;
@@ -235,8 +225,7 @@
    // inserting in the queue. It might fail if there are more than 1 producer threads because this
    // operation has to be done in the same order as the previous CAS
    while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
    {
  while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) {
        // this is a good place to yield the thread in case there are more
        // software threads than hardware processors and you have more
        // than 1 producer thread
@@ -252,13 +241,11 @@
template <typename ELEM_T, typename Allocator>
bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data)
{
bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data) {
    uint32_t currentMaximumReadIndex;
    uint32_t currentReadIndex;
    do
    {
  do {
        // to ensure thread-safety when there is more than 1 producer thread
        // a second index is defined (m_maximumReadIndex)
        currentReadIndex        = m_readIndex;
@@ -284,8 +271,7 @@
        // try to perfrom now the CAS operation on the read index. If we succeed
        // a_data already contains what m_readIndex pointed to before we
        // increased it
        if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
        {
    if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) {
        #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
            // m_count.fetch_sub(1);
            AtomicSub(&m_count, 1);
@@ -307,13 +293,12 @@
}
template <typename ELEM_T, typename Allocator>
ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i)
{
ELEM_T &ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i) {
    int currentCount = m_count;
    uint32_t currentReadIndex = m_readIndex;
    if (i >= currentCount)
    {
        std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
  if (i >= currentCount) {
    std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i
              << " is out of range\n";
        std::exit(EXIT_FAILURE);
    }
    return m_theQueue[countToIndex(currentReadIndex+i)];
src/queue/array_lock_free_sem_queue.h
@@ -8,7 +8,7 @@
#include "shm_allocator.h"
#include "futex_sem.h"
#include "time_util.h"
#include "bus_def.h"
/// @brief implementation of an array based lock free queue with support for
///        multiple producers
@@ -17,8 +17,7 @@
/// you must use the ArrayLockFreeSemQueue fachade:
///   ArrayLockFreeSemQueue<int, 100, ArrayLockFreeSemQueue> q;
#define LOCK_FREE_QUEUE_TIMEOUT  1
#define LOCK_FREE_QUEUE_NOWAIT  1 << 1
#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
@@ -207,15 +206,17 @@
  uint32_t currentWriteIndex;
  int s;
  // sigset_t mask_all, pre;
  // sigfillset(&mask_all);
  do
  {
    currentWriteIndex = m_writeIndex;
    currentReadIndex  = m_readIndex;
  #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    if (m_count == Q_SIZE) {
      if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
      if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG)
        return -1;
      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
      else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
        const struct timespec ts = TimeUtil::trim_time(timeout);
        s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0);
        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
@@ -235,9 +236,9 @@
    if (currentReadIndex == currentWriteIndex - Q_SIZE  + 1   )
    {
        // the queue is full
      if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
      if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG)
        return -1;
      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
      else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
        const struct timespec ts = TimeUtil::trim_time(timeout);
        s = futex((int *)&m_readIndex, FUTEX_WAIT, currentWriteIndex - Q_SIZE + 1, &ts, NULL, 0);
        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
@@ -260,10 +261,11 @@
  // We know now that this index is reserved for us. Use it to save the data
  m_theQueue[countToIndex(currentWriteIndex)] = a_data;
  // sigprocmask(SIG_BLOCK, &mask_all, &pre);
  // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
  // inserting in the queue. It might fail if there are more than 1 producer threads because this
  // operation has to be done in the same order as the previous CAS
  while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
  {
    // this is a good place to yield the thread in case there are more
@@ -283,6 +285,7 @@
      err_exit(errno, "futex-FUTEX_WAKE");  
#endif
  // sigprocmask(SIG_SETMASK, &pre, NULL);
  return 0;
}
@@ -293,6 +296,11 @@
  uint32_t currentMaximumReadIndex;
  uint32_t currentReadIndex;
  int s;
  // sigset_t mask_all, pre;
  // sigfillset(&mask_all);
  // sigprocmask(SIG_BLOCK, &mask_all, &pre);
  do
  {
@@ -305,19 +313,23 @@
    if (m_count == 0) {
      if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
      if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
        // sigprocmask(SIG_SETMASK, &pre, NULL);
        return -1;
      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
      }
      else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
        const struct timespec ts = TimeUtil::trim_time(timeout);
        s = futex((int *)&m_count, FUTEX_WAIT, 0, &ts, NULL, 0);
        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
          // sigprocmask(SIG_SETMASK, &pre, NULL);
          return -1;
        }
            
      } else {
        s = futex((int *)&m_count, FUTEX_WAIT, 0, NULL, NULL, 0);
        if (s == -1 && errno != EAGAIN && errno != EINTR) {
          // sigprocmask(SIG_SETMASK, &pre, NULL);
          return -1;
        }
      }
@@ -330,19 +342,23 @@
      // the queue is empty or
      // a producer thread has allocate space in the queue but is
      // waiting to commit the data into it
      if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
      if( (flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
        // sigprocmask(SIG_SETMASK, &pre, NULL);
        return -1;
      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
      }
      else if( (flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
        const struct timespec ts = TimeUtil::trim_time(timeout);
        s = futex((int *)&currentMaximumReadIndex, FUTEX_WAIT, currentReadIndex, &ts, NULL, 0);
        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
          // sigprocmask(SIG_SETMASK, &pre, NULL);
          return -1;
        }
            
      } else {
        s = futex((int *)&currentMaximumReadIndex, FUTEX_WAIT, currentReadIndex, NULL, NULL, 0);
        if (s == -1 && errno != EAGAIN && errno != EINTR) {
         // sigprocmask(SIG_SETMASK, &pre, NULL);
          return -1;
        }
      }
@@ -367,6 +383,7 @@
        err_exit(errno, "futex-FUTEX_WAKE");
    #endif
     
      // sigprocmask(SIG_SETMASK, &pre, NULL);
      return 0;
    }
src/queue/lock_free_queue.h
@@ -12,7 +12,7 @@
#include "shm_allocator.h"
#include "psem.h"
#include "bus_error.h"
#include "bus_def.h"
// default Queue size
#define LOCK_FREE_Q_DEFAULT_SIZE 16
@@ -76,23 +76,23 @@
    typename Allocator = SHM_Allocator,
    template <typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue
     >
class LockFreeQueue
{
class LockFreeQueue {
private:
    sem_t slots;  
    sem_t items;
public:
    sem_t mutex;
    LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
    
    /// @brief destructor of the class. 
    /// Note it is not virtual since it is not expected to inherit from this
    /// template
    ~LockFreeQueue();
    std::atomic_uint reference;    
    /// @brief constructor of the class
   
@@ -123,20 +123,17 @@
    /// structures to be inserted in the queue you should think of instantiate the template
    /// of the queue as a pointer to that large structure
    /// @return true if the element was inserted in the queue. False if the queue was full
    int push(const ELEM_T &a_data);
    int push_nowait(const ELEM_T &a_data);
    int push_timeout(const ELEM_T &a_data, const struct timespec * timeout);
  int push(const ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0);
    /// @brief pop the element at the head of the queue
    /// @param a reference where the element in the head of the queue will be saved to
    /// Note that the a_data parameter might contain rubbish if the function returns false
    /// @return true if the element was successfully extracted from the queue. False if the queue was empty
    int pop(ELEM_T &a_data);
    int pop_nowait(ELEM_T &a_data);
    int pop_timeout(ELEM_T &a_data, struct timespec * timeout);
  int pop(ELEM_T &a_data, const struct timespec *timeout = NULL, int flag = 0);
    void *operator new(size_t size);
    void operator delete(void *p);
protected:
@@ -154,8 +151,7 @@
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize)
{
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize) {
// std::cout << "LockFreeQueue init reference=" << reference << std::endl;
    if (sem_init(&slots, 1, qsize) == -1)
        err_exit(errno, "LockFreeQueue sem_init");
@@ -167,12 +163,12 @@
   
}
template <
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue()
{
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() {
    // LoggerFactory::getLogger()->debug("LockFreeQueue desctroy");
    if(sem_destroy(&slots) == -1) {
        err_exit(errno, "LockFreeQueue sem_destroy");
@@ -189,8 +185,7 @@
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size()
{
inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size() {
    return m_qImpl.size();
}  
@@ -198,8 +193,7 @@
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full()
{
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full() {
    return m_qImpl.full();
}
@@ -207,22 +201,30 @@
    typename ELEM_T, 
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty()
{
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty() {
    return m_qImpl.empty();
}  
template <
    typename ELEM_T,
template<typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data)
{
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) {
 LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");   
    if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
        if (psem_trywait(&slots) == -1) {
            return -1;
        }
    } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
        if (psem_timedwait(&slots, timeout) == -1) {
            return -1;
        }
    } else {
    if (psem_wait(&slots) == -1) {
        return -1;
    }
    }
    
    if ( m_qImpl.push(a_data) ) {
        psem_post(&items);   
@@ -233,58 +235,28 @@
    
}
template <
    typename ELEM_T,
template<typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data)
{
    if (psem_trywait(&slots) == -1) {
        return -1;
    }
    if ( m_qImpl.push(a_data)) {
        psem_post(&items);
        return 0;
    }
    return -1;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts)
{
LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before\n");
    if ( psem_timedwait(&slots, ts) == -1) {
        return -1;
    }
    if (m_qImpl.push(a_data)){
        psem_post(&items);
LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n");
        return 0;
    }
    return -1;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data)
{
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag) {
  LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n");
    if ((flag & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
        if (psem_trywait(&items) == -1) {
            return -1;
        }
    } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
        if (psem_timedwait(&items, timeout) == -1) {
            return -1;
        }
    } else {
    if (psem_wait(&items) == -1) {
        return -1;
    }
    }
    if (m_qImpl.pop(a_data)) {
        psem_post(&slots);
@@ -294,45 +266,7 @@
    return -1;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data)
{
    if (psem_trywait(&items) == -1) {
        return -1;
    }
    if (m_qImpl.pop(a_data)) {
        psem_post(&slots);
        return 0;
    }
    return -1;
}
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts)
{
    if (psem_timedwait(&items, ts) == -1) {
       return -1;
    }
    if (m_qImpl.pop(a_data)) {
        psem_post(&slots);
// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout after\n");
        return 0;
    }
    return -1;
}
template <
    typename ELEM_T,
template<typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
ELEM_T& LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) {
@@ -340,16 +274,14 @@
}
template <
    typename ELEM_T,
template<typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
void * LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size){
        return Allocator::allocate(size);
}
template <
    typename ELEM_T,
template<typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator delete(void *p) {
src/queue/shm_queue.h
@@ -12,6 +12,7 @@
#include "shm_allocator.h"
#include "usg_common.h"
#include "array_lock_free_sem_queue.h"
#include "lock_free_queue.h"
#include "bus_error.h"
template <typename ELEM_T> class SHMQueue {
@@ -51,7 +52,7 @@
  /// @brief the actual queue-> methods are forwarded into the real
  ///        implementation
  ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *queue;
  LockFreeQueue<ELEM_T, SHM_Allocator> *queue;
private:
  /// @brief disable copy constructor declaring it private
@@ -64,7 +65,7 @@
  hashtable_t *hashtable = mm_get_hashtable();
  std::set<int> *keyset = hashtable_keyset(hashtable);
  std::set<int>::iterator keyItr;
  ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *mqueue;
  LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue;
  bool found;
  size_t count = 0;
  for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
@@ -77,7 +78,7 @@
    }
    if (!found) {
      // 销毁共享内存的queue
      mqueue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
      mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
      delete mqueue;
      hashtable_remove(hashtable, *keyItr);
      count++;
@@ -91,11 +92,11 @@
template <typename ELEM_T>
size_t SHMQueue<ELEM_T>::remove_queues(int keys[], size_t length) {
  hashtable_t *hashtable = mm_get_hashtable();
  ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *mqueue;
  LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue;
  size_t count = 0;
  for(int i = 0; i< length; i++) {
    // 销毁共享内存的queue
    mqueue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)mm_get_by_key(keys[i]);
    mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)mm_get_by_key(keys[i]);
    delete mqueue;
    hashtable_remove(hashtable, keys[i]);
    count++;
@@ -113,9 +114,9 @@
SHMQueue<ELEM_T>::SHMQueue(int key, size_t qsize) : KEY(key) {
  hashtable_t *hashtable = mm_get_hashtable();
  queue = (ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key);
  queue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key);
  if (queue == NULL || (void *)queue == (void *)1) {
    queue = new ArrayLockFreeSemQueue<ELEM_T, SHM_Allocator>(qsize);
    queue = new LockFreeQueue<ELEM_T, SHM_Allocator>(qsize);
    hashtable_put(hashtable, key, (void *)queue);
  }
  // queue->reference++;
@@ -155,7 +156,7 @@
template <typename ELEM_T>
inline int SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) {
  int rv =  queue->push(a_data, NULL, LOCK_FREE_QUEUE_NOWAIT);
  int rv =  queue->push(a_data, NULL, BUS_NOWAIT_FLAG);
  if(rv == -1) {
    if (errno == EAGAIN)
      return EAGAIN;
@@ -170,7 +171,7 @@
template <typename ELEM_T>
inline int SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, const struct timespec *timeout) {
  int rv = queue->push(a_data, timeout, LOCK_FREE_QUEUE_TIMEOUT);
  int rv = queue->push(a_data, timeout, BUS_TIMEOUT_FLAG);
  if(rv == -1) {
    if(errno == ETIMEDOUT)
        return EBUS_TIMEOUT;
@@ -195,7 +196,7 @@
template <typename ELEM_T>
inline int SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) {
  int rv = queue->pop(a_data, NULL, LOCK_FREE_QUEUE_NOWAIT);
  int rv = queue->pop(a_data, NULL, BUS_NOWAIT_FLAG);
  if(rv == -1) {
    if (errno == EAGAIN)
@@ -213,7 +214,7 @@
inline int SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, struct timespec *timeout) {
  int rv;
  rv = queue->pop(a_data, timeout, LOCK_FREE_QUEUE_TIMEOUT);
  rv = queue->pop(a_data, timeout, BUS_TIMEOUT_FLAG);
  if(rv == -1) {
    if (errno == ETIMEDOUT) {
      return EBUS_TIMEOUT;
src/socket/shm_mod_socket.cpp
@@ -51,7 +51,7 @@
}
// 发送信息立刻返回。
int ShmModSocket::sendto_nowait( const void *buf, const int size, const int key){
    return shm_sendto(shm_socket, buf, size, key, NULL, (int)SHM_MSG_NOWAIT);
    return shm_sendto(shm_socket, buf, size, key, NULL, (int)BUS_NOWAIT_FLAG);
}
 
@@ -74,7 +74,7 @@
}
int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *key){
    int rv =  shm_recvfrom(shm_socket, buf, size, key, NULL, (int)SHM_MSG_NOWAIT);
    int rv =  shm_recvfrom(shm_socket, buf, size, key, NULL, (int)BUS_NOWAIT_FLAG);
    // logger->error(rv, "ShmModSocket::recvfrom_nowait failed!");
  return rv;
}
@@ -92,7 +92,7 @@
    return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0);
}
int ShmModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
    return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT);
    return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)BUS_NOWAIT_FLAG);
}
int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
@@ -103,7 +103,7 @@
    return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0);
}
int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
    return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT);
    return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)BUS_NOWAIT_FLAG);
}
@@ -123,7 +123,7 @@
    return _sub_(topic, size, key, timeout, 0);
}
int  ShmModSocket::sub_nowait(char *topic, int size, int key) {
    return _sub_(topic, size, key, NULL,  (int)SHM_MSG_NOWAIT);
    return _sub_(topic, size, key, NULL,  (int)BUS_NOWAIT_FLAG);
}
@@ -142,7 +142,7 @@
    return _desub_(topic, size, key, timeout, 0);
}
int  ShmModSocket::desub_nowait(char *topic, int size, int key) {
    return _desub_(topic, size, key, NULL,  (int)SHM_MSG_NOWAIT);
    return _desub_(topic, size, key, NULL,  (int)BUS_NOWAIT_FLAG);
}
@@ -161,7 +161,7 @@
    return _pub_( topic, topic_size, content, content_size, key, timeout, 0);
}
int  ShmModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int key){
    return _pub_(topic, topic_size, content, content_size, key, NULL, (int)SHM_MSG_NOWAIT);
    return _pub_(topic, topic_size, content, content_size, key, NULL, (int)BUS_NOWAIT_FLAG);
}
src/socket/shm_socket.cpp
@@ -373,9 +373,9 @@
  memcpy(dest.buf, buf, size);
 
  if(flags & SHM_MSG_NOWAIT != 0) {
  if( (flags & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
    rv = remoteQueue->push_nowait(dest);
  } else if(timeout != NULL) {
  } else if((flags & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
      rv = remoteQueue->push_timeout(dest, timeout);
  } else {
      rv = remoteQueue->push(dest);
@@ -393,8 +393,6 @@
      logger->error(rv, "sendto key %d failed", key);
    }
    return rv;
  }
}
@@ -433,9 +431,9 @@
  shm_msg_t src;
 
   if(flags & SHM_MSG_NOWAIT != 0) {
   if((flags & BUS_NOWAIT_FLAG) == BUS_NOWAIT_FLAG) {
    rv = socket->queue->pop_nowait(src);
  } else if(timeout != NULL) {
  } else if((flags & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) {
    rv = socket->queue->pop_timeout(src, timeout);
// printf("0 shm_recvfrom====%d\n", rv);
  } else {
@@ -649,7 +647,7 @@
    switch (src.type) {
    case SHM_SOCKET_OPEN:
      socket->acceptQueue->push_timeout(src, &timeout);
      socket->acceptQueue->push(src, &timeout, BUS_TIMEOUT_FLAG);
      break;
    case SHM_SOCKET_CLOSE:
      _server_close_conn_to_client(socket, src.key);
@@ -660,7 +658,7 @@
      if (iter != socket->clientSocketMap->end()) {
        client_socket = iter->second;
        // print_msg("_server_run_msg_rev push before", src);
        client_socket->messageQueue->push_timeout(src, &timeout);
        client_socket->messageQueue->push(src, &timeout, BUS_TIMEOUT_FLAG);
        // print_msg("_server_run_msg_rev push after", src);
      }
@@ -695,7 +693,7 @@
      _client_close_conn_to_server(socket);
      break;
    case SHM_COMMON_MSG:
      socket->messageQueue->push_timeout(src, &timeout);
      socket->messageQueue->push(src, &timeout, BUS_TIMEOUT_FLAG);
      break;
    default:
       logger->error( "shm_socket._client_run_msg_rev: undefined message type.");
src/socket/shm_socket.h
@@ -6,11 +6,7 @@
#include "shm_queue.h"
#include "lock_free_queue.h"
enum shm_socket_flag_t
{
  SHM_MSG_TIMEOUT = 1,
  SHM_MSG_NOWAIT = 2
};
enum shm_connection_status_t {
    SHM_CONN_CLOSED=1,
@@ -88,7 +84,7 @@
int shm_recv(shm_socket_t * socket, void **buf, int *size) ;
/**
 * @flags : SHM_MSG_NOWAIT
 * @flags : BUS_NOWAIT_FLAG
 */
int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int key, const struct timespec * timeout = NULL, const int flags=0);
test_net_socket/net_mod_socket.sh
@@ -36,7 +36,7 @@
}
function close() {
    ps -ef | grep -e "test_net_mod_socket" -e "net_mod_socket"| awk  '{print $2}' | xargs -i kill -9 {}
    ps -ef | grep -e "test_net_mod_socket" -e "heart_beat"| awk  '{print $2}' | xargs -i kill -9 {}
    ipcrm -a
}
@@ -48,8 +48,6 @@
case ${1} in
  "server")
    close
    sleep 2
  server
  ;;
  "client")
test_net_socket/test_net_mod_socket.cpp
@@ -76,11 +76,14 @@
  void *recvbuf;
  int size;
  int key;
  while (net_mod_socket_recvfrom( sockt, &recvbuf, &size, &key) == 0) {
  int rv;
  while ((rv = net_mod_socket_recvfrom( sockt, &recvbuf, &size, &key) ) == 0) {
    printf("收到订阅消息:%s\n", recvbuf);
    free(recvbuf);
  }
  
  printf("print_sub_msg return . rv = %d\n", rv);
}
test_socket/CMakeLists.txt
New file
@@ -0,0 +1,11 @@
# add the executable
add_executable(bus_test bus_test.cpp)
target_link_libraries(bus_test PRIVATE shm_queue  ${EXTRA_LIBS} )
target_include_directories(bus_test PRIVATE
                            "${PROJECT_BINARY_DIR}"
                             ${EXTRA_INCLUDES}
                            )
test_socket/bus_test.cpp
New file
@@ -0,0 +1,116 @@
#include "bus_server_socket.h"
#include "shm_mod_socket.h"
#include "shm_mm_wrapper.h"
#include "usg_common.h"
#include "mm.h"
BusServerSocket * server_socket;
void sigint_handler(int sig) {
   exit(0);
}
void server(int key) {
  server_socket = new BusServerSocket();
  server_socket->bind( key);
  server_socket->start();
}
void *run_recv(void *skptr) {
  pthread_detach(pthread_self());
  void *recvbuf;
  int size;
  int key;
  ShmModSocket *sk = (ShmModSocket *)skptr;
  while (sk->recvfrom( &recvbuf, &size, &key) == 0) {
    printf("收到订阅消息:%s\n", recvbuf);
    free(recvbuf);
  }
}
void client(int key) {
  ShmModSocket *sk = new ShmModSocket();
  pthread_t tid;
  pthread_create(&tid, NULL, run_recv, (void *)socket);
  int size;
  char action[512];
  char topic[512];
  char content[512];
  long i = 0;
  while (true) {
    //printf("Usage: pub <topic> [content] or sub <topic>\n");
    printf("Can I help you? sub, pub, desub or quit\n");
    scanf("%s",action);
    if(strcmp(action, "sub") == 0) {
      printf("Please input topic!\n");
      scanf("%s", topic);
      if (sk->sub(topic, strlen(topic),  key) == 0) {
         printf("%d Sub success!\n", sk->get_key());
      } else {
        printf("Sub failture!\n");
        exit(0);
      }
    } else if(strcmp(action, "desub") == 0) {
      printf("Please input topic!\n");
      scanf("%s", topic);
      if (sk->desub(topic, strlen(topic),  key) == 0) {
         printf("%d Desub success!\n", sk->get_key());
      } else {
        printf("Desub failture!\n");
        exit(0);
      }
    } else if(strcmp(action, "pub") == 0) {
      // printf("%s %s %s\n", action, topic, content);
      printf("Please input topic and content\n");
      scanf("%s %s", topic, content);
      if(sk->pub(topic, strlen(topic)+1, content, strlen(content)+1,  key) == 0){
        printf("%d Pub success!\n", sk->get_key());
      } else {
        printf("Pub failture!\n");
      }
    } else if(strcmp(action, "quit") == 0) {
      printf("(%d) quit\n", sk->get_key());
      delete sk;
      break;
    } else {
      printf("error input argument\n");
      continue;
    }
  }
}
int main(int argc, char *argv[]) {
  shm_mm_wrapper_init(512);
  int key;
  if (argc < 3) {
    fprintf(stderr, "Usage: %s %s|%s  <key> ...\n", argv[0], "server", "client");
    return 1;
  }
  key = atoi(argv[2]);
  if (strcmp("server", argv[1]) == 0) {
   server(key);
  } else if (strcmp("client", argv[1]) == 0) {
    client(key);
  }
  return 0;
}
test_socket/dgram_mod_bus.cpp
File was deleted