wangzhengquan
2020-07-25 8ec1d776751f62c43335d36c3427dc1ab2d84a61
src/socket/shm_socket.c
@@ -3,8 +3,6 @@
#include "logger_factory.h"
#include <map>
static Logger logger = LoggerFactory::getLogger();
void print_msg(char *head, shm_msg_t& msg) {
@@ -16,7 +14,6 @@
void * _client_run_msg_rev(void* _socket);
int _shm_close_dgram_socket(shm_socket_t *socket);
int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote);
@@ -42,7 +39,6 @@
         return -1;
   }
   return -1;
}
int shm_socket_bind(shm_socket_t * socket, int port) {
@@ -53,7 +49,8 @@
int shm_listen(shm_socket_t* socket) {
   if(socket->socket_type != SHM_SOCKET_STREAM) {
      err_exit(0, "can not invoke shm_listen method with a socket which is not a SHM_SOCKET_STREAM socket");
    err_exit(0, "can not invoke shm_listen method with a socket which is not a "
                "SHM_SOCKET_STREAM socket");
   }
   int  port;
@@ -72,12 +69,11 @@
   socket->acceptQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
   socket->clientSocketMap = new std::map<int, shm_socket_t* >;
   socket->status = SHM_CONN_LISTEN;
   pthread_create(&(socket->dispatch_thread), NULL, _server_run_msg_rev , (void *)socket);
  pthread_create(&(socket->dispatch_thread), NULL, _server_run_msg_rev,
                 (void *)socket);
   
   return 0;
}
/**
 * 接受客户端建立新连接的请求
@@ -85,7 +81,8 @@
*/
shm_socket_t* shm_accept(shm_socket_t* socket) {
   if(socket->socket_type != SHM_SOCKET_STREAM) {
      err_exit(0, "can not invoke shm_accept method with a socket which is not a SHM_SOCKET_STREAM socket");
    err_exit(0, "can not invoke shm_accept method with a socket which is not a "
                "SHM_SOCKET_STREAM socket");
   }
   hashtable_t *hashtable = mm_get_hashtable();
   int client_port;
@@ -101,14 +98,16 @@
      client_socket->port = socket->port;
      // client_socket->queue= socket->queue;
      //初始化消息queue
      client_socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
    client_socket->messageQueue =
        new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
      //连接到对方queue
      client_socket->remoteQueue = _attach_remote_queue(client_port);
      socket->clientSocketMap->insert({client_port, client_socket});
      /*
         * shm_accept 用户执行的方法 与_server_run_msg_rev在两个不同的限制工作,accept要保证在客户的发送消息之前完成资源的准备工作,以避免出现竞态问题
* shm_accept 用户执行的方法
* 与_server_run_msg_rev在两个不同的限制工作,accept要保证在客户的发送消息之前完成资源的准备工作,以避免出现竞态问题
      */
      //发送open_reply,回应客户端的connect请求
      struct timespec timeout = {1, 0};
@@ -117,8 +116,7 @@
      msg.size = 0;
      msg.type = SHM_SOCKET_OPEN_REPLY;
      if (client_socket->remoteQueue->push_timeout(msg, &timeout) )
      {
    if (client_socket->remoteQueue->push_timeout(msg, &timeout)) {
         client_socket->status = SHM_CONN_ESTABLISHED;
         return client_socket;
      } else {
@@ -126,18 +124,16 @@
         return NULL;
      }
   } else {
      err_exit(errno, "shm_accept");
   }
   return NULL;
}
int shm_connect(shm_socket_t* socket, int port) {
   if(socket->socket_type != SHM_SOCKET_STREAM) {
      err_exit(0, "can not invoke shm_connect method with a socket which is not a SHM_SOCKET_STREAM socket");
    err_exit(0, "can not invoke shm_connect method with a socket which is not "
                "a SHM_SOCKET_STREAM socket");
   }
   hashtable_t *hashtable = mm_get_hashtable();
   if(hashtable_get(hashtable, port)== NULL) {
@@ -154,9 +150,11 @@
   }
   socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
   socket->remoteQueue = _attach_remote_queue(port);
   socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
   
  if ((socket->remoteQueue = _attach_remote_queue(port)) == NULL) {
    err_exit(0, "connect to %d failted", port);
  }
  socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
   //发送open请求
   struct timespec timeout = {1, 0};
@@ -171,7 +169,8 @@
      // 在这里server端已经准备好接受客户端发送请求了,完成与服务端的连接
      if(msg.type == SHM_SOCKET_OPEN_REPLY) {
         socket->status = SHM_CONN_ESTABLISHED;
         pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket);
      pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev,
                     (void *)socket);
      } else {
         err_exit(0, "shm_connect: 不匹配的应答信息!");
      }
