wangzhengquan
2020-10-12 575f2e424a17737111786227103a428fb5c20396
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#include "net_mod_server_socket.h"
 
#include "socket_io.h"
#include "net_mod_socket_io.h"
#include "net_mod_socket.h"
 
NetModServerSocket::NetModServerSocket(int port)
{
  char portstr[32];
 
  //shmModSocket = new ShmModSocket;
  sprintf(portstr, "%d", port);
  listenfd = Open_listenfd(portstr);
  init_pool(listenfd);
}
 
 
NetModServerSocket::~NetModServerSocket() {
   Close(listenfd);
}
 
void NetModServerSocket::start() {
    int connfd;
    socklen_t clientlen;
  struct sockaddr_storage clientaddr;
    while (1)
  {
    /* Wait for listening/connected descriptor(s) to become ready */
    pool.ready_set = pool.read_set;
    pool.nready = select(pool.maxfd + 1, &pool.ready_set, NULL, NULL, NULL);
 
    /* If listening descriptor ready, add new client to pool */
    if (FD_ISSET(listenfd, &pool.ready_set))
    {
      clientlen = sizeof(struct sockaddr_storage);
      connfd = accept(listenfd, (SA *)&clientaddr, &clientlen); 
      add_client(connfd); 
    }
 
    /* Echo a text line from each ready connected descriptor */
    check_clients();
  }
}
 
void  NetModServerSocket::init_pool(int listenfd)
{
  /* Initially, there are no connected descriptors */
  int i;
  pool.maxi = -1;                   //line:conc:echoservers:beginempty
  for (i = 0; i < FD_SETSIZE; i++)
    pool.clientfd[i] = -1;        //line:conc:echoservers:endempty
 
  /* Initially, listenfd is only member of select read set */
  pool.maxfd = listenfd;            //line:conc:echoservers:begininit
  FD_ZERO(&pool.read_set);
  FD_SET(listenfd, &pool.read_set); //line:conc:echoservers:endinit
}
 
/* $begin add_client */
void  NetModServerSocket::add_client(int connfd)
{
  int i;
  pool.nready--;
  for (i = 0; i < FD_SETSIZE; i++)  /* Find an available slot */
    if (pool.clientfd[i] < 0)
    {
      /* Add connected descriptor to the pool */
      pool.clientfd[i] = connfd;                 //line:conc:echoservers:beginaddclient
      Rio_readinitb(&pool.clientrio[i], connfd); //line:conc:echoservers:endaddclient
 
      /* Add the descriptor to descriptor set */
      FD_SET(connfd, &pool.read_set); //line:conc:echoservers:addconnfd
 
      /* Update max descriptor and pool highwater mark */
      if (connfd > pool.maxfd) //line:conc:echoservers:beginmaxfd
        pool.maxfd = connfd; //line:conc:echoservers:endmaxfd
      if (i > pool.maxi)       //line:conc:echoservers:beginmaxi
        pool.maxi = i;       //line:conc:echoservers:endmaxi
      break;
    }
  if (i == FD_SETSIZE) /* Couldn't find an empty slot */
    err_msg(errno, "add_client error: Too many clients");
}
/* $end add_client */
 
 
int NetModServerSocket::process_client(rio_t *rio, int connfd) {
  int n;
  net_mod_request_head_t request_head;
  net_mod_response_head_t response_head;
  void  *recv_buf;
  static void *buf;
  int recv_size;
 
  static size_t max_buf = 1024;
  if(buf == NULL) {
    buf = malloc(max_buf);
    if(buf == NULL) {
      err_exit(errno, "process_client malloc");
    }
  }
  
 
  if ((n = rio_readnb(rio, &request_head, sizeof(net_mod_request_head_t))) !=  sizeof(net_mod_request_head_t))
  {
    return -1;
  }
 
  if(request_head.content_length > max_buf) {
    buf = realloc(buf, request_head.content_length);
    max_buf = request_head.content_length;
    if(buf == NULL) {
      err_exit(errno, "process_client realloc");
    }
  }  
 
  if ((n = rio_readnb(rio, buf, request_head.content_length)) != request_head.content_length ) {
    return -1;
  }
 
  if(request_head.mod == REQ_REP) {
    shmModSocket.sendandrecv(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size);
    response_head.content_length = recv_size;
    Rio_writen(connfd, &response_head, sizeof(response_head));
    Rio_writen(connfd, recv_buf, recv_size);
  }
 
  return 0;
  
}
 
/* $begin check_clients */
void  NetModServerSocket::check_clients()
{
  int i, connfd;
  rio_t *rio;
  
 
  for (i = 0; (i <= pool.maxi) && (pool.nready > 0); i++)
  {
    connfd = pool.clientfd[i];
    rio = &pool.clientrio[i];
 
    /* If the descriptor is ready, echo a text line from it */
    if ((connfd > 0) && (FD_ISSET(connfd, &pool.ready_set)))
    {
      pool.nready--;
      if(process_client(rio, connfd) != 0) {
        Close(connfd); //line:conc:echoservers:closeconnfd
        FD_CLR(connfd, &pool.read_set); //line:conc:echoservers:beginremove
        pool.clientfd[i] = -1;          //line:conc:echoservers:endremove
      }
 
    }
  }
}