#ifndef __DMODE_SOCKET_H__ #define __DMODE_SOCKET_H__ #include "usg_common.h" #include "shm_socket.h" #include "shm_allocator.h" #include "mem_pool.h" #include "hashtable.h" #include "sem_util.h" #include "logger_factory.h" #include "key_def.h" #include #include "socket_def.h" #define BUS_HEAD_SIZE (64 + 2 * sizeof(uint32_t)) class BusServerSocket; struct bus_head_t { char action[64]; uint32_t topic_size; uint32_t content_size; }; class ShmModSocket { friend class BusServerSocket; private: shm_socket_t *shm_socket; socket_mod_t mod; std::set *bus_set; private: inline int _recvfrom_(void **buf, int *size, int *key, struct timespec *timeout, int flags); int _sub_( char *topic, int size, int key, struct timespec *timeout, int flags); int _pub_( char *topic, int topic_size, void *content, int content_size, int key, struct timespec *timeout, int flags); int _desub_( char *topic, int size, int key, struct timespec *timeout, int flags); static int get_bus_sendbuf(bus_head_t &request_head, void *topic_buf, int topic_size, void *content_buf, int content_size, void **retbuf); public: static size_t remove_keys(int keys[], size_t length); // bus header 编码为网络传输的字节 static void * encode_bus_head(bus_head_t & bushead); // 解码 bus header static bus_head_t decode_bus_head(void *headbs); public: ShmModSocket(); ~ShmModSocket(); /** * 绑定端口到socket, 如果不绑定则系统自动分配一个 * @return 0 成功, 其他值 失败的错误码 */ int bind(int key); /** * 强制绑定端口到socket, 适用于程序非正常关闭的情况下,重启程序绑定原来还没释放的key * @return 0 成功, 其他值 失败的错误码 */ int force_bind(int key); /** * 发送信息 * @key 发送给谁 * @return 0 成功, 其他值 失败的错误码 */ int sendto(const void *buf, const int size, const int key); // 发送信息超时返回。 @sec 秒 , @nsec 纳秒 int sendto_timeout(const void *buf, const int size, const int key, const struct timespec *timeout); // 发送信息立刻返回。 int sendto_nowait(const void *buf, const int size, const int key); /** * 接收信息 * @key 从谁哪里收到的信息 * @return 0 成功, 其他值 失败的错误码 */ int recvfrom(void **buf, int *size, int *key); // 接受信息超时返回。 @sec 秒 , @nsec 纳秒 int recvfrom_timeout(void **buf, int *size, int *key, struct timespec *timeout); int recvfrom_nowait(void **buf, int *size, int *key); /** * 发送请求信息并等待接收应答 * @key 发送给谁 * @return 0 成功, 其他值 失败的错误码 */ int sendandrecv(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ; // 超时返回。 @sec 秒 , @nsec 纳秒 int sendandrecv_timeout(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, struct timespec *timeout) ; int sendandrecv_nowait(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ; int sendandrecv_unsafe(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ; // 超时返回。 @sec 秒 , @nsec 纳秒 int sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size, struct timespec *timeout) ; int sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ; /** * 订阅指定主题 * @topic 主题 * @size 主题长度 * @key 总线端口 */ int sub(char *topic, int size, int key); // 超时返回。 @sec 秒 , @nsec 纳秒 int sub_timeout(char *topic, int size, int key, struct timespec *timeout); int sub_nowait(char *topic, int size, int key); /** * 取消订阅指定主题 * @topic 主题,主题为空时取消全部订阅 * @size 主题长度 * @key 总线端口 */ int desub( char *topic, int size, int key); // 超时返回。 @sec 秒 , @nsec 纳秒 int desub_timeout(char *topic, int size, int key, struct timespec *timeout); int desub_nowait(char *topic, int size, int key) ; /** * 发布主题 * @topic 主题 * @content 主题内容 * @key 总线端口 */ int pub(char *topic, int topic_size, void *content, int content_size, int key); // 超时返回。 @sec 秒 , @nsec 纳秒 int pub_timeout(char *topic, int topic_size, void *content, int content_size, int key, struct timespec *timeout); int pub_nowait(char *topic, int topic_size, void *content, int content_size, int key); /** * 获取soket key */ int get_key() ; }; #endif