1个文件已复制
2个文件已删除
20个文件已添加
2个文件已重命名
15个文件已修改
| | |
| | | |
| | | |
| | | # Common temp files to delete from each directory. |
| | | TEMPFILES=core core.* *.o temp.* *.out *.a *.so |
| | | TEMPFILES=core core.* **/*.o temp.* *.out *.a *.so |
| | | |
| | | %: %.c |
| | | $(CC) $(CFLAGS) $^ -o $@ $(LDFLAGS) $(LDLIBS) |
| | |
| | | DIRS = src test test2 demo |
| | | DIRS = src test demo |
| | | |
| | | all: |
| | | for i in $(DIRS); do \ |
| | |
| | | for i in $(DIRS); do \ |
| | | (cd $$i && echo "cleaning $$i" && $(MAKE) clean) || exit 1; \ |
| | | done |
| | | rm -rf build |
| | | |
| | | ipcrm: |
| | | -ipcrm -a |
New file |
| | |
| | | #ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ |
| | | #define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ |
| | | #include "atomic_ops.h" |
| | | #include <assert.h> // assert() |
| | | #include <sched.h> // sched_yield() |
| | | #include "logger_factory.h" |
| | | #include "mem_pool.h" |
| | | #include "shm_allocator.h" |
| | | |
| | | /// @brief implementation of an array based lock free queue with support for |
| | | /// multiple producers |
| | | /// This class is prevented from being instantiated directly (all members and |
| | | /// methods are private). To instantiate a multiple producers lock free queue |
| | | /// you must use the ArrayLockFreeQueue fachade: |
| | | /// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q; |
| | | |
| | | |
| | | #define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | template <typename ELEM_T, typename Allocator = SHM_Allocator> |
| | | class ArrayLockFreeQueue |
| | | { |
| | | // ArrayLockFreeQueue will be using this' private members |
| | | template < |
| | | typename ELEM_T_, |
| | | typename Allocator_, |
| | | template <typename T, typename AT> class Q_TYPE |
| | | > |
| | | friend class LockFreeQueue; |
| | | |
| | | private: |
| | | /// @brief constructor of the class |
| | | ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); |
| | | |
| | | virtual ~ArrayLockFreeQueue(); |
| | | |
| | | inline uint32_t size(); |
| | | |
| | | inline bool full(); |
| | | |
| | | inline bool empty(); |
| | | |
| | | bool push(const ELEM_T &a_data); |
| | | |
| | | bool pop(ELEM_T &a_data); |
| | | |
| | | /// @brief calculate the index in the circular array that corresponds |
| | | /// to a particular "count" value |
| | | inline uint32_t countToIndex(uint32_t a_count); |
| | | |
| | | ELEM_T& operator[](unsigned i); |
| | | |
| | | private: |
| | | size_t Q_SIZE; |
| | | /// @brief array to keep the elements |
| | | ELEM_T *m_theQueue; |
| | | |
| | | /// @brief where a new element will be inserted |
| | | uint32_t m_writeIndex; |
| | | |
| | | /// @brief where the next element where be extracted from |
| | | uint32_t m_readIndex; |
| | | |
| | | /// @brief maximum read index for multiple producer queues |
| | | /// If it's not the same as m_writeIndex it means |
| | | /// there are writes pending to be "committed" to the queue, that means, |
| | | /// the place for the data was reserved (the index in the array) but |
| | | /// data is still not in the queue, so the thread trying to read will have |
| | | /// to wait for those other threads to save the data into the queue |
| | | /// |
| | | /// note this is only used for multiple producers |
| | | uint32_t m_maximumReadIndex; |
| | | |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | /// @brief number of elements in the queue |
| | | uint32_t m_count; |
| | | #endif |
| | | |
| | | |
| | | private: |
| | | /// @brief disable copy constructor declaring it private |
| | | ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src); |
| | | |
| | | }; |
| | | |
| | | |
/// @brief constructor: allocates the circular array through the Allocator
/// policy. The queue must be fully constructed before any other thread
/// touches it (index initialisation is not atomic).
template <typename ELEM_T, typename Allocator>
ArrayLockFreeQueue<ELEM_T, Allocator>::ArrayLockFreeQueue(size_t qsize):
    Q_SIZE(qsize),
    m_writeIndex(0),      // initialisation is not atomic
    m_readIndex(0),       //
    m_maximumReadIndex(0) //
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    ,m_count(0)           //
#endif
{
    // raw allocation: ELEM_T constructors are NOT run for the slots
    m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T));

}
| | | |
/// @brief destructor: releases the array through the Allocator policy.
/// Element destructors are not invoked (mirrors the raw allocate() in the
/// constructor).
template <typename ELEM_T, typename Allocator>
ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue()
{
    // std::cout << "destroy ArrayLockFreeQueue\n";
    Allocator::deallocate(m_theQueue);

}
| | | |
/// @brief map a monotonically increasing "count" value onto a slot of the
/// circular array.
/// NOTE(review): when the 32-bit counters wrap around, indexes stay
/// consistent only if Q_SIZE divides 2^32 (e.g. a power of 2) -- confirm
/// callers respect that constraint.
template <typename ELEM_T, typename Allocator>
inline
uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count)
{
    // if Q_SIZE is a power of 2 this statement could be also written as
    // return (a_count & (Q_SIZE - 1));
    return (a_count % Q_SIZE);
}
| | | |
/// @brief number of elements currently in the queue.
/// With _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE the dedicated counter is used;
/// otherwise the value is a best-effort snapshot of the two indexes and
/// may be bogus under concurrent access (see the scenario below).
template <typename ELEM_T, typename Allocator>
inline
uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE

    return m_count;
#else

    uint32_t currentWriteIndex = m_maximumReadIndex;
    uint32_t currentReadIndex = m_readIndex;

    // let's think of a scenario where this function returns bogus data
    // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
    // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1
    // 2. afterwards this thread is preemted. While this thread is inactive 2
    // elements are inserted and removed from the queue, so m_maximumReadIndex
    // is 5 and m_readIndex 4. Real size is still 1
    // 3. Now the current thread comes back from preemption and reads m_readIndex.
    // currentReadIndex is 4
    // 4. currentReadIndex is bigger than currentWriteIndex, so
    // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
    // it returns that the queue is almost full, when it is almost empty
    //
    if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex))
    {
        return (currentWriteIndex - currentReadIndex);
    }
    else
    {
        // the write index wrapped around the circular array
        return (Q_SIZE + currentWriteIndex - currentReadIndex);
    }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
| | | |
/// @brief whether the queue cannot accept any more elements.
/// NOTE(review): with the counter enabled the usable capacity is Q_SIZE,
/// while the index-based check below saturates at Q_SIZE - 1 (one slot is
/// sacrificed to tell full from empty) -- the two modes disagree by one.
template <typename ELEM_T, typename Allocator>
inline
bool ArrayLockFreeQueue<ELEM_T, Allocator>::full()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE

    return (m_count == (Q_SIZE));
#else

    uint32_t currentWriteIndex = m_writeIndex;
    uint32_t currentReadIndex = m_readIndex;

    if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
    {
        // the queue is full
        return true;
    }
    else
    {
        // not full!
        return false;
    }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
| | | |
/// @brief whether there is nothing to pop.
/// Compares against m_maximumReadIndex (not m_writeIndex) so that slots
/// reserved but not yet committed by a producer still count as "empty".
template <typename ELEM_T, typename Allocator>
inline
bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE

    return (m_count == 0);
#else

    if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex))
    {
        // the queue is empty (fixed comment: this said "full")
        return true;
    }
    else
    {
        // not empty
        return false;
    }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
/// @brief insert a copy of a_data at the back of the queue.
/// Two-phase protocol: (1) reserve a slot by CAS-incrementing
/// m_writeIndex, (2) copy the data in and publish it by CAS-incrementing
/// m_maximumReadIndex in the same order the slots were reserved.
/// @return false when the queue is full, true on success.
template <typename ELEM_T, typename Allocator>
bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data)
{
    uint32_t currentReadIndex;
    uint32_t currentWriteIndex;

    do
    {

        currentWriteIndex = m_writeIndex;
        currentReadIndex = m_readIndex;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE

        // NOTE(review): m_count may change between this test and the CAS
        // below, so this full-check is best-effort only
        if (m_count == Q_SIZE) {
            return false;
        }
#else
        if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
        {
            // the queue is full
            return false;
        }
#endif

        // loop until this thread manages to reserve the slot
    } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1)));

    // We know now that this index is reserved for us. Use it to save the data
    m_theQueue[countToIndex(currentWriteIndex)] = a_data;

    // update the maximum read index after saving the data. It wouldn't fail if there is only one thread
    // inserting in the queue. It might fail if there are more than 1 producer threads because this
    // operation has to be done in the same order as the previous CAS

    while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1)))
    {
        // this is a good place to yield the thread in case there are more
        // software threads than hardware processors and you have more
        // than 1 producer thread
        // have a look at sched_yield (POSIX.1b)
        sched_yield();
    }

#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    AtomicAdd(&m_count, 1);
#endif
    return true;
}
| | | |
| | | |
/// @brief copy the front element into a_data and remove it.
/// Reads the slot first, then tries to claim it with a CAS on
/// m_readIndex; on CAS failure another consumer won the slot and the
/// whole read is retried.
/// @return false when the queue is empty, true on success.
template <typename ELEM_T, typename Allocator>
bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data)
{
    uint32_t currentMaximumReadIndex;
    uint32_t currentReadIndex;

    do
    {
        // to ensure thread-safety when there is more than 1 producer thread
        // a second index is defined (m_maximumReadIndex)
        currentReadIndex = m_readIndex;
        currentMaximumReadIndex = m_maximumReadIndex;
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE

        if (m_count == 0) {
            return false;
        }
#else
        if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
        {
            // the queue is empty or
            // a producer thread has allocate space in the queue but is
            // waiting to commit the data into it
            return false;
        }
#endif

        // retrieve the data from the queue
        a_data = m_theQueue[countToIndex(currentReadIndex)];

        // try to perfrom now the CAS operation on the read index. If we succeed
        // a_data already contains what m_readIndex pointed to before we
        // increased it
        if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1)))
        {
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
            // m_count.fetch_sub(1);
            AtomicSub(&m_count, 1);
#endif
            return true;
        }

        // it failed retrieving the element off the queue. Someone else must
        // have read the element stored at countToIndex(currentReadIndex)
        // before we could perform the CAS operation

    } while(1); // keep looping to try again!

    // Something went wrong. it shouldn't be possible to reach here
    assert(0);

    // Add this return statement to avoid compiler warnings
    return false;
}
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i) |
| | | { |
| | | int currentCount = m_count; |
| | | uint32_t currentReadIndex = m_readIndex; |
| | | if (i < 0 || i >= currentCount) |
| | | { |
| | | std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n"; |
| | | std::exit(EXIT_FAILURE); |
| | | } |
| | | return m_theQueue[countToIndex(currentReadIndex+i)]; |
| | | } |
| | | |
| | | #endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ |
New file |
| | |
| | | #ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ |
| | | #define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ |
| | | |
#include <assert.h>   // assert()
#include <sched.h>    // sched_yield()

#include <atomic>     // std::atomic -- used throughout but was never included
#include <cstdint>    // uint32_t
#include <cstdlib>    // std::exit, EXIT_FAILURE
#include <iostream>   // std::cerr

#include "logger_factory.h"
| | | |
| | | /// @brief implementation of an array based lock free queue with support for |
| | | /// multiple producers |
| | | /// This class is prevented from being instantiated directly (all members and |
| | | /// methods are private). To instantiate a multiple producers lock free queue |
| | | /// you must use the ArrayLockFreeQueue fachade: |
| | | /// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q; |
| | | |
| | | |
| | | #define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | |
| | | template <typename ELEM_T> |
| | | class ArrayLockFreeQueue |
| | | { |
| | | // ArrayLockFreeQueue will be using this' private members |
| | | template < |
| | | typename ELEM_T_, |
| | | template <typename T> class Q_TYPE > |
| | | friend class LockFreeQueue; |
| | | |
| | | private: |
| | | /// @brief constructor of the class |
| | | ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); |
| | | |
| | | virtual ~ArrayLockFreeQueue(); |
| | | |
| | | inline uint32_t size(); |
| | | |
| | | inline bool full(); |
| | | |
| | | inline bool empty(); |
| | | |
| | | bool push(const ELEM_T &a_data); |
| | | |
| | | bool pop(ELEM_T &a_data); |
| | | |
| | | /// @brief calculate the index in the circular array that corresponds |
| | | /// to a particular "count" value |
| | | inline uint32_t countToIndex(uint32_t a_count); |
| | | |
| | | ELEM_T& operator[](unsigned i); |
| | | |
| | | private: |
| | | size_t Q_SIZE; |
| | | /// @brief array to keep the elements |
| | | ELEM_T *m_theQueue; |
| | | |
| | | /// @brief where a new element will be inserted |
| | | std::atomic<uint32_t> m_writeIndex; |
| | | |
| | | /// @brief where the next element where be extracted from |
| | | std::atomic<uint32_t> m_readIndex; |
| | | |
| | | /// @brief maximum read index for multiple producer queues |
| | | /// If it's not the same as m_writeIndex it means |
| | | /// there are writes pending to be "committed" to the queue, that means, |
| | | /// the place for the data was reserved (the index in the array) but |
| | | /// data is still not in the queue, so the thread trying to read will have |
| | | /// to wait for those other threads to save the data into the queue |
| | | /// |
| | | /// note this is only used for multiple producers |
| | | std::atomic<uint32_t> m_maximumReadIndex; |
| | | |
| | | #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE |
| | | /// @brief number of elements in the queue |
| | | std::atomic<uint32_t> m_count; |
| | | #endif |
| | | |
| | | |
| | | private: |
| | | /// @brief disable copy constructor declaring it private |
| | | ArrayLockFreeQueue<ELEM_T>(const ArrayLockFreeQueue<ELEM_T> &a_src); |
| | | |
| | | }; |
| | | |
| | | |
/// @brief constructor: allocates the circular array via mm_malloc (the
/// project's memory-pool allocator, presumably shared-memory aware --
/// TODO confirm). The queue must be fully constructed before any other
/// thread touches it (index initialisation is not atomic).
template <typename ELEM_T>
ArrayLockFreeQueue<ELEM_T>::ArrayLockFreeQueue(size_t qsize):
    Q_SIZE(qsize),
    m_writeIndex(0),      // initialisation is not atomic
    m_readIndex(0),       //
    m_maximumReadIndex(0) //
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    ,m_count(0)           //
#endif
{
    // raw allocation: ELEM_T constructors are NOT run for the slots
    m_theQueue = (ELEM_T*)mm_malloc(Q_SIZE * sizeof(ELEM_T));

}
| | | |
/// @brief destructor: releases the array with mm_free. Element
/// destructors are not invoked (mirrors the raw mm_malloc above).
template <typename ELEM_T>
ArrayLockFreeQueue<ELEM_T>::~ArrayLockFreeQueue()
{
    // std::cout << "destroy ArrayLockFreeQueue\n";
    mm_free(m_theQueue);

}
| | | |
/// @brief map a monotonically increasing "count" value onto a slot of the
/// circular array.
/// NOTE(review): when the 32-bit counters wrap around, indexes stay
/// consistent only if Q_SIZE divides 2^32 (e.g. a power of 2) -- confirm
/// callers respect that constraint.
template <typename ELEM_T>
inline
uint32_t ArrayLockFreeQueue<ELEM_T>::countToIndex(uint32_t a_count)
{
    // if Q_SIZE is a power of 2 this statement could be also written as
    // return (a_count & (Q_SIZE - 1));
    return (a_count % Q_SIZE);
}
| | | |
/// @brief number of elements currently in the queue.
/// With _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE the dedicated atomic counter is
/// used; otherwise the value is a best-effort snapshot of the two indexes
/// and may be bogus under concurrent access (see the scenario below).
template <typename ELEM_T>
inline
uint32_t ArrayLockFreeQueue<ELEM_T>::size()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE

    return m_count.load();
#else

    uint32_t currentWriteIndex = m_maximumReadIndex.load();
    uint32_t currentReadIndex = m_readIndex.load();

    // let's think of a scenario where this function returns bogus data
    // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run
    // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1
    // 2. afterwards this thread is preemted. While this thread is inactive 2
    // elements are inserted and removed from the queue, so m_maximumReadIndex
    // is 5 and m_readIndex 4. Real size is still 1
    // 3. Now the current thread comes back from preemption and reads m_readIndex.
    // currentReadIndex is 4
    // 4. currentReadIndex is bigger than currentWriteIndex, so
    // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is,
    // it returns that the queue is almost full, when it is almost empty
    //
    if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex))
    {
        return (currentWriteIndex - currentReadIndex);
    }
    else
    {
        // the write index wrapped around the circular array
        return (Q_SIZE + currentWriteIndex - currentReadIndex);
    }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
| | | |
/// @brief whether the queue cannot accept any more elements.
/// NOTE(review): with the counter enabled the usable capacity is Q_SIZE,
/// while the index-based check below saturates at Q_SIZE - 1 (one slot is
/// sacrificed to tell full from empty) -- the two modes disagree by one.
template <typename ELEM_T>
inline
bool ArrayLockFreeQueue<ELEM_T>::full()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE

    return (m_count.load() == (Q_SIZE));
#else

    uint32_t currentWriteIndex = m_writeIndex;
    uint32_t currentReadIndex = m_readIndex;

    if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
    {
        // the queue is full
        return true;
    }
    else
    {
        // not full!
        return false;
    }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
| | | |
/// @brief whether there is nothing to pop.
/// Compares against m_maximumReadIndex (not m_writeIndex) so that slots
/// reserved but not yet committed by a producer still count as "empty".
template <typename ELEM_T>
inline
bool ArrayLockFreeQueue<ELEM_T>::empty()
{
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE

    return (m_count.load() == 0);
#else

    if (countToIndex( m_readIndex.load()) == countToIndex(m_maximumReadIndex.load()))
    {
        // the queue is empty (fixed comment: this said "full")
        return true;
    }
    else
    {
        // not empty
        return false;
    }
#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
}
| | | |
| | | |
/// @brief insert a copy of a_data at the back of the queue.
/// Two-phase protocol: (1) reserve a slot by CAS-incrementing
/// m_writeIndex, (2) copy the data in and publish it by CAS-incrementing
/// m_maximumReadIndex in the same order the slots were reserved.
/// @return false when the queue is full, true on success.
template <typename ELEM_T>
bool ArrayLockFreeQueue<ELEM_T>::push(const ELEM_T &a_data)
{
    uint32_t currentReadIndex;
    uint32_t currentWriteIndex;

    do
    {
        currentWriteIndex = m_writeIndex.load();
        currentReadIndex = m_readIndex.load();
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE

        // NOTE(review): m_count may change between this test and the CAS
        // below, so this full-check is best-effort only
        if (m_count.load() == Q_SIZE) {
            return false;
        }
#else
        if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex))
        {
            // the queue is full
            return false;
        }
#endif

        // There is more than one producer. Keep looping till this thread is able
        // to allocate space for current piece of data
        //
        // using compare_exchange_strong because it isn't allowed to fail spuriously
        // When the compare_exchange operation is in a loop the weak version
        // will yield better performance on some platforms, but here we'd have to
        // load m_writeIndex all over again
    } while (!m_writeIndex.compare_exchange_strong(
             currentWriteIndex, (currentWriteIndex + 1)));

    // Just made sure this index is reserved for this thread.
    m_theQueue[countToIndex(currentWriteIndex)] = a_data;
    //memcpy((void *)(&m_theQueue[countToIndex(currentWriteIndex)]), (void *)(&a_data), sizeof(ELEM_T) );

    // update the maximum read index after saving the piece of data. It can't
    // fail if there is only one thread inserting in the queue. It might fail
    // if there is more than 1 producer thread because this operation has to
    // be done in the same order as the previous CAS
    //
    // using compare_exchange_weak because they are allowed to fail spuriously
    // (act as if *this != expected, even if they are equal), but when the
    // compare_exchange operation is in a loop the weak version will yield
    // better performance on some platforms.
    while (!m_maximumReadIndex.compare_exchange_weak(
            currentWriteIndex, (currentWriteIndex + 1)))
    {
        // this is a good place to yield the thread in case there are more
        // software threads than hardware processors and you have more
        // than 1 producer thread
        // have a look at sched_yield (POSIX.1b)
        sched_yield();
    }

    // The value was successfully inserted into the queue
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
    m_count.fetch_add(1);
#endif

    return true;
}
| | | |
/// @brief copy the front element into a_data and remove it.
/// Reads the slot first, then tries to claim it with a CAS on
/// m_readIndex; on CAS failure another consumer won the slot and the
/// whole read is retried.
/// @return false when the queue is empty, true on success.
template <typename ELEM_T>
bool ArrayLockFreeQueue<ELEM_T>::pop(ELEM_T &a_data)
{
    uint32_t currentMaximumReadIndex;
    uint32_t currentReadIndex;

    do
    {
        currentReadIndex = m_readIndex.load();
        currentMaximumReadIndex = m_maximumReadIndex.load();

#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE

        if (m_count.load() == 0) {
            return false;
        }
#else
        // to ensure thread-safety when there is more than 1 producer
        // thread a second index is defined (m_maximumReadIndex)
        if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex))
        {
            // the queue is empty or
            // a producer thread has allocate space in the queue but is
            // waiting to commit the data into it
            return false;
        }
#endif

        // retrieve the data from the queue
        a_data = m_theQueue[countToIndex(currentReadIndex)];
        //memcpy((void*) (&a_data), (void *)(&m_theQueue[countToIndex(currentReadIndex)]),sizeof(ELEM_T) );
        // try to perfrom now the CAS operation on the read index. If we succeed
        // a_data already contains what m_readIndex pointed to before we
        // increased it
        if (m_readIndex.compare_exchange_strong(currentReadIndex, (currentReadIndex + 1)))
        {
            // got here. The value was retrieved from the queue. Note that the
            // data inside the m_queue array is not deleted nor reseted
#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE
            m_count.fetch_sub(1);
#endif
            return true;
        }

        // it failed retrieving the element off the queue. Someone else must
        // have read the element stored at countToIndex(currentReadIndex)
        // before we could perform the CAS operation

    } while(1); // keep looping to try again!

    // Something went wrong. it shouldn't be possible to reach here
    assert(0);

    // Add this return statement to avoid compiler warnings
    return false;
}
| | | |
| | | |
| | | template <typename ELEM_T> |
| | | ELEM_T& ArrayLockFreeQueue<ELEM_T>::operator[](unsigned int i) |
| | | { |
| | | int currentCount = m_count.load(); |
| | | uint32_t currentReadIndex = m_readIndex.load(); |
| | | if (i < 0 || i >= currentCount) |
| | | { |
| | | std::cerr << "Error in array limits: " << i << " is out of range\n"; |
| | | std::exit(EXIT_FAILURE); |
| | | } |
| | | return m_theQueue[countToIndex(currentReadIndex+i)]; |
| | | } |
| | | |
| | | #endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ |
New file |
| | |
#ifndef __HASHTABLE_H__
#define __HASHTABLE_H__

#include <sys/queue.h>
#include <set>

// Number of buckets in the fixed-size hash table.
#define MAPSIZE 100

// Hash table with int keys and opaque void* values; each bucket is a
// tail queue (<sys/queue.h>).
// NOTE(review): mutex/wlock/cond are plain ints -- they look like ids of
// external synchronization objects (e.g. SysV semaphores) rather than the
// primitives themselves; confirm against the implementation.
typedef struct hashtable_t
{
    struct tailq_header_t* array[MAPSIZE]; // bucket heads
    int mutex;       // presumably a lock/semaphore id -- verify
    int wlock;       // presumably a writer-lock id -- verify
    int cond;        // presumably a condition/semaphore id -- verify
    size_t readcnt;  // active reader count (reader/writer pattern, presumably)

} hashtable_t;
// Callback invoked for every (key, value) pair by hashtable_foreach().
typedef void (*hashtable_foreach_cb)(int key, void *value);

// Initialize an (already allocated) table to the empty state.
void hashtable_init(hashtable_t *hashtable);
// Return the value stored under key, or (presumably) NULL when absent.
void *hashtable_get(hashtable_t *hashtable, int key);
// Insert or replace the value stored under key.
void hashtable_put(hashtable_t *hashtable, int key, void *value);
// Remove the entry for key and return its value.
void *hashtable_remove(hashtable_t *hashtable, int key);
// Remove every entry from the table.
void hashtable_removeall(hashtable_t *hashtable);


// Call cb once for every (key, value) pair.
void hashtable_foreach(hashtable_t *hashtable, hashtable_foreach_cb cb);

// Dump the table contents (debugging aid).
void hashtable_printall(hashtable_t *hashtable);

// Return a key not currently in use -- TODO confirm allocation strategy.
int hashtable_alloc_key(hashtable_t *hashtable);

// Return a newly allocated set of all keys; caller owns the result.
std::set<int> * hashtable_keyset(hashtable_t *hashtable) ;
#endif
New file |
| | |
| | | // queue.h -- interface for a queue |
| | | #ifndef __LINKED_LOCK_FREE_QUEUE_H_ |
| | | #define __LINKED_LOCK_FREE_QUEUE_H_ |
| | | #include "mm.h" |
| | | #include "sem_util.h" |
| | | |
template <typename T> class Node;

/// @brief tagged pointer: a node pointer paired with a modification
/// counter, used by the lock-free queue to avoid the ABA problem in
/// compare-and-swap loops.
template <typename T>
class Pointer {
public:

    Node<T> *ptr;        ///< referenced node (NULL for "none")
    unsigned long count; ///< version tag, bumped on each successful CAS
    Pointer( Node<T> *node = NULL, int c=0) noexcept : ptr(node), count(c) {}

    /// equal only when both the node and the version tag match.
    /// Fixed: both comparison operators previously took their argument by
    /// value, copying the Pointer on every comparison.
    bool operator == (const Pointer<T> &o) const {
        return (o.ptr == ptr) && (o.count == count);
    }
    bool operator != (const Pointer<T> &o) const {
        return !(*this == o);
    }

};
| | | |
/// @brief singly-linked node of the lock-free queue. Storage comes from
/// the project's mm_malloc/mm_free pool via the class-level operator
/// new/delete overloads.
template <typename T>
class Node {
public:
    // aligned for a double-width (16-byte) atomic compare-and-swap
    alignas(16) std::atomic<Pointer<T> > next;
    T value;

    Node() {
    }

    // route node allocation through the pool allocator
    void *operator new(size_t size){
        return mm_malloc(size);
    }

    void operator delete(void *p) {
        return mm_free(p);
    }
};
| | | |
| | | |
| | | |
| | | |
| | | |
/// @brief lock-free FIFO queue based on a singly-linked list with counted
/// (ABA-safe) pointers. Head always references a dummy node; the first
/// real element is Head->next. Operational members are protected: the
/// LockFreeQueue facade is the intended user.
template <typename ELEM_T>
class LinkedLockFreeQueue
{

    // the facade is allowed to use the protected members
    template <
        typename ELEM_T_,
        template <typename T> class Q_TYPE >
    friend class LockFreeQueue;
private:
    // class scope definitions
    enum {Q_SIZE = 10};  // default capacity limit

    // private class members
    std::atomic<Pointer<ELEM_T> > Head; // pointer to front of Queue (dummy node)
    std::atomic<Pointer<ELEM_T> > Tail; // pointer to rear of Queue
    //std::atomic_uint count; // current number of size in Queue
    std::atomic_uint count;             // number of elements currently stored
    const size_t qsize;                 // maximum number of size in Queue
    // preemptive definitions to prevent public copying
    LinkedLockFreeQueue(const LinkedLockFreeQueue & q) : qsize(0) { }
    LinkedLockFreeQueue & operator=(const LinkedLockFreeQueue & q) { return *this;}
protected:
    LinkedLockFreeQueue(size_t qs = Q_SIZE); // create queue with a qs limit
    ~LinkedLockFreeQueue();
    bool empty() const;
    bool full() const;
    unsigned int size() const;
    bool push(const ELEM_T &item); // add item to end
    bool pop(ELEM_T &item);        // move front item into `item`


    ELEM_T& operator[](unsigned i); // O(i) access from the front

};
| | | |
| | | |
| | | // Queue methods |
| | | template <typename T> |
| | | LinkedLockFreeQueue<T>::LinkedLockFreeQueue(size_t qs) : count(0), qsize(qs) |
| | | { |
| | | Node<T> *node = new Node<T>; |
| | | Pointer<T> pointer(node, 0); |
| | | |
| | | Head.store(pointer, std::memory_order_relaxed); |
| | | Tail.store(pointer, std::memory_order_relaxed); |
| | | |
| | | } |
| | | |
/// @brief destructor: walks the list from the dummy head and deletes
/// every node (including the dummy). Must not run concurrently with any
/// other operation.
template <typename T>
LinkedLockFreeQueue<T>::~LinkedLockFreeQueue()
{
    LoggerFactory::getLogger().debug("LinkedLockFreeQueue destory");
    Node<T> * nodeptr;
    Pointer<T> tmp = Head.load(std::memory_order_relaxed);
    while((nodeptr = tmp.ptr) != NULL) {
        // grab the successor before freeing the current node
        tmp = (tmp.ptr->next).load(std::memory_order_relaxed);
        //std::cerr << "delete " << nodeptr << std::endl;
        delete nodeptr;

    }
}
| | | |
/// @brief true when no elements are stored (snapshot of the counter).
template <typename T>
bool LinkedLockFreeQueue<T>::empty() const
{
    return count == 0;
}
| | | |
/// @brief true when the element count has reached the qsize limit
/// (snapshot; may be stale under concurrency).
template <typename T>
bool LinkedLockFreeQueue<T>::full() const
{
    return count == qsize;
}
| | | |
/// @brief number of elements currently stored (snapshot of the counter).
template <typename T>
unsigned int LinkedLockFreeQueue<T>::size() const
{
    return count;
}
| | | |
| | | // Add item to queue |
| | | template <typename T> |
| | | bool LinkedLockFreeQueue<T>::push(const T & item) |
| | | { |
| | | if (full()) |
| | | return false; |
| | | |
| | | Node<T> * node = new Node<T>; |
| | | node->value = item; |
| | | |
| | | |
| | | Pointer<T> tail ; |
| | | Pointer<T> next ; |
| | | |
| | | |
| | | while(true) { |
| | | tail = Tail.load(std::memory_order_relaxed); |
| | | next = (tail.ptr->next).load(std::memory_order_relaxed); |
| | | if (tail == Tail.load(std::memory_order_relaxed)) { |
| | | if (next.ptr == NULL) { |
| | | if ((tail.ptr->next).compare_exchange_weak(next, |
| | | Pointer<T>(node, next.count+1), |
| | | std::memory_order_release, |
| | | std::memory_order_relaxed) ) |
| | | break; |
| | | else |
| | | Tail.compare_exchange_weak(tail, |
| | | Pointer<T>(next.ptr, tail.count+1), |
| | | std::memory_order_release, |
| | | std::memory_order_relaxed); |
| | | } |
| | | |
| | | } |
| | | } |
| | | |
| | | Tail.compare_exchange_weak(tail, Pointer<T>(node, tail.count+1), |
| | | std::memory_order_release, |
| | | std::memory_order_relaxed); |
| | | count++; |
| | | return true; |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | // Place front item into item variable and remove from queue |
| | | template <typename T> |
| | | bool LinkedLockFreeQueue<T>::pop(T & item) |
| | | { |
| | | if (empty()) |
| | | return false; |
| | | |
| | | Pointer<T> head; |
| | | Pointer<T> tail; |
| | | Pointer<T> next; |
| | | |
| | | while(true) { |
| | | head = Head.load(std::memory_order_relaxed); |
| | | tail = Tail.load(std::memory_order_relaxed); |
| | | next = (head.ptr->next).load(); |
| | | if (head == Head.load(std::memory_order_relaxed)) { |
| | | if(head.ptr == tail.ptr) { |
| | | if (next.ptr == NULL) |
| | | return false; |
| | | // Tail is falling behind. Try to advance it |
| | | Tail.compare_exchange_weak(tail, |
| | | Pointer<T>(next.ptr, tail.count+1), |
| | | std::memory_order_release, |
| | | std::memory_order_relaxed); |
| | | } else { |
| | | item = next.ptr->value; |
| | | if (Head.compare_exchange_weak(head, |
| | | Pointer<T>(next.ptr, head.count+1), |
| | | std::memory_order_release, |
| | | std::memory_order_relaxed)) { |
| | | delete head.ptr; |
| | | break; |
| | | } |
| | | |
| | | } |
| | | } |
| | | |
| | | } |
| | | |
| | | count--; |
| | | return true; |
| | | |
| | | } |
| | | |
| | | |
| | | template <class T> |
| | | T& LinkedLockFreeQueue<T>::operator[](unsigned int i) |
| | | { |
| | | if (i < 0 || i >= count) |
| | | { |
| | | std::cerr << "Error in array limits: " << i << " is out of range\n"; |
| | | std::exit(EXIT_FAILURE); |
| | | } |
| | | |
| | | |
| | | Pointer<T> tmp = Head.load(std::memory_order_relaxed); |
| | | //Pointer<T> tail = Tail.load(std::memory_order_relaxed); |
| | | |
| | | while(i > 0) { |
| | | //std::cout << i << ":" << std::endl; |
| | | tmp = (tmp.ptr->next).load(std::memory_order_relaxed); |
| | | i--; |
| | | } |
| | | |
| | | tmp = (tmp.ptr->next).load(std::memory_order_relaxed); |
| | | return tmp.ptr->value; |
| | | } |
| | | |
| | | |
| | | |
| | | #endif |
New file |
| | |
| | | #ifndef __LOCK_FREE_QUEUE_H__ |
| | | #define __LOCK_FREE_QUEUE_H__ |
| | | |
| | | #include <usg_common.h> |
| | | #include <assert.h> // assert() |
| | | #include "mem_pool.h" |
| | | #include "sem_util.h" |
| | | #include "logger_factory.h" |
| | | #include "shm_allocator.h" |
| | | |
| | | // default Queue size |
| | | #define LOCK_FREE_Q_DEFAULT_SIZE 16 |
| | | |
| | | // define this macro if calls to "size" must return the real size of the |
| | | // queue. If it is undefined that function will try to take a snapshot of |
| | | // the queue, but returned value might be bogus |
| | | |
| | | |
| | | // forward declarations for default template values |
| | | // |
| | | |
| | | template <typename ELEM_T, typename Allocator> |
| | | class ArrayLockFreeQueue; |
| | | |
| | | // template <typename ELEM_T> |
| | | // class LinkedLockFreeQueue; |
| | | |
| | | |
| | | /// @brief Lock-free queue based on a circular array |
| | | /// No allocation of extra memory for the nodes handling is needed, but it has |
| | | /// to add extra overhead (extra CAS operation) when inserting to ensure the |
| | | /// thread-safety of the queue when the queue type is not |
| | | /// ArrayLockFreeQueueSingleProducer. |
| | | /// |
| | | /// examples of instantiation: |
| | | /// ArrayLockFreeQueue<int> q; // queue of ints of default size (65535 - 1) |
| | | /// // and defaulted to single producer |
| | | /// ArrayLockFreeQueue<int, 16> q; |
| | | /// // queue of ints of size (16 - 1) and |
| | | /// // defaulted to single producer |
| | | /// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q; |
| | | /// // queue of ints of size (100 - 1) with support |
| | | /// // for multiple producers |
| | | /// |
| | | /// ELEM_T represents the type of elementes pushed and popped from the queue |
| | | /// Q_SIZE size of the queue. The actual size of the queue is (Q_SIZE-1) |
| | | /// This number should be a power of 2 to ensure |
| | | /// indexes in the circular queue keep stable when the uint32_t |
| | | /// variable that holds the current position rolls over from FFFFFFFF |
| | | /// to 0. For instance |
| | | /// 2 -> 0x02 |
| | | /// 4 -> 0x04 |
| | | /// 8 -> 0x08 |
| | | /// 16 -> 0x10 |
| | | /// (...) |
| | | /// 1024 -> 0x400 |
| | | /// 2048 -> 0x800 |
| | | /// |
| | | /// if queue size is not defined as requested, let's say, for |
| | | /// instance 100, when current position is FFFFFFFF (4,294,967,295) |
| | | /// index in the circular array is 4,294,967,295 % 100 = 95. |
| | | /// When that value is incremented it will be set to 0, that is the |
| | | /// last 4 elements of the queue are not used when the counter rolls |
| | | /// over to 0 |
| | | /// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and |
| | | /// ArrayLockFreeQueue are supported (single producer |
| | | /// by default) |
/// @brief blocking facade over a non-blocking Q_TYPE implementation.
/// Two System V semaphores ("slots" free-space counter, "items" stored-element
/// counter) provide the blocking/timeout push and pop semantics.
template <
    typename ELEM_T,
    typename Allocator = SHM_Allocator,
    template <typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue
    >
class LockFreeQueue
{

private:
    int slots;   // semaphore id counting free slots (producers wait on it)
    int items;   // semaphore id counting stored items (consumers wait on it)

public:
    // int mutex;
    LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);

    /// @brief destructor of the class.
    /// Note it is not virtual since it is not expected to inherit from this
    /// template
    ~LockFreeQueue();

    // Number of attached handles; SHMQueue increments/decrements it and the
    // last detaching handle deletes this queue.
    std::atomic_uint reference;

    /// @brief returns the current number of items in the queue
    /// It tries to take a snapshot of the size of the queue, but in busy environments
    /// this function might return bogus values.
    ///
    /// If a reliable queue size must be kept you might want to have a look at
    /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE'
    /// it enables a reliable size though it hits overall performance of the queue
    /// (when the reliable size variable is on it's got an impact of about 20% in time)
    inline uint32_t size();

    /// @brief return true if the queue is full. False otherwise
    /// It tries to take a snapshot of the size of the queue, but in busy
    /// environments this function might return bogus values. See help in method
    /// LockFreeQueue::size
    inline bool full();

    /// @brief return true if the queue is empty. False otherwise
    inline bool empty();

    /// @brief unsynchronized random access; forwarded to the implementation.
    inline ELEM_T& operator[](unsigned i);

    /// @brief push an element at the tail of the queue
    /// @param the element to insert in the queue
    /// Note that the element is not a pointer or a reference, so if you are using large data
    /// structures to be inserted in the queue you should think of instantiate the template
    /// of the queue as a pointer to that large structure
    /// @return true if the element was inserted in the queue. False if the queue was full
    bool push(const ELEM_T &a_data);            // blocks while full
    bool push_nowait(const ELEM_T &a_data);     // fails immediately when full
    bool push_timeout(const ELEM_T &a_data, struct timespec * timeout);

    /// @brief pop the element at the head of the queue
    /// @param a reference where the element in the head of the queue will be saved to
    /// Note that the a_data parameter might contain rubbish if the function returns false
    /// @return true if the element was successfully extracted from the queue. False if the queue was empty
    bool pop(ELEM_T &a_data);                   // blocks while empty
    bool pop_nowait(ELEM_T &a_data);            // fails immediately when empty
    bool pop_timeout(ELEM_T &a_data, struct timespec * timeout);

    // Placement in shared vs process-local memory is controlled by Allocator.
    void *operator new(size_t size);
    void operator delete(void *p);

protected:
    /// @brief the actual queue. methods are forwarded into the real
    /// implementation
    Q_TYPE<ELEM_T, Allocator> m_qImpl;

private:
    /// @brief disable copy constructor declaring it private
    LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src);
};
| | | |
| | | |
/// @brief creates the two counting semaphores: "slots" starts at qsize
/// (all positions free) and "items" at 0 (nothing stored).
/// NOTE(review): SemUtil::get failures are not checked here — TODO confirm
/// whether it aborts internally on error.
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize)
{
    // std::cout << "LockFreeQueue init reference=" << reference << std::endl;
    slots = SemUtil::get(IPC_PRIVATE, qsize);
    items = SemUtil::get(IPC_PRIVATE, 0);
    // mutex = SemUtil::get(IPC_PRIVATE, 1);
}
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() |
| | | { |
| | | LoggerFactory::getLogger().debug("LockFreeQueue desctroy"); |
| | | SemUtil::remove(slots); |
| | | SemUtil::remove(items); |
| | | } |
| | | |
/// @brief snapshot of the element count; may be bogus under concurrency
/// (see the class declaration). Forwarded to the implementation.
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size()
{
    return m_qImpl.size();
}
| | | |
/// @brief best-effort "is full" snapshot; forwarded to the implementation.
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full()
{
    return m_qImpl.full();
}
| | | |
/// @brief best-effort "is empty" snapshot; forwarded to the implementation.
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty()
{
    return m_qImpl.empty();
}
| | | |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data) |
| | | { |
| | | if (SemUtil::dec(slots) == -1) { |
| | | err_msg(errno, "LockFreeQueue push"); |
| | | return false; |
| | | } |
| | | |
| | | if ( m_qImpl.push(a_data) ) { |
| | | SemUtil::inc(items); |
| | | return true; |
| | | } |
| | | return false; |
| | | |
| | | } |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data) |
| | | { |
| | | if (SemUtil::dec_nowait(slots) == -1) { |
| | | if (errno == EAGAIN) |
| | | return false; |
| | | else { |
| | | err_msg(errno, "LockFreeQueue push_nowait"); |
| | | return false; |
| | | } |
| | | |
| | | } |
| | | |
| | | if ( m_qImpl.push(a_data)) { |
| | | SemUtil::inc(items); |
| | | return true; |
| | | } |
| | | return false; |
| | | |
| | | } |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, struct timespec * timeout) |
| | | { |
| | | |
| | | if (SemUtil::dec_timeout(slots, timeout) == -1) { |
| | | if (errno == EAGAIN) |
| | | return false; |
| | | else { |
| | | err_msg(errno, "LockFreeQueue push_timeout"); |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | if (m_qImpl.push(a_data)){ |
| | | SemUtil::inc(items); |
| | | return true; |
| | | } |
| | | return false; |
| | | |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data) |
| | | { |
| | | if (SemUtil::dec(items) == -1) { |
| | | err_msg(errno, "LockFreeQueue pop"); |
| | | return false; |
| | | } |
| | | |
| | | if (m_qImpl.pop(a_data)) { |
| | | SemUtil::inc(slots); |
| | | return true; |
| | | } |
| | | return false; |
| | | |
| | | } |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data) |
| | | { |
| | | if (SemUtil::dec_nowait(items) == -1) { |
| | | if (errno == EAGAIN) |
| | | return false; |
| | | else { |
| | | err_msg(errno, "LockFreeQueue pop_nowait"); |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | if (m_qImpl.pop(a_data)) { |
| | | SemUtil::inc(slots); |
| | | return true; |
| | | } |
| | | return false; |
| | | |
| | | } |
| | | |
| | | |
| | | template < |
| | | typename ELEM_T, |
| | | typename Allocator, |
| | | template <typename T, typename AT> class Q_TYPE> |
| | | bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * timeout) |
| | | { |
| | | if (SemUtil::dec_timeout(items, timeout) == -1) { |
| | | if (errno == EAGAIN) |
| | | return false; |
| | | else { |
| | | err_msg(errno, "LockFreeQueue pop_timeout"); |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | if (m_qImpl.pop(a_data)) { |
| | | SemUtil::inc(slots); |
| | | return true; |
| | | } |
| | | return false; |
| | | |
| | | } |
| | | |
/// @brief unsynchronized random access; forwarded to the implementation.
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
ELEM_T& LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) {
    return m_qImpl.operator[](i);
}
| | | |
/// @brief allocates the queue object itself through the Allocator policy
/// (shared memory for SHM_Allocator, process heap for DM_Allocator).
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
void * LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size){
    return Allocator::allocate(size);
}
| | | |
/// @brief releases the queue object's storage through the Allocator policy.
template <
    typename ELEM_T,
    typename Allocator,
    template <typename T, typename AT> class Q_TYPE>
void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator delete(void *p) {
    return Allocator::deallocate(p);
}
| | | |
| | | // include implementation files |
| | | //#include "linked_lock_free_queue.h" |
| | | #include "array_lock_free_queue.h" |
| | | |
| | | #endif // _LOCK_FREE_QUEUE_H__ |
New file |
| | |
| | | #ifndef __LOGGER_FACTORY_H__ |
| | | #define __LOGGER_FACTORY_H__ |
| | | #include "logger.h" |
| | | |
| | | class LoggerFactory { |
| | | public: |
| | | |
| | | static Logger getLogger() { |
| | | //ERROR ALL DEBUG |
| | | static Logger logger(Logger::DEBUG); |
| | | return logger; |
| | | } |
| | | }; |
| | | |
| | | #endif |
| | | |
| | | |
New file |
| | |
| | | #ifndef _MEM_POOL_H_ |
| | | #define _MEM_POOL_H_ |
| | | #include "mm.h" |
| | | #include "sem_util.h" |
| | | #define MEM_POOL_COND_KEY 0x8801 |
| | | |
| | | static int mem_pool_cond = SemUtil::get(MEM_POOL_COND_KEY, 0); |
| | | |
| | | // static int mem_pool_mutex = SemUtil::get(MEM_POOL_COND_KEY, 1); |
| | | |
/// @brief initializes the shared-memory heap of 'heap_size' bytes via mm_init.
/// NOTE(review): the result of mm_init is tested but the branch body is
/// empty, so failure is silently ignored — TODO confirm intended handling.
static inline void mem_pool_init(size_t heap_size) {
    if(mm_init(heap_size)) {

    }
}
| | | |
/// @brief tears down the shared-memory heap; only when mm_destroy reports
/// the memory was actually released is the pool's condition semaphore
/// removed as well.
static inline void mem_pool_destroy(void) {
    if(mm_destroy()) {
        SemUtil::remove(mem_pool_cond);
    }

}
| | | |
| | | static inline void *mem_pool_malloc (size_t size) { |
| | | void *ptr; |
| | | while( (ptr = mm_malloc(size)) == NULL ) { |
| | | err_msg(0, "There is not enough memery to allocate, waiting someone else to free."); |
| | | SemUtil::set(mem_pool_cond, 0); |
| | | // wait for someone else to free space |
| | | SemUtil::dec(mem_pool_cond); |
| | | |
| | | } |
| | | |
| | | return ptr; |
| | | } |
| | | |
/// @brief returns a block to the shared-memory pool and signals the pool's
/// condition semaphore so any allocator blocked in mem_pool_malloc retries.
static inline void mem_pool_free (void *ptr) {
    mm_free(ptr);
    // notify malloc
    SemUtil::set(mem_pool_cond, 1);

}
| | | |
/// @brief resizes a pool block; thin wrapper over mm_realloc. Unlike
/// mem_pool_malloc it does NOT block when the pool is exhausted.
static inline void *mem_pool_realloc (void *ptr, size_t size) {
    return mm_realloc(ptr, size);
}
| | | |
/// @brief exposes the pool's key->object hashtable (used by SHMQueue to map
/// queue keys to shared LockFreeQueue instances).
static inline hashtable_t * mem_pool_get_hashtable() {
    return mm_get_hashtable();

}
| | | // extern int mm_checkheap(int verbose); |
| | | |
| | | |
| | | #endif |
New file |
| | |
| | | #ifndef MM_HDR_H |
| | | #define MM_HDR_H /* Prevent accidental double inclusion */ |
| | | |
| | | #include <usg_common.h> |
| | | #include "usg_typedef.h" |
| | | #include "hashtable.h" |
| | | |
| | | extern bool mm_init(size_t heap_size); |
| | | extern bool mm_destroy(void); |
| | | |
| | | extern void *mm_malloc (size_t size); |
| | | extern void mm_free (void *ptr); |
| | | extern void *mm_realloc(void *ptr, size_t size); |
| | | extern hashtable_t * mm_get_hashtable(); |
| | | |
| | | // extern int mm_checkheap(int verbose); |
| | | |
| | | // extern void *get_mm_start_brk(); |
| | | // extern size_t get_mm_max_size(); |
| | | #endif |
New file |
| | |
| | | #ifndef __MOD_SOCKET_H__ |
| | | #define __MOD_SOCKET_H__ |
| | | |
| | | |
| | | #ifdef __cplusplus |
| | | extern "C" { |
| | | #endif |
| | | |
// Messaging patterns selectable when opening a mod socket
// (names suggest nanomsg-style scalability protocols — TODO confirm).
enum socket_mod_t
{
    PULL_PUSH = 1,   // one-way pipeline
    REQ_REP = 2,     // request/reply
    PAIR = 3,        // exclusive pair
    PUB_SUB = 4,     // publish/subscribe
    SURVEY = 5,      // survey/respond
    BUS = 6          // many-to-many bus

};
| | | |
| | | /** |
| | | * 创建socket |
| | | * @return socket地址 |
| | | */ |
| | | void *mod_open_socket(int mod); |
| | | |
| | | /** |
| | | * 关闭socket |
| | | */ |
| | | int mod_close_socket(void * _socket); |
| | | |
| | | /** |
| | | * 绑定端口到socket, 如果不绑定则系统自动分配一个 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int mod_socket_bind(void * _socket, int port); |
| | | |
| | | |
| | | /** |
| | | * 服务端开启连接监听 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int mod_listen(void * _socket); |
| | | |
| | | /** |
| | | * 客户端发起连接请求 |
| | | */ |
| | | int mod_connect(void * _socket, int port); |
| | | |
| | | /** |
| | | * 发送信息 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int mod_send(void * _socket, const void *buf, const int size); |
| | | |
| | | /** |
| | | * 接收信息 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int mod_recv(void * _socket, void **buf, int *size) ; |
| | | |
| | | /** |
| | | * 释放接收信息的buf |
| | | */ |
| | | void mod_free(void *buf); |
| | | |
| | | |
| | | /** |
| | | * 获取soket端口号 |
| | | */ |
| | | int mod_get_socket_port(void * _socket); |
| | | |
| | | #ifdef __cplusplus |
| | | } |
| | | #endif |
| | | |
| | | #endif |
copy from src/util/sem_util.h
copy to build/include/sem_util.h
New file |
| | |
| | | #ifndef __SHM_ALLOCATOR_H__ |
| | | #define __SHM_ALLOCATOR_H__ |
| | | #include "usg_common.h" |
| | | #include "mem_pool.h" |
| | | #include <new> |
| | | #include <cstdlib> // for exit() |
| | | #include <climits> // for UNIX_MAX |
| | | #include <cstddef> |
| | | |
| | | |
| | | |
/// @brief STL-compatible allocator that places elements in the shared-memory
/// heap, enabling standard containers (e.g. shmstring below) to live in
/// shared memory.
/// NOTE(review): this uses mm_malloc/mm_free directly, while SHM_Allocator
/// uses the blocking mem_pool_malloc/mem_pool_free wrappers — so allocate()
/// can return NULL on exhaustion instead of waiting. Confirm whether that
/// inconsistency is intentional.
template<class T> class SHM_STL_Allocator
{
public:
    typedef T value_type;
    typedef T* pointer;
    typedef const T* const_pointer;
    typedef T& reference;
    typedef const T& const_reference;
    typedef size_t size_type;
    typedef ptrdiff_t difference_type;


    SHM_STL_Allocator() {};
    ~SHM_STL_Allocator() {};
    // Converting constructor/rebind required by the pre-C++17 allocator API.
    template<class U> SHM_STL_Allocator(const SHM_STL_Allocator<U>& t) { };
    template<class U> struct rebind { typedef SHM_STL_Allocator<U> other; };

    // Allocates raw storage for n elements in shared memory; 'hint' ignored.
    pointer allocate(size_type n, const void* hint=0) {
        // fprintf(stderr, "allocate n=%u, hint= %p\n",n, hint);
        return((T*) (mm_malloc(n * sizeof(T))));
    }

    // Releases storage previously obtained from allocate; n unused.
    void deallocate(pointer p, size_type n) {
        // fprintf(stderr, "dealocate n=%u" ,n);
        mm_free((void*)p);
    }

    // Placement-constructs a copy of 'value' at p.
    void construct(pointer p, const T& value) {
        ::new(p) T(value);
    }

    // Placement-default-constructs at p.
    void construct(pointer p)
    {
        ::new(p) T();
    }

    void destroy(pointer p) {
        p->~T();
    }

    pointer address(reference x) {
        return (pointer)&x;
    }

    const_pointer address(const_reference x) {
        return (const_pointer)&x;
    }

    // NOTE(review): uses UINT_MAX rather than size_type's maximum; on LP64
    // this understates the theoretical limit — harmless but worth confirming.
    size_type max_size() const {
        return size_type(UINT_MAX/sizeof(T));
    }
};
| | | |
| | | |
/// @brief allocation policy that places objects in the shared-memory pool;
/// used as the Allocator parameter of LockFreeQueue for cross-process queues.
/// Allocation blocks (inside mem_pool_malloc) until pool space is available.
class SHM_Allocator {
public:
    static void *allocate (size_t size) {
        // NOTE(review): unconditional stdout trace on every allocation —
        // presumably debug residue; confirm before shipping.
        printf("shm_allocator malloc\n");
        return mem_pool_malloc(size);
    }

