From 4fd62552d8277f3d0ed20e66663cd219c36796df Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期四, 21 一月 2021 11:20:22 +0800
Subject: [PATCH] update

---
 src/svsem_util.cpp                        |  253 ++++++++++++++
 src/svsem_util.h                          |   47 ++
 src/psem.h                                |   14 
 src/net/net_mod_server_socket_wrapper.h   |    0 
 src/net/net_mod_socket_io.cpp             |    0 
 src/net/net_conn_pool.h                   |    0 
 test/svsem_test.cpp                       |  242 +++++++++++++
 src/queue/array_lock_free_queue2.h        |  176 ++++-----
 src/psem.cpp                              |   57 +++
 src/net/net_mod_socket_io.h               |    0 
 src/shm/hashtable.cpp                     |   19 
 test/CMakeLists.txt                       |    8 
 src/net/net_mod_server_socket_wrapper.cpp |    0 
 test_net_socket/CMakeLists.txt            |   14 
 src/net/net_conn_pool.cpp                 |    0 
 src/queue/lock_free_queue.h               |   55 +-
 src/net/net_mod_server_socket.cpp         |    0 
 test_net_socket/heart_beat.sh             |    2 
 test_net_socket/svsem_mon.cpp             |   42 ++
 src/net/net_mod_socket.cpp                |    0 
 /dev/null                                 |   13 
 src/net/net_mod_socket_wrapper.cpp        |    0 
 src/net/net_mod_socket_wrapper.h          |    0 
 src/net/net_mod_socket.h                  |    0 
 src/CMakeLists.txt                        |   91 ++--
 src/net/net_mod_server_socket.h           |    0 
 26 files changed, 836 insertions(+), 197 deletions(-)

diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 85de799..cfd7f89 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -8,24 +8,24 @@
 configure_file(bus_config.h.in bus_config.h)
 
 add_library(shm_queue 
-		logger_factory.cpp
-		socket/bus_server_socket.cpp
-		socket/bus_server_socket_wrapper.cpp
-		socket/shm_stream_mod_socket.cpp
-		socket/shm_socket.cpp
-		socket/net_conn_pool.cpp
-		socket/shm_mod_socket.cpp
-		socket/net_mod_server_socket_wrapper.cpp
-		socket/net_mod_socket_wrapper.cpp
-		socket/net_mod_socket.cpp
-		socket/net_mod_socket_io.cpp
-		socket/net_mod_server_socket.cpp
-		bus_error.cpp
-		shm/shm_mm_wrapper.cpp
-		shm/mm.cpp
-		shm/hashtable.cpp
-		px_sem_util.cpp
-    svsem_util.cpp
+  ./logger_factory.cpp
+  ./socket/bus_server_socket.cpp
+  ./socket/bus_server_socket_wrapper.cpp
+  ./socket/shm_stream_mod_socket.cpp
+  ./socket/shm_socket.cpp
+  ./socket/shm_mod_socket.cpp
+  ./psem.cpp
+  ./svsem_util.cpp
+  ./bus_error.cpp
+  ./net/net_conn_pool.cpp
+  ./net/net_mod_server_socket_wrapper.cpp
+  ./net/net_mod_socket_wrapper.cpp
+  ./net/net_mod_socket.cpp
+  ./net/net_mod_socket_io.cpp
+  ./net/net_mod_server_socket.cpp
+  ./shm/shm_mm_wrapper.cpp
+  ./shm/mm.cpp
+  ./shm/hashtable.cpp
 
 	)
 
@@ -39,6 +39,7 @@
                            ${CMAKE_CURRENT_SOURCE_DIR}/shm
                            ${CMAKE_CURRENT_SOURCE_DIR}/queue
                            ${CMAKE_CURRENT_SOURCE_DIR}/socket
+                           ${CMAKE_CURRENT_SOURCE_DIR}/net
                            )
  
 target_link_libraries(shm_queue PUBLIC  ${EXTRA_LIBS} )
@@ -46,32 +47,34 @@
 # install rules
 install(TARGETS shm_queue DESTINATION lib)
 install(FILES 
-  socket/socket_def.h
-  socket/net_conn_pool.h
-  socket/bus_server_socket.h
-  socket/shm_socket.h
-  socket/net_mod_socket.h
-  socket/shm_stream_mod_socket.h
-  socket/net_mod_server_socket_wrapper.h
-  socket/net_mod_socket_io.h
-  socket/net_mod_server_socket.h
-  socket/shm_mod_socket.h
-  socket/net_mod_socket_wrapper.h
-  socket/bus_server_socket_wrapper.h
-  key_def.h
-  bus_error.h
-  px_sem_util.h
-  logger_factory.h
-  queue/linked_lock_free_queue.h
-  queue/array_lock_free_queue2.h
-  queue/array_lock_free_queue.h
-  queue/shm_queue.h
-  queue/lock_free_queue.h
-  shm/hashtable.h
-  shm/mem_pool.h
-  shm/mm.h
-  shm/shm_mm_wrapper.h
-  shm/shm_allocator.h
+ ./socket/socket_def.h
+./socket/bus_server_socket.h
+./socket/shm_socket.h
+./socket/shm_stream_mod_socket.h
+./socket/shm_mod_socket.h
+./socket/bus_server_socket_wrapper.h
+./psem.h
+./key_def.h
+./bus_error.h
+./svsem_util.h
+./logger_factory.h
+./queue/linked_lock_free_queue.h
+./queue/array_lock_free_queue2.h
+./queue/array_lock_free_queue.h
+./queue/shm_queue.h
+./queue/lock_free_queue.h
+./net/net_conn_pool.h
+./net/net_mod_socket.h
+./net/net_mod_server_socket_wrapper.h
+./net/net_mod_socket_io.h
+./net/net_mod_server_socket.h
+./net/net_mod_socket_wrapper.h
+./shm/hashtable.h
+./shm/mem_pool.h
+./shm/mm.h
+./shm/shm_mm_wrapper.h
+./shm/shm_allocator.h
+
 
   DESTINATION include)
 
