wangzhengquan
2021-01-22 09a82c2ece4caadad0baa0d1f3b84f1506363fdd
update
1个文件已添加
6个文件已修改
1006 ■■■■■ 已修改文件
src/queue/array_lock_free_sem_queue.h 104 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/hashtable.cpp 320 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/hashtable.h 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/hashtable2.cpp 438 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/mem_pool.h 23 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm/mm.cpp 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/svsem.cpp 105 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/array_lock_free_sem_queue.h
@@ -75,7 +75,7 @@
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
  /// @brief number of elements in the queue
  int m_count;
  uint32_t m_count;
#endif
@@ -200,10 +200,6 @@
}
  template <typename ELEM_T, typename Allocator>
int ArrayLockFreeSemQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data,  const struct timespec *timeout, int flag)
{
@@ -215,28 +211,50 @@
  {
    currentWriteIndex = m_writeIndex;
    currentReadIndex  = m_readIndex;
  #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    if (m_count == Q_SIZE) {
      if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
        return -1;
      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
        const struct timespec ts = TimeUtil::trim_time(timeout);
        s = futex(&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0);
        s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0);
        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
          return -1;
        }
            
      } else {
        s = futex(&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0);
        s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0);
        if (s == -1 && errno != EAGAIN && errno != EINTR) {
          return -1;
        }
      }
    }
  #else
    if (currentReadIndex == currentWriteIndex - Q_SIZE  + 1   )
    {
        // the queue is full
      if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
        return -1;
      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
        const struct timespec ts = TimeUtil::trim_time(timeout);
        s = futex((int *)&m_readIndex, FUTEX_WAIT, currentWriteIndex - Q_SIZE + 1, &ts, NULL, 0);
        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
          return -1;
        }
      } else {
        s = futex((int *)&m_readIndex, FUTEX_WAIT, currentWriteIndex - Q_SIZE + 1, NULL, NULL, 0);
        if (s == -1 && errno != EAGAIN && errno != EINTR) {
          return -1;
        }
      }
    }
  #endif
    //保留写入位
  } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
  // We know now that this index is reserved for us. Use it to save the data
@@ -255,10 +273,16 @@
    sched_yield();
  }
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
  AtomicAdd(&m_count, 1);
  s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
  if (s  == -1)
  if ( (s = futex((int *)&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0)) == -1)
      err_exit(errno, "futex-FUTEX_WAKE");
#else
  if ( (s = futex((int *)&m_maximumReadIndex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0)) == -1)
      err_exit(errno, "futex-FUTEX_WAKE");
#endif
  return 0;
}
@@ -268,15 +292,16 @@
{
  uint32_t currentMaximumReadIndex;
  uint32_t currentReadIndex;
  int s;
  do
  {
    // to ensure thread-safety when there is more than 1 producer thread
    // a second index is defined (m_maximumReadIndex)
    currentReadIndex        = m_readIndex;
    currentMaximumReadIndex = m_maximumReadIndex;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
  #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    if (m_count == 0) {
@@ -284,28 +309,45 @@
        return -1;
      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
        const struct timespec ts = TimeUtil::trim_time(timeout);
        s = futex(&m_count, FUTEX_WAIT, 0, &ts, NULL, 0);
        s = futex((int *)&m_count, FUTEX_WAIT, 0, &ts, NULL, 0);
        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
          return -1;
        }
            
      } else {
        s = futex(&m_count, FUTEX_WAIT, 0, NULL, NULL, 0);
        s = futex((int *)&m_count, FUTEX_WAIT, 0, NULL, NULL, 0);
        if (s == -1 && errno != EAGAIN && errno != EINTR) {
          return -1;
        }
      }
    }