    static void deallocate (void *ptr) {
        printf("shm_allocator free\n");
        return mem_pool_free(ptr);
    }
};
| | | |
| | | |
/// @brief allocation policy backed by the ordinary process heap; the
/// process-local counterpart of SHM_Allocator for queues that do not need
/// to be visible to other processes.
class DM_Allocator {
public:
    /// @brief allocates 'size' bytes from the heap (may return NULL).
    static void *allocate (size_t size) {
        printf("dm_allocator malloc\n");
        return malloc(size);
    }

    /// @brief releases a block obtained from allocate.
    static void deallocate (void *ptr) {
        printf("dm_allocator free\n");
        // Fix: was 'return free(ptr);' — returning a void expression from a
        // void function is legal but obscures intent.
        free(ptr);
    }
};
| | | |
| | | |
| | | // template<class charT, class traits = char _traits<charT>, |
| | | // class Allocator = allocator<charT> > |
| | | |
| | | |
| | | |
| | | |
| | | typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > shmstring; |
| | | |
| | | #endif |
New file |
| | |
| | | #ifndef __SHM_MM_H__ |
| | | #define __SHM_MM_H__ |
| | | |
| | | #ifdef __cplusplus |
| | | extern "C" { |
| | | #endif |
| | | |
| | | /** |
| | | * 初始化共享内存 |
| | | * @size 共享内存大小, 单位M |
| | | * |
| | | */ |
| | | void shm_init(int size); |
| | | |
| | | /** |
| | | * 销毁共享内存 |
| | | * 整个进程退出时需要执行这个方法,该方法首先会检查是否还有其他进程在使用该共享内存,如果还有其他进程在使用就只是detach,如果没有其他进程在使用则销毁整块内存。 |
| | | */ |
| | | void shm_destroy(); |
| | | |
| | | #ifdef __cplusplus |
| | | } |
| | | #endif |
| | | |
| | | #endif |
| | | |
New file |
| | |
| | | #ifndef __SHM_QUEUE_H__ |
| | | #define __SHM_QUEUE_H__ |
| | | |
| | | #include "usg_common.h" |
| | | #include "hashtable.h" |
| | | #include "lock_free_queue.h" |
| | | #include "logger_factory.h" |
| | | #include "shm_allocator.h" |
| | | |
| | | // default Queue size |
| | | // #define LOCK_FREE_Q_DEFAULT_SIZE 16 |
| | | |
/// @brief reference-counted handle to a LockFreeQueue living in shared
/// memory, looked up by integer key in the pool's hashtable. Multiple
/// processes constructing SHMQueue with the same key share one queue; the
/// last handle to be destroyed deletes it.
template < typename ELEM_T>
class SHMQueue
{

private:
    const int KEY;   // hashtable key identifying the shared queue

public:
    /// @brief attaches to (or creates) the shared queue registered under
    /// 'key', with capacity 'qsize' when newly created.
    SHMQueue(int key=0, size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE);


