| | |
// File-local logger obtained once from LoggerFactory; the methods below log through it.
| | | static Logger *logger = LoggerFactory::getLogger(); |
| | | |
| | | |
| | | size_t ShmModSocket::remove_keys(int keys[], size_t length) { |
| | | BusServerSocket::remove_subscripters(keys, length); |
| | | return shm_socket_remove_keys(keys, length); |
| | | } |
| | | |
| | | ShmModSocket::ShmModSocket() { |
| | | mod = (socket_mod_t)0; |
| | | shm_socket = shm_open_socket(SHM_SOCKET_DGRAM); |
| | | shm_socket = shm_socket_open(SHM_SOCKET_DGRAM); |
| | | bus_set = new std::set<int>; |
| | | } |
| | | |
| | | ShmModSocket::~ShmModSocket() { |
| | | // logger->debug("Close ShmModSocket...\n"); |
| | | struct timespec timeout = {1, 0}; |
| | | if(bus_set != NULL) { |
| | | for(auto bus_iter = bus_set->begin(); bus_iter != bus_set->end(); bus_iter++) { |
| | | desub_timeout(NULL, 0, *bus_iter, &timeout); |
| | | desub(NULL, 0, *bus_iter, &timeout, BUS_TIMEOUT_FLAG); |
| | | } |
| | | delete bus_set; |
| | | } |
| | | |
| | | shm_close_socket(shm_socket); |
| | | shm_socket_close(shm_socket); |
| | | } |
| | | |
| | | int ShmModSocket::stop() { |
| | | return shm_socket_stop(shm_socket); |
| | | } |
| | | |
| | | |
| | | int ShmModSocket::bind(int key) { |
| | | return shm_socket_bind(shm_socket, key); |
| | |
| | | int ShmModSocket::force_bind(int key) { |
| | | return shm_socket_force_bind(shm_socket, key); |
| | | } |
| | | |
| | | int ShmModSocket::reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag) |
| | | { |
| | | int ret; |
| | | struct timespec ts; |
| | | |
| | | bus_head_t head = {}; |
| | | |
| | | if (flag == PROC_REG) { |
| | | |
| | | memcpy(head.action, "reg", sizeof(head.action)); |
| | | |
| | | } else if (flag == PROC_UNREG) { |
| | | |
| | | memcpy(head.action, "unreg", sizeof(head.action)); |
| | | |
| | | } else if (flag == PROC_REG_TCS) { |
| | | |
| | | memcpy(head.action, "tcsreg", sizeof(head.action)); |
| | | |
| | | } else if (flag == PROC_QUE_TCS) { |
| | | |
| | | memcpy(head.action, "tcsque", sizeof(head.action)); |
| | | |
| | | } else if (flag == PROC_QUE_STCS) { |
| | | |
| | | memcpy(head.action, "stcsque", sizeof(head.action)); |
| | | |
| | | } else if (flag == PROC_QUE_ATCS) { |
| | | |
| | | memcpy(head.action, "atcsque", sizeof(head.action)); |
| | | |
| | | } else if (flag == PROC_REG_BUF) { |
| | | |
| | | memcpy(head.action, "bufreg", sizeof(head.action)); |
| | | |
| | | } else { |
| | | |
| | | return -1; |
| | | |
| | | } |
| | | |
| | | if ((flag == PROC_REG) || (flag == PROC_UNREG)) { |
| | | |
| | | head.topic_size = 0; |
| | | |
| | | if (pData != NULL) { |
| | | |
| | | head.content_size = sizeof(ProcInfo); |
| | | |
| | | } else { |
| | | |
| | | head.content_size = 0; |
| | | |
| | | } |
| | | } else { |
| | | |
| | | head.topic_size = len; |
| | | |
| | | head.content_size = 0; |
| | | |
| | | } |
| | | |
| | | void *buf_temp; |
| | | int buf_size; |
| | | |
| | | if ((flag == PROC_REG) || (flag == PROC_UNREG)) { |
| | | |
| | | buf_size = get_bus_sendbuf(head, NULL, 0, pData, head.content_size, &buf_temp); |
| | | |
| | | } else { |
| | | |
| | | buf_size = get_bus_sendbuf(head, pData, len, NULL, head.content_size, &buf_temp); |
| | | |
| | | } |
| | | |
| | | if (timeout_ms > 0) { |
| | | |
| | | ts.tv_sec = timeout_ms /1000; |
| | | |
| | | ts.tv_nsec = (timeout_ms - ts.tv_sec * 1000) * 1000 * 1000; |
| | | |
| | | if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS) || (flag == PROC_REG_BUF)) { |
| | | |
| | | ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_TIMEOUT_FLAG); |
| | | |
| | | } else { |
| | | |
| | | ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, BUS_TIMEOUT_FLAG); |
| | | |
| | | } |
| | | |
| | | } else if (timeout_ms == 0) { |
| | | |
| | | if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS) || (flag == PROC_REG_BUF)) { |
| | | |
| | | ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, BUS_NOWAIT_FLAG); |
| | | |
| | | } else { |
| | | |
| | | ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, BUS_NOWAIT_FLAG); |
| | | |
| | | } |
| | | |
| | | } else { |
| | | |
| | | if ((flag == PROC_REG) || (flag == PROC_UNREG) || (flag == PROC_REG_TCS) || (flag == PROC_REG_BUF)) { |
| | | |
| | | ret = shm_sendto(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, &ts, -1); |
| | | |
| | | } else { |
| | | |
| | | ret = shm_sendandrecv(shm_socket, buf_temp, buf_size, SHM_BUS_KEY, buf, size, &ts, -1); |
| | | |
| | | } |
| | | |
| | | } |
| | | |
| | | free(buf_temp); |
| | | |
| | | return ret; |
| | | |
| | | } |
| | | |
| | | /** |
| | | * 发送信息 |
| | | * @key 发送给谁 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int ShmModSocket::sendto(const void *buf, const int size, const int key) { |
| | | return shm_sendto(shm_socket, buf, size, key, NULL, 0); |
| | | } |
| | | // 发送信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int ShmModSocket::sendto_timeout(const void *buf, const int size, const int key, const struct timespec *timeout) { |
| | | return shm_sendto(shm_socket, buf, size, key, timeout, 0); |
| | | } |
| | | // 发送信息立刻返回。 |
| | | int ShmModSocket::sendto_nowait( const void *buf, const int size, const int key){ |
| | | return shm_sendto(shm_socket, buf, size, key, NULL, (int)SHM_MSG_NOWAIT); |
| | | int ShmModSocket::sendto(const void *buf, const int size, const int key, const struct timespec *timeout, int flag, int reset, int data_set) { |
| | | int rv = shm_sendto(shm_socket, buf, size, key, timeout, flag, reset, data_set); |
| | | if(rv == 0) { |
| | | return 0; |
| | | } |
| | | |
| | | logger->debug("ShmModSocket::sendto : %d sendto %d failed %s", get_key(), key, bus_strerror(rv)); |
| | | return rv; |
| | | } |
| | | |
| | | |
| | | inline int ShmModSocket::_recvfrom_(void **buf, int *size, int *key, struct timespec *timeout, int flags) { |
| | | |
| | | if(mod == BUS) { |
| | | logger->error("Can not use method recvfrom in a Bus"); |
| | | exit(1); |
| | | } |
| | | // printf("dgram_mod_recvfrom before\n"); |
| | | int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flags); |
| | | // printf("dgram_mod_recvfrom after\n"); |
| | | return rv; |
| | | } |
| | | /** |
| | | * 接收信息 |
| | | * @key 从谁哪里收到的信息 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int ShmModSocket::recvfrom(void **buf, int *size, int *key) { |
| | | |
| | | return _recvfrom_( buf, size, key, NULL, 0); |
| | | } |
| | | int ShmModSocket::recvfrom( void **buf, int *size, int *key, const struct timespec *timeout, int flag, int reset, int data_set) { |
| | | |
| | | int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flag, reset, data_set); |
| | | |
| | | // 接受信息超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int ShmModSocket::recvfrom_timeout( void **buf, int *size, int *key, struct timespec *timeout) { |
| | | return _recvfrom_(buf, size, key, timeout, 0); |
| | | } |
| | | if(rv == 0) { |
| | | return 0; |
| | | } |
| | | |
| | | int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *key){ |
| | | return _recvfrom_(buf, size, key, NULL, (int)SHM_MSG_NOWAIT); |
| | | logger->debug("ShmModSocket::recvfrom: socket %d recvfrom failed %s", get_key(), bus_strerror(rv)); |
| | | return rv; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 发送请求信息并等待接收应答 |
| | | * @key 发送给谁 |
| | | * @return 0 成功, 其他值 失败的错误码 |
| | | */ |
| | | int ShmModSocket::sendandrecv( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ |
| | | return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0); |
| | | } |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int ShmModSocket::sendandrecv_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, struct timespec *timeout){ |
| | | return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0); |
| | | } |
| | | int ShmModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ |
| | | return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT); |
| | | } |
| | | int ShmModSocket::sendandrecv(const void *send_buf, const int send_size, const int send_key, |
| | | void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){ |
| | | int rv = shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag); |
| | | |
| | | int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ |
| | | return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0); |
| | | } |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int ShmModSocket::sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, struct timespec *timeout){ |
| | | return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0); |
| | | } |
| | | int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ |
| | | return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT); |
| | | if(rv == 0) { |
| | | return 0; |
| | | } |
| | | |
| | | logger->debug("ShmModSocket::sendandrecv : sendandrecv to %d failed %s", send_key, bus_strerror(rv)); |
| | | return rv; |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | int ShmModSocket::recvandsend( recvandsend_callback_fn callback, |
| | | const struct timespec *timeout , int flag, void * user_data ) { |
| | | return shm_recvandsend(shm_socket, callback, timeout, flag, user_data); |
| | | } |
| | | |
| | | // // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | // int ShmModSocket::sendandrecv_unsafe(const void *send_buf, const int send_size, const int send_key, |
| | | // void **recv_buf, int *recv_size, const struct timespec *timeout, int flag){ |
| | | // return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, flag); |
| | | // } |
| | | |
| | | /** |
| | | * 订阅指定主题 |
| | | * @topic 主题 |
| | | * @size 主题长度 |
| | | * @key 总线端口 |
| | | */ |
| | | int ShmModSocket::sub(char *topic, int size, int key){ |
| | | return _sub_( topic, size, key, NULL, 0); |
| | | } |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int ShmModSocket::sub_timeout(char *topic, int size, int key, struct timespec *timeout){ |
| | | return _sub_(topic, size, key, timeout, 0); |
| | | } |
| | | int ShmModSocket::sub_nowait(char *topic, int size, int key) { |
| | | return _sub_(topic, size, key, NULL, (int)SHM_MSG_NOWAIT); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * 取消订阅指定主题 |
| | | * @topic 主题 |
| | | * @size 主题长度 |
| | | * @key 总线端口 |
| | | */ |
| | | int ShmModSocket::desub(char *topic, int size, int key){ |
| | | return _desub_( topic, size, key, NULL, 0); |
| | | } |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int ShmModSocket::desub_timeout(char *topic, int size, int key, struct timespec *timeout){ |
| | | return _desub_(topic, size, key, timeout, 0); |
| | | } |
| | | int ShmModSocket::desub_nowait(char *topic, int size, int key) { |
| | | return _desub_(topic, size, key, NULL, (int)SHM_MSG_NOWAIT); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * 发布主题 |
| | | * @topic 主题 |
| | | * @content 主题内容 |
| | | * @key 总线端口 |
| | | */ |
| | | int ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int key){ |
| | | return _pub_(topic, topic_size, content, content_size, key, NULL, 0); |
| | | } |
| | | // 超时返回。 @sec 秒 , @nsec 纳秒 |
| | | int ShmModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int key, struct timespec * timeout){ |
| | | return _pub_( topic, topic_size, content, content_size, key, timeout, 0); |
| | | } |
| | | int ShmModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int key){ |
| | | return _pub_(topic, topic_size, content, content_size, key, NULL, (int)SHM_MSG_NOWAIT); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * 获取soket key |
| | | */ |
| | | int ShmModSocket::get_key(){ |
| | | return shm_socket->key; |
| | | } |
| | | |
| | | |
| | | |
| | | // ============================================================================= |
| | | /** |
| | | * @key 总线端口 |
| | | */ |
| | | int ShmModSocket::_sub_(char *topic, int topic_size, int key, |
| | | struct timespec *timeout, int flags) { |
| | | // char buf[8192]; |
| | | // int rv; |
| | | // snprintf(buf, 8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER); |
| | | // rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags); |
| | | // if(rv == 0) { |
| | | // bus_set->insert(key); |
| | | // } |
| | | // return rv; |
| | | |
| | | int ShmModSocket::sub(const char *topic, int topic_size, int key, |
| | | const struct timespec *timeout, int flags) { |
| | | int ret; |
| | | bus_head_t head = {}; |
| | | memcpy(head.action, "sub", sizeof(head.action)); |
| | |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | /** |
| | | * 取消订阅指定主题 |
| | | * @topic 主题 |
| | | * @size 主题长度 |
| | | * @key 总线端口 |
| | | */ |
| | | int ShmModSocket::_desub_(char *topic, int topic_size, int key, |
| | | struct timespec *timeout, int flags) { |
| | | int ShmModSocket::desub(const char *topic, int topic_size, int key, const struct timespec *timeout, int flags) { |
| | | // char buf[8192]; |
| | | int ret; |
| | | if(topic == NULL) { |
| | |
| | | if(size > 0) { |
| | | ret = shm_sendto(shm_socket, buf, size, key, timeout, flags); |
| | | free(buf); |
| | | return ret; |
| | | if(ret == 0) { |
| | | return 0; |
| | | } else { |
| | | logger->error("ShmModSocket::_desub_ key %d failed, %s", key, bus_strerror(ret)); |
| | | return ret; |
| | | } |
| | | } else { |
| | | return -1; |
| | | } |
| | | |
| | | } |
| | | |
| | | /** |
| | | * @key 总线端口 |
| | | * @str "<**pub**>{经济}" |
| | | */ |
| | | |
| | | int ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int key, |
| | | struct timespec *timeout, int flags) { |
| | | // int head_len; |
| | | // char buf[8192+content_size]; |
| | | // snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER); |
| | | // head_len = strlen(buf); |
| | | // memcpy(buf+head_len, content, content_size); |
| | | |
| | | |
| | | /** |
| | | * 发布主题 |
| | | * @topic 主题 |
| | | * @content 主题内容 |
| | | * @key 总线端口 |
| | | */ |
| | | int ShmModSocket::pub(const char *topic, int topic_size, const void *content, int content_size, int key, const struct timespec *timeout, int flags) { |
| | | int ret; |
| | | bus_head_t head = {}; |
| | | memcpy(head.action, "pub", sizeof(head.action)); |
| | | head.topic_size = topic_size = strlen(topic) + 1; |
| | | head.content_size = content_size; |
| | | |
| | | void *buf; |
| | | |
| | | void *buf; |
| | | int size = get_bus_sendbuf(head, topic, topic_size, content, content_size, &buf); |
| | | if(size > 0) { |
| | | ret = shm_sendto(shm_socket, buf, size, key, timeout, flags); |
| | |
| | | return -1; |
| | | } |
| | | |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Returns the key stored on the underlying socket (shm_socket->key). |
| | | * NOTE(review): ShmModSocket::get_key() is also defined earlier in this file with the same body; one of the two definitions looks redundant and would be a redefinition if both are compiled. Confirm which one to keep. |
| | | */ |
| | | int ShmModSocket::get_key(){ |
| | | return shm_socket->key; |
| | | } |
| | | |
| | | |
| | | |
| | | // ============================================================================= |
| | | |
| | | int ShmModSocket::get_bus_sendbuf(bus_head_t &request_head, |
| | | void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf) { |
| | | const void *topic_buf, int topic_size, const void *content_buf, int content_size, void **retbuf) { |
| | | |
| | | int buf_size; |
| | | char *buf; |
| | | int max_buf_size; |
| | | if((buf = (char *)malloc(MAXBUF)) == NULL) { |
| | | void *buf_ptr; |
| | | int count = 0; |
| | | if((buf = (char *) malloc(MAXBUF)) == NULL) { |
| | | LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc"); |
| | | exit(1); |
| | | } else { |
| | | max_buf_size = MAXBUF; |
| | | } |
| | | |
| | | buf_size = BUS_HEAD_SIZE + content_size + topic_size ; |
| | | buf_size = BUS_HEAD_SIZE + content_size + topic_size; |
| | | if(max_buf_size < buf_size) { |
| | | |
| | | if((buf = (char *)realloc(buf, buf_size)) == NULL) { |
| | | if((buf = (char *) realloc(buf, buf_size)) == NULL) { |
| | | LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf realloc buf"); |
| | | exit(1); |
| | | } else { |
| | |
| | | } |
| | | } |
| | | |
| | | memcpy(buf, ShmModSocket::encode_bus_head(request_head), BUS_HEAD_SIZE); |
| | | buf_ptr = ShmModSocket::encode_bus_head(request_head); |
| | | memcpy(buf, buf_ptr, BUS_HEAD_SIZE); |
| | | if(topic_size != 0 ) |
| | | memcpy(buf + BUS_HEAD_SIZE, topic_buf, topic_size); |
| | | if(content_size != 0) |
| | | memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size); |
| | | if ((content_size != 0) && (strncmp(request_head.action, "reg", strlen("reg")) != 0) && \ |
| | | (strncmp(request_head.action, "unreg", strlen("unreg")) != 0)) { |
| | | memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size); |
| | | } else { |
| | | if (((strncmp(request_head.action, "reg", strlen("reg")) == 0) || (strncmp(request_head.action, "unreg", \ |
| | | strlen("unreg")) == 0)) && (content_buf != NULL)) { |
| | | proc_copy(buf + BUS_HEAD_SIZE + topic_size, const_cast<void *> (content_buf), &count); |
| | | |
| | | request_head.content_size = count; |
| | | buf_size -= (content_size - count); |
| | | |
| | | } |
| | | } |
| | | |
| | | *retbuf = buf; |
| | | free(buf_ptr); |
| | | return buf_size; |
| | | } |
| | | |
| | |
| | | tmp_ptr += sizeof(head.action); |
| | | PUT(tmp_ptr, htonl(head.topic_size)); |
| | | |
| | | tmp_ptr += 4; |
| | | tmp_ptr += sizeof(head.topic_size); |
| | | PUT(tmp_ptr, htonl(head.content_size)); |
| | | |
| | | return headbs; |
| | |
| | | tmp_ptr += sizeof(head.action); |
| | | head.topic_size = ntohl(GET(tmp_ptr)); |
| | | |
| | | tmp_ptr += 4; |
| | | tmp_ptr += sizeof(head.topic_size); |
| | | head.content_size = ntohl(GET(tmp_ptr)); |
| | | |
| | | return head; |