diff --git a/src/socket/net_conn_pool.cpp b/src/net/net_conn_pool.cpp
similarity index 100%
rename from src/socket/net_conn_pool.cpp
rename to src/net/net_conn_pool.cpp
diff --git a/src/socket/net_conn_pool.h b/src/net/net_conn_pool.h
similarity index 100%
rename from src/socket/net_conn_pool.h
rename to src/net/net_conn_pool.h
diff --git a/src/socket/net_mod_server_socket.cpp b/src/net/net_mod_server_socket.cpp
similarity index 100%
rename from src/socket/net_mod_server_socket.cpp
rename to src/net/net_mod_server_socket.cpp
diff --git a/src/socket/net_mod_server_socket.h b/src/net/net_mod_server_socket.h
similarity index 100%
rename from src/socket/net_mod_server_socket.h
rename to src/net/net_mod_server_socket.h
diff --git a/src/socket/net_mod_server_socket_wrapper.cpp b/src/net/net_mod_server_socket_wrapper.cpp
similarity index 100%
rename from src/socket/net_mod_server_socket_wrapper.cpp
rename to src/net/net_mod_server_socket_wrapper.cpp
diff --git a/src/socket/net_mod_server_socket_wrapper.h b/src/net/net_mod_server_socket_wrapper.h
similarity index 100%
rename from src/socket/net_mod_server_socket_wrapper.h
rename to src/net/net_mod_server_socket_wrapper.h
diff --git a/src/socket/net_mod_socket.cpp b/src/net/net_mod_socket.cpp
similarity index 100%
rename from src/socket/net_mod_socket.cpp
rename to src/net/net_mod_socket.cpp
diff --git a/src/socket/net_mod_socket.h b/src/net/net_mod_socket.h
similarity index 100%
rename from src/socket/net_mod_socket.h
rename to src/net/net_mod_socket.h
diff --git a/src/socket/net_mod_socket_io.cpp b/src/net/net_mod_socket_io.cpp
similarity index 100%
rename from src/socket/net_mod_socket_io.cpp
rename to src/net/net_mod_socket_io.cpp
diff --git a/src/socket/net_mod_socket_io.h b/src/net/net_mod_socket_io.h
similarity index 100%
rename from src/socket/net_mod_socket_io.h
rename to src/net/net_mod_socket_io.h
diff --git a/src/socket/net_mod_socket_wrapper.cpp b/src/net/net_mod_socket_wrapper.cpp
similarity index 100%
rename from src/socket/net_mod_socket_wrapper.cpp
rename to src/net/net_mod_socket_wrapper.cpp
diff --git a/src/socket/net_mod_socket_wrapper.h b/src/net/net_mod_socket_wrapper.h
similarity index 100%
rename from src/socket/net_mod_socket_wrapper.h
rename to src/net/net_mod_socket_wrapper.h
diff --git a/src/psem.cpp b/src/psem.cpp
new file mode 100644
index 0000000..8d9333f
--- /dev/null
+++ b/src/psem.cpp
@@ -0,0 +1,57 @@
+#include "psem.h"
+#include <semaphore.h>
+
+#define NANO 1000000000
+
+
+static struct timespec psem_calc_abs_timeout(const struct timespec *ts) {
+ 
+	struct timespec res;
+  struct timespec timeout;
+  if (clock_gettime(CLOCK_REALTIME, &timeout) == -1)
+      err_exit(errno, "clock_gettime");
+
+  res.tv_sec = timeout.tv_sec + ts->tv_sec;
+  res.tv_nsec = timeout.tv_nsec + ts->tv_nsec;
+  res.tv_sec = res.tv_sec + floor(res.tv_nsec / NANO);
+  res.tv_nsec = res.tv_nsec % NANO;
+  return res;
+}
+
+
+int psem_timedwait(sem_t *sem, const struct timespec *ts) {
+	struct timespec abs_timeout = psem_calc_abs_timeout(ts);
+
+  int rv ;
+  while ( (rv = sem_timedwait(sem, &abs_timeout)) == -1) {
+      if(errno == EINTR)
+          continue;
+      else {
+         // LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
+         return rv;
+      }
+  }
+  return 0;
+}
+
+
+int psem_wait(sem_t *sem) {
+  int rv;
+  while ( (rv = sem_wait(sem)) == -1) {
+      if(errno == EINTR)
+          continue;
+      else {
+         // LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
+         return rv;
+      }
+  }
+  return 0;
+}
+
+int psem_trywait(sem_t *sem) {
+	return sem_trywait(sem);
+}
+
+int psem_post(sem_t *sem) {
+	return sem_post(sem);
+}
diff --git a/src/psem.h b/src/psem.h
new file mode 100644
index 0000000..30c6670
--- /dev/null
+++ b/src/psem.h
@@ -0,0 +1,14 @@
+#ifndef _PSEM_H_
+#define _PSEM_H_  
+
+#include "usg_common.h"
+
+int psem_wait(sem_t *sem) ;
+
+int psem_timedwait(sem_t *sem, const struct timespec *ts);
+
+int psem_trywait(sem_t *sem) ;
+
+int psem_post(sem_t *sem);
+
+#endif
diff --git a/src/px_sem_util.cpp b/src/px_sem_util.cpp
deleted file mode 100644
index 9de9c38..0000000
--- a/src/px_sem_util.cpp
+++ /dev/null
@@ -1,16 +0,0 @@
-#include "px_sem_util.h"
-
-#define NANO 1000000000
-struct timespec PXSemUtil::calc_sem_timeout(const struct timespec *ts) {
- 
-	struct timespec res;
-  struct timespec timeout;
-  if (clock_gettime(CLOCK_REALTIME, &timeout) == -1)
-      err_exit(errno, "clock_gettime");
-
-  res.tv_sec = timeout.tv_sec + ts->tv_sec;
-  res.tv_nsec = timeout.tv_nsec + ts->tv_nsec;
-  res.tv_sec = res.tv_sec + floor(res.tv_nsec / NANO);
-  res.tv_nsec = res.tv_nsec % NANO;
-  return res;
-}
\ No newline at end of file
diff --git a/src/px_sem_util.h b/src/px_sem_util.h
deleted file mode 100644
index 60ec978..0000000
--- a/src/px_sem_util.h
+++ /dev/null
@@ -1,13 +0,0 @@
-#ifndef PX_SEM_UTIL_H
-#define PX_SEM_UTIL_H  
-
-#include "usg_common.h"
-#include "usg_typedef.h"
-
-namespace PXSemUtil {
-
-	struct timespec calc_sem_timeout(const struct timespec *ts);
-
-}
-
-#endif
diff --git a/src/queue/array_lock_free_queue2.h b/src/queue/array_lock_free_queue2.h
index 3b79b7f..233bc6a 100644
--- a/src/queue/array_lock_free_queue2.h
+++ b/src/queue/array_lock_free_queue2.h
@@ -1,9 +1,11 @@
 #ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
 #define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
