| | |
| | | |
| | | ## 实例 |
| | | # 1. 实例 |
| | | |
| | | ### 请求应答模式 |
| | | |
| | | `source ./demo/server.c` |
| | | ## 1.1 Bus模式 |
| | | Source |
| | | |
| | | `dgram_mod_bus.c` |
| | | |
| | | 编译 |
| | | |
| | | 安装好so包后,用如下的方式编译,shm_queue是通信队列包,usgcommon是公共包,pthread是系统的线程包 |
| | | `g++ dgram_mod_bus.c -mcx16 -std=c++11 -lshm_queue -lusgcommon -lpthread` |
| | | |
| | | 演示 |
| | | |
| | | 启动bus `./dgram_mod_bus server 8`。然后打开两个客户端连接bus, 第一个客户端订阅 "news", 第二个客户端发布 "news". 第一个客户端会收到第二个客户端推送的信息。 |
| | | |
| | | 启动bus |
| | | ``` |
| | | $ ./dgram_mod_bus server 8 |
| | | ``` |
| | | |
| | | 第一个客户端订阅 "news" |
| | | ``` |
| | | |
| | | $ ./dgram_mod_bus client 8 |
| | | Can I help you? sub, pub or quit |
| | | sub |
| | | Please input topic! |
| | | news |
| | | Sub success! |
| | | Can I help you? sub, pub or quit |
| | | 收到订阅消息:111111111111111111111 |
| | | |
| | | ``` |
| | | 第二个客户端发布 "news" |
| | | ``` |
| | | $ ./dgram_mod_bus client 8 |
| | | Can I help you? sub, pub or quit |
| | | pub |
| | | Please input topic and content |
| | | news 111111111111111111111 |
| | | Pub success! |
| | | Can I help you? sub, pub or quit |
| | | |
| | | ``` |
| | | 这里可以打开许多个客户端发布和订阅消息。 |
| | | |
| | | |
| | | ## 1.2 req_rep模式, 适应于注册 |
| | | |
| | | **运行server端:** |
| | | |
| | | `./req_req server 8` |
| | | |
| | | **运行client端:** |
| | | 可以打开多个client |
| | | |
| | | `./req_rep client 8` |
| | | |
| | | 在client端输入请求信息,server 端回应,client端输出回应信息 |
| | | |
| | | |
| | | ### 发布订阅模式 |
| | | |
| | | **运行server端:** |
| | | |
| | | `./pub_sub server 8` |
| | | |
| | | |
| | | **运行client端:** |
| | | 可以打开多个client |
| | | |
| | | `./pub_sub client 8` |
| | | |
| | | 在server端输入发布信息,client端输出收到的订阅信息 |
| | | Source `dgram_mod_req_rep.c` |
| | | |
| | | ## 接口说明 |
| | | 编译 同上 |
| | | |
| | | 演示 |
| | | |
| | | ``` |
| | | ## 启动注册中心 |
| | | ./dgram_mod_req_rep server 2 & node0=$! && sleep 1 |
| | | ## 向注册中心发送消息 |
| | | ./dgram_mod_req_rep client 2 node1 |
| | | kill $node0 |
| | | ``` |
| | | |
| | | ## survey模式, 适应于心跳 |
| | | Source `dgram_mod_survey.c` |
| | | |
| | | 编译 同上 |
| | | |
| | | 启动心跳中心 |
| | | ``` |
| | | $ ./dgram_mod_survey server 3 |
| | | |
| | | RECEIVED HREARTBEAT FROM 1000: 0 |
| | | RECEIVED HREARTBEAT FROM 1000: 1 |
| | | RECEIVED HREARTBEAT FROM 1000: 2 |
| | | RECEIVED HREARTBEAT FROM 1000: 3 |
| | | RECEIVED HREARTBEAT FROM 1000: 4 |
| | | RECEIVED HREARTBEAT FROM 1000: 5 |
| | | RECEIVED HREARTBEAT FROM 1000: 6 |
| | | RECEIVED HREARTBEAT FROM 1000: 7 |
| | | RECEIVED HREARTBEAT FROM 1000: 8 |
| | | RECEIVED HREARTBEAT FROM 1000: 9 |
| | | |
| | | ``` |
| | | |
| | | 打开一个客户端,连接心跳中心 |
| | | ``` |
| | | $ ./dgram_mod_survey client 3 |
| | | |
| | | SEND HEART:0 |
| | | SEND HEART:1 |
| | | SEND HEART:2 |
| | | SEND HEART:3 |
| | | SEND HEART:4 |
| | | SEND HEART:5 |
| | | SEND HEART:6 |
| | | SEND HEART:7 |
| | | SEND HEART:8 |
| | | SEND HEART:9 |
| | | |
| | | ``` |
| | | |
| | | |
| | | # 2. 接口说明 |
| | | |
| | | shm_mm.h |
| | | ``` |
| | | /** |
| | | * 初始化共享内存 |
| | | * @size 共享内存大小, 单位M |
| | | * |
| | | */ |
| | | void shm_init(int size); |
| | | |
| | | /** |
| | | * 销毁共享内存 |
| | | * 整个进程退出时需要执行这个方法,该方法首先会检查是否还有其他进程在使用该共享内存,如果还有其他进程在使用就只是detach,如果没有其他进程在使用则销毁整块内存。 |
| | | */ |
| | | void shm_destroy(); |
| | | ``` |
| | | |
| | | dgram_mod_socket.h |
| | | ``` |
| | | |
| | | /** |
| | | * 创建socket |
| | | * @return socket地址 |
| | | */ |
| | | void *mod_open_socket(int mod); |
| | | void *dgram_mod_open_socket(); |
| | | |
| | | /** |
| | | * 关闭socket |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int mod_close_socket(void * _socket); |
| | | int dgram_mod_close_socket(void * _socket); |
| | | |
| | | /** |
| | | * 绑定端口到socket, 如果不绑定则系统自动分配一个 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int mod_socket_bind(void * _socket, int port); |
| | | |
| | | |
| | | /** |
| | | * 服务端开启连接监听 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int mod_listen(void * _socket); |
| | | |
| | | /** |
| | | * 客户端发起连接请求 |
| | | */ |
| | | int mod_connect(void * _socket, int port); |
| | | int dgram_mod_bind(void * _socket, int port); |
| | | |
| | | /** |
| | | * 发送信息 |
| | | * @port 发送给谁 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int mod_send(void * _socket, const void *buf, const int size); |
| | | int dgram_mod_sendto(void *_socket, const void *buf, const int size, const int port); |
| | | |
| | | |
| | | /** |
| | | * 接收信息 |
| | | * @port 从谁哪里收到的信息 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int mod_recv(void * _socket, void **buf, int *size) ; |
| | | int dgram_mod_recvfrom(void *_socket, void **buf, int *size, int *port); |
| | | |
| | | /** |
| | | * 释放接收信息的buf |
| | | * 发送请求信息并等待接收应答 |
| | | * @port 发送给谁 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ; |
| | | |
| | | |
| | | /** |
| | | * 启动bus |
| | | * |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int dgram_mod_start_bus(void * _socket); |
| | | |
| | | /** |
| | | * 订阅指定主题 |
| | | * @topic 主题 |
| | | * @size 主题长度 |
| | | * @port 总线端口 |
| | | */ |
| | | void mod_free(void *buf); |
| | | int dgram_mod_sub(void * _socket, void *topic, int size, int port); |
| | | |
| | | /** |
| | | * 发布主题 |
| | | * @topic 主题 |
| | | * @content 主题内容 |
| | | * @port 总线端口 |
| | | */ |
| | | int dgram_mod_pub(void * _socket, void *topic, int topic_size, void *content, int content_size, int port); |
| | | |
| | | |
| | | /** |
| | | * 获取soket端口号 |
| | | */ |
| | | int mod_get_socket_port(void * _socket); |
| | | int dgram_mod_get_port(void * _socket) ; |
| | | |
| | | |
| | | /** |
| | | * 释放存储接收信息的buf |
| | | */ |
| | | void dgram_mod_free(void *buf) ; |
| | | ``` |
| | | |
| | | |
| | |
| | | include $(ROOT)/Make.defines.$(PLATFORM) |
| | | |
| | | |
| | | PROGS = |
| | | PROGS = dgram_mod_req_rep dgram_mod_survey |
| | | |
| | | |
| | | build: $(PROGS) |
New file |
| | |
| | | #include "dgram_mod_socket.h" |
| | | #include "shm_mm.h" |
| | | #include "usg_common.h" |
| | | |
| | | void server(int port) { |
| | | void *socket = dgram_mod_open_socket(); |
| | | dgram_mod_bind(socket, port); |
| | | |
| | | dgram_mod_start_bus(socket); |
| | | |
| | | } |
| | | |
| | | |
| | | void *run_recv(void *socket) { |
| | | pthread_detach(pthread_self()); |
| | | void *recvbuf; |
| | | int size; |
| | | int port; |
| | | while (dgram_mod_recvfrom( socket, &recvbuf, &size, &port) == 0) { |
| | | printf("收到订阅消息:%s\n", recvbuf); |
| | | free(recvbuf); |
| | | } |
| | | |
| | | } |
| | | |
| | | void client(int port) { |
| | | void *socket = dgram_mod_open_socket(); |
| | | pthread_t tid; |
| | | pthread_create(&tid, NULL, run_recv, socket); |
| | | int size; |
| | | |
| | | char action[512]; |
| | | char topic[512]; |
| | | char content[512]; |
| | | long i = 0; |
| | | while (true) { |
| | | //printf("Usage: pub <topic> [content] or sub <topic>\n"); |
| | | printf("Can I help you? sub, pub or quit\n"); |
| | | scanf("%s",action); |
| | | |
| | | if(strcmp(action, "sub") == 0) { |
| | | printf("Please input topic!\n"); |
| | | scanf("%s", topic); |
| | | dgram_mod_sub(socket, topic, strlen(topic), port); |
| | | printf("Sub success!\n"); |
| | | } |
| | | else if(strcmp(action, "pub") == 0) { |
| | | // printf("%s %s %s\n", action, topic, content); |
| | | printf("Please input topic and content\n"); |
| | | scanf("%s %s", topic, content); |
| | | dgram_mod_pub(socket, topic, strlen(topic)+1, content, strlen(content)+1, port); |
| | | printf("Pub success!\n"); |
| | | } else if(strcmp(action, "quit") == 0) { |
| | | break; |
| | | } else { |
| | | printf("error input\n"); |
| | | continue; |
| | | } |
| | | |
| | | } |
| | | printf("(%d) quit\n", dgram_mod_get_port(socket)); |
| | | dgram_mod_close_socket(socket); |
| | | } |
| | | |
| | | |
| | | |
| | | int main(int argc, char *argv[]) { |
| | | shm_init(512); |
| | | int port; |
| | | if (argc < 3) { |
| | | fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client"); |
| | | return 1; |
| | | } |
| | | |
| | | port = atoi(argv[2]); |
| | | |
| | | if (strcmp("server", argv[1]) == 0) { |
| | | server(port); |
| | | } |
| | | |
| | | if (strcmp("client", argv[1]) == 0) |
| | | client(port); |
| | | |
| | | |
| | | return 0; |
| | | } |
| | |
| | | int rv; |
| | | int remote_port; |
| | | while ( (rv = dgram_mod_recvfrom(socket, &recvbuf, &size, &remote_port) ) == 0) { |
| | | printf( "REGIST CENTER RECEIVED REQUEST FROM PORT %d NAME %s\n", remote_port, recvbuf); |
| | | sprintf(sendbuf, "RECEIVED FROM PORT %d NAME %s", remote_port, recvbuf); |
| | | printf( "server: RECEIVED REQUEST FROM PORT %d NAME %s\n", remote_port, recvbuf); |
| | | sprintf(sendbuf, "RECEIVED PORT %d NAME %s", remote_port, recvbuf); |
| | | dgram_mod_sendto(socket, sendbuf, strlen(sendbuf) + 1, remote_port); |
| | | free(recvbuf); |
| | | } |
| | |
| | | void *socket = dgram_mod_open_socket(); |
| | | int size; |
| | | void *recvbuf; |
| | | dgram_mod_sendandrecv(socket, sendbuf, strlen(sendbuf) + 1, port, &recvbuf, &size); |
| | | printf("reply: %s\n", (char *)recvbuf); |
| | | free(recvbuf); |
| | | printf("client :send request%s\n", sendbuf); |
| | | if(dgram_mod_sendandrecv(socket, sendbuf, strlen(sendbuf) + 1, port, &recvbuf, &size) == 0) { |
| | | printf("client :received reply => %s\n", (char *)recvbuf); |
| | | free(recvbuf); |
| | | } |
| | | |
| | | dgram_mod_close_socket(socket); |
| | | } |
| | | |
| | |
| | | ipcrm -a |
| | | |
| | | ./dgram_mod_req_rep server 8 & node0=$! |
| | | ./dgram_mod_req_rep client 8 node1 |
| | | ./dgram_mod_req_rep server 2 & node0=$! && sleep 1 |
| | | ./dgram_mod_req_rep client 2 node1 |
| | | kill $node0 |
| | |
| | | #include "usg_common.h" |
| | | |
| | | void server(int port) { |
| | | void *socket = dgram_mod_open_socket(SURVEY); |
| | | void *socket = dgram_mod_open_socket(); |
| | | dgram_mod_bind(socket, port); |
| | | int size; |
| | | void *recvbuf; |
| | |
| | | } |
| | | |
| | | void client(int port) { |
| | | void *socket = dgram_mod_open_socket(SURVEY); |
| | | void *socket = dgram_mod_open_socket(); |
| | | int size; |
| | | void *recvbuf; |
| | | char sendbuf[512]; |
| | |
| | | #ifndef __SHM_QUEUE_H__ |
| | | #define __SHM_QUEUE_H__ |
| | | |
| | | #include "usg_common.h" |
| | | #include "hashtable.h" |
| | | #include "lock_free_queue.h" |
| | | #include "logger_factory.h" |
| | | #include "shm_allocator.h" |
| | | #include "sem_util.h" |
| | | #include "shm_allocator.h" |
| | | #include "usg_common.h" |
| | | // default Queue size |
| | | // #define LOCK_FREE_Q_DEFAULT_SIZE 16 |
| | | |
| | | template < typename ELEM_T> |
| | | class SHMQueue |
| | | { |
| | | |
| | | template <typename ELEM_T> class SHMQueue { |
| | | |
| | | private: |
| | | const int KEY; |
| | | |
| | | const int KEY; |
| | | |
| | | public: |
| | | /// @brief constructor of the class |
| | | SHMQueue(int key=0, size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); |
| | | |
| | | |
| | | ~SHMQueue(); |
| | | /// @brief constructor of the class |
| | | SHMQueue(int key = 0, size_t qsize = LOCK_FREE_Q_DEFAULT_SIZE); |
| | | |
| | | |
| | | inline uint32_t size(); |
| | | |
| | | inline bool full(); |
| | | inline bool empty(); |
| | | |
| | | inline bool push(const ELEM_T &a_data); |
| | | inline bool push_nowait(const ELEM_T &a_data); |
| | | inline bool push_timeout(const ELEM_T &a_data, const struct timespec * timeout); |
| | | inline bool pop(ELEM_T &a_data); |
| | | inline bool pop_nowait(ELEM_T &a_data); |
| | | inline bool pop_timeout(ELEM_T &a_data, struct timespec * timeout); |
| | | ~SHMQueue(); |
| | | |
| | | inline ELEM_T& operator[](unsigned i); |
| | | inline uint32_t size(); |
| | | |
| | | static void remove_queues_exclude(int *keys, size_t length); |
| | | inline bool full(); |
| | | inline bool empty(); |
| | | |
| | | inline bool push(const ELEM_T &a_data); |
| | | inline bool push_nowait(const ELEM_T &a_data); |
| | | inline bool push_timeout(const ELEM_T &a_data, |
| | | const struct timespec *timeout); |
| | | inline bool pop(ELEM_T &a_data); |
| | | inline bool pop_nowait(ELEM_T &a_data); |
| | | inline bool pop_timeout(ELEM_T &a_data, struct timespec *timeout); |
| | | |
| | | inline ELEM_T &operator[](unsigned i); |
| | | |
| | | static void remove_queues_exclude(int *keys, size_t length); |
| | | |
| | | private: |
| | | |
| | | |
| | | protected: |
| | | /// @brief the actual queue-> methods are forwarded into the real |
| | | /// implementation |
| | | LockFreeQueue<ELEM_T, SHM_Allocator>* queue; |
| | | /// @brief the actual queue-> methods are forwarded into the real |
| | | /// implementation |
| | | LockFreeQueue<ELEM_T, SHM_Allocator> *queue; |
| | | |
| | | private: |
| | | /// @brief disable copy constructor declaring it private |
| | | SHMQueue<ELEM_T>(const SHMQueue<ELEM_T> &a_src); |
| | | /// @brief disable copy constructor declaring it private |
| | | SHMQueue<ELEM_T>(const SHMQueue<ELEM_T> &a_src); |
| | | }; |
| | | |
| | | template <typename ELEM_T> |
| | | void SHMQueue<ELEM_T>::remove_queues_exclude(int *keys, size_t length) { |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | std::set<int> *keyset = hashtable_keyset(hashtable); |
| | | std::set<int>::iterator keyItr; |
| | | LockFreeQueue<ELEM_T, SHM_Allocator> *mqueue; |
| | | bool found; |
| | | for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) { |
| | | found = false; |
| | | for (size_t i = 0; i < length; i++) { |
| | | if (*keyItr == keys[i]) { |
| | | found = true; |
| | | break; |
| | | } |
| | | } |
| | | if (!found) { |
| | | mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, |
| | | *keyItr); |
| | | delete mqueue; |
| | | } |
| | | } |
| | | delete keyset; |
| | | } |
| | | |
| | | template < typename ELEM_T > |
| | | void SHMQueue<ELEM_T>::remove_queues_exclude(int *keys, size_t length) |
| | | { |
| | | template <typename ELEM_T> |
| | | SHMQueue<ELEM_T>::SHMQueue(int key, size_t qsize) : KEY(key) { |
| | | |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | queue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key); |
| | | // LockFreeQueue<int, 10000> q; |
| | | if (queue == NULL || (void *)queue == (void *)1) { |
| | | queue = new LockFreeQueue<ELEM_T, SHM_Allocator>(qsize); |
| | | hashtable_put(hashtable, key, (void *)queue); |
| | | } |
| | | queue->reference++; |
| | | LoggerFactory::getLogger().debug("SHMQueue constructor reference===%d", |
| | | queue->reference.load()); |
| | | } |
| | | |
| | | template <typename ELEM_T> SHMQueue<ELEM_T>::~SHMQueue() { |
| | | SemUtil::dec(queue->mutex); |
| | | queue->reference--; |
| | | // LoggerFactory::getLogger().debug("SHMQueue destructor reference===%d", |
| | | // queue->reference.load()); |
| | | if (queue->reference.load() == 0) { |
| | | delete queue; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | std::set<int>* keyset = hashtable_keyset(hashtable); |
| | | std::set<int>::iterator keyItr; |
| | | LockFreeQueue<ELEM_T, SHM_Allocator>* mqueue; |
| | | bool found; |
| | | for (keyItr = keyset->begin(); keyItr != keyset->end(); keyItr++) { |
| | | found = false; |
| | | for(size_t i = 0; i < length; i++) { |
| | | if(*keyItr == keys[i]) { |
| | | found = true; |
| | | break; |
| | | } |
| | | } |
| | | if(!found) { |
| | | mqueue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, *keyItr); |
| | | delete mqueue; |
| | | } |
| | | } |
| | | delete keyset; |
| | | |
| | | } |
| | | |
| | | template < typename ELEM_T > |
| | | SHMQueue<ELEM_T>::SHMQueue(int key, size_t qsize): KEY(key) |
| | | { |
| | | |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | queue = (LockFreeQueue<ELEM_T, SHM_Allocator> *)hashtable_get(hashtable, key); |
| | | //LockFreeQueue<int, 10000> q; |
| | | if (queue == NULL || (void *)queue == (void *)1) { |
| | | queue = new LockFreeQueue<ELEM_T, SHM_Allocator>(qsize); |
| | | hashtable_put(hashtable, key, (void *)queue); |
| | | } |
| | | queue->reference++; |
| | | LoggerFactory::getLogger().debug("SHMQueue constructor reference===%d", queue->reference.load()); |
| | | hashtable_remove(hashtable, KEY); |
| | | // LoggerFactory::getLogger().debug("SHMQueue destructor delete queue\n"); |
| | | } else { |
| | | SemUtil::inc(queue->mutex); |
| | | } |
| | | } |
| | | |
| | | template < typename ELEM_T > |
| | | SHMQueue<ELEM_T>::~SHMQueue() |
| | | { |
| | | SemUtil::dec( queue->mutex); |
| | | queue->reference--; |
| | | //LoggerFactory::getLogger().debug("SHMQueue destructor reference===%d", queue->reference.load()); |
| | | if(queue->reference.load() == 0) { |
| | | delete queue; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | hashtable_remove(hashtable, KEY); |
| | | // LoggerFactory::getLogger().debug("SHMQueue destructor delete queue\n"); |
| | | } else { |
| | | SemUtil::inc(queue->mutex); |
| | | } |
| | | |
| | | template <typename ELEM_T> inline uint32_t SHMQueue<ELEM_T>::force_destroy() { |
| | | SemUtil::dec(queue->mutex); |
| | | delete queue; |
| | | hashtable_t *hashtable = mm_get_hashtable(); |
| | | hashtable_remove(hashtable, KEY); |
| | | SemUtil::inc(queue->mutex); |
| | | } |
| | | |
| | | template < typename ELEM_T > |
| | | inline uint32_t SHMQueue<ELEM_T>::size() |
| | | { |
| | | return queue->size(); |
| | | } |
| | | |
| | | template < typename ELEM_T > |
| | | inline bool SHMQueue<ELEM_T>::full() |
| | | { |
| | | return queue->full(); |
| | | template <typename ELEM_T> inline uint32_t SHMQueue<ELEM_T>::size() { |
| | | return queue->size(); |
| | | } |
| | | |
| | | template < typename ELEM_T > |
| | | inline bool SHMQueue<ELEM_T>::empty() |
| | | { |
| | | return queue->empty(); |
| | | } |
| | | |
| | | |
| | | template < typename ELEM_T > |
| | | inline bool SHMQueue<ELEM_T>::push(const ELEM_T &a_data) |
| | | { |
| | | return queue->push(a_data); |
| | | |
| | | template <typename ELEM_T> inline bool SHMQueue<ELEM_T>::full() { |
| | | return queue->full(); |
| | | } |
| | | |
| | | template < |
| | | typename ELEM_T > |
| | | inline bool SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) |
| | | { |
| | | return queue->push_nowait(a_data); |
| | | |
| | | template <typename ELEM_T> inline bool SHMQueue<ELEM_T>::empty() { |
| | | return queue->empty(); |
| | | } |
| | | |
| | | template < typename ELEM_T > |
| | | inline bool SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, const struct timespec * timeout) |
| | | { |
| | | |
| | | return queue->push_timeout(a_data, timeout); |
| | | |
| | | template <typename ELEM_T> |
| | | inline bool SHMQueue<ELEM_T>::push(const ELEM_T &a_data) { |
| | | return queue->push(a_data); |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | template < typename ELEM_T > |
| | | inline bool SHMQueue<ELEM_T>::pop(ELEM_T &a_data) |
| | | { |
| | | // printf("SHMQueue pop before\n"); |
| | | int rv = queue->pop(a_data); |
| | | // printf("SHMQueue after before\n"); |
| | | return rv; |
| | | |
| | | template <typename ELEM_T> |
| | | inline bool SHMQueue<ELEM_T>::push_nowait(const ELEM_T &a_data) { |
| | | return queue->push_nowait(a_data); |
| | | } |
| | | |
| | | template < typename ELEM_T > |
| | | inline bool SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) |
| | | { |
| | | return queue->pop_nowait(a_data); |
| | | |
| | | template <typename ELEM_T> |
| | | inline bool SHMQueue<ELEM_T>::push_timeout(const ELEM_T &a_data, |
| | | const struct timespec *timeout) { |
| | | |
| | | return queue->push_timeout(a_data, timeout); |
| | | } |
| | | |
| | | |
| | | template < typename ELEM_T > |
| | | inline bool SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, struct timespec * timeout) |
| | | { |
| | | return queue->pop_timeout(a_data, timeout); |
| | | |
| | | template <typename ELEM_T> inline bool SHMQueue<ELEM_T>::pop(ELEM_T &a_data) { |
| | | // printf("SHMQueue pop before\n"); |
| | | int rv = queue->pop(a_data); |
| | | // printf("SHMQueue after before\n"); |
| | | return rv; |
| | | } |
| | | |
| | | template < typename ELEM_T > |
| | | inline ELEM_T& SHMQueue<ELEM_T>::operator[](unsigned i) { |
| | | return queue->operator[](i); |
| | | template <typename ELEM_T> |
| | | inline bool SHMQueue<ELEM_T>::pop_nowait(ELEM_T &a_data) { |
| | | return queue->pop_nowait(a_data); |
| | | } |
| | | |
| | | template <typename ELEM_T> |
| | | inline bool SHMQueue<ELEM_T>::pop_timeout(ELEM_T &a_data, |
| | | struct timespec *timeout) { |
| | | return queue->pop_timeout(a_data, timeout); |
| | | } |
| | | |
| | | template <typename ELEM_T> |
| | | inline ELEM_T &SHMQueue<ELEM_T>::operator[](unsigned i) { |
| | | return queue->operator[](i); |
| | | } |
| | | |
| | | #endif |
| | |
| | | free(buf); |
| | | } |
| | | |
| | | int start_bus(void * _socket) { |
| | | int dgram_mod_start_bus(void * _socket) { |
| | | dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; |
| | | socket->topic_sub_map = new std::map<std::string, std::set<int> *>; |
| | | run_pubsub_proxy(socket); |
| | |
| | | /** |
| | | * @port 总线端口 |
| | | */ |
| | | int sub(void * _socket, void *topic, int size, int port) { |
| | | int dgram_mod_sub(void * _socket, void *topic, int size, int port) { |
| | | dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; |
| | | char buf[8192]; |
| | | snprintf(buf, 8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER); |
| | |
| | | /** |
| | | * @port 总线端口 |
| | | */ |
| | | int pub(void * _socket, void *topic, int topic_size, void *content, int content_size, int port) { |
| | | int dgram_mod_pub(void * _socket, void *topic, int topic_size, void *content, int content_size, int port) { |
| | | |
| | | dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket; |
| | | int head_len; |
| | |
| | | */ |
| | | int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ; |
| | | |
| | | |
| | | /** |
| | | * 启动bus |
| | | * |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int dgram_mod_start_bus(void * _socket); |
| | | |
| | | /** |
| | | * 订阅指定主题 |
| | | * @topic 主题 |
| | | * @size 主题长度 |
| | | * @port 总线端口 |
| | | */ |
| | | int dgram_mod_sub(void * _socket, void *topic, int size, int port); |
| | | |
| | | /** |
| | | * 发布主题 |
| | | * @topic 主题 |
| | | * @content 主题内容 |
| | | * @port 总线端口 |
| | | */ |
| | | int dgram_mod_pub(void * _socket, void *topic, int topic_size, void *content, int content_size, int port); |
| | | |
| | | |
| | | /** |
| | | * 获取soket端口号 |
| | | */ |
| | |
| | | * 释放存储接收信息的buf |
| | | */ |
| | | void dgram_mod_free(void *buf) ; |
| | | |
| | | /** |
| | | * 启动bus |
| | | * |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int start_bus(void * _socket); |
| | | |
| | | /** |
| | | * 订阅指定主题 |
| | | * @topic 主题 |
| | | * @size 主题长度 |
| | | * @port 总线端口 |
| | | */ |
| | | int sub(void * _socket, void *topic, int size, int port); |
| | | |
| | | /** |
| | | * 发布主题 |
| | | * @topic 主题 |
| | | * @content 主题内容 |
| | | * @port 总线端口 |
| | | */ |
| | | int pub(void * _socket, void *topic, int topic_size, void *content, int content_size, int port); |
| | | |
| | | |
| | | #ifdef __cplusplus |
| | | } |
| | | #endif |
| | |
| | | void *socket = dgram_mod_open_socket(); |
| | | dgram_mod_bind(socket, port); |
| | | |
| | | start_bus(socket); |
| | | dgram_mod_start_bus(socket); |
| | | |
| | | } |
| | | |
| | |
| | | if(strcmp(action, "sub") == 0) { |
| | | printf("Please input topic!\n"); |
| | | scanf("%s", topic); |
| | | sub(socket, topic, strlen(topic), port); |
| | | dgram_mod_sub(socket, topic, strlen(topic), port); |
| | | printf("Sub success!\n"); |
| | | } |
| | | else if(strcmp(action, "pub") == 0) { |
| | | // printf("%s %s %s\n", action, topic, content); |
| | | printf("Please input topic and content\n"); |
| | | scanf("%s %s", topic, content); |
| | | pub(socket, topic, strlen(topic)+1, content, strlen(content)+1, port); |
| | | dgram_mod_pub(socket, topic, strlen(topic)+1, content, strlen(content)+1, port); |
| | | printf("Pub success!\n"); |
| | | } else if(strcmp(action, "quit") == 0) { |
| | | break; |
| | |
| | | |
| | | }Targ; |
| | | |
| | | LockFreeQueue<task_t, DM_Allocator> task_queue(100); |
| | | LockFreeQueue<task_t, DM_Allocator> task_queue(128); |
| | | |
| | | |
| | | void *worker(void *socket) { |