From 09a82c2ece4caadad0baa0d1f3b84f1506363fdd Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期五, 22 一月 2021 11:58:33 +0800 Subject: [PATCH] update --- src/shm/hashtable2.cpp | 438 +++++++++++++++++++++++++ src/queue/array_lock_free_sem_queue.h | 104 ++++- src/shm/mm.cpp | 10 src/shm/hashtable.h | 6 src/svsem.cpp | 105 +++-- src/shm/mem_pool.h | 23 - src/shm/hashtable.cpp | 320 +++++------------- 7 files changed, 671 insertions(+), 335 deletions(-) diff --git a/src/queue/array_lock_free_sem_queue.h b/src/queue/array_lock_free_sem_queue.h index bb213e8..69630d9 100644 --- a/src/queue/array_lock_free_sem_queue.h +++ b/src/queue/array_lock_free_sem_queue.h @@ -75,7 +75,7 @@ #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE /// @brief number of elements in the queue - int m_count; + uint32_t m_count; #endif @@ -200,10 +200,6 @@ } - - - - template <typename ELEM_T, typename Allocator> int ArrayLockFreeSemQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) { @@ -215,28 +211,50 @@ { currentWriteIndex = m_writeIndex; currentReadIndex = m_readIndex; - + #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE if (m_count == Q_SIZE) { if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT) return -1; else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) { const struct timespec ts = TimeUtil::trim_time(timeout); - s = futex(&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0); + s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, &ts, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); return -1; } } else { - s = futex(&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0); + s = futex((int *)&m_count, FUTEX_WAIT, Q_SIZE, NULL, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { return -1; } } } + #else + if (currentReadIndex == currentWriteIndex - Q_SIZE + 1 ) + { + // the queue is full + if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT) + return -1; + else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) { + const struct timespec ts = TimeUtil::trim_time(timeout); + s = futex((int *)&m_readIndex, FUTEX_WAIT, currentWriteIndex - Q_SIZE + 1, &ts, NULL, 0); + if (s == -1 && errno != EAGAIN && errno != EINTR) { + // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); + return -1; + } + + } else { + s = futex((int *)&m_readIndex, FUTEX_WAIT, currentWriteIndex - Q_SIZE + 1, NULL, NULL, 0); + if (s == -1 && errno != EAGAIN && errno != EINTR) { + return -1; + } + } + } + #endif - + //淇濈暀鍐欏叆浣� } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1))); // We know now that this index is reserved for us. Use it to save the data @@ -255,10 +273,16 @@ sched_yield(); } +#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE AtomicAdd(&m_count, 1); - s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0); - if (s == -1) + + if ( (s = futex((int *)&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0)) == -1) err_exit(errno, "futex-FUTEX_WAKE"); +#else + if ( (s = futex((int *)&m_maximumReadIndex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0)) == -1) + err_exit(errno, "futex-FUTEX_WAKE"); +#endif + return 0; } @@ -268,15 +292,16 @@ { uint32_t currentMaximumReadIndex; uint32_t currentReadIndex; - int s; + do { // to ensure thread-safety when there is more than 1 producer thread // a second index is defined (m_maximumReadIndex) currentReadIndex = m_readIndex; currentMaximumReadIndex = m_maximumReadIndex; -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + + #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE if (m_count == 0) { @@ -284,28 +309,45 @@ return -1; else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) { const struct timespec ts = TimeUtil::trim_time(timeout); - s = futex(&m_count, FUTEX_WAIT, 0, &ts, NULL, 0); + s = futex((int *)&m_count, FUTEX_WAIT, 0, &ts, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); return -1; } } else { - s = futex(&m_count, FUTEX_WAIT, 0, NULL, NULL, 0); + s = futex((int *)&m_count, FUTEX_WAIT, 0, NULL, NULL, 0); if (s == -1 && errno != EAGAIN && errno != EINTR) { return -1; } } } -#else - if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)) + + #else + + if (currentReadIndex == currentMaximumReadIndex) { // the queue is empty or // a producer thread has allocate space in the queue but is // waiting to commit the data into it - return -1; + if( (flag & LOCK_FREE_QUEUE_NOWAIT) == LOCK_FREE_QUEUE_NOWAIT) + return -1; + else if( (flag & LOCK_FREE_QUEUE_TIMEOUT) == LOCK_FREE_QUEUE_TIMEOUT && timeout != NULL) { + const struct timespec ts = TimeUtil::trim_time(timeout); + s = futex((int *)¤tMaximumReadIndex, FUTEX_WAIT, currentReadIndex, &ts, NULL, 0); + if (s == -1 && errno != EAGAIN && errno != EINTR) { + // err_exit("ArrayLockFreeSemQueue<ELEM_T, Allocator>::push futex-FUTEX_WAIT"); + return -1; + } + + } else { + s = futex((int *)¤tMaximumReadIndex, FUTEX_WAIT, currentReadIndex, NULL, NULL, 0); + if (s == -1 && errno != EAGAIN && errno != EINTR) { + return -1; + } + } } -#endif + #endif // retrieve the data from the queue a_data = m_theQueue[countToIndex(currentReadIndex)]; @@ -315,14 +357,16 @@ // increased it if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) { -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE + #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE // m_count.fetch_sub(1); AtomicSub(&m_count, 1); -#endif - - s = futex(&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0); - if (s == -1) + if ( (s = futex((int *)&m_count, FUTEX_WAKE, INT_MAX, NULL, NULL, 0) ) == -1) err_exit(errno, "futex-FUTEX_WAKE"); + #else + if ( (s = futex((int *)&m_readIndex, FUTEX_WAKE, INT_MAX, NULL, NULL, 0) ) == -1) + err_exit(errno, "futex-FUTEX_WAKE"); + #endif + return 0; } @@ -342,13 +386,13 @@ template <typename ELEM_T, typename Allocator> ELEM_T& ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[](unsigned int i) { - int currentCount = m_count; + // int currentCount = m_count; uint32_t currentReadIndex = m_readIndex; - if (i >= currentCount) - { - std::cerr << "ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n"; - std::exit(EXIT_FAILURE); - } + // if (i >= currentCount) + // { + // std::cerr << "ArrayLockFreeSemQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n"; + // std::exit(EXIT_FAILURE); + // } return m_theQueue[countToIndex(currentReadIndex+i)]; } diff --git a/src/shm/hashtable.cpp b/src/shm/hashtable.cpp index 701bb17..60633bc 100755 --- a/src/shm/hashtable.cpp +++ b/src/shm/hashtable.cpp @@ -24,29 +24,27 @@ static size_t hashcode(int key); -static struct timespec TIMEOUT = {2, 0}; - void hashtable_init(hashtable_t *hashtable ) { memset(hashtable, 0, sizeof(hashtable_t)); hashtable->mutex = svsem_get(IPC_PRIVATE, 1); - hashtable->wlock = svsem_get(IPC_PRIVATE, 1); - hashtable->cond = svsem_get(IPC_PRIVATE, 1); - hashtable->readcnt = 0; + // hashtable->wlock = svsem_get(IPC_PRIVATE, 1); + // hashtable->cond = svsem_get(IPC_PRIVATE, 1); + // hashtable->readcnt = 0; - FILE * semfile = fopen("./sem.txt", "w+"); - if(semfile == NULL) { - err_exit(errno, "fopen"); - } - fprintf(semfile, "hashtable->mutex=%d\n", hashtable->mutex); - fclose(semfile); + // FILE * semfile = fopen("./sem.txt", "w+"); + // if(semfile == NULL) { + // err_exit(errno, "fopen"); + // } + // fprintf(semfile, "hashtable->mutex=%d\n", hashtable->mutex); + // fclose(semfile); } void hashtable_destroy(hashtable_t *hashtable) { svsem_remove( hashtable->mutex); - svsem_remove( hashtable->wlock); - svsem_remove( hashtable->cond); + // svsem_remove( hashtable->wlock); + // svsem_remove( hashtable->cond); } @@ -111,17 +109,17 @@ void *oldvalue; int rv; - if( (rv = svsem_wait(hashtable->wlock)) != 0) { + if( (rv = svsem_wait(hashtable->mutex)) != 0) { LoggerFactory::getLogger()->error(rv, "hashtable_remove\n"); } tailq_header_t *my_tailq_head = hashtable->array[code] ; if ( my_tailq_head == NULL) { - svsem_post(hashtable->wlock); + if((rv = svsem_post(hashtable->mutex)) != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_remove\n"); + } return NULL; - } - else - { + } else { for (item = TAILQ_FIRST(my_tailq_head); item != NULL; item = TAILQ_NEXT(item, joint)) { if (key == item->key) @@ -132,197 +130,39 @@ /* mm_free the item as we don't need it anymore. */ mm_free(item); - svsem_post(hashtable->wlock); + svsem_post(hashtable->mutex); return oldvalue; } } - } - if((rv = svsem_post(hashtable->wlock)) != 0) { - LoggerFactory::getLogger()->error(rv, "hashtable_remove\n"); - } - return NULL; - -} - - - - - -void hashtable_removeall(hashtable_t *hashtable) -{ - tailq_entry_t *item; - int rv; - rv = svsem_wait(hashtable->wlock); - if(rv != 0) { - LoggerFactory::getLogger()->error(rv, "hashtable_removeall\n"); - } - for (int i = 0; i < MAPSIZE; i++) - { - tailq_header_t *my_tailq_head = hashtable->array[i] ; - - if (my_tailq_head == NULL ) - continue; - - while ((item = TAILQ_FIRST(my_tailq_head)) ) - { - TAILQ_REMOVE(my_tailq_head, item, joint); - mm_free(item); + if((rv = svsem_post(hashtable->mutex)) != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_remove\n"); } - mm_free(my_tailq_head); - hashtable->array[i] = NULL; - } - rv = svsem_post(hashtable->wlock); - if(rv != 0) { - LoggerFactory::getLogger()->error(rv, "hashtable_removeall\n"); + + return NULL; } } -/** - * for debug - */ -void hashtable_printall(hashtable_t *hashtable) -{ - tailq_entry_t *item; - for (int i = 0; i < MAPSIZE; i++) - { - tailq_header_t *my_tailq_head = hashtable->array[i] ; - if (my_tailq_head == NULL ) - continue; - printf("code=%d\n", i); - TAILQ_FOREACH(item, my_tailq_head, joint) - { - printf("%d:%s\n", item->key, (char *)item->value); - } - printf("\n"); - } -} - -static size_t hashcode(int key) -{ - - return key % MAPSIZE; - /*printf("hashfun = %ld\n", code);*/ -} void *hashtable_get(hashtable_t *hashtable, int key) { -LoggerFactory::getLogger()->debug( "==========hashtable_get before 1"); - - int rv; - rv = svsem_wait(hashtable->mutex); - -LoggerFactory::getLogger()->debug( "==========hashtable_get before 2"); - - if(rv != 0) { - LoggerFactory::getLogger()->error(rv, "hashtable_get 1"); - } - hashtable->readcnt++; - if (hashtable->readcnt == 1) { - //鑾峰彇璇诲啓閿� -LoggerFactory::getLogger()->debug( "==========hashtable_get before 3"); - rv = svsem_wait(hashtable->wlock); -LoggerFactory::getLogger()->debug( "==========hashtable_get before 4"); - if(rv != 0) { - LoggerFactory::getLogger()->error(rv, "hashtable_get 2"); - } - } - rv = svsem_post(hashtable->mutex); - if(rv != 0) { - LoggerFactory::getLogger()->error(rv, "hashtable_get 3"); - } - // ================ - void * res = _hashtable_get(hashtable, key); - - // ================== - - rv = svsem_wait(hashtable->mutex); -LoggerFactory::getLogger()->debug( "==========hashtable_get before 5"); - if(rv != 0) { - LoggerFactory::getLogger()->error(rv, "hashtable_get 4"); - } - hashtable->readcnt--; - if(hashtable->readcnt == 0) { - //閲婃斁璇诲啓閿� - - rv = svsem_post(hashtable->wlock); -LoggerFactory::getLogger()->debug( "==========hashtable_get before 6"); - if(rv != 0) { - LoggerFactory::getLogger()->error(rv, "hashtable_get 5"); - } - //閫氱煡鍐� - rv = svsem_set(hashtable->cond, 1); - if(rv != 0) { - LoggerFactory::getLogger()->error(rv, "hashtable_get 6"); - } - } - - rv = svsem_post(hashtable->mutex); - if(rv != 0) { - LoggerFactory::getLogger()->error(rv, "hashtable_get 7"); - } -LoggerFactory::getLogger()->debug( "==========hashtable_get after"); return res; } void hashtable_put(hashtable_t *hashtable, int key, void *value) { int rv; -LoggerFactory::getLogger()->debug( "==========hashtable_put before 1"); - rv = svsem_wait(hashtable->mutex); - if(rv != 0) { - LoggerFactory::getLogger()->error(rv, "hashtable_put\n"); - } -LoggerFactory::getLogger()->debug( "==========hashtable_put before 2"); - // 璁剧疆璇讳紭鍏堢骇楂� - while (hashtable->readcnt > 0) - { - rv = svsem_set(hashtable->cond, 0); - if(rv != 0) { - LoggerFactory::getLogger()->error(rv, "hashtable_put\n"); - } - rv = svsem_post(hashtable->mutex); - if(rv != 0) { - LoggerFactory::getLogger()->error(rv, "hashtable_put\n"); - } - //绛夊緟鍐欓�氱煡 -LoggerFactory::getLogger()->debug( "==========hashtable_put before 3"); - rv = svsem_wait(hashtable->cond); -LoggerFactory::getLogger()->debug( "==========hashtable_put before 4"); - if(rv != 0) { - LoggerFactory::getLogger()->error(rv, "hashtable_put\n"); - } - - - rv = svsem_wait(hashtable->mutex); -LoggerFactory::getLogger()->debug( "==========hashtable_put before 5"); - if(rv != 0) { - LoggerFactory::getLogger()->error(rv, "hashtable_put\n"); - } - } - rv = svsem_post(hashtable->mutex); - if(rv != 0) { - LoggerFactory::getLogger()->error(rv, "hashtable_put\n"); - } - - //鑾峰彇璇诲啓閿� - rv = svsem_wait(hashtable->wlock); -LoggerFactory::getLogger()->debug( "==========hashtable_put before 6"); - if(rv != 0) { + if(( rv = svsem_wait(hashtable->mutex)) != 0) { LoggerFactory::getLogger()->error(rv, "hashtable_put\n"); } _hashtable_put(hashtable, key, value); - //閲婃斁璇诲啓閿� - rv = svsem_post(hashtable->wlock); - if(rv != 0) { + if(( rv = svsem_post(hashtable->mutex)) != 0) { LoggerFactory::getLogger()->error(rv, "hashtable_put\n"); } - - LoggerFactory::getLogger()->debug( "==========hashtable_put after"); } @@ -344,56 +184,7 @@ void hashtable_foreach(hashtable_t *hashtable, std::function<void(int, void *)> cb) { - int rv; - - - LoggerFactory::getLogger()->debug("===hashtable_foreach before 1\n"); - - - rv = svsem_wait(hashtable->mutex); - if(rv != 0) { - LoggerFactory::getLogger()->error(rv, "hashtable_foreach"); - } - hashtable->readcnt++; - if (hashtable->readcnt == 1) { - //鑾峰彇璇诲啓閿� - rv = svsem_wait(hashtable->wlock); - if(rv != 0) { - LoggerFactory::getLogger()->error(rv, "hashtable_foreach"); - } - } - rv = svsem_post(hashtable->mutex); - if(rv != 0) { - LoggerFactory::getLogger()->error(rv, "hashtable_foreach"); - } - - // ================== - - _hashtable_foreach(hashtable, cb); - - // ================== - - rv = svsem_wait(hashtable->mutex); - if(rv != 0) { - LoggerFactory::getLogger()->error(rv, "hashtable_foreach"); - } - hashtable->readcnt--; - if(hashtable->readcnt == 0) { - //閲婃斁璇诲啓閿� - rv = svsem_post(hashtable->wlock); - if(rv != 0) { - LoggerFactory::getLogger()->error(rv, "hashtable_foreach"); - } - //閫氱煡鍐� - rv = svsem_set(hashtable->cond, 1); - if(rv != 0) { - LoggerFactory::getLogger()->error(rv, "hashtable_foreach"); - } - } - rv = svsem_post(hashtable->mutex); - if(rv != 0) { - LoggerFactory::getLogger()->error(rv, "hashtable_foreach"); - } + return _hashtable_foreach(hashtable, cb); } @@ -419,7 +210,7 @@ int hashtable_alloc_key(hashtable_t *hashtable) { int rv; int key = START_KEY; - rv = svsem_wait(hashtable->wlock); + rv = svsem_wait(hashtable->mutex); if(rv != 0) { LoggerFactory::getLogger()->error(rv, "hashtable_alloc_key\n"); } @@ -430,9 +221,68 @@ // 鍗犵敤key _hashtable_put(hashtable, key, (void *)1); - rv = svsem_post(hashtable->wlock); + rv = svsem_post(hashtable->mutex); if(rv != 0) { LoggerFactory::getLogger()->error(rv, "hashtable_alloc_key\n"); } return key; } + + +void hashtable_removeall(hashtable_t *hashtable) +{ + tailq_entry_t *item; + int rv; + if( (rv = svsem_wait(hashtable->mutex)) != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_removeall\n"); + } + for (int i = 0; i < MAPSIZE; i++) + { + tailq_header_t *my_tailq_head = hashtable->array[i] ; + + if (my_tailq_head == NULL ) + continue; + + while ((item = TAILQ_FIRST(my_tailq_head)) ) + { + TAILQ_REMOVE(my_tailq_head, item, joint); + mm_free(item); + } + mm_free(my_tailq_head); + hashtable->array[i] = NULL; + } + + if((rv = svsem_post(hashtable->mutex)) != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_removeall\n"); + } +} + + +static size_t hashcode(int key) +{ + + return key % MAPSIZE; + /*printf("hashfun = %ld\n", code);*/ +} + +/** + * for debug + */ +static void hashtable_printall(hashtable_t *hashtable) +{ + tailq_entry_t *item; + for (int i = 0; i < MAPSIZE; i++) + { + tailq_header_t *my_tailq_head = hashtable->array[i] ; + + if (my_tailq_head == NULL ) + continue; + + printf("code=%d\n", i); + TAILQ_FOREACH(item, my_tailq_head, joint) + { + printf("%d:%s\n", item->key, (char *)item->value); + } + printf("\n"); + } +} \ No newline at end of file diff --git a/src/shm/hashtable.h b/src/shm/hashtable.h index ea4dc64..35b5892 100755 --- a/src/shm/hashtable.h +++ b/src/shm/hashtable.h @@ -11,9 +11,9 @@ { struct tailq_header_t* array[MAPSIZE]; int mutex; - int wlock; - int cond; - size_t readcnt; + // int wlock; + // int cond; + // size_t readcnt; } hashtable_t; typedef void (*hashtable_foreach_cb)(int key, void *value); diff --git a/src/shm/hashtable2.cpp b/src/shm/hashtable2.cpp new file mode 100755 index 0000000..701bb17 --- /dev/null +++ b/src/shm/hashtable2.cpp @@ -0,0 +1,438 @@ +#include "usg_common.h" +#include "hashtable.h" +#include "mm.h" +#include "svsem.h" +#include "logger_factory.h" +#include <set> +#include <functional> + +typedef struct tailq_entry_t +{ + void *value; + int key; + /* + * This holds the pointers to the next and previous joint in + * the tail queue. + */ + TAILQ_ENTRY(tailq_entry_t) joint; +} tailq_entry_t; + +#define START_KEY 1000 + +typedef TAILQ_HEAD(tailq_header_t, tailq_entry_t) tailq_header_t; + + +static size_t hashcode(int key); + +static struct timespec TIMEOUT = {2, 0}; + +void hashtable_init(hashtable_t *hashtable ) +{ + + memset(hashtable, 0, sizeof(hashtable_t)); + hashtable->mutex = svsem_get(IPC_PRIVATE, 1); + hashtable->wlock = svsem_get(IPC_PRIVATE, 1); + hashtable->cond = svsem_get(IPC_PRIVATE, 1); + hashtable->readcnt = 0; + + FILE * semfile = fopen("./sem.txt", "w+"); + if(semfile == NULL) { + err_exit(errno, "fopen"); + } + fprintf(semfile, "hashtable->mutex=%d\n", hashtable->mutex); + fclose(semfile); +} + +void hashtable_destroy(hashtable_t *hashtable) { + svsem_remove( hashtable->mutex); + svsem_remove( hashtable->wlock); + svsem_remove( hashtable->cond); +} + + +static inline void *_hashtable_get(hashtable_t *hashtable, int key) +{ + size_t code = hashcode(key); + tailq_entry_t *item; + tailq_header_t *my_tailq_head = hashtable->array[code] ; + if ( my_tailq_head == NULL) + { + return NULL; + } + else + { + + TAILQ_FOREACH(item, my_tailq_head, joint) + { + if (key == item->key) + return item->value; + } + } + return NULL; + +} + +static inline void * _hashtable_put(hashtable_t *hashtable, int key, void *value) +{ + size_t code = hashcode(key); + void *oldvalue; + tailq_entry_t *item; + tailq_header_t *my_tailq_head = hashtable->array[code] ; + if ( my_tailq_head == NULL) + { + my_tailq_head = (tailq_header_t*) mm_malloc(sizeof(tailq_header_t )); + TAILQ_INIT(my_tailq_head); + hashtable->array[code] = my_tailq_head; + goto putnew; + } + + TAILQ_FOREACH(item, my_tailq_head, joint) + { + if (key ==item->key) + { + oldvalue = item -> value; + item->key= key; + item -> value = value; + return oldvalue; + } + } +putnew: + item = (tailq_entry_t *) mm_malloc(sizeof(tailq_entry_t)); + item->key = key; + item -> value = value; + TAILQ_INSERT_TAIL(my_tailq_head, item, joint); + return NULL; +} + +void *hashtable_remove(hashtable_t *hashtable, int key) +{ + size_t code = hashcode(key); + tailq_entry_t *item; + void *oldvalue; + int rv; + + if( (rv = svsem_wait(hashtable->wlock)) != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_remove\n"); + } + tailq_header_t *my_tailq_head = hashtable->array[code] ; + if ( my_tailq_head == NULL) + { + svsem_post(hashtable->wlock); + return NULL; + } + else + { + for (item = TAILQ_FIRST(my_tailq_head); item != NULL; item = TAILQ_NEXT(item, joint)) + { + if (key == item->key) + { + oldvalue = item->value; + /* Remove the item from the tail queue. */ + TAILQ_REMOVE(my_tailq_head, item, joint); + + /* mm_free the item as we don't need it anymore. */ + mm_free(item); + svsem_post(hashtable->wlock); + return oldvalue; + } + } + } + if((rv = svsem_post(hashtable->wlock)) != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_remove\n"); + } + + return NULL; + +} + + + + + +void hashtable_removeall(hashtable_t *hashtable) +{ + tailq_entry_t *item; + int rv; + rv = svsem_wait(hashtable->wlock); + if(rv != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_removeall\n"); + } + for (int i = 0; i < MAPSIZE; i++) + { + tailq_header_t *my_tailq_head = hashtable->array[i] ; + + if (my_tailq_head == NULL ) + continue; + + while ((item = TAILQ_FIRST(my_tailq_head)) ) + { + TAILQ_REMOVE(my_tailq_head, item, joint); + mm_free(item); + } + mm_free(my_tailq_head); + hashtable->array[i] = NULL; + } + rv = svsem_post(hashtable->wlock); + if(rv != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_removeall\n"); + } +} + +/** + * for debug + */ +void hashtable_printall(hashtable_t *hashtable) +{ + tailq_entry_t *item; + for (int i = 0; i < MAPSIZE; i++) + { + tailq_header_t *my_tailq_head = hashtable->array[i] ; + + if (my_tailq_head == NULL ) + continue; + + printf("code=%d\n", i); + TAILQ_FOREACH(item, my_tailq_head, joint) + { + printf("%d:%s\n", item->key, (char *)item->value); + } + printf("\n"); + } +} + +static size_t hashcode(int key) +{ + + return key % MAPSIZE; + /*printf("hashfun = %ld\n", code);*/ +} + +void *hashtable_get(hashtable_t *hashtable, int key) { +LoggerFactory::getLogger()->debug( "==========hashtable_get before 1"); + + int rv; + rv = svsem_wait(hashtable->mutex); + +LoggerFactory::getLogger()->debug( "==========hashtable_get before 2"); + + if(rv != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_get 1"); + } + hashtable->readcnt++; + if (hashtable->readcnt == 1) { + //鑾峰彇璇诲啓閿� +LoggerFactory::getLogger()->debug( "==========hashtable_get before 3"); + rv = svsem_wait(hashtable->wlock); +LoggerFactory::getLogger()->debug( "==========hashtable_get before 4"); + if(rv != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_get 2"); + } + } + rv = svsem_post(hashtable->mutex); + if(rv != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_get 3"); + } + // ================ + + void * res = _hashtable_get(hashtable, key); + + // ================== + + rv = svsem_wait(hashtable->mutex); +LoggerFactory::getLogger()->debug( "==========hashtable_get before 5"); + if(rv != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_get 4"); + } + hashtable->readcnt--; + if(hashtable->readcnt == 0) { + //閲婃斁璇诲啓閿� + + rv = svsem_post(hashtable->wlock); +LoggerFactory::getLogger()->debug( "==========hashtable_get before 6"); + if(rv != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_get 5"); + } + //閫氱煡鍐� + rv = svsem_set(hashtable->cond, 1); + if(rv != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_get 6"); + } + } + + rv = svsem_post(hashtable->mutex); + if(rv != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_get 7"); + } +LoggerFactory::getLogger()->debug( "==========hashtable_get after"); + return res; +} + +void hashtable_put(hashtable_t *hashtable, int key, void *value) { + + int rv; +LoggerFactory::getLogger()->debug( "==========hashtable_put before 1"); + rv = svsem_wait(hashtable->mutex); + if(rv != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_put\n"); + } +LoggerFactory::getLogger()->debug( "==========hashtable_put before 2"); + // 璁剧疆璇讳紭鍏堢骇楂� + while (hashtable->readcnt > 0) + { + rv = svsem_set(hashtable->cond, 0); + if(rv != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_put\n"); + } + rv = svsem_post(hashtable->mutex); + if(rv != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_put\n"); + } + //绛夊緟鍐欓�氱煡 +LoggerFactory::getLogger()->debug( "==========hashtable_put before 3"); + rv = svsem_wait(hashtable->cond); +LoggerFactory::getLogger()->debug( "==========hashtable_put before 4"); + if(rv != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_put\n"); + } + + + rv = svsem_wait(hashtable->mutex); +LoggerFactory::getLogger()->debug( "==========hashtable_put before 5"); + if(rv != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_put\n"); + } + } + rv = svsem_post(hashtable->mutex); + if(rv != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_put\n"); + } + + //鑾峰彇璇诲啓閿� + rv = svsem_wait(hashtable->wlock); +LoggerFactory::getLogger()->debug( "==========hashtable_put before 6"); + if(rv != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_put\n"); + } + + _hashtable_put(hashtable, key, value); + + //閲婃斁璇诲啓閿� + rv = svsem_post(hashtable->wlock); + if(rv != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_put\n"); + } + + LoggerFactory::getLogger()->debug( "==========hashtable_put after"); +} + + + +static inline void _hashtable_foreach(hashtable_t *hashtable, std::function<void(int, void *)> cb) { + tailq_entry_t *item; + for (int i = 0; i < MAPSIZE; i++) { + tailq_header_t *my_tailq_head = hashtable->array[i] ; + + if (my_tailq_head == NULL ) + continue; + + TAILQ_FOREACH(item, my_tailq_head, joint) + { + cb(item->key, item -> value); + } + } +} + + +void hashtable_foreach(hashtable_t *hashtable, std::function<void(int, void *)> cb) { + int rv; + + + LoggerFactory::getLogger()->debug("===hashtable_foreach before 1\n"); + + + rv = svsem_wait(hashtable->mutex); + if(rv != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_foreach"); + } + hashtable->readcnt++; + if (hashtable->readcnt == 1) { + //鑾峰彇璇诲啓閿� + rv = svsem_wait(hashtable->wlock); + if(rv != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_foreach"); + } + } + rv = svsem_post(hashtable->mutex); + if(rv != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_foreach"); + } + + // ================== + + _hashtable_foreach(hashtable, cb); + + // ================== + + rv = svsem_wait(hashtable->mutex); + if(rv != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_foreach"); + } + hashtable->readcnt--; + if(hashtable->readcnt == 0) { + //閲婃斁璇诲啓閿� + rv = svsem_post(hashtable->wlock); + if(rv != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_foreach"); + } + //閫氱煡鍐� + rv = svsem_set(hashtable->cond, 1); + if(rv != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_foreach"); + } + } + rv = svsem_post(hashtable->mutex); + if(rv != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_foreach"); + } + +} + + +std::set<int> * hashtable_keyset(hashtable_t *hashtable) { + std::set<int> *keyset = new std::set<int>; + tailq_entry_t *item; + for (int i = 0; i < MAPSIZE; i++) { + tailq_header_t *my_tailq_head = hashtable->array[i] ; + + if (my_tailq_head == NULL ) + continue; + + TAILQ_FOREACH(item, my_tailq_head, joint) + { + keyset->insert(item->key); + } + } + return keyset; +} + + +int hashtable_alloc_key(hashtable_t *hashtable) { + int rv; + int key = START_KEY; + rv = svsem_wait(hashtable->wlock); + if(rv != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_alloc_key\n"); + } + + while(_hashtable_get(hashtable, key) != NULL) { + key++; + } + // 鍗犵敤key + _hashtable_put(hashtable, key, (void *)1); + + rv = svsem_post(hashtable->wlock); + if(rv != 0) { + LoggerFactory::getLogger()->error(rv, "hashtable_alloc_key\n"); + } + return key; +} diff --git a/src/shm/mem_pool.h b/src/shm/mem_pool.h index 2ea1f6b..5a698ec 100644 --- a/src/shm/mem_pool.h +++ b/src/shm/mem_pool.h @@ -4,42 +4,25 @@ #include "sem_util.h" #define MEM_POOL_COND_KEY 0x8801 -static int mem_pool_cond = SemUtil::get(MEM_POOL_COND_KEY, 0); // static int mem_pool_mutex = SemUtil::get(MEM_POOL_COND_KEY, 1); static inline void mem_pool_init(size_t heap_size) { - if(mm_init(heap_size)) { - - } + mm_init(heap_size); } static inline void mem_pool_destroy(void) { - if(mm_destroy()) { - SemUtil::remove(mem_pool_cond); - } + mm_destroy(); } static inline void *mem_pool_malloc (size_t size) { - void *ptr; - while( (ptr = mm_malloc(size)) == NULL ) { - err_msg(0, "There is not enough memery to allocate, waiting someone else to free."); - SemUtil::set(mem_pool_cond, 0); - // wait for someone else to free space - SemUtil::dec(mem_pool_cond); - - } - - return ptr; + return mm_malloc(size); } static inline void mem_pool_free (void *ptr) { mm_free(ptr); - // notify malloc - SemUtil::set(mem_pool_cond, 1); - } diff --git a/src/shm/mm.cpp b/src/shm/mm.cpp index 7f1fda3..2b52b51 100644 --- a/src/shm/mm.cpp +++ b/src/shm/mm.cpp @@ -229,6 +229,7 @@ /* * mm_init - Initialize the memory manager, M unit + * @return 鏄惁绗竴娆″垱寤� */ bool mm_init(size_t heap_size) { @@ -301,16 +302,17 @@ return first; } - +/** + * @return 鏄惁鐪熸閿�姣佹垚鍔� + */ bool mm_destroy(void) { struct shmid_ds shmid_ds; - SemUtil::dec(mutex); if(shmctl(shmid, IPC_STAT, &shmid_ds) == 0) { //LoggerFactory::getLogger()->debug("shm_nattch=%d\n", shmid_ds.shm_nattch); - // 鍙湁褰撳墠涓�涓繘绋媋ttach鍒板叡浜唴瀛樹笂 + // 澶氫釜杩涚▼attach鍦ㄥ叡浜唴瀛樹笂 if (shmid_ds.shm_nattch > 1) { //detache if (shmdt(shmp) == -1) { @@ -319,7 +321,7 @@ SemUtil::inc(mutex); return false; } else { - + // 鍙湁褰撳墠涓�涓繘绋媋ttach鍒板叡浜唴瀛樹笂 hashtable_destroy(hashtable); //detache if (shmdt(shmp) == -1) { diff --git a/src/svsem.cpp b/src/svsem.cpp index 54db4a2..82390b6 100644 --- a/src/svsem.cpp +++ b/src/svsem.cpp @@ -6,7 +6,7 @@ perms = S_IRUSR | S_IWUSR; - semid = semget(key, 1, IPC_CREAT | IPC_EXCL | perms); + semid = semget(key, 2, IPC_CREAT | IPC_EXCL | perms); if (semid != -1) { /* Successfully created the semaphore */ union semun arg; @@ -17,6 +17,10 @@ arg.val = 0; /* So initialize it to 0 */ if (semctl(semid, 0, SETVAL, arg) == -1) err_exit(errno, "semctl 1"); + + arg.val = 1; + if (semctl(semid, 1, SETVAL, arg) == -1) + err_exit(errno, "semctl 2"); //logger.info("%ld: initialized semaphore\n", (long)getpid()); /* Perform a "no-op" semaphore operation - changes sem_otime @@ -135,6 +139,63 @@ } + + +int svsem_cond_wait(int semid ){ + + struct sembuf sops[2]; + union semun arg; + + arg.val = 1; + if (semctl(semid, 1, SETVAL, arg) == -1) { + err_msg(errno, "svsem_set"); + return errno; + } + + //閲婃斁mutex + sops[0].sem_num = 0; + sops[0].sem_op = 1; + sops[0].sem_flg = 0; + + // 绛夊緟cond + sops[1].sem_num = 1; + sops[1].sem_op = 0; + sops[1].sem_flg = 0; + + while (semop(semid, sops, 2) == -1) + if (errno != EINTR) { + // err_msg(errno, "Svsvsem_dec"); + return errno; + } + + + //閲嶆柊鑾峰彇mutex + sops[0].sem_num = 0; + sops[0].sem_op = -1; + sops[0].sem_flg = 0; + + while (semop(semid, sops, 1) == -1) + if (errno != EINTR) { + // err_msg(errno, "Svsvsem_dec"); + return errno; + } + + return 0; +} + + +int svsem_cond_signal(int semid ){ + union semun arg; + arg.val = 0; + + if (semctl(semid, 1, SETVAL, arg) == -1) { + err_msg(errno, "svsem_set"); + return errno; + } + return 0; +} + + /** * If sem_op equals 0, the value of the semaphore is checked to see whether it * currently equals 0. If it does, the operation completes immediately; otherwise, @@ -213,46 +274,4 @@ - -int svsem_cond_wait(int semid ){ - - struct sembuf sops[2]; - - //閲婃斁mutex - sops[0].sem_num = 0; - sops[0].sem_op = 1; - sops[0].sem_flg = 0; - - // 绛夊緟cond - sops[1].sem_num = 1; - sops[1].sem_op = -1; - sops[1].sem_flg = 0; - - while (semop(semid, sops, 2) == -1) - if (errno != EINTR) { - // err_msg(errno, "Svsvsem_dec"); - return errno; - } - - - - - return 0; -} - -int svsem_cond_signal(int semid ){ - struct sembuf sops; - - // 閫氱煡绛夊緟cond鐨勮繘绋� - sops.sem_num = 1; - sops.sem_op = 1; - sops.sem_flg = 0; - - int rv = semop(semid, &sops, 1); - if (rv == -1) { - // err_msg(errno, "Svsvsem_inc"); - return errno; - } - return 0; -} \ No newline at end of file -- Gitblit v1.8.0