wangzhengquan
2020-10-12 d26e81c4213dfb04391c7b6692f243aede2e6895
version
1个文件已删除
1个文件已添加
6个文件已修改
316 ■■■■ 已修改文件
src/libshm_queue.a 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_server_socket.c 72 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_server_socket.h 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket.c 99 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket.h 45 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/Makefile 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/net_mod_req_rep.c 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/test_net_mod_socket.c 43 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/libshm_queue.a
Binary files differ
src/socket/net_mod_server_socket.c
@@ -2,10 +2,13 @@
#include "socket_io.h"
#include "net_mod_socket_io.h"
#include "net_mod_socket.h"
 
NetModServerSocket::NetModServerSocket(int port, ShmModSocket * modsocket): shm_mod_socket(modsocket)
NetModServerSocket::NetModServerSocket(int port)
{
  char portstr[32];
  //shmModSocket = new ShmModSocket;
  sprintf(portstr, "%d", port);
  listenfd = Open_listenfd(portstr);
  init_pool(listenfd);
@@ -81,44 +84,69 @@
/* $end add_client */
int NetModServerSocket::process_client(rio_t *rio, int connfd) {
  int n;
  net_mod_request_head_t request_head;
  net_mod_response_head_t response_head;
  void *buf, *recv_buf;
  int recv_size;
  size_t max_buf = 8096;
  buf = malloc(max_buf);
  if(buf == NULL) {
    err_exit(errno, "process_client malloc");
  }
  if ((n = rio_readnb(rio, &request_head, sizeof(net_mod_request_head_t))) !=  sizeof(net_mod_request_head_t))
  {
    free(buf);
    return -1;
  }
  if(request_head.content_length > max_buf) {
    buf = realloc(buf, request_head.content_length);
    max_buf = request_head.content_length;
    if(buf == NULL) {
      err_exit(errno, "process_client realloc");
    }
  }
  if ((n = rio_readnb(rio, buf, request_head.content_length)) != request_head.content_length ) {
    free(buf);
    return -1;
  }
  shmModSocket.sendandrecv(buf, request_head.content_length, request_head.key, &recv_buf, &recv_size);
  response_head.content_length = recv_size;
  Rio_writen(connfd, &response_head, sizeof(response_head));
  Rio_writen(connfd, recv_buf, recv_size);
  free(buf);
  return 0;
}
/* $begin check_clients */
void  NetModServerSocket::check_clients()
{
  int i, connfd, n;
  char buf[MAXLINE];
  rio_t rio;
  int i, connfd;
  rio_t *rio;
  for (i = 0; (i <= pool.maxi) && (pool.nready > 0); i++)
  {
    connfd = pool.clientfd[i];
    rio = pool.clientrio[i];
    rio = &pool.clientrio[i];
    /* If the descriptor is ready, echo a text line from it */
    if ((connfd > 0) && (FD_ISSET(connfd, &pool.ready_set)))
    {
      pool.nready--;
      if ((n = rio_readpkgb(&rio, buf, MAXLINE)) > 0)
      {
        Rio_writen(connfd, buf, n);
        Rio_writen(connfd, PKG_SEP, strlen(PKG_SEP));
       // shm_mod_socket->sendto(buf, n, msg->key);
      //   net_mod_msg_t *msg = (net_mod_msg_t*)buf;
            // if(msg.mod == PUB_SUB) {
            //     shm_mod_socket->pub(msg->topic, msg->topic_size, msg->content, msg->content_size, msg->key);
            // } else {
            //     shm_mod_socket->sendto(msg->buf, msg->size, msg->key);
            // }
      }
      /* EOF detected, remove descriptor from pool */
      else
      {
      if(process_client(rio, connfd) != 0) {
        Close(connfd); //line:conc:echoservers:closeconnfd
        FD_CLR(connfd, &pool.read_set); //line:conc:echoservers:beginremove
        pool.clientfd[i] = -1;          //line:conc:echoservers:endremove
      }
    }
  }
}
src/socket/net_mod_server_socket.h
@@ -28,16 +28,17 @@
private:
    int listenfd;
    ShmModSocket * shm_mod_socket;
    ShmModSocket shmModSocket;
    pool pool;
    void init_pool(int listenfd);
    void add_client(int connfd);
    void check_clients();
    int process_client(rio_t *rio, int connfd);
public:
    NetModServerSocket(int port, ShmModSocket *_shm_mod_socket);
    NetModServerSocket(int port);
    void start();
    ~NetModServerSocket();
src/socket/net_mod_socket.c
@@ -2,24 +2,95 @@
#include "socket_io.h"
#include "net_mod_socket_io.h"
NetModSocket::NetModSocket(const char *host, int port)
std::map<std::string, rio_t *> NetModSocket::connectionMap;
NetModSocket::NetModSocket()
{
    char portstr[32];
    sprintf(portstr, "%d", port);
  clientfd = Open_clientfd(host, portstr);
  Rio_readinitb(&rio, clientfd);
}
ssize_t NetModSocket::send(void *buf, size_t size) {
  int n = rio_writen(clientfd, buf, size);
  rio_writen(clientfd, PKG_SEP, strlen(PKG_SEP));
int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
  net_mod_recv_msg_t ** resp_arr, int *resp_arr_size) {
  char resp[MAXLINE];
  int ss;
  ss = rio_readpkgb(&rio, resp, MAXLINE);
  puts(resp);
  return n;
  int i, n, clientfd;
  char portstr[32];
  net_node_t *node;
  char mapKey[256];
  void *recv_buf;
  int recv_size;
  net_mod_request_head_t request_head;
  net_mod_response_head_t response_head;
  std::map<std::string, rio_t*>::iterator mapIter;
  rio_t *rio;
  net_mod_recv_msg_t *ret_arr = (net_mod_recv_msg_t *)calloc(arrlen, sizeof(net_mod_recv_msg_t));
  for (i = 0; i< arrlen; i++) {
    node = &node_arr[i];
    if(node->host == NULL) {
      // 本地发送
      shmModSocket.sendandrecv(send_buf, send_size, node->key, &recv_buf, &recv_size);
      goto LABEL_ARR_PUSH;
    }
    sprintf(mapKey, "%s:%d", node->host, node->port);
    if( ( mapIter = connectionMap.find(mapKey)) != connectionMap.end()) {
      rio = mapIter->second;
    } else {
      rio = (rio_t *)malloc(sizeof(rio_t));
      sprintf(portstr, "%d", node->port);
      clientfd = Open_clientfd(node-> host, portstr);
      Rio_readinitb(rio, clientfd);
      connectionMap.insert({mapKey, rio});
    }
    request_head.mod = REQ_REP;
    request_head.key = node->key;
    request_head.content_length = send_size;
    if( (n = rio_writen(rio->rio_fd, &request_head, sizeof(request_head))) != sizeof(request_head)) {
      err_exit(errno, "NetModSocket::send head rio_writen");
    }
    if( (n = rio_writen(rio->rio_fd, send_buf, send_size)) != send_size ) {
       err_exit(errno, "NetModSocket::send conent rio_writen");
    }
    if ((n = rio_readnb(rio, &response_head, sizeof(response_head))) !=  sizeof(response_head)) {
      err_exit(errno, "NetModSocket::send  rio_readnb");
    }
    recv_buf = malloc(response_head.content_length);
    if(recv_buf == NULL) {
      err_exit(errno, "NetModSocket::send malloc");
    }
    if ( (recv_size = rio_readnb(rio, recv_buf, response_head.content_length) ) !=  response_head.content_length) {
      err_exit(errno, "NetModSocket::send  rio_readnb");
    }
LABEL_ARR_PUSH:
    strcpy( ret_arr[i].host, node->host);
    ret_arr[i].port = node->port;
    ret_arr[i].key = node->key;
    ret_arr[i].content = recv_buf;
    ret_arr[i].content_length = recv_size;
  }
  *resp_arr = ret_arr;
  *resp_arr_size = i;
  return i;
}
void NetModSocket::free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size) {
  for(int i =0; i< size; i++) {
    free(arr[i].content);
  }
}
// ssize_t recv(void *buf, size_t len) {
@@ -29,6 +100,6 @@
// }
NetModSocket::~NetModSocket() {
   Close(clientfd);
}
src/socket/net_mod_socket.h
@@ -3,15 +3,52 @@
#include "usg_common.h"
#include "shm_mod_socket.h"
#include "socket_io.h"
struct net_node_t
{
    const char *host;
    int port;
    int key;
};
struct net_mod_request_head_t {
    socket_mod_t mod;
    int key;
    uint32_t content_length;
};
struct net_mod_response_head_t {
    // socket_mod_t mod;
    // int key;
    uint32_t content_length;
};
struct net_mod_recv_msg_t
{
  char host[128];
  int port;
  int key;
  void *content;
  uint32_t content_length;
};
class NetModSocket {
private:
  int clientfd;
  rio_t rio;
   static std::map<std::string, rio_t *> connectionMap;
   ShmModSocket shmModSocket;
public:
  NetModSocket(const char *host, int port);
  ssize_t send(void *buf, size_t size);
  NetModSocket();
  int sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size,
      net_mod_recv_msg_t ** resp_arr, int *resp_arr_size);
  ~NetModSocket();
   static void  free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size);
};
test_net_socket/Makefile
@@ -13,7 +13,7 @@
INCLUDES += -I${DEST}/include/shmqueue -I$(ROOT)/include/usgcommon
PROGS = ${DEST}/test_net_mod_socket
PROGS = ${DEST}/net_mod_req_rep
DEPENDENCES = $(patsubst %, %.d, $(PROGS)) 
test_net_socket/net_mod_req_rep.c
New file
@@ -0,0 +1,50 @@
#include "net_mod_server_socket.h"
#include "net_mod_socket.h"
#include "shm_mm.h"
#include "dgram_mod_socket.h"
#include "usg_common.h"
void server() {
    NetModServerSocket *serverSocket  = new NetModServerSocket(5000);
    serverSocket->start();
}
void client(){
    NetModSocket client;
    char send_buf[MAXLINE];
    net_mod_recv_msg_t *recv_arr;
    int recv_arr_size, i;
    net_node_t node_arr[1] = {
        {"localhost", 5000, 8}
    };
  while (fgets(send_buf, MAXLINE, stdin) != NULL) {
    client.sendandrecv( node_arr, 1, send_buf, strlen(send_buf), &recv_arr, &recv_arr_size);
    for(i=0; i<recv_arr_size; i++) {
        printf("host:%s, port: %d, key:%d, content: %s\n",
            recv_arr[i].host,
            recv_arr[i].port,
            recv_arr[i].key,
            recv_arr[i].content
        );
    }
  }
}
int main(int argc, char *argv[]) {
    shm_init(512);
    if (argc < 2) {
     fprintf(stderr, "Usage: %s %s|%s\n", argv[0], "server", "client");
     return 1;
  }
    if (strcmp("server", argv[1]) == 0 ) {
     server();
  }
  if (strcmp("client", argv[1]) == 0)
     client();
}
test_net_socket/test_net_mod_socket.c
File was deleted