wangzhengquan
2021-01-21 4fd62552d8277f3d0ed20e66663cd219c36796df
update
2个文件已删除
6个文件已添加
12 文件已重命名
7个文件已修改
1049 ■■■■ 已修改文件
src/CMakeLists.txt 91 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_conn_pool.cpp 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_conn_pool.h 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_server_socket.cpp 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_server_socket.h 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_server_socket_wrapper.cpp 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_server_socket_wrapper.h 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket.cpp 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket.h 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket_io.cpp 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket_io.h 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket_wrapper.cpp 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket_wrapper.h 补丁 | 查看 | 原始文档 | blame | 历史
src/psem.cpp 57 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/psem.h 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/px_sem_util.cpp 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/px_sem_util.h 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/array_lock_free_queue2.h 176 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/lock_free_queue.h 55 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/hashtable.cpp 19 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/svsem_util.cpp 253 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/svsem_util.h 47 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/CMakeLists.txt 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/svsem_test.cpp 242 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/CMakeLists.txt 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/heart_beat.sh 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/svsem_mon.cpp 42 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/CMakeLists.txt
@@ -8,24 +8,24 @@
configure_file(bus_config.h.in bus_config.h)
add_library(shm_queue 
        logger_factory.cpp
        socket/bus_server_socket.cpp
        socket/bus_server_socket_wrapper.cpp
        socket/shm_stream_mod_socket.cpp
        socket/shm_socket.cpp
        socket/net_conn_pool.cpp
        socket/shm_mod_socket.cpp
        socket/net_mod_server_socket_wrapper.cpp
        socket/net_mod_socket_wrapper.cpp
        socket/net_mod_socket.cpp
        socket/net_mod_socket_io.cpp
        socket/net_mod_server_socket.cpp
        bus_error.cpp
        shm/shm_mm_wrapper.cpp
        shm/mm.cpp
        shm/hashtable.cpp
        px_sem_util.cpp
    svsem_util.cpp
  ./logger_factory.cpp
  ./socket/bus_server_socket.cpp
  ./socket/bus_server_socket_wrapper.cpp
  ./socket/shm_stream_mod_socket.cpp
  ./socket/shm_socket.cpp
  ./socket/shm_mod_socket.cpp
  ./psem.cpp
  ./svsem_util.cpp
  ./bus_error.cpp
  ./net/net_conn_pool.cpp
  ./net/net_mod_server_socket_wrapper.cpp
  ./net/net_mod_socket_wrapper.cpp
  ./net/net_mod_socket.cpp
  ./net/net_mod_socket_io.cpp
  ./net/net_mod_server_socket.cpp
  ./shm/shm_mm_wrapper.cpp
  ./shm/mm.cpp
  ./shm/hashtable.cpp
    )
@@ -39,6 +39,7 @@
                           ${CMAKE_CURRENT_SOURCE_DIR}/shm
                           ${CMAKE_CURRENT_SOURCE_DIR}/queue
                           ${CMAKE_CURRENT_SOURCE_DIR}/socket
                           ${CMAKE_CURRENT_SOURCE_DIR}/net
                           )
 
target_link_libraries(shm_queue PUBLIC  ${EXTRA_LIBS} )
@@ -46,32 +47,34 @@
# install rules
install(TARGETS shm_queue DESTINATION lib)
install(FILES 
  socket/socket_def.h
  socket/net_conn_pool.h
  socket/bus_server_socket.h
  socket/shm_socket.h
  socket/net_mod_socket.h
  socket/shm_stream_mod_socket.h
  socket/net_mod_server_socket_wrapper.h
  socket/net_mod_socket_io.h
  socket/net_mod_server_socket.h
  socket/shm_mod_socket.h
  socket/net_mod_socket_wrapper.h
  socket/bus_server_socket_wrapper.h
  key_def.h
  bus_error.h
  px_sem_util.h
  logger_factory.h
  queue/linked_lock_free_queue.h
  queue/array_lock_free_queue2.h
  queue/array_lock_free_queue.h
  queue/shm_queue.h
  queue/lock_free_queue.h
  shm/hashtable.h
  shm/mem_pool.h
  shm/mm.h
  shm/shm_mm_wrapper.h
  shm/shm_allocator.h
 ./socket/socket_def.h
./socket/bus_server_socket.h
./socket/shm_socket.h
./socket/shm_stream_mod_socket.h
./socket/shm_mod_socket.h
./socket/bus_server_socket_wrapper.h
./psem.h
./key_def.h
./bus_error.h
./svsem_util.h
./logger_factory.h
./queue/linked_lock_free_queue.h
./queue/array_lock_free_queue2.h
./queue/array_lock_free_queue.h
./queue/shm_queue.h
./queue/lock_free_queue.h
./net/net_conn_pool.h
./net/net_mod_socket.h
./net/net_mod_server_socket_wrapper.h
./net/net_mod_socket_io.h
./net/net_mod_server_socket.h
./net/net_mod_socket_wrapper.h
./shm/hashtable.h
./shm/mem_pool.h
./shm/mm.h
./shm/shm_mm_wrapper.h
./shm/shm_allocator.h
  DESTINATION include)