-
+#include "atomic_ops.h"
 #include <assert.h> // assert()
 #include <sched.h>  // sched_yield()
 #include "logger_factory.h"
+#include "mem_pool.h"
+#include "shm_allocator.h"
 
 /// @brief implementation of an array based lock free queue with support for 
 ///        multiple producers
@@ -15,13 +17,15 @@
 
 #define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 
-template <typename ELEM_T>
+template <typename ELEM_T, typename Allocator = SHM_Allocator>
 class ArrayLockFreeQueue
 {
     // ArrayLockFreeQueue will be using this' private members
     template <
         typename ELEM_T_, 
-        template <typename T> class Q_TYPE >
+        typename Allocator_,
+        template <typename T, typename AT> class Q_TYPE
+        >
     friend class LockFreeQueue;
 
 private:
@@ -52,10 +56,10 @@
     ELEM_T *m_theQueue;
 
     /// @brief where a new element will be inserted
-    std::atomic<uint32_t> m_writeIndex;
+    uint32_t m_writeIndex;
 
     /// @brief where the next element where be extracted from
-    std::atomic<uint32_t> m_readIndex;
+    uint32_t m_readIndex;
     
     /// @brief maximum read index for multiple producer queues
     /// If it's not the same as m_writeIndex it means
@@ -65,23 +69,23 @@
     /// to wait for those other threads to save the data into the queue
     ///
     /// note this is only used for multiple producers
-    std::atomic<uint32_t> m_maximumReadIndex;
+    uint32_t m_maximumReadIndex;
 
 #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
     /// @brief number of elements in the queue
-    std::atomic<uint32_t> m_count;
+    uint32_t m_count;
 #endif
    
     
 private:
     /// @brief disable copy constructor declaring it private
-    ArrayLockFreeQueue<ELEM_T>(const ArrayLockFreeQueue<ELEM_T> &a_src);
+    ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src);
 
 };
 
 
-template <typename ELEM_T>
-ArrayLockFreeQueue<ELEM_T>::ArrayLockFreeQueue(size_t qsize):
+template <typename ELEM_T, typename Allocator>
+ArrayLockFreeQueue<ELEM_T, Allocator>::ArrayLockFreeQueue(size_t qsize):
     Q_SIZE(qsize),
     m_writeIndex(0),      // initialisation is not atomic
     m_readIndex(0),       //
@@ -90,38 +94,38 @@
     ,m_count(0)           //
 #endif
 {
-    m_theQueue = (ELEM_T*)mm_malloc(Q_SIZE * sizeof(ELEM_T));
+    m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T));
 
 }
 
-template <typename ELEM_T>
-ArrayLockFreeQueue<ELEM_T>::~ArrayLockFreeQueue()
+template <typename ELEM_T, typename Allocator>
+ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue()
 {
     // std::cout << "destroy ArrayLockFreeQueue\n";
-    mm_free(m_theQueue);
+    Allocator::deallocate(m_theQueue);
     
 }
 
-template <typename ELEM_T>
+template <typename ELEM_T, typename Allocator>
 inline 
-uint32_t ArrayLockFreeQueue<ELEM_T>::countToIndex(uint32_t a_count)
+uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count)
 {
     // if Q_SIZE is a power of 2 this statement could be also written as 
     // return (a_count & (Q_SIZE - 1));
     return (a_count % Q_SIZE);
 }
 
-template <typename ELEM_T>
+template <typename ELEM_T, typename Allocator>
 inline 
-uint32_t ArrayLockFreeQueue<ELEM_T>::size()
+uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size()
 {
 #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 
-    return m_count.load();
+    return m_count;
 #else
 
-    uint32_t currentWriteIndex = m_maximumReadIndex.load();
-    uint32_t currentReadIndex  = m_readIndex.load();
+    uint32_t currentWriteIndex = m_maximumReadIndex;
+    uint32_t currentReadIndex  = m_readIndex;
 
     // let's think of a scenario where this function returns bogus data
     // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
@@ -146,13 +150,13 @@
 #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 }
 
-template <typename ELEM_T>
+template <typename ELEM_T, typename Allocator>
 inline 
-bool ArrayLockFreeQueue<ELEM_T>::full()
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::full()
 {
 #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 
-    return (m_count.load() == (Q_SIZE));
+    return (m_count == (Q_SIZE));
 #else
 
     uint32_t currentWriteIndex = m_writeIndex;
@@ -171,16 +175,16 @@
 #endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 }
 
-template <typename ELEM_T>
+template <typename ELEM_T, typename Allocator>
 inline 
-bool ArrayLockFreeQueue<ELEM_T>::empty()
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty()
 {
 #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 
-    return (m_count.load() == 0);
+    return (m_count == 0);
 #else
 
-    if (countToIndex( m_readIndex.load()) == countToIndex(m_maximumReadIndex.load()))
+    if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex))
     {
         // the queue is full
         return true;
@@ -194,54 +198,44 @@
 }
 
 
-template <typename ELEM_T>
-bool ArrayLockFreeQueue<ELEM_T>::push(const ELEM_T &a_data)
+
+
+
+
+template <typename ELEM_T, typename Allocator>
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data)
 {
     uint32_t currentReadIndex;
     uint32_t currentWriteIndex;
-    
+
     do
     {
-        currentWriteIndex = m_writeIndex.load();
-        currentReadIndex = m_readIndex.load();
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 
-        if (m_count.load() == Q_SIZE) {
+        currentWriteIndex = m_writeIndex;
+        currentReadIndex  = m_readIndex;
+    #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+        if (m_count == Q_SIZE) {
             return false;
         }
-#else
+    #else
         if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
         {
             // the queue is full
             return false;
         }
-#endif
-        
-    // There is more than one producer. Keep looping till this thread is able 
-    // to allocate space for current piece of data
-    //
-    // using compare_exchange_strong because it isn't allowed to fail spuriously
-    // When the compare_exchange operation is in a loop the weak version
-    // will yield better performance on some platforms, but here we'd have to
-    // load m_writeIndex all over again
-    } while (!m_writeIndex.compare_exchange_strong(
-                currentWriteIndex, (currentWriteIndex + 1)));
-    
-    // Just made sure this index is reserved for this thread.
+    #endif
+
+    } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
+
+    // We know now that this index is reserved for us. Use it to save the data
     m_theQueue[countToIndex(currentWriteIndex)] = a_data;
