wangzhengquan
2020-10-20 95349b79a5a646736c706fe19645181146ee9486
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#ifndef __NET_MODE_SOCKET_H__
#define __NET_MODE_SOCKET_H__
#include "usg_common.h"
#include "shm_mod_socket.h"
#include "socket_io.h"
#include <poll.h>
 
#define OPEN_MAX 1024
#define GET(p)       (*(uint32_t *)(p))
#define PUT(p, val)  (*(uint32_t *)(p) = (val))
 
 
 
 
 
class NetModServerSocket;
 
struct net_node_t
{
    const char *host;
    int port;
    int key;
};
 
#define NET_MODE_REQUEST_HEAD_LENGTH (NI_MAXHOST + 5 * sizeof(uint32_t))
 
struct net_mod_request_head_t {
    uint32_t mod;
  char host[NI_MAXHOST];
  uint32_t port;
    uint32_t key;
    uint32_t content_length;
    uint32_t topic_length;
};
 
#define NET_MODE_RESPONSE_HEAD_LENGTH (NI_MAXHOST + 4 * sizeof(uint32_t))
 
struct net_mod_response_head_t {
    // socket_mod_t mod;
  char host[NI_MAXHOST];
  uint32_t port;
    uint32_t key;
  uint32_t code;
    uint32_t content_length;
};
 
 
struct net_mod_recv_msg_t
{
  char host[NI_MAXHOST];
  int port;
  int key;
  void *content;
  int content_length;
  
};
 
class NetModSocket {
  struct pool{ /* Represents a pool of connected descriptors */ //line:conc:echoservers:beginpool
 
    int nready;       /* Number of ready descriptors from select */
    int maxi;         /* Highwater index into client array */
    struct pollfd conns[OPEN_MAX];
    // net_node_t *nodes[FD_SETSIZE];
   // rio_t clientrio[FD_SETSIZE]; /* Set of active read buffers */
    // std::map<int, net_node_t*> connfdNodeMap;
    std::map<std::string, int> connectionMap;
  } ; 
 
  friend class NetModServerSocket;
private:
   
  ShmModSocket shmModSocket;
  pool req_resp_pool;
 
 
 
  static void * encode_request_head(net_mod_request_head_t & request);
  static net_mod_request_head_t  decode_request_head(void *headbs);
 
  static void * encode_response_head(net_mod_response_head_t & response);
  static net_mod_response_head_t  decode_response_head(void *_headbs);
 
  void init_req_rep_req_resp_pool();
  int connect( net_node_t*);
  void close_connect(int connfd);
  int read_response(int clientfd, net_mod_recv_msg_t *recv_msg);
  int write_request(int clientfd, int key, void *send_buf, int send_size);
 
 
public:
    
  NetModSocket();
  ~NetModSocket();
 
  /**
   * 向node_arr 中的所有网络节点发送请求消息,节点的返回信息汇总并存储在recv_arr中
   * @node_arr 网络节点组, @node_arr_len该数组长度
   * @send_buf 发送的消息,@send_size 该消息体的长度
   * @recv_arr 返回的应答消息组,@recv_arr_size 该数组长度
   * @return 成功发送的节点的个数
   * 优点:无阻塞,性能好
   * 缺点:不是线程安全的
   */
  int sendandrecv(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size, 
      net_mod_recv_msg_t ** recv_arr, int *recv_arr_size);
 
  /**
   * 功能同sendandrecv
   * 优点:线程安全
   * 缺点:阻塞的,性能不如sendandrecv
   * 
   */
  int sendandrecv_safe(net_node_t *node_arr, int node_arr_len, void *send_buf, int send_size, 
    net_mod_recv_msg_t ** recv_arr, int *recv_arr_size);
 
  /**
   * 销毁sendandrecv方法返回的消息组 
   * @arr 消息组
   * @size 消息组的长度
   */
  static void  free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size);
 
   /**
   * 向node_arr 中的所有网络节点发布消息
   * @node_arr 网络节点组, @node_arr_len该数组长度
   * @topic 主题,@topic_size 该主题的长度
   * @content 内容,@content_size 内容长度
   * @return 成功发布的节点的个数
   */
  int pub(net_node_t *node_arr, int node_arr_len, char *topic, int topic_size, void *content, int content_size);
 
 
};
 
#endif