#else
    if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
  #else
    if (currentReadIndex == currentMaximumReadIndex)
    {
      // the queue is empty or
      // a producer thread has allocate space in the queue but is
      // waiting to commit the data into it
      return -1;
      if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
        return -1;
      else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
        const struct timespec ts = TimeUtil::trim_time(timeout);
        s = futex((int *)&currentMaximumReadIndex, FUTEX_WAIT, currentReadIndex, &ts, NULL, 0);
        if (s == -1 && errno != EAGAIN &&  errno != EINTR) {
          // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
          return -1;
        }
      } else {
        s = futex((int *)&currentMaximumReadIndex, FUTEX_WAIT, currentReadIndex, NULL, NULL, 0);
        if (s == -1 && errno != EAGAIN && errno != EINTR) {
          return -1;
        }
      }
    }
#endif
  #endif
    // retrieve the data from the queue
    a_data = m_theQueue[countToIndex(currentReadIndex)];
@@ -315,14 +357,16 @@
    // increased it
    if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
    {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
      // m_count.fetch_sub(1);
      AtomicSub(&m_count, 1);
#endif
      s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
      if (s  == -1)
      if ( (s = futex((int *)&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0) ) == -1)
        err_exit(errno, "futex-FUTEX_WAKE");
    #else
      if ( (s = futex((int *)&m_readIndex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0) ) == -1)
        err_exit(errno, "futex-FUTEX_WAKE");
    #endif
      return 0;
    }
@@ -342,13 +386,13 @@
  template <typename ELEM_T, typename Allocator>
ELEM_T& ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[](unsigned int i)
{
  int currentCount = m_count;
  // int currentCount = m_count;
  uint32_t currentReadIndex = m_readIndex;
  if (i >= currentCount)
  {
    std::cerr << "ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
    std::exit(EXIT_FAILURE);
  }
  // if (i >= currentCount)
  // {
  //   std::cerr << "ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
  //   std::exit(EXIT_FAILURE);
  // }
  return m_theQueue[countToIndex(currentReadIndex+i)];
}
src/shm/hashtable.cpp
@@ -24,29 +24,27 @@
static size_t hashcode(int key);
static struct timespec TIMEOUT = {2, 0};
void hashtable_init(hashtable_t *hashtable )
{
  memset(hashtable, 0, sizeof(hashtable_t));
  hashtable->mutex = svsem_get(IPC_PRIVATE, 1);
  hashtable->wlock = svsem_get(IPC_PRIVATE, 1);
  hashtable->cond = svsem_get(IPC_PRIVATE, 1);
  hashtable->readcnt = 0;
  // hashtable->wlock = svsem_get(IPC_PRIVATE, 1);
  // hashtable->cond = svsem_get(IPC_PRIVATE, 1);
  // hashtable->readcnt = 0;
  FILE * semfile = fopen("./sem.txt", "w+");
  if(semfile == NULL) {
    err_exit(errno, "fopen");
  }
  fprintf(semfile, "hashtable->mutex=%d\n", hashtable->mutex);
  fclose(semfile);
  // FILE * semfile = fopen("./sem.txt", "w+");
  // if(semfile == NULL) {
  //   err_exit(errno, "fopen");
  // }
  // fprintf(semfile, "hashtable->mutex=%d\n", hashtable->mutex);
  // fclose(semfile);
}
void hashtable_destroy(hashtable_t *hashtable) {
  svsem_remove( hashtable->mutex);
  svsem_remove( hashtable->wlock);
  svsem_remove( hashtable->cond);
  // svsem_remove( hashtable->wlock);
  // svsem_remove( hashtable->cond);
}
@@ -111,17 +109,17 @@
  void *oldvalue;
  int rv;
  if( (rv = svsem_wait(hashtable->wlock)) != 0) {
  if( (rv = svsem_wait(hashtable->mutex)) != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_remove\n");
  }
  tailq_header_t *my_tailq_head = hashtable->array[code] ;
  if ( my_tailq_head == NULL)
  {
    svsem_post(hashtable->wlock);
    if((rv = svsem_post(hashtable->mutex)) != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_remove\n");
    }
    return NULL;
  }
  else
  {
  } else {
    for (item = TAILQ_FIRST(my_tailq_head); item != NULL; item = TAILQ_NEXT(item, joint))
    {
      if (key == item->key)
@@ -132,197 +130,39 @@
        /* mm_free the item as we don't need it anymore. */
        mm_free(item);
        svsem_post(hashtable->wlock);
        svsem_post(hashtable->mutex);
        return oldvalue;
      }
    }
  }
  if((rv = svsem_post(hashtable->wlock)) != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_remove\n");
  }
  return NULL;
}
void hashtable_removeall(hashtable_t *hashtable)
{
  tailq_entry_t *item;
  int rv;
  rv = svsem_wait(hashtable->wlock);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_removeall\n");
  }
  for (int i = 0; i < MAPSIZE; i++)
  {
    tailq_header_t *my_tailq_head = hashtable->array[i] ;
    if (my_tailq_head == NULL )
      continue;
    while ((item = TAILQ_FIRST(my_tailq_head)) )
    {
      TAILQ_REMOVE(my_tailq_head, item, joint);
      mm_free(item);
    if((rv = svsem_post(hashtable->mutex)) != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_remove\n");
    }
    mm_free(my_tailq_head);
    hashtable->array[i] = NULL;
  }
  rv = svsem_post(hashtable->wlock);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_removeall\n");
    return NULL;
  }
}
/**
 * for debug
 */
