| | |
| | | ./shm/shm_mm_wrapper.cpp |
| | | ./shm/mm.cpp |
| | | ./shm/hashtable.cpp |
| | | ./shm/shm_mm.cpp |
| | | |
| | | ) |
| | | |
| | | if (BUILD_SHARED_LIBS) |
| | | add_library(shm_queue SHARED ${_SOURCES_}) |
| | | else() |
| | | add_library(shm_queue SHARED ${_SOURCES_}) |
| | | add_library(shm_queue STATIC ${_SOURCES_}) |
| | | endif() |
| | | |
| | | # STATIC |
| | |
| | | # install rules |
| | | install(TARGETS shm_queue DESTINATION lib) |
| | | install(FILES |
| | | ./socket/socket_def.h |
| | | ./socket/bus_server_socket.h |
| | | ./socket/shm_socket.h |
| | | ./socket/shm_mod_socket.h |
| | | ./socket/bus_server_socket_wrapper.h |
| | | ./psem.h |
| | | ./key_def.h |
| | | ./time_util.h |
| | | ./futex_sem.h |
| | | ./bus_error.h |
| | | ./bus_def.h |
| | | ./sole.h |
| | | ./logger_factory.h |
| | | ./queue/linked_lock_free_queue.h |
| | | ./queue/array_lock_free_queue.h |
| | | ./queue/shm_queue.h |
| | | ./queue/array_lock_free_sem_queue.h |
| | | ./queue/lock_free_queue.h |
| | | ./svsem.h |
| | | ./net/net_conn_pool.h |
| | | ./net/net_mod_socket.h |
| | | ./net/net_mod_server_socket_wrapper.h |
| | | ./net/net_mod_socket_io.h |
| | | ./net/net_mod_server_socket.h |
| | | ./net/net_mod_socket_wrapper.h |
| | | ./shm/hashtable.h |
| | | ./shm/mem_pool.h |
| | | ./shm/mm.h |
| | | ./shm/shm_mm_wrapper.h |
| | | ./shm/shm_allocator.h |
| | | ./socket/socket_def.h |
| | | ./socket/bus_server_socket.h |
| | | ./socket/shm_socket.h |
| | | ./socket/shm_mod_socket.h |
| | | ./socket/bus_server_socket_wrapper.h |
| | | ./psem.h |
| | | ./pread_write_lock.h |
| | | ./key_def.h |
| | | ./time_util.h |
| | | ./sv_read_write_lock.h |
| | | ./futex_sem.h |
| | | ./bus_error.h |
| | | ./bus_def.h |
| | | ./logger_factory.h |
| | | ./sole.h |
| | | ./queue/linked_lock_free_queue.h |
| | | ./queue/array_lock_free_queue.h |
| | | ./queue/shm_queue.h |
| | | ./queue/array_lock_free_sem_queue.h |
| | | ./queue/lock_free_queue.h |
| | | ./svsem.h |
| | | ./net/net_conn_pool.h |
| | | ./net/net_mod_socket.h |
| | | ./net/net_mod_server_socket_wrapper.h |
| | | ./net/net_mod_socket_io.h |
| | | ./net/net_mod_server_socket.h |
| | | ./net/net_mod_socket_wrapper.h |
| | | ./shm/hashtable.h |
| | | ./shm/mm.h |
| | | ./shm/shm_mm_wrapper.h |
| | | ./shm/shm_allocator.h |
| | | ./shm/shm_mm.h |
| | | |
| | | |
| | | DESTINATION include) |
| | | |
| | |
| | | #ifndef _KEY_DEF_H_ |
| | | #define _KEY_DEF_H_ |
| | | |
| | | // bus中主题与订阅者对应关系的map |
| | | #define SHM_BUS_MAP_KEY 1 |
| | | |
| | | // 队列状态标记的map |
| | | #define SHM_QUEUE_ST_KEY 3 |
| | | |
| | | // BUS key |
| | | #define SHM_BUS_KEY 8 |
| | | |
| | | // 网络代理key |
| | | #define SHM_NET_PROXY_KEY 99 |
| | | |
| | |
| | | |
| | | |
| | | NetModSocket::~NetModSocket() { |
| | | int s; |
| | | // int s; |
| | | // delete gpool; |
| | | // s = pthread_mutex_destroy(&sendMutex); |
| | | if(s != 0) { |
| | | err_exit(s, "shm_socket_close"); |
| | | } |
| | | // if(s != 0) { |
| | | // err_exit(s, "shm_socket_close"); |
| | | // } |
| | | } |
| | | |
| | | |
| | |
| | | * 关闭 |
| | | */ |
| | | void net_mod_socket_close(void *_socket) { |
| | | // NetModSocket *sockt = (NetModSocket *)_socket; |
| | | // delete sockt; |
| | | NetModSocket *sockt = (NetModSocket *)_socket; |
| | | delete sockt; |
| | | } |
| | | |
| | | int net_mod_socket_stop(void *_socket) { |
| | |
| | | #include <assert.h> // assert() |
| | | #include <sched.h> // sched_yield() |
| | | #include "logger_factory.h" |
| | | #include "mem_pool.h" |
| | | #include "shm_mm.h" |
| | | #include "shm_allocator.h" |
| | | |
| | | /// @brief implementation of an array based lock free queue with support for |
| | |
| | | #include <assert.h> // assert() |
| | | #include <sched.h> // sched_yield() |
| | | #include "logger_factory.h" |
| | | #include "mem_pool.h" |
| | | #include "shm_mm.h" |
| | | #include "shm_allocator.h" |
| | | #include "futex_sem.h" |
| | | #include "time_util.h" |
| | |
| | | |
| | | #include <usg_common.h> |
| | | #include <assert.h> // assert() |
| | | #include "mem_pool.h" |
| | | #include "shm_mm.h" |
| | | #include "sem_util.h" |
| | | #include "logger_factory.h" |
| | | #include "shm_allocator.h" |
| | |
| | | |
| | | // default Queue size |
| | | #define LOCK_FREE_Q_DEFAULT_SIZE 16 |
| | | |
| | | |
| | | #define LOCK_FREE_Q_ST_OPENED 0 |
| | | |
| | | #define LOCK_FREE_Q_ST_CLOSED 1 |
| | | |
| | | // static Logger *logger = LoggerFactory::getLogger(); |
| | | // define this macro if calls to "size" must return the real size of the |
| | |
| | | sem_t items; |
| | | |
| | | time_t createTime; |
| | | time_t closeTime; |
| | | int status; |
| | | |
| | | public: |
| | | // sem_t mutex; |
| | | |
| | | |
| | | LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); |
| | | |
| | |
| | | /// Note it is not virtual since it is not expected to inherit from this |
| | | /// template |
| | | ~LockFreeQueue(); |
| | | |
| | | inline void close(); |
| | | |
| | | // std::atomic_uint reference; |
| | | /// @brief constructor of the class |
| | |
| | | |
| | | inline ELEM_T &operator[](unsigned i); |
| | | |
| | | |
| | | |
| | | time_t getCreateTime() { |
| | | return createTime; |
| | | } |
| | | |
| | | time_t getCloseTime() { |
| | | return closeTime; |
| | | } |
| | | |
| | | int getStatus() { |
| | | return status; |
| | | } |
| | | |
| | | /// @brief push an element at the tail of the queue |
| | |
| | | err_exit(errno, "LockFreeQueue sem_init"); |
| | | |
| | | createTime = time(NULL); |
| | | status = LOCK_FREE_Q_ST_OPENED; |
| | | |
| | | } |
| | | |
| | | |
| | | template< |
| | | typename ELEM_T, |
| | | typename Allocator, |
| | | template<typename T, typename AT> class Q_TYPE> |
| | | inline void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::close() { |
| | | status = LOCK_FREE_Q_ST_CLOSED; |
| | | closeTime = time(NULL); |
| | | } |
| | | |
| | | |
| | |
| | | if (sem_destroy(&items) == -1) { |
| | | err_exit(errno, "LockFreeQueue sem_destroy"); |
| | | } |
| | | // if (sem_destroy(&mutex) == -1) { |
| | | // err_exit(errno, "LockFreeQueue sem_destroy"); |
| | | // } |
| | | |
| | | } |
| | | |
| | | template< |
| | |
| | | |
| | | ELEM_T &operator[](unsigned i); |
| | | |
| | | // @deprecate |
| | | static size_t remove_queues_exclude(int keys[], size_t length); |
| | | |
| | | private: |
| | | protected: |
| | |
| | | SHMQueue<ELEM_T>(const SHMQueue<ELEM_T> &a_src); |
| | | }; |
| | | |
| | | // @deprecate |
| | | // template <typename ELEM_T> |
| | | // size_t SHMQueue<ELEM_T>::remove_queues_exclude(int keys[], size_t length) { |
| | | // hashtable_t *hashtable = mm_get_hashtable(); |
| | | // std::set<int> *keyset = hashtable_keyset(hashtable); |
| | | // std::set<int>::iterator keyItr; |
| | | // LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue; |
| | | // bool found; |
| | | // size_t count = 0; |
| | | // for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) { |
| | | // found = false; |
| | | // for (size_t i = 0; i < length; i++) { |
| | | // if (*keyItr == keys[i]) { |
| | | // found = true; |
| | | // break; |
| | | // } |
| | | // } |
| | | // if (!found && *keyItr > 100) { |
| | | // // 销毁共享内存的queue |
| | | // mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr); |
| | | // delete mqueue; |
| | | // hashtable_remove(hashtable, *keyItr); |
| | | // count++; |
| | | // } |
| | | // } |
| | | // delete keyset; |
| | | // return count; |
| | | // } |
| | | |
| | | |
| | | |
| | | |
| | |
| | | |
| | | template <typename ELEM_T> SHMQueue<ELEM_T>::~SHMQueue() { |
| | | LoggerFactory::getLogger()->debug("SHMQueue destroy"); |
| | | if(owner) { |
| | | delete queue; |
| | | hashtable_remove(hashtable, mkey); |
| | | } |
| | | |
| | | |
| | | |
| | | } |
| | |
| | | #ifndef __SHM_ALLOCATOR_H__ |
| | | #define __SHM_ALLOCATOR_H__ |
| | | #include "usg_common.h" |
| | | #include "mem_pool.h" |
| | | #include "mm.h" |
| | | #include <new> |
| | | #include <cstdlib> // for exit() |
| | | #include <climits> // for UNIX_MAX |
| | |
| | | public: |
| | | static void *allocate (size_t size) { |
| | | return mm_malloc(size); |
| | | // return mem_pool_malloc(size); |
| | | // return shm_mm_malloc(size); |
| | | } |
| | | |
| | | static void deallocate (void *ptr) { |
| | | return mm_free(ptr); |
| | | // return mem_pool_free(ptr); |
| | | // return shm_mm_free(ptr); |
| | | } |
| | | }; |
| | | |
New file |
| | |
| | | #include "shm_mm.h" |
| | | #include "mm.h" |
| | | #include "sem_util.h" |
| | | |
| | | |
| | | void shm_mm_init(size_t heap_size) { |
| | | mm_init(heap_size); |
| | | shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY); |
| | | } |
| | | |
| | | void shm_mm_destroy(void) { |
| | | mm_destroy(); |
| | | |
| | | } |
| | | |
| | | void *shm_mm_malloc (size_t size) { |
| | | return mm_malloc(size); |
| | | } |
| | | |
| | | |
| | | void shm_mm_free (void *ptr) { |
| | | mm_free(ptr); |
| | | } |
| | | |
| | | |
| | | |
| | | void shm_mm_free_by_key(int key) { |
| | | return mm_free_by_key(key); |
| | | } |
| | | |
| | | |
| | | void *shm_mm_realloc (void *ptr, size_t size) { |
| | | return mm_realloc(ptr, size); |
| | | } |
| | | |
| | | int shm_mm_alloc_key() { |
| | | |
| | | return mm_alloc_key(); |
| | | } |
New file |
| | |
| | | #ifndef __SHM_MM_H__ |
| | | #define __SHM_MM_H__ |
| | | #include "usg_common.h" |
| | | #include "shm_allocator.h" |
| | | #include "key_def.h" |
| | | |
| | | #define SHM_QUEUE_ST_OPENED 0 |
| | | #define SHM_QUEUE_ST_CLOSED 1 |
| | | #define SHM_QUEUE_ST_RECYCLED 2 |
| | | |
| | | struct shm_queue_status_t { |
| | | |
| | | int status; |
| | | time_t createTime; |
| | | time_t closeTime; |
| | | }; |
| | | |
| | | typedef std::map<int, shm_queue_status_t, std::less<int>, SHM_STL_Allocator<std::pair<const int, shm_queue_status_t> > > ShmQueueStMap; |
| | | |
| | | |
| | | void shm_mm_init(size_t heap_size) ; |
| | | |
| | | void shm_mm_destroy(void) ; |
| | | |
| | | void *shm_mm_malloc (size_t size); |
| | | |
| | | void shm_mm_free (void *ptr); |
| | | |
| | | |
| | | |
| | | template <typename T> |
| | | T* shm_mm_attach(int key) { |
| | | void *ptr; |
| | | // T* tptr; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | ptr = hashtable_get(hashtable, key); |
| | | // printf("shm_mm_malloc_by_key malloc before %d, %p\n", key, ptr); |
| | | if(ptr == NULL || ptr == (void *)1 ) { |
| | | ptr = mm_malloc(sizeof(T)); |
| | | hashtable_put(hashtable, key, ptr); |
| | | new(ptr) T; |
| | | // printf("shm_mm_malloc_by_key use new %d, %p\n", key, ptr); |
| | | } |
| | | return (T*)ptr; |
| | | } |
| | | |
| | | void shm_mm_free_by_key(int key) ; |
| | | |
| | | |
| | | void *shm_mm_realloc (void *ptr, size_t size); |
| | | |
| | | int shm_mm_alloc_key(); |
| | | |
| | | #endif |
| | |
| | | #include "shm_mm_wrapper.h" |
| | | #include "mem_pool.h" |
| | | #include "shm_mm.h" |
| | | #include "hashtable.h" |
| | | #include "lock_free_queue.h" |
| | | #include "shm_socket.h" |
| | | |
| | | #define BUFFER_TIME 10 |
| | | #define BUFFER_TIME 1 |
| | | |
| | | |
| | | void shm_mm_wrapper_init(int size) { |
| | | mem_pool_init(size); |
| | | shm_mm_init(size); |
| | | |
| | | } |
| | | |
| | | void shm_mm_wrapper_destroy() { |
| | | mem_pool_destroy(); |
| | | shm_mm_destroy(); |
| | | } |
| | | |
| | | int shm_mm_wrapper_alloc_key() { |
| | | return mm_alloc_key(); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 回收假删除的key |
| | | */ |
| | | int shm_mm_wrapper_start_resycle() { |
| | | ShmQueueStMap * shmQueueStMap = shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY); |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | LockFreeQueue<shm_packet_t> *mqueue; |
| | | while(true) { |
| | | for(auto it = shmQueueStMap->begin(); it != shmQueueStMap->end(); ++it ) { |
| | | if(it->second.status == SHM_QUEUE_ST_CLOSED && difftime(time(NULL), it->second.closeTime) > BUFFER_TIME ) { |
| | | // mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, keys[i]); |
| | | // if(mqueue != NULL) { |
| | | // delete mqueue; |
| | | // } |
| | | |
| | | hashtable_remove(hashtable, it->first); |
| | | printf("reomved queue %d\n\n", it->first); |
| | | it->second.status = SHM_QUEUE_ST_RECYCLED; |
| | | // 不能 erase ,否则会出现多进程之间的同步问题, 而这正是这里要解决的问题 |
| | | // it = shmQueueStMap->erase(it); |
| | | // continue; |
| | | } |
| | | } |
| | | |
| | | sleep(1); |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | | //删除包含在keys内的queue |
| | |
| | | int count = 0; |
| | | for(int i = 0; i< length; i++) { |
| | | // 销毁共享内存的queue |
| | | mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, keys[i]); |
| | | if(mqueue == NULL) { |
| | | continue; |
| | | } |
| | | if(difftime(time(NULL), mqueue->getCreateTime()) > BUFFER_TIME ) { |
| | | delete mqueue; |
| | | hashtable_remove(hashtable, keys[i]); |
| | | LoggerFactory::getLogger()->debug("remove queue %d", keys[i]); |
| | | count++; |
| | | } |
| | | hashtable_remove(hashtable, keys[i]); |
| | | LoggerFactory::getLogger()->debug("remove queue %d", keys[i]); |
| | | count++; |
| | | |
| | | } |
| | | return count; |
| | |
| | | // 100内的是bus内部自己用的 |
| | | if (!found && *keyItr > 100) { |
| | | // 销毁共享内存的queue |
| | | mqueue = (LockFreeQueue<shm_packet_t> *)hashtable_get(hashtable, *keyItr); |
| | | if(difftime(time(NULL), mqueue->getCreateTime()) > BUFFER_TIME ) { |
| | | delete mqueue; |
| | | hashtable_remove(hashtable, *keyItr); |
| | | LoggerFactory::getLogger()->debug("remove queue %d", *keyItr); |
| | | count++; |
| | | } |
| | | hashtable_remove(hashtable, *keyItr); |
| | | LoggerFactory::getLogger()->debug("remove queue %d", *keyItr); |
| | | count++; |
| | | |
| | | } |
| | | } |
| | |
| | | * |
| | | */ |
| | | |
| | | #ifndef __SHM_MM_H__ |
| | | #define __SHM_MM_H__ |
| | | #ifndef __SHM_MM_WRAPPER_H__ |
| | | #define __SHM_MM_WRAPPER_H__ |
| | | |
| | | #ifdef __cplusplus |
| | | extern "C" { |
| | |
| | | */ |
| | | void shm_mm_wrapper_destroy(); |
| | | |
| | | /** |
| | | * @brief 回收标记为删除的队列 |
| | | * @return 错误码 |
| | | */ |
| | | int shm_mm_wrapper_start_resycle() ; |
| | | |
| | | /** |
| | | * @brief 分配一个key给申请者 |
| | |
| | | static Logger *logger = LoggerFactory::getLogger(); |
| | | |
| | | void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)> cb) { |
| | | SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); |
| | | SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); |
| | | SHMKeySet *subscripter_set; |
| | | SHMKeySet::iterator set_iter; |
| | | SHMTopicSubMap::iterator map_iter; |
| | |
| | | int key; |
| | | for(int i = 0; i < length; i++) { |
| | | key = keys[i]; |
| | | SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); |
| | | SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); |
| | | SHMKeySet *subscripter_set; |
| | | SHMKeySet::iterator set_iter; |
| | | SHMTopicSubMap::iterator map_iter; |
| | |
| | | * 启动bus |
| | | * |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | */ |
| | | int BusServerSocket::start(){ |
| | | topic_sub_map = mem_pool_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); |
| | | topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); |
| | | |
| | | _run_proxy_(); |
| | | return 0; |
| | |
| | | |
| | | } |
| | | topic_sub_map->clear(); |
| | | mem_pool_free_by_key(SHM_BUS_MAP_KEY); |
| | | shm_mm_free_by_key(SHM_BUS_MAP_KEY); |
| | | } |
| | | shm_socket_close(shm_socket); |
| | | logger->debug("BusServerSocket destory 3"); |
| | |
| | | #include "usg_common.h" |
| | | #include "shm_socket.h" |
| | | #include "shm_allocator.h" |
| | | #include "mem_pool.h" |
| | | #include "shm_mm.h" |
| | | #include "hashtable.h" |
| | | #include "sem_util.h" |
| | | #include "logger_factory.h" |
| | |
| | | static Logger *logger = LoggerFactory::getLogger(); |
| | | |
| | | |
| | | // size_t ShmModSocket::remove_keys(int keys[], size_t length) { |
| | | // BusServerSocket::remove_subscripters(keys, length); |
| | | // return shm_socket_remove_keys(keys, length); |
| | | // } |
| | | |
| | | // size_t ShmModSocket::remove_keys_exclude(int keys[], size_t length) { |
| | | // BusServerSocket::remove_subscripters(keys, length); |
| | | // return shm_socket_remove_keys_exclude(keys, length); |
| | | // } |
| | | |
| | | ShmModSocket::ShmModSocket() { |
| | | shm_socket = shm_socket_open(SHM_SOCKET_DGRAM); |
| | |
| | | #include "usg_common.h" |
| | | #include "shm_socket.h" |
| | | #include "shm_allocator.h" |
| | | #include "mem_pool.h" |
| | | #include "shm_mm.h" |
| | | #include "hashtable.h" |
| | | #include "sem_util.h" |
| | | #include "logger_factory.h" |
| | |
| | | #include <cassert> |
| | | #include "bus_error.h" |
| | | #include "sole.h" |
| | | #include "shm_mm.h" |
| | | #include "key_def.h" |
| | | |
| | | static Logger *logger = LoggerFactory::getLogger(); |
| | | |
| | | |
| | | ShmQueueStMap * shmQueueStMap ; |
| | | |
| | | static void print_msg(char *head, shm_packet_t &msg) { |
| | | // err_msg(0, "%s: key=%d, type=%d\n", head, msg.key, msg.type); |
| | |
| | | if (s != 0) |
| | | err_exit(s, "pthread_mutexattr_destroy"); |
| | | |
| | | |
| | | shmQueueStMap = shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY); |
| | | |
| | | return sockt; |
| | | } |
| | | |
| | | |
| | | int shm_socket_close(shm_socket_t *sockt) { |
| | | static int _shm_socket_close_(shm_socket_t *sockt) { |
| | | |
| | | int rv; |
| | | logger->debug("shm_socket_close\n"); |
| | | if(sockt->queue != NULL) { |
| | | delete sockt->queue; |
| | | sockt->queue = NULL; |
| | | // if(sockt->queue != NULL) { |
| | | // delete sockt->queue; |
| | | // sockt->queue = NULL; |
| | | // } |
| | | |
| | | // hashtable_remove(hashtable, mkey); |
| | | |
| | | |
| | | if(sockt->key != 0) { |
| | | auto it = shmQueueStMap->find(sockt->key); |
| | | if(it != shmQueueStMap->end()) { |
| | | it->second.status = SHM_QUEUE_ST_CLOSED; |
| | | it->second.closeTime = time(NULL); |
| | | } |
| | | } |
| | | |
| | | rv = pthread_mutex_destroy(&(sockt->mutex) ); |
| | | if(rv != 0) { |
| | | err_exit(rv, "shm_socket_close"); |
| | | } |
| | | |
| | | |
| | | pthread_mutex_destroy(&(sockt->mutex) ); |
| | | free(sockt); |
| | | return 0; |
| | | } |
| | | |
| | | |
| | | int shm_socket_close(shm_socket_t *sockt) { |
| | | return _shm_socket_close_(sockt); |
| | | } |
| | | |
| | | |
| | |
| | | return; |
| | | |
| | | logger->debug("%d destroy tmp socket\n", pthread_self()); |
| | | shm_socket_close((shm_socket_t *)tmp_socket); |
| | | _shm_socket_close_((shm_socket_t *)tmp_socket); |
| | | rv = pthread_setspecific(_perthread_socket_key_, NULL); |
| | | if ( rv != 0) { |
| | | logger->error(rv, "shm_sendandrecv : pthread_setspecific"); |
| | |
| | | return EBUS_RECVFROM_WRONG_END; |
| | | } |
| | | |
| | | shm_socket_close(tmp_socket); |
| | | _shm_socket_close_(tmp_socket); |
| | | return rv; |
| | | |
| | | } |
| | |
| | | const int key, const struct timespec *timeout, const int flag) { |
| | | |
| | | int rv; |
| | | shm_queue_status_t stRecord; |
| | | LockFreeQueue<shm_packet_t> *remoteQueue; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | |
| | | if( sockt->queue != NULL) |
| | |
| | | logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key); |
| | | return EBUS_KEY_INUSED; |
| | | } |
| | | |
| | | // 标记key对应的状态 ,为opened |
| | | stRecord.status = SHM_QUEUE_ST_OPENED; |
| | | stRecord.createTime = time(NULL); |
| | | shmQueueStMap->insert({sockt->key, stRecord}); |
| | | |
| | | } |
| | | |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | |
| | | return EBUS_SENDTO_SELF; |
| | | } |
| | | |
| | | LockFreeQueue<shm_packet_t> *remoteQueue; |
| | | if ((remoteQueue = shm_socket_attach_queue(key)) == NULL) { |
| | | bus_errno = EBUS_CLOSED; |
| | | logger->error("sendto key %d failed, %s", key, bus_strerror(bus_errno)); |
| | | return EBUS_CLOSED; |
| | | // 检查key标记的状态 |
| | | auto it = shmQueueStMap->find(key); |
| | | if(it != shmQueueStMap->end()) { |
| | | if(it->second.status == SHM_QUEUE_ST_CLOSED) { |
| | | // key对应的状态是关闭的 |
| | | goto ERR_CLOSED; |
| | | } |
| | | } |
| | | |
| | | |
| | | remoteQueue = shm_socket_attach_queue(key); |
| | | |
| | | if (remoteQueue == NULL ) { |
| | | goto ERR_CLOSED; |
| | | } |
| | | |
| | | rv = remoteQueue->push(*sendpak, timeout, flag); |
| | | |
| | | return rv; |
| | | |
| | | ERR_CLOSED: |
| | | logger->error("sendto key %d failed, %s", key, bus_strerror(EBUS_CLOSED)); |
| | | return EBUS_CLOSED; |
| | | |
| | | } |
| | | |
| | | // 短连接方式接受 |
| | | static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *_recvpak , const struct timespec *timeout, int flag) { |
| | | int rv; |
| | | |
| | | shm_queue_status_t stRecord; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | shm_packet_t recvpak; |
| | | |
| | |
| | | return EBUS_KEY_INUSED; |
| | | } |
| | | |
| | | // 标记key对应的状态 ,为opened |
| | | stRecord.status = SHM_QUEUE_ST_OPENED; |
| | | stRecord.createTime = time(NULL); |
| | | shmQueueStMap->insert({sockt->key, stRecord}); |
| | | |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_recvfrom : pthread_mutex_unlock"); |
| | |
| | | |
| | | LABEL_POP: |
| | | |
| | | // |
| | | // printf("%p start recv.....\n", sockt); |
| | | // 检查key标记的状态 |
| | | // auto shmQueueMapIter = shmQueueStMap->find(sockt->key); |
| | | // if(shmQueueMapIter != shmQueueStMap->end()) { |
| | | // stRecord = shmQueueMapIter->second; |
| | | // if(stRecord.status = SHM_QUEUE_ST_CLOSED) { |
| | | // // key对应的状态是关闭的 |
| | | // goto ERR_CLOSED; |
| | | // } |
| | | // } |
| | | |
| | | rv = sockt->queue->pop(recvpak, timeout, flag); |
| | | if(rv != 0) |
| | |
| | | *_recvpak = recvpak; |
| | | return rv; |
| | | } |
| | | // int shm_sendandrecv(shm_socket_t *sockt, const void *send_buf, |
| | | // const int send_size, const int send_key, void **recv_buf, |
| | | // int *recv_size, const struct timespec *timeout, int flags) { |
| | | |
| | | // struct timespec tm = {10, 0}; |
| | | // return _shm_sendandrecv_thread_local(sockt, send_buf, send_size, send_key,recv_buf, recv_size, &tm, flags); |
| | | // } |
| | | |
| | |
| | | ./test_net_mod_socket --fun="start_reply" --key=100 & server_pid=$! && echo "pid: ${server_pid}" |
| | | ./test_net_mod_socket --fun="start_reply" --key=101 & server_pid=$! && echo "pid: ${server_pid}" |
| | | ./test_net_mod_socket --fun="start_reply" --key=102 & server_pid=$! && echo "pid: ${server_pid}" |
| | | |
| | | # 打开回队列收进程 |
| | | ./test_net_mod_socket --fun="start_resycle" & server_pid=$! && echo "pid: ${server_pid}" |
| | | } |
| | | |
| | | # 交互式客户端 |
| | |
| | | } |
| | | } |
| | | |
| | | void start_resycle() { |
| | | shm_mm_wrapper_start_resycle(); |
| | | } |
| | | |
| | | |
| | | // 打印接受到的订阅消息 |
| | | void *print_sub_msg(void *sockt) { |
| | | pthread_detach(pthread_self()); |
| | |
| | | } |
| | | |
| | | |
| | | int main(int argc, char *argv[]) { |
| | | shm_mm_wrapper_init(512); |
| | | |
| | | argument_t opt = parse_args(argc, argv); |
| | | |
| | | // port = atoi(argv[2]); |
| | | |
| | | if(opt.fun == NULL) { |
| | | usage(argv[0]); |
| | | exit(1); |
| | | } |
| | | |
| | | if (strcmp("start_net_proxy", opt.fun) == 0 ) { |
| | | if(opt.port == 0) { |
| | | usage(argv[0]); |
| | | exit(1); |
| | | } |
| | | start_net_proxy(opt); |
| | | |
| | | } |
| | | else if (strcmp("start_bus_server", opt.fun) == 0) { |
| | | |
| | | start_bus_server(opt); |
| | | } |
| | | else if (strcmp("start_reply", opt.fun) == 0) { |
| | | if(opt.key == 0) { |
| | | usage(argv[0]); |
| | | exit(1); |
| | | } |
| | | start_reply(opt.key); |
| | | } |
| | | else if (strcmp("start_net_client", opt.fun) == 0) { |
| | | if(opt.sendlist == 0) { |
| | | fprintf(stderr, "Missing sendlist .\n"); |
| | | usage(argv[0]); |
| | | exit(1); |
| | | } |
| | | if(opt.publist == 0) { |
| | | fprintf(stderr, "Missing publist.\n"); |
| | | usage(argv[0]); |
| | | exit(1); |
| | | } |
| | | start_net_client(opt.sendlist, opt.publist); |
| | | } |
| | | else if (strcmp("one_sendto_many", opt.fun) == 0) { |
| | | if(opt.sendlist == 0) { |
| | | fprintf(stderr, "Missing sendlist .\n"); |
| | | usage(argv[0]); |
| | | exit(1); |
| | | } |
| | | |
| | | one_sendto_many(opt.sendlist); |
| | | } |
| | | else if (strcmp("test_net_sendandrecv", opt.fun) == 0) { |
| | | if(opt.sendlist == 0) { |
| | | fprintf(stderr, "Missing sendlist .\n"); |
| | | usage(argv[0]); |
| | | exit(1); |
| | | } |
| | | |
| | | test_net_sendandrecv(opt.sendlist); |
| | | } |
| | | else if (strcmp("test_net_pub_threads", opt.fun) == 0) { |
| | | if(opt.publist == 0) { |
| | | fprintf(stderr, "Missing publist .\n"); |
| | | usage(argv[0]); |
| | | exit(1); |
| | | } |
| | | |
| | | test_net_pub_threads(opt.publist); |
| | | } |
| | | else if (strcmp("test_net_pub", opt.fun) == 0) { |
| | | if(opt.publist == 0) { |
| | | fprintf(stderr, "Missing publist .\n"); |
| | | usage(argv[0]); |
| | | exit(1); |
| | | } |
| | | |
| | | test_net_pub(opt.publist); |
| | | } |
| | | |
| | | else { |
| | | usage(argv[0]); |
| | | exit(1); |
| | | |
| | | } |
| | | |
| | | printf("==========end========\n"); |
| | | shm_mm_wrapper_destroy(); |
| | | |
| | | } |
| | | |
| | | |
| | | |
| | | void usage(char *name) |
| | | { |
| | |
| | | } |
| | | printf("============node list end==========\n"); |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | int main(int argc, char *argv[]) { |
| | | shm_mm_wrapper_init(512); |
| | | |
| | | argument_t opt = parse_args(argc, argv); |
| | | |
| | | // port = atoi(argv[2]); |
| | | |
| | | if(opt.fun == NULL) { |
| | | usage(argv[0]); |
| | | exit(1); |
| | | } |
| | | |
| | | if (strcmp("start_net_proxy", opt.fun) == 0 ) { |
| | | if(opt.port == 0) { |
| | | usage(argv[0]); |
| | | exit(1); |
| | | } |
| | | start_net_proxy(opt); |
| | | |
| | | } |
| | | else if (strcmp("start_bus_server", opt.fun) == 0) { |
| | | |
| | | start_bus_server(opt); |
| | | } |
| | | else if (strcmp("start_reply", opt.fun) == 0) { |
| | | if(opt.key == 0) { |
| | | usage(argv[0]); |
| | | exit(1); |
| | | } |
| | | start_reply(opt.key); |
| | | } |
| | | else if (strcmp("start_net_client", opt.fun) == 0) { |
| | | if(opt.sendlist == 0) { |
| | | fprintf(stderr, "Missing sendlist .\n"); |
| | | usage(argv[0]); |
| | | exit(1); |
| | | } |
| | | if(opt.publist == 0) { |
| | | fprintf(stderr, "Missing publist.\n"); |
| | | usage(argv[0]); |
| | | exit(1); |
| | | } |
| | | start_net_client(opt.sendlist, opt.publist); |
| | | } |
| | | else if (strcmp("one_sendto_many", opt.fun) == 0) { |
| | | if(opt.sendlist == 0) { |
| | | fprintf(stderr, "Missing sendlist .\n"); |
| | | usage(argv[0]); |
| | | exit(1); |
| | | } |
| | | |
| | | one_sendto_many(opt.sendlist); |
| | | } |
| | | else if (strcmp("test_net_sendandrecv", opt.fun) == 0) { |
| | | if(opt.sendlist == 0) { |
| | | fprintf(stderr, "Missing sendlist .\n"); |
| | | usage(argv[0]); |
| | | exit(1); |
| | | } |
| | | |
| | | test_net_sendandrecv(opt.sendlist); |
| | | } |
| | | else if (strcmp("test_net_pub_threads", opt.fun) == 0) { |
| | | if(opt.publist == 0) { |
| | | fprintf(stderr, "Missing publist .\n"); |
| | | usage(argv[0]); |
| | | exit(1); |
| | | } |
| | | |
| | | test_net_pub_threads(opt.publist); |
| | | } |
| | | else if (strcmp("test_net_pub", opt.fun) == 0) { |
| | | if(opt.publist == 0) { |
| | | fprintf(stderr, "Missing publist .\n"); |
| | | usage(argv[0]); |
| | | exit(1); |
| | | } |
| | | |
| | | test_net_pub(opt.publist); |
| | | } |
| | | else if (strcmp("start_resycle", opt.fun) == 0) { |
| | | start_resycle(); |
| | | } |
| | | |
| | | else { |
| | | usage(argv[0]); |
| | | exit(1); |
| | | |
| | | } |
| | | |
| | | printf("==========end========\n"); |
| | | // shm_mm_wrapper_destroy(); |
| | | |
| | | } |