    ~SHMQueue();


    inline uint32_t size();

    inline bool full();
    inline bool empty();

    // Blocking / non-blocking / bounded-wait variants; all forwarded to the
    // underlying LockFreeQueue (see lock_free_queue.h for semantics).
    inline bool push(const ELEM_T &a_data);
    inline bool push_nowait(const ELEM_T &a_data);
    inline bool push_timeout(const ELEM_T &a_data, struct timespec * timeout);
    inline bool pop(ELEM_T &a_data);
    inline bool pop_nowait(ELEM_T &a_data);
    inline bool pop_timeout(ELEM_T &a_data, struct timespec * timeout);

    inline ELEM_T& operator[](unsigned i);

    /// @brief deletes every registered queue whose key is NOT in keys[0..length).
    static void remove_queues_exclude(int *keys, size_t length);
private:


protected:
    /// @brief the actual queue-> methods are forwarded into the real
    /// implementation
    LockFreeQueue<ELEM_T, SHM_Allocator>* queue;

private:
    /// @brief disable copy constructor declaring it private
    SHMQueue<ELEM_T>(const SHMQueue<ELEM_T> &a_src);
};
| | | |
| | | |
| | | template < typename ELEM_T > |
| | | void SHMQueue<ELEM_T>::remove_queues_exclude(int *keys, size_t length) |
| | | { |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | std::set<int>* keyset = hashtable_keyset(hashtable); |
| | | std::set<int>::iterator keyItr; |
| | | LockFreeQueue<ELEM_T, SHM_Allocator>* mqueue; |
| | | bool found; |
| | | for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) { |
| | | found = false; |
| | | for(size_t i = 0; i < length; i++) { |
| | | if(*keyItr == keys[i]) { |
| | | found = true; |
| | | break; |
| | | } |
| | | } |
| | | if(!found) { |
| | | mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr); |
| | | delete mqueue; |
| | | } |
| | | } |
| | | delete keyset; |
| | | |
| | | } |
| | | |
/// @brief looks the key up in the pool hashtable; creates and registers a
/// new shared LockFreeQueue if absent, then bumps its reference count.
/// NOTE(review): the lookup/create/put sequence is a check-then-act on
/// shared state with no visible lock — two processes attaching the same new
/// key concurrently could race. TODO confirm external synchronization.
template < typename ELEM_T >
SHMQueue<ELEM_T>::SHMQueue(int key, size_t qsize): KEY(key)
{

    hashtable_t *hashtable = mm_get_hashtable();
    queue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key);
    //LockFreeQueue<int, 10000> q;
    // (void*)1 is apparently a sentinel for "slot reserved but empty" —
    // TODO confirm against hashtable_get's contract.
    if (queue == NULL || (void *)queue == (void *)1) {
        queue = new LockFreeQueue<ELEM_T, SHM_Allocator>(qsize);
        hashtable_put(hashtable, key, (void *)queue);
    }
    queue->reference++;
    LoggerFactory::getLogger().debug("SHMQueue constructor reference===%d", queue->reference.load());
}
| | | |
/// @brief detaches from the shared queue; the handle that drops the
/// reference count to zero deletes the queue and unregisters its key.
/// NOTE(review): the decrement and the subsequent load are separate atomic
/// operations, and the queue is deleted before its key is removed from the
/// hashtable — both leave small race windows with concurrent attach/detach.
/// TODO confirm external synchronization.
template < typename ELEM_T >
SHMQueue<ELEM_T>::~SHMQueue()
{
    queue->reference--;
    LoggerFactory::getLogger().debug("SHMQueue destructor reference===%d", queue->reference.load());
    if(queue->reference.load() == 0) {
        delete queue;
        hashtable_t *hashtable = mm_get_hashtable();
        hashtable_remove(hashtable, KEY);
        LoggerFactory::getLogger().debug("SHMQueue destructor delete queue");
    }
}
| | | |
/// @brief snapshot of the element count; forwarded to the shared queue.
template < typename ELEM_T >
inline uint32_t SHMQueue<ELEM_T>::size()
{
    return queue->size();
}
| | | |
/// @brief best-effort "is full" snapshot; forwarded to the shared queue.
template < typename ELEM_T >
inline bool SHMQueue<ELEM_T>::full()
{
    return queue->full();
}
| | | |
/// @brief best-effort "is empty" snapshot; forwarded to the shared queue.
template < typename ELEM_T >
inline bool SHMQueue<ELEM_T>::empty()
{
    return queue->empty();
}
| | | |
| | | |
/// @brief blocking push; forwarded to the shared queue.
template < typename ELEM_T >
inline bool SHMQueue<ELEM_T>::push(const ELEM_T &a_data)
{
    return queue->push(a_data);

}
| | | |
/// @brief non-blocking push; forwarded to the shared queue.
template <
    typename ELEM_T >