src/net/net_conn_pool.cpp
src/net/net_conn_pool.h
src/net/net_mod_server_socket.cpp
src/net/net_mod_server_socket.h
src/net/net_mod_server_socket_wrapper.cpp
src/net/net_mod_server_socket_wrapper.h
src/net/net_mod_socket.cpp
src/net/net_mod_socket.h
src/net/net_mod_socket_io.cpp
src/net/net_mod_socket_io.h
src/net/net_mod_socket_wrapper.cpp
src/net/net_mod_socket_wrapper.h
src/psem.cpp
New file
@@ -0,0 +1,57 @@
#include "psem.h"
#include <semaphore.h>
#define NANO 1000000000
static struct timespec psem_calc_abs_timeout(const struct timespec *ts) {
    struct timespec res;
  struct timespec timeout;
  if (clock_gettime(CLOCK_REALTIME, &timeout) == -1)
      err_exit(errno, "clock_gettime");
  res.tv_sec = timeout.tv_sec + ts->tv_sec;
  res.tv_nsec = timeout.tv_nsec + ts->tv_nsec;
  res.tv_sec = res.tv_sec + floor(res.tv_nsec / NANO);
  res.tv_nsec = res.tv_nsec % NANO;
  return res;
}
int psem_timedwait(sem_t *sem, const struct timespec *ts) {
    struct timespec abs_timeout = psem_calc_abs_timeout(ts);
  int rv ;
  while ( (rv = sem_timedwait(sem, &abs_timeout)) == -1) {
      if(errno == EINTR)
          continue;
      else {
         // LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
         return rv;
      }
  }
  return 0;
}
int psem_wait(sem_t *sem) {
  int rv;
  while ( (rv = sem_wait(sem)) == -1) {
      if(errno == EINTR)
          continue;
      else {
         // LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
         return rv;
      }
  }
  return 0;
}
int psem_trywait(sem_t *sem) {
    return sem_trywait(sem);
}
int psem_post(sem_t *sem) {
    return sem_post(sem);
}
src/psem.h
New file
@@ -0,0 +1,14 @@
#ifndef _PSEM_H_
#define _PSEM_H_
#include "usg_common.h"
int psem_wait(sem_t *sem) ;
int psem_timedwait(sem_t *sem, const struct timespec *ts);
int psem_trywait(sem_t *sem) ;
int psem_post(sem_t *sem);
#endif
src/px_sem_util.cpp
File was deleted
src/px_sem_util.h
File was deleted
src/queue/array_lock_free_queue2.h
@@ -1,9 +1,11 @@
#ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
#define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
#include "atomic_ops.h"
#include <assert.h> // assert()
#include <sched.h>  // sched_yield()
#include "logger_factory.h"
#include "mem_pool.h"
#include "shm_allocator.h"
/// @brief implementation of an array based lock free queue with support for 
///        multiple producers
@@ -15,13 +17,15 @@
#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
template <typename ELEM_T>
template <typename ELEM_T, typename Allocator = SHM_Allocator>
class ArrayLockFreeQueue
{
    // ArrayLockFreeQueue will be using this' private members
    template <
        typename ELEM_T_, 
        template <typename T> class Q_TYPE >
        typename Allocator_,
        template <typename T, typename AT> class Q_TYPE
        >
    friend class LockFreeQueue;
private:
@@ -52,10 +56,10 @@
    ELEM_T *m_theQueue;
    /// @brief where a new element will be inserted
    std::atomic<uint32_t> m_writeIndex;
    uint32_t m_writeIndex;
    /// @brief where the next element where be extracted from
    std::atomic<uint32_t> m_readIndex;
    uint32_t m_readIndex;
    
    /// @brief maximum read index for multiple producer queues
    /// If it's not the same as m_writeIndex it means
@@ -65,23 +69,23 @@
    /// to wait for those other threads to save the data into the queue
    ///
    /// note this is only used for multiple producers
    std::atomic<uint32_t> m_maximumReadIndex;
    uint32_t m_maximumReadIndex;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    /// @brief number of elements in the queue
    std::atomic<uint32_t> m_count;
    uint32_t m_count;
#endif
   
    
private:
    /// @brief disable copy constructor declaring it private
    ArrayLockFreeQueue<ELEM_T>(const ArrayLockFreeQueue<ELEM_T> &a_src);
    ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src);
};
template <typename ELEM_T>
ArrayLockFreeQueue<ELEM_T>::ArrayLockFreeQueue(size_t qsize):
template <typename ELEM_T, typename Allocator>
ArrayLockFreeQueue<ELEM_T, Allocator>::ArrayLockFreeQueue(size_t qsize):
    Q_SIZE(qsize),
    m_writeIndex(0),      // initialisation is not atomic
    m_readIndex(0),       //
@@ -90,38 +94,38 @@
    ,m_count(0)           //
