wangzhengquan
2020-07-20 f85c9b875b060681b51f57b15074ba1c7c9f5636
queue/shm_socket.c
@@ -16,29 +16,19 @@
SHMQueue<shm_msg_t> * _attach_remote_queue(int port) ;
void shm_init(int size) {
   mem_pool_init(size);
}
void shm_destroy() {
   mem_pool_destroy();
}
void shm_free(void *buf) {
   free(buf);
}
shm_socket_t *shm_open_socket() {
   shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t));
   
   socket->port = -1;
   socket->dispatch_thread = 0;
   socket->status=SHM_CONN_CLOSED;
   
   return socket;
}
int _shm_close_socket(shm_socket_t *socket, bool notifyRemote) {
   socket->status = SHM_CONN_CLOSED;
   //给对方发送一个关闭连接的消息
   struct timespec timeout = {1, 0};
   shm_msg_t close_msg;
@@ -78,6 +68,7 @@
         client_socket->remoteQueue->push_timeout(close_msg, &timeout);
         delete client_socket->remoteQueue;
         client_socket->remoteQueue=NULL;
         delete client_socket->messageQueue;
         client_socket->messageQueue=NULL;
         socket->clientSocketMap->erase(iter);
@@ -99,9 +90,7 @@
   return _shm_close_socket(socket, true);
}
int shm_bind(shm_socket_t * socket, int port) {
int shm_socket_bind(shm_socket_t * socket, int port) {
   shm_socket_t * _socket = (shm_socket_t *) socket;
   _socket -> port = port;
   return 0;
@@ -123,9 +112,10 @@
   socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
   socket->acceptQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
   socket->clientSocketMap = new std::map<int, shm_socket_t* >;
   socket->status = SHM_CONN_LISTEN;
   pthread_create(&(socket->dispatch_thread), NULL, _server_run_msg_rev , (void *)socket);
   return 0;
}
@@ -133,16 +123,6 @@
   shm_socket_t *client_socket;
   auto iter = socket->clientSocketMap->find(port);
   if( iter !=  socket->clientSocketMap->end() ) {
      // client_socket= iter->second;
      // if(client_socket->remoteQueue != NULL) {
      //    delete client_socket->remoteQueue;
      //    client_socket->remoteQueue = NULL;
      // }
      // if(client_socket->messageQueue != NULL) {
      //    delete client_socket->messageQueue;
      //    client_socket->messageQueue = NULL;
      // }
      socket->clientSocketMap->erase(iter);
   }
   //free((void *)client_socket);
@@ -172,12 +152,11 @@
         case SHM_COMMON_MSG :
            iter = socket->clientSocketMap->find(src.port);
   print_msg("_server_run_msg_rev find before", src);
            if( iter !=  socket->clientSocketMap->end()) {
               client_socket= iter->second;
   print_msg("_server_run_msg_rev push before", src);
   // print_msg("_server_run_msg_rev push before", src);
               client_socket->messageQueue->push_timeout(src, &timeout);
   print_msg("_server_run_msg_rev push after", src);
   // print_msg("_server_run_msg_rev push after", src);
            }
            
            break;
@@ -221,7 +200,7 @@
      /*
         * shm_accept 用户执行的方法 与_server_run_msg_rev在两个不同的限制工作,accept要保证在客户的发送消息之前完成资源的准备工作,以避免出现竞态问题
      */
      //发送open_reply
      //发送open_reply,回应客户端的connect请求
      struct timespec timeout = {1, 0};
      shm_msg_t msg;
      msg.port = socket->port;
@@ -230,6 +209,7 @@
      if (client_socket->remoteQueue->push_timeout(msg, &timeout) )
      {
         client_socket->status = SHM_CONN_ESTABLISHED;
         return client_socket;
      } else {
         err_msg(0, "shm_accept: 发送open_reply失败");
@@ -274,9 +254,9 @@
   //接受open reply
   if(socket->queue->pop(msg)) {
      // 在这里server端已经准备好接受客户端发送请求了
      // 在这里server端已经准备好接受客户端发送请求了,完成与服务端的连接
      if(msg.type == SHM_SOCKET_OPEN_REPLY) {
         socket->status = SHM_CONN_ESTABLISHED;
         pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev , (void *)socket);
      } else {
         err_exit(0, "shm_connect: 不匹配的应答信息!");
@@ -325,8 +305,12 @@
 
int shm_send(shm_socket_t *socket, void *buf, int size) {
int shm_send(shm_socket_t *socket, const void *buf, const int size) {
   // hashtable_t *hashtable = mm_get_hashtable();
   // if(socket->remoteQueue == NULL) {
   //    err_msg(errno, "当前客户端无连接!");
   //    return -1;
   // }
   shm_msg_t dest;
   dest.type=SHM_COMMON_MSG;
   dest.port = socket->port;