| | |
| | | #define _KEY_DEF_H_ |
| | | |
| | | #define SHM_BUS_MAP_KEY 1 |
| | | |
| | | |
| | | #define SHM_QUEUE_ST_KEY 3 |
| | | // BUS key |
| | | #define SHM_BUS_KEY 8 |
| | | // network proxy key |
| | |
| | | // default Queue size |
| | | #define LOCK_FREE_Q_DEFAULT_SIZE 16 |
| | | |
| | | |
| | | #define LOCK_FREE_Q_ST_OPENED 0 |
| | | |
| | | #define LOCK_FREE_Q_ST_CLOSED 1 |
| | | |
| | | // static Logger *logger = LoggerFactory::getLogger(); |
| | | // define this macro if calls to "size" must return the real size of the |
| | | // queue. If it is undefined that function will try to take a snapshot of |
| | |
| | | sem_t items; |
| | | |
| | | time_t createTime; |
| | | time_t closeTime; |
| | | int status; |
| | | |
| | | public: |
| | | // sem_t mutex; |
| | | |
| | | |
| | | LockFreeQueue(size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); |
| | | |
| | |
| | | /// Note it is not virtual since it is not expected to inherit from this |
| | | /// template |
| | | ~LockFreeQueue(); |
| | | |
| | | inline void close(); |
| | | |
| | | // std::atomic_uint reference; |
| | | /// @brief constructor of the class |
| | |
| | | |
| | | inline ELEM_T &operator[](unsigned i); |
| | | |
| | | |
| | | |
| | | /// @brief read-only accessor for the creation timestamp |
| | | /// @return the current value of createTime |
| | | time_t getCreateTime() const { |
| | |     return createTime; |
| | | } |
| | | |
| | | /// @brief read-only accessor for the close timestamp |
| | | /// @return the current value of closeTime (assigned by close()) |
| | | time_t getCloseTime() const { |
| | |     return closeTime; |
| | | } |
| | | |
| | | /// @brief read-only accessor for the queue status |
| | | /// @return the current value of status (LOCK_FREE_Q_ST_OPENED or |
| | | ///         LOCK_FREE_Q_ST_CLOSED once close() has run) |
| | | int getStatus() const { |
| | |     return status; |
| | | } |
| | | |
| | | /// @brief push an element at the tail of the queue |
| | |
| | | err_exit(errno, "LockFreeQueue sem_init"); |
| | | |
| | | createTime = time(NULL); |
| | | status = LOCK_FREE_Q_ST_OPENED; |
| | | |
| | | } |
| | | |
| | | |
| | | template< /* Out-of-class definition of LockFreeQueue::close(). */ |
| | | typename ELEM_T, |
| | | typename Allocator, |
| | | template<typename T, typename AT> class Q_TYPE> |
| | | inline void LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::close() { /* Only updates bookkeeping fields; nothing is released here. */ |
| | | status = LOCK_FREE_Q_ST_CLOSED; /* flag the queue as closed */ |
| | | closeTime = time(NULL); /* remember when it was closed; exposed through getCloseTime() */ |
| | | } |
| | | |
| | | |
| | |
| | | if (sem_destroy(&items) == -1) { |
| | | err_exit(errno, "LockFreeQueue sem_destroy"); |
| | | } |
| | | // if (sem_destroy(&mutex) == -1) { |
| | | // err_exit(errno, "LockFreeQueue sem_destroy"); |
| | | // } |
| | | |
| | | } |
| | | |
| | | template< |
| | |
| | | |
| | | ELEM_T &operator[](unsigned i); |
| | | |
| | | // @deprecate |
| | | static size_t remove_queues_exclude(int keys[], size_t length); |
| | | |
| | | private: |
| | | protected: |
| | |
| | | SHMQueue<ELEM_T>(const SHMQueue<ELEM_T> &a_src); |
| | | }; |
| | | |
| | | // @deprecated |
| | | // template <typename ELEM_T> |
| | | // size_t SHMQueue<ELEM_T>::remove_queues_exclude(int keys[], size_t length) { |
| | | // hashtable_t *hashtable = mm_get_hashtable(); |
| | | // std::set<int> *keyset = hashtable_keyset(hashtable); |
| | | // std::set<int>::iterator keyItr; |
| | | // LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue; |
| | | // bool found; |
| | | // size_t count = 0; |
| | | // for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) { |
| | | // found = false; |
| | | // for (size_t i = 0; i < length; i++) { |
| | | // if (*keyItr == keys[i]) { |
| | | // found = true; |
| | | // break; |
| | | // } |
| | | // } |
| | | // if (!found && *keyItr > 100) { |
| | | // // destroy the queue that lives in shared memory |
| | | // mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr); |
| | | // delete mqueue; |
| | | // hashtable_remove(hashtable, *keyItr); |
| | | // count++; |
| | | // } |
| | | // } |
| | | // delete keyset; |
| | | // return count; |
| | | // } |
| | | |
| | | |
| | | |
| | | |
| | |
| | | public: |
| | | static void *allocate (size_t size) { /* Allocates `size` bytes by forwarding to mm_malloc and returns its result unchanged. */ |
| | | return mm_malloc(size); |
| | | // return mem_pool_malloc(size); |
| | | // return shm_mm_malloc(size); |
| | | } |
| | | |
| | | // Hands `ptr` back to the allocator by forwarding to mm_free. |
| | | static void deallocate (void *ptr) { |
| | |     mm_free(ptr); |
| | |     // Alternative back ends that were tried here: |
| | |     // mem_pool_free(ptr); |
| | |     // shm_mm_free(ptr); |
| | | } |
| | | }; |
| | | |
New file |
| | |
| | | #include "shm_mm.h" |
| | | #include "mm.h" |
| | | #include "sem_util.h" |
| | | |
| | | |
| | | void shm_mm_init(size_t heap_size) { /* Initializes the allocator through mm_init, then attaches the queue-status map. */ |
| | | mm_init(heap_size); |
| | | shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY); /* looks the map up under SHM_QUEUE_ST_KEY, creating it on first use; the returned pointer is discarded */ |
| | | } |
| | | |
| | | void shm_mm_destroy(void) { /* Tears the allocator down by delegating to mm_destroy(). */ |
| | | mm_destroy(); |
| | | |
| | | } |
| | | |
| | | void *shm_mm_malloc (size_t size) { /* Allocates `size` bytes by forwarding to mm_malloc and returns its result unchanged. */ |
| | | return mm_malloc(size); |
| | | } |
| | | |
| | | |
| | | void shm_mm_free (void *ptr) { /* Releases `ptr` by forwarding to mm_free. */ |
| | | mm_free(ptr); |
| | | } |
| | | |
| | | |
| | | /** |
| | |  * Returns the object of type T registered under `key`, creating it on first use. |
| | |  * |
| | |  * If the hashtable has no usable entry for `key`, storage for one T is taken |
| | |  * from mm_malloc, a T is default-constructed in it, and the pointer is then |
| | |  * registered under `key`. The object is constructed before it is published |
| | |  * so the hashtable never holds a pointer to raw, unconstructed memory. |
| | |  * |
| | |  * @param key hashtable key identifying the object |
| | |  * @return pointer to the existing or newly created object, or NULL when |
| | |  *         mm_malloc returns NULL |
| | |  */ |
| | | template <typename T> |
| | | T* shm_mm_attach(int key) { |
| | |     hashtable_t *hashtable = mm_get_hashtable(); |
| | |     void *ptr = hashtable_get(hashtable, key); |
| | |     // NOTE(review): (void *)1 is treated the same as "absent"; presumably a |
| | |     // placeholder value stored by the key allocator -- confirm against mm. |
| | |     if(ptr == NULL || ptr == (void *)1 ) { |
| | |         ptr = mm_malloc(sizeof(T)); |
| | |         if(ptr == NULL) { |
| | |             // Placement-new on a null pointer is undefined behaviour; report failure instead. |
| | |             return NULL; |
| | |         } |
| | |         new(ptr) T; |
| | |         hashtable_put(hashtable, key, ptr); |
| | |     } |
| | |     return static_cast<T*>(ptr); |
| | | } |
| | | |
| | | /* Releases whatever is registered under `key` by delegating to mm_free_by_key. */ |
| | | void shm_mm_free_by_key(int key) { |
| | |     mm_free_by_key(key); |
| | | } |
| | | |
| | | |
| | | void *shm_mm_realloc (void *ptr, size_t size) { /* Resizes the allocation at `ptr` to `size` bytes by forwarding to mm_realloc; returns its result unchanged. */ |
| | | return mm_realloc(ptr, size); |
| | | } |
| | | |
| | | int shm_mm_alloc_key() { /* Returns the key produced by mm_alloc_key(). */ |
| | | |
| | | return mm_alloc_key(); |
| | | } |
| | | |
| | | |
| | | // extern int mm_checkheap(int verbose); |
| | | |
| | | |
| | | #endif |
New file |
| | |
| | | #ifndef __SHM_MM_H__ |
| | | #define __SHM_MM_H__ |
| | | |
| | | #define SHM_QUEUE_ST_OPENED 0 |
| | | |
| | | #define SHM_QUEUE_ST_CLOSED 1 |
| | | |
| | | // Per-key status record; the mapped type of ShmQueueStMap. |
| | | // Every field has a default so that a record whose closeTime was never |
| | | // assigned does not carry an indeterminate value into the map. |
| | | struct shm_queue_status_t { |
| | |     // SHM_QUEUE_ST_OPENED or SHM_QUEUE_ST_CLOSED |
| | |     int status = SHM_QUEUE_ST_OPENED; |
| | |     // time the key was marked opened |
| | |     time_t createTime = 0; |
| | |     // time the key was marked closed; 0 until then |
| | |     time_t closeTime = 0; |
| | | }; |
| | | |
| | | typedef std::map<int, shm_queue_status_t, std::less<int>, SHM_STL_Allocator<std::pair<const int, shm_queue_status_t> > > ShmQueueStMap; |
| | | |
| | | |
| | | void shm_mm_init(size_t heap_size) ; |
| | | |
| | | void shm_mm_destroy(void) ; |
| | | |
| | | void *shm_mm_malloc (size_t size); |
| | | |
| | | void shm_mm_free (void *ptr); |
| | | |
| | | |
| | | template <typename T> |
| | | T* shm_mm_attach(int key) ; |
| | | |
| | | void shm_mm_free_by_key(int key) ; |
| | | |
| | | |
| | | void *shm_mm_realloc (void *ptr, size_t size); |
| | | |
| | | int shm_mm_alloc_key(); |
| | | |
| | | #endif |
| | |
| | | #include "shm_socket.h" |
| | | |
| | | #define BUFFER_TIME 10 |
| | | |
| | | |
| | | void shm_mm_wrapper_init(int size) { /* Initializes the memory back ends with `size`. NOTE(review): this text looks like a rendered diff, so one of the two calls below may be a removed line -- confirm. */ |
| | | mem_pool_init(size); |
| | | shm_mm_init(size); /* shm_mm_init is declared with a size_t parameter, so the int is converted implicitly */ |
| | | |
| | | } |
| | | |
| | | void shm_mm_wrapper_destroy() { /* Tears down the memory back ends. NOTE(review): this text looks like a rendered diff, so one of the two calls below may be a removed line -- confirm. */ |
| | | mem_pool_destroy(); |
| | | shm_mm_destroy(); |
| | | } |
| | | |
| | | int shm_mm_wrapper_alloc_key() { /* Returns the key produced by mm_alloc_key(); calls mm directly rather than going through shm_mm_alloc_key(). */ |
| | | return mm_alloc_key(); |
| | | } |
| | | |
| | | |
| | | /** |
| | |  * Reclaims queues whose keys were soft-deleted (marked closed). |
| | |  * |
| | |  * Once per second the queue-status map is scanned. For every entry whose |
| | |  * status is SHM_QUEUE_ST_CLOSED and whose closeTime lies more than 2 seconds |
| | |  * in the past, the queue registered under that key is deleted and the key is |
| | |  * removed from the hashtable. The status entry itself stays in the map. |
| | |  * |
| | |  * The loop never terminates, so this function does not return to its caller. |
| | |  */ |
| | | int shm_mm_wrapper_start_resycle() { |
| | |     ShmQueueStMap *shmQueueStMap = shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY); |
| | |     hashtable_t *hashtable = mm_get_hashtable(); |
| | |     while(true) { |
| | |         for(auto it = shmQueueStMap->begin(); it != shmQueueStMap->end(); ++it ) { |
| | |             // Must be a comparison: an assignment here would overwrite the recorded |
| | |             // status of every entry and evaluate closeTime even for open keys. |
| | |             if(it->second.status == SHM_QUEUE_ST_CLOSED && difftime(time(NULL), it->second.closeTime) > 2 ) { |
| | |                 // The map key is the queue key; look the queue up under it. |
| | |                 LockFreeQueue<shm_packet_t> *mqueue = |
| | |                     static_cast<LockFreeQueue<shm_packet_t> *>(hashtable_get(hashtable, it->first)); |
| | |                 if(mqueue != NULL) { |
| | |                     delete mqueue; |
| | |                     hashtable_remove(hashtable, it->first); |
| | |                     printf("reove queue %d\n", it->first); |
| | |                     // Do not erase the status entry: erasing would reintroduce the |
| | |                     // cross-process synchronization problem this mechanism exists to solve. |
| | |                     // it = shmQueueStMap->erase(it); |
| | |                     // continue; |
| | |                 } |
| | |             } |
| | |         } |
| | |  |
| | |         sleep(1); |
| | |     } |
| | | } |
| | | |
| | | // Remove the queues whose keys are listed in keys |
| | | int shm_mm_wrapper_remove_keys(int keys[], int length) { |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | |
| | | * |
| | | */ |
| | | |
| | | #ifndef __SHM_MM_H__ |
| | | #define __SHM_MM_H__ |
| | | #ifndef __SHM_MM_WRAPPER_H__ |
| | | #define __SHM_MM_WRAPPER_H__ |
| | | |
| | | #ifdef __cplusplus |
| | | extern "C" { |
| | |
| | | static Logger *logger = LoggerFactory::getLogger(); |
| | | |
| | | void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)> cb) { |
| | | SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); |
| | | SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); |
| | | SHMKeySet *subscripter_set; |
| | | SHMKeySet::iterator set_iter; |
| | | SHMTopicSubMap::iterator map_iter; |
| | |
| | | int key; |
| | | for(int i = 0; i < length; i++) { |
| | | key = keys[i]; |
| | | SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); |
| | | SHMTopicSubMap *topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); |
| | | SHMKeySet *subscripter_set; |
| | | SHMKeySet::iterator set_iter; |
| | | SHMTopicSubMap::iterator map_iter; |
| | |
| | | * Start the bus |
| | | * |
| | | * @return 0 on success, otherwise the error code of the failure |
| | | */ |
| | | */ |
| | | int BusServerSocket::start(){ |
| | | topic_sub_map = mem_pool_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); |
| | | topic_sub_map = shm_mm_attach<SHMTopicSubMap>(SHM_BUS_MAP_KEY); |
| | | |
| | | _run_proxy_(); |
| | | return 0; |
| | |
| | | |
| | | } |
| | | topic_sub_map->clear(); |
| | | mem_pool_free_by_key(SHM_BUS_MAP_KEY); |
| | | shm_mm_free_by_key(SHM_BUS_MAP_KEY); |
| | | } |
| | | shm_socket_close(shm_socket); |
| | | logger->debug("BusServerSocket destory 3"); |
| | |
| | | static Logger *logger = LoggerFactory::getLogger(); |
| | | |
| | | |
| | | // size_t ShmModSocket::remove_keys(int keys[], size_t length) { |
| | | // BusServerSocket::remove_subscripters(keys, length); |
| | | // return shm_socket_remove_keys(keys, length); |
| | | // } |
| | | |
| | | // size_t ShmModSocket::remove_keys_exclude(int keys[], size_t length) { |
| | | // BusServerSocket::remove_subscripters(keys, length); |
| | | // return shm_socket_remove_keys_exclude(keys, length); |
| | | // } |
| | | |
| | | ShmModSocket::ShmModSocket() { |
| | | shm_socket = shm_socket_open(SHM_SOCKET_DGRAM); |
| | |
| | | #include <cassert> |
| | | #include "bus_error.h" |
| | | #include "sole.h" |
| | | #include "shm_mm.h" |
| | | |
| | | static Logger *logger = LoggerFactory::getLogger(); |
| | | |
| | | |
| | | ShmQueueStMap * shmQueueStMap ; |
| | | |
| | | static void print_msg(char *head, shm_packet_t &msg) { |
| | | // err_msg(0, "%s: key=%d, type=%d\n", head, msg.key, msg.type); |
| | |
| | | if (s != 0) |
| | | err_exit(s, "pthread_mutexattr_destroy"); |
| | | |
| | | |
| | | shmQueueStMap = shm_mm_attach<ShmQueueStMap>(SHM_QUEUE_ST_KEY); |
| | | |
| | | return sockt; |
| | | } |
| | | |
| | |
| | | |
| | | int rv; |
| | | logger->debug("shm_socket_close\n"); |
| | | if(sockt->queue != NULL) { |
| | | delete sockt->queue; |
| | | sockt->queue = NULL; |
| | | } |
| | | // if(sockt->queue != NULL) { |
| | | // delete sockt->queue; |
| | | // sockt->queue = NULL; |
| | | // } |
| | | |
| | | rv = pthread_mutex_destroy(&(sockt->mutex) ); |
| | | if(rv != 0) { |
| | |
| | | } |
| | | |
| | | free(sockt); |
| | | |
| | | auto it = shmQueueStMap.find(key); |
| | | if(it != shmQueueStMap.end()) { |
| | | it->second.status = SHM_QUEUE_ST_CLOSED |
| | | it->second.closeTime = time(NULL); |
| | | } |
| | | return 0; |
| | | } |
| | | |
| | |
| | | const int key, const struct timespec *timeout, const int flag) { |
| | | |
| | | int rv; |
| | | shm_queue_status_t stRecord; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | |
| | | if( sockt->queue != NULL) |
| | |
| | | logger->error("%s. key = %d", bus_strerror(EBUS_KEY_INUSED), sockt->key); |
| | | return EBUS_KEY_INUSED; |
| | | } |
| | | |
| | | // Mark the status recorded for this key as opened |
| | | stRecord.status = SHM_QUEUE_ST_OPENED; |
| | | stRecord.createTime = time(NULL); |
| | | shmQueueStMap.insert({sockt->key, stRecord}); |
| | | |
| | | } |
| | | |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | |
| | | return EBUS_SENDTO_SELF; |
| | | } |
| | | |
| | | LockFreeQueue<shm_packet_t> *remoteQueue; |
| | | if ((remoteQueue = shm_socket_attach_queue(key)) == NULL) { |
| | | bus_errno = EBUS_CLOSED; |
| | | logger->error("sendto key %d failed, %s", key, bus_strerror(bus_errno)); |
| | | return EBUS_CLOSED; |
| | | // 检查key标记的状态 |
| | | auto it = shmQueueStMap.find(key); |
| | | if(it != shmQueueStMap.end()) { |
| | | if(it->second.status == SHM_QUEUE_ST_CLOSED) { |
| | | // key对应的状态是关闭的 |
| | | goto ERR_CLOSED; |
| | | } |
| | | } |
| | | |
| | | |
| | | LockFreeQueue<shm_packet_t> *remoteQueue = shm_socket_attach_queue(key); |
| | | |
| | | if (remoteQueue == NULL ) { |
| | | goto ERR_CLOSED; |
| | | } |
| | | |
| | | rv = remoteQueue->push(*sendpak, timeout, flag); |
| | | |
| | | return rv; |
| | | |
| | | ERR_CLOSED: |
| | | logger->error("sendto key %d failed, %s", key, bus_strerror(EBUS_CLOSED)); |
| | | return EBUS_CLOSED; |
| | | |
| | | } |
| | | |
| | | // Receive in short-connection mode |
| | | static int shm_recvpakfrom(shm_socket_t *sockt, shm_packet_t *_recvpak , const struct timespec *timeout, int flag) { |
| | | int rv; |
| | | |
| | | shm_queue_status_t stRecord; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | shm_packet_t recvpak; |
| | | |
| | |
| | | return EBUS_KEY_INUSED; |
| | | } |
| | | |
| | | // 标记key对应的状态 ,为opened |
| | | stRecord.status = SHM_QUEUE_ST_OPENED; |
| | | stRecord.createTime = time(NULL); |
| | | shmQueueStMap.insert({sockt->key, stRecord}); |
| | | |
| | | if ((rv = pthread_mutex_unlock(&(sockt->mutex))) != 0) |
| | | err_exit(rv, "shm_recvfrom : pthread_mutex_unlock"); |
| | |
| | | |
| | | LABEL_POP: |
| | | |
| | | // |
| | | // printf("%p start recv.....\n", sockt); |
| | | // 检查key标记的状态 |
| | | // auto shmQueueMapIter = shmQueueStMap.find(sockt->key); |
| | | // if(shmQueueMapIter != shmQueueStMap.end()) { |
| | | // stRecord = shmQueueMapIter->second; |
| | | // if(stRecord.status = SHM_QUEUE_ST_CLOSED) { |
| | | // // key对应的状态是关闭的 |
| | | // goto ERR_CLOSED; |
| | | // } |
| | | // } |
| | | |
| | | rv = sockt->queue->pop(recvpak, timeout, flag); |
| | | if(rv != 0) |
| | |
| | | *_recvpak = recvpak; |
| | | return rv; |
| | | } |
| | | // int shm_sendandrecv(shm_socket_t *sockt, const void *send_buf, |
| | | // const int send_size, const int send_key, void **recv_buf, |
| | | // int *recv_size, const struct timespec *timeout, int flags) { |
| | | |
| | | // struct timespec tm = {10, 0}; |
| | | // return _shm_sendandrecv_thread_local(sockt, send_buf, send_size, send_key,recv_buf, recv_size, &tm, flags); |
| | | // } |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | void start_resycle() { /* Test-driver entry: hands control to shm_mm_wrapper_start_resycle(); its int result is ignored. */ |
| | | shm_mm_wrapper_start_resycle(); |
| | | } |
| | | |
| | | |
| | | // Print the subscription messages that are received |
| | | void *print_sub_msg(void *sockt) { |
| | | pthread_detach(pthread_self()); |
| | |
| | | |
| | | test_net_pub(opt.publist); |
| | | } |
| | | else if (strcmp("start_resycle", opt.fun) == 0) { |
| | | start_resycle(); |
| | | } |
| | | |
| | | else { |
| | | usage(argv[0]); |