#endif
{
    m_theQueue = (ELEM_T*)mm_malloc(Q_SIZE * sizeof(ELEM_T));
    m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T));
}
template <typename ELEM_T>
ArrayLockFreeQueue<ELEM_T>::~ArrayLockFreeQueue()
template <typename ELEM_T, typename Allocator>
ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue()
{
    // std::cout << "destroy ArrayLockFreeQueue\n";
    mm_free(m_theQueue);
    Allocator::deallocate(m_theQueue);
    
}
template <typename ELEM_T>
template <typename ELEM_T, typename Allocator>
inline 
uint32_t ArrayLockFreeQueue<ELEM_T>::countToIndex(uint32_t a_count)
uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count)
{
    // if Q_SIZE is a power of 2 this statement could be also written as 
    // return (a_count & (Q_SIZE - 1));
    return (a_count % Q_SIZE);
}
template <typename ELEM_T>
template <typename ELEM_T, typename Allocator>
inline 
uint32_t ArrayLockFreeQueue<ELEM_T>::size()
uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return m_count.load();
    return m_count;
#else
    uint32_t currentWriteIndex = m_maximumReadIndex.load();
    uint32_t currentReadIndex  = m_readIndex.load();
    uint32_t currentWriteIndex = m_maximumReadIndex;
    uint32_t currentReadIndex  = m_readIndex;
    // let's think of a scenario where this function returns bogus data
    // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
@@ -146,13 +150,13 @@
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
template <typename ELEM_T>
template <typename ELEM_T, typename Allocator>
inline 
bool ArrayLockFreeQueue<ELEM_T>::full()
bool ArrayLockFreeQueue<ELEM_T, Allocator>::full()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return (m_count.load() == (Q_SIZE));
    return (m_count == (Q_SIZE));
#else
    uint32_t currentWriteIndex = m_writeIndex;
@@ -171,16 +175,16 @@
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
template <typename ELEM_T>
template <typename ELEM_T, typename Allocator>
inline 
bool ArrayLockFreeQueue<ELEM_T>::empty()
bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    return (m_count.load() == 0);
    return (m_count == 0);
