From 91ec036cace39fd5b5f04644f6bced1f477005e0 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期二, 21 七月 2020 19:33:28 +0800
Subject: [PATCH] update
---
src/util/include/sem_util.h | 0
build/include/shm_socket.h | 87 +
Makefile | 3
build/include/array_lock_free_queue.h | 322 ++++++
build/include/lock_free_queue.h | 360 +++++++
demo/pub_sub | 0
src/queue/shm_queue_wrapper.c | 2
build/include/shm_queue_wrapper.h | 100 +
Make.defines.linux | 2
build/include/mm.h | 20
src/socket/mod_socket.c | 9
build/include/shm_mm.h | 26
build/include/mem_pool.h | 55 +
build/include/array_lock_free_queue2.h | 332 ++++++
test/dgram_socket_test | 0
src/socket/include/shm_socket.h | 18
src/libshm_queue.a | 0
src/socket/shm_socket.c | 678 +++++++-----
demo/queue | 0
src/queue/include/shm_queue.h | 1
build/include/hashtable.h | 34
build/include/linked_lock_free_queue.h | 245 ++++
src/util/sem_util.c | 208 +--
test/dgram_socket_test.c | 74 +
build/include/logger_factory.h | 17
demo/Makefile | 13
demo/queue.c | 4
build/include/sem_util.h | 0
src/queue/include/shm_allocator.h | 6
/dev/null | 0
test/Makefile | 21
build/include/mod_socket.h | 76 +
build/include/shm_allocator.h | 102 ++
src/queue/include/shm_queue_wrapper.h | 3
demo/req_rep | 0
src/Makefile | 8
build/lib/libshm_queue.a | 0
build/include/shm_queue.h | 184 +++
src/logger_factory.h | 4
39 files changed, 2,566 insertions(+), 448 deletions(-)
diff --git a/Make.defines.linux b/Make.defines.linux
index ef20eaa..9e30018 100755
--- a/Make.defines.linux
+++ b/Make.defines.linux
@@ -25,7 +25,7 @@
# Common temp files to delete from each directory.
-TEMPFILES=core core.* *.o temp.* *.out *.a *.so
+TEMPFILES=core core.* **/*.o temp.* *.out *.a *.so
%: %.c
$(CC) $(CFLAGS) $^ -o $@ $(LDFLAGS) $(LDLIBS)
diff --git a/Makefile b/Makefile
index adac55f..44bd26e 100755
--- a/Makefile
+++ b/Makefile
@@ -1,4 +1,4 @@
-DIRS = src test test2 demo
+DIRS = src test demo
all:
for i in $(DIRS); do \
@@ -9,6 +9,7 @@
for i in $(DIRS); do \
(cd $$i && echo "cleaning $$i" && $(MAKE) clean) || exit 1; \
done
+ rm -rf build
ipcrm:
-ipcrm -a
diff --git a/build/include/array_lock_free_queue.h b/build/include/array_lock_free_queue.h
new file mode 100644
index 0000000..24a4ec6
--- /dev/null
+++ b/build/include/array_lock_free_queue.h
@@ -0,0 +1,322 @@
+#ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
+#define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
+#include "atomic_ops.h"
+#include <assert.h> // assert()
+#include <sched.h> // sched_yield()
+#include "logger_factory.h"
+#include "mem_pool.h"
+#include "shm_allocator.h"
+
+/// @brief implementation of an array based lock free queue with support for
+/// multiple producers
+/// This class is prevented from being instantiated directly (all members and
+/// methods are private). To instantiate a multiple producers lock free queue
+/// you must use the ArrayLockFreeQueue fachade:
+/// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q;
+
+
+#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+template <typename ELEM_T, typename Allocator = SHM_Allocator>
+class ArrayLockFreeQueue
+{
+ // ArrayLockFreeQueue will be using this' private members
+ template <
+ typename ELEM_T_,
+ typename Allocator_,
+ template <typename T, typename AT> class Q_TYPE
+ >
+ friend class LockFreeQueue;
+
+private:
+ /// @brief constructor of the class
+ ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
+
+ virtual ~ArrayLockFreeQueue();
+
+ inline uint32_t size();
+
+ inline bool full();
+
+ inline bool empty();
+
+ bool push(const ELEM_T &a_data);
+
+ bool pop(ELEM_T &a_data);
+
+ /// @brief calculate the index in the circular array that corresponds
+ /// to a particular "count" value
+ inline uint32_t countToIndex(uint32_t a_count);
+
+ ELEM_T& operator[](unsigned i);
+
+private:
+ size_t Q_SIZE;
+ /// @brief array to keep the elements
+ ELEM_T *m_theQueue;
+
+ /// @brief where a new element will be inserted
+ uint32_t m_writeIndex;
+
+ /// @brief where the next element where be extracted from
+ uint32_t m_readIndex;
+
+ /// @brief maximum read index for multiple producer queues
+ /// If it's not the same as m_writeIndex it means
+ /// there are writes pending to be "committed" to the queue, that means,
+ /// the place for the data was reserved (the index in the array) but
+ /// data is still not in the queue, so the thread trying to read will have
+ /// to wait for those other threads to save the data into the queue
+ ///
+ /// note this is only used for multiple producers
+ uint32_t m_maximumReadIndex;
+
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+ /// @brief number of elements in the queue
+ uint32_t m_count;
+#endif
+
+
+private:
+ /// @brief disable copy constructor declaring it private
+ ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src);
+
+};
+
+
+template <typename ELEM_T, typename Allocator>
+ArrayLockFreeQueue<ELEM_T, Allocator>::ArrayLockFreeQueue(size_t qsize):
+ Q_SIZE(qsize),
+ m_writeIndex(0), // initialisation is not atomic
+ m_readIndex(0), //
+ m_maximumReadIndex(0) //
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+ ,m_count(0) //
+#endif
+{
+ m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T));
+
+}
+
+template <typename ELEM_T, typename Allocator>
+ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue()
+{
+ // std::cout << "destroy ArrayLockFreeQueue\n";
+ Allocator::deallocate(m_theQueue);
+
+}
+
+template <typename ELEM_T, typename Allocator>
+inline
+uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count)
+{
+ // if Q_SIZE is a power of 2 this statement could be also written as
+ // return (a_count & (Q_SIZE - 1));
+ return (a_count % Q_SIZE);
+}
+
+template <typename ELEM_T, typename Allocator>
+inline
+uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size()
+{
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+ return m_count;
+#else
+
+ uint32_t currentWriteIndex = m_maximumReadIndex;
+ uint32_t currentReadIndex = m_readIndex;
+
+ // let's think of a scenario where this function returns bogus data
+ // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
+ // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1
+ // 2. afterwards this thread is preemted. While this thread is inactive 2
+ // elements are inserted and removed from the queue, so m_maximumReadIndex
+ // is 5 and m_readIndex 4. Real size is still 1
+ // 3. Now the current thread comes back from preemption and reads m_readIndex.
+ // currentReadIndex is 4
+ // 4. currentReadIndex is bigger than currentWriteIndex, so
+ // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
+ // it returns that the queue is almost full, when it is almost empty
+ //
+ if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex))
+ {
+ return (currentWriteIndex - currentReadIndex);
+ }
+ else
+ {
+ return (Q_SIZE + currentWriteIndex - currentReadIndex);
+ }
+#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+template <typename ELEM_T, typename Allocator>
+inline
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::full()
+{
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+ return (m_count == (Q_SIZE));
+#else
+
+ uint32_t currentWriteIndex = m_writeIndex;
+ uint32_t currentReadIndex = m_readIndex;
+
+ if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
+ {
+ // the queue is full
+ return true;
+ }
+ else
+ {
+ // not full!
+ return false;
+ }
+#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+template <typename ELEM_T, typename Allocator>
+inline
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty()
+{
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+ return (m_count == 0);
+#else
+
+ if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex))
+ {
+ // the queue is full
+ return true;
+ }
+ else
+ {
+ // not full!
+ return false;
+ }
+#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+
+
+
+
+
+template <typename ELEM_T, typename Allocator>
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data)
+{
+ uint32_t currentReadIndex;
+ uint32_t currentWriteIndex;
+
+ do
+ {
+
+ currentWriteIndex = m_writeIndex;
+ currentReadIndex = m_readIndex;
+ #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+ if (m_count == Q_SIZE) {
+ return false;
+ }
+ #else
+ if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
+ {
+ // the queue is full
+ return false;
+ }
+ #endif
+
+ } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));
+
+ // We know now that this index is reserved for us. Use it to save the data
+ m_theQueue[countToIndex(currentWriteIndex)] = a_data;
+
+ // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
+ // inserting in the queue. It might fail if there are more than 1 producer threads because this
+ // operation has to be done in the same order as the previous CAS
+
+ while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
+ {
+ // this is a good place to yield the thread in case there are more
+ // software threads than hardware processors and you have more
+ // than 1 producer thread
+ // have a look at sched_yield (POSIX.1b)
+ sched_yield();
+ }
+
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+ AtomicAdd(&m_count, 1);
+#endif
+ return true;
+}
+
+
+template <typename ELEM_T, typename Allocator>
+bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data)
+{
+ uint32_t currentMaximumReadIndex;
+ uint32_t currentReadIndex;
+
+ do
+ {
+ // to ensure thread-safety when there is more than 1 producer thread
+ // a second index is defined (m_maximumReadIndex)
+ currentReadIndex = m_readIndex;
+ currentMaximumReadIndex = m_maximumReadIndex;
+ #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+ if (m_count == 0) {
+ return false;
+ }
+ #else
+ if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
+ {
+ // the queue is empty or
+ // a producer thread has allocate space in the queue but is
+ // waiting to commit the data into it
+ return false;
+ }
+ #endif
+
+ // retrieve the data from the queue
+ a_data = m_theQueue[countToIndex(currentReadIndex)];
+
+ // try to perfrom now the CAS operation on the read index. If we succeed
+ // a_data already contains what m_readIndex pointed to before we
+ // increased it
+ if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
+ {
+ #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+ // m_count.fetch_sub(1);
+ AtomicSub(&m_count, 1);
+ #endif
+ return true;
+ }
+
+ // it failed retrieving the element off the queue. Someone else must
+ // have read the element stored at countToIndex(currentReadIndex)
+ // before we could perform the CAS operation
+
+ } while(1); // keep looping to try again!
+
+ // Something went wrong. it shouldn't be possible to reach here
+ assert(0);
+
+ // Add this return statement to avoid compiler warnings
+ return false;
+}
+
+template <typename ELEM_T, typename Allocator>
+ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i)
+{
+ int currentCount = m_count;
+ uint32_t currentReadIndex = m_readIndex;
+ if (i < 0 || i >= currentCount)
+ {
+ std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n";
+ std::exit(EXIT_FAILURE);
+ }
+ return m_theQueue[countToIndex(currentReadIndex+i)];
+}
+
+#endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
diff --git a/build/include/array_lock_free_queue2.h b/build/include/array_lock_free_queue2.h
new file mode 100644
index 0000000..3b79b7f
--- /dev/null
+++ b/build/include/array_lock_free_queue2.h
@@ -0,0 +1,332 @@
+#ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
+#define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
+
+#include <assert.h> // assert()
+#include <sched.h> // sched_yield()
+#include "logger_factory.h"
+
+/// @brief implementation of an array based lock free queue with support for
+/// multiple producers
+/// This class is prevented from being instantiated directly (all members and
+/// methods are private). To instantiate a multiple producers lock free queue
+/// you must use the ArrayLockFreeQueue fachade:
+/// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q;
+
+
+#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+template <typename ELEM_T>
+class ArrayLockFreeQueue
+{
+ // ArrayLockFreeQueue will be using this' private members
+ template <
+ typename ELEM_T_,
+ template <typename T> class Q_TYPE >
+ friend class LockFreeQueue;
+
+private:
+ /// @brief constructor of the class
+ ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
+
+ virtual ~ArrayLockFreeQueue();
+
+ inline uint32_t size();
+
+ inline bool full();
+
+ inline bool empty();
+
+ bool push(const ELEM_T &a_data);
+
+ bool pop(ELEM_T &a_data);
+
+ /// @brief calculate the index in the circular array that corresponds
+ /// to a particular "count" value
+ inline uint32_t countToIndex(uint32_t a_count);
+
+ ELEM_T& operator[](unsigned i);
+
+private:
+ size_t Q_SIZE;
+ /// @brief array to keep the elements
+ ELEM_T *m_theQueue;
+
+ /// @brief where a new element will be inserted
+ std::atomic<uint32_t> m_writeIndex;
+
+ /// @brief where the next element where be extracted from
+ std::atomic<uint32_t> m_readIndex;
+
+ /// @brief maximum read index for multiple producer queues
+ /// If it's not the same as m_writeIndex it means
+ /// there are writes pending to be "committed" to the queue, that means,
+ /// the place for the data was reserved (the index in the array) but
+ /// data is still not in the queue, so the thread trying to read will have
+ /// to wait for those other threads to save the data into the queue
+ ///
+ /// note this is only used for multiple producers
+ std::atomic<uint32_t> m_maximumReadIndex;
+
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+ /// @brief number of elements in the queue
+ std::atomic<uint32_t> m_count;
+#endif
+
+
+private:
+ /// @brief disable copy constructor declaring it private
+ ArrayLockFreeQueue<ELEM_T>(const ArrayLockFreeQueue<ELEM_T> &a_src);
+
+};
+
+
+template <typename ELEM_T>
+ArrayLockFreeQueue<ELEM_T>::ArrayLockFreeQueue(size_t qsize):
+ Q_SIZE(qsize),
+ m_writeIndex(0), // initialisation is not atomic
+ m_readIndex(0), //
+ m_maximumReadIndex(0) //
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+ ,m_count(0) //
+#endif
+{
+ m_theQueue = (ELEM_T*)mm_malloc(Q_SIZE * sizeof(ELEM_T));
+
+}
+
+template <typename ELEM_T>
+ArrayLockFreeQueue<ELEM_T>::~ArrayLockFreeQueue()
+{
+ // std::cout << "destroy ArrayLockFreeQueue\n";
+ mm_free(m_theQueue);
+
+}
+
+template <typename ELEM_T>
+inline
+uint32_t ArrayLockFreeQueue<ELEM_T>::countToIndex(uint32_t a_count)
+{
+ // if Q_SIZE is a power of 2 this statement could be also written as
+ // return (a_count & (Q_SIZE - 1));
+ return (a_count % Q_SIZE);
+}
+
+template <typename ELEM_T>
+inline
+uint32_t ArrayLockFreeQueue<ELEM_T>::size()
+{
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+ return m_count.load();
+#else
+
+ uint32_t currentWriteIndex = m_maximumReadIndex.load();
+ uint32_t currentReadIndex = m_readIndex.load();
+
+ // let's think of a scenario where this function returns bogus data
+ // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
+ // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1
+ // 2. afterwards this thread is preemted. While this thread is inactive 2
+ // elements are inserted and removed from the queue, so m_maximumReadIndex
+ // is 5 and m_readIndex 4. Real size is still 1
+ // 3. Now the current thread comes back from preemption and reads m_readIndex.
+ // currentReadIndex is 4
+ // 4. currentReadIndex is bigger than currentWriteIndex, so
+ // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
+ // it returns that the queue is almost full, when it is almost empty
+ //
+ if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex))
+ {
+ return (currentWriteIndex - currentReadIndex);
+ }
+ else
+ {
+ return (Q_SIZE + currentWriteIndex - currentReadIndex);
+ }
+#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+template <typename ELEM_T>
+inline
+bool ArrayLockFreeQueue<ELEM_T>::full()
+{
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+ return (m_count.load() == (Q_SIZE));
+#else
+
+ uint32_t currentWriteIndex = m_writeIndex;
+ uint32_t currentReadIndex = m_readIndex;
+
+ if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
+ {
+ // the queue is full
+ return true;
+ }
+ else
+ {
+ // not full!
+ return false;
+ }
+#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+template <typename ELEM_T>
+inline
+bool ArrayLockFreeQueue<ELEM_T>::empty()
+{
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+ return (m_count.load() == 0);
+#else
+
+ if (countToIndex( m_readIndex.load()) == countToIndex(m_maximumReadIndex.load()))
+ {
+ // the queue is full
+ return true;
+ }
+ else
+ {
+ // not full!
+ return false;
+ }
+#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+}
+
+
+template <typename ELEM_T>
+bool ArrayLockFreeQueue<ELEM_T>::push(const ELEM_T &a_data)
+{
+ uint32_t currentReadIndex;
+ uint32_t currentWriteIndex;
+
+ do
+ {
+ currentWriteIndex = m_writeIndex.load();
+ currentReadIndex = m_readIndex.load();
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+ if (m_count.load() == Q_SIZE) {
+ return false;
+ }
+#else
+ if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
+ {
+ // the queue is full
+ return false;
+ }
+#endif
+
+ // There is more than one producer. Keep looping till this thread is able
+ // to allocate space for current piece of data
+ //
+ // using compare_exchange_strong because it isn't allowed to fail spuriously
+ // When the compare_exchange operation is in a loop the weak version
+ // will yield better performance on some platforms, but here we'd have to
+ // load m_writeIndex all over again
+ } while (!m_writeIndex.compare_exchange_strong(
+ currentWriteIndex, (currentWriteIndex + 1)));
+
+ // Just made sure this index is reserved for this thread.
+ m_theQueue[countToIndex(currentWriteIndex)] = a_data;
+ //memcpy((void *)(&m_theQueue[countToIndex(currentWriteIndex)]), (void *)(&a_data), sizeof(ELEM_T) );
+
+ // update the maximum read index after saving the piece of data. It can't
+ // fail if there is only one thread inserting in the queue. It might fail
+ // if there is more than 1 producer thread because this operation has to
+ // be done in the same order as the previous CAS
+ //
+ // using compare_exchange_weak because they are allowed to fail spuriously
+ // (act as if *this != expected, even if they are equal), but when the
+ // compare_exchange operation is in a loop the weak version will yield
+ // better performance on some platforms.
+ while (!m_maximumReadIndex.compare_exchange_weak(
+ currentWriteIndex, (currentWriteIndex + 1)))
+ {
+ // this is a good place to yield the thread in case there are more
+ // software threads than hardware processors and you have more
+ // than 1 producer thread
+ // have a look at sched_yield (POSIX.1b)
+ sched_yield();
+ }
+
+ // The value was successfully inserted into the queue
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+ m_count.fetch_add(1);
+#endif
+
+ return true;
+}
+
+template <typename ELEM_T>
+bool ArrayLockFreeQueue<ELEM_T>::pop(ELEM_T &a_data)
+{
+ uint32_t currentMaximumReadIndex;
+ uint32_t currentReadIndex;
+
+ do
+ {
+ currentReadIndex = m_readIndex.load();
+ currentMaximumReadIndex = m_maximumReadIndex.load();
+
+ #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+
+ if (m_count.load() == 0) {
+ return false;
+ }
+ #else
+ // to ensure thread-safety when there is more than 1 producer
+ // thread a second index is defined (m_maximumReadIndex)
+ if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
+ {
+ // the queue is empty or
+ // a producer thread has allocate space in the queue but is
+ // waiting to commit the data into it
+ return false;
+ }
+ #endif
+
+ // retrieve the data from the queue
+ a_data = m_theQueue[countToIndex(currentReadIndex)];
+ //memcpy((void*) (&a_data), (void *)(&m_theQueue[countToIndex(currentReadIndex)]),sizeof(ELEM_T) );
+ // try to perfrom now the CAS operation on the read index. If we succeed
+ // a_data already contains what m_readIndex pointed to before we
+ // increased it
+ if (m_readIndex.compare_exchange_strong(currentReadIndex, (currentReadIndex + 1)))
+ {
+ // got here. The value was retrieved from the queue. Note that the
+ // data inside the m_queue array is not deleted nor reseted
+#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
+ m_count.fetch_sub(1);
+#endif
+ return true;
+ }
+
+ // it failed retrieving the element off the queue. Someone else must
+ // have read the element stored at countToIndex(currentReadIndex)
+ // before we could perform the CAS operation
+
+ } while(1); // keep looping to try again!
+
+ // Something went wrong. it shouldn't be possible to reach here
+ assert(0);
+
+ // Add this return statement to avoid compiler warnings
+ return false;
+}
+
+
+template <typename ELEM_T>
+ELEM_T& ArrayLockFreeQueue<ELEM_T>::operator[](unsigned int i)
+{
+ int currentCount = m_count.load();
+ uint32_t currentReadIndex = m_readIndex.load();
+ if (i < 0 || i >= currentCount)
+ {
+ std::cerr << "Error in array limits: " << i << " is out of range\n";
+ std::exit(EXIT_FAILURE);
+ }
+ return m_theQueue[countToIndex(currentReadIndex+i)];
+}
+
+#endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__
diff --git a/build/include/hashtable.h b/build/include/hashtable.h
new file mode 100644
index 0000000..726a5bc
--- /dev/null
+++ b/build/include/hashtable.h
@@ -0,0 +1,34 @@
+#ifndef __HASHTABLE_H__
+#define __HASHTABLE_H__
+
+#include <sys/queue.h>
+#include <set>
+
+#define MAPSIZE 100
+
+typedef struct hashtable_t
+{
+ struct tailq_header_t* array[MAPSIZE];
+ int mutex;
+ int wlock;
+ int cond;
+ size_t readcnt;
+
+} hashtable_t;
+typedef void (*hashtable_foreach_cb)(int key, void *value);
+
+void hashtable_init(hashtable_t *hashtable);
+void *hashtable_get(hashtable_t *hashtable, int key);
+void hashtable_put(hashtable_t *hashtable, int key, void *value);
+void *hashtable_remove(hashtable_t *hashtable, int key);
+void hashtable_removeall(hashtable_t *hashtable);
+
+
+void hashtable_foreach(hashtable_t *hashtable, hashtable_foreach_cb cb);
+
+void hashtable_printall(hashtable_t *hashtable);
+
+int hashtable_alloc_key(hashtable_t *hashtable);
+
+std::set<int> * hashtable_keyset(hashtable_t *hashtable) ;
+#endif
diff --git a/build/include/linked_lock_free_queue.h b/build/include/linked_lock_free_queue.h
new file mode 100644
index 0000000..3906a42
--- /dev/null
+++ b/build/include/linked_lock_free_queue.h
@@ -0,0 +1,245 @@
+// queue.h -- interface for a queue
+#ifndef __LINKED_LOCK_FREE_QUEUE_H_
+#define __LINKED_LOCK_FREE_QUEUE_H_
+#include "mm.h"
+#include "sem_util.h"
+
+template <typename T> class Node;
+
+template <typename T>
+class Pointer {
+public:
+
+ Node<T> *ptr;
+ unsigned long count;
+ Pointer( Node<T> *node = NULL, int c=0) noexcept : ptr(node), count(c) {}
+
+ bool operator == (const Pointer<T> o) const {
+ return (o.ptr == ptr) && (o.count == count);
+ }
+ bool operator != (const Pointer<T> o) const {
+ return !((o.ptr == ptr) && (o.count == count));
+ }
+
+
+
+};
+
+template <typename T>
+class Node {
+public:
+ alignas(16) std::atomic<Pointer<T> > next;
+ T value;
+
+ Node() {
+ }
+
+ void *operator new(size_t size){
+ return mm_malloc(size);
+ }
+
+ void operator delete(void *p) {
+ return mm_free(p);
+ }
+};
+
+
+
+
+
+template <typename ELEM_T>
+class LinkedLockFreeQueue
+{
+
+ template <
+ typename ELEM_T_,
+ template <typename T> class Q_TYPE >
+ friend class LockFreeQueue;
+private:
+// class scope definitions
+ enum {Q_SIZE = 10};
+
+// private class members
+ std::atomic<Pointer<ELEM_T> > Head; // pointer to front of Queue
+ std::atomic<Pointer<ELEM_T> > Tail; // pointer to rear of Queue
+ //std::atomic_uint count; // current number of size in Queue
+ std::atomic_uint count;
+ const size_t qsize; // maximum number of size in Queue
+ // preemptive definitions to prevent public copying
+ LinkedLockFreeQueue(const LinkedLockFreeQueue & q) : qsize(0) { }
+ LinkedLockFreeQueue & operator=(const LinkedLockFreeQueue & q) { return *this;}
+protected:
+ LinkedLockFreeQueue(size_t qs = Q_SIZE); // create queue with a qs limit
+ ~LinkedLockFreeQueue();
+ bool empty() const;
+ bool full() const;
+ unsigned int size() const;
+ bool push(const ELEM_T &item); // add item to end
+ bool pop(ELEM_T &item);
+
+
+ ELEM_T& operator[](unsigned i);
+
+};
+
+
+// Queue methods
+template <typename T>
+LinkedLockFreeQueue<T>::LinkedLockFreeQueue(size_t qs) : count(0), qsize(qs)
+{
+ Node<T> *node = new Node<T>;
+ Pointer<T> pointer(node, 0);
+
+ Head.store(pointer, std::memory_order_relaxed);
+ Tail.store(pointer, std::memory_order_relaxed);
+
+}
+
+template <typename T>
+LinkedLockFreeQueue<T>::~LinkedLockFreeQueue()
+{
+ LoggerFactory::getLogger().debug("LinkedLockFreeQueue destory");
+ Node<T> * nodeptr;
+ Pointer<T> tmp = Head.load(std::memory_order_relaxed);
+ while((nodeptr = tmp.ptr) != NULL) {
+ tmp = (tmp.ptr->next).load(std::memory_order_relaxed);
+ //std::cerr << "delete " << nodeptr << std::endl;
+ delete nodeptr;
+
+ }
+}
+
+template <typename T>
+bool LinkedLockFreeQueue<T>::empty() const
+{
+ return count == 0;
+}
+
+template <typename T>
+bool LinkedLockFreeQueue<T>::full() const
+{
+ return count == qsize;
+}
+
+template <typename T>
+unsigned int LinkedLockFreeQueue<T>::size() const
+{
+ return count;
+}
+
+// Add item to queue
+template <typename T>
+bool LinkedLockFreeQueue<T>::push(const T & item)
+{
+ if (full())
+ return false;
+
+ Node<T> * node = new Node<T>;
+ node->value = item;
+
+
+ Pointer<T> tail ;
+ Pointer<T> next ;
+
+
+ while(true) {
+ tail = Tail.load(std::memory_order_relaxed);
+ next = (tail.ptr->next).load(std::memory_order_relaxed);
+ if (tail == Tail.load(std::memory_order_relaxed)) {
+ if (next.ptr == NULL) {
+ if ((tail.ptr->next).compare_exchange_weak(next,
+ Pointer<T>(node, next.count+1),
+ std::memory_order_release,
+ std::memory_order_relaxed) )
+ break;
+ else
+ Tail.compare_exchange_weak(tail,
+ Pointer<T>(next.ptr, tail.count+1),
+ std::memory_order_release,
+ std::memory_order_relaxed);
+ }
+
+ }
+ }
+
+ Tail.compare_exchange_weak(tail, Pointer<T>(node, tail.count+1),
+ std::memory_order_release,
+ std::memory_order_relaxed);
+ count++;
+ return true;
+}
+
+
+
+
+// Place front item into item variable and remove from queue
+template <typename T>
+bool LinkedLockFreeQueue<T>::pop(T & item)
+{
+ if (empty())
+ return false;
+
+ Pointer<T> head;
+ Pointer<T> tail;
+ Pointer<T> next;
+
+ while(true) {
+ head = Head.load(std::memory_order_relaxed);
+ tail = Tail.load(std::memory_order_relaxed);
+ next = (head.ptr->next).load();
+ if (head == Head.load(std::memory_order_relaxed)) {
+ if(head.ptr == tail.ptr) {
+ if (next.ptr == NULL)
+ return false;
+ // Tail is falling behind. Try to advance it
+ Tail.compare_exchange_weak(tail,
+ Pointer<T>(next.ptr, tail.count+1),
+ std::memory_order_release,
+ std::memory_order_relaxed);
+ } else {
+ item = next.ptr->value;
+ if (Head.compare_exchange_weak(head,
+ Pointer<T>(next.ptr, head.count+1),
+ std::memory_order_release,
+ std::memory_order_relaxed)) {
+ delete head.ptr;
+ break;
+ }
+
+ }
+ }
+
+ }
+
+ count--;
+ return true;
+
+}
+
+
+template <class T>
+T& LinkedLockFreeQueue<T>::operator[](unsigned int i)
+{
+ if (i < 0 || i >= count)
+ {
+ std::cerr << "Error in array limits: " << i << " is out of range\n";
+ std::exit(EXIT_FAILURE);
+ }
+
+
+ Pointer<T> tmp = Head.load(std::memory_order_relaxed);
+ //Pointer<T> tail = Tail.load(std::memory_order_relaxed);
+
+ while(i > 0) {
+ //std::cout << i << ":" << std::endl;
+ tmp = (tmp.ptr->next).load(std::memory_order_relaxed);
+ i--;
+ }
+
+ tmp = (tmp.ptr->next).load(std::memory_order_relaxed);
+ return tmp.ptr->value;
+}
+
+
+
+#endif
diff --git a/build/include/lock_free_queue.h b/build/include/lock_free_queue.h
new file mode 100644
index 0000000..f34079f
--- /dev/null
+++ b/build/include/lock_free_queue.h
@@ -0,0 +1,360 @@
+#ifndef __LOCK_FREE_QUEUE_H__
+#define __LOCK_FREE_QUEUE_H__
+
+#include <usg_common.h>
+#include <assert.h> // assert()
+#include "mem_pool.h"
+#include "sem_util.h"
+#include "logger_factory.h"
+#include "shm_allocator.h"
+
+// default Queue size
+#define LOCK_FREE_Q_DEFAULT_SIZE 16
+
+// define this macro if calls to "size" must return the real size of the
+// queue. If it is undefined that function will try to take a snapshot of
+// the queue, but returned value might be bogus
+
+
+// forward declarations for default template values
+//
+
+template <typename ELEM_T, typename Allocator>
+class ArrayLockFreeQueue;
+
+// template <typename ELEM_T>
+// class LinkedLockFreeQueue;
+
+
+/// @brief Lock-free queue based on a circular array
+/// No allocation of extra memory for the nodes handling is needed, but it has
+/// to add extra overhead (extra CAS operation) when inserting to ensure the
+/// thread-safety of the queue when the queue type is not
+/// ArrayLockFreeQueueSingleProducer.
+///
+/// examples of instantiation:
+/// ArrayLockFreeQueue<int> q; // queue of ints of default size (65535 - 1)
+/// // and defaulted to single producer
+/// ArrayLockFreeQueue<int, 16> q;
+/// // queue of ints of size (16 - 1) and
+/// // defaulted to single producer
+/// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q;
+/// // queue of ints of size (100 - 1) with support
+/// // for multiple producers
+///
+/// ELEM_T represents the type of elementes pushed and popped from the queue
+/// Q_SIZE size of the queue. The actual size of the queue is (Q_SIZE-1)
+/// This number should be a power of 2 to ensure
+/// indexes in the circular queue keep stable when the uint32_t
+/// variable that holds the current position rolls over from FFFFFFFF
+/// to 0. For instance
+/// 2 -> 0x02
+/// 4 -> 0x04
+/// 8 -> 0x08
+/// 16 -> 0x10
+/// (...)
+/// 1024 -> 0x400
+/// 2048 -> 0x800
+///
+/// if queue size is not defined as requested, let's say, for
+/// instance 100, when current position is FFFFFFFF (4,294,967,295)
+/// index in the circular array is 4,294,967,295 % 100 = 95.
+/// When that value is incremented it will be set to 0, that is the
+/// last 4 elements of the queue are not used when the counter rolls
+/// over to 0
+/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and
+/// ArrayLockFreeQueue are supported (single producer
+/// by default)
+template <
+ typename ELEM_T,
+ typename Allocator = SHM_Allocator,
+ template <typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue
+ >
+class LockFreeQueue
+{
+
+private:
+ int slots;
+ int items;
+
+public:
+ // int mutex;
+ LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
+
+ /// @brief destructor of the class.
+ /// Note it is not virtual since it is not expected to inherit from this
+ /// template
+ ~LockFreeQueue();
+ std::atomic_uint reference;
+ /// @brief constructor of the class
+
+
+ /// @brief returns the current number of items in the queue
+ /// It tries to take a snapshot of the size of the queue, but in busy environments
+ /// this function might return bogus values.
+ ///
+ /// If a reliable queue size must be kept you might want to have a look at
+ /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE'
+ /// it enables a reliable size though it hits overall performance of the queue
+ /// (when the reliable size variable is on it's got an impact of about 20% in time)
+ inline uint32_t size();
+
+ /// @brief return true if the queue is full. False otherwise
+ /// It tries to take a snapshot of the size of the queue, but in busy
+ /// environments this function might return bogus values. See help in method
+ /// LockFreeQueue::size
+ inline bool full();
+
+ inline bool empty();
+
+ inline ELEM_T& operator[](unsigned i);
+
+ /// @brief push an element at the tail of the queue
+ /// @param the element to insert in the queue
+ /// Note that the element is not a pointer or a reference, so if you are using large data
+ /// structures to be inserted in the queue you should think of instantiate the template
+ /// of the queue as a pointer to that large structure
+ /// @return true if the element was inserted in the queue. False if the queue was full
+ bool push(const ELEM_T &a_data);
+ bool push_nowait(const ELEM_T &a_data);
+ bool push_timeout(const ELEM_T &a_data, struct timespec * timeout);
+
+ /// @brief pop the element at the head of the queue
+ /// @param a reference where the element in the head of the queue will be saved to
+ /// Note that the a_data parameter might contain rubbish if the function returns false
+ /// @return true if the element was successfully extracted from the queue. False if the queue was empty
+ bool pop(ELEM_T &a_data);
+ bool pop_nowait(ELEM_T &a_data);
+ bool pop_timeout(ELEM_T &a_data, struct timespec * timeout);
+
+
+ void *operator new(size_t size);
+ void operator delete(void *p);
+
+protected:
+ /// @brief the actual queue. methods are forwarded into the real
+ /// implementation
+ Q_TYPE<ELEM_T, Allocator> m_qImpl;
+
+private:
+ /// @brief disable copy constructor declaring it private
+ LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src);
+};
+
+
+template <
+ typename ELEM_T,
+ typename Allocator,
+ template <typename T, typename AT> class Q_TYPE>
+LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize)
+{
+// std::cout << "LockFreeQueue init reference=" << reference << std::endl;
+ slots = SemUtil::get(IPC_PRIVATE, qsize);
+ items = SemUtil::get(IPC_PRIVATE, 0);
+ // mutex = SemUtil::get(IPC_PRIVATE, 1);
+}
+
+template <
+ typename ELEM_T,
+ typename Allocator,
+ template <typename T, typename AT> class Q_TYPE>
+LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue()
+{
+ LoggerFactory::getLogger().debug("LockFreeQueue desctroy");
+ SemUtil::remove(slots);
+ SemUtil::remove(items);
+}
+
+template <
+ typename ELEM_T,
+ typename Allocator,
+ template <typename T, typename AT> class Q_TYPE>
+inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size()
+{
+ return m_qImpl.size();
+}
+
+template <
+ typename ELEM_T,
+ typename Allocator,
+ template <typename T, typename AT> class Q_TYPE>
+inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full()
+{
+ return m_qImpl.full();
+}
+
+template <
+ typename ELEM_T,
+ typename Allocator,
+ template <typename T, typename AT> class Q_TYPE>
+inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty()
+{
+ return m_qImpl.empty();
+}
+
+
+template <
+ typename ELEM_T,
+ typename Allocator,
+ template <typename T, typename AT> class Q_TYPE>
+bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data)
+{
+ if (SemUtil::dec(slots) == -1) {
+ err_msg(errno, "LockFreeQueue push");
+ return false;
+ }
+
+ if ( m_qImpl.push(a_data) ) {
+ SemUtil::inc(items);
+ return true;
+ }
+ return false;
+
+}
+
+template <
+ typename ELEM_T,
+ typename Allocator,
+ template <typename T, typename AT> class Q_TYPE>
+bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data)
+{
+ if (SemUtil::dec_nowait(slots) == -1) {
+ if (errno == EAGAIN)
+ return false;
+ else {
+ err_msg(errno, "LockFreeQueue push_nowait");
+ return false;
+ }
+
+ }
+
+ if ( m_qImpl.push(a_data)) {
+ SemUtil::inc(items);
+ return true;
+ }
+ return false;
+
+}
+
+template <
+ typename ELEM_T,
+ typename Allocator,
+ template <typename T, typename AT> class Q_TYPE>
+bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, struct timespec * timeout)
+{
+
+ if (SemUtil::dec_timeout(slots, timeout) == -1) {
+ if (errno == EAGAIN)
+ return false;
+ else {
+ err_msg(errno, "LockFreeQueue push_timeout");
+ return false;
+ }
+ }
+
+ if (m_qImpl.push(a_data)){
+ SemUtil::inc(items);
+ return true;
+ }
+ return false;
+
+}
+
+
+
+
+template <
+ typename ELEM_T,
+ typename Allocator,
+ template <typename T, typename AT> class Q_TYPE>
+bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data)
+{
+ if (SemUtil::dec(items) == -1) {
+ err_msg(errno, "LockFreeQueue pop");
+ return false;
+ }
+
+ if (m_qImpl.pop(a_data)) {
+ SemUtil::inc(slots);
+ return true;
+ }
+ return false;
+
+}
+
+template <
+ typename ELEM_T,
+ typename Allocator,
+ template <typename T, typename AT> class Q_TYPE>
+bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data)
+{
+ if (SemUtil::dec_nowait(items) == -1) {
+ if (errno == EAGAIN)
+ return false;
+ else {
+ err_msg(errno, "LockFreeQueue pop_nowait");
+ return false;
+ }
+ }
+
+ if (m_qImpl.pop(a_data)) {
+ SemUtil::inc(slots);
+ return true;
+ }
+ return false;
+
+}
+
+
+template <
+ typename ELEM_T,
+ typename Allocator,
+ template <typename T, typename AT> class Q_TYPE>
+bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * timeout)
+{
+ if (SemUtil::dec_timeout(items, timeout) == -1) {
+ if (errno == EAGAIN)
+ return false;
+ else {
+ err_msg(errno, "LockFreeQueue pop_timeout");
+ return false;
+ }
+ }
+
+ if (m_qImpl.pop(a_data)) {
+ SemUtil::inc(slots);
+ return true;
+ }
+ return false;
+
+}
+
+template <
+ typename ELEM_T,
+ typename Allocator,
+ template <typename T, typename AT> class Q_TYPE>
+ELEM_T& LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) {
+ return m_qImpl.operator[](i);
+}
+
+template <
+ typename ELEM_T,
+ typename Allocator,
+ template <typename T, typename AT> class Q_TYPE>
+void * LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size){
+ return Allocator::allocate(size);
+}
+
+template <
+ typename ELEM_T,
+ typename Allocator,
+ template <typename T, typename AT> class Q_TYPE>
+void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator delete(void *p) {
+ return Allocator::deallocate(p);
+}
+
+// include implementation files
+//#include "linked_lock_free_queue.h"
+#include "array_lock_free_queue.h"
+
+#endif // _LOCK_FREE_QUEUE_H__
diff --git a/build/include/logger_factory.h b/build/include/logger_factory.h
new file mode 100644
index 0000000..384e3e0
--- /dev/null
+++ b/build/include/logger_factory.h
@@ -0,0 +1,17 @@
+#ifndef __LOGGER_FACTORY_H__
+#define __LOGGER_FACTORY_H__
+#include "logger.h"
+
+class LoggerFactory {
+public:
+
+ static Logger getLogger() {
+//ERROR ALL DEBUG
+ static Logger logger(Logger::DEBUG);
+ return logger;
+ }
+};
+
+#endif
+
+
diff --git a/build/include/mem_pool.h b/build/include/mem_pool.h
new file mode 100644
index 0000000..17a7c5c
--- /dev/null
+++ b/build/include/mem_pool.h
@@ -0,0 +1,55 @@
+#ifndef _MEM_POOL_H_
+#define _MEM_POOL_H_
+#include "mm.h"
+#include "sem_util.h"
+#define MEM_POOL_COND_KEY 0x8801
+
+static int mem_pool_cond = SemUtil::get(MEM_POOL_COND_KEY, 0);
+
+// static int mem_pool_mutex = SemUtil::get(MEM_POOL_COND_KEY, 1);
+
+static inline void mem_pool_init(size_t heap_size) {
+ if(mm_init(heap_size)) {
+
+ }
+}
+
+static inline void mem_pool_destroy(void) {
+ if(mm_destroy()) {
+ SemUtil::remove(mem_pool_cond);
+ }
+
+}
+
+static inline void *mem_pool_malloc (size_t size) {
+ void *ptr;
+ while( (ptr = mm_malloc(size)) == NULL ) {
+ err_msg(0, "There is not enough memery to allocate, waiting someone else to free.");
+ SemUtil::set(mem_pool_cond, 0);
+ // wait for someone else to free space
+ SemUtil::dec(mem_pool_cond);
+
+ }
+
+ return ptr;
+}
+
+static inline void mem_pool_free (void *ptr) {
+ mm_free(ptr);
+ // notify malloc
+ SemUtil::set(mem_pool_cond, 1);
+
+}
+
+static inline void *mem_pool_realloc (void *ptr, size_t size) {
+ return mm_realloc(ptr, size);
+}
+
+static inline hashtable_t * mem_pool_get_hashtable() {
+ return mm_get_hashtable();
+
+}
+// extern int mm_checkheap(int verbose);
+
+
+#endif
\ No newline at end of file
diff --git a/build/include/mm.h b/build/include/mm.h
new file mode 100644
index 0000000..f0ab764
--- /dev/null
+++ b/build/include/mm.h
@@ -0,0 +1,20 @@
+#ifndef MM_HDR_H
+#define MM_HDR_H /* Prevent accidental double inclusion */
+
+#include <usg_common.h>
+#include "usg_typedef.h"
+#include "hashtable.h"
+
+extern bool mm_init(size_t heap_size);
+extern bool mm_destroy(void);
+
+extern void *mm_malloc (size_t size);
+extern void mm_free (void *ptr);
+extern void *mm_realloc(void *ptr, size_t size);
+extern hashtable_t * mm_get_hashtable();
+
+// extern int mm_checkheap(int verbose);
+
+// extern void *get_mm_start_brk();
+// extern size_t get_mm_max_size();
+#endif
diff --git a/build/include/mod_socket.h b/build/include/mod_socket.h
new file mode 100644
index 0000000..21498ee
--- /dev/null
+++ b/build/include/mod_socket.h
@@ -0,0 +1,76 @@
+#ifndef __MOD_SOCKET_H__
+#define __MOD_SOCKET_H__
+
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+enum socket_mod_t
+{
+ PULL_PUSH = 1,
+ REQ_REP = 2,
+ PAIR = 3,
+ PUB_SUB = 4,
+ SURVEY = 5,
+ BUS = 6
+
+};
+
+/**
+ * 鍒涘缓socket
+ * @return socket鍦板潃
+*/
+void *mod_open_socket(int mod);
+
+/**
+ * 鍏抽棴socket
+*/
+int mod_close_socket(void * _socket);
+
+/**
+ * 缁戝畾绔彛鍒皊ocket, 濡傛灉涓嶇粦瀹氬垯绯荤粺鑷姩鍒嗛厤涓�涓�
+ * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
+*/
+int mod_socket_bind(void * _socket, int port);
+
+
+/**
+ * 鏈嶅姟绔紑鍚繛鎺ョ洃鍚�
+ * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
+ */
+int mod_listen(void * _socket);
+
+/**
+ * 瀹㈡埛绔彂璧疯繛鎺ヨ姹�
+ */
+int mod_connect(void * _socket, int port);
+
+/**
+ * 鍙戦�佷俊鎭�
+ * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
+ */
+int mod_send(void * _socket, const void *buf, const int size);
+
+/**
+ * 鎺ユ敹淇℃伅
+ * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜
+*/
+int mod_recv(void * _socket, void **buf, int *size) ;
+
+/**
+ * 閲婃斁鎺ユ敹淇℃伅鐨刡uf
+ */
+void mod_free(void *buf);
+
+
+/**
+ * 鑾峰彇soket绔彛鍙�
+ */
+int mod_get_socket_port(void * _socket);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
\ No newline at end of file
diff --git a/src/util/sem_util.h b/build/include/sem_util.h
similarity index 100%
copy from src/util/sem_util.h
copy to build/include/sem_util.h
diff --git a/build/include/shm_allocator.h b/build/include/shm_allocator.h
new file mode 100644
index 0000000..023bc9d
--- /dev/null
+++ b/build/include/shm_allocator.h
@@ -0,0 +1,102 @@
+#ifndef __SHM_ALLOCATOR_H__
+#define __SHM_ALLOCATOR_H__
+#include "usg_common.h"
+#include "mem_pool.h"
+#include <new>
+#include <cstdlib> // for exit()
+#include <climits> // for UNIX_MAX
+#include <cstddef>
+
+
+
+template<class T> class SHM_STL_Allocator
+{
+public:
+ typedef T value_type;
+ typedef T* pointer;
+ typedef const T* const_pointer;
+ typedef T& reference;
+ typedef const T& const_reference;
+ typedef size_t size_type;
+ typedef ptrdiff_t difference_type;
+
+
+ SHM_STL_Allocator() {};
+ ~SHM_STL_Allocator() {};
+ template<class U> SHM_STL_Allocator(const SHM_STL_Allocator<U>& t) { };
+ template<class U> struct rebind { typedef SHM_STL_Allocator<U> other; };
+
+ pointer allocate(size_type n, const void* hint=0) {
+// fprintf(stderr, "allocate n=%u, hint= %p\n",n, hint);
+ return((T*) (mm_malloc(n * sizeof(T))));
+ }
+
+ void deallocate(pointer p, size_type n) {
+// fprintf(stderr, "dealocate n=%u" ,n);
+ mm_free((void*)p);
+ }
+
+ void construct(pointer p, const T& value) {
+ ::new(p) T(value);
+ }
+
+ void construct(pointer p)
+ {
+ ::new(p) T();
+ }
+
+ void destroy(pointer p) {
+ p->~T();
+ }
+
+ pointer address(reference x) {
+ return (pointer)&x;
+ }
+
+ const_pointer address(const_reference x) {
+ return (const_pointer)&x;
+ }
+
+ size_type max_size() const {
+ return size_type(UINT_MAX/sizeof(T));
+ }
+};
+
+
+class SHM_Allocator {
+ public:
+ static void *allocate (size_t size) {
+ printf("shm_allocator malloc\n");
+ return mem_pool_malloc(size);
+ }
+
+ static void deallocate (void *ptr) {
+ printf("shm_allocator free\n");
+ return mem_pool_free(ptr);
+ }
+};
+
+
+class DM_Allocator {
+ public:
+ static void *allocate (size_t size) {
+ printf("dm_allocator malloc\n");
+ return malloc(size);
+ }
+
+ static void deallocate (void *ptr) {
+ printf("dm_allocator free\n");
+ return free(ptr);
+ }
+};
+
+
+// template<class charT, class traits = char _traits<charT>,
+// class Allocator = allocator<charT> >
+
+
+
+
+typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > shmstring;
+
+#endif
\ No newline at end of file
diff --git a/build/include/shm_mm.h b/build/include/shm_mm.h
new file mode 100644
index 0000000..b32568e
--- /dev/null
+++ b/build/include/shm_mm.h
@@ -0,0 +1,26 @@
+#ifndef __SHM_MM_H__
+#define __SHM_MM_H__
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * 鍒濆鍖栧叡浜唴瀛�
+ * @size 鍏变韩鍐呭瓨澶у皬, 鍗曚綅M
+ *
+ */
+void shm_init(int size);
+
+/**
+ * 閿�姣佸叡浜唴瀛�
+ * 鏁翠釜杩涚▼閫�鍑烘椂闇�瑕佹墽琛岃繖涓柟娉曪紝璇ユ柟娉曢鍏堜細妫�鏌ユ槸鍚﹁繕鏈夊叾浠栬繘绋嬪湪浣跨敤璇ュ叡浜唴瀛橈紝濡傛灉杩樻湁鍏朵粬杩涚▼鍦ㄤ娇鐢ㄥ氨鍙槸detach,濡傛灉娌℃湁鍏朵粬杩涚▼鍦ㄤ娇鐢ㄥ垯閿�姣佹暣鍧楀唴瀛樸��
+ */
+void shm_destroy();
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
+
diff --git a/build/include/shm_queue.h b/build/include/shm_queue.h
new file mode 100644
index 0000000..d853774
--- /dev/null
+++ b/build/include/shm_queue.h
@@ -0,0 +1,184 @@
+#ifndef __SHM_QUEUE_H__
+#define __SHM_QUEUE_H__
+
+#include "usg_common.h"
+#include "hashtable.h"
+#include "lock_free_queue.h"
+#include "logger_factory.h"
+#include "shm_allocator.h"
+
+// default Queue size
+// #define LOCK_FREE_Q_DEFAULT_SIZE 16
+
+template < typename ELEM_T>
+class SHMQueue
+{
+
+private:
+ const int KEY;
+
+public:
+ /// @brief constructor of the class
+ SHMQueue(int key=0, size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);
+
+
+ ~SHMQueue();
+
+
+ inline uint32_t size();
+
+ inline bool full();
+ inline bool empty();
+
+ inline bool push(const ELEM_T &a_data);
+ inline bool push_nowait(const ELEM_T &a_data);
+ inline bool push_timeout(const ELEM_T &a_data, struct timespec * timeout);
+ inline bool pop(ELEM_T &a_data);
+ inline bool pop_nowait(ELEM_T &a_data);
+ inline bool pop_timeout(ELEM_T &a_data, struct timespec * timeout);
+
+ inline ELEM_T& operator[](unsigned i);
+
+ static void remove_queues_exclude(int *keys, size_t length);
+private:
+
+
+protected:
+ /// @brief the actual queue-> methods are forwarded into the real
+ /// implementation
+ LockFreeQueue<ELEM_T, SHM_Allocator>* queue;
+
+private:
+ /// @brief disable copy constructor declaring it private
+ SHMQueue<ELEM_T>(const SHMQueue<ELEM_T> &a_src);
+};
+
+
+template < typename ELEM_T >
+void SHMQueue<ELEM_T>::remove_queues_exclude(int *keys, size_t length)
+{
+ hashtable_t *hashtable = mm_get_hashtable();
+ std::set<int>* keyset = hashtable_keyset(hashtable);
+ std::set<int>::iterator keyItr;
+ LockFreeQueue<ELEM_T, SHM_Allocator>* mqueue;
+ bool found;
+ for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) {
+ found = false;
+ for(size_t i = 0; i < length; i++) {
+ if(*keyItr == keys[i]) {
+ found = true;
+ break;
+ }
+ }
+ if(!found) {
+ mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr);
+ delete mqueue;
+ }
+ }
+ delete keyset;
+
+}
+
+template < typename ELEM_T >
+SHMQueue<ELEM_T>::SHMQueue(int key, size_t qsize): KEY(key)
+{
+
+ hashtable_t *hashtable = mm_get_hashtable();
+ queue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key);
+ //LockFreeQueue<int, 10000> q;
+ if (queue == NULL || (void *)queue == (void *)1) {
+ queue = new LockFreeQueue<ELEM_T, SHM_Allocator>(qsize);
+ hashtable_put(hashtable, key, (void *)queue);
+ }
+ queue->reference++;
+ LoggerFactory::getLogger().debug("SHMQueue constructor reference===%d", queue->reference.load());
+}
+
+template < typename ELEM_T >
+SHMQueue<ELEM_T>::~SHMQueue()
+{
+ queue->reference--;
+ LoggerFactory::getLogger().debug("SHMQueue destructor reference===%d", queue->reference.load());
+ if(queue->reference.load() == 0) {
+ delete queue;
+ hashtable_t *hashtable = mm_get_hashtable();
+ hashtable_remove(hashtable, KEY);
+ LoggerFactory::getLogger().debug("SHMQueue destructor delete queue");
+ }
+}
+
+template < typename ELEM_T >
+inline uint32_t SHMQueue<ELEM_T>::size()
+{
+ return queue->size();
+}
+
+template < typename ELEM_T >
+inline bool SHMQueue<ELEM_T>::full()
+{
+ return queue->full();
+}
+
+template < typename ELEM_T >
+inline bool SHMQueue<ELEM_T>::empty()
+{
+ return queue->empty();
+}
+
+
+template < typename ELEM_T >
+inline bool SHMQueue<ELEM_T>::push(const ELEM_T &a_data)
+{
+ return queue->push(a_data);
+
+}
+
+template <
+ typename ELEM_T >
+inline bool SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data)
+{
+ return queue->push_nowait(a_data);
+
+}
+
+template < typename ELEM_T >
+inline bool SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, struct timespec * timeout)
+{
+
+ return queue->push_timeout(a_data, timeout);
+
+}
+
+
+
+
+template < typename ELEM_T >
+inline bool SHMQueue<ELEM_T>::pop(ELEM_T &a_data)
+{
+ return queue->pop(a_data);
+
+}
+
+template < typename ELEM_T >
+inline bool SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data)
+{
+ return queue->pop_nowait(a_data);
+
+}
+
+
+template < typename ELEM_T >
+inline bool SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, struct timespec * timeout)
+{
+ return queue->pop_timeout(a_data, timeout);
+
+}
+
+template < typename ELEM_T >
+inline ELEM_T& SHMQueue<ELEM_T>::operator[](unsigned i) {
+ return queue->operator[](i);
+}
+
+
+
+#endif
diff --git a/build/include/shm_queue_wrapper.h b/build/include/shm_queue_wrapper.h
new file mode 100644
index 0000000..984bd5a
--- /dev/null
+++ b/build/include/shm_queue_wrapper.h
@@ -0,0 +1,100 @@
+#ifndef __SHM_QUEUE_WRAPPER_H__
+#define __SHM_QUEUE_WRAPPER_H__
+
+#include "usg_common.h"
+#include "usg_typedef.h"
+
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+
+
+//绉婚櫎涓嶅寘鍚湪keys涓殑闃熷垪
+void shm_remove_queues_exclude(void *keys, int length);
+/**
+ * 鍒涘缓闃熷垪
+ * @ shmqueue
+ * @ key 鏍囪瘑鍏变韩闃熷垪鐨勫敮涓�鏍囪瘑, key鏄竴涓寚閽堥噷闈㈠瓨鍌ㄤ簡key鐨勫�硷紝 濡傛灉key鐨勫�间负-1绯荤粺浼氳嚜鍔ㄥ垎閰嶄竴涓猭ey鍊煎苟鎶婅key鐨勫�艰祴缁檏ey鎸囬拡銆傚鏋渒ey鐨勫�间笉浼氱┖浼氭鏌ユ槸鍚︽湁閲嶅缁戝畾鐨勬儏鍐�, 鏈夐噸澶嶅氨鎶ラ敊娌℃湁灏卞垱寤洪槦鍒楀苟缁戝畾key.
+ * @ queue_size 闃熷垪澶у皬
+ */
+void* shmqueue_create( int * key, int queue_size);
+
+/**
+ * 缁戝畾key鍒伴槦鍒楋紝浣嗘槸骞朵笉浼氬垱寤洪槦鍒椼�傚鏋滄病鏈夊搴旀寚瀹歬ey鐨勯槦鍒楁彁绀洪敊璇苟閫�鍑�
+ */
+void* shmqueue_attach(int key) ;
+
+/**
+ * 閿�姣�
+*/
+void shmqueue_drop(void * _shmqueue);
+
+/**
+ * 闃熷垪鍏冪礌鐨勪釜鏁�
+ */
+int shmqueue_size(void * _shmqueue) ;
+
+/**
+ * 鏄惁宸叉弧
+ * @return 1鏄紝 0鍚�
+ */
+int shmqueue_full(void * _shmqueue);
+
+/**
+ * 鏄惁涓虹┖
+ * @return 1鏄紝 0鍚�
+ */
+int shmqueue_empty(void * _shmqueue) ;
+
+/**
+ * 鍏ラ槦, 闃熷垪婊℃椂绛夊緟.
+ * @return 1 鍏ラ槦鎴愬姛, 0 鍏ラ槦澶辫触
+ */
+int shmqueue_push(void * _shmqueue, void *src, int size);
+
+/**
+ * 鍏ラ槦, 闃熷垪婊℃椂绔嬪嵆杩斿洖.
+ * @return 1 鍏ラ槦鎴愬姛, 0 鍏ラ槦澶辫触
+ */
+int shmqueue_push_nowait(void * _shmqueue, void *src, int size) ;
+
+/**
+ * 鍏ラ槦, 鎸囧畾鏃堕棿鍐呭叆闃熶笉鎴愬姛灏辫繑鍥�
+ * @sec 绉�
+ * @nsec 绾崇
+ * @return 1 鍏ラ槦鎴愬姛, 0 鍏ラ槦澶辫触
+ */
+int shmqueue_push_timeout(void * _shmqueue, void *src, int size, int sec, int nsec) ;
+
+/**
+ * 鍑洪槦, 闃熷垪绌烘椂绛夊緟
+ * @return 1 鍑洪槦鎴愬姛锛� 0鍑洪槦澶辫触
+ */
+int shmqueue_pop(void * _shmqueue, void **dest, int *size);
+
+/**
+ * 鍑洪槦, 闃熷垪绌烘椂绔嬪嵆杩斿洖
+ * @return 1 鍑洪槦鎴愬姛锛� 0鍑洪槦澶辫触
+ */
+int shmqueue_pop_nowait(void * _shmqueue, void **dest, int *size) ;
+
+/**
+ * 鍑洪槦, 鎸囧畾鏃堕棿鍐呭嚭闃熶笉鎴愬姛灏辫繑鍥�
+ * @sec绉�
+ * @nsec绾崇
+ * @return 1 鍑洪槦鎴愬姛锛� 0鍑洪槦澶辫触
+ */
+int shmqueue_pop_timeout(void * _shmqueue, void **dest, int *size, int sec, int nsec);
+
+/**
+ * 閲婃斁鍑洪槦鍒嗛厤鐨勫唴瀛�
+ */
+void shmqueue_free(void *ptr);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
\ No newline at end of file
diff --git a/build/include/shm_socket.h b/build/include/shm_socket.h
new file mode 100644
index 0000000..30b85da
--- /dev/null
+++ b/build/include/shm_socket.h
@@ -0,0 +1,87 @@
+#ifndef __SHM_SOCKET_H__
+#define __SHM_SOCKET_H__
+
+#include "usg_common.h"
+#include "usg_typedef.h"
+#include "shm_queue.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+enum shm_msg_type_t
+{
+ SHM_SOCKET_OPEN = 1,
+ SHM_SOCKET_OPEN_REPLY = 2,
+ SHM_SOCKET_CLOSE = 3,
+ SHM_COMMON_MSG = 4
+
+};
+
+enum shm_socket_type_t
+{
+ SHM_SOCKET_STREAM = 1,
+ SHM_SOCKET_DGRAM = 2
+
+};
+
+enum shm_connection_status_t {
+ SHM_CONN_CLOSED=1,
+ SHM_CONN_LISTEN=2,
+ SHM_CONN_ESTABLISHED=3
+};
+
+typedef struct shm_msg_t {
+ int port;
+ shm_msg_type_t type;
+ size_t size;
+ void * buf;
+
+} shm_msg_t;
+
+
+typedef struct shm_socket_t {
+ shm_socket_type_t socket_type;
+ // 鏈湴port
+ int port;
+ shm_connection_status_t status;
+ SHMQueue<shm_msg_t> *queue;
+ SHMQueue<shm_msg_t> *remoteQueue;
+ LockFreeQueue<shm_msg_t, DM_Allocator> *messageQueue;
+ LockFreeQueue<shm_msg_t, DM_Allocator> *acceptQueue;
+ std::map<int, shm_socket_t* > *clientSocketMap;
+ pthread_t dispatch_thread;
+
+} shm_socket_t;
+
+
+
+
+shm_socket_t *shm_open_socket(shm_socket_type_t socket_type);
+
+
+int shm_close_socket(shm_socket_t * socket) ;
+
+
+int shm_socket_bind(shm_socket_t * socket, int port) ;
+
+int shm_listen(shm_socket_t * socket) ;
+
+shm_socket_t* shm_accept(shm_socket_t* socket);
+
+int shm_connect(shm_socket_t * socket, int port);
+
+int shm_send(shm_socket_t * socket, const void *buf, const int size) ;
+
+int shm_recv(shm_socket_t * socket, void **buf, int *size) ;
+
+int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port);
+
+int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
\ No newline at end of file
diff --git a/build/lib/libshm_queue.a b/build/lib/libshm_queue.a
new file mode 100644
index 0000000..4e9e1cb
--- /dev/null
+++ b/build/lib/libshm_queue.a
Binary files differ
diff --git a/demo/Makefile b/demo/Makefile
index a80c730..f7c6491 100644
--- a/demo/Makefile
+++ b/demo/Makefile
@@ -2,19 +2,19 @@
# Makefile for common library.
#
ROOT=..
-LDLIBS+=-Wl,-rpath=$(ROOT)/queue:$(ROOT)/lib
+LDLIBS+=-Wl,-rpath=$(ROOT)/lib:$(ROOT)/build/lib
# 寮�婧愬伐鍏峰寘璺緞
-LDDIR += -L$(ROOT)/queue
+LDDIR += -L$(ROOT)/build/lib
# 寮�婧愬伐鍏峰寘
LDLIBS += -lshm_queue -lusgcommon -lpthread
-INCLUDE += -I$(ROOT)/queue/ -I$(ROOT)/queue/include
+INCLUDE += -I$(ROOT)/build/include
PLATFORM=$(shell $(ROOT)/systype.sh)
include $(ROOT)/Make.defines.$(PLATFORM)
-PROGS = req_rep pub_sub
+PROGS = req_rep pub_sub queue
build: $(PROGS)
@@ -25,8 +25,3 @@
clean:
rm -f $(TEMPFILES) $(PROGS)
-
-
-
-$(LIBQUEUE):
- (cd $(ROOT)/queue && $(MAKE))
\ No newline at end of file
diff --git a/demo/pub_sub b/demo/pub_sub
new file mode 100755
index 0000000..265621f
--- /dev/null
+++ b/demo/pub_sub
Binary files differ
diff --git a/demo/queue b/demo/queue
new file mode 100755
index 0000000..b1e1056
--- /dev/null
+++ b/demo/queue
Binary files differ
diff --git a/test2/test_queue_wrapper.c b/demo/queue.c
similarity index 92%
rename from test2/test_queue_wrapper.c
rename to demo/queue.c
index 6483fb3..265b574 100644
--- a/test2/test_queue_wrapper.c
+++ b/demo/queue.c
@@ -1,5 +1,5 @@
#include "shm_queue_wrapper.h"
-#include "mm.h"
+#include "shm_mm.h"
// typedef struct message_t
// {
@@ -22,7 +22,7 @@
for(i = 0; i < qsize; i++) {
sprintf(msg, "%d hello", i);
//鍏ラ槦
- if(shmqueue_push(queue, (void *)msg, sizeof(msg))) {
+ if(shmqueue_push(queue, (void *)msg, strlen(msg) + 1)) {
printf("push: %s\n", msg );
}
}
diff --git a/demo/req_rep b/demo/req_rep
new file mode 100755
index 0000000..3d35107
--- /dev/null
+++ b/demo/req_rep
Binary files differ
diff --git a/src/Makefile b/src/Makefile
index 2ab0bb5..9f399e1 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -23,14 +23,16 @@
MYLIBS = $(LIBSQUEUE) $(DLIBSQUEUE)
+PREFIX = $(ROOT)/build
+
ifeq ($(DEBUG),y)
MYLIBS = $(LIBSQUEUE)
else
MYLIBS = $(LIBSQUEUE) $(DLIBSQUEUE)
endif
-all: build
-
+all: install
+
build: $(MYLIBS)
@@ -55,7 +57,7 @@
install -d $(PREFIX)/lib/
install -m 644 $^ $(PREFIX)/lib/
install -d $(PREFIX)/include/
- install -m 644 $(MINCLUDE)/* $(PREFIX)/include/
+ install -m 644 ./*.h ./queue/include/* ./socket/include/* ./util/include/* $(PREFIX)/include/
clean:
rm -f $(TEMPFILES)
diff --git a/src/libshm_queue.a b/src/libshm_queue.a
index 1406c9b..4e9e1cb 100644
--- a/src/libshm_queue.a
+++ b/src/libshm_queue.a
Binary files differ
diff --git a/src/logger_factory.h b/src/logger_factory.h
index e738ad9..384e3e0 100644
--- a/src/logger_factory.h
+++ b/src/logger_factory.h
@@ -6,8 +6,8 @@
public:
static Logger getLogger() {
-//ERROR ALL
- static Logger logger(Logger::ERROR);
+//ERROR ALL DEBUG
+ static Logger logger(Logger::DEBUG);
return logger;
}
};
diff --git a/src/queue/include/shm_allocator.h b/src/queue/include/shm_allocator.h
index 6d9dcc6..023bc9d 100644
--- a/src/queue/include/shm_allocator.h
+++ b/src/queue/include/shm_allocator.h
@@ -1,7 +1,7 @@
#ifndef __SHM_ALLOCATOR_H__
#define __SHM_ALLOCATOR_H__
#include "usg_common.h"
-#include "mm.h"
+#include "mem_pool.h"
#include <new>
#include <cstdlib> // for exit()
#include <climits> // for UNIX_MAX
@@ -67,12 +67,12 @@
public:
static void *allocate (size_t size) {
printf("shm_allocator malloc\n");
- return mm_malloc(size);
+ return mem_pool_malloc(size);
}
static void deallocate (void *ptr) {
printf("shm_allocator free\n");
- return mm_free(ptr);
+ return mem_pool_free(ptr);
}
};
diff --git a/src/queue/include/shm_queue.h b/src/queue/include/shm_queue.h
index 394545b..d853774 100644
--- a/src/queue/include/shm_queue.h
+++ b/src/queue/include/shm_queue.h
@@ -2,7 +2,6 @@
#define __SHM_QUEUE_H__
#include "usg_common.h"
-#include "mm.h"
#include "hashtable.h"
#include "lock_free_queue.h"
#include "logger_factory.h"
diff --git a/src/queue/include/shm_queue_wrapper.h b/src/queue/include/shm_queue_wrapper.h
index 51b73f5..984bd5a 100644
--- a/src/queue/include/shm_queue_wrapper.h
+++ b/src/queue/include/shm_queue_wrapper.h
@@ -3,8 +3,7 @@
#include "usg_common.h"
#include "usg_typedef.h"
-#include "shm_queue.h"
-#include "shm_allocator.h"
+
#ifdef __cplusplus
extern "C" {
diff --git a/src/queue/libshm_queue.a b/src/queue/libshm_queue.a
deleted file mode 100644
index 1b5c93b..0000000
--- a/src/queue/libshm_queue.a
+++ /dev/null
Binary files differ
diff --git a/src/queue/libshm_queue.so b/src/queue/libshm_queue.so
deleted file mode 100755
index 598c829..0000000
--- a/src/queue/libshm_queue.so
+++ /dev/null
Binary files differ
diff --git a/src/queue/shm_queue_wrapper.c b/src/queue/shm_queue_wrapper.c
index 98ec273..29a8f64 100644
--- a/src/queue/shm_queue_wrapper.c
+++ b/src/queue/shm_queue_wrapper.c
@@ -2,6 +2,8 @@
#include "mem_pool.h"
#include "hashtable.h"
+#include "shm_queue.h"
+#include "shm_allocator.h"
typedef struct ele_t {
size_t size;
diff --git a/src/socket/include/shm_socket.h b/src/socket/include/shm_socket.h
index 8b14f06..30b85da 100644
--- a/src/socket/include/shm_socket.h
+++ b/src/socket/include/shm_socket.h
@@ -4,11 +4,6 @@
#include "usg_common.h"
#include "usg_typedef.h"
#include "shm_queue.h"
-#include "shm_allocator.h"
-
-#include "mem_pool.h"
-#include "hashtable.h"
-#include "sem_util.h"
#ifdef __cplusplus
extern "C" {
@@ -20,6 +15,13 @@
SHM_SOCKET_OPEN_REPLY = 2,
SHM_SOCKET_CLOSE = 3,
SHM_COMMON_MSG = 4
+
+};
+
+enum shm_socket_type_t
+{
+ SHM_SOCKET_STREAM = 1,
+ SHM_SOCKET_DGRAM = 2
};
@@ -39,6 +41,7 @@
typedef struct shm_socket_t {
+ shm_socket_type_t socket_type;
// 鏈湴port
int port;
shm_connection_status_t status;
@@ -54,7 +57,7 @@
-shm_socket_t *shm_open_socket();
+shm_socket_t *shm_open_socket(shm_socket_type_t socket_type);
int shm_close_socket(shm_socket_t * socket) ;
@@ -72,6 +75,9 @@
int shm_recv(shm_socket_t * socket, void **buf, int *size) ;
+int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port);
+
+int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port);
#ifdef __cplusplus
diff --git a/src/socket/mod_socket.c b/src/socket/mod_socket.c
index cc358f6..fcb5e58 100644
--- a/src/socket/mod_socket.c
+++ b/src/socket/mod_socket.c
@@ -1,7 +1,12 @@
+#include "usg_common.h"
#include "mod_socket.h"
#include "shm_socket.h"
-#include "usg_common.h"
+#include "shm_allocator.h"
+#include "mem_pool.h"
+#include "hashtable.h"
+#include "sem_util.h"
#include "logger_factory.h"
+
static Logger logger = LoggerFactory::getLogger();
typedef struct mod_entry_t
@@ -28,7 +33,7 @@
*/
void *mod_open_socket(int mod) {
mod_socket_t *socket = (mod_socket_t *)malloc(sizeof(mod_socket_t));
- socket->shm_socket=shm_open_socket();
+ socket->shm_socket=shm_open_socket(SHM_SOCKET_STREAM);
socket->is_server = 0;
socket->mod = (socket_mod_t)mod;
socket->recvQueue = new LockFreeQueue<mod_entry_t, DM_Allocator>(16);
diff --git a/src/socket/shm_socket.c b/src/socket/shm_socket.c
index 4fb90cf..260cdc2 100644
--- a/src/socket/shm_socket.c
+++ b/src/socket/shm_socket.c
@@ -1,4 +1,5 @@
#include "shm_socket.h"
+#include "hashtable.h"
#include "logger_factory.h"
#include <map>
@@ -14,11 +15,16 @@
void * _client_run_msg_rev(void* _socket);
+int _shm_close_dgram_socket(shm_socket_t *socket);
+
+
+int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote);
+
SHMQueue<shm_msg_t> * _attach_remote_queue(int port) ;
-shm_socket_t *shm_open_socket() {
+shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) {
shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t));
-
+ socket->socket_type = socket_type;
socket->port = -1;
socket->dispatch_thread = 0;
socket->status=SHM_CONN_CLOSED;
@@ -26,8 +32,381 @@
return socket;
}
+int shm_close_socket(shm_socket_t *socket) {
+ switch(socket->socket_type) {
+ case SHM_SOCKET_STREAM:
+ return _shm_close_stream_socket(socket, true);
+ case SHM_SOCKET_DGRAM:
+ return _shm_close_dgram_socket(socket);
+ default:
+ return -1;
+ }
+ return -1;
+
+}
-int _shm_close_socket(shm_socket_t *socket, bool notifyRemote) {
+int shm_socket_bind(shm_socket_t * socket, int port) {
+ socket -> port = port;
+ return 0;
+}
+
+int shm_listen(shm_socket_t* socket) {
+
+ if(socket->socket_type != SHM_SOCKET_STREAM) {
+ err_exit(0, "can not invoke shm_listen method with a socket which is not a SHM_SOCKET_STREAM socket");
+ }
+
+ int port;
+ hashtable_t *hashtable = mm_get_hashtable();
+ if(socket -> port == -1) {
+ port = hashtable_alloc_key(hashtable);
+ socket -> port = port;
+ } else {
+
+ if(hashtable_get(hashtable, socket->port)!= NULL) {
+ err_exit(0, "key %d has already been in used!", socket->port);
+ }
+ }
+
+ socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
+ socket->acceptQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
+ socket->clientSocketMap = new std::map<int, shm_socket_t* >;
+ socket->status = SHM_CONN_LISTEN;
+ pthread_create(&(socket->dispatch_thread), NULL, _server_run_msg_rev , (void *)socket);
+
+
+ return 0;
+}
+
+
+/**
+ * 鎺ュ彈瀹㈡埛绔缓绔嬫柊杩炴帴鐨勮姹�
+ *
+*/
+shm_socket_t* shm_accept(shm_socket_t* socket) {
+ if(socket->socket_type != SHM_SOCKET_STREAM) {
+ err_exit(0, "can not invoke shm_accept method with a socket which is not a SHM_SOCKET_STREAM socket");
+ }
+ hashtable_t *hashtable = mm_get_hashtable();
+ int client_port;
+ shm_socket_t *client_socket;
+ shm_msg_t src;
+
+ if (socket->acceptQueue->pop(src) ) {
+
+// print_msg("===accept:", src);
+ client_port = src.port;
+ client_socket = (shm_socket_t *)malloc(sizeof(shm_socket_t));
+ client_socket->port = socket->port;
+ // client_socket->queue= socket->queue;
+ //鍒濆鍖栨秷鎭痲ueue
+ client_socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
+ //杩炴帴鍒板鏂筿ueue
+ client_socket->remoteQueue = _attach_remote_queue(client_port);
+
+ socket->clientSocketMap->insert({client_port, client_socket});
+
+ /*
+ * shm_accept 鐢ㄦ埛鎵ц鐨勬柟娉� 涓巁server_run_msg_rev鍦ㄤ袱涓笉鍚岀殑闄愬埗宸ヤ綔,accept瑕佷繚璇佸湪瀹㈡埛鐨勫彂閫佹秷鎭箣鍓嶅畬鎴愯祫婧愮殑鍑嗗宸ヤ綔锛屼互閬垮厤鍑虹幇绔炴�侀棶棰�
+ */
+ //鍙戦�乷pen_reply,鍥炲簲瀹㈡埛绔殑connect璇锋眰
+ struct timespec timeout = {1, 0};
+ shm_msg_t msg;
+ msg.port = socket->port;
+ msg.size = 0;
+ msg.type = SHM_SOCKET_OPEN_REPLY;
+
+ if (client_socket->remoteQueue->push_timeout(msg, &timeout) )
+ {
+ client_socket->status = SHM_CONN_ESTABLISHED;
+ return client_socket;
+ } else {
+ err_msg(0, "shm_accept: 鍙戦�乷pen_reply澶辫触");
+ return NULL;
+ }
+
+
+ } else {
+ err_exit(errno, "shm_accept");
+ }
+ return NULL;
+
+}
+
+
+int shm_connect(shm_socket_t* socket, int port) {
+ if(socket->socket_type != SHM_SOCKET_STREAM) {
+ err_exit(0, "can not invoke shm_connect method with a socket which is not a SHM_SOCKET_STREAM socket");
+ }
+ hashtable_t *hashtable = mm_get_hashtable();
+ if(hashtable_get(hashtable, port)== NULL) {
+ err_exit(0, "shm_connect锛歝onnect at port %d failed!", port);
+ }
+
+ if(socket->port == -1) {
+ socket->port = hashtable_alloc_key(hashtable);
+ } else {
+
+ if(hashtable_get(hashtable, socket->port)!= NULL) {
+ err_exit(0, "key %d has already been in used!", socket->port);
+ }
+ }
+
+ socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
+ socket->remoteQueue = _attach_remote_queue(port);
+ socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
+
+
+ //鍙戦�乷pen璇锋眰
+ struct timespec timeout = {1, 0};
+ shm_msg_t msg;
+ msg.port = socket->port;
+ msg.size = 0;
+ msg.type=SHM_SOCKET_OPEN;
+ socket->remoteQueue->push_timeout(msg, &timeout);
+
+ //鎺ュ彈open reply
+ if(socket->queue->pop(msg)) {
+ // 鍦ㄨ繖閲宻erver绔凡缁忓噯澶囧ソ鎺ュ彈瀹㈡埛绔彂閫佽姹備簡,瀹屾垚涓庢湇鍔$鐨勮繛鎺�
+ if(msg.type == SHM_SOCKET_OPEN_REPLY) {
+ socket->status = SHM_CONN_ESTABLISHED;
+ pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket);
+ } else {
+ err_exit(0, "shm_connect: 涓嶅尮閰嶇殑搴旂瓟淇℃伅!");
+ }
+
+ } else {
+ err_exit(0, "connect failted!");
+ }
+
+ return 0;
+}
+
+
+int shm_send(shm_socket_t *socket, const void *buf, const int size) {
+ if(socket->socket_type != SHM_SOCKET_STREAM) {
+ err_exit(0, "can not invoke shm_send method with a socket which is not a SHM_SOCKET_STREAM socket");
+ }
+ // hashtable_t *hashtable = mm_get_hashtable();
+ // if(socket->remoteQueue == NULL) {
+ // err_msg(errno, "褰撳墠瀹㈡埛绔棤杩炴帴!");
+ // return -1;
+ // }
+ shm_msg_t dest;
+ dest.type=SHM_COMMON_MSG;
+ dest.port = socket->port;
+ dest.size = size;
+ dest.buf = mm_malloc(size);
+ memcpy(dest.buf, buf, size);
+
+
+ if(socket->remoteQueue->push(dest)) {
+ return 0;
+ } else {
+ err_msg(errno, "connection has been closed!");
+ return -1;
+ }
+}
+
+
+int shm_recv(shm_socket_t* socket, void **buf, int *size) {
+ if(socket->socket_type != SHM_SOCKET_STREAM) {
+ err_exit(0, "can not invoke shm_recv method with a socket which is not a SHM_SOCKET_STREAM socket");
+ }
+ shm_msg_t src;
+
+ if (socket->messageQueue->pop(src)) {
+ void * _buf = malloc(src.size);
+ memcpy(_buf, src.buf, src.size);
+ *buf = _buf;
+ *size = src.size;
+ mm_free(src.buf);
+ return 0;
+ } else {
+ return -1;
+ }
+
+}
+
+
+
+// 鐭繛鎺ユ柟寮忓彂閫�
+int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port) {
+ hashtable_t *hashtable = mm_get_hashtable();
+
+ if(socket->queue == NULL) {
+ if(socket->port == -1) {
+ socket->port = hashtable_alloc_key(hashtable);
+ } else {
+
+ if(hashtable_get(hashtable, socket->port)!= NULL) {
+ err_exit(0, "key %d has already been in used!", socket->port);
+ }
+ }
+
+ socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
+ }
+ if (port == socket->port) {
+ err_msg(0, "can not send to your self!");
+ return -1;
+ }
+
+ shm_msg_t dest;
+ dest.type=SHM_COMMON_MSG;
+ dest.port = socket->port;
+ dest.size = size;
+ dest.buf = mm_malloc(size);
+ memcpy(dest.buf, buf, size);
+
+ SHMQueue<shm_msg_t> *remoteQueue = _attach_remote_queue(port);
+ if(remoteQueue->push(dest)) {
+ delete remoteQueue;
+ return 0;
+ } else {
+ delete remoteQueue;
+ err_msg(errno, "sendto port %d failed!", port);
+ return -1;
+ }
+}
+
+
+// 鐭繛鎺ユ柟寮忔帴鍙�
+int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port){
+ hashtable_t *hashtable = mm_get_hashtable();
+ if(socket->queue == NULL) {
+ if(socket->port == -1) {
+ socket->port = hashtable_alloc_key(hashtable);
+ } else {
+
+ if(hashtable_get(hashtable, socket->port)!= NULL) {
+ err_exit(0, "key %d has already been in used!", socket->port);
+ }
+ }
+
+ socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
+ }
+
+ shm_msg_t src;
+//logger.debug("shm_recvfrom pop before");
+ if (socket->queue->pop(src)) {
+ void * _buf = malloc(src.size);
+ memcpy(_buf, src.buf, src.size);
+ *buf = _buf;
+ *size = src.size;
+ *port = src.port;
+ mm_free(src.buf);
+//logger.debug("shm_recvfrom pop after");
+ return 0;
+ } else {
+ return -1;
+ }
+}
+
+
+/**
+ * 缁戝畾key鍒伴槦鍒楋紝浣嗘槸骞朵笉浼氬垱寤洪槦鍒椼�傚鏋滄病鏈夊搴旀寚瀹歬ey鐨勯槦鍒楁彁绀洪敊璇苟閫�鍑�
+ */
+SHMQueue<shm_msg_t> * _attach_remote_queue(int port) {
+ hashtable_t *hashtable = mm_get_hashtable();
+ if(hashtable_get(hashtable, port)== NULL) {
+ err_exit(0, "_remote_queue_attach锛歝onnet at port %d failed!", port);
+ return NULL;
+ }
+
+ SHMQueue<shm_msg_t> *queue = new SHMQueue<shm_msg_t>(port, 0);
+ return queue;
+}
+
+
+
+
+
+void _server_close_conn_to_client(shm_socket_t* socket, int port) {
+ shm_socket_t *client_socket;
+ auto iter = socket->clientSocketMap->find(port);
+ if( iter != socket->clientSocketMap->end() ) {
+ socket->clientSocketMap->erase(iter);
+ }
+ //free((void *)client_socket);
+
+}
+
+/**
+ * server绔悇绉嶇被鍨嬫秷鎭紙锛夊湪杩欓噷杩涚▼鍒嗘嫞
+ */
+void * _server_run_msg_rev(void* _socket) {
+ pthread_detach(pthread_self());
+ shm_socket_t* socket = (shm_socket_t*) _socket;
+ struct timespec timeout = {1, 0};
+ shm_msg_t src;
+ shm_socket_t *client_socket;
+ std::map<int, shm_socket_t* >::iterator iter;
+
+ while(socket->queue->pop(src)) {
+
+ switch (src.type) {
+ case SHM_SOCKET_OPEN :
+ socket->acceptQueue->push_timeout(src, &timeout);
+ break;
+ case SHM_SOCKET_CLOSE :
+ _server_close_conn_to_client(socket, src.port);
+ break;
+ case SHM_COMMON_MSG :
+
+ iter = socket->clientSocketMap->find(src.port);
+ if( iter != socket->clientSocketMap->end()) {
+ client_socket= iter->second;
+ // print_msg("_server_run_msg_rev push before", src);
+ client_socket->messageQueue->push_timeout(src, &timeout);
+ // print_msg("_server_run_msg_rev push after", src);
+ }
+
+ break;
+
+ default:
+ err_msg(0, "socket.__shm_rev__: undefined message type.");
+ }
+ }
+
+ return NULL;
+}
+
+
+
+void _client_close_conn_to_server(shm_socket_t* socket) {
+
+ _shm_close_stream_socket(socket, false);
+}
+
+
+/**
+ * client绔殑鍚勭绫诲瀷娑堟伅锛堬級鍦ㄨ繖閲岃繘绋嬪垎鎷�
+ */
+void * _client_run_msg_rev(void* _socket) {
+ pthread_detach(pthread_self());
+ shm_socket_t* socket = (shm_socket_t*) _socket;
+ struct timespec timeout = {1, 0};
+ shm_msg_t src;
+
+ while(socket->queue->pop(src)) {
+ switch (src.type) {
+
+ case SHM_SOCKET_CLOSE :
+ _client_close_conn_to_server(socket);
+ break;
+ case SHM_COMMON_MSG :
+ socket->messageQueue->push_timeout(src, &timeout);
+ break;
+ default:
+ err_msg(0, "socket.__shm_rev__: undefined message type.");
+ }
+ }
+
+ return NULL;
+}
+
+
+int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote) {
socket->status = SHM_CONN_CLOSED;
//缁欏鏂瑰彂閫佷竴涓叧闂繛鎺ョ殑娑堟伅
struct timespec timeout = {1, 0};
@@ -86,295 +465,14 @@
}
-int shm_close_socket(shm_socket_t *socket) {
- return _shm_close_socket(socket, true);
-}
-
-int shm_socket_bind(shm_socket_t * socket, int port) {
- shm_socket_t * _socket = (shm_socket_t *) socket;
- _socket -> port = port;
+int _shm_close_dgram_socket(shm_socket_t *socket){
+ if(socket->queue != NULL) {
+ delete socket->queue;
+ socket->queue = NULL;
+ }
+ free(socket);
return 0;
}
-
-int shm_listen(shm_socket_t* socket) {
- int port;
- hashtable_t *hashtable = mm_get_hashtable();
- if(socket -> port == -1) {
- port = hashtable_alloc_key(hashtable);
- socket -> port = port;
- } else {
-
- if(hashtable_get(hashtable, socket->port)!= NULL) {
- err_exit(0, "key %d has already been in used!", socket->port);
- }
- }
-
- socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
- socket->acceptQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
- socket->clientSocketMap = new std::map<int, shm_socket_t* >;
- socket->status = SHM_CONN_LISTEN;
- pthread_create(&(socket->dispatch_thread), NULL, _server_run_msg_rev , (void *)socket);
-
-
- return 0;
-}
-
-void _server_close_conn_to_client(shm_socket_t* socket, int port) {
- shm_socket_t *client_socket;
- auto iter = socket->clientSocketMap->find(port);
- if( iter != socket->clientSocketMap->end() ) {
- socket->clientSocketMap->erase(iter);
- }
- //free((void *)client_socket);
-
-}
-
-/**
- * server绔悇绉嶇被鍨嬫秷鎭紙锛夊湪杩欓噷杩涚▼鍒嗘嫞
- */
-void * _server_run_msg_rev(void* _socket) {
- pthread_detach(pthread_self());
- shm_socket_t* socket = (shm_socket_t*) _socket;
- struct timespec timeout = {1, 0};
- shm_msg_t src;
- shm_socket_t *client_socket;
- std::map<int, shm_socket_t* >::iterator iter;
-
- while(socket->queue->pop(src)) {
-
- switch (src.type) {
- case SHM_SOCKET_OPEN :
- socket->acceptQueue->push_timeout(src, &timeout);
- break;
- case SHM_SOCKET_CLOSE :
- _server_close_conn_to_client(socket, src.port);
- break;
- case SHM_COMMON_MSG :
-
- iter = socket->clientSocketMap->find(src.port);
- if( iter != socket->clientSocketMap->end()) {
- client_socket= iter->second;
- // print_msg("_server_run_msg_rev push before", src);
- client_socket->messageQueue->push_timeout(src, &timeout);
- // print_msg("_server_run_msg_rev push after", src);
- }
-
- break;
-
- default:
- err_msg(0, "socket.__shm_rev__: undefined message type.");
- }
- }
-
- return NULL;
-}
-
-
-
-
-/**
- * 鎺ュ彈瀹㈡埛绔缓绔嬫柊杩炴帴鐨勮姹�
- *
-*/
-
-shm_socket_t* shm_accept(shm_socket_t* socket) {
- hashtable_t *hashtable = mm_get_hashtable();
- int client_port;
- shm_socket_t *client_socket;
- shm_msg_t src;
-
- if (socket->acceptQueue->pop(src) ) {
-
-// print_msg("===accept:", src);
- client_port = src.port;
- client_socket = (shm_socket_t *)malloc(sizeof(shm_socket_t));
- client_socket->port = socket->port;
- // client_socket->queue= socket->queue;
- //鍒濆鍖栨秷鎭痲ueue
- client_socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
- //杩炴帴鍒板鏂筿ueue
- client_socket->remoteQueue = _attach_remote_queue(client_port);
-
- socket->clientSocketMap->insert({client_port, client_socket});
-
- /*
- * shm_accept 鐢ㄦ埛鎵ц鐨勬柟娉� 涓巁server_run_msg_rev鍦ㄤ袱涓笉鍚岀殑闄愬埗宸ヤ綔,accept瑕佷繚璇佸湪瀹㈡埛鐨勫彂閫佹秷鎭箣鍓嶅畬鎴愯祫婧愮殑鍑嗗宸ヤ綔锛屼互閬垮厤鍑虹幇绔炴�侀棶棰�
- */
- //鍙戦�乷pen_reply,鍥炲簲瀹㈡埛绔殑connect璇锋眰
- struct timespec timeout = {1, 0};
- shm_msg_t msg;
- msg.port = socket->port;
- msg.size = 0;
- msg.type = SHM_SOCKET_OPEN_REPLY;
-
- if (client_socket->remoteQueue->push_timeout(msg, &timeout) )
- {
- client_socket->status = SHM_CONN_ESTABLISHED;
- return client_socket;
- } else {
- err_msg(0, "shm_accept: 鍙戦�乷pen_reply澶辫触");
- return NULL;
- }
-
-
- } else {
- err_exit(errno, "shm_accept");
- }
- return NULL;
-
-}
-
-
-int shm_connect(shm_socket_t* socket, int port) {
- hashtable_t *hashtable = mm_get_hashtable();
- if(hashtable_get(hashtable, port)== NULL) {
- err_exit(0, "shm_connect锛歝onnect at port %d failed!", port);
- }
- if(socket->port == -1) {
- socket->port = hashtable_alloc_key(hashtable);
- } else {
-
- if(hashtable_get(hashtable, socket->port)!= NULL) {
- err_exit(0, "key %d has already been in used!", socket->port);
- }
- }
-
- socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
- socket->remoteQueue = _attach_remote_queue(port);
- socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
-
-
- //鍙戦�乷pen璇锋眰
- struct timespec timeout = {1, 0};
- shm_msg_t msg;
- msg.port = socket->port;
- msg.size = 0;
- msg.type=SHM_SOCKET_OPEN;
- socket->remoteQueue->push_timeout(msg, &timeout);
-
- //鎺ュ彈open reply
- if(socket->queue->pop(msg)) {
- // 鍦ㄨ繖閲宻erver绔凡缁忓噯澶囧ソ鎺ュ彈瀹㈡埛绔彂閫佽姹備簡,瀹屾垚涓庢湇鍔$鐨勮繛鎺�
- if(msg.type == SHM_SOCKET_OPEN_REPLY) {
- socket->status = SHM_CONN_ESTABLISHED;
- pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket);
- } else {
- err_exit(0, "shm_connect: 涓嶅尮閰嶇殑搴旂瓟淇℃伅!");
- }
-
- } else {
- err_exit(0, "connect failted!");
- }
-
- return 0;
-}
-
-void _client_close_conn_to_server(shm_socket_t* socket) {
-
- _shm_close_socket(socket, false);
-}
-
-
-/**
- * client绔殑鍚勭绫诲瀷娑堟伅锛堬級鍦ㄨ繖閲岃繘绋嬪垎鎷�
- */
-void * _client_run_msg_rev(void* _socket) {
- pthread_detach(pthread_self());
- shm_socket_t* socket = (shm_socket_t*) _socket;
- struct timespec timeout = {1, 0};
- shm_msg_t src;
-
- while(socket->queue->pop(src)) {
- switch (src.type) {
-
- case SHM_SOCKET_CLOSE :
- _client_close_conn_to_server(socket);
- break;
- case SHM_COMMON_MSG :
- socket->messageQueue->push_timeout(src, &timeout);
- break;
- default:
- err_msg(0, "socket.__shm_rev__: undefined message type.");
- }
- }
-
- return NULL;
-}
-
-
-
-
-
-int shm_send(shm_socket_t *socket, const void *buf, const int size) {
- // hashtable_t *hashtable = mm_get_hashtable();
- // if(socket->remoteQueue == NULL) {
- // err_msg(errno, "褰撳墠瀹㈡埛绔棤杩炴帴!");
- // return -1;
- // }
- shm_msg_t dest;
- dest.type=SHM_COMMON_MSG;
- dest.port = socket->port;
- dest.size = size;
- dest.buf = mm_malloc(size);
- memcpy(dest.buf, buf, size);
-
- // struct timeval time;
- // gettimeofday(&time, NULL);
-//err_msg(0, "%d %d=======>push befor %d", time.tv_sec, time.tv_usec, socket->port);
- if(socket->remoteQueue->push(dest)) {
-
- //gettimeofday(&time, NULL);
-//err_msg(0, "%d %d=======>push after %d", time.tv_sec, time.tv_usec, socket->port);
- return 0;
- } else {
- err_msg(errno, "connection has been closed!");
- return -1;
- }
-
-
-}
-
-int shm_recv(shm_socket_t* socket, void **buf, int *size) {
- shm_msg_t src;
-
-// struct timeval time;
-// gettimeofday(&time, NULL);
-// err_msg(0, "%d %d=======>pop befor %d", time.tv_sec, time.tv_usec, socket->port);
- if (socket->messageQueue->pop(src)) {
-// gettimeofday(&time, NULL);
-// err_msg(0, "%d %d=======>pop after %d", time.tv_sec, time.tv_usec, socket->port);
- void * _buf = malloc(src.size);
- memcpy(_buf, src.buf, src.size);
- *buf = _buf;
- *size = src.size;
- mm_free(src.buf);
- return 0;
- } else {
- return -1;
- }
-
-
-
-}
-
-
-/**
- * 缁戝畾key鍒伴槦鍒楋紝浣嗘槸骞朵笉浼氬垱寤洪槦鍒椼�傚鏋滄病鏈夊搴旀寚瀹歬ey鐨勯槦鍒楁彁绀洪敊璇苟閫�鍑�
- */
-SHMQueue<shm_msg_t> * _attach_remote_queue(int port) {
- hashtable_t *hashtable = mm_get_hashtable();
- if(hashtable_get(hashtable, port)== NULL) {
- err_exit(0, "_remote_queue_attach锛歝onnet at port %d failed!", port);
- return NULL;
- }
-
- SHMQueue<shm_msg_t> *queue = new SHMQueue<shm_msg_t>(port, 0);
- return queue;
-}
-
-
-
-
diff --git a/src/util/sem_util.h b/src/util/include/sem_util.h
similarity index 100%
rename from src/util/sem_util.h
rename to src/util/include/sem_util.h
diff --git a/src/util/sem_util.c b/src/util/sem_util.c
index ceedac3..e2b2c20 100644
--- a/src/util/sem_util.c
+++ b/src/util/sem_util.c
@@ -1,158 +1,148 @@
#include "sem_util.h"
+#include "logger_factory.h"
+static Logger logger = LoggerFactory::getLogger();
int SemUtil::get(key_t key, unsigned int value) {
- int semid, perms;
-
- perms = S_IRUSR | S_IWUSR;
+ int semid, perms;
- semid = semget(key, 1, IPC_CREAT | IPC_EXCL | perms);
+ perms = S_IRUSR | S_IWUSR;
- if (semid != -1) { /* Successfully created the semaphore */
- union semun arg;
- struct sembuf sop;
+ semid = semget(key, 1, IPC_CREAT | IPC_EXCL | perms);
- fprintf(stderr, "%ld: created semaphore\n", (long) getpid());
+ if (semid != -1) { /* Successfully created the semaphore */
+ union semun arg;
+ struct sembuf sop;
- arg.val = 0; /* So initialize it to 0 */
- if (semctl(semid, 0, SETVAL, arg) == -1)
- err_exit(errno, "semctl 1");
- fprintf(stderr, "%ld: initialized semaphore\n", (long) getpid());
+ logger.info("%ld: created semaphore\n", (long)getpid());
- /* Perform a "no-op" semaphore operation - changes sem_otime
- so other processes can see we've initialized the set. */
+ arg.val = 0; /* So initialize it to 0 */
+ if (semctl(semid, 0, SETVAL, arg) == -1)
+ err_exit(errno, "semctl 1");
+ logger.info("%ld: initialized semaphore\n", (long)getpid());
- sop.sem_num = 0; /* Operate on semaphore 0 */
- sop.sem_op = value;
- sop.sem_flg = 0;
- if (semop(semid, &sop, 1) == -1)
- err_exit(errno, "semop");
- fprintf(stderr, "%ld: completed dummy semop()\n", (long) getpid());
+ /* Perform a "no-op" semaphore operation - changes sem_otime
+ so other processes can see we've initialized the set. */
- } else { /* We didn't create the semaphore set */
+ sop.sem_num = 0; /* Operate on semaphore 0 */
+ sop.sem_op = value;
+ sop.sem_flg = 0;
+ if (semop(semid, &sop, 1) == -1)
+ err_exit(errno, "semop");
+ logger.info("%ld: completed dummy semop()\n", (long)getpid());
- if (errno != EEXIST) { /* Unexpected error from semget() */
- err_exit(errno, "semget 1");
+ } else { /* We didn't create the semaphore set */
- } else { /* Someone else already created it */
- const int MAX_TRIES = 10;
- int j;
- union semun arg;
- struct semid_ds ds;
+ if (errno != EEXIST) { /* Unexpected error from semget() */
+ err_exit(errno, "semget 1");
- semid = semget(key, 1, perms); /* So just get ID */
- if (semid == -1)
- err_exit(errno, "semget 2");
+ } else { /* Someone else already created it */
+ const int MAX_TRIES = 10;
+ int j;
+ union semun arg;
+ struct semid_ds ds;
- fprintf(stderr, "%ld: got semaphore key\n", (long) getpid());
- /* Wait until another process has called semop() */
+ semid = semget(key, 1, perms); /* So just get ID */
+ if (semid == -1)
+ err_exit(errno, "semget 2");
- arg.buf = &ds;
- for (j = 0; j < MAX_TRIES; j++) {
- fprintf(stderr, "Try %d\n", j);
- if (semctl(semid, 0, IPC_STAT, arg) == -1)
- err_exit(errno, "semctl 2");
+ logger.info("%ld: got semaphore key\n", (long)getpid());
+ /* Wait until another process has called semop() */
- if (ds.sem_otime != 0) /* Semop() performed? */
- break; /* Yes, quit loop */
- sleep(1); /* If not, wait and retry */
- }
+ arg.buf = &ds;
+ for (j = 0; j < MAX_TRIES; j++) {
+ logger.info("Try %d\n", j);
+ if (semctl(semid, 0, IPC_STAT, arg) == -1)
+ err_exit(errno, "semctl 2");
- if (ds.sem_otime == 0) /* Loop ran to completion! */
- err_exit(errno, "Existing semaphore not initialized");
- }
+ if (ds.sem_otime != 0) /* Semop() performed? */
+ break; /* Yes, quit loop */
+ sleep(1); /* If not, wait and retry */
+ }
+
+ if (ds.sem_otime == 0) /* Loop ran to completion! */
+ err_exit(errno, "Existing semaphore not initialized");
}
- return semid;
+ }
+ return semid;
}
-
/* Reserve semaphore (blocking), return 0 on success, or -1 with 'errno'
set to EINTR if operation was interrupted by a signal handler */
/* Reserve semaphore - decrement it by 1 */
-int SemUtil::dec(int semId)
-{
- struct sembuf sops;
+int SemUtil::dec(int semId) {
+ struct sembuf sops;
- sops.sem_num = 0;
- sops.sem_op = -1;
- sops.sem_flg = 0;
+ sops.sem_num = 0;
+ sops.sem_op = -1;
+ sops.sem_flg = 0;
- while (semop(semId, &sops, 1) == -1)
- if (errno != EINTR ) {
- err_msg(errno, "SemUtil::dec");
- return -1;
- }
+ while (semop(semId, &sops, 1) == -1)
+ if (errno != EINTR) {
+ err_msg(errno, "SemUtil::dec");
+ return -1;
+ }
- return 0;
+ return 0;
}
-int SemUtil::dec_nowait(int semId)
-{
- struct sembuf sops;
+int SemUtil::dec_nowait(int semId) {
+ struct sembuf sops;
- sops.sem_num = 0;
- sops.sem_op = -1;
- sops.sem_flg = IPC_NOWAIT;
+ sops.sem_num = 0;
+ sops.sem_op = -1;
+ sops.sem_flg = IPC_NOWAIT;
- while (semop(semId, &sops, 1) == -1)
- if (errno != EINTR ) {
- err_msg(errno, "SemUtil::dec_nowait");
- return -1;
- }
+ while (semop(semId, &sops, 1) == -1)
+ if (errno != EINTR) {
+ err_msg(errno, "SemUtil::dec_nowait");
+ return -1;
+ }
- return 0;
+ return 0;
}
-int SemUtil::dec_timeout(int semId, struct timespec * timeout)
-{
- struct sembuf sops;
+int SemUtil::dec_timeout(int semId, struct timespec *timeout) {
+ struct sembuf sops;
- sops.sem_num = 0;
- sops.sem_op = -1;
- sops.sem_flg = 0;
+ sops.sem_num = 0;
+ sops.sem_op = -1;
+ sops.sem_flg = 0;
- while ( semtimedop(semId, &sops, 1, timeout) == -1)
- if (errno != EINTR ) {
- err_msg(errno, "SemUtil::dec_timeout");
- return -1;
- }
+ while (semtimedop(semId, &sops, 1, timeout) == -1)
+ if (errno != EINTR) {
+ err_msg(errno, "SemUtil::dec_timeout");
+ return -1;
+ }
- return 0;
+ return 0;
}
-
-
/* Release semaphore - increment it by 1 */
-int SemUtil::inc(int semId)
-{
- struct sembuf sops;
+int SemUtil::inc(int semId) {
+ struct sembuf sops;
- sops.sem_num = 0;
- sops.sem_op = 1;
- sops.sem_flg = 0;
+ sops.sem_num = 0;
+ sops.sem_op = 1;
+ sops.sem_flg = 0;
- int rv = semop(semId, &sops, 1);
- if(rv == -1) {
- err_msg(errno, "SemUtil::inc");
- }
- return rv;
+ int rv = semop(semId, &sops, 1);
+ if (rv == -1) {
+ err_msg(errno, "SemUtil::inc");
+ }
+ return rv;
}
void SemUtil::remove(int semid) {
- union semun dummy;
- if (semctl(semid, 0, IPC_RMID, dummy) == -1)
- err_msg(errno, "SemUtil::remove");
-
+ union semun dummy;
+ if (semctl(semid, 0, IPC_RMID, dummy) == -1)
+ err_msg(errno, "SemUtil::remove");
}
-
-void SemUtil::set(int semId, int val)
-{
- union semun arg;
- arg.val = val;
- if (semctl(semId, 0, SETVAL, arg) == -1)
- err_msg(errno, "SemUtil::set");
+void SemUtil::set(int semId, int val) {
+ union semun arg;
+ arg.val = val;
+ if (semctl(semId, 0, SETVAL, arg) == -1)
+ err_msg(errno, "SemUtil::set");
}
-
-
diff --git a/test/Makefile b/test/Makefile
index eee9f77..643b72d 100755
--- a/test/Makefile
+++ b/test/Makefile
@@ -2,38 +2,25 @@
# Makefile for common library.
#
ROOT=..
-LDLIBS+=-Wl,-rpath=$(ROOT)/queue:$(ROOT)/lib
+LDLIBS+=-Wl,-rpath=$(ROOT)/lib:$(ROOT)/build/lib
# 寮�婧愬伐鍏峰寘璺緞
-LDDIR += -L$(ROOT)/queue
+LDDIR += -L$(ROOT)/lib -L$(ROOT)/build/lib
# 寮�婧愬伐鍏峰寘
LDLIBS += -lshm_queue -lusgcommon -lpthread
-INCLUDE += -I$(ROOT)/queue/ -I$(ROOT)/queue/include
+INCLUDE += -I$(ROOT)/build/include
PLATFORM=$(shell $(ROOT)/systype.sh)
include $(ROOT)/Make.defines.$(PLATFORM)
-PROGS = communication
-
+PROGS = dgram_socket_test
build: $(PROGS)
# test1: $(LIBCOMMON)
# 濡傛灉鍖匒 寮曠敤鍖匓锛� B 瑕佹斁鍦� A 鍚庨潰
-
-
-test_queue: test.h $(ROOT)/queue/include/lock_free_queue.h
-
-single_productor: test.h $(ROOT)/queue/include/lock_free_queue.h
-
-single_consumer: test.h $(ROOT)/queue/include/lock_free_queue.h
clean:
rm -f $(TEMPFILES) $(PROGS)
-
-
-
-$(LIBQUEUE):
- (cd $(ROOT)/queue && $(MAKE))
diff --git a/test/dgram_socket_test b/test/dgram_socket_test
new file mode 100755
index 0000000..6770bc1
--- /dev/null
+++ b/test/dgram_socket_test
Binary files differ
diff --git a/test/dgram_socket_test.c b/test/dgram_socket_test.c
new file mode 100644
index 0000000..97c602a
--- /dev/null
+++ b/test/dgram_socket_test.c
@@ -0,0 +1,74 @@
+#include "shm_socket.h"
+#include "usg_common.h"
+#include "shm_mm.h"
+
+#include "shm_socket.h"
+#include "usg_common.h"
+#include "shm_mm.h"
+typedef struct Targ {
+ int port;
+ int id;
+}Targ;
+
+
+void server(int port) {
+ pthread_t tid;
+ shm_socket_t *socket = shm_open_socket(SHM_SOCKET_DGRAM);
+ shm_socket_bind(socket, port);
+
+ char *buf;
+ int size;
+ int remotePort;
+ char sendbuf[512];
+ while( shm_recvfrom(socket, (void **)&buf, &size, &remotePort) == 0) {
+ sprintf(sendbuf, "RECEIVED:%s", buf);
+ printf("received from %d:%s\n", remotePort, buf);
+ shm_sendto(socket, (void *)sendbuf, strlen(sendbuf) + 1, remotePort);
+ free(buf);
+ }
+
+ shm_close_socket(socket);
+
+}
+
+void client(int port) {
+ shm_socket_t *socket = shm_open_socket(SHM_SOCKET_DGRAM);
+ int size;
+ char *recvbuf;
+ char sendbuf[512];
+ int remote_port;
+ while(true) {
+ printf("request: ");
+ scanf("%s", sendbuf);
+ shm_sendto(socket, sendbuf, strlen(sendbuf)+1, port) ;
+ shm_recvfrom(socket, (void **)&recvbuf, &size, &remote_port);
+ printf("reply from (%d): %s\n", remote_port, recvbuf);
+ free(recvbuf);
+
+ }
+ shm_close_socket(socket);
+}
+
+int main(int argc, char *argv[]) {
+ shm_init(512);
+ int port;
+ if (argc < 3) {
+ fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client");
+ return 1;
+ }
+
+ port = atoi(argv[2]);
+
+ if (strcmp("server", argv[1]) == 0 ) {
+ server(port);
+ }
+
+ if (strcmp("client", argv[1]) == 0)
+ client(port);
+
+
+
+ shm_destroy();
+ // fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", "server", "client");
+ return 0;
+}
\ No newline at end of file
--
Gitblit v1.8.0