#ifndef __DMODE_SOCKET_H__ #define __DMODE_SOCKET_H__ #include "usg_common.h" #include "shm_socket.h" #include "shm_allocator.h" #include "mem_pool.h" #include "hashtable.h" #include "sem_util.h" #include "logger_factory.h" #include #define ACTION_LIDENTIFIER "<**" #define ACTION_RIDENTIFIER "**>" #define TOPIC_LIDENTIFIER "{" #define TOPIC_RIDENTIFIER "}" static Logger logger = LoggerFactory::getLogger(); #define BUS_MAP_KEY 1 //typedef std::basic_string, SHM_STL_Allocator > SHMString; typedef std::set, SHM_STL_Allocator > SHMKeySet; typedef std::map, SHM_STL_Allocator > > SHMTopicSubMap; enum socket_mod_t { PULL_PUSH = 1, REQ_REP = 2, PAIR = 3, PUB_SUB = 4, SURVEY = 5, BUS = 6 }; class DModSocket { private: shm_socket_t *shm_socket; socket_mod_t mod; // pthread_t recv_thread; // <主题, 订阅者> SHMTopicSubMap *topic_sub_map; std::set *bus_set; private: inline int _recvfrom_(void **buf, int *size, int *port, struct timespec *timeout, int flags); void _proxy_sub( char *topic, int port); void _proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int port); void *run_pubsub_proxy(); int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ); int _sub_( char *topic, int size, int port, struct timespec *timeout, int flags); int _pub_( char *topic, int topic_size, void *content, int content_size, int port, struct timespec *timeout, int flags); void _proxy_desub( char *topic, int port); void _proxy_desub_all(int port); int _desub_( char *topic, int size, int port, struct timespec *timeout, int flags); static void foreach_subscripters(std::function cb); static bool include_in_keys(int key, int keys[], size_t length); static size_t remove_subscripters(int keys[], size_t length) ; public: static size_t remove_keys(int keys[], size_t length); public: DModSocket(); ~DModSocket(); /** * 绑定端口到socket, 如果不绑定则系统自动分配一个 * @return 0 成功, 其他值 失败的错误码 */ int bind(int port); /** * 强制绑定端口到socket, 适用于程序非正常关闭的情况下,重启程序绑定原来还没释放的key * @return 0 成功, 其他值 失败的错误码 */ int force_bind(int port); /** * 发送信息 * @port 发送给谁 * @return 0 成功, 其他值 失败的错误码 */ int sendto(const void *buf, const int size, const int port); // 发送信息超时返回。 @sec 秒 , @nsec 纳秒 int sendto_timeout(const void *buf, const int size, const int port, const struct timespec *timeout); // 发送信息立刻返回。 int sendto_nowait(const void *buf, const int size, const int port); /** * 接收信息 * @port 从谁哪里收到的信息 * @return 0 成功, 其他值 失败的错误码 */ int recvfrom(void **buf, int *size, int *port); // 接受信息超时返回。 @sec 秒 , @nsec 纳秒 int recvfrom_timeout(void **buf, int *size, int *port, struct timespec *timeout); int recvfrom_nowait(void **buf, int *size, int *port); /** * 发送请求信息并等待接收应答 * @port 发送给谁 * @return 0 成功, 其他值 失败的错误码 */ int sendandrecv(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ; // 超时返回。 @sec 秒 , @nsec 纳秒 int sendandrecv_timeout(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size, struct timespec *timeout) ; int sendandrecv_nowait(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ; /** * 启动bus * * @return 0 成功, 其他值 失败的错误码 */ int start_bus(); /** * 订阅指定主题 * @topic 主题 * @size 主题长度 * @port 总线端口 */ int sub(char *topic, int size, int port); // 超时返回。 @sec 秒 , @nsec 纳秒 int sub_timeout(char *topic, int size, int port, struct timespec *timeout); int sub_nowait(char *topic, int size, int port); /** * 取消订阅指定主题 * @topic 主题,主题为空时取消全部订阅 * @size 主题长度 * @port 总线端口 */ int desub( char *topic, int size, int port); // 超时返回。 @sec 秒 , @nsec 纳秒 int desub_timeout(char *topic, int size, int port, struct timespec *timeout); int desub_nowait(char *topic, int size, int port) ; /** * 发布主题 * @topic 主题 * @content 主题内容 * @port 总线端口 */ int pub(char *topic, int topic_size, void *content, int content_size, int port); // 超时返回。 @sec 秒 , @nsec 纳秒 int pub_timeout(char *topic, int topic_size, void *content, int content_size, int port, struct timespec *timeout); int pub_nowait(char *topic, int topic_size, void *content, int content_size, int port); /** * 获取soket端口号 */ int get_port() ; }; #endif