From 0da7d09982d61ea6864cec53e2341b51b30cf208 Mon Sep 17 00:00:00 2001
From: wangzhengquan <wangzhengquan85@126.com>
Date: 星期二, 13 十月 2020 18:19:22 +0800
Subject: [PATCH] update

---
 src/socket/net_mod_server_socket.c |  111 +++++++++++++++++++++++++++++++++++++++++++------------
 1 files changed, 86 insertions(+), 25 deletions(-)

diff --git a/src/socket/net_mod_server_socket.c b/src/socket/net_mod_server_socket.c
index 97e4fea..73a2a5a 100644
--- a/src/socket/net_mod_server_socket.c
+++ b/src/socket/net_mod_server_socket.c
@@ -2,18 +2,33 @@
 
 #include "socket_io.h"
 #include "net_mod_socket_io.h"
+#include "net_mod_socket.h"
  
-NetModServerSocket::NetModServerSocket(int port, ShmModSocket * modsocket): shm_mod_socket(modsocket) 
+NetModServerSocket::NetModServerSocket(int port):max_buf(1024), max_topic_buf(256)
 {
   char portstr[32];
+
+  //shmModSocket = new ShmModSocket;
   sprintf(portstr, "%d", port);
   listenfd = Open_listenfd(portstr);
   init_pool(listenfd);
+
+  buf = malloc(max_buf);
+  if(buf == NULL) {
+    err_exit(errno, "process_client malloc");
+  }
+
+  topic_buf = malloc(max_topic_buf);
+  if(topic_buf == NULL) {
+    err_exit(errno, "process_client malloc");
+  }
 }
 
 
 NetModServerSocket::~NetModServerSocket() {
    Close(listenfd);
+   free(buf);
+   free(topic_buf);
 }
 
 void NetModServerSocket::start() {
@@ -63,7 +78,7 @@
     {
       /* Add connected descriptor to the pool */
       pool.clientfd[i] = connfd;                 //line:conc:echoservers:beginaddclient
-      Rio_readinitb(&pool.clientrio[i], connfd); //line:conc:echoservers:endaddclient
+     // Rio_readinitb(&pool.clientrio[i], connfd); //line:conc:echoservers:endaddclient
 
       /* Add the descriptor to descriptor set */
       FD_SET(connfd, &pool.read_set); //line:conc:echoservers:addconnfd
@@ -81,44 +96,90 @@
 /* $end add_client */
 
 
+int NetModServerSocket::process_client(int connfd) {
+  net_mod_request_head_t request_head;
+  net_mod_response_head_t response_head;
+  char request_head_bs[NET_MODE_REQUEST_HEAD_LENGTH];
+  void  *recv_buf;
+  
+  int recv_size;
+
+  // if(buf == NULL) {
+  //   buf = malloc(max_buf);
+  //   if(buf == NULL) {
+  //     err_exit(errno, "process_client malloc");
+  //   }
+  // }
+  
+ 
+  if (rio_readn(connfd, request_head_bs, NET_MODE_REQUEST_HEAD_LENGTH) !=  NET_MODE_REQUEST_HEAD_LENGTH)
+  {
+    return -1;
+  }
+
+  request_head = NetModSocket::decode_request_head(request_head_bs);
+
+  if(request_head.content_length > max_buf) {
+    buf = realloc(buf, request_head.content_length);
+    max_buf = request_head.content_length;
+    if(buf == NULL) {
+      LoggerFactory::getLogger()->error(errno, "process_client realloc");
+      exit(1);
+    }
+  }  
+
+  if (rio_readn(connfd, buf, request_head.content_length) != request_head.content_length ) {
+    return -1;
+  }
+
+  if(request_head.mod == REQ_REP) {
+    shmModSocket.sendandrecv(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size);
+    response_head.content_length = recv_size;
+    Rio_writen(connfd, NetModSocket::encode_response_head(response_head), NET_MODE_RESPONSE_HEAD_LENGTH);
+    Rio_writen(connfd, recv_buf, recv_size);
+  } else if(request_head.mod == BUS) {
+    if(request_head.topic_length > max_topic_buf) {
+      topic_buf = realloc(topic_buf, request_head.topic_length);
+      max_topic_buf = request_head.topic_length;
+      if(topic_buf == NULL) {
+        LoggerFactory::getLogger()->error(errno, "process_client realloc");
+        exit(1);
+      }
+    }
+
+    if (rio_readn(connfd, topic_buf, request_head.topic_length) != request_head.topic_length ) {
+      return -1;
+    }
+ LoggerFactory::getLogger()->debug("====server pub %s===\n", buf);
+    shmModSocket.pub((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, request_head.key);
+  }
+
+  return 0;
+  
+}
 
 /* $begin check_clients */
 void  NetModServerSocket::check_clients()
 {
-  int i, connfd, n;
-  char buf[MAXLINE];
-  rio_t rio;
+  int i, connfd;
+  //rio_t *rio;
+  
 
   for (i = 0; (i <= pool.maxi) && (pool.nready > 0); i++)
   {
     connfd = pool.clientfd[i];
-    rio = pool.clientrio[i];
+    //rio = &pool.clientrio[i];
 
     /* If the descriptor is ready, echo a text line from it */
     if ((connfd > 0) && (FD_ISSET(connfd, &pool.ready_set)))
     {
       pool.nready--;
-      if ((n = rio_readpkgb(&rio, buf, MAXLINE)) > 0)
-      {
-         
-        Rio_writen(connfd, buf, n);
-        Rio_writen(connfd, PKG_SEP, strlen(PKG_SEP));
-       // shm_mod_socket->sendto(buf, n, msg->key);
-      //   net_mod_msg_t *msg = (net_mod_msg_t*)buf;
-		    // if(msg.mod == PUB_SUB) {
-		    // 	shm_mod_socket->pub(msg->topic, msg->topic_size, msg->content, msg->content_size, msg->key);
-		    // } else {
-		    // 	shm_mod_socket->sendto(msg->buf, msg->size, msg->key);
-		    // }
+      if(process_client(connfd) != 0) {
+        Close(connfd); //line:conc:echoservers:closeconnfd
+        FD_CLR(connfd, &pool.read_set); 
+        pool.clientfd[i] = -1;
       }
 
-      /* EOF detected, remove descriptor from pool */
-      else
-      {
-        Close(connfd); //line:conc:echoservers:closeconnfd
-        FD_CLR(connfd, &pool.read_set); //line:conc:echoservers:beginremove
-        pool.clientfd[i] = -1;          //line:conc:echoservers:endremove
-      }
     }
   }
 }

--
Gitblit v1.8.0