void hashtable_printall(hashtable_t *hashtable)
{
  tailq_entry_t *item;
  for (int i = 0; i < MAPSIZE; i++)
  {
    tailq_header_t *my_tailq_head = hashtable->array[i] ;
    if (my_tailq_head == NULL )
      continue;
    printf("code=%d\n", i);
    TAILQ_FOREACH(item, my_tailq_head, joint)
    {
      printf("%d:%s\n", item->key, (char *)item->value);
    }
    printf("\n");
  }
}
static size_t hashcode(int key)
{
  return key % MAPSIZE;
  /*printf("hashfun = %ld\n", code);*/
}
void *hashtable_get(hashtable_t *hashtable, int key) {
LoggerFactory::getLogger()->debug( "==========hashtable_get before 1");
  int rv;
  rv = svsem_wait(hashtable->mutex);
LoggerFactory::getLogger()->debug( "==========hashtable_get before 2");
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_get 1");
  }
  hashtable->readcnt++;
  if (hashtable->readcnt == 1) {
    //获取读写锁
LoggerFactory::getLogger()->debug( "==========hashtable_get before 3");
    rv = svsem_wait(hashtable->wlock);
LoggerFactory::getLogger()->debug( "==========hashtable_get before 4");
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_get 2");
    }
  }
  rv = svsem_post(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_get 3");
  }
  // ================
  void * res = _hashtable_get(hashtable, key);
  // ==================
  rv = svsem_wait(hashtable->mutex);
LoggerFactory::getLogger()->debug( "==========hashtable_get before 5");
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_get 4");
  }
  hashtable->readcnt--;
  if(hashtable->readcnt == 0) {
    //释放读写锁
    rv = svsem_post(hashtable->wlock);
LoggerFactory::getLogger()->debug( "==========hashtable_get before 6");
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_get 5");
    }
    //通知写
    rv = svsem_set(hashtable->cond, 1);
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_get 6");
    }
  }
  rv = svsem_post(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_get 7");
  }
LoggerFactory::getLogger()->debug( "==========hashtable_get after");
  return res;
}
void hashtable_put(hashtable_t *hashtable, int key, void *value) {
  int rv;
LoggerFactory::getLogger()->debug( "==========hashtable_put before 1");
  rv = svsem_wait(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
  }
LoggerFactory::getLogger()->debug( "==========hashtable_put before 2");
  // 设置读优先级高
  while (hashtable->readcnt > 0)
  {
    rv = svsem_set(hashtable->cond, 0);
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
    }
    rv = svsem_post(hashtable->mutex);
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
    }
    //等待写通知
LoggerFactory::getLogger()->debug( "==========hashtable_put before 3");
    rv = svsem_wait(hashtable->cond);
LoggerFactory::getLogger()->debug( "==========hashtable_put before 4");
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
    }
    rv = svsem_wait(hashtable->mutex);
LoggerFactory::getLogger()->debug( "==========hashtable_put before 5");
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
    }
  }
  rv = svsem_post(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
  }
  //获取读写锁
  rv = svsem_wait(hashtable->wlock);
