From 09a82c2ece4caadad0baa0d1f3b84f1506363fdd Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期五, 22 一月 2021 11:58:33 +0800
Subject: [PATCH] update
---
src/shm/hashtable2.cpp | 438 +++++++++++++++++++++++++
src/queue/array_lock_free_sem_queue.h | 104 ++++-
src/shm/mm.cpp | 10
src/shm/hashtable.h | 6
src/svsem.cpp | 105 +++--
src/shm/mem_pool.h | 23 -
src/shm/hashtable.cpp | 320 +++++-------------
7 files changed, 671 insertions(+), 335 deletions(-)
diff --git a/src/queue/array_lock_free_sem_queue.h b/src/queue/array_lock_free_sem_queue.h
index bb213e8..69630d9 100644
--- a/src/queue/array_lock_free_sem_queue.h
+++ b/src/queue/array_lock_free_sem_queue.h
@@ -75,7 +75,7 @@
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
/// @brief number of elements in the queue
- int m_count;
+ uint32_t m_count;
#endif
@@ -200,10 +200,6 @@
}
-
-
-
-
template <typename ELEM_T, typename Allocator>
int ArrayLockFreeSemQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag)
{
@@ -215,28 +211,50 @@
{
currentWriteIndex = m_writeIndex;
currentReadIndex = m_readIndex;
-
+ #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
if (m_count == Q_SIZE) {
if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
return -1;
else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
const struct timespec ts = TimeUtil::trim_time(timeout);
- s = futex(&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0);
+ s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0);
if (s == -1 && errno != EAGAIN && errno != EINTR) {
// err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
return -1;
}
} else {
- s = futex(&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0);
+ s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0);
if (s == -1 && errno != EAGAIN && errno != EINTR) {
return -1;
}
}
}
+ #else
+ if (currentReadIndex == currentWriteIndex - Q_SIZE + 1 )
+ {
+ // the queue is full
+ if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
+ return -1;
+ else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
+ const struct timespec ts = TimeUtil::trim_time(timeout);
+ s = futex((int *)&m_readIndex, FUTEX_WAIT, currentWriteIndex - Q_SIZE + 1, &ts, NULL, 0);
+ if (s == -1 && errno != EAGAIN && errno != EINTR) {
+ // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
+ return -1;
+ }
+
+ } else {
+ s = futex((int *)&m_readIndex, FUTEX_WAIT, currentWriteIndex - Q_SIZE + 1, NULL, NULL, 0);
+ if (s == -1 && errno != EAGAIN && errno != EINTR) {
+ return -1;
+ }
+ }
+ }
+ #endif
-
+ //淇濈暀鍐欏叆浣�
} while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
// We know now that this index is reserved for us. Use it to save the data
@@ -255,10 +273,16 @@
sched_yield();
}
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
AtomicAdd(&m_count, 1);
- s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
- if (s == -1)
+
+ if ( (s = futex((int *)&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0)) == -1)
err_exit(errno, "futex-FUTEX_WAKE");
+#else
+ if ( (s = futex((int *)&m_maximumReadIndex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0)) == -1)
+ err_exit(errno, "futex-FUTEX_WAKE");
+#endif
+
return 0;
}
@@ -268,15 +292,16 @@
{
uint32_t currentMaximumReadIndex;
uint32_t currentReadIndex;
-
int s;
+
do
{
// to ensure thread-safety when there is more than 1 producer thread
// a second index is defined (m_maximumReadIndex)
currentReadIndex = m_readIndex;
currentMaximumReadIndex = m_maximumReadIndex;
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+ #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
if (m_count == 0) {
@@ -284,28 +309,45 @@
return -1;
else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
const struct timespec ts = TimeUtil::trim_time(timeout);
- s = futex(&m_count, FUTEX_WAIT, 0, &ts, NULL, 0);
+ s = futex((int *)&m_count, FUTEX_WAIT, 0, &ts, NULL, 0);
if (s == -1 && errno != EAGAIN && errno != EINTR) {
// err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
return -1;
}
} else {
- s = futex(&m_count, FUTEX_WAIT, 0, NULL, NULL, 0);
+ s = futex((int *)&m_count, FUTEX_WAIT, 0, NULL, NULL, 0);
if (s == -1 && errno != EAGAIN && errno != EINTR) {
return -1;
}
}
}
-#else
- if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
+
+ #else
+
+ if (currentReadIndex == currentMaximumReadIndex)
{
// the queue is empty or
// a producer thread has allocate space in the queue but is
// waiting to commit the data into it
- return -1;
+ if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT)
+ return -1;
+ else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) {
+ const struct timespec ts = TimeUtil::trim_time(timeout);
+ s = futex((int *)¤tMaximumReadIndex, FUTEX_WAIT, currentReadIndex, &ts, NULL, 0);
+ if (s == -1 && errno != EAGAIN && errno != EINTR) {
+ // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT");
+ return -1;
+ }
+
+ } else {
+ s = futex((int *)¤tMaximumReadIndex, FUTEX_WAIT, currentReadIndex, NULL, NULL, 0);
+ if (s == -1 && errno != EAGAIN && errno != EINTR) {
+ return -1;
+ }
+ }
}
-#endif
+ #endif
// retrieve the data from the queue
a_data = m_theQueue[countToIndex(currentReadIndex)];
@@ -315,14 +357,16 @@
// increased it
if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
{
-#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+ #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
// m_count.fetch_sub(1);
AtomicSub(&m_count, 1);
-#endif
-
- s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0);
- if (s == -1)
+ if ( (s = futex((int *)&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0) ) == -1)
err_exit(errno, "futex-FUTEX_WAKE");
+ #else
+ if ( (s = futex((int *)&m_readIndex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0) ) == -1)
+ err_exit(errno, "futex-FUTEX_WAKE");
+ #endif
+
return 0;
}
@@ -342,13 +386,13 @@
template <typename ELEM_T, typename Allocator>
ELEM_T& ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[](unsigned int i)
{
- int currentCount = m_count;
+ // int currentCount = m_count;
uint32_t currentReadIndex = m_readIndex;
- if (i >= currentCount)
- {
- std::cerr << "ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
- std::exit(EXIT_FAILURE);
- }
+ // if (i >= currentCount)
+ // {
+ // std::cerr << "ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
+ // std::exit(EXIT_FAILURE);
+ // }
return m_theQueue[countToIndex(currentReadIndex+i)];
}
diff --git a/src/shm/hashtable.cpp b/src/shm/hashtable.cpp
index 701bb17..60633bc 100755
--- a/src/shm/hashtable.cpp
+++ b/src/shm/hashtable.cpp
@@ -24,29 +24,27 @@
static size_t hashcode(int key);
-static struct timespec TIMEOUT = {2, 0};
-
void hashtable_init(hashtable_t *hashtable )
{
memset(hashtable, 0, sizeof(hashtable_t));
hashtable->mutex = svsem_get(IPC_PRIVATE, 1);
- hashtable->wlock = svsem_get(IPC_PRIVATE, 1);
- hashtable->cond = svsem_get(IPC_PRIVATE, 1);
- hashtable->readcnt = 0;
+ // hashtable->wlock = svsem_get(IPC_PRIVATE, 1);
+ // hashtable->cond = svsem_get(IPC_PRIVATE, 1);
+ // hashtable->readcnt = 0;
- FILE * semfile = fopen("./sem.txt", "w+");
- if(semfile == NULL) {
- err_exit(errno, "fopen");
- }
- fprintf(semfile, "hashtable->mutex=%d\n", hashtable->mutex);
- fclose(semfile);
+ // FILE * semfile = fopen("./sem.txt", "w+");
+ // if(semfile == NULL) {
+ // err_exit(errno, "fopen");
+ // }
+ // fprintf(semfile, "hashtable->mutex=%d\n", hashtable->mutex);
+ // fclose(semfile);
}
void hashtable_destroy(hashtable_t *hashtable) {
svsem_remove( hashtable->mutex);
- svsem_remove( hashtable->wlock);
- svsem_remove( hashtable->cond);
+ // svsem_remove( hashtable->wlock);
+ // svsem_remove( hashtable->cond);
}
@@ -111,17 +109,17 @@
void *oldvalue;
int rv;
- if( (rv = svsem_wait(hashtable->wlock)) != 0) {
+ if( (rv = svsem_wait(hashtable->mutex)) != 0) {
LoggerFactory::getLogger()->error(rv, "hashtable_remove\n");
}
tailq_header_t *my_tailq_head = hashtable->array[code] ;
if ( my_tailq_head == NULL)
{
- svsem_post(hashtable->wlock);
+ if((rv = svsem_post(hashtable->mutex)) != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_remove\n");
+ }
return NULL;
- }
- else
- {
+ } else {
for (item = TAILQ_FIRST(my_tailq_head); item != NULL; item = TAILQ_NEXT(item, joint))
{
if (key == item->key)
@@ -132,197 +130,39 @@
/* mm_free the item as we don't need it anymore. */
mm_free(item);
- svsem_post(hashtable->wlock);
+ svsem_post(hashtable->mutex);
return oldvalue;
}
}
- }
- if((rv = svsem_post(hashtable->wlock)) != 0) {
- LoggerFactory::getLogger()->error(rv, "hashtable_remove\n");
- }
- return NULL;
-
-}
-
-
-
-
-
-void hashtable_removeall(hashtable_t *hashtable)
-{
- tailq_entry_t *item;
- int rv;
- rv = svsem_wait(hashtable->wlock);
- if(rv != 0) {
- LoggerFactory::getLogger()->error(rv, "hashtable_removeall\n");
- }
- for (int i = 0; i < MAPSIZE; i++)
- {
- tailq_header_t *my_tailq_head = hashtable->array[i] ;
-
- if (my_tailq_head == NULL )
- continue;
-
- while ((item = TAILQ_FIRST(my_tailq_head)) )
- {
- TAILQ_REMOVE(my_tailq_head, item, joint);
- mm_free(item);
+ if((rv = svsem_post(hashtable->mutex)) != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_remove\n");
}
- mm_free(my_tailq_head);
- hashtable->array[i] = NULL;
- }
- rv = svsem_post(hashtable->wlock);
- if(rv != 0) {
- LoggerFactory::getLogger()->error(rv, "hashtable_removeall\n");
+
+ return NULL;
}
}
-/**
- * for debug
- */
-void hashtable_printall(hashtable_t *hashtable)
-{
- tailq_entry_t *item;
- for (int i = 0; i < MAPSIZE; i++)
- {
- tailq_header_t *my_tailq_head = hashtable->array[i] ;
- if (my_tailq_head == NULL )
- continue;
- printf("code=%d\n", i);
- TAILQ_FOREACH(item, my_tailq_head, joint)
- {
- printf("%d:%s\n", item->key, (char *)item->value);
- }
- printf("\n");
- }
-}
-
-static size_t hashcode(int key)
-{
-
- return key % MAPSIZE;
- /*printf("hashfun = %ld\n", code);*/
-}
void *hashtable_get(hashtable_t *hashtable, int key) {
-LoggerFactory::getLogger()->debug( "==========hashtable_get before 1");
-
- int rv;
- rv = svsem_wait(hashtable->mutex);
-
-LoggerFactory::getLogger()->debug( "==========hashtable_get before 2");
-
- if(rv != 0) {
- LoggerFactory::getLogger()->error(rv, "hashtable_get 1");
- }
- hashtable->readcnt++;
- if (hashtable->readcnt == 1) {
- //鑾峰彇璇诲啓閿�
-LoggerFactory::getLogger()->debug( "==========hashtable_get before 3");
- rv = svsem_wait(hashtable->wlock);
-LoggerFactory::getLogger()->debug( "==========hashtable_get before 4");
- if(rv != 0) {
- LoggerFactory::getLogger()->error(rv, "hashtable_get 2");
- }
- }
- rv = svsem_post(hashtable->mutex);
- if(rv != 0) {
- LoggerFactory::getLogger()->error(rv, "hashtable_get 3");
- }
- // ================
-
void * res = _hashtable_get(hashtable, key);
-
- // ==================
-
- rv = svsem_wait(hashtable->mutex);
-LoggerFactory::getLogger()->debug( "==========hashtable_get before 5");
- if(rv != 0) {
- LoggerFactory::getLogger()->error(rv, "hashtable_get 4");
- }
- hashtable->readcnt--;
- if(hashtable->readcnt == 0) {
- //閲婃斁璇诲啓閿�
-
- rv = svsem_post(hashtable->wlock);
-LoggerFactory::getLogger()->debug( "==========hashtable_get before 6");
- if(rv != 0) {
- LoggerFactory::getLogger()->error(rv, "hashtable_get 5");
- }
- //閫氱煡鍐�
- rv = svsem_set(hashtable->cond, 1);
- if(rv != 0) {
- LoggerFactory::getLogger()->error(rv, "hashtable_get 6");
- }
- }
-
- rv = svsem_post(hashtable->mutex);
- if(rv != 0) {
- LoggerFactory::getLogger()->error(rv, "hashtable_get 7");
- }
-LoggerFactory::getLogger()->debug( "==========hashtable_get after");
return res;
}
void hashtable_put(hashtable_t *hashtable, int key, void *value) {
int rv;
-LoggerFactory::getLogger()->debug( "==========hashtable_put before 1");
- rv = svsem_wait(hashtable->mutex);
- if(rv != 0) {
- LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
- }
-LoggerFactory::getLogger()->debug( "==========hashtable_put before 2");
- // 璁剧疆璇讳紭鍏堢骇楂�
- while (hashtable->readcnt > 0)
- {
- rv = svsem_set(hashtable->cond, 0);
- if(rv != 0) {
- LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
- }
- rv = svsem_post(hashtable->mutex);
- if(rv != 0) {
- LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
- }
- //绛夊緟鍐欓�氱煡
-LoggerFactory::getLogger()->debug( "==========hashtable_put before 3");
- rv = svsem_wait(hashtable->cond);
-LoggerFactory::getLogger()->debug( "==========hashtable_put before 4");
- if(rv != 0) {
- LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
- }
-
-
- rv = svsem_wait(hashtable->mutex);
-LoggerFactory::getLogger()->debug( "==========hashtable_put before 5");
- if(rv != 0) {
- LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
- }
- }
- rv = svsem_post(hashtable->mutex);
- if(rv != 0) {
- LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
- }
-
- //鑾峰彇璇诲啓閿�
- rv = svsem_wait(hashtable->wlock);
-LoggerFactory::getLogger()->debug( "==========hashtable_put before 6");
- if(rv != 0) {
+ if(( rv = svsem_wait(hashtable->mutex)) != 0) {
LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
}
_hashtable_put(hashtable, key, value);
- //閲婃斁璇诲啓閿�
- rv = svsem_post(hashtable->wlock);
- if(rv != 0) {
+ if(( rv = svsem_post(hashtable->mutex)) != 0) {
LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
}
-
- LoggerFactory::getLogger()->debug( "==========hashtable_put after");
}
@@ -344,56 +184,7 @@
void hashtable_foreach(hashtable_t *hashtable, std::function<void(int, void *)> cb) {
- int rv;
-
-
- LoggerFactory::getLogger()->debug("===hashtable_foreach before 1\n");
-
-
- rv = svsem_wait(hashtable->mutex);
- if(rv != 0) {
- LoggerFactory::getLogger()->error(rv, "hashtable_foreach");
- }
- hashtable->readcnt++;
- if (hashtable->readcnt == 1) {
- //鑾峰彇璇诲啓閿�
- rv = svsem_wait(hashtable->wlock);
- if(rv != 0) {
- LoggerFactory::getLogger()->error(rv, "hashtable_foreach");
- }
- }
- rv = svsem_post(hashtable->mutex);
- if(rv != 0) {
- LoggerFactory::getLogger()->error(rv, "hashtable_foreach");
- }
-
- // ==================
-
- _hashtable_foreach(hashtable, cb);
-
- // ==================
-
- rv = svsem_wait(hashtable->mutex);
- if(rv != 0) {
- LoggerFactory::getLogger()->error(rv, "hashtable_foreach");
- }
- hashtable->readcnt--;
- if(hashtable->readcnt == 0) {
- //閲婃斁璇诲啓閿�
- rv = svsem_post(hashtable->wlock);
- if(rv != 0) {
- LoggerFactory::getLogger()->error(rv, "hashtable_foreach");
- }
- //閫氱煡鍐�
- rv = svsem_set(hashtable->cond, 1);
- if(rv != 0) {
- LoggerFactory::getLogger()->error(rv, "hashtable_foreach");
- }
- }
- rv = svsem_post(hashtable->mutex);
- if(rv != 0) {
- LoggerFactory::getLogger()->error(rv, "hashtable_foreach");
- }
+ return _hashtable_foreach(hashtable, cb);
}
@@ -419,7 +210,7 @@
int hashtable_alloc_key(hashtable_t *hashtable) {
int rv;
int key = START_KEY;
- rv = svsem_wait(hashtable->wlock);
+ rv = svsem_wait(hashtable->mutex);
if(rv != 0) {
LoggerFactory::getLogger()->error(rv, "hashtable_alloc_key\n");
}
@@ -430,9 +221,68 @@
// 鍗犵敤key
_hashtable_put(hashtable, key, (void *)1);
- rv = svsem_post(hashtable->wlock);
+ rv = svsem_post(hashtable->mutex);
if(rv != 0) {
LoggerFactory::getLogger()->error(rv, "hashtable_alloc_key\n");
}
return key;
}
+
+
+void hashtable_removeall(hashtable_t *hashtable)
+{
+ tailq_entry_t *item;
+ int rv;
+ if( (rv = svsem_wait(hashtable->mutex)) != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_removeall\n");
+ }
+ for (int i = 0; i < MAPSIZE; i++)
+ {
+ tailq_header_t *my_tailq_head = hashtable->array[i] ;
+
+ if (my_tailq_head == NULL )
+ continue;
+
+ while ((item = TAILQ_FIRST(my_tailq_head)) )
+ {
+ TAILQ_REMOVE(my_tailq_head, item, joint);
+ mm_free(item);
+ }
+ mm_free(my_tailq_head);
+ hashtable->array[i] = NULL;
+ }
+
+ if((rv = svsem_post(hashtable->mutex)) != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_removeall\n");
+ }
+}
+
+
+static size_t hashcode(int key)
+{
+
+ return key % MAPSIZE;
+ /*printf("hashfun = %ld\n", code);*/
+}
+
+/**
+ * for debug
+ */
+static void hashtable_printall(hashtable_t *hashtable)
+{
+ tailq_entry_t *item;
+ for (int i = 0; i < MAPSIZE; i++)
+ {
+ tailq_header_t *my_tailq_head = hashtable->array[i] ;
+
+ if (my_tailq_head == NULL )
+ continue;
+
+ printf("code=%d\n", i);
+ TAILQ_FOREACH(item, my_tailq_head, joint)
+ {
+ printf("%d:%s\n", item->key, (char *)item->value);
+ }
+ printf("\n");
+ }
+}
\ No newline at end of file
diff --git a/src/shm/hashtable.h b/src/shm/hashtable.h
index ea4dc64..35b5892 100755
--- a/src/shm/hashtable.h
+++ b/src/shm/hashtable.h
@@ -11,9 +11,9 @@
{
struct tailq_header_t* array[MAPSIZE];
int mutex;
- int wlock;
- int cond;
- size_t readcnt;
+ // int wlock;
+ // int cond;
+ // size_t readcnt;
} hashtable_t;
typedef void (*hashtable_foreach_cb)(int key, void *value);
diff --git a/src/shm/hashtable2.cpp b/src/shm/hashtable2.cpp
new file mode 100755
index 0000000..701bb17
--- /dev/null
+++ b/src/shm/hashtable2.cpp
@@ -0,0 +1,438 @@
+#include "usg_common.h"
+#include "hashtable.h"
+#include "mm.h"
+#include "svsem.h"
+#include "logger_factory.h"
+#include <set>
+#include <functional>
+
+typedef struct tailq_entry_t
+{
+ void *value;
+ int key;
+ /*
+ * This holds the pointers to the next and previous joint in
+ * the tail queue.
+ */
+ TAILQ_ENTRY(tailq_entry_t) joint;
+} tailq_entry_t;
+
+#define START_KEY 1000
+
+typedef TAILQ_HEAD(tailq_header_t, tailq_entry_t) tailq_header_t;
+
+
+static size_t hashcode(int key);
+
+static struct timespec TIMEOUT = {2, 0};
+
+void hashtable_init(hashtable_t *hashtable )
+{
+
+ memset(hashtable, 0, sizeof(hashtable_t));
+ hashtable->mutex = svsem_get(IPC_PRIVATE, 1);
+ hashtable->wlock = svsem_get(IPC_PRIVATE, 1);
+ hashtable->cond = svsem_get(IPC_PRIVATE, 1);
+ hashtable->readcnt = 0;
+
+ FILE * semfile = fopen("./sem.txt", "w+");
+ if(semfile == NULL) {
+ err_exit(errno, "fopen");
+ }
+ fprintf(semfile, "hashtable->mutex=%d\n", hashtable->mutex);
+ fclose(semfile);
+}
+
+void hashtable_destroy(hashtable_t *hashtable) {
+ svsem_remove( hashtable->mutex);
+ svsem_remove( hashtable->wlock);
+ svsem_remove( hashtable->cond);
+}
+
+
+static inline void *_hashtable_get(hashtable_t *hashtable, int key)
+{
+ size_t code = hashcode(key);
+ tailq_entry_t *item;
+ tailq_header_t *my_tailq_head = hashtable->array[code] ;
+ if ( my_tailq_head == NULL)
+ {
+ return NULL;
+ }
+ else
+ {
+
+ TAILQ_FOREACH(item, my_tailq_head, joint)
+ {
+ if (key == item->key)
+ return item->value;
+ }
+ }
+ return NULL;
+
+}
+
+static inline void * _hashtable_put(hashtable_t *hashtable, int key, void *value)
+{
+ size_t code = hashcode(key);
+ void *oldvalue;
+ tailq_entry_t *item;
+ tailq_header_t *my_tailq_head = hashtable->array[code] ;
+ if ( my_tailq_head == NULL)
+ {
+ my_tailq_head = (tailq_header_t*) mm_malloc(sizeof(tailq_header_t ));
+ TAILQ_INIT(my_tailq_head);
+ hashtable->array[code] = my_tailq_head;
+ goto putnew;
+ }
+
+ TAILQ_FOREACH(item, my_tailq_head, joint)
+ {
+ if (key ==item->key)
+ {
+ oldvalue = item -> value;
+ item->key= key;
+ item -> value = value;
+ return oldvalue;
+ }
+ }
+putnew:
+ item = (tailq_entry_t *) mm_malloc(sizeof(tailq_entry_t));
+ item->key = key;
+ item -> value = value;
+ TAILQ_INSERT_TAIL(my_tailq_head, item, joint);
+ return NULL;
+}
+
+void *hashtable_remove(hashtable_t *hashtable, int key)
+{
+ size_t code = hashcode(key);
+ tailq_entry_t *item;
+ void *oldvalue;
+ int rv;
+
+ if( (rv = svsem_wait(hashtable->wlock)) != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_remove\n");
+ }
+ tailq_header_t *my_tailq_head = hashtable->array[code] ;
+ if ( my_tailq_head == NULL)
+ {
+ svsem_post(hashtable->wlock);
+ return NULL;
+ }
+ else
+ {
+ for (item = TAILQ_FIRST(my_tailq_head); item != NULL; item = TAILQ_NEXT(item, joint))
+ {
+ if (key == item->key)
+ {
+ oldvalue = item->value;
+ /* Remove the item from the tail queue. */
+ TAILQ_REMOVE(my_tailq_head, item, joint);
+
+ /* mm_free the item as we don't need it anymore. */
+ mm_free(item);
+ svsem_post(hashtable->wlock);
+ return oldvalue;
+ }
+ }
+ }
+ if((rv = svsem_post(hashtable->wlock)) != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_remove\n");
+ }
+
+ return NULL;
+
+}
+
+
+
+
+
+void hashtable_removeall(hashtable_t *hashtable)
+{
+ tailq_entry_t *item;
+ int rv;
+ rv = svsem_wait(hashtable->wlock);
+ if(rv != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_removeall\n");
+ }
+ for (int i = 0; i < MAPSIZE; i++)
+ {
+ tailq_header_t *my_tailq_head = hashtable->array[i] ;
+
+ if (my_tailq_head == NULL )
+ continue;
+
+ while ((item = TAILQ_FIRST(my_tailq_head)) )
+ {
+ TAILQ_REMOVE(my_tailq_head, item, joint);
+ mm_free(item);
+ }
+ mm_free(my_tailq_head);
+ hashtable->array[i] = NULL;
+ }
+ rv = svsem_post(hashtable->wlock);
+ if(rv != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_removeall\n");
+ }
+}
+
+/**
+ * for debug
+ */
+void hashtable_printall(hashtable_t *hashtable)
+{
+ tailq_entry_t *item;
+ for (int i = 0; i < MAPSIZE; i++)
+ {
+ tailq_header_t *my_tailq_head = hashtable->array[i] ;
+
+ if (my_tailq_head == NULL )
+ continue;
+
+ printf("code=%d\n", i);
+ TAILQ_FOREACH(item, my_tailq_head, joint)
+ {
+ printf("%d:%s\n", item->key, (char *)item->value);
+ }
+ printf("\n");
+ }
+}
+
+static size_t hashcode(int key)
+{
+
+ return key % MAPSIZE;
+ /*printf("hashfun = %ld\n", code);*/
+}
+
+void *hashtable_get(hashtable_t *hashtable, int key) {
+LoggerFactory::getLogger()->debug( "==========hashtable_get before 1");
+
+ int rv;
+ rv = svsem_wait(hashtable->mutex);
+
+LoggerFactory::getLogger()->debug( "==========hashtable_get before 2");
+
+ if(rv != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_get 1");
+ }
+ hashtable->readcnt++;
+ if (hashtable->readcnt == 1) {
+ //鑾峰彇璇诲啓閿�
+LoggerFactory::getLogger()->debug( "==========hashtable_get before 3");
+ rv = svsem_wait(hashtable->wlock);
+LoggerFactory::getLogger()->debug( "==========hashtable_get before 4");
+ if(rv != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_get 2");
+ }
+ }
+ rv = svsem_post(hashtable->mutex);
+ if(rv != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_get 3");
+ }
+ // ================
+
+ void * res = _hashtable_get(hashtable, key);
+
+ // ==================
+
+ rv = svsem_wait(hashtable->mutex);
+LoggerFactory::getLogger()->debug( "==========hashtable_get before 5");
+ if(rv != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_get 4");
+ }
+ hashtable->readcnt--;
+ if(hashtable->readcnt == 0) {
+ //閲婃斁璇诲啓閿�
+
+ rv = svsem_post(hashtable->wlock);
+LoggerFactory::getLogger()->debug( "==========hashtable_get before 6");
+ if(rv != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_get 5");
+ }
+ //閫氱煡鍐�
+ rv = svsem_set(hashtable->cond, 1);
+ if(rv != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_get 6");
+ }
+ }
+
+ rv = svsem_post(hashtable->mutex);
+ if(rv != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_get 7");
+ }
+LoggerFactory::getLogger()->debug( "==========hashtable_get after");
+ return res;
+}
+
+void hashtable_put(hashtable_t *hashtable, int key, void *value) {
+
+ int rv;
+LoggerFactory::getLogger()->debug( "==========hashtable_put before 1");
+ rv = svsem_wait(hashtable->mutex);
+ if(rv != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
+ }
+LoggerFactory::getLogger()->debug( "==========hashtable_put before 2");
+ // 璁剧疆璇讳紭鍏堢骇楂�
+ while (hashtable->readcnt > 0)
+ {
+ rv = svsem_set(hashtable->cond, 0);
+ if(rv != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
+ }
+ rv = svsem_post(hashtable->mutex);
+ if(rv != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
+ }
+ //绛夊緟鍐欓�氱煡
+LoggerFactory::getLogger()->debug( "==========hashtable_put before 3");
+ rv = svsem_wait(hashtable->cond);
+LoggerFactory::getLogger()->debug( "==========hashtable_put before 4");
+ if(rv != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
+ }
+
+
+ rv = svsem_wait(hashtable->mutex);
+LoggerFactory::getLogger()->debug( "==========hashtable_put before 5");
+ if(rv != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
+ }
+ }
+ rv = svsem_post(hashtable->mutex);
+ if(rv != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
+ }
+
+ //鑾峰彇璇诲啓閿�
+ rv = svsem_wait(hashtable->wlock);
+LoggerFactory::getLogger()->debug( "==========hashtable_put before 6");
+ if(rv != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
+ }
+
+ _hashtable_put(hashtable, key, value);
+
+ //閲婃斁璇诲啓閿�
+ rv = svsem_post(hashtable->wlock);
+ if(rv != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_put\n");
+ }
+
+ LoggerFactory::getLogger()->debug( "==========hashtable_put after");
+}
+
+
+
+static inline void _hashtable_foreach(hashtable_t *hashtable, std::function<void(int, void *)> cb) {
+ tailq_entry_t *item;
+ for (int i = 0; i < MAPSIZE; i++) {
+ tailq_header_t *my_tailq_head = hashtable->array[i] ;
+
+ if (my_tailq_head == NULL )
+ continue;
+
+ TAILQ_FOREACH(item, my_tailq_head, joint)
+ {
+ cb(item->key, item -> value);
+ }
+ }
+}
+
+
+void hashtable_foreach(hashtable_t *hashtable, std::function<void(int, void *)> cb) {
+ int rv;
+
+
+ LoggerFactory::getLogger()->debug("===hashtable_foreach before 1\n");
+
+
+ rv = svsem_wait(hashtable->mutex);
+ if(rv != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_foreach");
+ }
+ hashtable->readcnt++;
+ if (hashtable->readcnt == 1) {
+ //鑾峰彇璇诲啓閿�
+ rv = svsem_wait(hashtable->wlock);
+ if(rv != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_foreach");
+ }
+ }
+ rv = svsem_post(hashtable->mutex);
+ if(rv != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_foreach");
+ }
+
+ // ==================
+
+ _hashtable_foreach(hashtable, cb);
+
+ // ==================
+
+ rv = svsem_wait(hashtable->mutex);
+ if(rv != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_foreach");
+ }
+ hashtable->readcnt--;
+ if(hashtable->readcnt == 0) {
+ //閲婃斁璇诲啓閿�
+ rv = svsem_post(hashtable->wlock);
+ if(rv != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_foreach");
+ }
+ //閫氱煡鍐�
+ rv = svsem_set(hashtable->cond, 1);
+ if(rv != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_foreach");
+ }
+ }
+ rv = svsem_post(hashtable->mutex);
+ if(rv != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_foreach");
+ }
+
+}
+
+
+std::set<int> * hashtable_keyset(hashtable_t *hashtable) {
+ std::set<int> *keyset = new std::set<int>;
+ tailq_entry_t *item;
+ for (int i = 0; i < MAPSIZE; i++) {
+ tailq_header_t *my_tailq_head = hashtable->array[i] ;
+
+ if (my_tailq_head == NULL )
+ continue;
+
+ TAILQ_FOREACH(item, my_tailq_head, joint)
+ {
+ keyset->insert(item->key);
+ }
+ }
+ return keyset;
+}
+
+
+int hashtable_alloc_key(hashtable_t *hashtable) {
+ int rv;
+ int key = START_KEY;
+ rv = svsem_wait(hashtable->wlock);
+ if(rv != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_alloc_key\n");
+ }
+
+ while(_hashtable_get(hashtable, key) != NULL) {
+ key++;
+ }
+ // 鍗犵敤key
+ _hashtable_put(hashtable, key, (void *)1);
+
+ rv = svsem_post(hashtable->wlock);
+ if(rv != 0) {
+ LoggerFactory::getLogger()->error(rv, "hashtable_alloc_key\n");
+ }
+ return key;
+}
diff --git a/src/shm/mem_pool.h b/src/shm/mem_pool.h
index 2ea1f6b..5a698ec 100644
--- a/src/shm/mem_pool.h
+++ b/src/shm/mem_pool.h
@@ -4,42 +4,25 @@
#include "sem_util.h"
#define MEM_POOL_COND_KEY 0x8801
-static int mem_pool_cond = SemUtil::get(MEM_POOL_COND_KEY, 0);
// static int mem_pool_mutex = SemUtil::get(MEM_POOL_COND_KEY, 1);
static inline void mem_pool_init(size_t heap_size) {
- if(mm_init(heap_size)) {
-
- }
+ mm_init(heap_size);
}
static inline void mem_pool_destroy(void) {
- if(mm_destroy()) {
- SemUtil::remove(mem_pool_cond);
- }
+ mm_destroy();
}
static inline void *mem_pool_malloc (size_t size) {
- void *ptr;
- while( (ptr = mm_malloc(size)) == NULL ) {
- err_msg(0, "There is not enough memery to allocate, waiting someone else to free.");
- SemUtil::set(mem_pool_cond, 0);
- // wait for someone else to free space
- SemUtil::dec(mem_pool_cond);
-
- }
-
- return ptr;
+ return mm_malloc(size);
}
static inline void mem_pool_free (void *ptr) {
mm_free(ptr);
- // notify malloc
- SemUtil::set(mem_pool_cond, 1);
-
}
diff --git a/src/shm/mm.cpp b/src/shm/mm.cpp
index 7f1fda3..2b52b51 100644
--- a/src/shm/mm.cpp
+++ b/src/shm/mm.cpp
@@ -229,6 +229,7 @@
/*
* mm_init - Initialize the memory manager, M unit
+ * @return 鏄惁绗竴娆″垱寤�
*/
bool mm_init(size_t heap_size)
{
@@ -301,16 +302,17 @@
return first;
}
-
+/**
+ * @return 鏄惁鐪熸閿�姣佹垚鍔�
+ */
bool mm_destroy(void) {
struct shmid_ds shmid_ds;
-
SemUtil::dec(mutex);
if(shmctl(shmid, IPC_STAT, &shmid_ds) == 0) {
//LoggerFactory::getLogger()->debug("shm_nattch=%d\n", shmid_ds.shm_nattch);
- // 鍙湁褰撳墠涓�涓繘绋媋ttach鍒板叡浜唴瀛樹笂
+ // 澶氫釜杩涚▼attach鍦ㄥ叡浜唴瀛樹笂
if (shmid_ds.shm_nattch > 1) {
//detache
if (shmdt(shmp) == -1) {
@@ -319,7 +321,7 @@
SemUtil::inc(mutex);
return false;
} else {
-
+ // 鍙湁褰撳墠涓�涓繘绋媋ttach鍒板叡浜唴瀛樹笂
hashtable_destroy(hashtable);
//detache
if (shmdt(shmp) == -1) {
diff --git a/src/svsem.cpp b/src/svsem.cpp
index 54db4a2..82390b6 100644
--- a/src/svsem.cpp
+++ b/src/svsem.cpp
@@ -6,7 +6,7 @@
perms = S_IRUSR | S_IWUSR;
- semid = semget(key, 1, IPC_CREAT | IPC_EXCL | perms);
+ semid = semget(key, 2, IPC_CREAT | IPC_EXCL | perms);
if (semid != -1) { /* Successfully created the semaphore */
union semun arg;
@@ -17,6 +17,10 @@
arg.val = 0; /* So initialize it to 0 */
if (semctl(semid, 0, SETVAL, arg) == -1)
err_exit(errno, "semctl 1");
+
+ arg.val = 1;
+ if (semctl(semid, 1, SETVAL, arg) == -1)
+ err_exit(errno, "semctl 2");
//logger.info("%ld: initialized semaphore\n", (long)getpid());
/* Perform a "no-op" semaphore operation - changes sem_otime
@@ -135,6 +139,63 @@
}
+
+
+int svsem_cond_wait(int semid ){
+
+ struct sembuf sops[2];
+ union semun arg;
+
+ arg.val = 1;
+ if (semctl(semid, 1, SETVAL, arg) == -1) {
+ err_msg(errno, "svsem_set");
+ return errno;
+ }
+
+ //閲婃斁mutex
+ sops[0].sem_num = 0;
+ sops[0].sem_op = 1;
+ sops[0].sem_flg = 0;
+
+ // 绛夊緟cond
+ sops[1].sem_num = 1;
+ sops[1].sem_op = 0;
+ sops[1].sem_flg = 0;
+
+ while (semop(semid, sops, 2) == -1)
+ if (errno != EINTR) {
+ // err_msg(errno, "Svsvsem_dec");
+ return errno;
+ }
+
+
+ //閲嶆柊鑾峰彇mutex
+ sops[0].sem_num = 0;
+ sops[0].sem_op = -1;
+ sops[0].sem_flg = 0;
+
+ while (semop(semid, sops, 1) == -1)
+ if (errno != EINTR) {
+ // err_msg(errno, "Svsvsem_dec");
+ return errno;
+ }
+
+ return 0;
+}
+
+
+int svsem_cond_signal(int semid ){
+ union semun arg;
+ arg.val = 0;
+
+ if (semctl(semid, 1, SETVAL, arg) == -1) {
+ err_msg(errno, "svsem_set");
+ return errno;
+ }
+ return 0;
+}
+
+
/**
* If sem_op equals 0, the value of the semaphore is checked to see whether it
* currently equals 0. If it does, the operation completes immediately; otherwise,
@@ -213,46 +274,4 @@
-
-int svsem_cond_wait(int semid ){
-
- struct sembuf sops[2];
-
- //閲婃斁mutex
- sops[0].sem_num = 0;
- sops[0].sem_op = 1;
- sops[0].sem_flg = 0;
-
- // 绛夊緟cond
- sops[1].sem_num = 1;
- sops[1].sem_op = -1;
- sops[1].sem_flg = 0;
-
- while (semop(semid, sops, 2) == -1)
- if (errno != EINTR) {
- // err_msg(errno, "Svsvsem_dec");
- return errno;
- }
-
-
-
-
- return 0;
-}
-
-int svsem_cond_signal(int semid ){
- struct sembuf sops;
-
- // 閫氱煡绛夊緟cond鐨勮繘绋�
- sops.sem_num = 1;
- sops.sem_op = 1;
- sops.sem_flg = 0;
-
- int rv = semop(semid, &sops, 1);
- if (rv == -1) {
- // err_msg(errno, "Svsvsem_inc");
- return errno;
- }
- return 0;
-}
\ No newline at end of file
--
Gitblit v1.8.0