| | |
| | | */ |
| | | #include <assert.h> |
| | | #include "net_mod_server_socket_wrapper.h" |
| | | #include "net_mod_socket_wrapper.h" |
| | | #include "bus_server_socket_wrapper.h" |
| | | //#include "net_mod_socket_wrapper.h" |
| | | //#include "bus_server_socket_wrapper.h" |
| | | |
| | | #include "shm_mm_wrapper.h" |
| | | #include "usg_common.h" |
| | | #include <getopt.h> |
| | | #include "logger_factory.h" |
| | | //#include "logger_factory.h" |
| | | |
| | | |
| | | static void usage(const char *name) { |
| | |
| | | printf("%10d \t %10d\n", it->first, it->second.status); |
| | | |
| | | } |
| | | |
| | | } |
| | | |
| | | |
| | |
| | | |
| | | |
| | | |
| | | } |
| | | } |
| | |
| | | |
| | | target_link_libraries(shm_queue PUBLIC ${EXTRA_LIBS} ) |
| | | |
| | | |
| | | # generate md5 |
| | | if (BUILD_SHARED_LIBS) |
| | | add_custom_command( |
| | | OUTPUT ${PROJECT_BINARY_DIR}/lib/libshm_queue.so.md5 |
| | | COMMAND md5sum ${PROJECT_BINARY_DIR}/lib/libshm_queue.so > ${PROJECT_BINARY_DIR}/lib/libshm_queue.so.md5 |
| | | DEPENDS ${PROJECT_BINARY_DIR}/lib/libshm_queue.so |
| | | COMMENT "Generate libshm_queue.so.md5" |
| | | VERBATIM |
| | | ) |
| | | |
| | | add_custom_target("genmd5" ALL DEPENDS ${PROJECT_BINARY_DIR}/lib/libshm_queue.so.md5) |
| | | endif() |
| | | |
| | | # install rules |
| | | install(TARGETS shm_queue DESTINATION lib) |
| | | install(FILES |
| | |
| | | typename Allocator, |
| | | template<typename T, typename AT> class Q_TYPE> |
| | | LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::LockFreeQueue(size_t qsize): m_qImpl(qsize) { |
| | | // std::cout << "LockFreeQueue init reference=" << reference << std::endl; |
| | | if (sem_init(&slots, 1, qsize) == -1) |
| | | err_exit(errno, "LockFreeQueue sem_init"); |
| | | if (sem_init(&items, 1, 0) == -1) |
| | |
| | | typename Allocator, |
| | | template<typename T, typename AT> class Q_TYPE> |
| | | LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue() { |
| | | // LoggerFactory::getLogger()->debug("LockFreeQueue desctroy"); |
| | | if (sem_destroy(&slots) == -1) { |
| | | err_exit(errno, "LockFreeQueue sem_destroy"); |
| | | } |
| | |
| | | typename Allocator, |
| | | template<typename T, typename AT> class Q_TYPE> |
| | | int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::push(const ELEM_T &a_data, const struct timespec *timeout, int flag) { |
| | | // LoggerFactory::getLogger()->debug("==================LockFreeQueue push before\n"); |
| | | // sigset_t mask_all, pre; |
| | | // sigfillset(&mask_all); |
| | | |
| | |
| | | if (m_qImpl.push(a_data)) { |
| | | psem_post(&items); |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | // LoggerFactory::getLogger()->debug("==================LockFreeQueue push after\n"); |
| | | return 0; |
| | | } |
| | | |
| | |
| | | typename Allocator, |
| | | template<typename T, typename AT> class Q_TYPE> |
| | | int LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::pop(ELEM_T &a_data, const struct timespec *timeout, int flag) { |
| | | // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before...."); |
| | | |
| | | // sigset_t mask_all, pre; |
| | | // sigfillset(&mask_all); |
| | |
| | | goto LABEL_FAILTURE; |
| | | } |
| | | } else if ((flag & BUS_TIMEOUT_FLAG) == BUS_TIMEOUT_FLAG && timeout != NULL) { |
| | | // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop before. flag=%d , %d\n", flag, timeout->tv_sec); |
| | | if (psem_timedwait(&items, timeout) == -1) { |
| | | if (psem_timedwait(&items, timeout) == -1) { |
| | | goto LABEL_FAILTURE; |
| | | } |
| | | } else { |
| | |
| | | if (m_qImpl.pop(a_data)) { |
| | | psem_post(&slots); |
| | | // sigprocmask(SIG_SETMASK, &pre, NULL); |
| | | // LoggerFactory::getLogger()->debug("==================LockFreeQueue pop after\n"); |
| | | return 0; |
| | | } |
| | | |
| | |
| | | template <typename ELEM_T> |
| | | bool SHMQueue<ELEM_T>::bind(int key, bool force) { |
| | | |
| | | |
| | | hashtable_lock(hashtable); |
| | | void *tmp_ptr = hashtable_get(hashtable, key); |
| | | if (tmp_ptr == NULL || tmp_ptr == (void *)1 || force) { |
| | | queue = new LockFreeQueue<ELEM_T, SHM_Allocator>(mqsize); |
| | | hashtable_put(hashtable, key, (void *)queue); |
| | | mkey = key; |
| | | owner = true; |
| | | hashtable_unlock(hashtable); |
| | | return true; |
| | | } |
| | | |
| | | hashtable_unlock(hashtable); |
| | | return false; |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | void *hashtable_get(hashtable_t *hashtable, int key) { |
| | | int rv; |
| | | |
| | | if((rv = svsem_wait(hashtable->mutex)) != 0) { |
| | | LoggerFactory::getLogger()->error(errno, "hashtable_get\n"); |
| | | } |
| | | void * res = _hashtable_get(hashtable, key); |
| | | |
| | | if((rv = svsem_post(hashtable->mutex)) != 0) { |
| | | LoggerFactory::getLogger()->error(errno, "hashtable_get\n"); |
| | | } |
| | | return res; |
| | | } |
| | | |
| | | void hashtable_put(hashtable_t *hashtable, int key, void *value) { |
| | | int rv; |
| | | |
| | | if((rv = svsem_wait(hashtable->mutex)) != 0) { |
| | | LoggerFactory::getLogger()->error(errno, "hashtable_put\n"); |
| | | } |
| | | _hashtable_put(hashtable, key, value); |
| | | hashtable->queueCount++; |
| | | |
| | | if((rv = svsem_post(hashtable->mutex)) != 0) { |
| | | LoggerFactory::getLogger()->error(errno, "hashtable_put\n"); |
| | | } |
| | | |
| | | } |
| | | |
| | | bool hashtable_check_put(hashtable_t *hashtable, int key, void *value, bool overwrite) { |
| | |
| | | return keyset; |
| | | } |
| | | |
| | | |
| | | |
| | | int hashtable_lock(hashtable_t *hashtable) { |
| | | return svsem_wait(hashtable->mutex); |
| | | } |
| | | |
| | | int hashtable_unlock(hashtable_t *hashtable) { |
| | | return svsem_post(hashtable->mutex); |
| | | } |
| | | |
| | | |
| | | void hashtable_removeall(hashtable_t *hashtable) |
| | | { |
| | | tailq_entry_t *item; |
| | |
| | | { |
| | | |
| | | return key % MAPSIZE; |
| | | /*printf("hashfun = %ld\n", code);*/ |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | printf("\n"); |
| | | } |
| | | } |
| | | } |
| | |
| | | |
| | | void *hashtable_remove(hashtable_t *hashtable, int key); |
| | | void hashtable_removeall(hashtable_t *hashtable); |
| | | |
| | | int hashtable_lock(hashtable_t *hashtable); |
| | | int hashtable_unlock(hashtable_t *hashtable); |
| | | |
| | | int hashtable_get_queue_count(hashtable_t *hashtable) ; |
| | | /** |
| | | * 遍历hash_table |
| | |
| | | |
| | | newsize = ALIGN(size + (SIZE_T_SIZE << 1) + (PTR_SIZE << 1) ); |
| | | |
| | | //fprintf(stderr, "mm_malloc : size=%u\n", size); |
| | | /* Search the free list for a fit */ |
| | | SemUtil::dec(mutex); |
| | | if ((ptr = find_fit(newsize)) != NULL) |
| | |
| | | |
| | | template <typename T> |
| | | T* shm_mm_attach(int key) { |
| | | void *ptr; |
| | | // T* tptr; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | void *ptr; |
| | | // T* tptr; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | ptr = hashtable_get(hashtable, key); |
| | | // printf("shm_mm_malloc_by_key malloc before %d, %p\n", key, ptr); |
| | | if(ptr == NULL || ptr == (void *)1 ) { |
| | | ptr = mm_malloc(sizeof(T)); |
| | | hashtable_put(hashtable, key, ptr); |
| | | new(ptr) T; |
| | | // printf("shm_mm_malloc_by_key use new %d, %p\n", key, ptr); |
| | | } |
| | | return (T*)ptr; |
| | | } |
| | |
| | | |
| | | int shm_mm_alloc_key(); |
| | | |
| | | #endif |
| | | #endif |
| | |
| | | subscripter_set = map_iter->second; |
| | | if(subscripter_set != NULL && (set_iter = subscripter_set->find(key) ) != subscripter_set->end()) { |
| | | subscripter_set->erase(set_iter); |
| | | // printf("remove_subscripter %s, %d\n", map_iter->first, key); |
| | | count++; |
| | | } |
| | | } |
| | |
| | | subscripter_set = map_iter->second; |
| | | for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) { |
| | | send_key = *set_iter; |
| | | // logger->debug("_proxy_pub send before %d \n", send_key); |
| | | rv = shm_sendto(shm_socket, buf, size, send_key, &timeout, BUS_TIMEOUT_FLAG); |
| | | if(rv == 0) { |
| | | continue; |
| | |
| | | char resp_buf[128]; |
| | | bus_head_t head; |
| | | |
| | | int rv; |
| | | char send_buf[512] = { 0x00 }; |
| | | |
| | | const char *topic_delim = ","; |
| | | // logger.debug("_run_proxy_ server receive before\n"); |
| | | |
| | | while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) { |
| | | // logger.debug("_run_proxy_ server recvfrom %d after: %s \n", key, buf); |
| | | head = ShmModSocket::decode_bus_head(buf); |
| | | topics = buf + BUS_HEAD_SIZE; |
| | | action = head.action; |
| | | // logger.debug("_run_proxy_ : %s\n", action); |
| | | |
| | | if(strcmp(action, "sub") == 0) { |
| | | // 订阅支持多主题订阅 |
| | | topic = strtok(topics, topic_delim); |
| | | // logger.debug("_run_proxy_ topic = %s\n", topic); |
| | | while(topic) { |
| | | |
| | | _proxy_sub(trim(topic, 0), key); |
| | | topic = strtok(NULL, topic_delim); |
| | | } |
| | | |
| | | } |
| | | else if(strcmp(action, "desub") == 0) { |
| | | // logger.debug("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), "")); |
| | | |
| | | if(strcmp(trim(topics, 0), "") == 0) { |
| | | // 取消所有订阅 |
| | | _proxy_desub_all(key); |
| | |
| | | |
| | | topic = strtok(topics, topic_delim); |
| | | while(topic) { |
| | | |
| | | _proxy_desub(trim(topic, 0), key); |
| | | topic = strtok(NULL, topic_delim); |
| | | } |
| | |
| | | _proxy_pub(topics, content, head.content_size, key); |
| | | |
| | | } |
| | | else if(strcmp(action, "stop") == 0) { |
| | | else if (strncmp(buf, "request", strlen("request")) == 0) { |
| | | sprintf(send_buf, "%4d", key); |
| | | strncpy(send_buf + 4, buf, (sizeof(send_buf) - 4) >= (strlen(buf) + 1) ? strlen(buf) : (sizeof(send_buf) - 4)); |
| | | |
| | | rv = shm_sendto(shm_socket, send_buf, strlen(send_buf) + 1, key); |
| | | if(rv != 0) { |
| | | logger->error( "BusServerSocket::_run_proxy_ : requst answer fail!\n"); |
| | | } |
| | | } |
| | | else if(strcmp(action, "stop") == 0) { |
| | | logger->info( "Stopping Bus..."); |
| | | free(buf); |
| | | break; |
| | |
| | | } |
| | | |
| | | ShmModSocket::~ShmModSocket() { |
| | | // logger->debug("Close ShmModSocket...\n"); |
| | | struct timespec timeout = {1, 0}; |
| | | if(bus_set != NULL) { |
| | | for(auto bus_iter = bus_set->begin(); bus_iter != bus_set->end(); bus_iter++) { |
| | |
| | | int buf_size; |
| | | char *buf; |
| | | int max_buf_size; |
| | | void *buf_ptr; |
| | | if((buf = (char *) malloc(MAXBUF)) == NULL) { |
| | | LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc"); |
| | | exit(1); |
| | |
| | | } |
| | | } |
| | | |
| | | memcpy(buf, ShmModSocket::encode_bus_head(request_head), BUS_HEAD_SIZE); |
| | | buf_ptr = ShmModSocket::encode_bus_head(request_head); |
| | | memcpy(buf, buf_ptr, BUS_HEAD_SIZE); |
| | | if(topic_size != 0 ) |
| | | memcpy(buf + BUS_HEAD_SIZE, topic_buf, topic_size); |
| | | if(content_size != 0) |
| | | memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size); |
| | | |
| | | *retbuf = buf; |
| | | free(buf_ptr); |
| | | return buf_size; |
| | | } |
| | | |
| | |
| | | tmp_ptr += sizeof(head.action); |
| | | PUT(tmp_ptr, htonl(head.topic_size)); |
| | | |
| | | tmp_ptr += 4; |
| | | tmp_ptr += sizeof(head.topic_size); |
| | | PUT(tmp_ptr, htonl(head.content_size)); |
| | | |
| | | return headbs; |
| | |
| | | tmp_ptr += sizeof(head.action); |
| | | head.topic_size = ntohl(GET(tmp_ptr)); |
| | | |
| | | tmp_ptr += 4; |
| | | tmp_ptr += sizeof(head.topic_size); |
| | | head.content_size = ntohl(GET(tmp_ptr)); |
| | | |
| | | return head; |
| | |
| | | #include <set> |
| | | #include "socket_def.h" |
| | | |
| | | #define BUS_HEAD_SIZE (64 + 2 * sizeof(uint32_t)) |
| | | #define BUS_HEAD_SIZE sizeof(bus_head_t) |
| | | class BusServerSocket; |
| | | |
| | | struct bus_head_t |
| | |
| | | static LockFreeQueue<shm_packet_t> * shm_socket_bind_queue(int key, bool force) { |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | LockFreeQueue<shm_packet_t> *queue; |
| | | hashtable_lock(hashtable); |
| | | void *tmp_ptr = hashtable_get(hashtable, key); |
| | | |
| | | |
| | | if (tmp_ptr == NULL || tmp_ptr == (void *)1 ) { |
| | | queue = new LockFreeQueue<shm_packet_t>(32); |
| | | hashtable_put(hashtable, key, (void *)queue); |
| | | hashtable_unlock(hashtable); |
| | | return queue; |
| | | } else if(force) { |
| | | hashtable_unlock(hashtable); |
| | | return (LockFreeQueue<shm_packet_t> *) tmp_ptr; |
| | | } |
| | | |
| | | hashtable_unlock(hashtable); |
| | | return NULL; |
| | | } |
| | | |
| | |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | void *tmp_ptr = hashtable_get(hashtable, key); |
| | | if (tmp_ptr == NULL || tmp_ptr == (void *)1) { |
| | | //logger->error("shm_socket._remote_queue_attach:connet at key %d failed!", key); |
| | | return NULL; |
| | | } |
| | | |
| | |
| | | // } |
| | | |
| | | |
| | | // printf("====sockt->queue addr = %p\n", sockt->queue); |
| | | |
| | | if(sockt->queue != NULL) { |
| | | sockt->queue->close(); |
| | |
| | | shm_packet_t sendpak = {0}; |
| | | sendpak.key = sockt->key; |
| | | sendpak.size = sendsize; |
| | | memcpy(sendpak.uuid, recvpak.uuid, sizeof sendpak.uuid); |
| | | memcpy(sendpak.uuid, recvpak.uuid, sizeof(sendpak.uuid)); |
| | | if(sendbuf !=NULL && sendsize > 0) { |
| | | sendpak.buf = mm_malloc(sendsize); |
| | | memcpy(sendpak.buf, sendbuf, sendsize); |
| | |
| | | |
| | | } |
| | | |
| | | |
| | | if(buf != NULL && recvpak.buf != NULL) { |
| | | void *_buf = malloc(recvpak.size); |
| | | memcpy(_buf, recvpak.buf, recvpak.size); |
| | | *buf = _buf; |
| | | if(recvpak.buf != NULL) { |
| | | if (buf == NULL) { |
| | | logger->warn("!!!Alert: buf should be not NULL!\n"); |
| | | } else { |
| | | void *_buf = malloc(recvpak.size); |
| | | memcpy(_buf, recvpak.buf, recvpak.size); |
| | | *buf = _buf; |
| | | } |
| | | } |
| | | |
| | | if(size != NULL) |
| | |
| | | logger->debug("send uuid:%s, recv uuid: %s", uuid.c_str(), recvpak.uuid); |
| | | if(strlen(recvpak.uuid) == 0) { |
| | | continue; |
| | | } else if (strncmp(uuid.c_str(), recvpak.uuid, sizeof recvpak.uuid) == 0) { |
| | | } else if (strncmp(uuid.c_str(), recvpak.uuid, sizeof(recvpak.uuid)) == 0) { |
| | | // 发送与接受的UUID匹配成功 |
| | | goto LABLE_SUC; |
| | | } else { |
| | |
| | | |
| | | |
| | | int rv = 0, tryn = 16; |
| | | static int Counter_suc = 0; |
| | | static int Counter_fail = 0; |
| | | shm_packet_t sendpak; |
| | | shm_packet_t recvpak; |
| | | std::map<int, shm_packet_t>::iterator recvbufIter; |
| | |
| | | if (tmp_socket == NULL) |
| | | { |
| | | /* If first call from this thread, allocate buffer for thread, and save its location */ |
| | | logger->debug("%lu create threadlocal socket\n", (long)pthread_self() ); |
| | | tmp_socket = shm_socket_open(SHM_SOCKET_DGRAM); |
| | | |
| | | rv = pthread_setspecific(_localthread_socket_key_, tmp_socket); |
| | |
| | | recvbufIter = tmp_socket->recvbuf2.find(key); |
| | | if(recvbufIter != tmp_socket->recvbuf2.end()) { |
| | | // 在缓存里查到了key匹配成功的 |
| | | // logger->info("get from recvbuf: %d", key); |
| | | recvpak = recvbufIter->second; |
| | | tmp_socket->recvbuf2.erase(recvbufIter); |
| | | goto LABLE_SUC; |
| | |
| | | return rv; |
| | | } |
| | | |
| | | if (key == recvpak.key) { |
| | | if (key == recvpak.key) { |
| | | // 发送与接受的UUID匹配成功 |
| | | goto LABLE_SUC; |
| | | } else { |
| | |
| | | return EBUS_RECVFROM_WRONG_END; |
| | | |
| | | LABLE_SUC: |
| | | if(recv_buf != NULL) { |
| | | sockt->key = tmp_socket->key; |
| | | if(recv_buf != NULL) { |
| | | void *_buf = malloc(recvpak.size); |
| | | memcpy(_buf, recvpak.buf, recvpak.size); |
| | | *recv_buf = _buf; |
| | |
| | | |
| | | if(recv_size != NULL) |
| | | *recv_size = recvpak.size; |
| | | |
| | | |
| | | return 0; |
| | | } |
| | |
| | | #include "svsem.h" |
| | | |
| | | int svsem_get(key_t key, unsigned int value) { |
| | | // printf("==================svsem_get===============================\n"); |
| | | int semid, perms; |
| | | |
| | | perms = S_IRUSR | S_IWUSR; |
| | |
| | | union semun arg; |
| | | struct sembuf sop; |
| | | |
| | | //logger.info("%ld: created semaphore\n", (long)getpid()); |
| | | |
| | | arg.val = 0; /* So initialize it to 0 */ |
| | | if (semctl(semid, 0, SETVAL, arg) == -1) |
| | |
| | | arg.val = 1; |
| | | if (semctl(semid, 1, SETVAL, arg) == -1) |
| | | err_exit(errno, "semctl 2"); |
| | | //logger.info("%ld: initialized semaphore\n", (long)getpid()); |
| | | |
| | | |
| | | /* Perform a "no-op" semaphore operation - changes sem_otime |
| | | so other processes can see we've initialized the set. */ |
| | | |
| | |
| | | sop.sem_flg = 0; |
| | | if (semop(semid, &sop, 1) == -1) |
| | | err_exit(errno, "semop"); |
| | | //logger.info("%ld: completed dummy semop()\n", (long)getpid()); |
| | | |
| | | } else { /* We didn't create the semaphore set */ |
| | | |
| | |
| | | if (semid == -1) |
| | | err_exit(errno, "semget 2"); |
| | | |
| | | // logger.info("%ld: got semaphore key\n", (long)getpid()); |
| | | /* Wait until another process has called semop() */ |
| | | |
| | | arg.buf = &ds; |
| | | for (j = 0; j < MAX_TRIES; j++) { |
| | | //logger.info("Try %d\n", j); |
| | | if (semctl(semid, 0, IPC_STAT, arg) == -1) |
| | | err_exit(errno, "semctl 2"); |
| | | |
| | |
| | | |
| | | while (semop(semid, &sops, 1) == -1) |
| | | if (errno != EINTR) { |
| | | // err_msg(errno, "svsem_dec"); |
| | | err_msg(errno, "svsem_dec"); |
| | | |
| | | return -1; |
| | | } |
| | | |
| | |
| | | |
| | | |
| | | |
| | | |
| | | |
| | |
| | | |
| | | mm_destroy(); |
| | | return 0; |
| | | } |
| | | } |
| | |
| | | char content[512]; |
| | | long i = 0; |
| | | |
| | | |
| | | pthread_create(&tid, NULL, run_recv, (void *)sk); |
| | | |
| | | while (true) { |
| | |
| | | |
| | | |
| | | return 0; |
| | | } |
| | | } |