LoggerFactory::getLogger()->debug( "==========hashtable_put before 6");
  if(rv != 0) {
  if(( rv = svsem_wait(hashtable->mutex)) != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
  }
  _hashtable_put(hashtable, key, value);
  //释放读写锁
  rv = svsem_post(hashtable->wlock);
  if(rv != 0) {
  if(( rv = svsem_post(hashtable->mutex)) != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
  }
  LoggerFactory::getLogger()->debug( "==========hashtable_put after");
}
@@ -344,56 +184,7 @@
void hashtable_foreach(hashtable_t *hashtable,  std::function<void(int, void *)>  cb) {
  int rv;
  LoggerFactory::getLogger()->debug("===hashtable_foreach before 1\n");
  rv = svsem_wait(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_foreach");
  }
  hashtable->readcnt++;
  if (hashtable->readcnt == 1) {
    //获取读写锁
    rv = svsem_wait(hashtable->wlock);
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_foreach");
    }
  }
  rv = svsem_post(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_foreach");
  }
  // ==================
  _hashtable_foreach(hashtable, cb);
  // ==================
  rv = svsem_wait(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_foreach");
  }
  hashtable->readcnt--;
  if(hashtable->readcnt == 0) {
    //释放读写锁
    rv = svsem_post(hashtable->wlock);
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_foreach");
    }
    //通知写
    rv = svsem_set(hashtable->cond, 1);
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_foreach");
    }
  }
  rv = svsem_post(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_foreach");
  }
 return _hashtable_foreach(hashtable, cb);
}
@@ -419,7 +210,7 @@
int hashtable_alloc_key(hashtable_t *hashtable) {
  int rv;
  int key = START_KEY;
  rv = svsem_wait(hashtable->wlock);
  rv = svsem_wait(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_alloc_key\n");
  }
@@ -430,9 +221,68 @@
  // 占用key
  _hashtable_put(hashtable, key, (void *)1);
  rv = svsem_post(hashtable->wlock);
  rv = svsem_post(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_alloc_key\n");
  }
  return key;
}
void hashtable_removeall(hashtable_t *hashtable)
{
  tailq_entry_t *item;
  int rv;
  if( (rv = svsem_wait(hashtable->mutex)) != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_removeall\n");
  }
  for (int i = 0; i < MAPSIZE; i++)
  {
    tailq_header_t *my_tailq_head = hashtable->array[i] ;
    if (my_tailq_head == NULL )
      continue;
    while ((item = TAILQ_FIRST(my_tailq_head)) )
    {
      TAILQ_REMOVE(my_tailq_head, item, joint);
      mm_free(item);
    }
    mm_free(my_tailq_head);
    hashtable->array[i] = NULL;
  }
  if((rv = svsem_post(hashtable->mutex)) != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_removeall\n");
  }
}
static size_t hashcode(int key)
{
  return key % MAPSIZE;
  /*printf("hashfun = %ld\n", code);*/
}
/**
 * for debug
 */