-    //memcpy((void *)(&m_theQueue[countToIndex(currentWriteIndex)]), (void *)(&a_data), sizeof(ELEM_T) );
-    
-    // update the maximum read index after saving the piece of data. It can't
-    // fail if there is only one thread inserting in the queue. It might fail 
-    // if there is more than 1 producer thread because this operation has to
-    // be done in the same order as the previous CAS
-    //
-    // using compare_exchange_weak because they are allowed to fail spuriously
-    // (act as if *this != expected, even if they are equal), but when the
-    // compare_exchange operation is in a loop the weak version will yield
-    // better performance on some platforms.
-    while (!m_maximumReadIndex.compare_exchange_weak(
-                currentWriteIndex, (currentWriteIndex + 1)))
+
+    // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
+    // inserting in the queue. It might fail if there are more than 1 producer threads because this
+    // operation has to be done in the same order as the previous CAS
+
+    while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
     {
         // this is a good place to yield the thread in case there are more
         // software threads than hardware processors and you have more
@@ -250,37 +244,35 @@
         sched_yield();
     }
 
-    // The value was successfully inserted into the queue
 #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-    m_count.fetch_add(1);
+    AtomicAdd(&m_count, 1);
 #endif
-
     return true;
 }
 
-template <typename ELEM_T>
-bool ArrayLockFreeQueue<ELEM_T>::pop(ELEM_T &a_data)
+
+template <typename ELEM_T, typename Allocator>
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data)
 {
     uint32_t currentMaximumReadIndex;
     uint32_t currentReadIndex;
 
     do
     {
-        currentReadIndex = m_readIndex.load();
-        currentMaximumReadIndex = m_maximumReadIndex.load();
+        // to ensure thread-safety when there is more than 1 producer thread
+        // a second index is defined (m_maximumReadIndex)
+        currentReadIndex        = m_readIndex;
+        currentMaximumReadIndex = m_maximumReadIndex;
+    #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
 
-     #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-
-        if (m_count.load() == 0) {
+        if (m_count == 0) {
             return false;
         }
     #else
-        // to ensure thread-safety when there is more than 1 producer 
-        // thread a second index is defined (m_maximumReadIndex)
         if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
         {
             // the queue is empty or
-            // a producer thread has allocate space in the queue but is 
+            // a producer thread has allocate space in the queue but is
             // waiting to commit the data into it
             return false;
         }
@@ -288,23 +280,22 @@
 
         // retrieve the data from the queue
         a_data = m_theQueue[countToIndex(currentReadIndex)];
-        //memcpy((void*) (&a_data), (void *)(&m_theQueue[countToIndex(currentReadIndex)]),sizeof(ELEM_T) );
+
         // try to perfrom now the CAS operation on the read index. If we succeed
-        // a_data already contains what m_readIndex pointed to before we 
+        // a_data already contains what m_readIndex pointed to before we
         // increased it
-        if (m_readIndex.compare_exchange_strong(currentReadIndex, (currentReadIndex + 1)))
+        if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
         {
-            // got here. The value was retrieved from the queue. Note that the
-            // data inside the m_queue array is not deleted nor reseted
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
-            m_count.fetch_sub(1);
-#endif
+        #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+            // m_count.fetch_sub(1);
+            AtomicSub(&m_count, 1);
+        #endif
             return true;
         }
-        
+
         // it failed retrieving the element off the queue. Someone else must
         // have read the element stored at countToIndex(currentReadIndex)
-        // before we could perform the CAS operation        
+        // before we could perform the CAS operation
 
     } while(1); // keep looping to try again!
 
@@ -312,18 +303,17 @@
     assert(0);
 
     // Add this return statement to avoid compiler warnings
-    return false;    
+    return false;
 }
 
-
-template <typename ELEM_T>
-ELEM_T& ArrayLockFreeQueue<ELEM_T>::operator[](unsigned int i)
+template <typename ELEM_T, typename Allocator>
+ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i)
 {
-    int currentCount = m_count.load();
-    uint32_t currentReadIndex = m_readIndex.load();
-    if (i < 0 || i >= currentCount)
+    int currentCount = m_count;
+    uint32_t currentReadIndex = m_readIndex;
+    if (i >= currentCount)
     {
-        std::cerr << "Error in array limits: " << i << " is out of range\n";
+        std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
         std::exit(EXIT_FAILURE);
     }
     return m_theQueue[countToIndex(currentReadIndex+i)];
diff --git a/src/queue/lock_free_queue.h b/src/queue/lock_free_queue.h
index 924537f..b7dfd9f 100644
--- a/src/queue/lock_free_queue.h
+++ b/src/queue/lock_free_queue.h
@@ -10,7 +10,7 @@
 #include "sem_util.h"
 #include "logger_factory.h"
 #include "shm_allocator.h"
-#include "px_sem_util.h"
+#include "psem.h"
 #include "bus_error.h"
 
 // default Queue size
@@ -219,15 +219,15 @@
     template <typename T, typename AT> class Q_TYPE>
 int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data)
 {
-// LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");   
-    if (sem_wait(&slots) == -1) {
+ LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n");   
+    if (psem_wait(&slots) == -1) {
         err_msg(errno, "LockFreeQueue push");
         return errno;
     }
     
     if ( m_qImpl.push(a_data) ) {
-        sem_post(&items);   
-// LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n");   
+        psem_post(&items);   
+LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n");   
         return 0;
     }
     return -1;
@@ -240,7 +240,7 @@
     template <typename T, typename AT> class Q_TYPE>
 int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data)
 {
-    if (sem_trywait(&slots) == -1) {
+    if (psem_trywait(&slots) == -1) {
         if (errno == EAGAIN)
             return EAGAIN;
         else {
@@ -251,7 +251,7 @@
     }
 
     if ( m_qImpl.push(a_data)) {
-        sem_post(&items);     
+        psem_post(&items);     
         return 0;
     }
     return -1;
@@ -264,20 +264,12 @@
     template <typename T, typename AT> class Q_TYPE>
 int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * ts)
 {
-     
+LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before\n");       
     int rv;
-    struct timespec timeout = PXSemUtil::calc_sem_timeout(ts);
-  // LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before tv_sec=%d, tv_nsec=%ld", 
-  //   timeout.tv_sec, timeout.tv_nsec);
-
-    while ( sem_timedwait(&slots, &timeout) == -1) {
-    //     LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout before tv_sec=%d, tv_nsec=%ld, ETIMEDOUT=%d, errno=%d\n", 
-    // timeout.tv_sec, timeout.tv_nsec, ETIMEDOUT, errno);
+    if ( psem_timedwait(&slots, ts) == -1) {
 
         if(errno == ETIMEDOUT)
             return EBUS_TIMEOUT;
-        else if(errno == EINTR)
-            continue;
         else {
            LoggerFactory::getLogger()->error(errno, "LockFreeQueue push_timeout");
            return errno;
@@ -285,8 +277,8 @@
     }
 
     if (m_qImpl.push(a_data)){
-        sem_post(&items);   
-// LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n");    
+        psem_post(&items);   
+LoggerFactory::getLogger()->debug("==================LockFreeQueue push_timeout after\n");    
         return 0;
     }
     return -1;
@@ -303,15 +295,15 @@
 int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data)
 {
 
- // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n");
-    if (sem_wait(&items) == -1) {
+  LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before\n");
+    if (psem_wait(&items) == -1) {
         LoggerFactory::getLogger()->error(errno, "LockFreeQueue pop");
         return errno;
     }
 
     if (m_qImpl.pop(a_data)) {
-        sem_post(&slots);
-// LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n");      
+        psem_post(&slots);
+ LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n");      
         return 0;
     }
     return -1;
@@ -323,7 +315,7 @@
     template <typename T, typename AT> class Q_TYPE>
 int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data)
 {
-    if (sem_trywait(&items) == -1) {
+    if (psem_trywait(&items) == -1) {
         if (errno == EAGAIN)
             return errno;
         else {
@@ -333,7 +325,7 @@
     }
 
     if (m_qImpl.pop(a_data)) {
-        sem_post(&slots);     
+        psem_post(&slots);     
         return 0;
     }
     return -1;
@@ -346,18 +338,11 @@
     template <typename T, typename AT> class Q_TYPE>
 int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * ts)
 {
-
-    // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout before\n");   
-    struct timespec timeout = PXSemUtil::calc_sem_timeout(ts);
-
-    while (sem_timedwait(&items, &timeout) == -1) {
-        // LoggerFactory::getLogger()->error(errno, "1 LockFreeQueue pop_timeout %d %d", errno, ETIMEDOUT);
+    if (psem_timedwait(&items, ts) == -1) {
         if (errno == ETIMEDOUT) {
-             // LoggerFactory::getLogger()->error(errno, "2 LockFreeQueue pop_timeout %d %d", errno, EBUS_TIMEOUT);
             return EBUS_TIMEOUT;
         }
-        else if(errno == EINTR)
-            continue;
+       
         else {
           LoggerFactory::getLogger()->error(errno, "3  LockFreeQueue pop_timeout %d", errno);
           return errno;
@@ -365,7 +350,7 @@
     }
 
     if (m_qImpl.pop(a_data)) {
-        sem_post(&slots);  
+        psem_post(&slots);  
 // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop_timeout after\n");     
         return 0;
     }
diff --git a/src/shm/hashtable.cpp b/src/shm/hashtable.cpp
index e421630..338d482 100755
--- a/src/shm/hashtable.cpp
+++ b/src/shm/hashtable.cpp
@@ -34,7 +34,13 @@
   hashtable->wlock = SemUtil::get(IPC_PRIVATE, 1);
   hashtable->cond = SemUtil::get(IPC_PRIVATE, 1);
   hashtable->readcnt = 0;
-printf("hashtable->mutex=%d\n", hashtable->mutex);
+
+  FILE * semfile = fopen("./sem.txt", "w+");
+  if(semfile == NULL) {
+    err_exit(errno, "fopen");
+  }
+  fprintf(semfile, "hashtable->mutex=%d\n", hashtable->mutex);
+  fclose(semfile);
 }
 
 void hashtable_destroy(hashtable_t *hashtable) {
@@ -202,9 +208,13 @@
 }
 
 void *hashtable_get(hashtable_t *hashtable, int key) {
-
+  LoggerFactory::getLogger()->debug( "==========hashtable_get before 1");
+  
   int rv;
   rv = SemUtil::dec(hashtable->mutex);
+
+  LoggerFactory::getLogger()->debug( "==========hashtable_get before 2");
+
   if(rv != 0) {
     LoggerFactory::getLogger()->error(rv, "hashtable_get 1");
   }
@@ -248,13 +258,14 @@
   if(rv != 0) {
     LoggerFactory::getLogger()->error(rv, "hashtable_get 7");
   }
+  LoggerFactory::getLogger()->debug( "==========hashtable_get after");
   return res;
 }
 
 void hashtable_put(hashtable_t *hashtable, int key, void *value) {
 
   int rv;
-
+  LoggerFactory::getLogger()->debug( "==========hashtable_put before");
   rv = SemUtil::dec(hashtable->mutex);
   if(rv != 0) {
     LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
@@ -300,6 +311,8 @@
   if(rv != 0) {
     LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
   }
+
+  LoggerFactory::getLogger()->debug( "==========hashtable_put after");
 }
 
 
diff --git a/src/svsem_util.cpp b/src/svsem_util.cpp
new file mode 100644
index 0000000..6a8ba2b
--- /dev/null
+++ b/src/svsem_util.cpp
@@ -0,0 +1,253 @@
+#include "svsem_util.h"
+
+int SvsemUtil::get(key_t key, int nsems, unsigned short * arr_val) {
+// printf("==================SvsemUtil::get===============================\n");
+  int semid, perms;
+
+  perms = S_IRUSR | S_IWUSR;
+
+  semid = semget(key, nsems, IPC_CREAT | IPC_EXCL | perms);
+
+  if (semid != -1) { /* Successfully created the semaphore */
+    union semun arg;
+    struct sembuf sop;
+
+    //logger.info("%ld: created semaphore\n", (long)getpid());
+
+    arg.array = arr_val; /* So initialize it to arr_val */
+    if (semctl(semid, 0, SETALL, arg) == -1)
+      err_exit(errno, "semctl 1");
+    //logger.info("%ld: initialized semaphore\n", (long)getpid());
+
+    /* Perform a "no-op" semaphore operation - changes sem_otime
+       so other processes can see we've initialized the set. */
+
+    sop.sem_num = 0; /* Operate on semaphore 0 */
+    sop.sem_op = arr_val[0];
+    sop.sem_flg = 0;
+    if (semop(semid, &sop, 1) == -1)
+      err_exit(errno, "semop");
+    //logger.info("%ld: completed dummy semop()\n", (long)getpid());
+
+  } else { /* We didn't create the semaphore set */
+
+    if (errno != EEXIST) { /* Unexpected error from semget() */
+      err_exit(errno, "semget 1");
+
+    } else { /* Someone else already created it */
+      const int MAX_TRIES = 10;
+      int j;
+      union semun arg;
+      struct semid_ds ds;
+
+      semid = semget(key, nsems, perms); /* So just get ID */
+      if (semid == -1)
+        err_exit(errno, "semget 2");
+
+     // logger.info("%ld: got semaphore key\n", (long)getpid());
+      /* Wait until another process has called semop() */
+
+      arg.buf = &ds;
+      for (j = 0; j < MAX_TRIES; j++) {
+        //logger.info("Try %d\n", j);
+        if (semctl(semid, 0, IPC_STAT, arg) == -1)
+          err_exit(errno, "semctl 2");
+
+        if (ds.sem_otime != 0) /* Semop() performed? */
+          break;               /* Yes, quit loop */
+        sleep(1);              /* If not, wait and retry */
+      }
+
+      if (ds.sem_otime == 0) /* Loop ran to completion! */
+        err_exit(errno, "Existing semaphore not initialized");
+    }
+  }
+  return semid;
+}
+
+/* Reserve semaphore (blocking), return 0 on success, or -1 with 'errno'
+   set to EINTR if operation was interrupted by a signal handler */
+
+/* Reserve semaphore - decrement it by 1 */
+int SvsemUtil::dec(int semId) {
+  struct sembuf sops;
+
+  sops.sem_num = 0;
+  sops.sem_op = -1;
+  sops.sem_flg = 0;
+
+  while (semop(semId, &sops, 1) == -1)
+    if (errno != EINTR) {
+      // err_msg(errno, "SvsemUtil::dec");
+      return errno;
+    }
+
+  return 0;
+}
+
+int SvsemUtil::dec_nowait(int semId) {
+  struct sembuf sops;
+
+  sops.sem_num = 0;
+  sops.sem_op = -1;
+  sops.sem_flg = IPC_NOWAIT | SEM_UNDO;
+
+  while (semop(semId, &sops, 1) == -1)
+    if (errno != EINTR) {
+      // err_msg(errno, "SvsemUtil::dec_nowait");
+      return errno;
+    }
+
+  return 0;
+}
+
+int SvsemUtil::dec_timeout(const int semId, const struct timespec *timeout) {
+  struct sembuf sops;
+
+  sops.sem_num = 0;
+  sops.sem_op = -1;
+  sops.sem_flg = 0;
+
+  while (semtimedop(semId, &sops, 1, timeout) == -1)
+    if (errno != EINTR) {
+      // err_msg(errno, "SvsemUtil::dec_timeout");
+      return errno;
+    }
+
+  return 0;
+}
+
+
+/**
+ * If sem_op equals 0, the value of the semaphore is checked to see whether it
+ * currently equals 0. If it does, the operation completes immediately; otherwise,
+ * semop() blocks until the semaphore value becomes 0.
+ */
+int SvsemUtil::zero(int semId) {
+// logger.debug("%d: SvsemUtil::dec\n", semId);
+  struct sembuf sops;
+
+  sops.sem_num = 0;
+  sops.sem_op = 0;
+  sops.sem_flg = 0;
+
+  while (semop(semId, &sops, 1) == -1)
+    if (errno != EINTR) {
+      // err_msg(errno, "SvsemUtil::zero");
+      return errno;
+    }
+
+  return 0;
+}
+
+
+int SvsemUtil::zero_nowait(int semId) {
+  struct sembuf sops;
+
+  sops.sem_num = 0;
+  sops.sem_op = 0;
+  sops.sem_flg = IPC_NOWAIT;
+
+  while (semop(semId, &sops, 1) == -1)
+    if (errno != EINTR) {
+      // err_msg(errno, "SvsemUtil::zero_nowait");
+      return errno;
+    }
+
+  return 0;
+}
+
+int SvsemUtil::zero_timeout(const int semId, const struct timespec *timeout) {
+  struct sembuf sops;
+
+  sops.sem_num = 0;
+  sops.sem_op = 0;
+  sops.sem_flg = 0;
+
+  while (semtimedop(semId, &sops, 1, timeout) == -1)
+    if (errno != EINTR) {
+      // err_msg(errno, "SvsemUtil::zero_timeout");
+      return errno;
+    }
+
+  return 0;
+}
+
+
+/* Release semaphore - increment it by 1 */
+int SvsemUtil::inc(int semId) {
+  struct sembuf sops;
+
+  sops.sem_num = 0;
+  sops.sem_op = 1;
+  sops.sem_flg = 0;
+
+  int rv = semop(semId, &sops, 1);
+  if (rv == -1) {
+    // err_msg(errno, "SvsemUtil::inc");
+    return errno;
+  }
+  return 0;
+}
+
+
+int SvsemUtil::set(int semId, int val) {
+  union semun arg;
+  arg.val = val;
+  if (semctl(semId, 0, SETVAL, arg) == -1) {
+    err_msg(errno, "SvsemUtil::set");
+    return errno;
+  }
+  return 0;
+}
+
+
+
+
+int SvsemUtil::cond_wait(int semId ){
+
+  struct sembuf sops[2];
+
+  //閲婃斁mutex
+  sops[0].sem_num = 0;
+  sops[0].sem_op = 1;
+  sops[0].sem_flg = 0;
+
+  // 绛夊緟cond
+  sops[1].sem_num = 1;
+  sops[1].sem_op = -1;
+  sops[1].sem_flg = 0;
+
+  while (semop(semId, sops, 2) == -1)
+    if (errno != EINTR) {
+      // err_msg(errno, "SvsemUtil::dec");
+      return errno;
+    }
+
+
+ 
+
+  return 0;
+}
+
+int SvsemUtil::cond_signal(int semId ){
+  struct sembuf sops;
+
+  // 閫氱煡绛夊緟cond鐨勮繘绋�
+  sops.sem_num = 1;
+  sops.sem_op = 1;
+  sops.sem_flg = 0;
+
+  int rv = semop(semId, &sops, 1);
+  if (rv == -1) {
+    // err_msg(errno, "SvsemUtil::inc");
+    return errno;
+  }
+  return 0;
+}
+
+void SvsemUtil::remove(int semid) {
+  union semun dummy;
+  if (semctl(semid, 0, IPC_RMID, dummy) == -1)
+    err_msg(errno, "SvsemUtil::remove");
+}
diff --git a/src/svsem_util.h b/src/svsem_util.h
new file mode 100644
index 0000000..f8b85fe
--- /dev/null
+++ b/src/svsem_util.h
@@ -0,0 +1,47 @@
+#ifndef _SVSEM_UTIL_H
+#define _SVSEM_UTIL_H  
+
+#include "usg_common.h"
+
+class SvsemUtil {
+public:
+  static int get(key_t key, int nsems, unsigned short * arr_val) ;
+  /* Reserve semaphore (blocking), return 0 on success, or -1 with 'errno'
+     set to EINTR if operation was interrupted by a signal handler */
+
+  /* Reserve semaphore - decrement it by 1 */
+  static int dec(int semId)  ;
+
+  static int dec_nowait(int semId)  ;
+
+  static int dec_timeout(const int semId, const struct timespec *timeout) ;
+
+
+  /**
+   * If sem_op equals 0, the value of the semaphore is checked to see whether it
+   * currently equals 0. If it does, the operation completes immediately; otherwise,
+   * semop() blocks until the semaphore value becomes 0.
+   */
+  static int zero(int semId) ; 
+
+
+  static int zero_nowait(int semId) ;
+
+  static int zero_timeout(const int semId, const struct timespec *timeout)  ;
+
+
+  /* Release semaphore - increment it by 1 */
+  static int inc(int semId) ;
+
+
+  static int set(int semId, int val) ;
+
+
+
+
+  static int cond_wait(int semid ) ;
+  static int cond_signal(int semid ) ;
+  static void remove(int semid) ;
+};
+
+#endif
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index a9fc516..18e1408 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -32,11 +32,3 @@
 
 
 
-add_executable(svsem_mon svsem_mon.cpp )
-target_link_libraries(svsem_mon PRIVATE  ${EXTRA_LIBS} )
-target_include_directories(svsem_mon PRIVATE
-                            "${PROJECT_BINARY_DIR}"
-                             ${EXTRA_INCLUDES}
-                            )
-
-
diff --git a/test/svsem_test.cpp b/test/svsem_test.cpp
new file mode 100644
index 0000000..5b047b4
--- /dev/null
+++ b/test/svsem_test.cpp
@@ -0,0 +1,242 @@
+#include <stdio.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/wait.h>
+#include <sys/mman.h>
+#include <sys/syscall.h>
+#include <linux/futex.h>
+#include <sys/time.h>
+#include "usg_common.h"
+#include <sys/mman.h>
+#include <sys/stat.h>        /* For mode constants */
+#include <fcntl.h>           /* For O_* constants */
+#include "sem_util.h"
+
+
+
+
+
+#include "sem_util.h"
+
+int _get(key_t key, unsigned int value) {
+// printf("==================_get===============================\n");
+  int semid, perms;
+
+  perms = S_IRUSR | S_IWUSR;
+
+  semid = semget(key, 1, IPC_CREAT | IPC_EXCL | perms);
+
+  if (semid != -1) { /* Successfully created the semaphore */
+    union semun arg;
+    struct sembuf sop;
+
+    //logger.info("%ld: created semaphore\n", (long)getpid());
+
+    arg.val = 0; /* So initialize it to 0 */
+    if (semctl(semid, 0, SETVAL, arg) == -1)
+      err_exit(errno, "semctl 1");
+    //logger.info("%ld: initialized semaphore\n", (long)getpid());
+
+    /* Perform a "no-op" semaphore operation - changes sem_otime
+       so other processes can see we've initialized the set. */
+
+    sop.sem_num = 0; /* Operate on semaphore 0 */
+    sop.sem_op = value;
+    sop.sem_flg = 0;
+    if (semop(semid, &sop, 1) == -1)
+      err_exit(errno, "semop");
+    //logger.info("%ld: completed dummy semop()\n", (long)getpid());
+
+  } else { /* We didn't create the semaphore set */
+
+    if (errno != EEXIST) { /* Unexpected error from semget() */
+      err_exit(errno, "semget 1");
+
+    } else { /* Someone else already created it */
+      const int MAX_TRIES = 10;
+      int j;
+      union semun arg;
+      struct semid_ds ds;
+
+      semid = semget(key, 1, perms); /* So just get ID */
+      if (semid == -1)
+        err_exit(errno, "semget 2");
+
+     // logger.info("%ld: got semaphore key\n", (long)getpid());
+      /* Wait until another process has called semop() */
+
+      arg.buf = &ds;
+      for (j = 0; j < MAX_TRIES; j++) {
+        //logger.info("Try %d\n", j);
+        if (semctl(semid, 0, IPC_STAT, arg) == -1)
+          err_exit(errno, "semctl 2");
+
+        if (ds.sem_otime != 0) /* Semop() performed? */
+          break;               /* Yes, quit loop */
+        sleep(1);              /* If not, wait and retry */
+      }
+
+      if (ds.sem_otime == 0) /* Loop ran to completion! */
+        err_exit(errno, "Existing semaphore not initialized");
+    }
+  }
+  return semid;
+}
+
+/* Reserve semaphore (blocking), return 0 on success, or -1 with 'errno'
+   set to EINTR if operation was interrupted by a signal handler */
+
+/* Reserve semaphore - decrement it by 1 */
+int _dec(int semId) {
+  struct sembuf sops;
+
+  sops.sem_num = 0;
+  sops.sem_op = -1;
+  sops.sem_flg = SEM_UNDO;
+
+  while (semop(semId, &sops, 1) == -1)
+    if (errno != EINTR) {
+      // err_msg(errno, "_dec");
+      return errno;
+    }
+
+  return 0;
+}
+
+int _dec_nowait(int semId) {
+  struct sembuf sops;
+
+  sops.sem_num = 0;
+  sops.sem_op = -1;
+  sops.sem_flg = IPC_NOWAIT ;
+
+  while (semop(semId, &sops, 1) == -1)
+    if (errno != EINTR) {
+      // err_msg(errno, "_dec_nowait");
+      return errno;
+    }
+
+  return 0;
+}
+
+int _dec_timeout(const int semId, const struct timespec *timeout) {
+  struct sembuf sops;
+
+  sops.sem_num = 0;
+  sops.sem_op = -1;
+  sops.sem_flg = 0;
+
+  while (semtimedop(semId, &sops, 1, timeout) == -1)
+    if (errno != EINTR) {
+      // err_msg(errno, "_dec_timeout");
+      return errno;
+    }
+
+  return 0;
+}
+
+
+/**
+ * If sem_op equals 0, the value of the semaphore is checked to see whether it
+ * currently equals 0. If it does, the operation completes immediately; otherwise,
+ * semop() blocks until the semaphore value becomes 0.
+ */
+int _zero(int semId) {
+// logger.debug("%d: _dec\n", semId);
+  struct sembuf sops;
+
+  sops.sem_num = 0;
+  sops.sem_op = 0;
+  sops.sem_flg = 0;
+
+  while (semop(semId, &sops, 1) == -1)
+    if (errno != EINTR) {
+      // err_msg(errno, "_zero");
+      return errno;
+    }
+
+  return 0;
+}
+
+
+int _zero_nowait(int semId) {
+  struct sembuf sops;
+
+  sops.sem_num = 0;
+  sops.sem_op = 0;
+  sops.sem_flg = IPC_NOWAIT;
+
+  while (semop(semId, &sops, 1) == -1)
+    if (errno != EINTR) {
+      // err_msg(errno, "_zero_nowait");
+      return errno;
+    }
+
+  return 0;
+}
+
+int _zero_timeout(const int semId, const struct timespec *timeout) {
+  struct sembuf sops;
+
+  sops.sem_num = 0;
+  sops.sem_op = 0;
+  sops.sem_flg = 0;
+
+  while (semtimedop(semId, &sops, 1, timeout) == -1)
+    if (errno != EINTR) {
+      // err_msg(errno, "_zero_timeout");
+      return errno;
+    }
+
+  return 0;
+}
+
+
+/* Release semaphore - increment it by 1 */
+int _inc(int semId) {
+  struct sembuf sops;
+
+  sops.sem_num = 0;
+  sops.sem_op = 1;
+  sops.sem_flg = 0;
+
+  int rv = semop(semId, &sops, 1);
+  if (rv == -1) {
+    // err_msg(errno, "_inc");
+    return errno;
+  }
+  return 0;
+}
+
+void _remove(int semid) {
+  union semun dummy;
+  if (semctl(semid, 0, IPC_RMID, dummy) == -1)
+    err_msg(errno, "_remove");
+}
+
+int _set(int semId, int val) {
+  union semun arg;
+  arg.val = val;
+  if (semctl(semId, 0, SETVAL, arg) == -1) {
+    err_msg(errno, "_set");
+    return errno;
+  }
+  return 0;
+}
+
+
+#define KEY 0x383
+
+int main() {
+	int semid = _get(KEY, 1) ;
+
+	if(_dec(semid) != 0)
+		err_exit(errno, "_dec");
+
+		printf("(%ld) 杩涘叆浜掓枼鍖篭n", (long) getpid());
+		sleep(10);
+
+	if(_inc(semid) != 0)
+		err_exit(errno, "_inc");
+}
\ No newline at end of file
diff --git a/test_net_socket/CMakeLists.txt b/test_net_socket/CMakeLists.txt
index dddf7f8..a08cf09 100644
--- a/test_net_socket/CMakeLists.txt
+++ b/test_net_socket/CMakeLists.txt
@@ -15,11 +15,25 @@
 add_executable(test_net_mod_socket test_net_mod_socket.cpp  ${CMAKE_CURRENT_BINARY_DIR}/net_mod_socket.sh)
 target_link_libraries(test_net_mod_socket PRIVATE shm_queue  ${EXTRA_LIBS} )
 
+
+
 add_executable(heart_beat heart_beat.cpp ${CMAKE_CURRENT_BINARY_DIR}/heart_beat.sh)
 target_link_libraries(heart_beat PRIVATE shm_queue )
 # target_link_libraries(heart_beat PRIVATE shm_queue  ${EXTRA_LIBS} )
 
 
+
+
+
+add_executable(svsem_mon svsem_mon.cpp )
+target_link_libraries(svsem_mon PRIVATE  ${EXTRA_LIBS} )
+target_include_directories(svsem_mon PRIVATE
+                            "${PROJECT_BINARY_DIR}"
+                             ${EXTRA_INCLUDES}
+                            )
+
+
+
 target_include_directories(test_net_mod_socket PRIVATE
                             "${PROJECT_BINARY_DIR}"
                              ${EXTRA_INCLUDES}
diff --git a/test_net_socket/heart_beat.sh b/test_net_socket/heart_beat.sh
index 893a5d3..1fc28a6 100755
--- a/test_net_socket/heart_beat.sh
+++ b/test_net_socket/heart_beat.sh
@@ -1,6 +1,6 @@
 #! /bin/bash
 
-PROCESSES=10
+PROCESSES=4
 function clean() {
 	ipcrm -a
 	ps -ef | grep "heart_beat" | awk  '{print $2}' | xargs -i kill -9 {}
diff --git a/test_net_socket/svsem_mon.cpp b/test_net_socket/svsem_mon.cpp
new file mode 100644
index 0000000..4c51366
--- /dev/null
+++ b/test_net_socket/svsem_mon.cpp
@@ -0,0 +1,42 @@
+#include <sys/types.h>
+#include <sys/sem.h>
+#include <time.h>
+#include "usg_common.h"
+
+int
+main(int argc, char *argv[])
+{
+    struct semid_ds ds;
+    union semun arg, dummy;             /* Fourth argument for semctl() */
+    int semid, j;
+
+    if (argc != 2 || strcmp(argv[1], "--help") == 0)
+        err_exit(0, "%s semid\n", argv[0]);
+
+    semid = atoi(argv[1]);
+
+    arg.buf = &ds;
+    if (semctl(semid, 0, IPC_STAT, arg) == -1)
+        err_exit(errno, "semctl");
+
+    printf("Semaphore changed: %s", ctime(&ds.sem_ctime));
+    printf("Last semop():      %s", ctime(&ds.sem_otime));
+
+    /* Display per-semaphore information */
+
+    arg.array = (short unsigned int*)calloc(ds.sem_nsems, sizeof(arg.array[0]));
+    if (arg.array == NULL)
+        err_exit(errno, "calloc");
+    if (semctl(semid, 0, GETALL, arg) == -1)
+        err_exit(errno, "semctl-GETALL");
+
+    printf("Sem #  Value  SEMPID  SEMNCNT  SEMZCNT\n");
+
+    for (j = 0; j < ds.sem_nsems; j++)
+        printf("%3d   %5d   %5d  %5d    %5d\n", j, arg.array[j],
+                semctl(semid, j, GETPID, dummy),
+                semctl(semid, j, GETNCNT, dummy),
+                semctl(semid, j, GETZCNT, dummy));
+
+    exit(EXIT_SUCCESS);
+}

--
Gitblit v1.8.0