From 9ebe80228c7b7cb35ccaeaaa46ccf726bf71d6bd Mon Sep 17 00:00:00 2001 From: wangzhengquan <wangzhengquan85@126.com> Date: 星期六, 10 十月 2020 18:42:15 +0800 Subject: [PATCH] update --- src/socket/net_mod_socket.c | 4 + src/libshm_queue.a | 0 lib/libusgcommon.a | 0 test_net_socket/test_net_mod_socket.c | 17 ++++- src/socket/net_mod_socket_io.h | 10 +++ demo/Makefile | 48 ++++++++++------ src/socket/net_mod_socket_io.c | 51 +++++++++++++++++ demo/dgram_mod_req_rep.c | 2 /dev/null | 20 ------ src/socket/net_mod_server_socket.h | 2 include/usgcommon/socket_io.h | 4 src/Makefile | 4 src/socket/net_mod_server_socket.c | 10 ++- 13 files changed, 119 insertions(+), 53 deletions(-) diff --git a/demo/Makefile b/demo/Makefile index 8fbd78f..9e1429f 100644 --- a/demo/Makefile +++ b/demo/Makefile @@ -1,28 +1,40 @@ -# -# Makefile for common library. -# -ROOT=.. -LDLIBS+=-Wl,-rpath=$(ROOT)/lib:$(ROOT)/build/lib -# 寮�婧愬伐鍏峰寘璺緞 -LDDIR += -L$(ROOT)/build/lib -# 寮�婧愬伐鍏峰寘 -LDLIBS += -lshm_queue -lusgcommon -lpthread + -INCLUDE += -I$(ROOT)/build/include - +ROOT := .. PLATFORM=$(shell $(ROOT)/systype.sh) include $(ROOT)/Make.defines.$(PLATFORM) +#RPATH += -Wl,-rpath=${ROOT}/lib +# 寮�婧愬伐鍏峰寘璺緞 +LDDIR += -L${DEST}/lib -PROGS = dgram_mod_req_rep dgram_mod_survey dgram_mod_bus - +# 寮�婧愬伐鍏峰寘 +LDLIBS += -lshm_queue -lusgcommon -lpthread + +INCLUDES += -I${DEST}/include/shmqueue -I$(ROOT)/include/usgcommon -build: $(PROGS) +PROGS = ${DEST}/dgram_mod_req_rep ${DEST}/dgram_mod_survey ${DEST}/dgram_mod_bus -# test1: $(LIBCOMMON) +DEPENDENCES = $(patsubst %, %.d, $(PROGS)) -# 濡傛灉鍖匒 寮曠敤鍖匓锛� B 瑕佹斁鍦� A 鍚庨潰 - +#LIBCOMMON=${ROOT}/lib/libusgcommon.a + +build: $(PROGS) + + +# class +#$(DEST)/kucker : kucker.c + + clean: - rm -f $(TEMPFILES) $(PROGS) + rm -f $(PROGS) $(DEPENDENCES) $(TEMPFILES) + +# $(LIBCOMMON): +# @(cd $(ROOT)/common && $(MAKE)) + +-include $(DEPENDENCES) +include $(ROOT)/Make.common.inc + + + diff --git a/demo/dgram_mod_req_rep.c b/demo/dgram_mod_req_rep.c index 4a70a4e..b26d61d 100644 --- a/demo/dgram_mod_req_rep.c +++ b/demo/dgram_mod_req_rep.c @@ -26,7 +26,7 @@ void *socket = dgram_mod_open_socket(); int size; void *recvbuf; - printf("client :send request%s\n", sendbuf); + printf("client :send request %s\n", sendbuf); if(dgram_mod_sendandrecv(socket, sendbuf, strlen(sendbuf) + 1, port, &recvbuf, &size) == 0) { printf("client :received reply => %s\n", (char *)recvbuf); free(recvbuf); diff --git a/include/usgcommon/socket_io.h b/include/usgcommon/socket_io.h index 88d6045..171bf51 100644 --- a/include/usgcommon/socket_io.h +++ b/include/usgcommon/socket_io.h @@ -20,14 +20,14 @@ } rio_t; /* Rio (Robust I/O) package */ ssize_t rio_readn(int fd, void *usrbuf, size_t n); -ssize_t rio_writen(int fd, void *usrbuf, size_t n); +ssize_t rio_writen(int fd, const void *usrbuf, size_t n); void rio_readinitb(rio_t *rp, int fd); ssize_t rio_readnb(rio_t *rp, void *usrbuf, size_t n); ssize_t rio_readlineb(rio_t *rp, char *usrbuf, size_t maxlen); /* Wrappers for Rio package */ ssize_t Rio_readn(int fd, void *usrbuf, size_t n); -void Rio_writen(int fd, void *usrbuf, size_t n); +void Rio_writen(int fd, const void *usrbuf, size_t n); void Rio_readinitb(rio_t *rp, int fd); ssize_t Rio_readnb(rio_t *rp, void *usrbuf, size_t n); ssize_t Rio_readlineb(rio_t *rp, char *usrbuf, size_t maxlen); diff --git a/lib/libusgcommon.a b/lib/libusgcommon.a index c1df371..20f0e45 100644 --- a/lib/libusgcommon.a +++ b/lib/libusgcommon.a Binary files differ diff --git a/lib/libusgcommon.so b/lib/libusgcommon.so deleted file mode 100644 index 8e10ddc..0000000 --- a/lib/libusgcommon.so +++ /dev/null Binary files differ diff --git a/src/Makefile b/src/Makefile index 9039e9c..33baad0 100644 --- a/src/Makefile +++ b/src/Makefile @@ -14,7 +14,7 @@ # 寮�婧愬伐鍏峰寘 LDLIBS += -lusgcommon -INCLUDES += -I./queue -I./socket -I./util -I$(ROOT)/include/usgcommon +INCLUDES += -I./queue -I./socket -I$(ROOT)/include/usgcommon SOURCES := $(wildcard *.c ./**/*.c) OBJS = $(patsubst %.c, $(DEST)/%.o, $(SOURCES)) @@ -63,7 +63,7 @@ install -d $(PREFIX)/lib/ install -m 644 $^ $(PREFIX)/lib/ install -d $(PREFIX)/include/shmqueue - install -m 644 ./*.h ./queue/*.h ./socket/*.h ./util/*.h $(PREFIX)/include/shmqueue + install -m 644 ./*.h ./queue/*.h ./socket/*.h $(PREFIX)/include/shmqueue .PHONY: uninstall diff --git a/src/libshm_queue.a b/src/libshm_queue.a index fcc7969..3ae19a9 100644 --- a/src/libshm_queue.a +++ b/src/libshm_queue.a Binary files differ diff --git a/src/queue/include/array_lock_free_queue.h b/src/queue/include/array_lock_free_queue.h deleted file mode 100644 index 233bc6a..0000000 --- a/src/queue/include/array_lock_free_queue.h +++ /dev/null @@ -1,322 +0,0 @@ -#ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ -#define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ -#include "atomic_ops.h" -#include <assert.h> // assert() -#include <sched.h> // sched_yield() -#include "logger_factory.h" -#include "mem_pool.h" -#include "shm_allocator.h" - -/// @brief implementation of an array based lock free queue with support for -/// multiple producers -/// This class is prevented from being instantiated directly (all members and -/// methods are private). To instantiate a multiple producers lock free queue -/// you must use the ArrayLockFreeQueue fachade: -/// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q; - - -#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - -template <typename ELEM_T, typename Allocator = SHM_Allocator> -class ArrayLockFreeQueue -{ - // ArrayLockFreeQueue will be using this' private members - template < - typename ELEM_T_, - typename Allocator_, - template <typename T, typename AT> class Q_TYPE - > - friend class LockFreeQueue; - -private: - /// @brief constructor of the class - ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); - - virtual ~ArrayLockFreeQueue(); - - inline uint32_t size(); - - inline bool full(); - - inline bool empty(); - - bool push(const ELEM_T &a_data); - - bool pop(ELEM_T &a_data); - - /// @brief calculate the index in the circular array that corresponds - /// to a particular "count" value - inline uint32_t countToIndex(uint32_t a_count); - - ELEM_T& operator[](unsigned i); - -private: - size_t Q_SIZE; - /// @brief array to keep the elements - ELEM_T *m_theQueue; - - /// @brief where a new element will be inserted - uint32_t m_writeIndex; - - /// @brief where the next element where be extracted from - uint32_t m_readIndex; - - /// @brief maximum read index for multiple producer queues - /// If it's not the same as m_writeIndex it means - /// there are writes pending to be "committed" to the queue, that means, - /// the place for the data was reserved (the index in the array) but - /// data is still not in the queue, so the thread trying to read will have - /// to wait for those other threads to save the data into the queue - /// - /// note this is only used for multiple producers - uint32_t m_maximumReadIndex; - -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - /// @brief number of elements in the queue - uint32_t m_count; -#endif - - -private: - /// @brief disable copy constructor declaring it private - ArrayLockFreeQueue<ELEM_T, Allocator>(const ArrayLockFreeQueue<ELEM_T> &a_src); - -}; - - -template <typename ELEM_T, typename Allocator> -ArrayLockFreeQueue<ELEM_T, Allocator>::ArrayLockFreeQueue(size_t qsize): - Q_SIZE(qsize), - m_writeIndex(0), // initialisation is not atomic - m_readIndex(0), // - m_maximumReadIndex(0) // -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - ,m_count(0) // -#endif -{ - m_theQueue = (ELEM_T*)Allocator::allocate(Q_SIZE * sizeof(ELEM_T)); - -} - -template <typename ELEM_T, typename Allocator> -ArrayLockFreeQueue<ELEM_T, Allocator>::~ArrayLockFreeQueue() -{ - // std::cout << "destroy ArrayLockFreeQueue\n"; - Allocator::deallocate(m_theQueue); - -} - -template <typename ELEM_T, typename Allocator> -inline -uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::countToIndex(uint32_t a_count) -{ - // if Q_SIZE is a power of 2 this statement could be also written as - // return (a_count & (Q_SIZE - 1)); - return (a_count % Q_SIZE); -} - -template <typename ELEM_T, typename Allocator> -inline -uint32_t ArrayLockFreeQueue<ELEM_T, Allocator>::size() -{ -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - return m_count; -#else - - uint32_t currentWriteIndex = m_maximumReadIndex; - uint32_t currentReadIndex = m_readIndex; - - // let's think of a scenario where this function returns bogus data - // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run - // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1 - // 2. afterwards this thread is preemted. While this thread is inactive 2 - // elements are inserted and removed from the queue, so m_maximumReadIndex - // is 5 and m_readIndex 4. Real size is still 1 - // 3. Now the current thread comes back from preemption and reads m_readIndex. - // currentReadIndex is 4 - // 4. currentReadIndex is bigger than currentWriteIndex, so - // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is, - // it returns that the queue is almost full, when it is almost empty - // - if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex)) - { - return (currentWriteIndex - currentReadIndex); - } - else - { - return (Q_SIZE + currentWriteIndex - currentReadIndex); - } -#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE -} - -template <typename ELEM_T, typename Allocator> -inline -bool ArrayLockFreeQueue<ELEM_T, Allocator>::full() -{ -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - return (m_count == (Q_SIZE)); -#else - - uint32_t currentWriteIndex = m_writeIndex; - uint32_t currentReadIndex = m_readIndex; - - if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) - { - // the queue is full - return true; - } - else - { - // not full! - return false; - } -#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE -} - -template <typename ELEM_T, typename Allocator> -inline -bool ArrayLockFreeQueue<ELEM_T, Allocator>::empty() -{ -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - return (m_count == 0); -#else - - if (countToIndex( m_readIndex) == countToIndex(m_maximumReadIndex)) - { - // the queue is full - return true; - } - else - { - // not full! - return false; - } -#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE -} - - - - - - -template <typename ELEM_T, typename Allocator> -bool ArrayLockFreeQueue<ELEM_T, Allocator>::push(const ELEM_T &a_data) -{ - uint32_t currentReadIndex; - uint32_t currentWriteIndex; - - do - { - - currentWriteIndex = m_writeIndex; - currentReadIndex = m_readIndex; - #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - if (m_count == Q_SIZE) { - return false; - } - #else - if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) - { - // the queue is full - return false; - } - #endif - - } while (!CAS(&m_writeIndex, currentWriteIndex, (currentWriteIndex + 1))); - - // We know now that this index is reserved for us. Use it to save the data - m_theQueue[countToIndex(currentWriteIndex)] = a_data; - - // update the maximum read index after saving the data. It wouldn't fail if there is only one thread - // inserting in the queue. It might fail if there are more than 1 producer threads because this - // operation has to be done in the same order as the previous CAS - - while (!CAS(&m_maximumReadIndex, currentWriteIndex, (currentWriteIndex + 1))) - { - // this is a good place to yield the thread in case there are more - // software threads than hardware processors and you have more - // than 1 producer thread - // have a look at sched_yield (POSIX.1b) - sched_yield(); - } - -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - AtomicAdd(&m_count, 1); -#endif - return true; -} - - -template <typename ELEM_T, typename Allocator> -bool ArrayLockFreeQueue<ELEM_T, Allocator>::pop(ELEM_T &a_data) -{ - uint32_t currentMaximumReadIndex; - uint32_t currentReadIndex; - - do - { - // to ensure thread-safety when there is more than 1 producer thread - // a second index is defined (m_maximumReadIndex) - currentReadIndex = m_readIndex; - currentMaximumReadIndex = m_maximumReadIndex; - #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - if (m_count == 0) { - return false; - } - #else - if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)) - { - // the queue is empty or - // a producer thread has allocate space in the queue but is - // waiting to commit the data into it - return false; - } - #endif - - // retrieve the data from the queue - a_data = m_theQueue[countToIndex(currentReadIndex)]; - - // try to perfrom now the CAS operation on the read index. If we succeed - // a_data already contains what m_readIndex pointed to before we - // increased it - if (CAS(&m_readIndex, currentReadIndex, (currentReadIndex + 1))) - { - #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - // m_count.fetch_sub(1); - AtomicSub(&m_count, 1); - #endif - return true; - } - - // it failed retrieving the element off the queue. Someone else must - // have read the element stored at countToIndex(currentReadIndex) - // before we could perform the CAS operation - - } while(1); // keep looping to try again! - - // Something went wrong. it shouldn't be possible to reach here - assert(0); - - // Add this return statement to avoid compiler warnings - return false; -} - -template <typename ELEM_T, typename Allocator> -ELEM_T& ArrayLockFreeQueue<ELEM_T, Allocator>::operator[](unsigned int i) -{ - int currentCount = m_count; - uint32_t currentReadIndex = m_readIndex; - if (i >= currentCount) - { - std::cerr << "ArrayLockFreeQueue<ELEM_T, Allocator>::operator[] , Error in array limits: " << i << " is out of range\n"; - std::exit(EXIT_FAILURE); - } - return m_theQueue[countToIndex(currentReadIndex+i)]; -} - -#endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ diff --git a/src/queue/include/array_lock_free_queue2.h b/src/queue/include/array_lock_free_queue2.h deleted file mode 100644 index 3b79b7f..0000000 --- a/src/queue/include/array_lock_free_queue2.h +++ /dev/null @@ -1,332 +0,0 @@ -#ifndef __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ -#define __ARRAY_LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ - -#include <assert.h> // assert() -#include <sched.h> // sched_yield() -#include "logger_factory.h" - -/// @brief implementation of an array based lock free queue with support for -/// multiple producers -/// This class is prevented from being instantiated directly (all members and -/// methods are private). To instantiate a multiple producers lock free queue -/// you must use the ArrayLockFreeQueue fachade: -/// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q; - - -#define _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - -template <typename ELEM_T> -class ArrayLockFreeQueue -{ - // ArrayLockFreeQueue will be using this' private members - template < - typename ELEM_T_, - template <typename T> class Q_TYPE > - friend class LockFreeQueue; - -private: - /// @brief constructor of the class - ArrayLockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); - - virtual ~ArrayLockFreeQueue(); - - inline uint32_t size(); - - inline bool full(); - - inline bool empty(); - - bool push(const ELEM_T &a_data); - - bool pop(ELEM_T &a_data); - - /// @brief calculate the index in the circular array that corresponds - /// to a particular "count" value - inline uint32_t countToIndex(uint32_t a_count); - - ELEM_T& operator[](unsigned i); - -private: - size_t Q_SIZE; - /// @brief array to keep the elements - ELEM_T *m_theQueue; - - /// @brief where a new element will be inserted - std::atomic<uint32_t> m_writeIndex; - - /// @brief where the next element where be extracted from - std::atomic<uint32_t> m_readIndex; - - /// @brief maximum read index for multiple producer queues - /// If it's not the same as m_writeIndex it means - /// there are writes pending to be "committed" to the queue, that means, - /// the place for the data was reserved (the index in the array) but - /// data is still not in the queue, so the thread trying to read will have - /// to wait for those other threads to save the data into the queue - /// - /// note this is only used for multiple producers - std::atomic<uint32_t> m_maximumReadIndex; - -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - /// @brief number of elements in the queue - std::atomic<uint32_t> m_count; -#endif - - -private: - /// @brief disable copy constructor declaring it private - ArrayLockFreeQueue<ELEM_T>(const ArrayLockFreeQueue<ELEM_T> &a_src); - -}; - - -template <typename ELEM_T> -ArrayLockFreeQueue<ELEM_T>::ArrayLockFreeQueue(size_t qsize): - Q_SIZE(qsize), - m_writeIndex(0), // initialisation is not atomic - m_readIndex(0), // - m_maximumReadIndex(0) // -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - ,m_count(0) // -#endif -{ - m_theQueue = (ELEM_T*)mm_malloc(Q_SIZE * sizeof(ELEM_T)); - -} - -template <typename ELEM_T> -ArrayLockFreeQueue<ELEM_T>::~ArrayLockFreeQueue() -{ - // std::cout << "destroy ArrayLockFreeQueue\n"; - mm_free(m_theQueue); - -} - -template <typename ELEM_T> -inline -uint32_t ArrayLockFreeQueue<ELEM_T>::countToIndex(uint32_t a_count) -{ - // if Q_SIZE is a power of 2 this statement could be also written as - // return (a_count & (Q_SIZE - 1)); - return (a_count % Q_SIZE); -} - -template <typename ELEM_T> -inline -uint32_t ArrayLockFreeQueue<ELEM_T>::size() -{ -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - return m_count.load(); -#else - - uint32_t currentWriteIndex = m_maximumReadIndex.load(); - uint32_t currentReadIndex = m_readIndex.load(); - - // let's think of a scenario where this function returns bogus data - // 1. when the statement 'currentWriteIndex = m_maximumReadIndex' is run - // m_maximumReadIndex is 3 and m_readIndex is 2. Real size is 1 - // 2. afterwards this thread is preemted. While this thread is inactive 2 - // elements are inserted and removed from the queue, so m_maximumReadIndex - // is 5 and m_readIndex 4. Real size is still 1 - // 3. Now the current thread comes back from preemption and reads m_readIndex. - // currentReadIndex is 4 - // 4. currentReadIndex is bigger than currentWriteIndex, so - // m_totalSize + currentWriteIndex - currentReadIndex is returned, that is, - // it returns that the queue is almost full, when it is almost empty - // - if (countToIndex(currentWriteIndex) >= countToIndex(currentReadIndex)) - { - return (currentWriteIndex - currentReadIndex); - } - else - { - return (Q_SIZE + currentWriteIndex - currentReadIndex); - } -#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE -} - -template <typename ELEM_T> -inline -bool ArrayLockFreeQueue<ELEM_T>::full() -{ -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - return (m_count.load() == (Q_SIZE)); -#else - - uint32_t currentWriteIndex = m_writeIndex; - uint32_t currentReadIndex = m_readIndex; - - if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) - { - // the queue is full - return true; - } - else - { - // not full! - return false; - } -#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE -} - -template <typename ELEM_T> -inline -bool ArrayLockFreeQueue<ELEM_T>::empty() -{ -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - return (m_count.load() == 0); -#else - - if (countToIndex( m_readIndex.load()) == countToIndex(m_maximumReadIndex.load())) - { - // the queue is full - return true; - } - else - { - // not full! - return false; - } -#endif // _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE -} - - -template <typename ELEM_T> -bool ArrayLockFreeQueue<ELEM_T>::push(const ELEM_T &a_data) -{ - uint32_t currentReadIndex; - uint32_t currentWriteIndex; - - do - { - currentWriteIndex = m_writeIndex.load(); - currentReadIndex = m_readIndex.load(); -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - if (m_count.load() == Q_SIZE) { - return false; - } -#else - if (countToIndex(currentWriteIndex + 1) == countToIndex(currentReadIndex)) - { - // the queue is full - return false; - } -#endif - - // There is more than one producer. Keep looping till this thread is able - // to allocate space for current piece of data - // - // using compare_exchange_strong because it isn't allowed to fail spuriously - // When the compare_exchange operation is in a loop the weak version - // will yield better performance on some platforms, but here we'd have to - // load m_writeIndex all over again - } while (!m_writeIndex.compare_exchange_strong( - currentWriteIndex, (currentWriteIndex + 1))); - - // Just made sure this index is reserved for this thread. - m_theQueue[countToIndex(currentWriteIndex)] = a_data; - //memcpy((void *)(&m_theQueue[countToIndex(currentWriteIndex)]), (void *)(&a_data), sizeof(ELEM_T) ); - - // update the maximum read index after saving the piece of data. It can't - // fail if there is only one thread inserting in the queue. It might fail - // if there is more than 1 producer thread because this operation has to - // be done in the same order as the previous CAS - // - // using compare_exchange_weak because they are allowed to fail spuriously - // (act as if *this != expected, even if they are equal), but when the - // compare_exchange operation is in a loop the weak version will yield - // better performance on some platforms. - while (!m_maximumReadIndex.compare_exchange_weak( - currentWriteIndex, (currentWriteIndex + 1))) - { - // this is a good place to yield the thread in case there are more - // software threads than hardware processors and you have more - // than 1 producer thread - // have a look at sched_yield (POSIX.1b) - sched_yield(); - } - - // The value was successfully inserted into the queue -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - m_count.fetch_add(1); -#endif - - return true; -} - -template <typename ELEM_T> -bool ArrayLockFreeQueue<ELEM_T>::pop(ELEM_T &a_data) -{ - uint32_t currentMaximumReadIndex; - uint32_t currentReadIndex; - - do - { - currentReadIndex = m_readIndex.load(); - currentMaximumReadIndex = m_maximumReadIndex.load(); - - #ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - - if (m_count.load() == 0) { - return false; - } - #else - // to ensure thread-safety when there is more than 1 producer - // thread a second index is defined (m_maximumReadIndex) - if (countToIndex(currentReadIndex) == countToIndex(currentMaximumReadIndex)) - { - // the queue is empty or - // a producer thread has allocate space in the queue but is - // waiting to commit the data into it - return false; - } - #endif - - // retrieve the data from the queue - a_data = m_theQueue[countToIndex(currentReadIndex)]; - //memcpy((void*) (&a_data), (void *)(&m_theQueue[countToIndex(currentReadIndex)]),sizeof(ELEM_T) ); - // try to perfrom now the CAS operation on the read index. If we succeed - // a_data already contains what m_readIndex pointed to before we - // increased it - if (m_readIndex.compare_exchange_strong(currentReadIndex, (currentReadIndex + 1))) - { - // got here. The value was retrieved from the queue. Note that the - // data inside the m_queue array is not deleted nor reseted -#ifdef _WITH_LOCK_FREE_Q_KEEP_REAL_SIZE - m_count.fetch_sub(1); -#endif - return true; - } - - // it failed retrieving the element off the queue. Someone else must - // have read the element stored at countToIndex(currentReadIndex) - // before we could perform the CAS operation - - } while(1); // keep looping to try again! - - // Something went wrong. it shouldn't be possible to reach here - assert(0); - - // Add this return statement to avoid compiler warnings - return false; -} - - -template <typename ELEM_T> -ELEM_T& ArrayLockFreeQueue<ELEM_T>::operator[](unsigned int i) -{ - int currentCount = m_count.load(); - uint32_t currentReadIndex = m_readIndex.load(); - if (i < 0 || i >= currentCount) - { - std::cerr << "Error in array limits: " << i << " is out of range\n"; - std::exit(EXIT_FAILURE); - } - return m_theQueue[countToIndex(currentReadIndex+i)]; -} - -#endif // __LOCK_FREE_QUEUE_IMPL_MULTIPLE_PRODUCER_H__ diff --git a/src/queue/include/hashtable.h b/src/queue/include/hashtable.h deleted file mode 100755 index b8edaa3..0000000 --- a/src/queue/include/hashtable.h +++ /dev/null @@ -1,41 +0,0 @@ -#ifndef __HASHTABLE_H__ -#define __HASHTABLE_H__ - -#include <sys/queue.h> -#include <set> - -#define MAPSIZE 100 - -typedef struct hashtable_t -{ - struct tailq_header_t* array[MAPSIZE]; - int mutex; - int wlock; - int cond; - size_t readcnt; - -} hashtable_t; -typedef void (*hashtable_foreach_cb)(int key, void *value); - -void hashtable_init(hashtable_t *hashtable); -void *hashtable_get(hashtable_t *hashtable, int key); -void hashtable_put(hashtable_t *hashtable, int key, void *value); -void *hashtable_remove(hashtable_t *hashtable, int key); -void hashtable_removeall(hashtable_t *hashtable); - -/** - * 閬嶅巻hash_table - * @demo - * hashtable_foreach(&hashtable, [&](int key, void * value){ - * printf("%d, %p\n", key, value); - * }); - * -*/ -void hashtable_foreach(hashtable_t *hashtable, std::function<void(int, void *)> cb); - -// void hashtable_printall(hashtable_t *hashtable); - -int hashtable_alloc_key(hashtable_t *hashtable); - -std::set<int> * hashtable_keyset(hashtable_t *hashtable) ; -#endif diff --git a/src/queue/include/linked_lock_free_queue.h b/src/queue/include/linked_lock_free_queue.h deleted file mode 100644 index 3906a42..0000000 --- a/src/queue/include/linked_lock_free_queue.h +++ /dev/null @@ -1,245 +0,0 @@ -// queue.h -- interface for a queue -#ifndef __LINKED_LOCK_FREE_QUEUE_H_ -#define __LINKED_LOCK_FREE_QUEUE_H_ -#include "mm.h" -#include "sem_util.h" - -template <typename T> class Node; - -template <typename T> -class Pointer { -public: - - Node<T> *ptr; - unsigned long count; - Pointer( Node<T> *node = NULL, int c=0) noexcept : ptr(node), count(c) {} - - bool operator == (const Pointer<T> o) const { - return (o.ptr == ptr) && (o.count == count); - } - bool operator != (const Pointer<T> o) const { - return !((o.ptr == ptr) && (o.count == count)); - } - - - -}; - -template <typename T> -class Node { -public: - alignas(16) std::atomic<Pointer<T> > next; - T value; - - Node() { - } - - void *operator new(size_t size){ - return mm_malloc(size); - } - - void operator delete(void *p) { - return mm_free(p); - } -}; - - - - - -template <typename ELEM_T> -class LinkedLockFreeQueue -{ - - template < - typename ELEM_T_, - template <typename T> class Q_TYPE > - friend class LockFreeQueue; -private: -// class scope definitions - enum {Q_SIZE = 10}; - -// private class members - std::atomic<Pointer<ELEM_T> > Head; // pointer to front of Queue - std::atomic<Pointer<ELEM_T> > Tail; // pointer to rear of Queue - //std::atomic_uint count; // current number of size in Queue - std::atomic_uint count; - const size_t qsize; // maximum number of size in Queue - // preemptive definitions to prevent public copying - LinkedLockFreeQueue(const LinkedLockFreeQueue & q) : qsize(0) { } - LinkedLockFreeQueue & operator=(const LinkedLockFreeQueue & q) { return *this;} -protected: - LinkedLockFreeQueue(size_t qs = Q_SIZE); // create queue with a qs limit - ~LinkedLockFreeQueue(); - bool empty() const; - bool full() const; - unsigned int size() const; - bool push(const ELEM_T &item); // add item to end - bool pop(ELEM_T &item); - - - ELEM_T& operator[](unsigned i); - -}; - - -// Queue methods -template <typename T> -LinkedLockFreeQueue<T>::LinkedLockFreeQueue(size_t qs) : count(0), qsize(qs) -{ - Node<T> *node = new Node<T>; - Pointer<T> pointer(node, 0); - - Head.store(pointer, std::memory_order_relaxed); - Tail.store(pointer, std::memory_order_relaxed); - -} - -template <typename T> -LinkedLockFreeQueue<T>::~LinkedLockFreeQueue() -{ - LoggerFactory::getLogger().debug("LinkedLockFreeQueue destory"); - Node<T> * nodeptr; - Pointer<T> tmp = Head.load(std::memory_order_relaxed); - while((nodeptr = tmp.ptr) != NULL) { - tmp = (tmp.ptr->next).load(std::memory_order_relaxed); - //std::cerr << "delete " << nodeptr << std::endl; - delete nodeptr; - - } -} - -template <typename T> -bool LinkedLockFreeQueue<T>::empty() const -{ - return count == 0; -} - -template <typename T> -bool LinkedLockFreeQueue<T>::full() const -{ - return count == qsize; -} - -template <typename T> -unsigned int LinkedLockFreeQueue<T>::size() const -{ - return count; -} - -// Add item to queue -template <typename T> -bool LinkedLockFreeQueue<T>::push(const T & item) -{ - if (full()) - return false; - - Node<T> * node = new Node<T>; - node->value = item; - - - Pointer<T> tail ; - Pointer<T> next ; - - - while(true) { - tail = Tail.load(std::memory_order_relaxed); - next = (tail.ptr->next).load(std::memory_order_relaxed); - if (tail == Tail.load(std::memory_order_relaxed)) { - if (next.ptr == NULL) { - if ((tail.ptr->next).compare_exchange_weak(next, - Pointer<T>(node, next.count+1), - std::memory_order_release, - std::memory_order_relaxed) ) - break; - else - Tail.compare_exchange_weak(tail, - Pointer<T>(next.ptr, tail.count+1), - std::memory_order_release, - std::memory_order_relaxed); - } - - } - } - - Tail.compare_exchange_weak(tail, Pointer<T>(node, tail.count+1), - std::memory_order_release, - std::memory_order_relaxed); - count++; - return true; -} - - - - -// Place front item into item variable and remove from queue -template <typename T> -bool LinkedLockFreeQueue<T>::pop(T & item) -{ - if (empty()) - return false; - - Pointer<T> head; - Pointer<T> tail; - Pointer<T> next; - - while(true) { - head = Head.load(std::memory_order_relaxed); - tail = Tail.load(std::memory_order_relaxed); - next = (head.ptr->next).load(); - if (head == Head.load(std::memory_order_relaxed)) { - if(head.ptr == tail.ptr) { - if (next.ptr == NULL) - return false; - // Tail is falling behind. Try to advance it - Tail.compare_exchange_weak(tail, - Pointer<T>(next.ptr, tail.count+1), - std::memory_order_release, - std::memory_order_relaxed); - } else { - item = next.ptr->value; - if (Head.compare_exchange_weak(head, - Pointer<T>(next.ptr, head.count+1), - std::memory_order_release, - std::memory_order_relaxed)) { - delete head.ptr; - break; - } - - } - } - - } - - count--; - return true; - -} - - -template <class T> -T& LinkedLockFreeQueue<T>::operator[](unsigned int i) -{ - if (i < 0 || i >= count) - { - std::cerr << "Error in array limits: " << i << " is out of range\n"; - std::exit(EXIT_FAILURE); - } - - - Pointer<T> tmp = Head.load(std::memory_order_relaxed); - //Pointer<T> tail = Tail.load(std::memory_order_relaxed); - - while(i > 0) { - //std::cout << i << ":" << std::endl; - tmp = (tmp.ptr->next).load(std::memory_order_relaxed); - i--; - } - - tmp = (tmp.ptr->next).load(std::memory_order_relaxed); - return tmp.ptr->value; -} - - - -#endif diff --git a/src/queue/include/lock_free_queue.h b/src/queue/include/lock_free_queue.h deleted file mode 100644 index 17e8c56..0000000 --- a/src/queue/include/lock_free_queue.h +++ /dev/null @@ -1,369 +0,0 @@ -#ifndef __LOCK_FREE_QUEUE_H__ -#define __LOCK_FREE_QUEUE_H__ - -#include <usg_common.h> -#include <assert.h> // assert() -#include "mem_pool.h" -#include "sem_util.h" -#include "logger_factory.h" -#include "shm_allocator.h" - -// default Queue size -#define LOCK_FREE_Q_DEFAULT_SIZE 16 - -// define this macro if calls to "size" must return the real size of the -// queue. If it is undefined that function will try to take a snapshot of -// the queue, but returned value might be bogus - - -// forward declarations for default template values -// - -template <typename ELEM_T, typename Allocator> -class ArrayLockFreeQueue; - -// template <typename ELEM_T> -// class LinkedLockFreeQueue; - - -/// @brief Lock-free queue based on a circular array -/// No allocation of extra memory for the nodes handling is needed, but it has -/// to add extra overhead (extra CAS operation) when inserting to ensure the -/// thread-safety of the queue when the queue type is not -/// ArrayLockFreeQueueSingleProducer. -/// -/// examples of instantiation: -/// ArrayLockFreeQueue<int> q; // queue of ints of default size (65535 - 1) -/// // and defaulted to single producer -/// ArrayLockFreeQueue<int, 16> q; -/// // queue of ints of size (16 - 1) and -/// // defaulted to single producer -/// ArrayLockFreeQueue<int, 100, ArrayLockFreeQueue> q; -/// // queue of ints of size (100 - 1) with support -/// // for multiple producers -/// -/// ELEM_T represents the type of elementes pushed and popped from the queue -/// Q_SIZE size of the queue. The actual size of the queue is (Q_SIZE-1) -/// This number should be a power of 2 to ensure -/// indexes in the circular queue keep stable when the uint32_t -/// variable that holds the current position rolls over from FFFFFFFF -/// to 0. For instance -/// 2 -> 0x02 -/// 4 -> 0x04 -/// 8 -> 0x08 -/// 16 -> 0x10 -/// (...) -/// 1024 -> 0x400 -/// 2048 -> 0x800 -/// -/// if queue size is not defined as requested, let's say, for -/// instance 100, when current position is FFFFFFFF (4,294,967,295) -/// index in the circular array is 4,294,967,295 % 100 = 95. -/// When that value is incremented it will be set to 0, that is the -/// last 4 elements of the queue are not used when the counter rolls -/// over to 0 -/// Q_TYPE type of queue implementation. ArrayLockFreeQueueSingleProducer and -/// ArrayLockFreeQueue are supported (single producer -/// by default) -template < - typename ELEM_T, - typename Allocator = SHM_Allocator, - template <typename T, typename AT> class Q_TYPE = ArrayLockFreeQueue - > -class LockFreeQueue -{ - -private: - int slots; - int items; - -public: - int mutex; - LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); - - /// @brief destructor of the class. - /// Note it is not virtual since it is not expected to inherit from this - /// template - ~LockFreeQueue(); - std::atomic_uint reference; - /// @brief constructor of the class - - - /// @brief returns the current number of items in the queue - /// It tries to take a snapshot of the size of the queue, but in busy environments - /// this function might return bogus values. - /// - /// If a reliable queue size must be kept you might want to have a look at - /// the preprocessor variable in this header file called '_WITH_LOCK_FREE_Q_KEEP_REAL_SIZE' - /// it enables a reliable size though it hits overall performance of the queue - /// (when the reliable size variable is on it's got an impact of about 20% in time) - inline uint32_t size(); - - /// @brief return true if the queue is full. False otherwise - /// It tries to take a snapshot of the size of the queue, but in busy - /// environments this function might return bogus values. See help in method - /// LockFreeQueue::size - inline bool full(); - - inline bool empty(); - - inline ELEM_T& operator[](unsigned i); - - /// @brief push an element at the tail of the queue - /// @param the element to insert in the queue - /// Note that the element is not a pointer or a reference, so if you are using large data - /// structures to be inserted in the queue you should think of instantiate the template - /// of the queue as a pointer to that large structure - /// @return true if the element was inserted in the queue. False if the queue was full - bool push(const ELEM_T &a_data); - bool push_nowait(const ELEM_T &a_data); - bool push_timeout(const ELEM_T &a_data, const struct timespec * timeout); - - /// @brief pop the element at the head of the queue - /// @param a reference where the element in the head of the queue will be saved to - /// Note that the a_data parameter might contain rubbish if the function returns false - /// @return true if the element was successfully extracted from the queue. False if the queue was empty - bool pop(ELEM_T &a_data); - bool pop_nowait(ELEM_T &a_data); - bool pop_timeout(ELEM_T &a_data, struct timespec * timeout); - - - void *operator new(size_t size); - void operator delete(void *p); - -protected: - /// @brief the actual queue. methods are forwarded into the real - /// implementation - Q_TYPE<ELEM_T, Allocator> m_qImpl; - -private: - /// @brief disable copy constructor declaring it private - LockFreeQueue<ELEM_T, Allocator, Q_TYPE>(const LockFreeQueue<ELEM_T, Allocator, Q_TYPE> &a_src); -}; - - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): reference(0), m_qImpl(qsize) -{ -// std::cout << "LockFreeQueue init reference=" << reference << std::endl; - slots = SemUtil::get(IPC_PRIVATE, qsize); - items = SemUtil::get(IPC_PRIVATE, 0); - mutex = SemUtil::get(IPC_PRIVATE, 1); -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() -{ - LoggerFactory::getLogger().debug("LockFreeQueue desctroy"); - SemUtil::remove(slots); - SemUtil::remove(items); - SemUtil::remove(mutex); -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -inline uint32_t LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::size() -{ - return m_qImpl.size(); -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::full() -{ - return m_qImpl.full(); -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -inline bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::empty() -{ - return m_qImpl.empty(); -} - - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data) -{ - // printf("==================LockFreeQueue push before\n"); - if (SemUtil::dec(slots) == -1) { - err_msg(errno, "LockFreeQueue push"); - return false; - } - - if ( m_qImpl.push(a_data) ) { - - SemUtil::inc(items); - // printf("==================LockFreeQueue push after\n"); - return true; - } - return false; - -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_nowait(const ELEM_T &a_data) -{ - if (SemUtil::dec_nowait(slots) == -1) { - if (errno == EAGAIN) - return false; - else { - err_msg(errno, "LockFreeQueue push_nowait"); - return false; - } - - } - - if ( m_qImpl.push(a_data)) { - SemUtil::inc(items); - return true; - } - return false; - -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push_timeout(const ELEM_T &a_data, const struct timespec * timeout) -{ - - - if (SemUtil::dec_timeout(slots, timeout) == -1) { - if (errno == EAGAIN) - return false; - else { - // err_msg(errno, "LockFreeQueue push_timeout"); - return false; - } - } - - if (m_qImpl.push(a_data)){ - SemUtil::inc(items); - return true; - } - return false; - -} - - - - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data) -{ - // printf("==================LockFreeQueue pop before\n"); - if (SemUtil::dec(items) == -1) { - err_msg(errno, "LockFreeQueue pop"); - return false; - } - - if (m_qImpl.pop(a_data)) { - SemUtil::inc(slots); - // printf("==================LockFreeQueue pop after\n"); - return true; - } - return false; - -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_nowait(ELEM_T &a_data) -{ - if (SemUtil::dec_nowait(items) == -1) { - if (errno == EAGAIN) - return false; - else { - err_msg(errno, "LockFreeQueue pop_nowait"); - return false; - } - } - - if (m_qImpl.pop(a_data)) { - SemUtil::inc(slots); - return true; - } - return false; - -} - - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -bool LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop_timeout(ELEM_T &a_data, struct timespec * timeout) -{ -// printf("==================LockFreeQueue pop_timeout before\n"); - if (SemUtil::dec_timeout(items, timeout) == -1) { - if (errno == EAGAIN) - return false; - else { - // err_msg(errno, "LockFreeQueue pop_timeout"); - return false; - } - } - - if (m_qImpl.pop(a_data)) { - SemUtil::inc(slots); -// printf("==================LockFreeQueue pop_timeout after\n"); - return true; - } - return false; - -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -ELEM_T& LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator[](unsigned i) { - return m_qImpl.operator[](i); -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -void * LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator new(size_t size){ - return Allocator::allocate(size); -} - -template < - typename ELEM_T, - typename Allocator, - template <typename T, typename AT> class Q_TYPE> -void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::operator delete(void *p) { - return Allocator::deallocate(p); -} - -// include implementation files -//#include "linked_lock_free_queue.h" -#include "array_lock_free_queue.h" - -#endif // _LOCK_FREE_QUEUE_H__ diff --git a/src/queue/include/mem_pool.h b/src/queue/include/mem_pool.h deleted file mode 100644 index 2ea1f6b..0000000 --- a/src/queue/include/mem_pool.h +++ /dev/null @@ -1,80 +0,0 @@ -#ifndef _MEM_POOL_H_ -#define _MEM_POOL_H_ -#include "mm.h" -#include "sem_util.h" -#define MEM_POOL_COND_KEY 0x8801 - -static int mem_pool_cond = SemUtil::get(MEM_POOL_COND_KEY, 0); - -// static int mem_pool_mutex = SemUtil::get(MEM_POOL_COND_KEY, 1); - -static inline void mem_pool_init(size_t heap_size) { - if(mm_init(heap_size)) { - - } -} - -static inline void mem_pool_destroy(void) { - if(mm_destroy()) { - SemUtil::remove(mem_pool_cond); - } - -} - -static inline void *mem_pool_malloc (size_t size) { - void *ptr; - while( (ptr = mm_malloc(size)) == NULL ) { - err_msg(0, "There is not enough memery to allocate, waiting someone else to free."); - SemUtil::set(mem_pool_cond, 0); - // wait for someone else to free space - SemUtil::dec(mem_pool_cond); - - } - - return ptr; -} - - -static inline void mem_pool_free (void *ptr) { - mm_free(ptr); - // notify malloc - SemUtil::set(mem_pool_cond, 1); - -} - - -template <typename T> -static inline T* mem_pool_attach(int key) { - void *ptr; - // T* tptr; - hashtable_t *hashtable = mm_get_hashtable(); - ptr = hashtable_get(hashtable, key); -// printf("mem_pool_malloc_by_key malloc before %d, %p\n", key, ptr); - if(ptr == NULL || ptr == (void *)1 ) { - ptr = mm_malloc(sizeof(T)); - hashtable_put(hashtable, key, ptr); - new(ptr) T; -// printf("mem_pool_malloc_by_key use new %d, %p\n", key, ptr); - } - return (T*)ptr; -} - -static inline void mem_pool_free_by_key(int key) { - return mm_free_by_key(key); -} - - -static inline void *mem_pool_realloc (void *ptr, size_t size) { - return mm_realloc(ptr, size); -} - -static inline int mem_pool_alloc_key() { - - return mm_alloc_key(); -} - - -// extern int mm_checkheap(int verbose); - - -#endif \ No newline at end of file diff --git a/src/queue/include/mm.h b/src/queue/include/mm.h deleted file mode 100644 index 6dbb979..0000000 --- a/src/queue/include/mm.h +++ /dev/null @@ -1,27 +0,0 @@ -#ifndef MM_HDR_H -#define MM_HDR_H /* Prevent accidental double inclusion */ - -#include <usg_common.h> -#include "usg_typedef.h" -#include "hashtable.h" - -extern bool mm_init(size_t heap_size); -extern bool mm_destroy(void); - -extern void *mm_malloc (size_t size); -extern void mm_free (void *ptr); -extern void *mm_realloc(void *ptr, size_t size); - -extern void * mm_get_by_key(int key); - -extern void mm_free_by_key(int key); - -extern int mm_alloc_key() ; - -extern hashtable_t * mm_get_hashtable(); - -// extern int mm_checkheap(int verbose); - -// extern void *get_mm_start_brk(); -// extern size_t get_mm_max_size(); -#endif diff --git a/src/queue/include/shm_allocator.h b/src/queue/include/shm_allocator.h deleted file mode 100644 index 084a678..0000000 --- a/src/queue/include/shm_allocator.h +++ /dev/null @@ -1,100 +0,0 @@ -#ifndef __SHM_ALLOCATOR_H__ -#define __SHM_ALLOCATOR_H__ -#include "usg_common.h" -#include "mem_pool.h" -#include <new> -#include <cstdlib> // for exit() -#include <climits> // for UNIX_MAX -#include <cstddef> - - - -template<class T> class SHM_STL_Allocator -{ -public: - typedef T value_type; - typedef T* pointer; - typedef const T* const_pointer; - typedef T& reference; - typedef const T& const_reference; - typedef size_t size_type; - typedef ptrdiff_t difference_type; - - - SHM_STL_Allocator() {}; - ~SHM_STL_Allocator() {}; - template<class U> SHM_STL_Allocator(const SHM_STL_Allocator<U>& t) { }; - template<class U> struct rebind { typedef SHM_STL_Allocator<U> other; }; - - pointer allocate(size_type n, const void* hint=0) { -// fprintf(stderr, "allocate n=%u, hint= %p\n",n, hint); - return((T*) (mm_malloc(n * sizeof(T)))); - } - - void deallocate(pointer p, size_type n) { -// fprintf(stderr, "dealocate n=%u" ,n); - mm_free((void*)p); - } - - void construct(pointer p, const T& value) { - ::new(p) T(value); - } - - void construct(pointer p) - { - ::new(p) T(); - } - - void destroy(pointer p) { - p->~T(); - } - - pointer address(reference x) { - return (pointer)&x; - } - - const_pointer address(const_reference x) { - return (const_pointer)&x; - } - - size_type max_size() const { - return size_type(UINT_MAX/sizeof(T)); - } -}; - - -class SHM_Allocator { - public: - static void *allocate (size_t size) { - return mm_malloc(size); - // return mem_pool_malloc(size); - } - - static void deallocate (void *ptr) { - return mm_free(ptr); - // return mem_pool_free(ptr); - } -}; - - -class DM_Allocator { - public: - static void *allocate (size_t size) { - return malloc(size); - } - - static void deallocate (void *ptr) { - return free(ptr); - } -}; - - -// template<class charT, class traits = char _traits<charT>, -// class Allocator = allocator<charT> > - - - - -typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > SHMString; - -#endif \ No newline at end of file diff --git a/src/queue/include/shm_mm.h b/src/queue/include/shm_mm.h deleted file mode 100644 index ec094ac..0000000 --- a/src/queue/include/shm_mm.h +++ /dev/null @@ -1,29 +0,0 @@ -#ifndef __SHM_MM_H__ -#define __SHM_MM_H__ - -#ifdef __cplusplus -extern "C" { -#endif - -/** - * 鍒濆鍖栧叡浜唴瀛� - * @size 鍏变韩鍐呭瓨澶у皬, 鍗曚綅M - * - */ -void shm_init(int size); - -/** - * 閿�姣佸叡浜唴瀛� - * 鏁翠釜杩涚▼閫�鍑烘椂闇�瑕佹墽琛岃繖涓柟娉曪紝璇ユ柟娉曢鍏堜細妫�鏌ユ槸鍚﹁繕鏈夊叾浠栬繘绋嬪湪浣跨敤璇ュ叡浜唴瀛橈紝濡傛灉杩樻湁鍏朵粬杩涚▼鍦ㄤ娇鐢ㄥ氨鍙槸detach,濡傛灉娌℃湁鍏朵粬杩涚▼鍦ㄤ娇鐢ㄥ垯閿�姣佹暣鍧楀唴瀛樸�� - */ -void shm_destroy(); - -int shm_alloc_key(); - - -#ifdef __cplusplus -} -#endif - -#endif - diff --git a/src/queue/include/shm_queue.h b/src/queue/include/shm_queue.h deleted file mode 100644 index 5c82b05..0000000 --- a/src/queue/include/shm_queue.h +++ /dev/null @@ -1,209 +0,0 @@ -#ifndef __SHM_QUEUE_H__ -#define __SHM_QUEUE_H__ - -#include "hashtable.h" -#include "lock_free_queue.h" -#include "logger_factory.h" -#include "sem_util.h" -#include "shm_allocator.h" -#include "usg_common.h" -// default Queue size -// #define LOCK_FREE_Q_DEFAULT_SIZE 16 - -template <typename ELEM_T> class SHMQueue { - -private: - const int KEY; - -public: - /// @brief constructor of the class - SHMQueue(int key = 0, size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); - - ~SHMQueue(); - - void force_destroy(); - - inline uint32_t size(); - - inline bool full(); - inline bool empty(); - - inline bool push(const ELEM_T &a_data); - inline bool push_nowait(const ELEM_T &a_data); - inline bool push_timeout(const ELEM_T &a_data, - const struct timespec *timeout); - inline bool pop(ELEM_T &a_data); - inline bool pop_nowait(ELEM_T &a_data); - inline bool pop_timeout(ELEM_T &a_data, struct timespec *timeout); - - inline ELEM_T &operator[](unsigned i); - - // @deprecate - static size_t remove_queues_exclude(int keys[], size_t length); - static size_t remove_queues(int keys[], size_t length); - static size_t remove_queue(int key); - -private: -protected: - /// @brief the actual queue-> methods are forwarded into the real - /// implementation - LockFreeQueue<ELEM_T, SHM_Allocator> *queue; - -private: - /// @brief disable copy constructor declaring it private - SHMQueue<ELEM_T>(const SHMQueue<ELEM_T> &a_src); -}; - -// @deprecate -template <typename ELEM_T> -size_t SHMQueue<ELEM_T>::remove_queues_exclude(int keys[], size_t length) { - hashtable_t *hashtable = mm_get_hashtable(); - std::set<int> *keyset = hashtable_keyset(hashtable); - std::set<int>::iterator keyItr; - LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue; - bool found; - size_t count = 0; - for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) { - found = false; - for (size_t i = 0; i < length; i++) { - if (*keyItr == keys[i]) { - found = true; - break; - } - } - if (!found) { - // 閿�姣佸叡浜唴瀛樼殑queue - mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr); - delete mqueue; - hashtable_remove(hashtable, *keyItr); - count++; - } - } - delete keyset; - return count; -} - - -template <typename ELEM_T> -size_t SHMQueue<ELEM_T>::remove_queues(int keys[], size_t length) { - hashtable_t *hashtable = mm_get_hashtable(); - LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue; - size_t count = 0; - for(int i = 0; i< length; i++) { - // 閿�姣佸叡浜唴瀛樼殑queue - mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)mm_get_by_key(keys[i]); - delete mqueue; - hashtable_remove(hashtable, keys[i]); - count++; - } - return count; -} - -template <typename ELEM_T> -size_t SHMQueue<ELEM_T>::remove_queue(int key) { - int keys[] = {key}; - return remove_queues(keys, 1); -} - -template <typename ELEM_T> -SHMQueue<ELEM_T>::SHMQueue(int key, size_t qsize) : KEY(key) { - - hashtable_t *hashtable = mm_get_hashtable(); - queue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key); - if (queue == NULL || (void *)queue == (void *)1) { - queue = new LockFreeQueue<ELEM_T, SHM_Allocator>(qsize); - hashtable_put(hashtable, key, (void *)queue); - } - queue->reference++; - LoggerFactory::getLogger().debug("SHMQueue constructor reference===%d", queue->reference.load()); -} - -template <typename ELEM_T> SHMQueue<ELEM_T>::~SHMQueue() { - if(queue == NULL) { - // queue宸茬粡閿�姣� - return; - } - - SemUtil::dec(queue->mutex); - queue->reference--; - // LoggerFactory::getLogger().debug("SHMQueue destructor reference===%d", - if (queue->reference.load() == 0) { - delete queue; - queue = NULL; - hashtable_t *hashtable = mm_get_hashtable(); - hashtable_remove(hashtable, KEY); - // 姝ゆ椂queue宸茬粡閿�姣侊紝鏃犻渶 SemUtil::inc(queue->mutex) - // printf("SHMQueue destructor delete queue\n"); - } else { - SemUtil::inc(queue->mutex); - } - -} - -template <typename ELEM_T> void SHMQueue<ELEM_T>::force_destroy() { - if(queue == NULL) { - // queue宸茬粡閿�姣� - return; - } - - SemUtil::dec(queue->mutex); - delete queue; - queue = NULL; - hashtable_t *hashtable = mm_get_hashtable(); - hashtable_remove(hashtable, KEY); - // 姝ゆ椂queue宸茬粡閿�姣侊紝鏃犻渶 SemUtil::inc(queue->mutex) -} - -template <typename ELEM_T> inline uint32_t SHMQueue<ELEM_T>::size() { - return queue->size(); -} - -template <typename ELEM_T> inline bool SHMQueue<ELEM_T>::full() { - return queue->full(); -} - -template <typename ELEM_T> inline bool SHMQueue<ELEM_T>::empty() { - return queue->empty(); -} - -template <typename ELEM_T> -inline bool SHMQueue<ELEM_T>::push(const ELEM_T &a_data) { - return queue->push(a_data); -} - -template <typename ELEM_T> -inline bool SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) { - return queue->push_nowait(a_data); -} - -template <typename ELEM_T> -inline bool SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, - const struct timespec *timeout) { - - return queue->push_timeout(a_data, timeout); -} - -template <typename ELEM_T> inline bool SHMQueue<ELEM_T>::pop(ELEM_T &a_data) { - // printf("SHMQueue pop before\n"); - int rv = queue->pop(a_data); - // printf("SHMQueue after before\n"); - return rv; -} - -template <typename ELEM_T> -inline bool SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) { - return queue->pop_nowait(a_data); -} - -template <typename ELEM_T> -inline bool SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, - struct timespec *timeout) { - return queue->pop_timeout(a_data, timeout); -} - -template <typename ELEM_T> -inline ELEM_T &SHMQueue<ELEM_T>::operator[](unsigned i) { - return queue->operator[](i); -} - -#endif diff --git a/src/queue/include/shm_queue_wrapper.h b/src/queue/include/shm_queue_wrapper.h deleted file mode 100644 index 10d3b16..0000000 --- a/src/queue/include/shm_queue_wrapper.h +++ /dev/null @@ -1,103 +0,0 @@ -#ifndef __SHM_QUEUE_WRAPPER_H__ -#define __SHM_QUEUE_WRAPPER_H__ - -#include "usg_common.h" -#include "usg_typedef.h" - - -#ifdef __cplusplus -extern "C" { -#endif - - - -/** - * @depracate 宸插簾寮冧笉鐢� - * 绉婚櫎涓嶅寘鍚湪keys涓殑闃熷垪 - */ -void shm_remove_queues_exclude(void *keys, int length); -/** - * 鍒涘缓闃熷垪 - * @ shmqueue - * @ key 鏍囪瘑鍏变韩闃熷垪鐨勫敮涓�鏍囪瘑, key鏄竴涓寚閽堥噷闈㈠瓨鍌ㄤ簡key鐨勫�硷紝 濡傛灉key鐨勫�间负-1绯荤粺浼氳嚜鍔ㄥ垎閰嶄竴涓猭ey鍊煎苟鎶婅key鐨勫�艰祴缁檏ey鎸囬拡銆傚鏋渒ey鐨勫�间笉浼氱┖浼氭鏌ユ槸鍚︽湁閲嶅缁戝畾鐨勬儏鍐�, 鏈夐噸澶嶅氨鎶ラ敊娌℃湁灏卞垱寤洪槦鍒楀苟缁戝畾key. - * @ queue_size 闃熷垪澶у皬 - */ -void* shmqueue_create( int * key, int queue_size); - -/** - * 缁戝畾key鍒伴槦鍒楋紝浣嗘槸骞朵笉浼氬垱寤洪槦鍒椼�傚鏋滄病鏈夊搴旀寚瀹歬ey鐨勯槦鍒楁彁绀洪敊璇苟閫�鍑� - */ -void* shmqueue_attach(int key) ; - -/** - * 閿�姣� -*/ -void shmqueue_drop(void * _shmqueue); - -/** - * 闃熷垪鍏冪礌鐨勪釜鏁� - */ -int shmqueue_size(void * _shmqueue) ; - -/** - * 鏄惁宸叉弧 - * @return 1鏄紝 0鍚� - */ -int shmqueue_full(void * _shmqueue); - -/** - * 鏄惁涓虹┖ - * @return 1鏄紝 0鍚� - */ -int shmqueue_empty(void * _shmqueue) ; - -/** - * 鍏ラ槦, 闃熷垪婊℃椂绛夊緟. - * @return 1 鍏ラ槦鎴愬姛, 0 鍏ラ槦澶辫触 - */ -int shmqueue_push(void * _shmqueue, void *src, int size); - -/** - * 鍏ラ槦, 绔嬪埢杩斿洖 - * @return 1 鍏ラ槦鎴愬姛, 0 鍏ラ槦澶辫触 - */ -int shmqueue_push_nowait(void * _shmqueue, void *src, int size) ; - -/** - * 鍏ラ槦, 鎸囧畾鏃堕棿鍐呭叆闃熶笉鎴愬姛灏辫繑鍥� - * @sec 绉� - * @nsec 绾崇 - * @return 1 鍏ラ槦鎴愬姛, 0 鍏ラ槦澶辫触 - */ -int shmqueue_push_timeout(void * _shmqueue, void *src, int size, int sec, int nsec) ; - -/** - * 鍑洪槦, 闃熷垪绌烘椂绛夊緟 - * @return 1 鍑洪槦鎴愬姛锛� 0鍑洪槦澶辫触 - */ -int shmqueue_pop(void * _shmqueue, void **dest, int *size); - -/** - * 鍑洪槦, 绔嬪埢杩斿洖 - * @return 1 鍑洪槦鎴愬姛锛� 0鍑洪槦澶辫触 - */ -int shmqueue_pop_nowait(void * _shmqueue, void **dest, int *size) ; - -/** - * 鍑洪槦, 鎸囧畾鏃堕棿鍐呭嚭闃熶笉鎴愬姛灏辫繑鍥� - * @sec绉� - * @nsec绾崇 - * @return 1 鍑洪槦鎴愬姛锛� 0鍑洪槦澶辫触 - */ -int shmqueue_pop_timeout(void * _shmqueue, void **dest, int *size, int sec, int nsec); - -/** - * 閲婃斁鍑洪槦鍒嗛厤鐨勫唴瀛� - */ -void shmqueue_free(void *ptr); - -#ifdef __cplusplus -} -#endif - -#endif \ No newline at end of file diff --git a/src/socket/dmod_socket.c b/src/socket/dmod_socket.c deleted file mode 100644 index 451e8b2..0000000 --- a/src/socket/dmod_socket.c +++ /dev/null @@ -1,524 +0,0 @@ -#include "dmod_socket.h" - - -void DModSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)> cb) { - SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY); - SHMKeySet *subscripter_set; - SHMKeySet::iterator set_iter; - SHMTopicSubMap::iterator map_iter; - - if(topic_sub_map != NULL) { - for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { - subscripter_set = map_iter->second; - if(subscripter_set != NULL) { - for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) { - cb(subscripter_set, *set_iter); - } - } - } - } -} - -bool DModSocket::include_in_keys(int key, int keys[], size_t length) { - if(length == 0) { - return false; - } - for(int i = 0; i < length; i++) { - if(keys[i] == key) - return true; - } - return false; -} - -size_t DModSocket::remove_subscripters(int keys[], size_t length) { - size_t count = 0; - int key; - for(int i = 0; i < length; i++) { - key = keys[i]; - SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY); - SHMKeySet *subscripter_set; - SHMKeySet::iterator set_iter; - SHMTopicSubMap::iterator map_iter; - - if(topic_sub_map != NULL) { - for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { - subscripter_set = map_iter->second; - if(subscripter_set != NULL && (set_iter = subscripter_set->find(key) ) != subscripter_set->end()) { - subscripter_set->erase(set_iter); -// printf("remove_subscripter %s, %d\n", map_iter->first, key); - count++; - } - } - } - } - return count; - -} - - -size_t DModSocket::remove_keys(int keys[], size_t length) { - remove_subscripters(keys, length); - return shm_socket_remove_keys(keys, length); -} - -DModSocket::DModSocket() { - mod = (socket_mod_t)0; - shm_socket = shm_open_socket(SHM_SOCKET_DGRAM); - bus_set = new std::set<int>; - topic_sub_map = NULL; -} - -DModSocket::~DModSocket() { -// printf("DModSocket destory 1\n"); - SHMKeySet *subscripter_set; - SHMTopicSubMap::iterator map_iter; - struct timespec timeout = {1, 0}; - if(bus_set != NULL) { - for(auto bus_iter = bus_set->begin(); bus_iter != bus_set->end(); bus_iter++) { -// printf("DModSocket desub_timeout before"); - desub_timeout(NULL, 0, *bus_iter, &timeout); -// printf("DModSocket desub_timeout after %d\n", *bus_iter); - } - delete bus_set; - } - -// printf("DModSocket destory 2\n"); - if(topic_sub_map != NULL) { - for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { - subscripter_set = map_iter->second; -// printf("DModSocket destory 2-1\n"); - if(subscripter_set != NULL) { -// printf("DModSocket destory 2-2\n"); - subscripter_set->clear(); -// printf("DModSocket destory 2-3\n"); - mm_free((void *)subscripter_set); -// printf("DModSocket destory 2-4\n"); - } - - } - topic_sub_map->clear(); - mem_pool_free_by_key(BUS_MAP_KEY); - } -// printf("DModSocket destory 3\n"); - // printf("=============close socket\n"); - shm_close_socket(shm_socket); -// printf("DModSocket destory 4\n"); -} - -int DModSocket::bind(int port) { - return shm_socket_bind(shm_socket, port); -} - - - -/** - * 寮哄埗缁戝畾绔彛鍒皊ocket, 閫傜敤浜庣▼搴忛潪姝e父鍏抽棴鐨勬儏鍐典笅锛岄噸鍚▼搴忕粦瀹氬師鏉ヨ繕娌¢噴鏀剧殑key - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int DModSocket::force_bind(int port) { - return shm_socket_force_bind(shm_socket, port); -} -/** - * 鍙戦�佷俊鎭� - * @port 鍙戦�佺粰璋� - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 - */ -int DModSocket::sendto(const void *buf, const int size, const int port) { - return shm_sendto(shm_socket, buf, size, port, NULL, 0); -} -// 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇 -int DModSocket::sendto_timeout(const void *buf, const int size, const int port, const struct timespec *timeout) { - return shm_sendto(shm_socket, buf, size, port, timeout, 0); -} -// 鍙戦�佷俊鎭珛鍒昏繑鍥炪�� -int DModSocket::sendto_nowait( const void *buf, const int size, const int port){ - return shm_sendto(shm_socket, buf, size, port, NULL, (int)SHM_MSG_NOWAIT); -} - - -inline int DModSocket::_recvfrom_(void **buf, int *size, int *port, struct timespec *timeout, int flags) { - - if(mod == BUS) { - err_exit(0, "Can not use method recvfrom in a Bus"); - } -// printf("dgram_mod_recvfrom before\n"); - int rv = shm_recvfrom(shm_socket, buf, size, port, timeout, flags); -// printf("dgram_mod_recvfrom after\n"); - return rv; -} -/** - * 鎺ユ敹淇℃伅 - * @port 浠庤皝鍝噷鏀跺埌鐨勪俊鎭� - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int DModSocket::recvfrom(void **buf, int *size, int *port) { - - return _recvfrom_( buf, size, port, NULL, 0); -} - - -// 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int DModSocket::recvfrom_timeout( void **buf, int *size, int *port, struct timespec *timeout) { - return _recvfrom_(buf, size, port, timeout, 0); -} - -int DModSocket::recvfrom_nowait( void **buf, int *size, int *port){ - return _recvfrom_(buf, size, port, NULL, (int)SHM_MSG_NOWAIT); -} - -/** - * 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟 - * @port 鍙戦�佺粰璋� - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int DModSocket::sendandrecv( const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size){ - return shm_sendandrecv(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, NULL, 0); -} -// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int DModSocket::sendandrecv_timeout(const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size, struct timespec *timeout){ - return shm_sendandrecv(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, timeout, 0); -} -int DModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size){ - return shm_sendandrecv(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT); -} - - -/** - * 鍚姩bus - * - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int DModSocket::start_bus(){ - mod = BUS; - topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY); - - run_pubsub_proxy(); - // pthread_t tid; - // pthread_create(&tid, NULL, run_accept_sub_request, _socket); - return 0; -} - -/** - * 璁㈤槄鎸囧畾涓婚 - * @topic 涓婚 - * @size 涓婚闀垮害 - * @port 鎬荤嚎绔彛 - */ -int DModSocket::sub(char *topic, int size, int port){ - return _sub_( topic, size, port, NULL, 0); -} -// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int DModSocket::sub_timeout(char *topic, int size, int port, struct timespec *timeout){ - return _sub_(topic, size, port, timeout, 0); -} -int DModSocket::sub_nowait(char *topic, int size, int port) { - return _sub_(topic, size, port, NULL, (int)SHM_MSG_NOWAIT); -} - - - -/** - * 鍙栨秷璁㈤槄鎸囧畾涓婚 - * @topic 涓婚 - * @size 涓婚闀垮害 - * @port 鎬荤嚎绔彛 - */ -int DModSocket::desub(char *topic, int size, int port){ - return _desub_( topic, size, port, NULL, 0); -} -// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int DModSocket::desub_timeout(char *topic, int size, int port, struct timespec *timeout){ - return _desub_(topic, size, port, timeout, 0); -} -int DModSocket::desub_nowait(char *topic, int size, int port) { - return _desub_(topic, size, port, NULL, (int)SHM_MSG_NOWAIT); -} - - - -/** - * 鍙戝竷涓婚 - * @topic 涓婚 - * @content 涓婚鍐呭 - * @port 鎬荤嚎绔彛 - */ -int DModSocket::pub(char *topic, int topic_size, void *content, int content_size, int port){ - return _pub_(topic, topic_size, content, content_size, port, NULL, 0); -} -// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int DModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int port, struct timespec * timeout){ - return _pub_( topic, topic_size, content, content_size, port, timeout, 0); -} -int DModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int port){ - return _pub_(topic, topic_size, content, content_size, port, NULL, (int)SHM_MSG_NOWAIT); -} - - -/** - * 鑾峰彇soket绔彛鍙� - */ -int DModSocket::get_port(){ - return shm_socket->port; -} - - - -// ============================================================================= -/** - * @port 鎬荤嚎绔彛 - */ -int DModSocket::_sub_(char *topic, int size, int port, - struct timespec *timeout, int flags) { - char buf[8192]; - int rv; - snprintf(buf, 8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER); - rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags); - if(rv == 0) { - bus_set->insert(port); - } - return rv; -} - - -/** - * @port 鎬荤嚎绔彛 - */ -int DModSocket::_desub_(char *topic, int size, int port, - struct timespec *timeout, int flags) { - char buf[8192]; - if(topic == NULL) { - topic = ""; - } - snprintf(buf, 8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER); - return shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags); -} - -/** - * @port 鎬荤嚎绔彛 - */ -int DModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int port, - struct timespec *timeout, int flags) { - int head_len; - char buf[8192+content_size]; - snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER); - head_len = strlen(buf); - memcpy(buf+head_len, content, content_size); - return shm_sendto(shm_socket, buf, head_len+content_size, port, timeout, flags); - -} -/* - * 澶勭悊璁㈤槄 -*/ -void DModSocket::_proxy_sub( char *topic, int port) { - SHMKeySet *subscripter_set; - - SHMTopicSubMap::iterator map_iter; - SHMKeySet::iterator set_iter; -printf("_proxy_sub topic = %s\n", topic); - if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { - subscripter_set = map_iter->second; - } else { - void *set_ptr = mm_malloc(sizeof(SHMKeySet)); - subscripter_set = new(set_ptr) SHMKeySet; - topic_sub_map->insert({topic, subscripter_set}); - } - subscripter_set->insert(port); -} - -/* - * 澶勭悊鍙栨秷璁㈤槄 -*/ -void DModSocket::_proxy_desub( char *topic, int port) { - SHMKeySet *subscripter_set; - - SHMTopicSubMap::iterator map_iter; - // SHMKeySet::iterator set_iter; - - if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { - subscripter_set = map_iter->second; - - subscripter_set->erase(port); -printf("============ desub %d\n", port); - } -} - -/* - * 澶勭悊鍙栨秷鎵�鏈夎闃� -*/ -void DModSocket::_proxy_desub_all(int port) { - SHMKeySet *subscripter_set; - - SHMTopicSubMap::iterator map_iter; - // SHMKeySet::iterator set_iter; - for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) { - subscripter_set = map_iter->second; - subscripter_set->erase(port); -printf("============ desub %d\n", port); - } -} - -/* - * 澶勭悊鍙戝竷锛屼唬鐞嗚浆鍙� -*/ -void DModSocket::_proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int port) { - SHMKeySet *subscripter_set; - - SHMTopicSubMap::iterator map_iter; - SHMKeySet::iterator set_iter; - - std::vector<int> subscripter_to_del; - std::vector<int>::iterator vector_iter; - - int send_port; - struct timespec timeout = {1,0}; - - if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) { - subscripter_set = map_iter->second; - for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) { - send_port = *set_iter; - // printf("_proxy_pub send before %d \n", send_port); - if (shm_sendto(shm_socket, buf+head_len, size-head_len, send_port, &timeout) == SHM_SOCKET_ECONNFAILED ) { - //瀵规柟宸插叧闂殑杩炴帴鏀惧埌寰呭垹闄ら槦鍒楅噷銆傚鏋滅洿鎺ュ垹闄や細璁﹊ter鎸囬拡鍑虹幇閿欎贡 - subscripter_to_del.push_back(send_port); - } else { -// printf("_proxy_pub send after: %d \n", send_port); - } - - - } - - // 鍒犻櫎宸插叧闂殑绔� - for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); vector_iter++) { - if((set_iter = subscripter_set->find(*vector_iter)) != subscripter_set->end()) { - subscripter_set->erase(set_iter); - printf("remove closed subscripter %d \n", send_port); - } - } - subscripter_to_del.clear(); - - } -} - -void * DModSocket::run_pubsub_proxy() { - // pthread_detach(pthread_self()); - int size; - int port; - char * action, *topic, *topics, *buf; - size_t head_len; - - const char *topic_delim = ","; -// printf("run_pubsub_proxy server receive before\n"); - while(shm_recvfrom(shm_socket, (void **)&buf, &size, &port) == 0) { -//printf("run_pubsub_proxy server recv after: %s \n", buf); - if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) { -// printf("run_pubsub_proxy %s %s \n", action, topics); - if(strcmp(action, "sub") == 0) { - // 璁㈤槄鏀寔澶氫富棰樿闃� - topic = strtok(topics, topic_delim); -//printf("run_pubsub_proxy topic = %s\n", topic); - while(topic) { - _proxy_sub(trim(topic, 0), port); - topic = strtok(NULL, topic_delim); - } - - } else if(strcmp(action, "desub") == 0) { -printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), "")); - if(strcmp(trim(topics, 0), "") == 0) { - // 鍙栨秷鎵�鏈夎闃� - printf("====鍙栨秷鎵�鏈夎闃匼n"); - _proxy_desub_all(port); - } else { - - topic = strtok(topics, topic_delim); - while(topic) { - _proxy_desub(trim(topic, 0), port); - topic = strtok(NULL, topic_delim); - } - } - - - - } else if(strcmp(action, "pub") == 0) { - _proxy_pub(topics, head_len, buf, size, port); - } - - free(action); - free(topics); - } else { - err_msg(0, "incorrect format msg"); - } - free(buf); - } - return NULL; -} - - -/** - * @str "<**sub**>{缁忔祹}" - */ - -int DModSocket::parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ) { - char *ptr = str; - char *str_end_ptr = str + size; - char *action_start_ptr; - char *action_end_ptr; - size_t action_len = 0; - - char *topic_start_ptr; - char *topic_end_ptr; - size_t topic_len = 0; - - // if (strlen(identifier) > strlen(str)) { - // return 0; - // } - - if (strncmp(ptr, ACTION_LIDENTIFIER, strlen(ACTION_LIDENTIFIER)) == 0) { - ptr += strlen(ACTION_LIDENTIFIER); - action_start_ptr = ptr; - while(strncmp(++ptr, ACTION_RIDENTIFIER, strlen(ACTION_RIDENTIFIER)) != 0) { - if(ptr >= str_end_ptr) { - return 0; - } - } -// printf("%s\n", ptr); - action_end_ptr = ptr; - action_len = action_end_ptr - action_start_ptr; - ptr += strlen(ACTION_RIDENTIFIER); -// printf("%s\n", ptr); -// printf("%s\n", str_end_ptr-1); - if(strncmp(ptr, TOPIC_LIDENTIFIER, strlen(TOPIC_LIDENTIFIER)) == 0 ) { - topic_start_ptr = ptr+strlen(TOPIC_LIDENTIFIER); - - - while(strncmp(++ptr, TOPIC_RIDENTIFIER, strlen(TOPIC_RIDENTIFIER)) != 0) { - if(ptr >= str_end_ptr) { - return 0; - } - } - topic_end_ptr = ptr; - topic_len = topic_end_ptr - topic_start_ptr; - - ptr += strlen(TOPIC_RIDENTIFIER); - - } else { - return 0; - } - } else { - return 0; - } - - char *topic = (char *)malloc(topic_len+1); - strncpy(topic, topic_start_ptr, topic_len); - *(topic+topic_len) = '\0'; - *_topic = topic; - - char *action = (char *)malloc(action_len+1); - strncpy(action, action_start_ptr, action_len); - *(action+action_len) = '\0'; - *_action = action; - *head_len = ptr-str; - - return 1; -} - - \ No newline at end of file diff --git a/src/socket/include/dgram_mod_socket.h b/src/socket/include/dgram_mod_socket.h deleted file mode 100644 index 2761ff8..0000000 --- a/src/socket/include/dgram_mod_socket.h +++ /dev/null @@ -1,131 +0,0 @@ -#ifndef __DGRAM_MOD_SOCKET_H__ -#define __DGRAM_MOD_SOCKET_H__ - - -#ifdef __cplusplus -extern "C" { -#endif - -/** - * 鍒犻櫎key瀵瑰簲鐨勫叡浜槦鍒楋紝骞跺湪bus閲屽垹闄よkey鐨勮闃� - */ -int dgram_mod_remove_key(int key); - -/** - * 鎵归噺鍒犻櫎key瀵瑰簲鐨勫叡浜槦鍒楋紝骞跺湪bus閲屽垹闄よkey鐨勮闃� - */ -int dgram_mod_remove_keys(int keys[], int length); - - -/** - * 鍒涘缓socket - * @return socket鍦板潃 -*/ -void *dgram_mod_open_socket(); - -/** - * 鍏抽棴socket - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int dgram_mod_close_socket(void * _socket); - -/** - * 缁戝畾绔彛鍒皊ocket, 濡傛灉涓嶇粦瀹氬垯绯荤粺鑷姩鍒嗛厤涓�涓� - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int dgram_mod_bind(void * _socket, int port); - -/** - * 寮哄埗缁戝畾绔彛鍒皊ocket, 閫傜敤浜庣▼搴忛潪姝e父鍏抽棴鐨勬儏鍐典笅锛岄噸鍚▼搴忕粦瀹氬師鏉ヨ繕娌¢噴鏀剧殑key - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int dgram_mod_force_bind(void * _socket, int port); -/** - * 鍙戦�佷俊鎭� - * @port 鍙戦�佺粰璋� - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 - */ -int dgram_mod_sendto(void *_socket, const void *buf, const int size, const int port); -// 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇 -int dgram_mod_sendto_timeout(void *_socket, const void *buf, const int size, const int port, int sec, int nsec); -// 鍙戦�佷俊鎭珛鍒昏繑鍥炪�� -int dgram_mod_sendto_nowait(void *_socket, const void *buf, const int size, const int port); - -/** - * 鎺ユ敹淇℃伅 - * @port 浠庤皝鍝噷鏀跺埌鐨勪俊鎭� - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int dgram_mod_recvfrom(void *_socket, void **buf, int *size, int *port); -// 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int dgram_mod_recvfrom_timeout(void *_socket, void **buf, int *size, int *port, int sec, int nsec); -int dgram_mod_recvfrom_nowait(void *_socket, void **buf, int *size, int *port); - -/** - * 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟 - * @port 鍙戦�佺粰璋� - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size) ; -// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int dgram_mod_sendandrecv_timeout(void * _socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size, int sec, int nsec) ; -int dgram_mod_sendandrecv_nowait(void * _socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size) ; - - -/** - * 鍚姩bus - * - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int dgram_mod_start_bus(void * _socket); - -/** - * 璁㈤槄鎸囧畾涓婚 - * @topic 涓婚 - * @size 涓婚闀垮害 - * @port 鎬荤嚎绔彛 - */ -int dgram_mod_sub(void * _socket, void *topic, int size, int port); -// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int dgram_mod_sub_timeout(void * _socket, void *topic, int size, int port, int sec, int nsec); -int dgram_mod_sub_nowait(void * _socket, void *topic, int size, int port); - - -/** - * 鍙栨秷璁㈤槄鎸囧畾涓婚 - * @topic 涓婚,涓婚涓虹┖鏃跺彇娑堝叏閮ㄨ闃� - * @size 涓婚闀垮害 - * @port 鎬荤嚎绔彛 - */ -int dgram_mod_desub(void * _socket, void *topic, int size, int port); -// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int dgram_mod_desub_timeout(void * _socket, void *topic, int size, int port, int sec, int nsec); -int dgram_mod_desub_nowait(void * _socket, void *topic, int size, int port); - -/** - * 鍙戝竷涓婚 - * @topic 涓婚 - * @content 涓婚鍐呭 - * @port 鎬荤嚎绔彛 - */ -int dgram_mod_pub(void * _socket, void *topic, int topic_size, void *content, int content_size, int port); -// 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 -int dgram_mod_pub_timeout(void * _socket, void *topic, int topic_size, void *content, int content_size, int port, int sec, int nsec); -int dgram_mod_pub_nowait(void * _socket, void *topic, int topic_size, void *content, int content_size, int port); - - -/** - * 鑾峰彇soket绔彛鍙� - */ -int dgram_mod_get_port(void * _socket) ; - - -/** - * 閲婃斁瀛樺偍鎺ユ敹淇℃伅鐨刡uf - */ -void dgram_mod_free(void *buf) ; -#ifdef __cplusplus -} -#endif - -#endif \ No newline at end of file diff --git a/src/socket/include/dmod_socket.h b/src/socket/include/dmod_socket.h deleted file mode 100644 index b57c6b3..0000000 --- a/src/socket/include/dmod_socket.h +++ /dev/null @@ -1,159 +0,0 @@ -#ifndef __DMODE_SOCKET_H__ -#define __DMODE_SOCKET_H__ -#include "usg_common.h" -#include "shm_socket.h" -#include "shm_allocator.h" -#include "mem_pool.h" -#include "hashtable.h" -#include "sem_util.h" -#include "logger_factory.h" -#include <set> - -#define ACTION_LIDENTIFIER "<**" -#define ACTION_RIDENTIFIER "**>" -#define TOPIC_LIDENTIFIER "{" -#define TOPIC_RIDENTIFIER "}" - -static Logger logger = LoggerFactory::getLogger(); -#define BUS_MAP_KEY 1 -//typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > SHMString; -typedef std::set<int, std::less<int>, SHM_STL_Allocator<int> > SHMKeySet; -typedef std::map<SHMString, SHMKeySet *, std::less<SHMString>, SHM_STL_Allocator<std::pair<SHMString, SHMKeySet *> > > SHMTopicSubMap; - -enum socket_mod_t -{ - PULL_PUSH = 1, - REQ_REP = 2, - PAIR = 3, - PUB_SUB = 4, - SURVEY = 5, - BUS = 6 - -}; - -class DModSocket { -private: - shm_socket_t *shm_socket; - socket_mod_t mod; - // pthread_t recv_thread; - // <涓婚锛� 璁㈤槄鑰�> - SHMTopicSubMap *topic_sub_map; - std::set<int> *bus_set; - -private: - inline int _recvfrom_(void **buf, int *size, int *port, struct timespec *timeout, int flags); - void _proxy_sub( char *topic, int port); - void _proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int port); - void *run_pubsub_proxy(); - int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ); - int _sub_( char *topic, int size, int port, struct timespec *timeout, int flags); - int _pub_( char *topic, int topic_size, void *content, int content_size, int port, struct timespec *timeout, int flags); - - void _proxy_desub( char *topic, int port); - void _proxy_desub_all(int port); - int _desub_( char *topic, int size, int port, struct timespec *timeout, int flags); - - static void foreach_subscripters(std::function<void(SHMKeySet *, int)> cb); - static bool include_in_keys(int key, int keys[], size_t length); - static size_t remove_subscripters(int keys[], size_t length) ; -public: - static size_t remove_keys(int keys[], size_t length); -public: - DModSocket(); - ~DModSocket(); - - - /** - * 缁戝畾绔彛鍒皊ocket, 濡傛灉涓嶇粦瀹氬垯绯荤粺鑷姩鍒嗛厤涓�涓� - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 - */ - int bind(int port); - - /** - * 寮哄埗缁戝畾绔彛鍒皊ocket, 閫傜敤浜庣▼搴忛潪姝e父鍏抽棴鐨勬儏鍐典笅锛岄噸鍚▼搴忕粦瀹氬師鏉ヨ繕娌¢噴鏀剧殑key - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 - */ - int force_bind(int port); - /** - * 鍙戦�佷俊鎭� - * @port 鍙戦�佺粰璋� - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 - */ - int sendto(const void *buf, const int size, const int port); - // 鍙戦�佷俊鎭秴鏃惰繑鍥炪�� @sec 绉� 锛� @nsec 绾崇 - int sendto_timeout(const void *buf, const int size, const int port, const struct timespec *timeout); - // 鍙戦�佷俊鎭珛鍒昏繑鍥炪�� - int sendto_nowait(const void *buf, const int size, const int port); - - /** - * 鎺ユ敹淇℃伅 - * @port 浠庤皝鍝噷鏀跺埌鐨勪俊鎭� - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 - */ - int recvfrom(void **buf, int *size, int *port); - // 鎺ュ彈淇℃伅瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 - int recvfrom_timeout(void **buf, int *size, int *port, struct timespec *timeout); - int recvfrom_nowait(void **buf, int *size, int *port); - - /** - * 鍙戦�佽姹備俊鎭苟绛夊緟鎺ユ敹搴旂瓟 - * @port 鍙戦�佺粰璋� - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 - */ - int sendandrecv(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ; - // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 - int sendandrecv_timeout(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size, struct timespec *timeout) ; - int sendandrecv_nowait(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ; - - - /** - * 鍚姩bus - * - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 - */ - int start_bus(); - - /** - * 璁㈤槄鎸囧畾涓婚 - * @topic 涓婚 - * @size 涓婚闀垮害 - * @port 鎬荤嚎绔彛 - */ - int sub(char *topic, int size, int port); - // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 - int sub_timeout(char *topic, int size, int port, struct timespec *timeout); - int sub_nowait(char *topic, int size, int port); - - - /** - * 鍙栨秷璁㈤槄鎸囧畾涓婚 - * @topic 涓婚,涓婚涓虹┖鏃跺彇娑堝叏閮ㄨ闃� - * @size 涓婚闀垮害 - * @port 鎬荤嚎绔彛 - */ - int desub( char *topic, int size, int port); - // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 - int desub_timeout(char *topic, int size, int port, struct timespec *timeout); - int desub_nowait(char *topic, int size, int port) ; - - /** - * 鍙戝竷涓婚 - * @topic 涓婚 - * @content 涓婚鍐呭 - * @port 鎬荤嚎绔彛 - */ - int pub(char *topic, int topic_size, void *content, int content_size, int port); - // 瓒呮椂杩斿洖銆� @sec 绉� 锛� @nsec 绾崇 - int pub_timeout(char *topic, int topic_size, void *content, int content_size, int port, struct timespec *timeout); - int pub_nowait(char *topic, int topic_size, void *content, int content_size, int port); - - - /** - * 鑾峰彇soket绔彛鍙� - */ - int get_port() ; - - -}; - -#endif \ No newline at end of file diff --git a/src/socket/include/mod_socket.h b/src/socket/include/mod_socket.h deleted file mode 100644 index 2fd44cc..0000000 --- a/src/socket/include/mod_socket.h +++ /dev/null @@ -1,77 +0,0 @@ -#ifndef __MOD_SOCKET_H__ -#define __MOD_SOCKET_H__ - - -#ifdef __cplusplus -extern "C" { -#endif - -enum socket_mod_t -{ - PULL_PUSH = 1, - REQ_REP = 2, - PAIR = 3, - PUB_SUB = 4, - SURVEY = 5, - BUS = 6 - -}; - -/** - * 鍒涘缓socket - * @return socket鍦板潃 -*/ -void *mod_open_socket(int mod); - -/** - * 鍏抽棴socket - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int mod_close_socket(void * _socket); - -/** - * 缁戝畾绔彛鍒皊ocket, 濡傛灉涓嶇粦瀹氬垯绯荤粺鑷姩鍒嗛厤涓�涓� - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int mod_socket_bind(void * _socket, int port); - - -/** - * 鏈嶅姟绔紑鍚繛鎺ョ洃鍚� - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 - */ -int mod_listen(void * _socket); - -/** - * 瀹㈡埛绔彂璧疯繛鎺ヨ姹� - */ -int mod_connect(void * _socket, int port); - -/** - * 鍙戦�佷俊鎭� - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 - */ -int mod_send(void * _socket, const void *buf, const int size); - -/** - * 鎺ユ敹淇℃伅 - * @return 0 鎴愬姛锛� 鍏朵粬鍊� 澶辫触鐨勯敊璇爜 -*/ -int mod_recv(void * _socket, void **buf, int *size) ; - -/** - * 閲婃斁瀛樺偍鎺ユ敹淇℃伅鐨刡uf - */ -void mod_free(void *buf); - - -/** - * 鑾峰彇soket绔彛鍙� - */ -int mod_get_socket_port(void * _socket); - -#ifdef __cplusplus -} -#endif - -#endif \ No newline at end of file diff --git a/src/socket/include/shm_socket.h b/src/socket/include/shm_socket.h deleted file mode 100644 index fd67d9c..0000000 --- a/src/socket/include/shm_socket.h +++ /dev/null @@ -1,104 +0,0 @@ -#ifndef __SHM_SOCKET_H__ -#define __SHM_SOCKET_H__ - -#include "usg_common.h" -#include "usg_typedef.h" -#include "shm_queue.h" - - - -enum shm_msg_type_t -{ - SHM_SOCKET_OPEN = 1, - SHM_SOCKET_OPEN_REPLY = 2, - SHM_SOCKET_CLOSE = 3, - SHM_COMMON_MSG = 4 - -}; - -enum shm_socket_flag_t -{ - SHM_MSG_TIMEOUT = 1, - SHM_MSG_NOWAIT = 2 -}; - -enum shm_socket_type_t -{ - SHM_SOCKET_STREAM = 1, - SHM_SOCKET_DGRAM = 2 - -}; - -enum shm_socket_error_type_t { - SHM_SOCKET_ECONNFAILED = 1, - SHM_SOCKET_ETIMEOUT = 2 -}; - -enum shm_connection_status_t { - SHM_CONN_CLOSED=1, - SHM_CONN_LISTEN=2, - SHM_CONN_ESTABLISHED=3 -}; - -typedef struct shm_msg_t { - int port; - shm_msg_type_t type; - size_t size; - void * buf; - -} shm_msg_t; - - - - -typedef struct shm_socket_t { - shm_socket_type_t socket_type; - // 鏈湴port - int port; - bool force_bind; - shm_connection_status_t status; - SHMQueue<shm_msg_t> *queue; - SHMQueue<shm_msg_t> *remoteQueue; - LockFreeQueue<shm_msg_t, DM_Allocator> *messageQueue; - LockFreeQueue<shm_msg_t, DM_Allocator> *acceptQueue; - std::map<int, shm_socket_t* > *clientSocketMap; - pthread_t dispatch_thread; - -} shm_socket_t; - - - -size_t shm_socket_remove_keys(int keys[], size_t length); - -shm_socket_t *shm_open_socket(shm_socket_type_t socket_type); - - -int shm_close_socket(shm_socket_t * socket) ; - - -int shm_socket_bind(shm_socket_t * socket, int port) ; - -int shm_socket_force_bind(shm_socket_t * socket, int port) ; - - -int shm_listen(shm_socket_t * socket) ; - -shm_socket_t* shm_accept(shm_socket_t* socket); - -int shm_connect(shm_socket_t * socket, int port); - -int shm_send(shm_socket_t * socket, const void *buf, const int size) ; - - -int shm_recv(shm_socket_t * socket, void **buf, int *size) ; - -int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port, const struct timespec * timeout = NULL, const int flags=0); - -int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port, struct timespec * timeout = NULL, int flags=0); - -int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size, - struct timespec * timeout = NULL, int flags=0); - - - -#endif \ No newline at end of file diff --git a/src/socket/net_mod_server_socket.c b/src/socket/net_mod_server_socket.c index 2890401..97e4fea 100644 --- a/src/socket/net_mod_server_socket.c +++ b/src/socket/net_mod_server_socket.c @@ -1,11 +1,10 @@ #include "net_mod_server_socket.h" #include "socket_io.h" +#include "net_mod_socket_io.h" -NetModServerSocket::NetModServerSocket(int port, ShmModSocket * modsocket) +NetModServerSocket::NetModServerSocket(int port, ShmModSocket * modsocket): shm_mod_socket(modsocket) { - shm_mod_socket = modsocket; - char portstr[32]; sprintf(portstr, "%d", port); listenfd = Open_listenfd(portstr); @@ -81,6 +80,8 @@ } /* $end add_client */ + + /* $begin check_clients */ void NetModServerSocket::check_clients() { @@ -97,10 +98,11 @@ if ((connfd > 0) && (FD_ISSET(connfd, &pool.ready_set))) { pool.nready--; - if ((n = Rio_readlineb(&rio, buf, MAXLINE)) != 0) + if ((n = rio_readpkgb(&rio, buf, MAXLINE)) > 0) { Rio_writen(connfd, buf, n); + Rio_writen(connfd, PKG_SEP, strlen(PKG_SEP)); // shm_mod_socket->sendto(buf, n, msg->key); // net_mod_msg_t *msg = (net_mod_msg_t*)buf; // if(msg.mod == PUB_SUB) { diff --git a/src/socket/net_mod_server_socket.h b/src/socket/net_mod_server_socket.h index 9fe83b2..d908180 100644 --- a/src/socket/net_mod_server_socket.h +++ b/src/socket/net_mod_server_socket.h @@ -37,7 +37,7 @@ public: - NetModServerSocket(int port, ShmModSocket *shm_mod_socket); + NetModServerSocket(int port, ShmModSocket *_shm_mod_socket); void start(); ~NetModServerSocket(); diff --git a/src/socket/net_mod_socket.c b/src/socket/net_mod_socket.c index 572fe21..e1f71f0 100644 --- a/src/socket/net_mod_socket.c +++ b/src/socket/net_mod_socket.c @@ -1,5 +1,6 @@ #include "net_mod_socket.h" #include "socket_io.h" +#include "net_mod_socket_io.h" NetModSocket::NetModSocket(const char *host, int port) { @@ -12,10 +13,11 @@ ssize_t NetModSocket::send(void *buf, size_t size) { int n = rio_writen(clientfd, buf, size); + rio_writen(clientfd, PKG_SEP, strlen(PKG_SEP)); char resp[MAXLINE]; int ss; - ss = rio_readlineb(&rio, resp, MAXLINE); + ss = rio_readpkgb(&rio, resp, MAXLINE); puts(resp); return n; } diff --git a/src/socket/net_mod_socket_io.c b/src/socket/net_mod_socket_io.c new file mode 100644 index 0000000..a4dbf0d --- /dev/null +++ b/src/socket/net_mod_socket_io.c @@ -0,0 +1,51 @@ +#include "net_mod_socket_io.h" +#include "socket_io.h" + + +ssize_t rio_readpkgb(rio_t *rp, char *usrbuf, size_t maxlen) +{ + int n, rc; + char c; + char *bufp = usrbuf; + int pkg_sep_i = 0; + int pkg_sep_len = strlen(PKG_SEP); + const char * pkg_sep = PKG_SEP; + + for (n = 0; n < maxlen; n++) + { + if ((rc = rio_readnb(rp, &c, 1)) == 1) + { + + *bufp++ = c; + + if(c == *(pkg_sep + pkg_sep_i)) { + pkg_sep_i++; + if(pkg_sep_i == pkg_sep_len) { + + break; + } + } else { + + pkg_sep_i = 0; + } + + } + else if (rc == 0) + { + if (n == 0) + return 0; /* EOF, no data read */ + else + break; /* EOF, some data was read */ + } + else + return -1; /* Error */ + } + + if(pkg_sep_i == pkg_sep_len) { + *(bufp - pkg_sep_len) = 0; + return n - pkg_sep_len; + } else { + return -1; + } + +} \ No newline at end of file diff --git a/src/socket/net_mod_socket_io.h b/src/socket/net_mod_socket_io.h new file mode 100644 index 0000000..cfe153d --- /dev/null +++ b/src/socket/net_mod_socket_io.h @@ -0,0 +1,10 @@ +#ifndef __NET_MODE_SOCKET_IO_H__ +#define __NET_MODE_SOCKET_IO_H__ +#include "socket_io.h" +#include "usg_common.h" + +#define PKG_SEP "\r\n\r\n" + +ssize_t rio_readpkgb(rio_t *rp, char *usrbuf, size_t maxlen); + +#endif \ No newline at end of file diff --git a/src/util/include/sem_util.h b/src/util/include/sem_util.h deleted file mode 100644 index 5d2cf77..0000000 --- a/src/util/include/sem_util.h +++ /dev/null @@ -1,20 +0,0 @@ -#ifndef PCSEM_H -#define PCSEM_H - -#include "usg_common.h" -#include "usg_typedef.h" - -namespace SemUtil { - - int get(key_t key, unsigned int value); - int dec(int semId); - int dec_nowait(int semId); - int dec_timeout(const int semId, const struct timespec * timeout); - int inc(int semId); - void remove(int semid); - - void set(int semId, int val); - -} - -#endif diff --git a/src/util/sem_util.c b/src/util/sem_util.c deleted file mode 100644 index bc8c4f4..0000000 --- a/src/util/sem_util.c +++ /dev/null @@ -1,150 +0,0 @@ -#include "sem_util.h" -#include "logger_factory.h" - -static Logger logger = LoggerFactory::getLogger(); - -int SemUtil::get(key_t key, unsigned int value) { -// printf("==================SemUtil::get===============================\n"); - int semid, perms; - - perms = S_IRUSR | S_IWUSR; - - semid = semget(key, 1, IPC_CREAT | IPC_EXCL | perms); - - if (semid != -1) { /* Successfully created the semaphore */ - union semun arg; - struct sembuf sop; - - //logger.info("%ld: created semaphore\n", (long)getpid()); - - arg.val = 0; /* So initialize it to 0 */ - if (semctl(semid, 0, SETVAL, arg) == -1) - err_exit(errno, "semctl 1"); - //logger.info("%ld: initialized semaphore\n", (long)getpid()); - - /* Perform a "no-op" semaphore operation - changes sem_otime - so other processes can see we've initialized the set. */ - - sop.sem_num = 0; /* Operate on semaphore 0 */ - sop.sem_op = value; - sop.sem_flg = 0; - if (semop(semid, &sop, 1) == -1) - err_exit(errno, "semop"); - //logger.info("%ld: completed dummy semop()\n", (long)getpid()); - - } else { /* We didn't create the semaphore set */ - - if (errno != EEXIST) { /* Unexpected error from semget() */ - err_exit(errno, "semget 1"); - - } else { /* Someone else already created it */ - const int MAX_TRIES = 10; - int j; - union semun arg; - struct semid_ds ds; - - semid = semget(key, 1, perms); /* So just get ID */ - if (semid == -1) - err_exit(errno, "semget 2"); - - // logger.info("%ld: got semaphore key\n", (long)getpid()); - /* Wait until another process has called semop() */ - - arg.buf = &ds; - for (j = 0; j < MAX_TRIES; j++) { - //logger.info("Try %d\n", j); - if (semctl(semid, 0, IPC_STAT, arg) == -1) - err_exit(errno, "semctl 2"); - - if (ds.sem_otime != 0) /* Semop() performed? */ - break; /* Yes, quit loop */ - sleep(1); /* If not, wait and retry */ - } - - if (ds.sem_otime == 0) /* Loop ran to completion! */ - err_exit(errno, "Existing semaphore not initialized"); - } - } - return semid; -} - -/* Reserve semaphore (blocking), return 0 on success, or -1 with 'errno' - set to EINTR if operation was interrupted by a signal handler */ - -/* Reserve semaphore - decrement it by 1 */ -int SemUtil::dec(int semId) { -// logger.debug("%d: SemUtil::dec\n", semId); - struct sembuf sops; - - sops.sem_num = 0; - sops.sem_op = -1; - sops.sem_flg = 0; - - while (semop(semId, &sops, 1) == -1) - if (errno != EINTR) { - err_msg(errno, "SemUtil::dec"); - return -1; - } - - return 0; -} - -int SemUtil::dec_nowait(int semId) { - struct sembuf sops; - - sops.sem_num = 0; - sops.sem_op = -1; - sops.sem_flg = IPC_NOWAIT; - - while (semop(semId, &sops, 1) == -1) - if (errno != EINTR) { - err_msg(errno, "SemUtil::dec_nowait"); - return -1; - } - - return 0; -} - -int SemUtil::dec_timeout(const int semId, const struct timespec *timeout) { - struct sembuf sops; - - sops.sem_num = 0; - sops.sem_op = -1; - sops.sem_flg = 0; - - while (semtimedop(semId, &sops, 1, timeout) == -1) - if (errno != EINTR) { - // err_msg(errno, "SemUtil::dec_timeout"); - return -1; - } - - return 0; -} - -/* Release semaphore - increment it by 1 */ -int SemUtil::inc(int semId) { - struct sembuf sops; - - sops.sem_num = 0; - sops.sem_op = 1; - sops.sem_flg = 0; - - int rv = semop(semId, &sops, 1); - if (rv == -1) { - err_msg(errno, "SemUtil::inc"); - } - return rv; -} - -void SemUtil::remove(int semid) { - union semun dummy; - if (semctl(semid, 0, IPC_RMID, dummy) == -1) - err_msg(errno, "SemUtil::remove"); -} - -void SemUtil::set(int semId, int val) { - union semun arg; - arg.val = val; - if (semctl(semId, 0, SETVAL, arg) == -1) - err_msg(errno, "SemUtil::set"); -} diff --git a/src/util/sem_util.h b/src/util/sem_util.h deleted file mode 100644 index 5d2cf77..0000000 --- a/src/util/sem_util.h +++ /dev/null @@ -1,20 +0,0 @@ -#ifndef PCSEM_H -#define PCSEM_H - -#include "usg_common.h" -#include "usg_typedef.h" - -namespace SemUtil { - - int get(key_t key, unsigned int value); - int dec(int semId); - int dec_nowait(int semId); - int dec_timeout(const int semId, const struct timespec * timeout); - int inc(int semId); - void remove(int semid); - - void set(int semId, int val); - -} - -#endif diff --git a/test_net_socket/test_net_mod_socket.c b/test_net_socket/test_net_mod_socket.c index a065469..2d9c94e 100644 --- a/test_net_socket/test_net_mod_socket.c +++ b/test_net_socket/test_net_mod_socket.c @@ -1,11 +1,16 @@ #include "net_mod_server_socket.h" #include "net_mod_socket.h" #include "shm_mm.h" +#include "dgram_mod_socket.h" void server() { - ShmModSocket * m_socket = new ShmModSocket(); - NetModServerSocket serverSocket(5000, m_socket); - serverSocket.start(); + // void *socket = dgram_mod_open_socket(); + // ShmModSocket tt; + ShmModSocket * m_socket = new ShmModSocket; + // NetModServerSocket(5000, NULL); + + NetModServerSocket *serverSocket = new NetModServerSocket(5000, m_socket); + serverSocket->start(); } void client(){ @@ -20,6 +25,7 @@ int main(int argc, char *argv[]) { shm_init(512); + if (argc < 2) { fprintf(stderr, "Usage: %s %s|%s\n", argv[0], "server", "client"); return 1; @@ -31,4 +37,7 @@ if (strcmp("client", argv[1]) == 0) client(); -} \ No newline at end of file +} + + + -- Gitblit v1.8.0