#include "shm_mod_socket.h" #include "bus_server_socket.h" static Logger *logger = LoggerFactory::getLogger(); size_t ShmModSocket::remove_keys(int keys[], size_t length) { BusServerSocket::remove_subscripters(keys, length); return shm_socket_remove_keys(keys, length); } ShmModSocket::ShmModSocket() { shm_socket = shm_open_socket(SHM_SOCKET_DGRAM); bus_set = new std::set; } ShmModSocket::~ShmModSocket() { // logger->debug("Close ShmModSocket...\n"); struct timespec timeout = {1, 0}; if(bus_set != NULL) { for(auto bus_iter = bus_set->begin(); bus_iter != bus_set->end(); bus_iter++) { desub_timeout(NULL, 0, *bus_iter, &timeout); } delete bus_set; } shm_close_socket(shm_socket); } int ShmModSocket::bind(int key) { return shm_socket_bind(shm_socket, key); } /** * 强制绑定端口到socket, 适用于程序非正常关闭的情况下,重启程序绑定原来还没释放的key * @return 0 成功, 其他值 失败的错误码 */ int ShmModSocket::force_bind(int key) { return shm_socket_force_bind(shm_socket, key); } /** * 发送信息 * @key 发送给谁 * @return 0 成功, 其他值 失败的错误码 */ int ShmModSocket::sendto(const void *buf, const int size, const int key) { return shm_sendto(shm_socket, buf, size, key, NULL, 0); } // 发送信息超时返回。 @sec 秒 , @nsec 纳秒 int ShmModSocket::sendto_timeout(const void *buf, const int size, const int key, const struct timespec *timeout) { return shm_sendto(shm_socket, buf, size, key, timeout, 0); } // 发送信息立刻返回。 int ShmModSocket::sendto_nowait( const void *buf, const int size, const int key){ return shm_sendto(shm_socket, buf, size, key, NULL, (int)SHM_MSG_NOWAIT); } /** * 接收信息 * @key 从谁哪里收到的信息 * @return 0 成功, 其他值 失败的错误码 */ int ShmModSocket::recvfrom(void **buf, int *size, int *key) { int rv = shm_recvfrom(shm_socket, buf, size, key, NULL, 0); // logger->error(rv, "ShmModSocket::recvfrom failed!"); return rv; } // 接受信息超时返回。 @sec 秒 , @nsec 纳秒 int ShmModSocket::recvfrom_timeout( void **buf, int *size, int *key, struct timespec *timeout) { int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, 0); return rv; } int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *key){ int rv = shm_recvfrom(shm_socket, buf, size, key, NULL, (int)SHM_MSG_NOWAIT); // logger->error(rv, "ShmModSocket::recvfrom_nowait failed!"); return rv; } /** * 发送请求信息并等待接收应答 * @key 发送给谁 * @return 0 成功, 其他值 失败的错误码 */ int ShmModSocket::sendandrecv( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0); } // 超时返回。 @sec 秒 , @nsec 纳秒 int ShmModSocket::sendandrecv_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, struct timespec *timeout){ return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0); } int ShmModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT); } int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0); } // 超时返回。 @sec 秒 , @nsec 纳秒 int ShmModSocket::sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, struct timespec *timeout){ return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0); } int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){ return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT); } /** * 订阅指定主题 * @topic 主题 * @size 主题长度 * @key 总线端口 */ int ShmModSocket::sub(char *topic, int size, int key){ return _sub_( topic, size, key, NULL, 0); } // 超时返回。 @sec 秒 , @nsec 纳秒 int ShmModSocket::sub_timeout(char *topic, int size, int key, struct timespec *timeout){ return _sub_(topic, size, key, timeout, 0); } int ShmModSocket::sub_nowait(char *topic, int size, int key) { return _sub_(topic, size, key, NULL, (int)SHM_MSG_NOWAIT); } /** * 取消订阅指定主题 * @topic 主题 * @size 主题长度 * @key 总线端口 */ int ShmModSocket::desub(char *topic, int size, int key){ return _desub_( topic, size, key, NULL, 0); } // 超时返回。 @sec 秒 , @nsec 纳秒 int ShmModSocket::desub_timeout(char *topic, int size, int key, struct timespec *timeout){ return _desub_(topic, size, key, timeout, 0); } int ShmModSocket::desub_nowait(char *topic, int size, int key) { return _desub_(topic, size, key, NULL, (int)SHM_MSG_NOWAIT); } /** * 发布主题 * @topic 主题 * @content 主题内容 * @key 总线端口 */ int ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int key){ return _pub_(topic, topic_size, content, content_size, key, NULL, 0); } // 超时返回。 @sec 秒 , @nsec 纳秒 int ShmModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int key, struct timespec * timeout){ return _pub_( topic, topic_size, content, content_size, key, timeout, 0); } int ShmModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int key){ return _pub_(topic, topic_size, content, content_size, key, NULL, (int)SHM_MSG_NOWAIT); } /** * 获取soket key */ int ShmModSocket::get_key(){ return shm_socket->key; } // ============================================================================= /** * @key 总线端口 */ int ShmModSocket::_sub_(char *topic, int topic_size, int key, struct timespec *timeout, int flags) { int ret; bus_head_t head = {}; memcpy(head.action, "sub", sizeof(head.action)); head.topic_size = topic_size = strlen(topic) + 1; head.content_size = 0; void *buf; int size = get_bus_sendbuf(head, topic, topic_size, NULL, 0, &buf); if(size > 0) { ret = shm_sendto(shm_socket, buf, size, key, timeout, flags); free(buf); if(ret == 0) { bus_set->insert(key); } return ret; } else { return -1; } } /** * @key 总线端口 */ int ShmModSocket::_desub_(char *topic, int topic_size, int key, struct timespec *timeout, int flags) { // char buf[8192]; int ret; if(topic == NULL) { topic = ""; } // snprintf(buf, 8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER); // return shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags); bus_head_t head = {}; memcpy(head.action, "desub", sizeof(head.action)); head.topic_size = topic_size = strlen(topic) + 1; head.content_size = 0; void *buf; int size = get_bus_sendbuf(head, topic, topic_size, NULL, 0, &buf); if(size > 0) { ret = shm_sendto(shm_socket, buf, size, key, timeout, flags); free(buf); if(ret == EBUS_TIMEOUT) { logger->error(ret, "ShmModSocket::_desub_ key %d failed, %s", key, bus_strerror(EBUS_TIMEOUT)); return EBUS_TIMEOUT; } else { logger->error(ret, "ShmModSocket::_desub_ key %d failed!", key); return ret; } } else { return -1; } } /** * @key 总线端口 * @str "<**pub**>{经济}" */ int ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int key, struct timespec *timeout, int flags) { // int head_len; // char buf[8192+content_size]; // snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER); // head_len = strlen(buf); // memcpy(buf+head_len, content, content_size); int ret; bus_head_t head = {}; memcpy(head.action, "pub", sizeof(head.action)); head.topic_size = topic_size = strlen(topic) + 1; head.content_size = content_size; void *buf; int size = get_bus_sendbuf(head, topic, topic_size, content, content_size, &buf); if(size > 0) { ret = shm_sendto(shm_socket, buf, size, key, timeout, flags); free(buf); return ret; } else { return -1; } } int ShmModSocket::get_bus_sendbuf(bus_head_t &request_head, void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf) { int buf_size; char *buf; int max_buf_size; if((buf = (char *)malloc(MAXBUF)) == NULL) { LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf malloc"); exit(1); } else { max_buf_size = MAXBUF; } buf_size = BUS_HEAD_SIZE + content_size + topic_size ; if(max_buf_size < buf_size) { if((buf = (char *)realloc(buf, buf_size)) == NULL) { LoggerFactory::getLogger()->error(errno, "ShmModSocket::get_bus_sendbuf realloc buf"); exit(1); } else { max_buf_size = buf_size; } } memcpy(buf, ShmModSocket::encode_bus_head(request_head), BUS_HEAD_SIZE); if(topic_size != 0 ) memcpy(buf + BUS_HEAD_SIZE, topic_buf, topic_size); if(content_size != 0) memcpy(buf + BUS_HEAD_SIZE + topic_size, content_buf, content_size); *retbuf = buf; return buf_size; } /** char action[]; uint32_t topic_size; uint32_t content_size; */ void * ShmModSocket::encode_bus_head(bus_head_t & head) { void * headbs = malloc(BUS_HEAD_SIZE); char *tmp_ptr = (char *)headbs; memcpy(tmp_ptr, head.action, sizeof(head.action)); tmp_ptr += sizeof(head.action); PUT(tmp_ptr, htonl(head.topic_size)); tmp_ptr += 4; PUT(tmp_ptr, htonl(head.content_size)); return headbs; } bus_head_t ShmModSocket::decode_bus_head(void *headbs) { char *tmp_ptr = (char *)headbs; bus_head_t head; memcpy(head.action, tmp_ptr, sizeof(head.action)); tmp_ptr += sizeof(head.action); head.topic_size = ntohl(GET(tmp_ptr)); tmp_ptr += 4; head.content_size = ntohl(GET(tmp_ptr)); return head; }