static void hashtable_printall(hashtable_t *hashtable)
{
  tailq_entry_t *item;
  for (int i = 0; i < MAPSIZE; i++)
  {
    tailq_header_t *my_tailq_head = hashtable->array[i] ;
    if (my_tailq_head == NULL )
      continue;
    printf("code=%d\n", i);
    TAILQ_FOREACH(item, my_tailq_head, joint)
    {
      printf("%d:%s\n", item->key, (char *)item->value);
    }
    printf("\n");
  }
}
src/shm/hashtable.h
@@ -11,9 +11,9 @@
{
 struct tailq_header_t* array[MAPSIZE];
 int mutex;
 int wlock;
 int cond;
 size_t readcnt;
 // int wlock;
 // int cond;
 // size_t readcnt;
} hashtable_t;
typedef void (*hashtable_foreach_cb)(int key, void *value);
src/shm/hashtable2.cpp
New file
@@ -0,0 +1,438 @@
#include "usg_common.h"
#include "hashtable.h"
#include "mm.h"
#include "svsem.h"
#include "logger_factory.h"
#include <set>
#include <functional>
typedef struct tailq_entry_t
{
  void *value;
  int key;
  /*
   * This holds the pointers to the next and previous joint in
   * the tail queue.
   */
  TAILQ_ENTRY(tailq_entry_t) joint;
} tailq_entry_t;
#define START_KEY 1000
typedef  TAILQ_HEAD(tailq_header_t, tailq_entry_t) tailq_header_t;
static size_t hashcode(int key);
static struct timespec TIMEOUT = {2, 0};
void hashtable_init(hashtable_t *hashtable )
{
  memset(hashtable, 0, sizeof(hashtable_t));
  hashtable->mutex = svsem_get(IPC_PRIVATE, 1);
  hashtable->wlock = svsem_get(IPC_PRIVATE, 1);
  hashtable->cond = svsem_get(IPC_PRIVATE, 1);
  hashtable->readcnt = 0;
  FILE * semfile = fopen("./sem.txt", "w+");
  if(semfile == NULL) {
    err_exit(errno, "fopen");
  }
  fprintf(semfile, "hashtable->mutex=%d\n", hashtable->mutex);
  fclose(semfile);
}
void hashtable_destroy(hashtable_t *hashtable) {
  svsem_remove( hashtable->mutex);
  svsem_remove( hashtable->wlock);
  svsem_remove( hashtable->cond);
}
static inline void *_hashtable_get(hashtable_t *hashtable, int key)
{
  size_t code = hashcode(key);
  tailq_entry_t *item;
  tailq_header_t *my_tailq_head = hashtable->array[code] ;
  if ( my_tailq_head == NULL)
  {
    return NULL;
  }
  else
  {
    TAILQ_FOREACH(item, my_tailq_head, joint)
    {
      if (key == item->key)
        return item->value;
    }
  }
  return NULL;
}
static inline void * _hashtable_put(hashtable_t *hashtable, int key, void *value)
{
  size_t code = hashcode(key);
  void *oldvalue;
  tailq_entry_t *item;
  tailq_header_t *my_tailq_head = hashtable->array[code] ;
  if ( my_tailq_head == NULL)
  {
    my_tailq_head  = (tailq_header_t*) mm_malloc(sizeof(tailq_header_t ));
    TAILQ_INIT(my_tailq_head);
    hashtable->array[code] = my_tailq_head;
    goto putnew;
  }
  TAILQ_FOREACH(item, my_tailq_head, joint)
  {
    if (key ==item->key)
    {
      oldvalue = item -> value;
      item->key= key;
      item -> value = value;
      return oldvalue;
    }
  }
putnew:
  item = (tailq_entry_t *) mm_malloc(sizeof(tailq_entry_t));
  item->key = key;
  item -> value = value;
  TAILQ_INSERT_TAIL(my_tailq_head, item, joint);
  return NULL;
}
void *hashtable_remove(hashtable_t *hashtable, int key)
{
  size_t code = hashcode(key);
  tailq_entry_t *item;
  void *oldvalue;
  int rv;
  if( (rv = svsem_wait(hashtable->wlock)) != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_remove\n");
  }
  tailq_header_t *my_tailq_head = hashtable->array[code] ;
  if ( my_tailq_head == NULL)
  {
    svsem_post(hashtable->wlock);
    return NULL;
  }
  else
  {
    for (item = TAILQ_FIRST(my_tailq_head); item != NULL; item = TAILQ_NEXT(item, joint))
    {
      if (key == item->key)
      {
        oldvalue = item->value;
        /* Remove the item from the tail queue. */
        TAILQ_REMOVE(my_tailq_head, item, joint);
        /* mm_free the item as we don't need it anymore. */
        mm_free(item);
        svsem_post(hashtable->wlock);
        return oldvalue;
      }
    }
  }
  if((rv = svsem_post(hashtable->wlock)) != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_remove\n");
  }
  return NULL;
}
void hashtable_removeall(hashtable_t *hashtable)
{
  tailq_entry_t *item;
  int rv;
  rv = svsem_wait(hashtable->wlock);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_removeall\n");
  }
  for (int i = 0; i < MAPSIZE; i++)
  {
    tailq_header_t *my_tailq_head = hashtable->array[i] ;
    if (my_tailq_head == NULL )
      continue;
    while ((item = TAILQ_FIRST(my_tailq_head)) )
    {
      TAILQ_REMOVE(my_tailq_head, item, joint);
      mm_free(item);
    }
    mm_free(my_tailq_head);
    hashtable->array[i] = NULL;
  }
  rv = svsem_post(hashtable->wlock);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_removeall\n");
  }
}
/**
 * for debug
 */