inline bool SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data)
{
    return queue->push_nowait(a_data);

}
| | | |
/// @brief push with bounded wait; forwarded to the shared queue.
template < typename ELEM_T >
inline bool SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, struct timespec * timeout)
{

    return queue->push_timeout(a_data, timeout);

}
| | | |
| | | |
| | | |
| | | |
/// @brief blocking pop; forwarded to the shared queue.
template < typename ELEM_T >
inline bool SHMQueue<ELEM_T>::pop(ELEM_T &a_data)
{
    return queue->pop(a_data);

}
| | | |
/// @brief non-blocking pop; forwarded to the shared queue.
template < typename ELEM_T >
inline bool SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data)
{
    return queue->pop_nowait(a_data);

}
| | | |
| | | |
/// @brief pop with bounded wait; forwarded to the shared queue.
template < typename ELEM_T >
inline bool SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, struct timespec * timeout)
{
    return queue->pop_timeout(a_data, timeout);

}
| | | |
/// @brief random access; forwarded to the shared queue (unsynchronized).
template < typename ELEM_T >
inline ELEM_T& SHMQueue<ELEM_T>::operator[](unsigned i) {
    return queue->operator[](i);
}
| | | |
| | | |
| | | |
| | | #endif |
New file |
| | |
| | | #ifndef __SHM_QUEUE_WRAPPER_H__ |
| | | #define __SHM_QUEUE_WRAPPER_H__ |
| | | |
| | | #include "usg_common.h" |
| | | #include "usg_typedef.h" |
| | | |
| | | |
| | | #ifdef __cplusplus |
| | | extern "C" { |
| | | #endif |
| | | |
| | | |
| | | |
| | | //移除不包含在keys中的队列 |
| | | void shm_remove_queues_exclude(void *keys, int length); |
| | | /** |
| | | * 创建队列 |
| | | * @ shmqueue |
| | | * @ key 标识共享队列的唯一标识, key是一个指针里面存储了key的值, 如果key的值为-1系统会自动分配一个key值并把该key的值赋给key指针。如果key的值不会空会检查是否有重复绑定的情况, 有重复就报错没有就创建队列并绑定key. |
| | | * @ queue_size 队列大小 |
| | | */ |
| | | void* shmqueue_create( int * key, int queue_size); |
| | | |
| | | /** |
| | | * 绑定key到队列,但是并不会创建队列。如果没有对应指定key的队列提示错误并退出 |
| | | */ |
| | | void* shmqueue_attach(int key) ; |
| | | |
| | | /** |
| | | * 销毁 |
| | | */ |
| | | void shmqueue_drop(void * _shmqueue); |
| | | |
| | | /** |
| | | * 队列元素的个数 |
| | | */ |
| | | int shmqueue_size(void * _shmqueue) ; |
| | | |
| | | /** |
| | | * 是否已满 |
| | | * @return 1是, 0否 |
| | | */ |
| | | int shmqueue_full(void * _shmqueue); |
| | | |
| | | /** |
| | | * 是否为空 |
| | | * @return 1是, 0否 |
| | | */ |
| | | int shmqueue_empty(void * _shmqueue) ; |
| | | |
| | | /** |
| | | * 入队, 队列满时等待. |
| | | * @return 1 入队成功, 0 入队失败 |
| | | */ |
| | | int shmqueue_push(void * _shmqueue, void *src, int size); |
| | | |
| | | /** |
| | | * 入队, 队列满时立即返回. |
| | | * @return 1 入队成功, 0 入队失败 |
| | | */ |
| | | int shmqueue_push_nowait(void * _shmqueue, void *src, int size) ; |
| | | |
| | | /** |
| | | * 入队, 指定时间内入队不成功就返回 |
| | | * @sec 秒 |
| | | * @nsec 纳秒 |
| | | * @return 1 入队成功, 0 入队失败 |
| | | */ |
| | | int shmqueue_push_timeout(void * _shmqueue, void *src, int size, int sec, int nsec) ; |
| | | |
| | | /** |
| | | * 出队, 队列空时等待 |
| | | * @return 1 出队成功, 0出队失败 |
| | | */ |
| | | int shmqueue_pop(void * _shmqueue, void **dest, int *size); |
| | | |
| | | /** |
| | | * 出队, 队列空时立即返回 |
| | | * @return 1 出队成功, 0出队失败 |
| | | */ |
| | | int shmqueue_pop_nowait(void * _shmqueue, void **dest, int *size) ; |
| | | |
| | | /** |
| | | * 出队, 指定时间内出队不成功就返回 |
| | | * @sec秒 |
| | | * @nsec纳秒 |
| | | * @return 1 出队成功, 0出队失败 |
| | | */ |
| | | int shmqueue_pop_timeout(void * _shmqueue, void **dest, int *size, int sec, int nsec); |
| | | |
| | | /** |
| | | * 释放出队分配的内存 |
| | | */ |
| | | void shmqueue_free(void *ptr); |
| | | |
| | | #ifdef __cplusplus |
| | | } |
| | | #endif |
| | | |
| | | #endif |
New file |
| | |
| | | #ifndef __SHM_SOCKET_H__ |
| | | #define __SHM_SOCKET_H__ |
| | | |
| | | #include "usg_common.h" |
| | | #include "usg_typedef.h" |
| | | #include "shm_queue.h" |
| | | |
| | | #ifdef __cplusplus |
| | | extern "C" { |
| | | #endif |
| | | |
// Wire-level message types exchanged over the shared-memory queues.
enum shm_msg_type_t
{
    SHM_SOCKET_OPEN = 1,         // connection request
    SHM_SOCKET_OPEN_REPLY = 2,   // reply to a connection request
    SHM_SOCKET_CLOSE = 3,        // connection teardown
    SHM_COMMON_MSG = 4           // ordinary payload message

};
| | | |
// Socket flavors, mirroring SOCK_STREAM / SOCK_DGRAM semantics.
enum shm_socket_type_t
{
    SHM_SOCKET_STREAM = 1,   // connection-oriented (shm_connect/shm_accept)
    SHM_SOCKET_DGRAM = 2     // connectionless (shm_sendto/shm_recvfrom)

};
| | | |
// Lifecycle states of a stream-style shared-memory socket.
enum shm_connection_status_t {
    SHM_CONN_CLOSED=1,
    SHM_CONN_LISTEN=2,        // server side, accepting connections
    SHM_CONN_ESTABLISHED=3
};
| | | |
// One message as queued between shared-memory sockets.
typedef struct shm_msg_t {
    int port;              // sender's port (reply address)
    shm_msg_type_t type;   // control vs. payload (see shm_msg_type_t)
    size_t size;           // number of bytes at 'buf'
    void * buf;            // payload; ownership/allocator not shown here — TODO confirm

} shm_msg_t;
| | | |
| | | |
// State of one shared-memory socket endpoint.
typedef struct shm_socket_t {
    shm_socket_type_t socket_type;
    // local port
    int port;
    shm_connection_status_t status;
    SHMQueue<shm_msg_t> *queue;         // this endpoint's receive queue (shared memory)
    SHMQueue<shm_msg_t> *remoteQueue;   // peer's queue, for sending
    // Process-local staging queues (DM_Allocator = ordinary heap):
    LockFreeQueue<shm_msg_t, DM_Allocator> *messageQueue;   // received payload messages
    LockFreeQueue<shm_msg_t, DM_Allocator> *acceptQueue;    // pending connection requests
    std::map<int, shm_socket_t* > *clientSocketMap;         // port -> accepted client socket
    pthread_t dispatch_thread;          // demultiplexes incoming messages

} shm_socket_t;
| | | |
| | | |
| | | |
| | | |
| | | shm_socket_t *shm_open_socket(shm_socket_type_t socket_type); |
| | | |
| | | |
| | | int shm_close_socket(shm_socket_t * socket) ; |
| | | |
| | | |
| | | int shm_socket_bind(shm_socket_t * socket, int port) ; |
| | | |
| | | int shm_listen(shm_socket_t * socket) ; |
| | | |
| | | shm_socket_t* shm_accept(shm_socket_t* socket); |
| | | |
| | | int shm_connect(shm_socket_t * socket, int port); |
| | | |
| | | int shm_send(shm_socket_t * socket, const void *buf, const int size) ; |
| | | |
| | | int shm_recv(shm_socket_t * socket, void **buf, int *size) ; |
| | | |
| | | int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port); |
| | | |
| | | int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port); |
| | | |
| | | |
| | | #ifdef __cplusplus |
| | | } |
| | | #endif |
| | | |
| | | #endif |
| | |
| | | # Makefile for common library. |
| | | # |
| | | ROOT=.. |
| | | LDLIBS+=-Wl,-rpath=$(ROOT)/queue:$(ROOT)/lib |
| | | LDLIBS+=-Wl,-rpath=$(ROOT)/lib:$(ROOT)/build/lib |
| | | # 开源工具包路径 |
| | | LDDIR += -L$(ROOT)/queue |
| | | LDDIR += -L$(ROOT)/build/lib |
| | | # 开源工具包 |
| | | LDLIBS += -lshm_queue -lusgcommon -lpthread |
| | | |
| | | INCLUDE += -I$(ROOT)/queue/ -I$(ROOT)/queue/include |
| | | INCLUDE += -I$(ROOT)/build/include |
| | | |
| | | PLATFORM=$(shell $(ROOT)/systype.sh) |
| | | include $(ROOT)/Make.defines.$(PLATFORM) |
| | | |
| | | |
| | | PROGS = req_rep pub_sub |
| | | PROGS = req_rep pub_sub queue |
| | | |
| | | |
| | | build: $(PROGS) |
| | |
| | | |
| | | clean: |
| | | rm -f $(TEMPFILES) $(PROGS) |
| | | |
| | | |
| | | |
| | | $(LIBQUEUE): |
| | | (cd $(ROOT)/queue && $(MAKE)) |
File was renamed from test2/test_queue_wrapper.c |
| | |
| | | #include "shm_queue_wrapper.h" |
| | | #include "mm.h" |
| | | #include "shm_mm.h" |
| | | |
| | | // typedef struct message_t |
| | | // { |
| | |
| | | for(i = 0; i < qsize; i++) { |
| | | sprintf(msg, "%d hello", i); |
| | | //入队 |
| | | if(shmqueue_push(queue, (void *)msg, sizeof(msg))) { |
| | | if(shmqueue_push(queue, (void *)msg, strlen(msg) + 1)) { |
| | | printf("push: %s\n", msg ); |
| | | } |
| | | } |
| | |
| | | |
| | | MYLIBS = $(LIBSQUEUE) $(DLIBSQUEUE) |
| | | |
| | | PREFIX = $(ROOT)/build |
| | | |
| | | ifeq ($(DEBUG),y) |
| | | MYLIBS = $(LIBSQUEUE) |
| | | else |
| | | MYLIBS = $(LIBSQUEUE) $(DLIBSQUEUE) |
| | | endif |
| | | |
| | | all: build |
| | | |
| | | all: install |
| | | |
| | | |
| | | build: $(MYLIBS) |
| | | |
| | |
| | | install -d $(PREFIX)/lib/ |
| | | install -m 644 $^ $(PREFIX)/lib/ |
| | | install -d $(PREFIX)/include/ |
| | | install -m 644 $(MINCLUDE)/* $(PREFIX)/include/ |
| | | install -m 644 ./*.h ./queue/include/* ./socket/include/* ./util/include/* $(PREFIX)/include/ |
| | | |
| | | clean: |
| | | rm -f $(TEMPFILES) |
| | |
| | | public: |
| | | |
| | | static Logger getLogger() { |
| | | //ERROR ALL |
| | | static Logger logger(Logger::ERROR); |
| | | //ERROR ALL DEBUG |
| | | static Logger logger(Logger::DEBUG); |
| | | return logger; |
| | | } |
| | | }; |
| | |
| | | #ifndef __SHM_ALLOCATOR_H__ |
| | | #define __SHM_ALLOCATOR_H__ |
| | | #include "usg_common.h" |
| | | #include "mm.h" |
| | | #include "mem_pool.h" |
| | | #include <new> |
| | | #include <cstdlib> // for exit() |
| | | #include <climits> // for UNIX_MAX |
| | |
| | | public: |
| | | static void *allocate (size_t size) { |
| | | printf("shm_allocator malloc\n"); |
| | | return mm_malloc(size); |
| | | return mem_pool_malloc(size); |
| | | } |
| | | |
| | | static void deallocate (void *ptr) { |
| | | printf("shm_allocator free\n"); |
| | | return mm_free(ptr); |
| | | return mem_pool_free(ptr); |
| | | } |
| | | }; |
| | | |
| | |
| | | #define __SHM_QUEUE_H__ |
| | | |
| | | #include "usg_common.h" |
| | | #include "mm.h" |
| | | #include "hashtable.h" |
| | | #include "lock_free_queue.h" |
| | | #include "logger_factory.h" |
| | |
| | | |
| | | #include "usg_common.h" |
| | | #include "usg_typedef.h" |
| | | #include "shm_queue.h" |
| | | #include "shm_allocator.h" |
| | | |
| | | |
| | | #ifdef __cplusplus |
| | | extern "C" { |
| | |
| | | |
| | | #include "mem_pool.h" |
| | | #include "hashtable.h" |
| | | #include "shm_queue.h" |
| | | #include "shm_allocator.h" |
| | | |
| | | typedef struct ele_t { |
| | | size_t size; |
| | |
| | | #include "usg_common.h" |
| | | #include "usg_typedef.h" |
| | | #include "shm_queue.h" |
| | | #include "shm_allocator.h" |
| | | |
| | | #include "mem_pool.h" |
| | | #include "hashtable.h" |
| | | #include "sem_util.h" |
| | | |
| | | #ifdef __cplusplus |
| | | extern "C" { |
| | |
| | | SHM_SOCKET_OPEN_REPLY = 2, |
| | | SHM_SOCKET_CLOSE = 3, |
| | | SHM_COMMON_MSG = 4 |
| | | |
| | | }; |
| | | |
| | | enum shm_socket_type_t |
| | | { |
| | | SHM_SOCKET_STREAM = 1, |
| | | SHM_SOCKET_DGRAM = 2 |
| | | |
| | | }; |
| | | |
| | |
| | | |
| | | |
| | | typedef struct shm_socket_t { |
| | | shm_socket_type_t socket_type; |
| | | // 本地port |
| | | int port; |
| | | shm_connection_status_t status; |
| | |
| | | |
| | | |
| | | |
| | | shm_socket_t *shm_open_socket(); |
| | | shm_socket_t *shm_open_socket(shm_socket_type_t socket_type); |
| | | |
| | | |
| | | int shm_close_socket(shm_socket_t * socket) ; |
| | |
| | | |
| | | int shm_recv(shm_socket_t * socket, void **buf, int *size) ; |
| | | |
| | | int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port); |
| | | |
| | | int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port); |
| | | |
| | | |
| | | #ifdef __cplusplus |
| | |
| | | #include "usg_common.h" |
| | | #include "mod_socket.h" |
| | | #include "shm_socket.h" |
| | | #include "usg_common.h" |
| | | #include "shm_allocator.h" |
| | | #include "mem_pool.h" |
| | | #include "hashtable.h" |
| | | #include "sem_util.h" |
| | | #include "logger_factory.h" |
| | | |
| | | static Logger logger = LoggerFactory::getLogger(); |
| | | |
| | | typedef struct mod_entry_t |
| | |
| | | */ |
| | | void *mod_open_socket(int mod) { |
| | | mod_socket_t *socket = (mod_socket_t *)malloc(sizeof(mod_socket_t)); |
| | | socket->shm_socket=shm_open_socket(); |
| | | socket->shm_socket=shm_open_socket(SHM_SOCKET_STREAM); |
| | | socket->is_server = 0; |
| | | socket->mod = (socket_mod_t)mod; |
| | | socket->recvQueue = new LockFreeQueue<mod_entry_t, DM_Allocator>(16); |
| | |
| | | #include "shm_socket.h" |
| | | #include "hashtable.h" |
| | | #include "logger_factory.h" |
| | | #include <map> |
| | | |
| | |
| | | |
| | | void * _client_run_msg_rev(void* _socket); |
| | | |
| | | int _shm_close_dgram_socket(shm_socket_t *socket); |
| | | |
| | | |
| | | int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote); |
| | | |
| | | SHMQueue<shm_msg_t> * _attach_remote_queue(int port) ; |
| | | |
| | | shm_socket_t *shm_open_socket() { |
| | | shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) { |
| | | shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t)); |
| | | |
| | | socket->socket_type = socket_type; |
| | | socket->port = -1; |
| | | socket->dispatch_thread = 0; |
| | | socket->status=SHM_CONN_CLOSED; |
| | |
| | | return socket; |
| | | } |
| | | |
| | | int shm_close_socket(shm_socket_t *socket) { |
| | | switch(socket->socket_type) { |
| | | case SHM_SOCKET_STREAM: |
| | | return _shm_close_stream_socket(socket, true); |
| | | case SHM_SOCKET_DGRAM: |
| | | return _shm_close_dgram_socket(socket); |
| | | default: |
| | | return -1; |
| | | } |
| | | return -1; |
| | | |
| | | } |
| | | |
| | | int _shm_close_socket(shm_socket_t *socket, bool notifyRemote) { |
/**
 * Bind the socket to a fixed port (a key in the shared-memory hashtable).
 *
 * Only records the port on the socket; the queue for that key is created
 * later (by shm_listen / shm_connect / the lazy path in shm_sendto).
 *
 * @return always 0.
 */
int shm_socket_bind(shm_socket_t * socket, int port) {
  socket -> port = port;
  return 0;
}
| | | |
/**
 * Put a stream socket into the listening state.
 *
 * Allocates a port from the shared-memory hashtable when the socket is not
 * yet bound (or fails if the bound key is already taken), creates the receive
 * queue, the accept queue and the per-client map, then starts the dispatcher
 * thread (_server_run_msg_rev) that sorts incoming messages.
 *
 * @return 0 on success; exits the process on misuse or port conflict.
 */
int shm_listen(shm_socket_t* socket) {

  // only stream sockets may listen
  if(socket->socket_type != SHM_SOCKET_STREAM) {
    err_exit(0, "can not invoke shm_listen method with a socket which is not a SHM_SOCKET_STREAM socket");
  }

  int port;
  hashtable_t *hashtable = mm_get_hashtable();
  if(socket -> port == -1) {
    // not bound yet: grab a fresh key from the shared hashtable
    port = hashtable_alloc_key(hashtable);
    socket -> port = port;
  } else {

    if(hashtable_get(hashtable, socket->port)!= NULL) {
      err_exit(0, "key %d has already been in used!", socket->port);
    }
  }

  // queues/map must exist before the dispatcher thread starts consuming
  socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
  socket->acceptQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
  socket->clientSocketMap = new std::map<int, shm_socket_t* >;
  socket->status = SHM_CONN_LISTEN;
  pthread_create(&(socket->dispatch_thread), NULL, _server_run_msg_rev , (void *)socket);


  return 0;
}
| | | |
| | | |
| | | /** |
| | | * 接受客户端建立新连接的请求 |
| | | * |
| | | */ |
| | | shm_socket_t* shm_accept(shm_socket_t* socket) { |
| | | if(socket->socket_type != SHM_SOCKET_STREAM) { |
| | | err_exit(0, "can not invoke shm_accept method with a socket which is not a SHM_SOCKET_STREAM socket"); |
| | | } |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | int client_port; |
| | | shm_socket_t *client_socket; |
| | | shm_msg_t src; |
| | | |
| | | if (socket->acceptQueue->pop(src) ) { |
| | | |
| | | // print_msg("===accept:", src); |
| | | client_port = src.port; |
| | | client_socket = (shm_socket_t *)malloc(sizeof(shm_socket_t)); |
| | | client_socket->port = socket->port; |
| | | // client_socket->queue= socket->queue; |
| | | //初始化消息queue |
| | | client_socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); |
| | | //连接到对方queue |
| | | client_socket->remoteQueue = _attach_remote_queue(client_port); |
| | | |
| | | socket->clientSocketMap->insert({client_port, client_socket}); |
| | | |
| | | /* |
| | | * shm_accept 用户执行的方法 与_server_run_msg_rev在两个不同的限制工作,accept要保证在客户的发送消息之前完成资源的准备工作,以避免出现竞态问题 |
| | | */ |
| | | //发送open_reply,回应客户端的connect请求 |
| | | struct timespec timeout = {1, 0}; |
| | | shm_msg_t msg; |
| | | msg.port = socket->port; |
| | | msg.size = 0; |
| | | msg.type = SHM_SOCKET_OPEN_REPLY; |
| | | |
| | | if (client_socket->remoteQueue->push_timeout(msg, &timeout) ) |
| | | { |
| | | client_socket->status = SHM_CONN_ESTABLISHED; |
| | | return client_socket; |
| | | } else { |
| | | err_msg(0, "shm_accept: 发送open_reply失败"); |
| | | return NULL; |
| | | } |
| | | |
| | | |
| | | } else { |
| | | err_exit(errno, "shm_accept"); |
| | | } |
| | | return NULL; |
| | | |
| | | } |
| | | |
| | | |
| | | int shm_connect(shm_socket_t* socket, int port) { |
| | | if(socket->socket_type != SHM_SOCKET_STREAM) { |
| | | err_exit(0, "can not invoke shm_connect method with a socket which is not a SHM_SOCKET_STREAM socket"); |
| | | } |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | if(hashtable_get(hashtable, port)== NULL) { |
| | | err_exit(0, "shm_connect:connect at port %d failed!", port); |
| | | } |
| | | |
| | | if(socket->port == -1) { |
| | | socket->port = hashtable_alloc_key(hashtable); |
| | | } else { |
| | | |
| | | if(hashtable_get(hashtable, socket->port)!= NULL) { |
| | | err_exit(0, "key %d has already been in used!", socket->port); |
| | | } |
| | | } |
| | | |
| | | socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); |
| | | socket->remoteQueue = _attach_remote_queue(port); |
| | | socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); |
| | | |
| | | |
| | | //发送open请求 |
| | | struct timespec timeout = {1, 0}; |
| | | shm_msg_t msg; |
| | | msg.port = socket->port; |
| | | msg.size = 0; |
| | | msg.type=SHM_SOCKET_OPEN; |
| | | socket->remoteQueue->push_timeout(msg, &timeout); |
| | | |
| | | //接受open reply |
| | | if(socket->queue->pop(msg)) { |
| | | // 在这里server端已经准备好接受客户端发送请求了,完成与服务端的连接 |
| | | if(msg.type == SHM_SOCKET_OPEN_REPLY) { |
| | | socket->status = SHM_CONN_ESTABLISHED; |
| | | pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket); |
| | | } else { |
| | | err_exit(0, "shm_connect: 不匹配的应答信息!"); |
| | | } |
| | | |
| | | } else { |
| | | err_exit(0, "connect failted!"); |
| | | } |
| | | |
| | | return 0; |
| | | } |
| | | |
| | | |
| | | int shm_send(shm_socket_t *socket, const void *buf, const int size) { |
| | | if(socket->socket_type != SHM_SOCKET_STREAM) { |
| | | err_exit(0, "can not invoke shm_send method with a socket which is not a SHM_SOCKET_STREAM socket"); |
| | | } |
| | | // hashtable_t *hashtable = mm_get_hashtable(); |
| | | // if(socket->remoteQueue == NULL) { |
| | | // err_msg(errno, "当前客户端无连接!"); |
| | | // return -1; |
| | | // } |
| | | shm_msg_t dest; |
| | | dest.type=SHM_COMMON_MSG; |
| | | dest.port = socket->port; |
| | | dest.size = size; |
| | | dest.buf = mm_malloc(size); |
| | | memcpy(dest.buf, buf, size); |
| | | |
| | | |
| | | if(socket->remoteQueue->push(dest)) { |
| | | return 0; |
| | | } else { |
| | | err_msg(errno, "connection has been closed!"); |
| | | return -1; |
| | | } |
| | | } |
| | | |
| | | |
| | | int shm_recv(shm_socket_t* socket, void **buf, int *size) { |
| | | if(socket->socket_type != SHM_SOCKET_STREAM) { |
| | | err_exit(0, "can not invoke shm_recv method with a socket which is not a SHM_SOCKET_STREAM socket"); |
| | | } |
| | | shm_msg_t src; |
| | | |
| | | if (socket->messageQueue->pop(src)) { |
| | | void * _buf = malloc(src.size); |
| | | memcpy(_buf, src.buf, src.size); |
| | | *buf = _buf; |
| | | *size = src.size; |
| | | mm_free(src.buf); |
| | | return 0; |
| | | } else { |
| | | return -1; |
| | | } |
| | | |
| | | } |
| | | |
| | | |
| | | |
| | | // 短连接方式发送 |
| | | int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port) { |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | |
| | | if(socket->queue == NULL) { |
| | | if(socket->port == -1) { |
| | | socket->port = hashtable_alloc_key(hashtable); |
| | | } else { |
| | | |
| | | if(hashtable_get(hashtable, socket->port)!= NULL) { |
| | | err_exit(0, "key %d has already been in used!", socket->port); |
| | | } |
| | | } |
| | | |
| | | socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); |
| | | } |
| | | if (port == socket->port) { |
| | | err_msg(0, "can not send to your self!"); |
| | | return -1; |
| | | } |
| | | |
| | | shm_msg_t dest; |
| | | dest.type=SHM_COMMON_MSG; |
| | | dest.port = socket->port; |
| | | dest.size = size; |
| | | dest.buf = mm_malloc(size); |
| | | memcpy(dest.buf, buf, size); |
| | | |
| | | SHMQueue<shm_msg_t> *remoteQueue = _attach_remote_queue(port); |
| | | if(remoteQueue->push(dest)) { |
| | | delete remoteQueue; |
| | | return 0; |
| | | } else { |
| | | delete remoteQueue; |
| | | err_msg(errno, "sendto port %d failed!", port); |
| | | return -1; |
| | | } |
| | | } |
| | | |
| | | |
/**
 * Connectionless (datagram-style) receive.
 *
 * Lazily creates this socket's receive queue on first use, then blocks until
 * a message arrives. The caller receives a heap copy of the payload (and
 * must free() it) plus the sender's port.
 *
 * @param buf   out: freshly malloc'ed copy of the payload
 * @param size  out: payload length in bytes
 * @param port  out: sender's port
 * @return 0 on success, -1 when the pop fails.
 */