#else
    if (countToIndex( m_readIndex.load()) == countToIndex(m_maximumReadIndex.load()))
    if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex))
    {
        // the queue is full
        return true;
@@ -194,54 +198,44 @@
}
template <typename ELEM_T>
bool ArrayLockFreeQueue<ELEM_T>::push(const ELEM_T &a_data)
template <typename ELEM_T, typename Allocator>
bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data)
{
    uint32_t currentReadIndex;
    uint32_t currentWriteIndex;
    do
    {
        currentWriteIndex = m_writeIndex.load();
        currentReadIndex = m_readIndex.load();
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
        if (m_count.load() == Q_SIZE) {
        currentWriteIndex = m_writeIndex;
        currentReadIndex  = m_readIndex;
    #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
        if (m_count == Q_SIZE) {
            return false;
        }
#else
    #else
        if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
        {
            // the queue is full
            return false;
        }
#endif
    // There is more than one producer. Keep looping till this thread is able
    // to allocate space for current piece of data
    //
    // using compare_exchange_strong because it isn't allowed to fail spuriously
    // When the compare_exchange operation is in a loop the weak version
    // will yield better performance on some platforms, but here we'd have to
    // load m_writeIndex all over again
    } while (!m_writeIndex.compare_exchange_strong(
                currentWriteIndex, (currentWriteIndex + 1)));
    // Just made sure this index is reserved for this thread.
    #endif
    } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
    // We know now that this index is reserved for us. Use it to save the data
    m_theQueue[countToIndex(currentWriteIndex)] = a_data;
    //memcpy((void *)(&m_theQueue[countToIndex(currentWriteIndex)]), (void *)(&a_data), sizeof(ELEM_T) );
    // update the maximum read index after saving the piece of data. It can't
    // fail if there is only one thread inserting in the queue. It might fail
    // if there is more than 1 producer thread because this operation has to
    // be done in the same order as the previous CAS
    //
    // using compare_exchange_weak because they are allowed to fail spuriously
    // (act as if *this != expected, even if they are equal), but when the
    // compare_exchange operation is in a loop the weak version will yield
    // better performance on some platforms.
    while (!m_maximumReadIndex.compare_exchange_weak(
                currentWriteIndex, (currentWriteIndex + 1)))
    // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
    // inserting in the queue. It might fail if there are more than 1 producer threads because this
    // operation has to be done in the same order as the previous CAS
    while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
    {
        // this is a good place to yield the thread in case there are more
        // software threads than hardware processors and you have more
@@ -250,37 +244,35 @@
        sched_yield();
    }
    // The value was successfully inserted into the queue
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    m_count.fetch_add(1);
    AtomicAdd(&m_count, 1);
#endif
    return true;
}
template <typename ELEM_T>
bool ArrayLockFreeQueue<ELEM_T>::pop(ELEM_T &a_data)
template <typename ELEM_T, typename Allocator>
bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data)
{
    uint32_t currentMaximumReadIndex;
    uint32_t currentReadIndex;
    do
    {
        currentReadIndex = m_readIndex.load();
        currentMaximumReadIndex = m_maximumReadIndex.load();
        // to ensure thread-safety when there is more than 1 producer thread
        // a second index is defined (m_maximumReadIndex)
        currentReadIndex        = m_readIndex;
        currentMaximumReadIndex = m_maximumReadIndex;
    #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
     #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
        if (m_count.load() == 0) {
        if (m_count == 0) {
            return false;
        }
    #else
        // to ensure thread-safety when there is more than 1 producer
        // thread a second index is defined (m_maximumReadIndex)
        if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
        {
            // the queue is empty or
            // a producer thread has allocate space in the queue but is
            // a producer thread has allocate space in the queue but is
            // waiting to commit the data into it
            return false;
        }
@@ -288,23 +280,22 @@
        // retrieve the data from the queue
        a_data = m_theQueue[countToIndex(currentReadIndex)];
        //memcpy((void*) (&a_data), (void *)(&m_theQueue[countToIndex(currentReadIndex)]),sizeof(ELEM_T) );
        // try to perfrom now the CAS operation on the read index. If we succeed
        // a_data already contains what m_readIndex pointed to before we
        // a_data already contains what m_readIndex pointed to before we
        // increased it
        if (m_readIndex.compare_exchange_strong(currentReadIndex, (currentReadIndex + 1)))
        if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
        {
            // got here. The value was retrieved from the queue. Note that the
            // data inside the m_queue array is not deleted nor reseted
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
            m_count.fetch_sub(1);
#endif
        #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
            // m_count.fetch_sub(1);
            AtomicSub(&m_count, 1);
        #endif
            return true;
        }
        // it failed retrieving the element off the queue. Someone else must
        // have read the element stored at countToIndex(currentReadIndex)
        // before we could perform the CAS operation
        // before we could perform the CAS operation
    } while(1); // keep looping to try again!
@@ -312,18 +303,17 @@
    assert(0);
    // Add this return statement to avoid compiler warnings
    return false;
    return false;
}
template <typename ELEM_T>
ELEM_T& ArrayLockFreeQueue<ELEM_T>::operator[](unsigned int i)
template <typename ELEM_T, typename Allocator>
ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i)
{
    int currentCount = m_count.load();
    uint32_t currentReadIndex = m_readIndex.load();
    if (i < 0 || i >= currentCount)
    int currentCount = m_count;
    uint32_t currentReadIndex = m_readIndex;
    if (i >= currentCount)
    {
        std::cerr << "Error in array limits: " << i << " is out of range\n";
        std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
        std::exit(EXIT_FAILURE);
    }
    return m_theQueue[countToIndex(currentReadIndex+i)];
src/queue/lock_free_queue.h
@@ -10,7 +10,7 @@
#include "sem_util.h"
#include "logger_factory.h"
#include "shm_allocator.h"
#include "px_sem_util.h"
#include "psem.h"
#include "bus_error.h"
// default Queue size
@@ -219,15 +219,15 @@
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data)
{
// LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");
    if (sem_wait(&slots) == -1) {
 LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");
    if (psem_wait(&slots) == -1) {
        err_msg(errno, "LockFreeQueue push");
        return errno;
    }
    
    if ( m_qImpl.push(a_data) ) {
        sem_post(&items);
// LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n");
        psem_post(&items);
LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n");
        return 0;
    }
    return -1;
@@ -240,7 +240,7 @@
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data)
{
    if (sem_trywait(&slots) == -1) {
    if (psem_trywait(&slots) == -1) {
        if (errno == EAGAIN)
            return EAGAIN;
        else {
@@ -251,7 +251,7 @@
    }
    if ( m_qImpl.push(a_data)) {
        sem_post(&items);
        psem_post(&items);
        return 0;
    }
    return -1;
@@ -264,20 +264,12 @@
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts)
{
LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before\n");
    int rv;
    struct timespec timeout = PXSemUtil::calc_sem_timeout(ts);
  // LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before tv_sec=%d, tv_nsec=%ld",
  //   timeout.tv_sec, timeout.tv_nsec);
    while ( sem_timedwait(&slots, &timeout) == -1) {
    //     LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before tv_sec=%d, tv_nsec=%ld, ETIMEDOUT=%d, errno=%d\n",
    // timeout.tv_sec, timeout.tv_nsec, ETIMEDOUT, errno);
    if ( psem_timedwait(&slots, ts) == -1) {
        if(errno == ETIMEDOUT)
            return EBUS_TIMEOUT;
        else if(errno == EINTR)
            continue;
        else {
           LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
           return errno;
@@ -285,8 +277,8 @@
    }
    if (m_qImpl.push(a_data)){
        sem_post(&items);
// LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n");
        psem_post(&items);
LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n");
        return 0;
    }
    return -1;
@@ -303,15 +295,15 @@
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data)
{
 // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n");
    if (sem_wait(&items) == -1) {
  LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n");
    if (psem_wait(&items) == -1) {
        LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop");
        return errno;
    }
    if (m_qImpl.pop(a_data)) {
        sem_post(&slots);
// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n");
        psem_post(&slots);
 LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n");
        return 0;
    }
    return -1;
@@ -323,7 +315,7 @@
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data)
{
    if (sem_trywait(&items) == -1) {
    if (psem_trywait(&items) == -1) {
        if (errno == EAGAIN)
            return errno;
        else {
@@ -333,7 +325,7 @@
    }
    if (m_qImpl.pop(a_data)) {
        sem_post(&slots);
        psem_post(&slots);
        return 0;
    }
    return -1;
@@ -346,18 +338,11 @@
    template <typename T, typename AT> class Q_TYPE>
int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts)
{
    // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout before\n");
    struct timespec timeout = PXSemUtil::calc_sem_timeout(ts);
    while (sem_timedwait(&items, &timeout) == -1) {
        // LoggerFactory::getLogger()->error(errno, "1 LockFreeQueue pop_timeout %d %d", errno, ETIMEDOUT);
    if (psem_timedwait(&items, ts) == -1) {
        if (errno == ETIMEDOUT) {
             // LoggerFactory::getLogger()->error(errno, "2 LockFreeQueue pop_timeout %d %d", errno, EBUS_TIMEOUT);
            return EBUS_TIMEOUT;
        }
        else if(errno == EINTR)
            continue;
        else {
          LoggerFactory::getLogger()->error(errno, "3  LockFreeQueue pop_timeout %d", errno);
          return errno;
@@ -365,7 +350,7 @@
    }
    if (m_qImpl.pop(a_data)) {
        sem_post(&slots);
        psem_post(&slots);
// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout after\n");     
        return 0;
    }
src/shm/hashtable.cpp
@@ -34,7 +34,13 @@
  hashtable->wlock = SemUtil::get(IPC_PRIVATE, 1);
  hashtable->cond = SemUtil::get(IPC_PRIVATE, 1);
  hashtable->readcnt = 0;
printf("hashtable->mutex=%d\n", hashtable->mutex);
  FILE * semfile = fopen("./sem.txt", "w+");
  if(semfile == NULL) {
    err_exit(errno, "fopen");
  }
  fprintf(semfile, "hashtable->mutex=%d\n", hashtable->mutex);
  fclose(semfile);
}
void hashtable_destroy(hashtable_t *hashtable) {
@@ -202,9 +208,13 @@
}
void *hashtable_get(hashtable_t *hashtable, int key) {
  LoggerFactory::getLogger()->debug( "==========hashtable_get before 1");
  int rv;
  rv = SemUtil::dec(hashtable->mutex);
  LoggerFactory::getLogger()->debug( "==========hashtable_get before 2");
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_get 1");
  }
@@ -248,13 +258,14 @@
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_get 7");
  }
  LoggerFactory::getLogger()->debug( "==========hashtable_get after");
  return res;
}
void hashtable_put(hashtable_t *hashtable, int key, void *value) {
  int rv;
  LoggerFactory::getLogger()->debug( "==========hashtable_put before");
  rv = SemUtil::dec(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
@@ -300,6 +311,8 @@
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
  }
  LoggerFactory::getLogger()->debug( "==========hashtable_put after");
}
src/svsem_util.cpp
New file
@@ -0,0 +1,253 @@
#include "svsem_util.h"
int SvsemUtil::get(key_t key, int nsems, unsigned short * arr_val) {
// printf("==================SvsemUtil::get===============================\n");
  int semid, perms;
  perms = S_IRUSR | S_IWUSR;
  semid = semget(key, nsems, IPC_CREAT | IPC_EXCL | perms);
  if (semid != -1) { /* Successfully created the semaphore */
    union semun arg;
    struct sembuf sop;
    //logger.info("%ld: created semaphore\n", (long)getpid());
    arg.array = arr_val; /* So initialize it to arr_val */
    if (semctl(semid, 0, SETALL, arg) == -1)
      err_exit(errno, "semctl 1");
    //logger.info("%ld: initialized semaphore\n", (long)getpid());
    /* Perform a "no-op" semaphore operation - changes sem_otime
       so other processes can see we've initialized the set. */
    sop.sem_num = 0; /* Operate on semaphore 0 */
    sop.sem_op = arr_val[0];
    sop.sem_flg = 0;
    if (semop(semid, &sop, 1) == -1)
      err_exit(errno, "semop");
    //logger.info("%ld: completed dummy semop()\n", (long)getpid());
  } else { /* We didn't create the semaphore set */
    if (errno != EEXIST) { /* Unexpected error from semget() */
      err_exit(errno, "semget 1");
    } else { /* Someone else already created it */
      const int MAX_TRIES = 10;
      int j;
      union semun arg;
      struct semid_ds ds;
      semid = semget(key, nsems, perms); /* So just get ID */
      if (semid == -1)
        err_exit(errno, "semget 2");
     // logger.info("%ld: got semaphore key\n", (long)getpid());
      /* Wait until another process has called semop() */
      arg.buf = &ds;
      for (j = 0; j < MAX_TRIES; j++) {
        //logger.info("Try %d\n", j);
        if (semctl(semid, 0, IPC_STAT, arg) == -1)
          err_exit(errno, "semctl 2");
        if (ds.sem_otime != 0) /* Semop() performed? */
          break;               /* Yes, quit loop */
        sleep(1);              /* If not, wait and retry */
      }
      if (ds.sem_otime == 0) /* Loop ran to completion! */
        err_exit(errno, "Existing semaphore not initialized");
    }
  }
  return semid;
}
/* Reserve semaphore (blocking), return 0 on success, or -1 with 'errno'
   set to EINTR if operation was interrupted by a signal handler */
/* Reserve semaphore - decrement it by 1 */
int SvsemUtil::dec(int semId) {
  struct sembuf sops;
  sops.sem_num = 0;
  sops.sem_op = -1;
  sops.sem_flg = 0;
  while (semop(semId, &sops, 1) == -1)
    if (errno != EINTR) {
      // err_msg(errno, "SvsemUtil::dec");
      return errno;
    }
  return 0;
}
int SvsemUtil::dec_nowait(int semId) {
  struct sembuf sops;
  sops.sem_num = 0;
  sops.sem_op = -1;
  sops.sem_flg = IPC_NOWAIT | SEM_UNDO;
  while (semop(semId, &sops, 1) == -1)
    if (errno != EINTR) {
      // err_msg(errno, "SvsemUtil::dec_nowait");
      return errno;
    }
  return 0;
}
int SvsemUtil::dec_timeout(const int semId, const struct timespec *timeout) {
  struct sembuf sops;
  sops.sem_num = 0;
  sops.sem_op = -1;
  sops.sem_flg = 0;
  while (semtimedop(semId, &sops, 1, timeout) == -1)
    if (errno != EINTR) {
      // err_msg(errno, "SvsemUtil::dec_timeout");
      return errno;
    }
  return 0;
}
/**
 * If sem_op equals 0, the value of the semaphore is checked to see whether it
 * currently equals 0. If it does, the operation completes immediately; otherwise,
 * semop() blocks until the semaphore value becomes 0.
 */
int SvsemUtil::zero(int semId) {
// logger.debug("%d: SvsemUtil::dec\n", semId);
  struct sembuf sops;
  sops.sem_num = 0;
  sops.sem_op = 0;
  sops.sem_flg = 0;
  while (semop(semId, &sops, 1) == -1)
    if (errno != EINTR) {
      // err_msg(errno, "SvsemUtil::zero");
      return errno;
    }
  return 0;
}
int SvsemUtil::zero_nowait(int semId) {
  struct sembuf sops;
  sops.sem_num = 0;
  sops.sem_op = 0;
  sops.sem_flg = IPC_NOWAIT;
  while (semop(semId, &sops, 1) == -1)
    if (errno != EINTR) {
      // err_msg(errno, "SvsemUtil::zero_nowait");
      return errno;
    }
  return 0;
}
int SvsemUtil::zero_timeout(const int semId, const struct timespec *timeout) {
  struct sembuf sops;
  sops.sem_num = 0;
  sops.sem_op = 0;
  sops.sem_flg = 0;
  while (semtimedop(semId, &sops, 1, timeout) == -1)
    if (errno != EINTR) {
      // err_msg(errno, "SvsemUtil::zero_timeout");
      return errno;
    }
  return 0;
}
/* Release semaphore - increment it by 1 */
int SvsemUtil::inc(int semId) {
  struct sembuf sops;
  sops.sem_num = 0;
  sops.sem_op = 1;
  sops.sem_flg = 0;
  int rv = semop(semId, &sops, 1);
  if (rv == -1) {
    // err_msg(errno, "SvsemUtil::inc");
    return errno;
  }
  return 0;
}
int SvsemUtil::set(int semId, int val) {
  union semun arg;
  arg.val = val;
  if (semctl(semId, 0, SETVAL, arg) == -1) {
    err_msg(errno, "SvsemUtil::set");
    return errno;
  }
  return 0;
}
int SvsemUtil::cond_wait(int semId ){
  struct sembuf sops[2];
  //释放mutex
  sops[0].sem_num = 0;
  sops[0].sem_op = 1;
  sops[0].sem_flg = 0;
  // 等待cond
  sops[1].sem_num = 1;
  sops[1].sem_op = -1;
  sops[1].sem_flg = 0;
  while (semop(semId, sops, 2) == -1)
    if (errno != EINTR) {
      // err_msg(errno, "SvsemUtil::dec");
      return errno;
    }
  return 0;
}
int SvsemUtil::cond_signal(int semId ){
  struct sembuf sops;
  // 通知等待cond的进程
  sops.sem_num = 1;
  sops.sem_op = 1;
  sops.sem_flg = 0;
  int rv = semop(semId, &sops, 1);
  if (rv == -1) {
    // err_msg(errno, "SvsemUtil::inc");
    return errno;
  }
  return 0;
}
void SvsemUtil::remove(int semid) {
  union semun dummy;
  if (semctl(semid, 0, IPC_RMID, dummy) == -1)
    err_msg(errno, "SvsemUtil::remove");
}
src/svsem_util.h
New file
@@ -0,0 +1,47 @@
#ifndef _SVSEM_UTIL_H
#define _SVSEM_UTIL_H
#include "usg_common.h"
class SvsemUtil {
public:
  static int get(key_t key, int nsems, unsigned short * arr_val) ;
  /* Reserve semaphore (blocking), return 0 on success, or -1 with 'errno'
     set to EINTR if operation was interrupted by a signal handler */
  /* Reserve semaphore - decrement it by 1 */
  static int dec(int semId)  ;
  static int dec_nowait(int semId)  ;
  static int dec_timeout(const int semId, const struct timespec *timeout) ;
  /**
   * If sem_op equals 0, the value of the semaphore is checked to see whether it
   * currently equals 0. If it does, the operation completes immediately; otherwise,
   * semop() blocks until the semaphore value becomes 0.
   */
  static int zero(int semId) ;
  static int zero_nowait(int semId) ;
  static int zero_timeout(const int semId, const struct timespec *timeout)  ;
  /* Release semaphore - increment it by 1 */
  static int inc(int semId) ;
  static int set(int semId, int val) ;
  static int cond_wait(int semid ) ;
  static int cond_signal(int semid ) ;
  static void remove(int semid) ;
};
#endif
test/CMakeLists.txt
@@ -32,11 +32,3 @@
add_executable(svsem_mon svsem_mon.cpp )
target_link_libraries(svsem_mon PRIVATE  ${EXTRA_LIBS} )
target_include_directories(svsem_mon PRIVATE
                            "${PROJECT_BINARY_DIR}"
                             ${EXTRA_INCLUDES}
                            )
test/svsem_test.cpp
New file
@@ -0,0 +1,242 @@
#include <stdio.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/mman.h>
#include <sys/syscall.h>
#include <linux/futex.h>
#include <sys/time.h>
#include "usg_common.h"
#include <sys/mman.h>
#include <sys/stat.h>        /* For mode constants */
#include <fcntl.h>           /* For O_* constants */
#include "sem_util.h"
#include "sem_util.h"
int _get(key_t key, unsigned int value) {
// printf("==================_get===============================\n");
  int semid, perms;
  perms = S_IRUSR | S_IWUSR;
  semid = semget(key, 1, IPC_CREAT | IPC_EXCL | perms);
  if (semid != -1) { /* Successfully created the semaphore */
    union semun arg;
    struct sembuf sop;
    //logger.info("%ld: created semaphore\n", (long)getpid());
    arg.val = 0; /* So initialize it to 0 */
    if (semctl(semid, 0, SETVAL, arg) == -1)
      err_exit(errno, "semctl 1");
    //logger.info("%ld: initialized semaphore\n", (long)getpid());
    /* Perform a "no-op" semaphore operation - changes sem_otime
       so other processes can see we've initialized the set. */
    sop.sem_num = 0; /* Operate on semaphore 0 */
    sop.sem_op = value;
    sop.sem_flg = 0;
    if (semop(semid, &sop, 1) == -1)
      err_exit(errno, "semop");
    //logger.info("%ld: completed dummy semop()\n", (long)getpid());
  } else { /* We didn't create the semaphore set */
    if (errno != EEXIST) { /* Unexpected error from semget() */
      err_exit(errno, "semget 1");
    } else { /* Someone else already created it */
      const int MAX_TRIES = 10;
      int j;
      union semun arg;
      struct semid_ds ds;
      semid = semget(key, 1, perms); /* So just get ID */
      if (semid == -1)
        err_exit(errno, "semget 2");
     // logger.info("%ld: got semaphore key\n", (long)getpid());
      /* Wait until another process has called semop() */
      arg.buf = &ds;
      for (j = 0; j < MAX_TRIES; j++) {
        //logger.info("Try %d\n", j);
        if (semctl(semid, 0, IPC_STAT, arg) == -1)
          err_exit(errno, "semctl 2");
        if (ds.sem_otime != 0) /* Semop() performed? */
          break;               /* Yes, quit loop */
        sleep(1);              /* If not, wait and retry */
      }
      if (ds.sem_otime == 0) /* Loop ran to completion! */
        err_exit(errno, "Existing semaphore not initialized");
    }
  }
  return semid;
}
/* Reserve semaphore (blocking), return 0 on success, or -1 with 'errno'
   set to EINTR if operation was interrupted by a signal handler */
/* Reserve semaphore - decrement it by 1 */
int _dec(int semId) {
  struct sembuf sops;
  sops.sem_num = 0;
  sops.sem_op = -1;
  sops.sem_flg = SEM_UNDO;
  while (semop(semId, &sops, 1) == -1)
    if (errno != EINTR) {
      // err_msg(errno, "_dec");
      return errno;
    }
  return 0;
}
int _dec_nowait(int semId) {
  struct sembuf sops;
  sops.sem_num = 0;
  sops.sem_op = -1;
  sops.sem_flg = IPC_NOWAIT ;
  while (semop(semId, &sops, 1) == -1)
    if (errno != EINTR) {
      // err_msg(errno, "_dec_nowait");
      return errno;
    }
  return 0;
}
int _dec_timeout(const int semId, const struct timespec *timeout) {
  struct sembuf sops;
  sops.sem_num = 0;
  sops.sem_op = -1;
  sops.sem_flg = 0;
  while (semtimedop(semId, &sops, 1, timeout) == -1)
    if (errno != EINTR) {
      // err_msg(errno, "_dec_timeout");
      return errno;
    }
  return 0;
}
/**
 * If sem_op equals 0, the value of the semaphore is checked to see whether it
 * currently equals 0. If it does, the operation completes immediately; otherwise,
 * semop() blocks until the semaphore value becomes 0.
 */
int _zero(int semId) {
// logger.debug("%d: _dec\n", semId);
  struct sembuf sops;
  sops.sem_num = 0;
  sops.sem_op = 0;
  sops.sem_flg = 0;
  while (semop(semId, &sops, 1) == -1)
    if (errno != EINTR) {
      // err_msg(errno, "_zero");
      return errno;
    }
  return 0;
}
int _zero_nowait(int semId) {
  struct sembuf sops;
  sops.sem_num = 0;
  sops.sem_op = 0;
  sops.sem_flg = IPC_NOWAIT;
  while (semop(semId, &sops, 1) == -1)
    if (errno != EINTR) {
      // err_msg(errno, "_zero_nowait");
      return errno;
    }
  return 0;
}
int _zero_timeout(const int semId, const struct timespec *timeout) {
  struct sembuf sops;
  sops.sem_num = 0;
  sops.sem_op = 0;
  sops.sem_flg = 0;
  while (semtimedop(semId, &sops, 1, timeout) == -1)
    if (errno != EINTR) {
      // err_msg(errno, "_zero_timeout");
      return errno;
    }
  return 0;
}
/* Release semaphore - increment it by 1 */
int _inc(int semId) {
  struct sembuf sops;
  sops.sem_num = 0;
  sops.sem_op = 1;
  sops.sem_flg = 0;
  int rv = semop(semId, &sops, 1);
  if (rv == -1) {
    // err_msg(errno, "_inc");
    return errno;
  }
  return 0;
}
void _remove(int semid) {
  union semun dummy;
  if (semctl(semid, 0, IPC_RMID, dummy) == -1)
    err_msg(errno, "_remove");
}
int _set(int semId, int val) {
  union semun arg;
  arg.val = val;
  if (semctl(semId, 0, SETVAL, arg) == -1) {
    err_msg(errno, "_set");
    return errno;
  }
  return 0;
}
#define KEY 0x383
int main() {
    int semid = _get(KEY, 1) ;
    if(_dec(semid) != 0)
        err_exit(errno, "_dec");
        printf("(%ld) 进入互斥区\n", (long) getpid());
        sleep(10);
    if(_inc(semid) != 0)
        err_exit(errno, "_inc");
}
test_net_socket/CMakeLists.txt
@@ -15,11 +15,25 @@
add_executable(test_net_mod_socket test_net_mod_socket.cpp  ${CMAKE_CURRENT_BINARY_DIR}/net_mod_socket.sh)
target_link_libraries(test_net_mod_socket PRIVATE shm_queue  ${EXTRA_LIBS} )
add_executable(heart_beat heart_beat.cpp ${CMAKE_CURRENT_BINARY_DIR}/heart_beat.sh)
target_link_libraries(heart_beat PRIVATE shm_queue )
# target_link_libraries(heart_beat PRIVATE shm_queue  ${EXTRA_LIBS} )
add_executable(svsem_mon svsem_mon.cpp )
target_link_libraries(svsem_mon PRIVATE  ${EXTRA_LIBS} )
target_include_directories(svsem_mon PRIVATE
                            "${PROJECT_BINARY_DIR}"
                             ${EXTRA_INCLUDES}
                            )
target_include_directories(test_net_mod_socket PRIVATE
                            "${PROJECT_BINARY_DIR}"
                             ${EXTRA_INCLUDES}
test_net_socket/heart_beat.sh
@@ -1,6 +1,6 @@
#! /bin/bash
PROCESSES=10
PROCESSES=4
function clean() {
    ipcrm -a
    ps -ef | grep "heart_beat" | awk  '{print $2}' | xargs -i kill -9 {}
test_net_socket/svsem_mon.cpp
New file
@@ -0,0 +1,42 @@
#include <sys/types.h>
#include <sys/sem.h>
#include <time.h>
#include "usg_common.h"
int
main(int argc, char *argv[])
{
    struct semid_ds ds;
    union semun arg, dummy;             /* Fourth argument for semctl() */
    int semid, j;
    if (argc != 2 || strcmp(argv[1], "--help") == 0)
        err_exit(0, "%s semid\n", argv[0]);
    semid = atoi(argv[1]);
    arg.buf = &ds;
    if (semctl(semid, 0, IPC_STAT, arg) == -1)
        err_exit(errno, "semctl");
    printf("Semaphore changed: %s", ctime(&ds.sem_ctime));
    printf("Last semop():      %s", ctime(&ds.sem_otime));
    /* Display per-semaphore information */
    arg.array = (short unsigned int*)calloc(ds.sem_nsems, sizeof(arg.array[0]));
    if (arg.array == NULL)
        err_exit(errno, "calloc");
    if (semctl(semid, 0, GETALL, arg) == -1)
        err_exit(errno, "semctl-GETALL");
    printf("Sem #  Value  SEMPID  SEMNCNT  SEMZCNT\n");
    for (j = 0; j < ds.sem_nsems; j++)
        printf("%3d   %5d   %5d  %5d    %5d\n", j, arg.array[j],
                semctl(semid, j, GETPID, dummy),
                semctl(semid, j, GETNCNT, dummy),
                semctl(semid, j, GETZCNT, dummy));
    exit(EXIT_SUCCESS);
}