void hashtable_printall(hashtable_t *hashtable)
{
  tailq_entry_t *item;
  for (int i = 0; i < MAPSIZE; i++)
  {
    tailq_header_t *my_tailq_head = hashtable->array[i] ;
    if (my_tailq_head == NULL )
      continue;
    printf("code=%d\n", i);
    TAILQ_FOREACH(item, my_tailq_head, joint)
    {
      printf("%d:%s\n", item->key, (char *)item->value);
    }
    printf("\n");
  }
}
static size_t hashcode(int key)
{
  return key % MAPSIZE;
  /*printf("hashfun = %ld\n", code);*/
}
void *hashtable_get(hashtable_t *hashtable, int key) {
LoggerFactory::getLogger()->debug( "==========hashtable_get before 1");
  int rv;
  rv = svsem_wait(hashtable->mutex);
LoggerFactory::getLogger()->debug( "==========hashtable_get before 2");
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_get 1");
  }
  hashtable->readcnt++;
  if (hashtable->readcnt == 1) {
    //获取读写锁
LoggerFactory::getLogger()->debug( "==========hashtable_get before 3");
    rv = svsem_wait(hashtable->wlock);
LoggerFactory::getLogger()->debug( "==========hashtable_get before 4");
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_get 2");
    }
  }
  rv = svsem_post(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_get 3");
  }
  // ================
  void * res = _hashtable_get(hashtable, key);
  // ==================
  rv = svsem_wait(hashtable->mutex);
LoggerFactory::getLogger()->debug( "==========hashtable_get before 5");
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_get 4");
  }
  hashtable->readcnt--;
  if(hashtable->readcnt == 0) {
    //释放读写锁
    rv = svsem_post(hashtable->wlock);
LoggerFactory::getLogger()->debug( "==========hashtable_get before 6");
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_get 5");
    }
    //通知写
    rv = svsem_set(hashtable->cond, 1);
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_get 6");
    }
  }
  rv = svsem_post(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_get 7");
  }
LoggerFactory::getLogger()->debug( "==========hashtable_get after");
  return res;
}
void hashtable_put(hashtable_t *hashtable, int key, void *value) {
  int rv;
LoggerFactory::getLogger()->debug( "==========hashtable_put before 1");
  rv = svsem_wait(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
  }
LoggerFactory::getLogger()->debug( "==========hashtable_put before 2");
  // 设置读优先级高
  while (hashtable->readcnt > 0)
  {
    rv = svsem_set(hashtable->cond, 0);
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
    }
    rv = svsem_post(hashtable->mutex);
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
    }
    //等待写通知
LoggerFactory::getLogger()->debug( "==========hashtable_put before 3");
    rv = svsem_wait(hashtable->cond);
LoggerFactory::getLogger()->debug( "==========hashtable_put before 4");
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
    }
    rv = svsem_wait(hashtable->mutex);
LoggerFactory::getLogger()->debug( "==========hashtable_put before 5");
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
    }
  }
  rv = svsem_post(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
  }
  //获取读写锁
  rv = svsem_wait(hashtable->wlock);
