wangzhengquan
2020-11-27 a856e56b3943041d64a22285c550f6dbb9d2e193
src/socket/shm_socket.c
@@ -79,8 +79,9 @@
int shm_listen(shm_socket_t *socket) {
  if (socket->socket_type != SHM_SOCKET_STREAM) {
    err_exit(0, "can not invoke shm_listen method with a socket which is not a "
    logger->error("can not invoke shm_listen method with a socket which is not a "
                "SHM_SOCKET_STREAM socket");
    exit(1);
  }
  int key;
@@ -109,8 +110,9 @@
*/
shm_socket_t *shm_accept(shm_socket_t *socket) {
  if (socket->socket_type != SHM_SOCKET_STREAM) {
    err_exit(0, "can not invoke shm_accept method with a socket which is not a "
    logger->error("can not invoke shm_accept method with a socket which is not a "
                "SHM_SOCKET_STREAM socket");
    exit(1);
  }
  hashtable_t *hashtable = mm_get_hashtable();
  int client_key;
@@ -148,7 +150,7 @@
      client_socket->status = SHM_CONN_ESTABLISHED;
      return client_socket;
    } else {
      err_msg(0, "shm_accept: 发送open_reply失败");
      logger->error( "shm_accept: 发送open_reply失败");
      return NULL;
    }
@@ -161,12 +163,14 @@
int shm_connect(shm_socket_t *socket, int key) {
  if (socket->socket_type != SHM_SOCKET_STREAM) {
    err_exit(0, "can not invoke shm_connect method with a socket which is not "
    logger->error( "can not invoke shm_connect method with a socket which is not "
                "a SHM_SOCKET_STREAM socket");
    exit(1);
  }
  hashtable_t *hashtable = mm_get_hashtable();
  if (hashtable_get(hashtable, key) == NULL) {
    err_exit(0, "shm_connect:connect at key %d  failed!", key);
    logger->error("shm_connect:connect at key %d  failed!", key);
    exit(1);
  }
  if (socket->key == -1) {
@@ -178,7 +182,8 @@
  socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16);
  if ((socket->remoteQueue = _attach_remote_queue(key)) == NULL) {
    err_exit(0, "connect to %d failted", key);
    logger->error("connect to %d failted", key);
    exit(1);
  }
  socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
@@ -198,11 +203,13 @@
      pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev,
                     (void *)socket);
    } else {
      err_exit(0, "shm_connect: 不匹配的应答信息!");
      logger->error( "shm_connect: 不匹配的应答信息!");
      exit(1);
    }
  } else {
    err_exit(0, "connect failted!");
    logger->error( "connect failted!");
    exit(1);
  }
  return 0;
@@ -210,8 +217,9 @@
int shm_send(shm_socket_t *socket, const void *buf, const int size) {
  if (socket->socket_type != SHM_SOCKET_STREAM) {
    err_exit(0, "can not invoke shm_send method with a socket which is not a "
    logger->error("shm_socket.shm_send: can not invoke shm_send method with a socket which is not a "
                "SHM_SOCKET_STREAM socket");
    exit(1);
  }
  // hashtable_t *hashtable = mm_get_hashtable();
  // if(socket->remoteQueue == NULL) {
@@ -228,16 +236,17 @@
  if (socket->remoteQueue->push(dest)) {
    return 0;
  } else {
    err_msg(errno, "connection has been closed!");
    logger->error(errno, "connection has been closed!");
    return -1;
  }
}
int shm_recv(shm_socket_t *socket, void **buf, int *size) {
  if (socket->socket_type != SHM_SOCKET_STREAM) {
    err_exit(0, "can not invoke shm_recv method in a %d type socket  which is "
    logger->error( "shm_socket.shm_recv: can not invoke shm_recv method in a %d type socket  which is "
                "not a SHM_SOCKET_STREAM socket ",
             socket->socket_type);
    exit(1);
  }
  shm_msg_t src;
@@ -258,9 +267,10 @@
int shm_sendto(shm_socket_t *socket, const void *buf, const int size,
               const int key, const struct timespec *timeout, const int flags) {
  if (socket->socket_type != SHM_SOCKET_DGRAM) {
    err_exit(0, "Can't invoke shm_sendto method in a %d type socket  which is "
    logger->error( "shm_socket.shm_sendto: Can't invoke shm_sendto method in a %d type socket  which is "
                "not a SHM_SOCKET_DGRAM socket ",
             socket->socket_type);
    exit(0);
  }
  hashtable_t *hashtable = mm_get_hashtable();
@@ -278,13 +288,13 @@
  SemUtil::inc(socket->mutex);
  
  if (key == socket->key) {
    err_msg(0, "can not send to your self!");
    logger->error( "can not send to your self!");
    return -1;
  }
  SHMQueue<shm_msg_t> *remoteQueue;
  if ((remoteQueue = _attach_remote_queue(key)) == NULL) {
     err_msg(0, "shm_sendto failed, the other end has been closed, or has not been opened!");
     logger->error( "shm_sendto failed, the other end has been closed, or has not been opened!");
    return SHM_SOCKET_ECONNFAILED;
  }
@@ -312,7 +322,7 @@
  } else {
    delete remoteQueue;
    mm_free(dest.buf);
    err_msg(errno, "sendto key %d failed!", key);
    logger->error(errno, "sendto key %d failed!", key);
    return -1;
  }
}
@@ -320,9 +330,10 @@
// 短连接方式接受
int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key,  struct timespec *timeout,  int flags) {
  if (socket->socket_type != SHM_SOCKET_DGRAM) {
    err_exit(0, "Can't invoke shm_recvfrom method in a %d type socket  which "
    logger->error("shm_socket.shm_recvfrom: Can't invoke shm_recvfrom method in a %d type socket  which "
                "is not a SHM_SOCKET_DGRAM socket ",
             socket->socket_type);
    exit(1);
  }
  hashtable_t *hashtable = mm_get_hashtable();
  SemUtil::dec(socket->mutex);
@@ -367,9 +378,10 @@
                    const int send_size, const int send_key, void **recv_buf,
                    int *recv_size,  struct timespec *timeout,  int flags) {
  if (socket->socket_type != SHM_SOCKET_DGRAM) {
    err_exit(0, "Can't invoke shm_sendandrecv method in a %d type socket  "
    logger->error( "shm_socket.shm_sendandrecv: Can't invoke shm_sendandrecv method in a %d type socket  "
                "which is not a SHM_SOCKET_DGRAM socket ",
             socket->socket_type);
    exit(1);
  }
  int recv_key;
  int rv;
@@ -390,9 +402,10 @@
                    const int send_size, const int send_key, void **recv_buf,
                    int *recv_size,  struct timespec *timeout,  int flags) {
  if (socket->socket_type != SHM_SOCKET_DGRAM) {
    err_exit(0, "Can't invoke shm_sendandrecv method in a %d type socket  "
    logger->error( "shm_socket.shm_sendandrecv_unsafe : Can't invoke shm_sendandrecv method in a %d type socket  "
                "which is not a SHM_SOCKET_DGRAM socket ",
             socket->socket_type);
    exit(1);
  }
  int recv_key;
  int rv;
@@ -413,9 +426,10 @@
 * 绑定key到队列,但是并不会创建队列。如果没有对应指定key的队列提示错误并退出
 */
SHMQueue<shm_msg_t> *_attach_remote_queue(int key) {
  hashtable_t *hashtable = mm_get_hashtable();
  if (hashtable_get(hashtable, key) == NULL) {
    err_msg(0, "_remote_queue_attach:connet at key %d  failed!", key);
    logger->error("shm_socket._remote_queue_attach:connet at key %d  failed!", key);
    return NULL;
  }
@@ -467,7 +481,7 @@
      break;
    default:
      err_msg(0, "socket.__shm_rev__: undefined message type.");
       logger->error("shm_socket._server_run_msg_rev: undefined message type.");
    }
  }
@@ -498,7 +512,7 @@
      socket->messageQueue->push_timeout(src, &timeout);
      break;
    default:
      err_msg(0, "socket.__shm_rev__: undefined message type.");
       logger->error( "shm_socket._client_run_msg_rev: undefined message type.");
    }
  }