From b6043642f60ef23a7a100418cd4fec1251a98ad9 Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期四, 23 七月 2020 14:47:50 +0800 Subject: [PATCH] update --- src/libshm_queue.a | 0 src/socket/shm_socket.c | 6 .gitignore | 4 demo/queue | 0 test_socket/dgram_socket_test | 0 Makefile | 13 - src/socket/dgram_mod_socket.c | 146 +++++++++++--------- demo/pub_sub | 0 test_socket/dgram_mod_req_rep.c | 119 +++++++++++++++- demo/dgram_mod_req_rep.c | 58 ++++++++ src/queue/mm.c | 4 /dev/null | 0 test_socket/dgram_mod_survey.c | 56 ++++++++ test_socket/dgram_mod_req_rep | 0 demo/req_rep | 0 15 files changed, 313 insertions(+), 93 deletions(-) diff --git a/.gitignore b/.gitignore index 31c642f..deff700 100644 --- a/.gitignore +++ b/.gitignore @@ -38,3 +38,7 @@ *.tar *.tar.gz .vscode/ + +build/ +*.tmp +*.txt diff --git a/Makefile b/Makefile index e509479..18a1a54 100755 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ DIRS = src test_socket demo - +TAR_NAME = shm_queue.tar.gz all: for i in $(DIRS); do \ (cd $$i && echo "making $$i" && $(MAKE) ) || exit 1; \ @@ -12,11 +12,6 @@ rm -rf build ipcrm -a -ipcrm: - -ipcrm -a - -ipcs - -# -ipcrm -M 0x1234 -# -ipcrm -S 145 -# -ipcrm -S 146 -# -ipcrm -S 8899 +tar: + rm -f $(TAR_NAME) + git archive --format tar.gz --output $(TAR_NAME) master \ No newline at end of file diff --git a/build/include/array_lock_free_queue.h b/build/include/array_lock_free_queue.h deleted file mode 100644 index 24a4ec6..0000000 --- a/build/include/array_lock_free_queue.h +++ /dev/null @@ -1,322 +0,0 @@ -#ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ -#define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ -#include "atomic_ops.h" -#include <assert.h> // assert() -#include <sched.h> // sched_yield() -#include "logger_factory.h" -#include "mem_pool.h" -#include "shm_allocator.h" - -/// @brief implementation of an array based lock free queue with support for -/// multiple producers -/// This class is prevented from being instantiated directly (all members and -/// methods are private). To instantiate a multiple producers lock free queue -/// you must use the ArrayLockFreeQueue fachade: -/// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q; - - -#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - -template <typename ELEM_T, typename Allocator = SHM_Allocator> -class ArrayLockFreeQueue -{ - // ArrayLockFreeQueue will be using this' private members - template < - typename ELEM_T_, - typename Allocator_, - template <typename T, typename AT> class Q_TYPE - > - friend class LockFreeQueue; - -private: - /// @brief constructor of the class - ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); - - virtual ~ArrayLockFreeQueue(); - - inline uint32_t size(); - - inline bool full(); - - inline bool empty(); - - bool push(const ELEM_T &a_data); - - bool pop(ELEM_T &a_data); - - /// @brief calculate the index in the circular array that corresponds - /// to a particular "count" value - inline uint32_t countToIndex(uint32_t a_count); - - ELEM_T& operator[](unsigned i); - -private: - size_t Q_SIZE; - /// @brief array to keep the elements - ELEM_T *m_theQueue; - - /// @brief where a new element will be inserted - uint32_t m_writeIndex; - - /// @brief where the next element where be extracted from - uint32_t m_readIndex; - - /// @brief maximum read index for multiple producer queues - /// If it's not the same as m_writeIndex it means - /// there are writes pending to be "committed" to the queue, that means, - /// the place for the data was reserved (the index in the array) but - /// data is still not in the queue, so the thread trying to read will have - /// to wait for those other threads to save the data into the queue - /// - /// note this is only used for multiple producers - uint32_t m_maximumReadIndex; - -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - /// @brief number of elements in the queue - uint32_t m_count; -#endif - - -private: - /// @brief disable copy constructor declaring it private - ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src); - -}; - - -template <typename ELEM_T, typename Allocator> -ArrayLockFreeQueue<ELEM_T, Allocator>::ArrayLockFreeQueue(size_t qsize): - Q_SIZE(qsize), - m_writeIndex(0), // initialisation is not atomic - m_readIndex(0), // - m_maximumReadIndex(0) // -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - ,m_count(0) // -#endif -{ - m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T)); - -} - -template <typename ELEM_T, typename Allocator> -ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue() -{ - // std::cout << "destroy ArrayLockFreeQueue\n"; - Allocator::deallocate(m_theQueue); - -} - -template <typename ELEM_T, typename Allocator> -inline -uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count) -{ - // if Q_SIZE is a power of 2 this statement could be also written as - // return (a_count & (Q_SIZE - 1)); - return (a_count % Q_SIZE); -} - -template <typename ELEM_T, typename Allocator> -inline -uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size() -{ -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - return m_count; -#else - - uint32_t currentWriteIndex = m_maximumReadIndex; - uint32_t currentReadIndex = m_readIndex; - - // let's think of a scenario where this function returns bogus data - // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run - // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1 - // 2. afterwards this thread is preemted. While this thread is inactive 2 - // elements are inserted and removed from the queue, so m_maximumReadIndex - // is 5 and m_readIndex 4. Real size is still 1 - // 3. Now the current thread comes back from preemption and reads m_readIndex. - // currentReadIndex is 4 - // 4. currentReadIndex is bigger than currentWriteIndex, so - // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is, - // it returns that the queue is almost full, when it is almost empty - // - if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex)) - { - return (currentWriteIndex - currentReadIndex); - } - else - { - return (Q_SIZE + currentWriteIndex - currentReadIndex); - } -#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE -} - -template <typename ELEM_T, typename Allocator> -inline -bool ArrayLockFreeQueue<ELEM_T, Allocator>::full() -{ -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - return (m_count == (Q_SIZE)); -#else - - uint32_t currentWriteIndex = m_writeIndex; - uint32_t currentReadIndex = m_readIndex; - - if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) - { - // the queue is full - return true; - } - else - { - // not full! - return false; - } -#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE -} - -template <typename ELEM_T, typename Allocator> -inline -bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty() -{ -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - return (m_count == 0); -#else - - if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex)) - { - // the queue is full - return true; - } - else - { - // not full! - return false; - } -#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE -} - - - - - - -template <typename ELEM_T, typename Allocator> -bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data) -{ - uint32_t currentReadIndex; - uint32_t currentWriteIndex; - - do - { - - currentWriteIndex = m_writeIndex; - currentReadIndex = m_readIndex; - #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - if (m_count == Q_SIZE) { - return false; - } - #else - if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) - { - // the queue is full - return false; - } - #endif - - } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1))); - - // We know now that this index is reserved for us. Use it to save the data - m_theQueue[countToIndex(currentWriteIndex)] = a_data; - - // update the maximum read index after saving the data. It wouldn't fail if there is only one thread - // inserting in the queue. It might fail if there are more than 1 producer threads because this - // operation has to be done in the same order as the previous CAS - - while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) - { - // this is a good place to yield the thread in case there are more - // software threads than hardware processors and you have more - // than 1 producer thread - // have a look at sched_yield (POSIX.1b) - sched_yield(); - } - -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - AtomicAdd(&m_count, 1); -#endif - return true; -} - - -template <typename ELEM_T, typename Allocator> -bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data) -{ - uint32_t currentMaximumReadIndex; - uint32_t currentReadIndex; - - do - { - // to ensure thread-safety when there is more than 1 producer thread - // a second index is defined (m_maximumReadIndex) - currentReadIndex = m_readIndex; - currentMaximumReadIndex = m_maximumReadIndex; - #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - if (m_count == 0) { - return false; - } - #else - if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)) - { - // the queue is empty or - // a producer thread has allocate space in the queue but is - // waiting to commit the data into it - return false; - } - #endif - - // retrieve the data from the queue - a_data = m_theQueue[countToIndex(currentReadIndex)]; - - // try to perfrom now the CAS operation on the read index. If we succeed - // a_data already contains what m_readIndex pointed to before we - // increased it - if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) - { - #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - // m_count.fetch_sub(1); - AtomicSub(&m_count, 1); - #endif - return true; - } - - // it failed retrieving the element off the queue. Someone else must - // have read the element stored at countToIndex(currentReadIndex) - // before we could perform the CAS operation - - } while(1); // keep looping to try again! - - // Something went wrong. it shouldn't be possible to reach here - assert(0); - - // Add this return statement to avoid compiler warnings - return false; -} - -template <typename ELEM_T, typename Allocator> -ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i) -{ - int currentCount = m_count; - uint32_t currentReadIndex = m_readIndex; - if (i < 0 || i >= currentCount) - { - std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n"; - std::exit(EXIT_FAILURE); - } - return m_theQueue[countToIndex(currentReadIndex+i)]; -} - -#endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ diff --git a/build/include/array_lock_free_queue2.h b/build/include/array_lock_free_queue2.h deleted file mode 100644 index 3b79b7f..0000000 --- a/build/include/array_lock_free_queue2.h +++ /dev/null @@ -1,332 +0,0 @@ -#ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ -#define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ - -#include <assert.h> // assert() -#include <sched.h> // sched_yield() -#include "logger_factory.h" - -/// @brief implementation of an array based lock free queue with support for -/// multiple producers -/// This class is prevented from being instantiated directly (all members and -/// methods are private). To instantiate a multiple producers lock free queue -/// you must use the ArrayLockFreeQueue fachade: -/// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q; - - -#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - -template <typename ELEM_T> -class ArrayLockFreeQueue -{ - // ArrayLockFreeQueue will be using this' private members - template < - typename ELEM_T_, - template <typename T> class Q_TYPE > - friend class LockFreeQueue; - -private: - /// @brief constructor of the class - ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); - - virtual ~ArrayLockFreeQueue(); - - inline uint32_t size(); - - inline bool full(); - - inline bool empty(); - - bool push(const ELEM_T &a_data); - - bool pop(ELEM_T &a_data); - - /// @brief calculate the index in the circular array that corresponds - /// to a particular "count" value - inline uint32_t countToIndex(uint32_t a_count); - - ELEM_T& operator[](unsigned i); - -private: - size_t Q_SIZE; - /// @brief array to keep the elements - ELEM_T *m_theQueue; - - /// @brief where a new element will be inserted - std::atomic<uint32_t> m_writeIndex; - - /// @brief where the next element where be extracted from - std::atomic<uint32_t> m_readIndex; - - /// @brief maximum read index for multiple producer queues - /// If it's not the same as m_writeIndex it means - /// there are writes pending to be "committed" to the queue, that means, - /// the place for the data was reserved (the index in the array) but - /// data is still not in the queue, so the thread trying to read will have - /// to wait for those other threads to save the data into the queue - /// - /// note this is only used for multiple producers - std::atomic<uint32_t> m_maximumReadIndex; - -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - /// @brief number of elements in the queue - std::atomic<uint32_t> m_count; -#endif - - -private: - /// @brief disable copy constructor declaring it private - ArrayLockFreeQueue<ELEM_T>(const ArrayLockFreeQueue<ELEM_T> &a_src); - -}; - - -template <typename ELEM_T> -ArrayLockFreeQueue<ELEM_T>::ArrayLockFreeQueue(size_t qsize): - Q_SIZE(qsize), - m_writeIndex(0), // initialisation is not atomic - m_readIndex(0), // - m_maximumReadIndex(0) // -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - ,m_count(0) // -#endif -{ - m_theQueue = (ELEM_T*)mm_malloc(Q_SIZE * sizeof(ELEM_T)); - -} - -template <typename ELEM_T> -ArrayLockFreeQueue<ELEM_T>::~ArrayLockFreeQueue() -{ - // std::cout << "destroy ArrayLockFreeQueue\n"; - mm_free(m_theQueue); - -} - -template <typename ELEM_T> -inline -uint32_t ArrayLockFreeQueue<ELEM_T>::countToIndex(uint32_t a_count) -{ - // if Q_SIZE is a power of 2 this statement could be also written as - // return (a_count & (Q_SIZE - 1)); - return (a_count % Q_SIZE); -} - -template <typename ELEM_T> -inline -uint32_t ArrayLockFreeQueue<ELEM_T>::size() -{ -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - return m_count.load(); -#else - - uint32_t currentWriteIndex = m_maximumReadIndex.load(); - uint32_t currentReadIndex = m_readIndex.load(); - - // let's think of a scenario where this function returns bogus data - // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run - // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1 - // 2. afterwards this thread is preemted. While this thread is inactive 2 - // elements are inserted and removed from the queue, so m_maximumReadIndex - // is 5 and m_readIndex 4. Real size is still 1 - // 3. Now the current thread comes back from preemption and reads m_readIndex. - // currentReadIndex is 4 - // 4. currentReadIndex is bigger than currentWriteIndex, so - // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is, - // it returns that the queue is almost full, when it is almost empty - // - if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex)) - { - return (currentWriteIndex - currentReadIndex); - } - else - { - return (Q_SIZE + currentWriteIndex - currentReadIndex); - } -#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE -} - -template <typename ELEM_T> -inline -bool ArrayLockFreeQueue<ELEM_T>::full() -{ -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - return (m_count.load() == (Q_SIZE)); -#else - - uint32_t currentWriteIndex = m_writeIndex; - uint32_t currentReadIndex = m_readIndex; - - if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) - { - // the queue is full - return true; - } - else - { - // not full! - return false; - } -#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE -} - -template <typename ELEM_T> -inline -bool ArrayLockFreeQueue<ELEM_T>::empty() -{ -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - return (m_count.load() == 0); -#else - - if (countToIndex( m_readIndex.load()) == countToIndex(m_maximumReadIndex.load())) - { - // the queue is full - return true; - } - else - { - // not full! - return false; - } -#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE -} - - -template <typename ELEM_T> -bool ArrayLockFreeQueue<ELEM_T>::push(const ELEM_T &a_data) -{ - uint32_t currentReadIndex; - uint32_t currentWriteIndex; - - do - { - currentWriteIndex = m_writeIndex.load(); - currentReadIndex = m_readIndex.load(); -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - if (m_count.load() == Q_SIZE) { - return false; - } -#else - if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) - { - // the queue is full - return false; - } -#endif - - // There is more than one producer. Keep looping till this thread is able - // to allocate space for current piece of data - // - // using compare_exchange_strong because it isn't allowed to fail spuriously - // When the compare_exchange operation is in a loop the weak version - // will yield better performance on some platforms, but here we'd have to - // load m_writeIndex all over again - } while (!m_writeIndex.compare_exchange_strong( - currentWriteIndex, (currentWriteIndex + 1))); - - // Just made sure this index is reserved for this thread. - m_theQueue[countToIndex(currentWriteIndex)] = a_data; - //memcpy((void *)(&m_theQueue[countToIndex(currentWriteIndex)]), (void *)(&a_data), sizeof(ELEM_T) ); - - // update the maximum read index after saving the piece of data. It can't - // fail if there is only one thread inserting in the queue. It might fail - // if there is more than 1 producer thread because this operation has to - // be done in the same order as the previous CAS - // - // using compare_exchange_weak because they are allowed to fail spuriously - // (act as if *this != expected, even if they are equal), but when the - // compare_exchange operation is in a loop the weak version will yield - // better performance on some platforms. - while (!m_maximumReadIndex.compare_exchange_weak( - currentWriteIndex, (currentWriteIndex + 1))) - { - // this is a good place to yield the thread in case there are more - // software threads than hardware processors and you have more - // than 1 producer thread - // have a look at sched_yield (POSIX.1b) - sched_yield(); - } - - // The value was successfully inserted into the queue -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - m_count.fetch_add(1); -#endif - - return true; -} - -template <typename ELEM_T> -bool ArrayLockFreeQueue<ELEM_T>::pop(ELEM_T &a_data) -{ - uint32_t currentMaximumReadIndex; - uint32_t currentReadIndex; - - do - { - currentReadIndex = m_readIndex.load(); - currentMaximumReadIndex = m_maximumReadIndex.load(); - - #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - if (m_count.load() == 0) { - return false; - } - #else - // to ensure thread-safety when there is more than 1 producer - // thread a second index is defined (m_maximumReadIndex) - if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)) - { - // the queue is empty or - // a producer thread has allocate space in the queue but is - // waiting to commit the data into it - return false; - } - #endif - - // retrieve the data from the queue - a_data = m_theQueue[countToIndex(currentReadIndex)]; - //memcpy((void*) (&a_data), (void *)(&m_theQueue[countToIndex(currentReadIndex)]),sizeof(ELEM_T) ); - // try to perfrom now the CAS operation on the read index. If we succeed - // a_data already contains what m_readIndex pointed to before we - // increased it - if (m_readIndex.compare_exchange_strong(currentReadIndex, (currentReadIndex + 1))) - { - // got here. The value was retrieved from the queue. Note that the - // data inside the m_queue array is not deleted nor reseted -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - m_count.fetch_sub(1); -#endif - return true; - } - - // it failed retrieving the element off the queue. Someone else must - // have read the element stored at countToIndex(currentReadIndex) - // before we could perform the CAS operation - - } while(1); // keep looping to try again! - - // Something went wrong. it shouldn't be possible to reach here - assert(0); - - // Add this return statement to avoid compiler warnings - return false; -} - - -template <typename ELEM_T> -ELEM_T& ArrayLockFreeQueue<ELEM_T>::operator[](unsigned int i) -{ - int currentCount = m_count.load(); - uint32_t currentReadIndex = m_readIndex.load(); - if (i < 0 || i >= currentCount) - { - std::cerr << "Error in array limits: " << i << " is out of range\n"; - std::exit(EXIT_FAILURE); - } - return m_theQueue[countToIndex(currentReadIndex+i)]; -} - -#endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ diff --git a/build/include/dgram_mod_socket.h b/build/include/dgram_mod_socket.h deleted file mode 100644 index ab635a3..0000000 --- a/build/include/dgram_mod_socket.h +++ /dev/null @@ -1,41 +0,0 @@ -#ifndef __DGRAM_MOD_SOCKET_H__ -#define __DGRAM_MOD_SOCKET_H__ - - -#ifdef __cplusplus -extern "C" { -#endif - - -enum socket_mod_t -{ - PULL_PUSH = 1, - REQ_REP = 2, - PAIR = 3, - PUB_SUB = 4, - SURVEY = 5, - BUS = 6 - -}; - - - - -void *dgram_mod_open_socket(int mod); - -int dgram_mod_close_socket(void * _socket); - -int dgram_mod_bind(void * _socket, int port); - -int dgram_mod_sendto(void *_socket, const void *buf, const int size, const int port); - -int dgram_mod_recvfrom(void *_socket, void **buf, int *size, int *port); - - -int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ; - -#ifdef __cplusplus -} -#endif - -#endif \ No newline at end of file diff --git a/build/include/hashtable.h b/build/include/hashtable.h deleted file mode 100644 index 726a5bc..0000000 --- a/build/include/hashtable.h +++ /dev/null @@ -1,34 +0,0 @@ -#ifndef __HASHTABLE_H__ -#define __HASHTABLE_H__ - -#include <sys/queue.h> -#include <set> - -#define MAPSIZE 100 - -typedef struct hashtable_t -{ - struct tailq_header_t* array[MAPSIZE]; - int mutex; - int wlock; - int cond; - size_t readcnt; - -} hashtable_t; -typedef void (*hashtable_foreach_cb)(int key, void *value); - -void hashtable_init(hashtable_t *hashtable); -void *hashtable_get(hashtable_t *hashtable, int key); -void hashtable_put(hashtable_t *hashtable, int key, void *value); -void *hashtable_remove(hashtable_t *hashtable, int key); -void hashtable_removeall(hashtable_t *hashtable); - - -void hashtable_foreach(hashtable_t *hashtable, hashtable_foreach_cb cb); - -void hashtable_printall(hashtable_t *hashtable); - -int hashtable_alloc_key(hashtable_t *hashtable); - -std::set<int> * hashtable_keyset(hashtable_t *hashtable) ; -#endif diff --git a/build/include/linked_lock_free_queue.h b/build/include/linked_lock_free_queue.h deleted file mode 100644 index 3906a42..0000000 --- a/build/include/linked_lock_free_queue.h +++ /dev/null @@ -1,245 +0,0 @@ -// queue.h -- interface for a queue -#ifndef __LINKED_LOCK_FREE_QUEUE_H_ -#define __LINKED_LOCK_FREE_QUEUE_H_ -#include "mm.h" -#include "sem_util.h" - -template <typename T> class Node; - -template <typename T> -class Pointer { -public: - - Node<T> *ptr; - unsigned long count; - Pointer( Node<T> *node = NULL, int c=0) noexcept : ptr(node), count(c) {} - - bool operator == (const Pointer<T> o) const { - return (o.ptr == ptr) && (o.count == count); - } - bool operator != (const Pointer<T> o) const { - return !((o.ptr == ptr) && (o.count == count)); - } - - - -}; - -template <typename T> -class Node { -public: - alignas(16) std::atomic<Pointer<T> > next; - T value; - - Node() { - } - - void *operator new(size_t size){ - return mm_malloc(size); - } - - void operator delete(void *p) { - return mm_free(p); - } -}; - - - - - -template <typename ELEM_T> -class LinkedLockFreeQueue -{ - - template < - typename ELEM_T_, - template <typename T> class Q_TYPE > - friend class LockFreeQueue; -private: -// class scope definitions - enum {Q_SIZE = 10}; - -// private class members - std::atomic<Pointer<ELEM_T> > Head; // pointer to front of Queue - std::atomic<Pointer<ELEM_T> > Tail; // pointer to rear of Queue - //std::atomic_uint count; // current number of size in Queue - std::atomic_uint count; - const size_t qsize; // maximum number of size in Queue - // preemptive definitions to prevent public copying - LinkedLockFreeQueue(const LinkedLockFreeQueue & q) : qsize(0) { } - LinkedLockFreeQueue & operator=(const LinkedLockFreeQueue & q) { return *this;} -protected: - LinkedLockFreeQueue(size_t qs = Q_SIZE); // create queue with a qs limit - ~LinkedLockFreeQueue(); - bool empty() const; - bool full() const; - unsigned int size() const; - bool push(const ELEM_T &item); // add item to end - bool pop(ELEM_T &item); - - - ELEM_T& operator[](unsigned i); - -}; - - -// Queue methods -template <typename T> -LinkedLockFreeQueue<T>::LinkedLockFreeQueue(size_t qs) : count(0), qsize(qs) -{ - Node<T> *node = new Node<T>; - Pointer<T> pointer(node, 0); - - Head.store(pointer, std::memory_order_relaxed); - Tail.store(pointer, std::memory_order_relaxed); - -} - -template <typename T> -LinkedLockFreeQueue<T>::~LinkedLockFreeQueue() -{ - LoggerFactory::getLogger().debug("LinkedLockFreeQueue destory"); - Node<T> * nodeptr; - Pointer<T> tmp = Head.load(std::memory_order_relaxed); - while((nodeptr = tmp.ptr) != NULL) { - tmp = (tmp.ptr->next).load(std::memory_order_relaxed); - //std::cerr << "delete " << nodeptr << std::endl; - delete nodeptr; - - } -} - -template <typename T> -bool LinkedLockFreeQueue<T>::empty() const -{ - return count == 0; -} - -template <typename T> -bool LinkedLockFreeQueue<T>::full() const -{ - return count == qsize; -} - -template <typename T> -unsigned int LinkedLockFreeQueue<T>::size() const -{ - return count; -} - -// Add item to queue -template <typename T> -bool LinkedLockFreeQueue<T>::push(const T & item) -{ - if (full()) - return false; - - Node<T> * node = new Node<T>; - node->value = item; - - - Pointer<T> tail ; - Pointer<T> next ; - - - while(true) { - tail = Tail.load(std::memory_order_relaxed); - next = (tail.ptr->next).load(std::memory_order_relaxed); - if (tail == Tail.load(std::memory_order_relaxed)) { - if (next.ptr == NULL) { - if ((tail.ptr->next).compare_exchange_weak(next, - Pointer<T>(node, next.count+1), - std::memory_order_release, - std::memory_order_relaxed) ) - break; - else - Tail.compare_exchange_weak(tail, - Pointer<T>(next.ptr, tail.count+1), - std::memory_order_release, - std::memory_order_relaxed); - } - - } - } - - Tail.compare_exchange_weak(tail, Pointer<T>(node, tail.count+1), - std::memory_order_release, - std::memory_order_relaxed); - count++; - return true; -} - - - - -// Place front item into item variable and remove from queue -template <typename T> -bool LinkedLockFreeQueue<T>::pop(T & item) -{ - if (empty()) - return false; - - Pointer<T> head; - Pointer<T> tail; - Pointer<T> next; - - while(true) { - head = Head.load(std::memory_order_relaxed); - tail = Tail.load(std::memory_order_relaxed); - next = (head.ptr->next).load(); - if (head == Head.load(std::memory_order_relaxed)) { - if(head.ptr == tail.ptr) { - if (next.ptr == NULL) - return false; - // Tail is falling behind. Try to advance it - Tail.compare_exchange_weak(tail, - Pointer<T>(next.ptr, tail.count+1), - std::memory_order_release, - std::memory_order_relaxed); - } else { - item = next.ptr->value; - if (Head.compare_exchange_weak(head, - Pointer<T>(next.ptr, head.count+1), - std::memory_order_release, - std::memory_order_relaxed)) { - delete head.ptr; - break; - } - - } - } - - } - - count--; - return true; - -} - - -template <class T> -T& LinkedLockFreeQueue<T>::operator[](unsigned int i) -{ - if (i < 0 || i >= count) - { - std::cerr << "Error in array limits: " << i << " is out of range\n"; - std::exit(EXIT_FAILURE); - } - - - Pointer<T> tmp = Head.load(std::memory_order_relaxed); - //Pointer<T> tail = Tail.load(std::memory_order_relaxed); - - while(i > 0) { - //std::cout << i << ":" << std::endl; - tmp = (tmp.ptr->next).load(std::memory_order_relaxed); - i--; - } - - tmp = (tmp.ptr->next).load(std::memory_order_relaxed); - return tmp.ptr->value; -} - - - -#endif diff --git a/build/include/lock_free_queue.h b/build/include/lock_free_queue.h deleted file mode 100644 index f34079f..0000000 --- a/build/include/lock_free_queue.h +++ /dev/null @@ -1,360 +0,0 @@ -#ifndef __LOCK_FREE_QUEUE_H__ -#define __LOCK_FREE_QUEUE_H__ - -#include <usg_common.h> -#include <assert.h> // assert() -#include "mem_pool.h" -#include "sem_util.h" -#include "logger_factory.h" -#include "shm_allocator.h" - -// default Queue size -#define LOCK_FREE_Q_DEFAULT_SIZE 16 - -// define this macro if calls to "size" must return the real size of the -// queue. If it is undefined that function will try to take a snapshot of -// the queue, but returned value might be bogus - - -// forward declarations for default template values -// - -template <typename ELEM_T, typename Allocator> -class ArrayLockFreeQueue; - -// template <typename ELEM_T> -// class LinkedLockFreeQueue; - - -/// @brief Lock-free queue based on a circular array -/// No allocation of extra memory for the nodes handling is needed, but it has -/// to add extra overhead (extra CAS operation) when inserting to ensure the -/// thread-safety of the queue when the queue type is not -/// ArrayLockFreeQueueSingleProducer. -/// -/// examples of instantiation: -/// ArrayLockFreeQueue<int> q; // queue of ints of default size (65535 - 1) -/// // and defaulted to single producer -/// ArrayLockFreeQueue<int, 16> q; -/// // queue of ints of size (16 - 1) and -/// // defaulted to single producer -/// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q; -/// // queue of ints of size (100 - 1) with support -/// // for multiple producers -/// -/// ELEM_T represents the type of elementes pushed and popped from the queue -/// Q_SIZE size of the queue. The actual size of the queue is (Q_SIZE-1) -/// This number should be a power of 2 to ensure -/// indexes in the circular queue keep stable when the uint32_t -/// variable that holds the current position rolls over from FFFFFFFF -/// to 0. For instance -/// 2 -> 0x02 -/// 4 -> 0x04 -/// 8 -> 0x08 -/// 16 -> 0x10 -/// (...) -/// 1024 -> 0x400 -/// 2048 -> 0x800 -/// -/// if queue size is not defined as requested, let's say, for -/// instance 100, when current position is FFFFFFFF (4,294,967,295) -/// index in the circular array is 4,294,967,295 % 100 = 95. -/// When that value is incremented it will be set to 0, that is the -/// last 4 elements of the queue are not used when the counter rolls -/// over to 0 -/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and -/// ArrayLockFreeQueue are supported (single producer -/// by default) -template < - typename ELEM_T, - typename Allocator = SHM_Allocator, - template <typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue - > -class LockFreeQueue -{ - -private: - int slots; - int items; - -public: - // int mutex; - LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); - - /// @brief destructor of the class. - /// Note it is not virtual since it is not expected to inherit from this - /// template - ~LockFreeQueue(); - std::atomic_uint reference; - /// @brief constructor of the class - - - /// @brief returns the current number of items in the queue - /// It tries to take a snapshot of the size of the queue, but in busy environments - /// this function might return bogus values. - /// - /// If a reliable queue size must be kept you might want to have a look at - /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE' - /// it enables a reliable size though it hits overall performance of the queue - /// (when the reliable size variable is on it's got an impact of about 20% in time) - inline uint32_t size(); - - /// @brief return true if the queue is full. False otherwise - /// It tries to take a snapshot of the size of the queue, but in busy - /// environments this function might return bogus values. See help in method - /// LockFreeQueue::size - inline bool full(); - - inline bool empty(); - - inline ELEM_T& operator[](unsigned i); - - /// @brief push an element at the tail of the queue - /// @param the element to insert in the queue - /// Note that the element is not a pointer or a reference, so if you are using large data - /// structures to be inserted in the queue you should think of instantiate the template - /// of the queue as a pointer to that large structure - /// @return true if the element was inserted in the queue. False if the queue was full - bool push(const ELEM_T &a_data); - bool push_nowait(const ELEM_T &a_data); - bool push_timeout(const ELEM_T &a_data, struct timespec * timeout); - - /// @brief pop the element at the head of the queue - /// @param a reference where the element in the head of the queue will be saved to - /// Note that the a_data parameter might contain rubbish if the function returns false - /// @return true if the element was successfully extracted from the queue. False if the queue was empty - bool pop(ELEM_T &a_data); - bool pop_nowait(ELEM_T &a_data); - bool pop_timeout(ELEM_T &a_data, struct timespec * timeout); - - - void *operator new(size_t size); - void operator delete(void *p); - -protected: - /// @brief the actual queue. methods are forwarded into the real - /// implementation - Q_TYPE<ELEM_T, Allocator> m_qImpl; - -private: - /// @brief disable copy constructor declaring it private - LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src); -}; - - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize) -{ -// std::cout << "LockFreeQueue init reference=" << reference << std::endl; - slots = SemUtil::get(IPC_PRIVATE, qsize); - items = SemUtil::get(IPC_PRIVATE, 0); - // mutex = SemUtil::get(IPC_PRIVATE, 1); -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() -{ - LoggerFactory::getLogger().debug("LockFreeQueue desctroy"); - SemUtil::remove(slots); - SemUtil::remove(items); -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size() -{ - return m_qImpl.size(); -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full() -{ - return m_qImpl.full(); -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty() -{ - return m_qImpl.empty(); -} - - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data) -{ - if (SemUtil::dec(slots) == -1) { - err_msg(errno, "LockFreeQueue push"); - return false; - } - - if ( m_qImpl.push(a_data) ) { - SemUtil::inc(items); - return true; - } - return false; - -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data) -{ - if (SemUtil::dec_nowait(slots) == -1) { - if (errno == EAGAIN) - return false; - else { - err_msg(errno, "LockFreeQueue push_nowait"); - return false; - } - - } - - if ( m_qImpl.push(a_data)) { - SemUtil::inc(items); - return true; - } - return false; - -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, struct timespec * timeout) -{ - - if (SemUtil::dec_timeout(slots, timeout) == -1) { - if (errno == EAGAIN) - return false; - else { - err_msg(errno, "LockFreeQueue push_timeout"); - return false; - } - } - - if (m_qImpl.push(a_data)){ - SemUtil::inc(items); - return true; - } - return false; - -} - - - - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data) -{ - if (SemUtil::dec(items) == -1) { - err_msg(errno, "LockFreeQueue pop"); - return false; - } - - if (m_qImpl.pop(a_data)) { - SemUtil::inc(slots); - return true; - } - return false; - -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data) -{ - if (SemUtil::dec_nowait(items) == -1) { - if (errno == EAGAIN) - return false; - else { - err_msg(errno, "LockFreeQueue pop_nowait"); - return false; - } - } - - if (m_qImpl.pop(a_data)) { - SemUtil::inc(slots); - return true; - } - return false; - -} - - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * timeout) -{ - if (SemUtil::dec_timeout(items, timeout) == -1) { - if (errno == EAGAIN) - return false; - else { - err_msg(errno, "LockFreeQueue pop_timeout"); - return false; - } - } - - if (m_qImpl.pop(a_data)) { - SemUtil::inc(slots); - return true; - } - return false; - -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -ELEM_T& LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) { - return m_qImpl.operator[](i); -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -void * LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size){ - return Allocator::allocate(size); -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator delete(void *p) { - return Allocator::deallocate(p); -} - -// include implementation files -//#include "linked_lock_free_queue.h" -#include "array_lock_free_queue.h" - -#endif // _LOCK_FREE_QUEUE_H__ diff --git a/build/include/logger_factory.h b/build/include/logger_factory.h deleted file mode 100644 index a766d14..0000000 --- a/build/include/logger_factory.h +++ /dev/null @@ -1,17 +0,0 @@ -#ifndef __LOGGER_FACTORY_H__ -#define __LOGGER_FACTORY_H__ -#include "logger.h" - -class LoggerFactory { -public: - - static Logger getLogger() { -//ERROR ALL DEBUG INFO - static Logger logger(Logger::ERROR); - return logger; - } -}; - -#endif - - diff --git a/build/include/mem_pool.h b/build/include/mem_pool.h deleted file mode 100644 index 17a7c5c..0000000 --- a/build/include/mem_pool.h +++ /dev/null @@ -1,55 +0,0 @@ -#ifndef _MEM_POOL_H_ -#define _MEM_POOL_H_ -#include "mm.h" -#include "sem_util.h" -#define MEM_POOL_COND_KEY 0x8801 - -static int mem_pool_cond = SemUtil::get(MEM_POOL_COND_KEY, 0); - -// static int mem_pool_mutex = SemUtil::get(MEM_POOL_COND_KEY, 1); - -static inline void mem_pool_init(size_t heap_size) { - if(mm_init(heap_size)) { - - } -} - -static inline void mem_pool_destroy(void) { - if(mm_destroy()) { - SemUtil::remove(mem_pool_cond); - } - -} - -static inline void *mem_pool_malloc (size_t size) { - void *ptr; - while( (ptr = mm_malloc(size)) == NULL ) { - err_msg(0, "There is not enough memery to allocate, waiting someone else to free."); - SemUtil::set(mem_pool_cond, 0); - // wait for someone else to free space - SemUtil::dec(mem_pool_cond); - - } - - return ptr; -} - -static inline void mem_pool_free (void *ptr) { - mm_free(ptr); - // notify malloc - SemUtil::set(mem_pool_cond, 1); - -} - -static inline void *mem_pool_realloc (void *ptr, size_t size) { - return mm_realloc(ptr, size); -} - -static inline hashtable_t * mem_pool_get_hashtable() { - return mm_get_hashtable(); - -} -// extern int mm_checkheap(int verbose); - - -#endif \ No newline at end of file diff --git a/build/include/mm.h b/build/include/mm.h deleted file mode 100644 index f0ab764..0000000 --- a/build/include/mm.h +++ /dev/null @@ -1,20 +0,0 @@ -#ifndef MM_HDR_H -#define MM_HDR_H /* Prevent accidental double inclusion */ - -#include <usg_common.h> -#include "usg_typedef.h" -#include "hashtable.h" - -extern bool mm_init(size_t heap_size); -extern bool mm_destroy(void); - -extern void *mm_malloc (size_t size); -extern void mm_free (void *ptr); -extern void *mm_realloc(void *ptr, size_t size); -extern hashtable_t * mm_get_hashtable(); - -// extern int mm_checkheap(int verbose); - -// extern void *get_mm_start_brk(); -// extern size_t get_mm_max_size(); -#endif diff --git a/build/include/mod_socket.h b/build/include/mod_socket.h deleted file mode 100644 index 21498ee..0000000 --- a/build/include/mod_socket.h +++ /dev/null @@ -1,76 +0,0 @@ -#ifndef __MOD_SOCKET_H__ -#define __MOD_SOCKET_H__ - - -#ifdef __cplusplus -extern "C" { -#endif - -enum socket_mod_t -{ - PULL_PUSH = 1, - REQ_REP = 2, - PAIR = 3, - PUB_SUB = 4, - SURVEY = 5, - BUS = 6 - -}; - -/** - * 鍒涘缓socket - * @return socket鍦板潃 -*/ -void *mod_open_socket(int mod); - -/** - * 鍏抽棴socket -*/ -int mod_close_socket(void * _socket); - -/** - * 缁戝畾绔彛鍒皊ocket, 濡傛灉涓嶇粦瀹氬垯绯荤粺鑷姩鍒嗛厤涓�涓� - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int mod_socket_bind(void * _socket, int port); - - -/** - * 鏈嶅姟绔紑鍚繛鎺ョ洃鍚� - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 - */ -int mod_listen(void * _socket); - -/** - * 瀹㈡埛绔彂璧疯繛鎺ヨ姹� - */ -int mod_connect(void * _socket, int port); - -/** - * 鍙戦�佷俊鎭� - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 - */ -int mod_send(void * _socket, const void *buf, const int size); - -/** - * 鎺ユ敹淇℃伅 - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int mod_recv(void * _socket, void **buf, int *size) ; - -/** - * 閲婃斁鎺ユ敹淇℃伅鐨刡uf - */ -void mod_free(void *buf); - - -/** - * 鑾峰彇soket绔彛鍙� - */ -int mod_get_socket_port(void * _socket); - -#ifdef __cplusplus -} -#endif - -#endif \ No newline at end of file diff --git a/build/include/sem_util.h b/build/include/sem_util.h deleted file mode 100644 index 0d673de..0000000 --- a/build/include/sem_util.h +++ /dev/null @@ -1,20 +0,0 @@ -#ifndef PCSEM_H -#define PCSEM_H - -#include "usg_common.h" -#include "usg_typedef.h" - -namespace SemUtil { - - int get(key_t key, unsigned int value); - int dec(int semId); - int dec_nowait(int semId); - int dec_timeout(int semId, struct timespec * timeout); - int inc(int semId); - void remove(int semid); - - void set(int semId, int val); - -} - -#endif diff --git a/build/include/shm_allocator.h b/build/include/shm_allocator.h deleted file mode 100644 index 023bc9d..0000000 --- a/build/include/shm_allocator.h +++ /dev/null @@ -1,102 +0,0 @@ -#ifndef __SHM_ALLOCATOR_H__ -#define __SHM_ALLOCATOR_H__ -#include "usg_common.h" -#include "mem_pool.h" -#include <new> -#include <cstdlib> // for exit() -#include <climits> // for UNIX_MAX -#include <cstddef> - - - -template<class T> class SHM_STL_Allocator -{ -public: - typedef T value_type; - typedef T* pointer; - typedef const T* const_pointer; - typedef T& reference; - typedef const T& const_reference; - typedef size_t size_type; - typedef ptrdiff_t difference_type; - - - SHM_STL_Allocator() {}; - ~SHM_STL_Allocator() {}; - template<class U> SHM_STL_Allocator(const SHM_STL_Allocator<U>& t) { }; - template<class U> struct rebind { typedef SHM_STL_Allocator<U> other; }; - - pointer allocate(size_type n, const void* hint=0) { -// fprintf(stderr, "allocate n=%u, hint= %p\n",n, hint); - return((T*) (mm_malloc(n * sizeof(T)))); - } - - void deallocate(pointer p, size_type n) { -// fprintf(stderr, "dealocate n=%u" ,n); - mm_free((void*)p); - } - - void construct(pointer p, const T& value) { - ::new(p) T(value); - } - - void construct(pointer p) - { - ::new(p) T(); - } - - void destroy(pointer p) { - p->~T(); - } - - pointer address(reference x) { - return (pointer)&x; - } - - const_pointer address(const_reference x) { - return (const_pointer)&x; - } - - size_type max_size() const { - return size_type(UINT_MAX/sizeof(T)); - } -}; - - -class SHM_Allocator { - public: - static void *allocate (size_t size) { - printf("shm_allocator malloc\n"); - return mem_pool_malloc(size); - } - - static void deallocate (void *ptr) { - printf("shm_allocator free\n"); - return mem_pool_free(ptr); - } -}; - - -class DM_Allocator { - public: - static void *allocate (size_t size) { - printf("dm_allocator malloc\n"); - return malloc(size); - } - - static void deallocate (void *ptr) { - printf("dm_allocator free\n"); - return free(ptr); - } -}; - - -// template<class charT, class traits = char _traits<charT>, -// class Allocator = allocator<charT> > - - - - -typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > shmstring; - -#endif \ No newline at end of file diff --git a/build/include/shm_mm.h b/build/include/shm_mm.h deleted file mode 100644 index b32568e..0000000 --- a/build/include/shm_mm.h +++ /dev/null @@ -1,26 +0,0 @@ -#ifndef __SHM_MM_H__ -#define __SHM_MM_H__ - -#ifdef __cplusplus -extern "C" { -#endif - -/** - * 鍒濆鍖栧叡浜唴瀛� - * @size 鍏变韩鍐呭瓨澶у皬, 鍗曚綅M - * - */ -void shm_init(int size); - -/** - * 閿�姣佸叡浜唴瀛� - * 鏁翠釜杩涚▼閫�鍑烘椂闇�瑕佹墽琛岃繖涓柟娉曪紝璇ユ柟娉曢鍏堜細妫�鏌ユ槸鍚﹁繕鏈夊叾浠栬繘绋嬪湪浣跨敤璇ュ叡浜唴瀛橈紝濡傛灉杩樻湁鍏朵粬杩涚▼鍦ㄤ娇鐢ㄥ氨鍙槸detach,濡傛灉娌℃湁鍏朵粬杩涚▼鍦ㄤ娇鐢ㄥ垯閿�姣佹暣鍧楀唴瀛樸�� - */ -void shm_destroy(); - -#ifdef __cplusplus -} -#endif - -#endif - diff --git a/build/include/shm_queue.h b/build/include/shm_queue.h deleted file mode 100644 index d853774..0000000 --- a/build/include/shm_queue.h +++ /dev/null @@ -1,184 +0,0 @@ -#ifndef __SHM_QUEUE_H__ -#define __SHM_QUEUE_H__ - -#include "usg_common.h" -#include "hashtable.h" -#include "lock_free_queue.h" -#include "logger_factory.h" -#include "shm_allocator.h" - -// default Queue size -// #define LOCK_FREE_Q_DEFAULT_SIZE 16 - -template < typename ELEM_T> -class SHMQueue -{ - -private: - const int KEY; - -public: - /// @brief constructor of the class - SHMQueue(int key=0, size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); - - - ~SHMQueue(); - - - inline uint32_t size(); - - inline bool full(); - inline bool empty(); - - inline bool push(const ELEM_T &a_data); - inline bool push_nowait(const ELEM_T &a_data); - inline bool push_timeout(const ELEM_T &a_data, struct timespec * timeout); - inline bool pop(ELEM_T &a_data); - inline bool pop_nowait(ELEM_T &a_data); - inline bool pop_timeout(ELEM_T &a_data, struct timespec * timeout); - - inline ELEM_T& operator[](unsigned i); - - static void remove_queues_exclude(int *keys, size_t length); -private: - - -protected: - /// @brief the actual queue-> methods are forwarded into the real - /// implementation - LockFreeQueue<ELEM_T, SHM_Allocator>* queue; - -private: - /// @brief disable copy constructor declaring it private - SHMQueue<ELEM_T>(const SHMQueue<ELEM_T> &a_src); -}; - - -template < typename ELEM_T > -void SHMQueue<ELEM_T>::remove_queues_exclude(int *keys, size_t length) -{ - hashtable_t *hashtable = mm_get_hashtable(); - std::set<int>* keyset = hashtable_keyset(hashtable); - std::set<int>::iterator keyItr; - LockFreeQueue<ELEM_T, SHM_Allocator>* mqueue; - bool found; - for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) { - found = false; - for(size_t i = 0; i < length; i++) { - if(*keyItr == keys[i]) { - found = true; - break; - } - } - if(!found) { - mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr); - delete mqueue; - } - } - delete keyset; - -} - -template < typename ELEM_T > -SHMQueue<ELEM_T>::SHMQueue(int key, size_t qsize): KEY(key) -{ - - hashtable_t *hashtable = mm_get_hashtable(); - queue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key); - //LockFreeQueue<int, 10000> q; - if (queue == NULL || (void *)queue == (void *)1) { - queue = new LockFreeQueue<ELEM_T, SHM_Allocator>(qsize); - hashtable_put(hashtable, key, (void *)queue); - } - queue->reference++; - LoggerFactory::getLogger().debug("SHMQueue constructor reference===%d", queue->reference.load()); -} - -template < typename ELEM_T > -SHMQueue<ELEM_T>::~SHMQueue() -{ - queue->reference--; - LoggerFactory::getLogger().debug("SHMQueue destructor reference===%d", queue->reference.load()); - if(queue->reference.load() == 0) { - delete queue; - hashtable_t *hashtable = mm_get_hashtable(); - hashtable_remove(hashtable, KEY); - LoggerFactory::getLogger().debug("SHMQueue destructor delete queue"); - } -} - -template < typename ELEM_T > -inline uint32_t SHMQueue<ELEM_T>::size() -{ - return queue->size(); -} - -template < typename ELEM_T > -inline bool SHMQueue<ELEM_T>::full() -{ - return queue->full(); -} - -template < typename ELEM_T > -inline bool SHMQueue<ELEM_T>::empty() -{ - return queue->empty(); -} - - -template < typename ELEM_T > -inline bool SHMQueue<ELEM_T>::push(const ELEM_T &a_data) -{ - return queue->push(a_data); - -} - -template < - typename ELEM_T > -inline bool SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) -{ - return queue->push_nowait(a_data); - -} - -template < typename ELEM_T > -inline bool SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, struct timespec * timeout) -{ - - return queue->push_timeout(a_data, timeout); - -} - - - - -template < typename ELEM_T > -inline bool SHMQueue<ELEM_T>::pop(ELEM_T &a_data) -{ - return queue->pop(a_data); - -} - -template < typename ELEM_T > -inline bool SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) -{ - return queue->pop_nowait(a_data); - -} - - -template < typename ELEM_T > -inline bool SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, struct timespec * timeout) -{ - return queue->pop_timeout(a_data, timeout); - -} - -template < typename ELEM_T > -inline ELEM_T& SHMQueue<ELEM_T>::operator[](unsigned i) { - return queue->operator[](i); -} - - - -#endif diff --git a/build/include/shm_queue_wrapper.h b/build/include/shm_queue_wrapper.h deleted file mode 100644 index 984bd5a..0000000 --- a/build/include/shm_queue_wrapper.h +++ /dev/null @@ -1,100 +0,0 @@ -#ifndef __SHM_QUEUE_WRAPPER_H__ -#define __SHM_QUEUE_WRAPPER_H__ - -#include "usg_common.h" -#include "usg_typedef.h" - - -#ifdef __cplusplus -extern "C" { -#endif - - - -//绉婚櫎涓嶅寘鍚湪keys涓殑闃熷垪 -void shm_remove_queues_exclude(void *keys, int length); -/** - * 鍒涘缓闃熷垪 - * @ shmqueue - * @ key 鏍囪瘑鍏变韩闃熷垪鐨勫敮涓�鏍囪瘑, key鏄竴涓寚閽堥噷闈㈠瓨鍌ㄤ簡key鐨勫�硷紝 濡傛灉key鐨勫�间负-1绯荤粺浼氳嚜鍔ㄥ垎閰嶄竴涓猭ey鍊煎苟鎶婅key鐨勫�艰祴缁檏ey鎸囬拡銆傚鏋渒ey鐨勫�间笉浼氱┖浼氭鏌ユ槸鍚︽湁閲嶅缁戝畾鐨勬儏鍐�, 鏈夐噸澶嶅氨鎶ラ敊娌℃湁灏卞垱寤洪槦鍒楀苟缁戝畾key. - * @ queue_size 闃熷垪澶у皬 - */ -void* shmqueue_create( int * key, int queue_size); - -/** - * 缁戝畾key鍒伴槦鍒楋紝浣嗘槸骞朵笉浼氬垱寤洪槦鍒椼�傚鏋滄病鏈夊搴旀寚瀹歬ey鐨勯槦鍒楁彁绀洪敊璇苟閫�鍑� - */ -void* shmqueue_attach(int key) ; - -/** - * 閿�姣� -*/ -void shmqueue_drop(void * _shmqueue); - -/** - * 闃熷垪鍏冪礌鐨勪釜鏁� - */ -int shmqueue_size(void * _shmqueue) ; - -/** - * 鏄惁宸叉弧 - * @return 1鏄紝 0鍚� - */ -int shmqueue_full(void * _shmqueue); - -/** - * 鏄惁涓虹┖ - * @return 1鏄紝 0鍚� - */ -int shmqueue_empty(void * _shmqueue) ; - -/** - * 鍏ラ槦, 闃熷垪婊℃椂绛夊緟. - * @return 1 鍏ラ槦鎴愬姛, 0 鍏ラ槦澶辫触 - */ -int shmqueue_push(void * _shmqueue, void *src, int size); - -/** - * 鍏ラ槦, 闃熷垪婊℃椂绔嬪嵆杩斿洖. - * @return 1 鍏ラ槦鎴愬姛, 0 鍏ラ槦澶辫触 - */ -int shmqueue_push_nowait(void * _shmqueue, void *src, int size) ; - -/** - * 鍏ラ槦, 鎸囧畾鏃堕棿鍐呭叆闃熶笉鎴愬姛灏辫繑鍥� - * @sec 绉� - * @nsec 绾崇 - * @return 1 鍏ラ槦鎴愬姛, 0 鍏ラ槦澶辫触 - */ -int shmqueue_push_timeout(void * _shmqueue, void *src, int size, int sec, int nsec) ; - -/** - * 鍑洪槦, 闃熷垪绌烘椂绛夊緟 - * @return 1 鍑洪槦鎴愬姛锛� 0鍑洪槦澶辫触 - */ -int shmqueue_pop(void * _shmqueue, void **dest, int *size); - -/** - * 鍑洪槦, 闃熷垪绌烘椂绔嬪嵆杩斿洖 - * @return 1 鍑洪槦鎴愬姛锛� 0鍑洪槦澶辫触 - */ -int shmqueue_pop_nowait(void * _shmqueue, void **dest, int *size) ; - -/** - * 鍑洪槦, 鎸囧畾鏃堕棿鍐呭嚭闃熶笉鎴愬姛灏辫繑鍥� - * @sec绉� - * @nsec绾崇 - * @return 1 鍑洪槦鎴愬姛锛� 0鍑洪槦澶辫触 - */ -int shmqueue_pop_timeout(void * _shmqueue, void **dest, int *size, int sec, int nsec); - -/** - * 閲婃斁鍑洪槦鍒嗛厤鐨勫唴瀛� - */ -void shmqueue_free(void *ptr); - -#ifdef __cplusplus -} -#endif - -#endif \ No newline at end of file diff --git a/build/include/shm_socket.h b/build/include/shm_socket.h deleted file mode 100644 index 30b85da..0000000 --- a/build/include/shm_socket.h +++ /dev/null @@ -1,87 +0,0 @@ -#ifndef __SHM_SOCKET_H__ -#define __SHM_SOCKET_H__ - -#include "usg_common.h" -#include "usg_typedef.h" -#include "shm_queue.h" - -#ifdef __cplusplus -extern "C" { -#endif - -enum shm_msg_type_t -{ - SHM_SOCKET_OPEN = 1, - SHM_SOCKET_OPEN_REPLY = 2, - SHM_SOCKET_CLOSE = 3, - SHM_COMMON_MSG = 4 - -}; - -enum shm_socket_type_t -{ - SHM_SOCKET_STREAM = 1, - SHM_SOCKET_DGRAM = 2 - -}; - -enum shm_connection_status_t { - SHM_CONN_CLOSED=1, - SHM_CONN_LISTEN=2, - SHM_CONN_ESTABLISHED=3 -}; - -typedef struct shm_msg_t { - int port; - shm_msg_type_t type; - size_t size; - void * buf; - -} shm_msg_t; - - -typedef struct shm_socket_t { - shm_socket_type_t socket_type; - // 鏈湴port - int port; - shm_connection_status_t status; - SHMQueue<shm_msg_t> *queue; - SHMQueue<shm_msg_t> *remoteQueue; - LockFreeQueue<shm_msg_t, DM_Allocator> *messageQueue; - LockFreeQueue<shm_msg_t, DM_Allocator> *acceptQueue; - std::map<int, shm_socket_t* > *clientSocketMap; - pthread_t dispatch_thread; - -} shm_socket_t; - - - - -shm_socket_t *shm_open_socket(shm_socket_type_t socket_type); - - -int shm_close_socket(shm_socket_t * socket) ; - - -int shm_socket_bind(shm_socket_t * socket, int port) ; - -int shm_listen(shm_socket_t * socket) ; - -shm_socket_t* shm_accept(shm_socket_t* socket); - -int shm_connect(shm_socket_t * socket, int port); - -int shm_send(shm_socket_t * socket, const void *buf, const int size) ; - -int shm_recv(shm_socket_t * socket, void **buf, int *size) ; - -int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port); - -int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port); - - -#ifdef __cplusplus -} -#endif - -#endif \ No newline at end of file diff --git a/build/lib/libshm_queue.a b/build/lib/libshm_queue.a deleted file mode 100644 index 11e442c..0000000 --- a/build/lib/libshm_queue.a +++ /dev/null Binary files differ diff --git a/demo/dgram_mod_req_rep.c b/demo/dgram_mod_req_rep.c new file mode 100644 index 0000000..a857ce6 --- /dev/null +++ b/demo/dgram_mod_req_rep.c @@ -0,0 +1,58 @@ +#include "dgram_mod_socket.h" +#include "shm_mm.h" +#include "usg_common.h" + +void server(int port) { + void *socket = dgram_mod_open_socket(REQ_REP); + dgram_mod_bind(socket, port); + int size; + void *recvbuf; + char sendbuf[512]; + int rv; + int remote_port; + while ( (rv = dgram_mod_recvfrom(socket, &recvbuf, &size, &remote_port) ) == 0) { + sprintf(sendbuf, "SERVER RECEIVED: %s", recvbuf); + puts(sendbuf); + dgram_mod_sendto(socket, sendbuf, strlen(sendbuf) + 1, remote_port); + free(recvbuf); + } + dgram_mod_close_socket(socket); +} + +void client(int port) { + void *socket = dgram_mod_open_socket(REQ_REP); + int size; + void *recvbuf; + char sendbuf[512]; + while (true) { + printf("request: "); + scanf("%s", sendbuf); + dgram_mod_sendandrecv(socket, sendbuf, strlen(sendbuf) + 1, port, &recvbuf, &size); + printf("reply: %s\n", (char *)recvbuf); + free(recvbuf); + } + dgram_mod_close_socket(socket); +} + + + +int main(int argc, char *argv[]) { + shm_init(512); + int port; + if (argc < 3) { + fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client"); + return 1; + } + + port = atoi(argv[2]); + + if (strcmp("server", argv[1]) == 0) { + server(port); + } + + if (strcmp("client", argv[1]) == 0) + client(port); + + + return 0; +} \ No newline at end of file diff --git a/demo/pub_sub b/demo/pub_sub index 6ae4f04..1cc620d 100755 --- a/demo/pub_sub +++ b/demo/pub_sub Binary files differ diff --git a/demo/queue b/demo/queue index 096c64a..43913ed 100755 --- a/demo/queue +++ b/demo/queue Binary files differ diff --git a/demo/req_rep b/demo/req_rep index c950193..a32bb1f 100755 --- a/demo/req_rep +++ b/demo/req_rep Binary files differ diff --git a/src/libshm_queue.a b/src/libshm_queue.a index 11e442c..92905db 100644 --- a/src/libshm_queue.a +++ b/src/libshm_queue.a Binary files differ diff --git a/src/queue/mm.c b/src/queue/mm.c index 592fda0..bfd27d9 100644 --- a/src/queue/mm.c +++ b/src/queue/mm.c @@ -122,6 +122,7 @@ SemUtil::inc(mutex); return aptr; } else { + SemUtil::inc(mutex); err_msg(0, "mm_malloc : out of memery\n"); return NULL; } @@ -143,9 +144,8 @@ *} */ - - size_t size = GET_SIZE(HDRP(ptr)); SemUtil::dec(mutex); + size_t size = GET_SIZE(HDRP(ptr)); PUT(HDRP(ptr), PACK(size, 0)); PUT(FTRP(ptr), PACK(size, 0)); coalesce(ptr); diff --git a/src/socket/dgram_mod_socket.c b/src/socket/dgram_mod_socket.c index f9857bc..5b24fc7 100644 --- a/src/socket/dgram_mod_socket.c +++ b/src/socket/dgram_mod_socket.c @@ -10,19 +10,17 @@ typedef struct dgram_mod_socket_t { socket_mod_t mod; shm_socket_t *shm_socket; - pthread_t recv_thread; - std::map<int, LockFreeQueue<shm_msg_t, DM_Allocator> * > *recv_queue_map; + // pthread_t recv_thread; + // std::map<int, LockFreeQueue<shm_msg_t, DM_Allocator> * > *recv_queue_map; } dgram_mod_socket_t; void *dgram_mod_open_socket(int mod) { dgram_mod_socket_t * socket = (dgram_mod_socket_t *)calloc(1, sizeof(dgram_mod_socket_t)); socket->mod = (socket_mod_t)mod; - socket->recv_thread = 0; - socket->recv_queue_map = NULL; + // socket->recv_thread = 0; + // socket->recv_queue_map = NULL; socket->shm_socket = shm_open_socket(SHM_SOCKET_DGRAM); - - printf("socket->shm_socket = %p \n" , socket->shm_socket); return (void *)socket; } @@ -30,18 +28,18 @@ int dgram_mod_close_socket(void * _socket) { dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; shm_close_socket(socket->shm_socket); - if(socket->recv_queue_map != NULL) { - for(auto iter = socket->recv_queue_map->begin(); iter != socket->recv_queue_map->end(); iter++) { - delete iter->second; - socket->recv_queue_map->erase(iter); + // if(socket->recv_queue_map != NULL) { + // for(auto iter = socket->recv_queue_map->begin(); iter != socket->recv_queue_map->end(); iter++) { + // delete iter->second; + // socket->recv_queue_map->erase(iter); - } - delete socket->recv_queue_map; - } + // } + // delete socket->recv_queue_map; + // } - if(socket->recv_thread != 0) - pthread_cancel(socket->recv_thread); + // if(socket->recv_thread != 0) + // pthread_cancel(socket->recv_thread); free(_socket); } @@ -53,7 +51,6 @@ int dgram_mod_sendto(void *_socket, const void *buf, const int size, const int port) { dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - return shm_sendto(socket->shm_socket, buf, size, port); } @@ -61,79 +58,90 @@ int dgram_mod_recvfrom(void *_socket, void **buf, int *size, int *port) { dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - if(socket->mod == REQ_REP && socket->recv_thread != 0) { - err_exit(0, "you have used sendandrecv method os you can not use recvfrom method any more. these two method can not be used at the same time."); - return -1; - } + // if(socket->mod == REQ_REP && socket->recv_thread != 0) { + // err_exit(0, "you have used sendandrecv method os you can not use recvfrom method any more. these two method can not be used at the same time."); + // return -1; + // } return shm_recvfrom(socket->shm_socket, buf, size, port); } -void *_dgram_mod_run_recv(void * _socket) { - pthread_detach(pthread_self()); - dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; - void *buf; - int size; - int port; - shm_msg_t msg; - LockFreeQueue<shm_msg_t, DM_Allocator> *queue; - std::map<int, LockFreeQueue<shm_msg_t, DM_Allocator> * >::iterator iter; -// printf("==============_dgram_mod_run_recv recv before\n"); - while(shm_recvfrom(socket->shm_socket, &buf, &size, &port) == 0) { - if( (iter = socket->recv_queue_map->find(port) ) != socket->recv_queue_map->end()) { - queue = iter->second; - } else { - queue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); - socket->recv_queue_map->insert({port, queue}); - } +// void *_dgram_mod_run_recv(void * _socket) { +// pthread_detach(pthread_self()); +// dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; +// void *buf; +// int size; +// int port; +// shm_msg_t msg; +// LockFreeQueue<shm_msg_t, DM_Allocator> *queue; +// std::map<int, LockFreeQueue<shm_msg_t, DM_Allocator> * >::iterator iter; +// // printf("==============_dgram_mod_run_recv recv before\n"); +// while(shm_recvfrom(socket->shm_socket, &buf, &size, &port) == 0) { +// if( (iter = socket->recv_queue_map->find(port) ) != socket->recv_queue_map->end()) { +// queue = iter->second; +// } else { +// queue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); +// socket->recv_queue_map->insert({port, queue}); +// } - msg.buf = buf; - msg.size = size; - msg.port = port; -// printf("==============_dgram_mod_run_recv push before\n"); - queue->push(msg); -// printf("==============_dgram_mod_run_recv push after\n"); +// msg.buf = buf; +// msg.size = size; +// msg.port = port; +// // printf("==============_dgram_mod_run_recv push before\n"); +// queue->push(msg); +// // printf("==============_dgram_mod_run_recv push after\n"); - } - return NULL; +// } +// return NULL; -} +// } -int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) { + + +int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size) { dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; if(socket->mod != REQ_REP) { err_exit(0, "you can't use this method other than REQ_REP mod!"); } - if(socket->recv_queue_map == NULL) { - socket->recv_queue_map = new std::map<int, LockFreeQueue<shm_msg_t, DM_Allocator> * >; - } + // if(socket->recv_queue_map == NULL) { + // socket->recv_queue_map = new std::map<int, LockFreeQueue<shm_msg_t, DM_Allocator> * >; + // } - std::map<int, LockFreeQueue<shm_msg_t, DM_Allocator> * >::iterator iter; - LockFreeQueue<shm_msg_t, DM_Allocator> *queue; - if( (iter = socket->recv_queue_map->find(port) ) != socket->recv_queue_map->end()) { - queue = iter->second; - } else { - queue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); - socket->recv_queue_map->insert({port, queue}); - } + // std::map<int, LockFreeQueue<shm_msg_t, DM_Allocator> * >::iterator iter; + // LockFreeQueue<shm_msg_t, DM_Allocator> *queue; + // if( (iter = socket->recv_queue_map->find(port) ) != socket->recv_queue_map->end()) { + // queue = iter->second; + // } else { + // queue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); + // socket->recv_queue_map->insert({port, queue}); + // } - if (socket->recv_thread == 0) { + // if (socket->recv_thread == 0) { - pthread_create(&(socket->recv_thread ), NULL, _dgram_mod_run_recv , _socket); + // pthread_create(&(socket->recv_thread ), NULL, _dgram_mod_run_recv , _socket); - } + // } - shm_sendto(socket->shm_socket, send_buf, send_size, port); - shm_msg_t msg; -// printf("==============dgram_mod_sendandrecv pop before\n"); - if(queue->pop(msg)) { - *recv_buf = msg.buf; - *recv_size = msg.size; -// printf("==============dgram_mod_sendandrecv pop after\n"); - return 0; + // shm_msg_t msg; + // if(queue->pop(msg)) { + // *recv_buf = msg.buf; + // *recv_size = msg.size; + // return 0; + // } + + int recv_port; + int rv; + + shm_socket_t *shm_socket = shm_open_socket(SHM_SOCKET_DGRAM); + if (shm_sendto(shm_socket, send_buf, send_size, send_port) == 0) { + rv = shm_recvfrom(shm_socket, recv_buf, recv_size, &recv_port); + shm_close_socket(shm_socket); + return rv; } + + return -1; } \ No newline at end of file diff --git a/src/socket/shm_socket.c b/src/socket/shm_socket.c index 3d7ba4e..5b8b9c0 100644 --- a/src/socket/shm_socket.c +++ b/src/socket/shm_socket.c @@ -287,7 +287,7 @@ } shm_msg_t src; -printf("shm_recvfrom pop before"); +// printf("shm_recvfrom pop before"); if (socket->queue->pop(src)) { void * _buf = malloc(src.size); memcpy(_buf, src.buf, src.size); @@ -295,7 +295,7 @@ *size = src.size; *port = src.port; mm_free(src.buf); -printf("shm_recvfrom pop after"); +// printf("shm_recvfrom pop after"); return 0; } else { return -1; @@ -327,7 +327,7 @@ if( iter != socket->clientSocketMap->end() ) { socket->clientSocketMap->erase(iter); } - //free((void *)client_socket); + free((void *)client_socket); } diff --git a/test_socket/dgram_mod_req_rep b/test_socket/dgram_mod_req_rep index 4034ad8..f31e2b9 100755 --- a/test_socket/dgram_mod_req_rep +++ b/test_socket/dgram_mod_req_rep Binary files differ diff --git a/test_socket/dgram_mod_req_rep.c b/test_socket/dgram_mod_req_rep.c index a857ce6..f4d2918 100644 --- a/test_socket/dgram_mod_req_rep.c +++ b/test_socket/dgram_mod_req_rep.c @@ -1,20 +1,58 @@ #include "dgram_mod_socket.h" #include "shm_mm.h" #include "usg_common.h" +#include "lock_free_queue.h" + +#define WORKERS 4 + +typedef struct task_t { + void *buf; + int size; + int port; + +} task_t; + + +typedef struct Targ { + int port; + int id; + +}Targ; + +LockFreeQueue<task_t, DM_Allocator> task_queue(100); + + +void *worker(void *socket) { + pthread_detach(pthread_self()); + char sendbuf[512]; + task_t task; + while(true) { + task_queue.pop(task); + sprintf(sendbuf, "SERVER RECEIVED: %s", task.buf); + // puts(sendbuf); + dgram_mod_sendto(socket, sendbuf, strlen(sendbuf) + 1, task.port); + free(task.buf); + } + return NULL; +} + +void initThreadPool(void *socket) { + + pthread_t tid; + for (int i = 0; i < WORKERS; i++) + pthread_create(&tid, NULL, worker, socket); +} void server(int port) { void *socket = dgram_mod_open_socket(REQ_REP); dgram_mod_bind(socket, port); - int size; - void *recvbuf; - char sendbuf[512]; + initThreadPool(socket); + int rv; - int remote_port; - while ( (rv = dgram_mod_recvfrom(socket, &recvbuf, &size, &remote_port) ) == 0) { - sprintf(sendbuf, "SERVER RECEIVED: %s", recvbuf); - puts(sendbuf); - dgram_mod_sendto(socket, sendbuf, strlen(sendbuf) + 1, remote_port); - free(recvbuf); + task_t task; + while ( (rv = dgram_mod_recvfrom(socket, &task.buf, &task.size, &task.port) ) == 0) { + task_queue.push(task); + } dgram_mod_close_socket(socket); } @@ -34,6 +72,67 @@ dgram_mod_close_socket(socket); } + + +void *runclient(void *arg) { + Targ *targ = (Targ *)arg; + int port = targ->port; + char sendbuf[512]; + int scale = 100000; + int i; + void *socket = dgram_mod_open_socket(REQ_REP); + + char filename[512]; + sprintf(filename, "test%d.txt", targ->id); + FILE *fp = NULL; + fp = fopen(filename, "w+"); + + int recvsize; + void *recvbuf; + for (i = 0; i < scale; i++) { + sprintf(sendbuf, "thread(%d) %d", targ->id, i); + fprintf(fp, "requst:%s\n", sendbuf); + dgram_mod_sendandrecv(socket, sendbuf, strlen(sendbuf) + 1, port, &recvbuf, &recvsize); + fprintf(fp, "reply: %s\n", (char *)recvbuf); + free(recvbuf); + } + fclose(fp); + dgram_mod_close_socket(socket); + return (void *)i; +} + +void startClients(int port) { + + int status, i = 0, processors = 4; + void *res[processors]; + Targ *targs = (Targ *)calloc(processors, sizeof(Targ)); + pthread_t tids[processors]; + char sendbuf[512]; + + struct timeval start; + gettimeofday(&start, NULL); + for (i = 0; i < processors; i++) { + targs[i].port = port; + targs[i].id = i; + pthread_create(&tids[i], NULL, runclient, (void *)&targs[i]); + } + + for (i = 0; i < processors; i++) { + if (pthread_join(tids[i], &res[i]) != 0) { + perror("multyThreadClient pthread_join"); + } else { + fprintf(stderr, "client(%d) 鍐欏叆 %ld 鏉℃暟鎹甛n", i, (long)res[i]); + } + } + + struct timeval end; + gettimeofday(&end, NULL); + + double difftime = end.tv_sec * 1000000 + end.tv_usec - (start.tv_sec * 1000000 + start.tv_usec); + long diffsec = (long) (difftime/1000000); + long diffmsec = difftime - diffsec; + printf("cost: %ld sec: %ld msc\n", diffsec, diffmsec); +} int main(int argc, char *argv[]) { @@ -51,7 +150,7 @@ } if (strcmp("client", argv[1]) == 0) - client(port); + startClients(port); return 0; diff --git a/test_socket/dgram_mod_survey.c b/test_socket/dgram_mod_survey.c new file mode 100644 index 0000000..988877b --- /dev/null +++ b/test_socket/dgram_mod_survey.c @@ -0,0 +1,56 @@ +#include "dgram_mod_socket.h" +#include "shm_mm.h" +#include "usg_common.h" + +void server(int port) { + void *socket = dgram_mod_open_socket(SURVEY); + dgram_mod_bind(socket, port); + int size; + void *recvbuf; + char sendbuf[512]; + int rv; + int remote_port; + while ( (rv = dgram_mod_recvfrom(socket, &recvbuf, &size, &remote_port) ) == 0) { + printf( "鏀跺埌鏉ヨ嚜%d娉ㄥ唽淇℃伅: %s", remote_port, recvbuf); + free(recvbuf); + } + dgram_mod_close_socket(socket); +} + +void client(int port) { + void *socket = dgram_mod_open_socket(SURVEY); + int size; + void *recvbuf; + char sendbuf[512]; + while (true) { + printf("request: "); + scanf("%s", sendbuf); + dgram_mod_send(socket, sendbuf, strlen(sendbuf) + 1, port, &recvbuf, &size); + printf("reply: %s\n", (char *)recvbuf); + free(recvbuf); + } + dgram_mod_close_socket(socket); +} + + + +int main(int argc, char *argv[]) { + shm_init(512); + int port; + if (argc < 3) { + fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client"); + return 1; + } + + port = atoi(argv[2]); + + if (strcmp("server", argv[1]) == 0) { + server(port); + } + + if (strcmp("client", argv[1]) == 0) + client(port); + + + return 0; +} \ No newline at end of file diff --git a/test_socket/dgram_socket_test b/test_socket/dgram_socket_test index 6a18d7c..9752490 100755 --- a/test_socket/dgram_socket_test +++ b/test_socket/dgram_socket_test Binary files differ -- Gitblit v1.8.0