#ifndef __NET_MODE_SOCKET_H__ #define __NET_MODE_SOCKET_H__ #include "usg_common.h" #include "shm_mod_socket.h" #include "socket_io.h" #include "proc_def.h" #include #include "socket_def.h" #include "net_conn_pool.h" class NetModServerSocket; struct net_node_t { const char *host; int port; int key; }; #define NET_MODE_REQUEST_HEAD_LENGTH sizeof(net_mod_request_head_t) // 请求头 struct net_mod_request_head_t { uint32_t mod; char host[NI_MAXHOST]; uint32_t port; uint32_t key; uint32_t content_length; // 请求内容 uint32_t topic_length; // 请求主题 int32_t timeout; // -1 block, 0 nowait , > 0 timeout }; #define NET_MODE_RESPONSE_HEAD_LENGTH (NI_MAXHOST + 4 * sizeof(uint32_t)) // 应答头 struct net_mod_response_head_t { // socket_mod_t mod; char host[NI_MAXHOST]; uint32_t port; uint32_t key; uint32_t code; //返回值, 0 为成功 uint32_t content_length; }; struct net_mod_recv_msg_t { char host[NI_MAXHOST]; int port; int key; void *content; int content_length; }; struct net_mod_err_t { char host[NI_MAXHOST]; int port; int key; int code; }; class NetModSocket { friend class NetModServerSocket; private: ShmModSocket shmModSocket; int int_val; int svr_val; // pthread_mutex_t sendMutex; // request header 编码为网络传输的字节 static void * encode_request_head(net_mod_request_head_t & request); // 解码request header static net_mod_request_head_t decode_request_head(void *headbs); static void * encode_response_head(net_mod_response_head_t & response); static net_mod_response_head_t decode_response_head(void *_headbs); // 销毁threadlocal pool static void _destroyConnPool_(void *_pool); // 创建thread local key static void _createConnPoolKey_(void); NetConnPool* _get_threadlocal_pool(); NetConnPool* _get_pool(); //读取返回信息 int read_response(int clientfd, net_mod_recv_msg_t *recv_msg, net_mod_err_t *err_arr); // 发送请求信息 int write_request(int clientfd, net_mod_request_head_t &request_head, const void *send_buf, int send_size, const void *topic_buf, int topic_size); int _pub_(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size, int timeout) ; public: NetModSocket(); ~NetModSocket(); int stop(); /** * 绑定端口到socket, 如果不绑定则系统自动分配一个 * @return 0 成功, 其他值 失败的错误码 */ int bind( int key); /** * 强制绑定端口到socket, 适用于程序非正常关闭的情况下,重启程序绑定原来还没释放的key * @return 0 成功, 其他值 失败的错误码 */ int force_bind( int key); int bind_proc_id(char *buf, int len); int reg(void *pData, int len, void **buf, int *size, const int timeout_ms, int flag); /** * 如果建立连接的节点没有接受到消息等待timeout的时间后返回 * 向node_arr 中的所有网络节点发送请求消息,节点的返回信息汇总并存储在recv_arr中 * @node_arr 网络节点组, @node_arr_len该数组长度.如果IP为空则为本地发送。 * @send_buf 发送的消息,@send_size 该消息体的长度 * @recv_arr 返回的应答消息组,@recv_arr_size 该数组长度 * @timeout 等待时间,单位是千分之一秒 * @return 成功发送的节点的个数 * 优点:1某个节点的故障不会阻塞其他节点。2 性能好。 3 采用thread local技术即保证了线程安全,又可以使用连接池缓存连接 */ int sendandrecv(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size, net_mod_recv_msg_t ** recv_arr, int *recv_arr_size, net_mod_err_t ** _err_arr, int *_err_arr_size, int timeout); void int_set(int data); void svr_set(int data); int int_get(void); int svr_get(void); /** * 功能同sendandrecv * 优点:线程安全 * 缺点:阻塞的,性能不如sendandrecv * */ // int sendandrecv_safe(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size, // net_mod_recv_msg_t ** recv_arr, int *recv_arr_size); /** * 发送信息 * @key 发送给谁 * @return 0 成功, 其他值 失败的错误码 */ int sendto( const void *buf, const int size, const int key, int reset = 0, int data_set = 0); // 发送信息超时返回。 @sec 秒 , @nsec 纳秒 int sendto_timeout( const void *buf, const int size, const int key, int sec, int nsec, int reset = 0, int data_set = 0); // 发送信息立刻返回。 int sendto_nowait( const void *buf, const int size, const int key, int reset = 0, int data_set = 0); /** * 接收信息 * @key 从谁哪里收到的信息 * @return 0 成功, 其他值 失败的错误码 */ int recvfrom( void **buf, int *size, int *key, int reset = 0, int data_set = 0); // 接受信息超时返回。 @sec 秒 , @nsec 纳秒 int recvfrom_timeout( void **buf, int *size, int *key, int sec, int nsec, int reset = 0, int data_set = 0); int recvfrom_nowait( void **buf, int *size, int *key, int reset = 0, int data_set = 0); /** * 本地发送请求信息并等待接收应答 * @key 发送给谁 * @return 0 成功, 其他值 失败的错误码 */ int sendandrecv( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size) ; // 超时返回。 @sec 秒 , @nsec 纳秒 int sendandrecv_timeout( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, int sec, int nsec) ; int sendandrecv_nowait( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size) ; /** * recvandsend */ int recvandsend( recvandsend_callback_fn callback, const struct timespec *timeout = NULL , int flag = 0, void * user_data = NULL ); /** * 向node_arr 中的所有网络节点发布消息 * @node_arr 网络节点组, @node_arr_len该数组长度 * @topic 主题,@topic_size 该主题的长度 * @content 内容,@content_size 内容长度 * @return 成功发布的节点的个数 */ int pub(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size) ; int pub_nowait(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size); /** * @msec 毫秒 (千分之一秒) */ int pub_timeout(net_node_t *node_arr, int arrlen, const char *topic, int topic_size, const void *content, int content_size, int msec); /** * 订阅指定主题 * @topic 主题 * @size 主题长度 * @key 总线端口 */ int sub( void *topic, int size, int key); // 超时返回。 @sec 秒 , @nsec 纳秒 int sub_timeout( void *topic, int size, int key, int sec, int nsec); int sub_nowait( void *topic, int size, int key); /** * 取消订阅指定主题 * @topic 主题,主题为空时取消全部订阅 * @size 主题长度 * @key 总线端口 */ int desub( void *topic, int size, int key); // 超时返回。 @sec 秒 , @nsec 纳秒 int desub_timeout( void *topic, int size, int key, int sec, int nsec); int desub_nowait( void *topic, int size, int key); /** * 发布主题 * @topic 主题 * @content 主题内容 * @key 总线端口 */ int pub( char *topic, int topic_size, void *content, int content_size, int key); // 超时返回。 @sec 秒 , @nsec 纳秒 int pub_timeout( char *topic, int topic_size, void *content, int content_size, int key, int sec, int nsec); int pub_nowait( char *topic, int topic_size, void *content, int content_size, int key); /** * 获取soket key */ int get_key() ; /** * 销毁sendandrecv方法返回的消息组 * @arr 消息组 * @size 消息组的长度 */ static void free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size); }; #endif