@@ -183,10 +182,10 @@
   return 0;
}
int shm_send(shm_socket_t *socket, const void *buf, const int size) {
   if(socket->socket_type != SHM_SOCKET_STREAM) {
      err_exit(0, "can not invoke shm_send method with a socket which is not a SHM_SOCKET_STREAM socket");
    err_exit(0, "can not invoke shm_send method with a socket which is not a "
                "SHM_SOCKET_STREAM socket");
   }
   // hashtable_t *hashtable = mm_get_hashtable();
   // if(socket->remoteQueue == NULL) {
@@ -200,7 +199,6 @@
   dest.buf = mm_malloc(size);
   memcpy(dest.buf, buf, size);
   if(socket->remoteQueue->push(dest)) {
      return 0;
   } else {
@@ -209,10 +207,11 @@
   }
}
int shm_recv(shm_socket_t* socket, void **buf, int *size) {
   if(socket->socket_type != SHM_SOCKET_STREAM) {
      err_exit(0, "can not invoke shm_recv method in a %d type socket  which is not a SHM_SOCKET_STREAM socket ", socket->socket_type);
    err_exit(0, "can not invoke shm_recv method in a %d type socket  which is "
                "not a SHM_SOCKET_STREAM socket ",
             socket->socket_type);
   }
   shm_msg_t src;
@@ -226,15 +225,15 @@
   } else {
      return -1;
   }
}
// 短连接方式发送
int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port) {
int shm_sendto(shm_socket_t *socket, const void *buf, const int size,
               const int port, const struct timespec *timeout) {
   if(socket->socket_type != SHM_SOCKET_DGRAM) {
      err_exit(0, "Can't invoke shm_sendto method in a %d type socket  which is not a SHM_SOCKET_DGRAM socket ", socket->socket_type);
    err_exit(0, "Can't invoke shm_sendto method in a %d type socket  which is "
                "not a SHM_SOCKET_DGRAM socket ",
             socket->socket_type);
   }
   hashtable_t *hashtable = mm_get_hashtable();
@@ -262,8 +261,21 @@
   dest.buf = mm_malloc(size);
   memcpy(dest.buf, buf, size);
   SHMQueue<shm_msg_t> *remoteQueue =  _attach_remote_queue(port);
   if(remoteQueue->push(dest)) {
  SHMQueue<shm_msg_t> *remoteQueue;
  if ((remoteQueue = _attach_remote_queue(port)) == NULL) {
     err_msg(0, "shm_sendto failed, then other end has been closed!");
    return -1;
  }
  // printf("shm_sendto push before\n");
  bool rv;
  if(timeout != NULL) {
     rv = remoteQueue->push_timeout(dest, timeout);
  } else {
     rv = remoteQueue->push(dest);
  }
  if (rv) {
    // printf("shm_sendto push after\n");
      delete remoteQueue;
      return 0;
   } else {
@@ -273,11 +285,12 @@
   }
}
// 短连接方式接受
int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port){
   if(socket->socket_type != SHM_SOCKET_DGRAM) {
      err_exit(0, "Can't invoke shm_recvfrom method in a %d type socket  which is not a SHM_SOCKET_DGRAM socket ", socket->socket_type);
    err_exit(0, "Can't invoke shm_recvfrom method in a %d type socket  which "
                "is not a SHM_SOCKET_DGRAM socket ",
             socket->socket_type);
   }
   hashtable_t *hashtable = mm_get_hashtable();
   if(socket->queue == NULL) {
@@ -294,7 +307,7 @@
   }
   shm_msg_t src;
// printf("shm_recvfrom pop before");
  // printf("shm_recvfrom pop before\n");
   if (socket->queue->pop(src)) {
      void * _buf = malloc(src.size);
      memcpy(_buf, src.buf, src.size);
@@ -302,16 +315,20 @@
      *size = src.size;
      *port = src.port;
      mm_free(src.buf);
// printf("shm_recvfrom pop after");
    // printf("shm_recvfrom pop after\n");
      return 0;
   } else {
      return -1;
   }
}
int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size) {
int shm_sendandrecv(shm_socket_t *socket, const void *send_buf,
                    const int send_size, const int send_port, void **recv_buf,
                    int *recv_size) {
   if(socket->socket_type != SHM_SOCKET_DGRAM) {
      err_exit(0, "Can't invoke shm_sendandrecv method in a %d type socket  which is not a SHM_SOCKET_DGRAM socket ", socket->socket_type);
    err_exit(0, "Can't invoke shm_sendandrecv method in a %d type socket  "
                "which is not a SHM_SOCKET_DGRAM socket ",
             socket->socket_type);
   }
   int recv_port;
   int rv;
@@ -325,14 +342,13 @@
   return -1;
}
/**
 * 绑定key到队列,但是并不会创建队列。如果没有对应指定key的队列提示错误并退出
 */
SHMQueue<shm_msg_t> * _attach_remote_queue(int port) {
   hashtable_t *hashtable = mm_get_hashtable();
   if(hashtable_get(hashtable, port)== NULL) {
      err_exit(0, "_remote_queue_attach:connet at port %d  failed!", port);
    err_msg(0, "_remote_queue_attach:connet at port %d  failed!", port);
      return NULL;
   }
    
@@ -340,20 +356,15 @@
   return queue;
}
void _server_close_conn_to_client(shm_socket_t* socket, int port) {
   shm_socket_t *client_socket;
   std::map<int, shm_socket_t* >::iterator iter = socket->clientSocketMap->find(port);
  std::map<int, shm_socket_t *>::iterator iter =
      socket->clientSocketMap->find(port);
   if( iter !=  socket->clientSocketMap->end() ) {
      client_socket = iter->second;
      free((void *)client_socket);
      socket->clientSocketMap->erase(iter);
   }
}
/**
@@ -396,13 +407,10 @@
   return NULL;
}
void _client_close_conn_to_server(shm_socket_t* socket) {
    
   _shm_close_stream_socket(socket, false);
}
/**
 * client端的各种类型消息()在这里进程分拣
@@ -429,7 +437,6 @@
 
   return NULL;
}
int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote) {
   socket->status = SHM_CONN_CLOSED;
@@ -466,7 +473,8 @@
   if(socket->clientSocketMap != NULL) {
      shm_socket_t *client_socket;
      for(auto iter = socket->clientSocketMap->begin(); iter != socket->clientSocketMap->end(); iter++) {
    for (auto iter = socket->clientSocketMap->begin();
         iter != socket->clientSocketMap->end(); iter++) {
         client_socket= iter->second;
         client_socket->remoteQueue->push_timeout(close_msg, &timeout);
@@ -480,7 +488,6 @@
      }
      delete socket->clientSocketMap;
   }
   if(socket->dispatch_thread != 0)
      pthread_cancel(socket->dispatch_thread);