int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port){
  hashtable_t *hashtable = mm_get_hashtable();
  // lazily bind our own port/queue the first time this socket is used
  if(socket->queue == NULL) {
    if(socket->port == -1) {
      socket->port = hashtable_alloc_key(hashtable);
    } else {

      if(hashtable_get(hashtable, socket->port)!= NULL) {
        err_exit(0, "key %d has already been in used!", socket->port);
      }
    }

    socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
  }

  shm_msg_t src;
  if (socket->queue->pop(src)) {
    // copy the payload out of shared memory, then release the sender's buffer
    void * _buf = malloc(src.size);
    memcpy(_buf, src.buf, src.size);
    *buf = _buf;
    *size = src.size;
    *port = src.port;
    mm_free(src.buf);
    return 0;
  } else {
    return -1;
  }
}
| | | |
| | | |
| | | /** |
| | | * 绑定key到队列,但是并不会创建队列。如果没有对应指定key的队列提示错误并退出 |
| | | */ |
| | | SHMQueue<shm_msg_t> * _attach_remote_queue(int port) { |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | if(hashtable_get(hashtable, port)== NULL) { |
| | | err_exit(0, "_remote_queue_attach:connet at port %d failed!", port); |
| | | return NULL; |
| | | } |
| | | |
| | | SHMQueue<shm_msg_t> *queue = new SHMQueue<shm_msg_t>(port, 0); |
| | | return queue; |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | void _server_close_conn_to_client(shm_socket_t* socket, int port) { |
| | | shm_socket_t *client_socket; |
| | | auto iter = socket->clientSocketMap->find(port); |
| | | if( iter != socket->clientSocketMap->end() ) { |
| | | socket->clientSocketMap->erase(iter); |
| | | } |
| | | //free((void *)client_socket); |
| | | |
| | | } |
| | | |
| | | /** |
| | | * server端各种类型消息()在这里进程分拣 |
| | | */ |
| | | void * _server_run_msg_rev(void* _socket) { |
| | | pthread_detach(pthread_self()); |
| | | shm_socket_t* socket = (shm_socket_t*) _socket; |
| | | struct timespec timeout = {1, 0}; |
| | | shm_msg_t src; |
| | | shm_socket_t *client_socket; |
| | | std::map<int, shm_socket_t* >::iterator iter; |
| | | |
| | | while(socket->queue->pop(src)) { |
| | | |
| | | switch (src.type) { |
| | | case SHM_SOCKET_OPEN : |
| | | socket->acceptQueue->push_timeout(src, &timeout); |
| | | break; |
| | | case SHM_SOCKET_CLOSE : |
| | | _server_close_conn_to_client(socket, src.port); |
| | | break; |
| | | case SHM_COMMON_MSG : |
| | | |
| | | iter = socket->clientSocketMap->find(src.port); |
| | | if( iter != socket->clientSocketMap->end()) { |
| | | client_socket= iter->second; |
| | | // print_msg("_server_run_msg_rev push before", src); |
| | | client_socket->messageQueue->push_timeout(src, &timeout); |
| | | // print_msg("_server_run_msg_rev push after", src); |
| | | } |
| | | |
| | | break; |
| | | |
| | | default: |
| | | err_msg(0, "socket.__shm_rev__: undefined message type."); |
| | | } |
| | | } |
| | | |
| | | return NULL; |
| | | } |
| | | |
| | | |
| | | |
// Client-side reaction to a SHM_SOCKET_CLOSE from the server: close the
// stream socket locally without notifying the remote end (it initiated).
void _client_close_conn_to_server(shm_socket_t* socket) {

  _shm_close_stream_socket(socket, false);
}
| | | |
| | | |
| | | /** |
| | | * client端的各种类型消息()在这里进程分拣 |
| | | */ |
| | | void * _client_run_msg_rev(void* _socket) { |
| | | pthread_detach(pthread_self()); |
| | | shm_socket_t* socket = (shm_socket_t*) _socket; |
| | | struct timespec timeout = {1, 0}; |
| | | shm_msg_t src; |
| | | |
| | | while(socket->queue->pop(src)) { |
| | | switch (src.type) { |
| | | |
| | | case SHM_SOCKET_CLOSE : |
| | | _client_close_conn_to_server(socket); |
| | | break; |
| | | case SHM_COMMON_MSG : |
| | | socket->messageQueue->push_timeout(src, &timeout); |
| | | break; |
| | | default: |
| | | err_msg(0, "socket.__shm_rev__: undefined message type."); |
| | | } |
| | | } |
| | | |
| | | return NULL; |
| | | } |
| | | |
| | | |
| | | int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote) { |
| | | socket->status = SHM_CONN_CLOSED; |
| | | //给对方发送一个关闭连接的消息 |
| | | struct timespec timeout = {1, 0}; |
| | |
| | | |
| | | } |
| | | |
| | | int shm_close_socket(shm_socket_t *socket) { |
| | | return _shm_close_socket(socket, true); |
| | | } |
| | | |
| | | int shm_socket_bind(shm_socket_t * socket, int port) { |
| | | shm_socket_t * _socket = (shm_socket_t *) socket; |
| | | _socket -> port = port; |
/**
 * Tear down a datagram socket: destroy its receive queue (if one was ever
 * created) and free the socket structure itself.
 *
 * @return always 0.
 */
int _shm_close_dgram_socket(shm_socket_t *socket){
  if(socket->queue != NULL) {
    delete socket->queue;
    socket->queue = NULL;
  }
  free(socket);
  return 0;
}
| | | |
| | | int shm_listen(shm_socket_t* socket) { |
| | | int port; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | if(socket -> port == -1) { |
| | | port = hashtable_alloc_key(hashtable); |
| | | socket -> port = port; |
| | | } else { |
| | | |
| | | if(hashtable_get(hashtable, socket->port)!= NULL) { |
| | | err_exit(0, "key %d has already been in used!", socket->port); |
| | | } |
| | | } |
| | | |
| | | socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); |
| | | socket->acceptQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); |
| | | socket->clientSocketMap = new std::map<int, shm_socket_t* >; |
| | | socket->status = SHM_CONN_LISTEN; |
| | | pthread_create(&(socket->dispatch_thread), NULL, _server_run_msg_rev , (void *)socket); |
| | | |
| | | |
| | | return 0; |
| | | } |
| | | |
| | | void _server_close_conn_to_client(shm_socket_t* socket, int port) { |
| | | shm_socket_t *client_socket; |
| | | auto iter = socket->clientSocketMap->find(port); |
| | | if( iter != socket->clientSocketMap->end() ) { |
| | | socket->clientSocketMap->erase(iter); |
| | | } |
| | | //free((void *)client_socket); |
| | | |
| | | } |
| | | |
| | | /** |
| | | * server端各种类型消息()在这里进程分拣 |
| | | */ |
| | | void * _server_run_msg_rev(void* _socket) { |
| | | pthread_detach(pthread_self()); |
| | | shm_socket_t* socket = (shm_socket_t*) _socket; |
| | | struct timespec timeout = {1, 0}; |
| | | shm_msg_t src; |
| | | shm_socket_t *client_socket; |
| | | std::map<int, shm_socket_t* >::iterator iter; |
| | | |
| | | while(socket->queue->pop(src)) { |
| | | |
| | | switch (src.type) { |
| | | case SHM_SOCKET_OPEN : |
| | | socket->acceptQueue->push_timeout(src, &timeout); |
| | | break; |
| | | case SHM_SOCKET_CLOSE : |
| | | _server_close_conn_to_client(socket, src.port); |
| | | break; |
| | | case SHM_COMMON_MSG : |
| | | |
| | | iter = socket->clientSocketMap->find(src.port); |
| | | if( iter != socket->clientSocketMap->end()) { |
| | | client_socket= iter->second; |
| | | // print_msg("_server_run_msg_rev push before", src); |
| | | client_socket->messageQueue->push_timeout(src, &timeout); |
| | | // print_msg("_server_run_msg_rev push after", src); |
| | | } |
| | | |
| | | break; |
| | | |
| | | default: |
| | | err_msg(0, "socket.__shm_rev__: undefined message type."); |
| | | } |
| | | } |
| | | |
| | | return NULL; |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | /** |
| | | * 接受客户端建立新连接的请求 |
| | | * |
| | | */ |
| | | |
| | | shm_socket_t* shm_accept(shm_socket_t* socket) { |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | int client_port; |
| | | shm_socket_t *client_socket; |
| | | shm_msg_t src; |
| | | |
| | | if (socket->acceptQueue->pop(src) ) { |
| | | |
| | | // print_msg("===accept:", src); |
| | | client_port = src.port; |
| | | client_socket = (shm_socket_t *)malloc(sizeof(shm_socket_t)); |
| | | client_socket->port = socket->port; |
| | | // client_socket->queue= socket->queue; |
| | | //初始化消息queue |
| | | client_socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); |
| | | //连接到对方queue |
| | | client_socket->remoteQueue = _attach_remote_queue(client_port); |
| | | |
| | | socket->clientSocketMap->insert({client_port, client_socket}); |
| | | |
| | | /* |
| | | * shm_accept 用户执行的方法 与_server_run_msg_rev在两个不同的限制工作,accept要保证在客户的发送消息之前完成资源的准备工作,以避免出现竞态问题 |
| | | */ |
| | | //发送open_reply,回应客户端的connect请求 |
| | | struct timespec timeout = {1, 0}; |
| | | shm_msg_t msg; |
| | | msg.port = socket->port; |
| | | msg.size = 0; |
| | | msg.type = SHM_SOCKET_OPEN_REPLY; |
| | | |
| | | if (client_socket->remoteQueue->push_timeout(msg, &timeout) ) |
| | | { |
| | | client_socket->status = SHM_CONN_ESTABLISHED; |
| | | return client_socket; |
| | | } else { |
| | | err_msg(0, "shm_accept: 发送open_reply失败"); |
| | | return NULL; |
| | | } |
| | | |
| | | |
| | | } else { |
| | | err_exit(errno, "shm_accept"); |
| | | } |
| | | return NULL; |
| | | |
| | | } |
| | | |
| | | |
| | | int shm_connect(shm_socket_t* socket, int port) { |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | if(hashtable_get(hashtable, port)== NULL) { |
| | | err_exit(0, "shm_connect:connect at port %d failed!", port); |
| | | } |
| | | if(socket->port == -1) { |
| | | socket->port = hashtable_alloc_key(hashtable); |
| | | } else { |
| | | |
| | | if(hashtable_get(hashtable, socket->port)!= NULL) { |
| | | err_exit(0, "key %d has already been in used!", socket->port); |
| | | } |
| | | } |
| | | |
| | | socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16); |
| | | socket->remoteQueue = _attach_remote_queue(port); |
| | | socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16); |
| | | |
| | | |
| | | //发送open请求 |
| | | struct timespec timeout = {1, 0}; |
| | | shm_msg_t msg; |
| | | msg.port = socket->port; |
| | | msg.size = 0; |
| | | msg.type=SHM_SOCKET_OPEN; |
| | | socket->remoteQueue->push_timeout(msg, &timeout); |
| | | |
| | | //接受open reply |
| | | if(socket->queue->pop(msg)) { |
| | | // 在这里server端已经准备好接受客户端发送请求了,完成与服务端的连接 |
| | | if(msg.type == SHM_SOCKET_OPEN_REPLY) { |
| | | socket->status = SHM_CONN_ESTABLISHED; |
| | | pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket); |
| | | } else { |
| | | err_exit(0, "shm_connect: 不匹配的应答信息!"); |
| | | } |
| | | |
| | | } else { |
| | | err_exit(0, "connect failted!"); |
| | | } |
| | | |
| | | return 0; |
| | | } |
| | | |
| | | void _client_close_conn_to_server(shm_socket_t* socket) { |
| | | |
| | | _shm_close_socket(socket, false); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * client端的各种类型消息()在这里进程分拣 |
| | | */ |
| | | void * _client_run_msg_rev(void* _socket) { |
| | | pthread_detach(pthread_self()); |
| | | shm_socket_t* socket = (shm_socket_t*) _socket; |
| | | struct timespec timeout = {1, 0}; |
| | | shm_msg_t src; |
| | | |
| | | while(socket->queue->pop(src)) { |
| | | switch (src.type) { |
| | | |
| | | case SHM_SOCKET_CLOSE : |
| | | _client_close_conn_to_server(socket); |
| | | break; |
| | | case SHM_COMMON_MSG : |
| | | socket->messageQueue->push_timeout(src, &timeout); |
| | | break; |
| | | default: |
| | | err_msg(0, "socket.__shm_rev__: undefined message type."); |
| | | } |
| | | } |
| | | |
| | | return NULL; |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | int shm_send(shm_socket_t *socket, const void *buf, const int size) { |
| | | // hashtable_t *hashtable = mm_get_hashtable(); |
| | | // if(socket->remoteQueue == NULL) { |
| | | // err_msg(errno, "当前客户端无连接!"); |
| | | // return -1; |
| | | // } |
| | | shm_msg_t dest; |
| | | dest.type=SHM_COMMON_MSG; |
| | | dest.port = socket->port; |
| | | dest.size = size; |
| | | dest.buf = mm_malloc(size); |
| | | memcpy(dest.buf, buf, size); |
| | | |
| | | // struct timeval time; |
| | | // gettimeofday(&time, NULL); |
| | | //err_msg(0, "%d %d=======>push befor %d", time.tv_sec, time.tv_usec, socket->port); |
| | | if(socket->remoteQueue->push(dest)) { |
| | | |
| | | //gettimeofday(&time, NULL); |
| | | //err_msg(0, "%d %d=======>push after %d", time.tv_sec, time.tv_usec, socket->port); |
| | | return 0; |
| | | } else { |
| | | err_msg(errno, "connection has been closed!"); |
| | | return -1; |
| | | } |
| | | |
| | | |
| | | } |
| | | |
| | | int shm_recv(shm_socket_t* socket, void **buf, int *size) { |
| | | shm_msg_t src; |
| | | |
| | | // struct timeval time; |
| | | // gettimeofday(&time, NULL); |
| | | // err_msg(0, "%d %d=======>pop befor %d", time.tv_sec, time.tv_usec, socket->port); |
| | | if (socket->messageQueue->pop(src)) { |
| | | // gettimeofday(&time, NULL); |
| | | // err_msg(0, "%d %d=======>pop after %d", time.tv_sec, time.tv_usec, socket->port); |
| | | void * _buf = malloc(src.size); |
| | | memcpy(_buf, src.buf, src.size); |
| | | *buf = _buf; |
| | | *size = src.size; |
| | | mm_free(src.buf); |
| | | return 0; |
| | | } else { |
| | | return -1; |
| | | } |
| | | |
| | | |
| | | |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 绑定key到队列,但是并不会创建队列。如果没有对应指定key的队列提示错误并退出 |
| | | */ |
| | | SHMQueue<shm_msg_t> * _attach_remote_queue(int port) { |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | if(hashtable_get(hashtable, port)== NULL) { |
| | | err_exit(0, "_remote_queue_attach:connet at port %d failed!", port); |
| | | return NULL; |
| | | } |
| | | |
| | | SHMQueue<shm_msg_t> *queue = new SHMQueue<shm_msg_t>(port, 0); |
| | | return queue; |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | |
| | | #include "sem_util.h" |
| | | #include "logger_factory.h" |
| | | |
| | | static Logger logger = LoggerFactory::getLogger(); |
| | | |
| | | int SemUtil::get(key_t key, unsigned int value) { |
| | | int semid, perms; |
| | | |
| | | perms = S_IRUSR | S_IWUSR; |
| | | int semid, perms; |
| | | |
| | | semid = semget(key, 1, IPC_CREAT | IPC_EXCL | perms); |
| | | perms = S_IRUSR | S_IWUSR; |
| | | |
| | | if (semid != -1) { /* Successfully created the semaphore */ |
| | | union semun arg; |
| | | struct sembuf sop; |
| | | semid = semget(key, 1, IPC_CREAT | IPC_EXCL | perms); |
| | | |
| | | fprintf(stderr, "%ld: created semaphore\n", (long) getpid()); |
| | | if (semid != -1) { /* Successfully created the semaphore */ |
| | | union semun arg; |
| | | struct sembuf sop; |
| | | |
| | | arg.val = 0; /* So initialize it to 0 */ |
| | | if (semctl(semid, 0, SETVAL, arg) == -1) |
| | | err_exit(errno, "semctl 1"); |
| | | fprintf(stderr, "%ld: initialized semaphore\n", (long) getpid()); |
| | | logger.info("%ld: created semaphore\n", (long)getpid()); |
| | | |
| | | /* Perform a "no-op" semaphore operation - changes sem_otime |
| | | so other processes can see we've initialized the set. */ |
| | | arg.val = 0; /* So initialize it to 0 */ |
| | | if (semctl(semid, 0, SETVAL, arg) == -1) |
| | | err_exit(errno, "semctl 1"); |
| | | logger.info("%ld: initialized semaphore\n", (long)getpid()); |
| | | |
| | | sop.sem_num = 0; /* Operate on semaphore 0 */ |
| | | sop.sem_op = value; |
| | | sop.sem_flg = 0; |
| | | if (semop(semid, &sop, 1) == -1) |
| | | err_exit(errno, "semop"); |
| | | fprintf(stderr, "%ld: completed dummy semop()\n", (long) getpid()); |
| | | /* Perform a "no-op" semaphore operation - changes sem_otime |
| | | so other processes can see we've initialized the set. */ |
| | | |
| | | } else { /* We didn't create the semaphore set */ |
| | | sop.sem_num = 0; /* Operate on semaphore 0 */ |
| | | sop.sem_op = value; |
| | | sop.sem_flg = 0; |
| | | if (semop(semid, &sop, 1) == -1) |
| | | err_exit(errno, "semop"); |
| | | logger.info("%ld: completed dummy semop()\n", (long)getpid()); |
| | | |
| | | if (errno != EEXIST) { /* Unexpected error from semget() */ |
| | | err_exit(errno, "semget 1"); |
| | | } else { /* We didn't create the semaphore set */ |
| | | |
| | | } else { /* Someone else already created it */ |
| | | const int MAX_TRIES = 10; |
| | | int j; |
| | | union semun arg; |
| | | struct semid_ds ds; |
| | | if (errno != EEXIST) { /* Unexpected error from semget() */ |
| | | err_exit(errno, "semget 1"); |
| | | |
| | | semid = semget(key, 1, perms); /* So just get ID */ |
| | | if (semid == -1) |
| | | err_exit(errno, "semget 2"); |
| | | } else { /* Someone else already created it */ |
| | | const int MAX_TRIES = 10; |
| | | int j; |
| | | union semun arg; |
| | | struct semid_ds ds; |
| | | |
| | | fprintf(stderr, "%ld: got semaphore key\n", (long) getpid()); |
| | | /* Wait until another process has called semop() */ |
| | | semid = semget(key, 1, perms); /* So just get ID */ |
| | | if (semid == -1) |
| | | err_exit(errno, "semget 2"); |
| | | |
| | | arg.buf = &ds; |
| | | for (j = 0; j < MAX_TRIES; j++) { |
| | | fprintf(stderr, "Try %d\n", j); |
| | | if (semctl(semid, 0, IPC_STAT, arg) == -1) |
| | | err_exit(errno, "semctl 2"); |
| | | logger.info("%ld: got semaphore key\n", (long)getpid()); |
| | | /* Wait until another process has called semop() */ |
| | | |
| | | if (ds.sem_otime != 0) /* Semop() performed? */ |
| | | break; /* Yes, quit loop */ |
| | | sleep(1); /* If not, wait and retry */ |
| | | } |
| | | arg.buf = &ds; |
| | | for (j = 0; j < MAX_TRIES; j++) { |
| | | logger.info("Try %d\n", j); |
| | | if (semctl(semid, 0, IPC_STAT, arg) == -1) |
| | | err_exit(errno, "semctl 2"); |
| | | |
| | | if (ds.sem_otime == 0) /* Loop ran to completion! */ |
| | | err_exit(errno, "Existing semaphore not initialized"); |
| | | } |
| | | if (ds.sem_otime != 0) /* Semop() performed? */ |
| | | break; /* Yes, quit loop */ |
| | | sleep(1); /* If not, wait and retry */ |
| | | } |
| | | |
| | | if (ds.sem_otime == 0) /* Loop ran to completion! */ |
| | | err_exit(errno, "Existing semaphore not initialized"); |
| | | } |
| | | return semid; |
| | | } |
| | | return semid; |
| | | } |
| | | |
| | | |
| | | /* Reserve semaphore (blocking), return 0 on success, or -1 with 'errno' |
| | | set to EINTR if operation was interrupted by a signal handler */ |
| | | |
| | | /* Reserve semaphore - decrement it by 1 */ |
| | | int SemUtil::dec(int semId) |
| | | { |
| | | struct sembuf sops; |
| | | int SemUtil::dec(int semId) { |
| | | struct sembuf sops; |
| | | |
| | | sops.sem_num = 0; |
| | | sops.sem_op = -1; |
| | | sops.sem_flg = 0; |
| | | sops.sem_num = 0; |
| | | sops.sem_op = -1; |
| | | sops.sem_flg = 0; |
| | | |
| | | while (semop(semId, &sops, 1) == -1) |
| | | if (errno != EINTR ) { |
| | | err_msg(errno, "SemUtil::dec"); |
| | | return -1; |
| | | } |
| | | while (semop(semId, &sops, 1) == -1) |
| | | if (errno != EINTR) { |
| | | err_msg(errno, "SemUtil::dec"); |
| | | return -1; |
| | | } |
| | | |
| | | return 0; |
| | | return 0; |
| | | } |
| | | |
| | | int SemUtil::dec_nowait(int semId) |
| | | { |
| | | struct sembuf sops; |
| | | int SemUtil::dec_nowait(int semId) { |
| | | struct sembuf sops; |
| | | |
| | | sops.sem_num = 0; |
| | | sops.sem_op = -1; |
| | | sops.sem_flg = IPC_NOWAIT; |
| | | sops.sem_num = 0; |
| | | sops.sem_op = -1; |
| | | sops.sem_flg = IPC_NOWAIT; |
| | | |
| | | while (semop(semId, &sops, 1) == -1) |
| | | if (errno != EINTR ) { |
| | | err_msg(errno, "SemUtil::dec_nowait"); |
| | | return -1; |
| | | } |
| | | while (semop(semId, &sops, 1) == -1) |
| | | if (errno != EINTR) { |
| | | err_msg(errno, "SemUtil::dec_nowait"); |
| | | return -1; |
| | | } |
| | | |
| | | return 0; |
| | | return 0; |
| | | } |
| | | |
| | | int SemUtil::dec_timeout(int semId, struct timespec * timeout) |
| | | { |
| | | struct sembuf sops; |
| | | int SemUtil::dec_timeout(int semId, struct timespec *timeout) { |
| | | struct sembuf sops; |
| | | |
| | | sops.sem_num = 0; |
| | | sops.sem_op = -1; |
| | | sops.sem_flg = 0; |
| | | sops.sem_num = 0; |
| | | sops.sem_op = -1; |
| | | sops.sem_flg = 0; |
| | | |
| | | while ( semtimedop(semId, &sops, 1, timeout) == -1) |
| | | if (errno != EINTR ) { |
| | | err_msg(errno, "SemUtil::dec_timeout"); |
| | | return -1; |
| | | } |
| | | while (semtimedop(semId, &sops, 1, timeout) == -1) |
| | | if (errno != EINTR) { |
| | | err_msg(errno, "SemUtil::dec_timeout"); |
| | | return -1; |
| | | } |
| | | |
| | | return 0; |
| | | return 0; |
| | | } |
| | | |
| | | |
| | | |
| | | /* Release semaphore - increment it by 1 */ |
| | | int SemUtil::inc(int semId) |
| | | { |
| | | struct sembuf sops; |
| | | int SemUtil::inc(int semId) { |
| | | struct sembuf sops; |
| | | |
| | | sops.sem_num = 0; |
| | | sops.sem_op = 1; |
| | | sops.sem_flg = 0; |
| | | sops.sem_num = 0; |
| | | sops.sem_op = 1; |
| | | sops.sem_flg = 0; |
| | | |
| | | int rv = semop(semId, &sops, 1); |
| | | if(rv == -1) { |
| | | err_msg(errno, "SemUtil::inc"); |
| | | } |
| | | return rv; |
| | | int rv = semop(semId, &sops, 1); |
| | | if (rv == -1) { |
| | | err_msg(errno, "SemUtil::inc"); |
| | | } |
| | | return rv; |
| | | } |
| | | |
| | | void SemUtil::remove(int semid) { |
| | | union semun dummy; |
| | | if (semctl(semid, 0, IPC_RMID, dummy) == -1) |
| | | err_msg(errno, "SemUtil::remove"); |
| | | |
| | | union semun dummy; |
| | | if (semctl(semid, 0, IPC_RMID, dummy) == -1) |
| | | err_msg(errno, "SemUtil::remove"); |
| | | } |
| | | |
| | | |
| | | void SemUtil::set(int semId, int val) |
| | | { |
| | | union semun arg; |
| | | arg.val = val; |
| | | if (semctl(semId, 0, SETVAL, arg) == -1) |
| | | err_msg(errno, "SemUtil::set"); |
| | | void SemUtil::set(int semId, int val) { |
| | | union semun arg; |
| | | arg.val = val; |
| | | if (semctl(semId, 0, SETVAL, arg) == -1) |
| | | err_msg(errno, "SemUtil::set"); |
| | | } |
| | | |
| | | |
| | |
# Makefile for common library.
#
ROOT=..
# Embed the runtime search path for the project's shared libraries.
LDLIBS+=-Wl,-rpath=$(ROOT)/lib:$(ROOT)/build/lib
# Library search paths.
LDDIR += -L$(ROOT)/lib -L$(ROOT)/build/lib
# Project libraries.
# NOTE: if archive A references archive B, B must come after A on the link line.
LDLIBS += -lshm_queue -lusgcommon -lpthread

INCLUDE += -I$(ROOT)/queue/ -I$(ROOT)/queue/include
INCLUDE += -I$(ROOT)/build/include

# Pick the platform-specific compiler/flag definitions.
PLATFORM=$(shell $(ROOT)/systype.sh)
include $(ROOT)/Make.defines.$(PLATFORM)

PROGS = dgram_socket_test

build: $(PROGS)

# Extra header dependencies for the queue test binaries.
test_queue: test.h $(ROOT)/queue/include/lock_free_queue.h

single_productor: test.h $(ROOT)/queue/include/lock_free_queue.h

single_consumer: test.h $(ROOT)/queue/include/lock_free_queue.h

clean:
	rm -f $(TEMPFILES) $(PROGS)

$(LIBQUEUE):
	(cd $(ROOT)/queue && $(MAKE))
New file |
| | |
| | | #include "shm_socket.h" |
| | | #include "usg_common.h" |
| | | #include "shm_mm.h" |
| | | |
| | | #include "shm_socket.h" |
| | | #include "usg_common.h" |
| | | #include "shm_mm.h" |
| | | typedef struct Targ { |
| | | int port; |
| | | int id; |
| | | }Targ; |
| | | |
| | | |
| | | void server(int port) { |
| | | pthread_t tid; |
| | | shm_socket_t *socket = shm_open_socket(SHM_SOCKET_DGRAM); |
| | | shm_socket_bind(socket, port); |
| | | |
| | | char *buf; |
| | | int size; |
| | | int remotePort; |
| | | char sendbuf[512]; |
| | | while( shm_recvfrom(socket, (void **)&buf, &size, &remotePort) == 0) { |
| | | sprintf(sendbuf, "RECEIVED:%s", buf); |
| | | printf("received from %d:%s\n", remotePort, buf); |
| | | shm_sendto(socket, (void *)sendbuf, strlen(sendbuf) + 1, remotePort); |
| | | free(buf); |
| | | } |
| | | |
| | | shm_close_socket(socket); |
| | | |
| | | } |
| | | |
| | | void client(int port) { |
| | | shm_socket_t *socket = shm_open_socket(SHM_SOCKET_DGRAM); |
| | | int size; |
| | | char *recvbuf; |
| | | char sendbuf[512]; |
| | | int remote_port; |
| | | while(true) { |
| | | printf("request: "); |
| | | scanf("%s", sendbuf); |
| | | shm_sendto(socket, sendbuf, strlen(sendbuf)+1, port) ; |
| | | shm_recvfrom(socket, (void **)&recvbuf, &size, &remote_port); |
| | | printf("reply from (%d): %s\n", remote_port, recvbuf); |
| | | free(recvbuf); |
| | | |
| | | } |
| | | shm_close_socket(socket); |
| | | } |
| | | |
| | | int main(int argc, char *argv[]) { |
| | | shm_init(512); |
| | | int port; |
| | | if (argc < 3) { |
| | | fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client"); |
| | | return 1; |
| | | } |
| | | |
| | | port = atoi(argv[2]); |
| | | |
| | | if (strcmp("server", argv[1]) == 0 ) { |
| | | server(port); |
| | | } |
| | | |
| | | if (strcmp("client", argv[1]) == 0) |
| | | client(port); |
| | | |
| | | |
| | | |
| | | shm_destroy(); |
| | | // fprintf(stderr, "Usage: reqrep %s|%s <URL> ...\n", "server", "client"); |
| | | return 0; |
| | | } |