LoggerFactory::getLogger()->debug( "==========hashtable_put before 6");
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
  }
  _hashtable_put(hashtable, key, value);
  //释放读写锁
  rv = svsem_post(hashtable->wlock);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
  }
  LoggerFactory::getLogger()->debug( "==========hashtable_put after");
}
static inline void _hashtable_foreach(hashtable_t *hashtable, std::function<void(int, void *)>  cb) {
  tailq_entry_t *item;
  for (int i = 0; i < MAPSIZE; i++) {
    tailq_header_t *my_tailq_head = hashtable->array[i] ;
    if (my_tailq_head == NULL )
      continue;
    TAILQ_FOREACH(item, my_tailq_head, joint)
    {
      cb(item->key,  item -> value);
    }
  }
}
void hashtable_foreach(hashtable_t *hashtable,  std::function<void(int, void *)>  cb) {
  int rv;
  LoggerFactory::getLogger()->debug("===hashtable_foreach before 1\n");
  rv = svsem_wait(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_foreach");
  }
  hashtable->readcnt++;
  if (hashtable->readcnt == 1) {
    //获取读写锁
    rv = svsem_wait(hashtable->wlock);
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_foreach");
    }
  }
  rv = svsem_post(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_foreach");
  }
  // ==================
  _hashtable_foreach(hashtable, cb);
  // ==================
  rv = svsem_wait(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_foreach");
  }
  hashtable->readcnt--;
  if(hashtable->readcnt == 0) {
    //释放读写锁
    rv = svsem_post(hashtable->wlock);
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_foreach");
    }
    //通知写
    rv = svsem_set(hashtable->cond, 1);
    if(rv != 0) {
      LoggerFactory::getLogger()->error(rv, "hashtable_foreach");
    }
  }
  rv = svsem_post(hashtable->mutex);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_foreach");
  }
}
std::set<int> * hashtable_keyset(hashtable_t *hashtable) {
  std::set<int> *keyset = new std::set<int>;
  tailq_entry_t *item;
  for (int i = 0; i < MAPSIZE; i++) {
    tailq_header_t *my_tailq_head = hashtable->array[i] ;
    if (my_tailq_head == NULL )
      continue;
    TAILQ_FOREACH(item, my_tailq_head, joint)
    {
      keyset->insert(item->key);
    }
  }
  return keyset;
}
int hashtable_alloc_key(hashtable_t *hashtable) {
  int rv;
  int key = START_KEY;
  rv = svsem_wait(hashtable->wlock);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_alloc_key\n");
  }
  while(_hashtable_get(hashtable, key) != NULL) {
    key++;
  }
  // 占用key
  _hashtable_put(hashtable, key, (void *)1);
  rv = svsem_post(hashtable->wlock);
  if(rv != 0) {
    LoggerFactory::getLogger()->error(rv, "hashtable_alloc_key\n");
  }
  return key;
}
src/shm/mem_pool.h
@@ -4,42 +4,25 @@
#include "sem_util.h"
#define MEM_POOL_COND_KEY 0x8801
static int mem_pool_cond  = SemUtil::get(MEM_POOL_COND_KEY, 0);
// static int mem_pool_mutex  = SemUtil::get(MEM_POOL_COND_KEY, 1);
static inline void mem_pool_init(size_t heap_size) {
    if(mm_init(heap_size)) {
    }
    mm_init(heap_size);
}
static inline void mem_pool_destroy(void) {
    if(mm_destroy()) {
        SemUtil::remove(mem_pool_cond);
    }
    mm_destroy();
    
}
static inline void *mem_pool_malloc (size_t size) {
    void *ptr;
    while( (ptr = mm_malloc(size)) == NULL ) {
        err_msg(0, "There is not enough memery to allocate, waiting someone else to free.");
        SemUtil::set(mem_pool_cond, 0);
        // wait for someone else to free space
        SemUtil::dec(mem_pool_cond);
    }
    return ptr;
    return  mm_malloc(size);
}
static inline void mem_pool_free (void *ptr) {
    mm_free(ptr);
    // notify malloc
    SemUtil::set(mem_pool_cond, 1);
}
src/shm/mm.cpp
@@ -229,6 +229,7 @@
/*
 * mm_init - Initialize the memory manager, M unit
 * @return 是否第一次创建
 */
bool mm_init(size_t heap_size)
{
@@ -301,16 +302,17 @@
  return first;
}
/**
 * @return 是否真正销毁成功
 */
