#ifndef __DMODE_SOCKET_H__ #define __DMODE_SOCKET_H__ #include "usg_common.h" #include "shm_socket.h" #include "shm_allocator.h" #include "shm_mm.h" #include "hashtable.h" #include "sem_util.h" #include "logger_factory.h" #include "key_def.h" #include #include "socket_def.h" #define BUS_HEAD_SIZE sizeof(bus_head_t) class BusServerSocket; struct bus_head_t { char action[64]; uint32_t topic_size; uint32_t content_size; }; class ShmModSocket { friend class BusServerSocket; private: shm_socket_t *shm_socket; std::set *bus_set; private: static int get_bus_sendbuf(bus_head_t &request_head, const void *topic_buf, int topic_size, const void *content_buf, int content_size, void **retbuf); public: // static size_t remove_keys(int keys[], size_t length); // static size_t remove_keys_exclude(int keys[], size_t length); // bus header 编码为网络传输的字节 static void * encode_bus_head(bus_head_t & bushead); // 解码 bus header static bus_head_t decode_bus_head(void *headbs); public: ShmModSocket(); ~ShmModSocket(); int stop(); /** * 绑定端口到socket, 如果不绑定则系统自动分配一个 * @return 0 成功, 其他值 失败的错误码 */ int bind(int key); /** * 强制绑定端口到socket, 适用于程序非正常关闭的情况下,重启程序绑定原来还没释放的key * @return 0 成功, 其他值 失败的错误码 */ int force_bind(int key); /** * 发送信息 * @key 发送给谁 * @flag BUS_TIMEOUT_FLAG BUS_NOWAIT_FLAG * @return 0 成功, 其他值 失败的错误码 */ int sendto(const void *buf, const int size, const int key, const struct timespec *timeout = NULL, int flag = 0); /** * 接收信息 * @key 从谁哪里收到的信息 * @return 0 成功, 其他值 失败的错误码 */ int recvfrom(void **buf, int *size, int *key, const struct timespec *timeout = NULL, int flag = 0); /** * 发送请求信息并等待接收应答 * @key 发送给谁 * @flag BUS_TIMEOUT_FLAG BUS_NOWAIT_FLAG * @return 0 成功, 其他值 失败的错误码 */ int sendandrecv(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, const struct timespec *timeout = NULL, int flag = 0); /** * */ int recvandsend( recvandsend_callback_fn callback, const struct timespec *timeout = NULL , int flag = 0, void * user_data = NULL); /** * 订阅指定主题 * @topic 主题 * @size 主题长度 * @key 总线端口 * @flag BUS_TIMEOUT_FLAG BUS_NOWAIT_FLAG */ int sub(const char *topic, int size, int key, const struct timespec *timeout = NULL, int flag = 0); /** * 取消订阅指定主题 * @topic 主题,主题为空时取消全部订阅 * @size 主题长度 * @key 总线端口 * @flag BUS_TIMEOUT_FLAG BUS_NOWAIT_FLAG */ int desub(const char *topic, int size, int key, const struct timespec *timeout = NULL, int flag = 0); /** * 发布主题 * @topic 主题 * @content 主题内容 * @key 总线端口 * @flag BUS_TIMEOUT_FLAG BUS_NOWAIT_FLAG */ int pub(const char *topic, int topic_size, const void *content, int content_size, int key, const struct timespec *timeout = NULL, int flag = 0); /** * 获取soket key */ int get_key() ; }; #endif