bool mm_destroy(void) {
  struct shmid_ds shmid_ds;
  
  SemUtil::dec(mutex);
  
  if(shmctl(shmid, IPC_STAT, &shmid_ds) == 0) {
    //LoggerFactory::getLogger()->debug("shm_nattch=%d\n", shmid_ds.shm_nattch);
    // 只有当前一个进程attach到共享内存上
    // 多个进程attach在共享内存上
    if (shmid_ds.shm_nattch > 1) {
      //detache
      if (shmdt(shmp) == -1) {
@@ -319,7 +321,7 @@
      SemUtil::inc(mutex);
      return false;
    } else  {
      // 只有当前一个进程attach到共享内存上
      hashtable_destroy(hashtable);
      //detache
      if (shmdt(shmp) == -1) {
src/svsem.cpp
@@ -6,7 +6,7 @@
  perms = S_IRUSR | S_IWUSR;
  semid = semget(key, 1, IPC_CREAT | IPC_EXCL | perms);
  semid = semget(key, 2, IPC_CREAT | IPC_EXCL | perms);
  if (semid != -1) { /* Successfully created the semaphore */
    union semun arg;
@@ -17,6 +17,10 @@
    arg.val = 0; /* So initialize it to 0 */
    if (semctl(semid, 0, SETVAL, arg) == -1)
      err_exit(errno, "semctl 1");
    arg.val = 1;
    if (semctl(semid, 1, SETVAL, arg) == -1)
      err_exit(errno, "semctl 2");
    //logger.info("%ld: initialized semaphore\n", (long)getpid());
    /* Perform a "no-op" semaphore operation - changes sem_otime
@@ -135,6 +139,63 @@
}
int svsem_cond_wait(int semid ){
  struct sembuf sops[2];
  union semun arg;
  arg.val = 1;
  if (semctl(semid, 1, SETVAL, arg) == -1) {
    err_msg(errno, "svsem_set");
    return errno;
  }
  //释放mutex
  sops[0].sem_num = 0;
  sops[0].sem_op = 1;
  sops[0].sem_flg = 0;
  // 等待cond
  sops[1].sem_num = 1;
  sops[1].sem_op = 0;
  sops[1].sem_flg = 0;
  while (semop(semid, sops, 2) == -1)
    if (errno != EINTR) {
      // err_msg(errno, "Svsvsem_dec");
      return errno;
    }
   //重新获取mutex
  sops[0].sem_num = 0;
  sops[0].sem_op = -1;
  sops[0].sem_flg = 0;
  while (semop(semid, sops, 1) == -1)
    if (errno != EINTR) {
      // err_msg(errno, "Svsvsem_dec");
      return errno;
    }
  return 0;
}
int svsem_cond_signal(int semid ){
  union semun arg;
  arg.val = 0;
  if (semctl(semid, 1, SETVAL, arg) == -1) {
    err_msg(errno, "svsem_set");
    return errno;
  }
  return 0;
}
/**
 * If sem_op equals 0, the value of the semaphore is checked to see whether it
 * currently equals 0. If it does, the operation completes immediately; otherwise,
@@ -213,46 +274,4 @@
int svsem_cond_wait(int semid ){
  struct sembuf sops[2];
  //释放mutex
  sops[0].sem_num = 0;
  sops[0].sem_op = 1;
  sops[0].sem_flg = 0;
  // 等待cond
  sops[1].sem_num = 1;
  sops[1].sem_op = -1;
  sops[1].sem_flg = 0;
  while (semop(semid, sops, 2) == -1)
    if (errno != EINTR) {
      // err_msg(errno, "Svsvsem_dec");
      return errno;
    }
  return 0;
}
int svsem_cond_signal(int semid ){
  struct sembuf sops;
  // 通知等待cond的进程
  sops.sem_num = 1;
  sops.sem_op = 1;
  sops.sem_flg = 0;
  int rv = semop(semid, &sops, 1);
  if (rv == -1) {
    // err_msg(errno, "Svsvsem_inc");
    return errno;
  }
  return 0;
}