wangzhengquan
2020-08-03 b63ce299ddacea2ad487dc635926ed52ff422c20
add timeout nowait
1个文件已添加
8个文件已修改
278 ■■■■ 已修改文件
src/queue/include/shm_queue_wrapper.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/dgram_mod_socket.c 106 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/include/dgram_mod_socket.h 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/include/shm_socket.h 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.c 38 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/util/sem_util.c 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/Makefile 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/dgram_mod_bus.c 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/test_timeout.c 63 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/include/shm_queue_wrapper.h
@@ -55,7 +55,7 @@
int shmqueue_push(void * _shmqueue, void *src, int size);
/**
 * 入队, 队列满时立即返回.
 * 入队, 立刻返回
 * @return 1 入队成功, 0 入队失败
 */
int shmqueue_push_nowait(void * _shmqueue, void *src, int size) ;
@@ -75,7 +75,7 @@
int shmqueue_pop(void * _shmqueue, void **dest, int *size);
/**
 * 出队, 队列空时立即返回
 * 出队, 立刻返回
 * @return 1 出队成功, 0出队失败
 */
int shmqueue_pop_nowait(void * _shmqueue, void **dest, int *size) ;
src/socket/dgram_mod_socket.c
@@ -83,43 +83,78 @@
    return shm_socket_force_bind(socket->shm_socket, port);
}
int dgram_mod_sendto(void *_socket, const void *buf, const int size, const int port) {
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    return shm_sendto(socket->shm_socket, buf, size, port);
    return shm_sendto(socket->shm_socket, buf, size, port, NULL, 0);
}
int dgram_mod_recvfrom(void *_socket, void **buf, int *size, int *port) {
int dgram_mod_sendto_timeout(void *_socket, const void *buf, const int size, const int port, int sec, int nsec) {
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    struct timespec timeout = {sec, nsec};
    return shm_sendto(socket->shm_socket, buf, size, port, &timeout, 0);
}
int dgram_mod_sendto_nowait(void *_socket, const void *buf, const int size, const int port) {
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    return shm_sendto(socket->shm_socket, buf, size, port, NULL, (int)SHM_MSG_NOWAIT);
}
static inline int _dgram_mod_recvfrom_(void *_socket, void **buf, int *size, int *port,  struct timespec *timeout, int flags) {
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    if(socket->mod == BUS) {
        err_exit(0, "Can not use method recvfrom in a Bus");
    }
// printf("dgram_mod_recvfrom  before\n");
    int rv = shm_recvfrom(socket->shm_socket, buf, size, port);
    int rv = shm_recvfrom(socket->shm_socket, buf, size, port, timeout, flags);
// printf("dgram_mod_recvfrom  after\n");
    return rv;
}
int dgram_mod_recvfrom(void *_socket, void **buf, int *size, int *port) {
    return _dgram_mod_recvfrom_(_socket, buf, size, port, NULL, 0);
}
int dgram_mod_recvfrom_timeout(void *_socket, void **buf, int *size, int *port,  int sec,  int nsec) {
    struct timespec timeout = {sec, nsec};
    return _dgram_mod_recvfrom_(_socket, buf, size, port, &timeout, 0);
}
int dgram_mod_recvfrom_nowait(void *_socket, void **buf, int *size, int *port) {
    return _dgram_mod_recvfrom_(_socket, buf, size, port, NULL, (int)SHM_MSG_NOWAIT);
}
 
int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size) {
int dgram_mod_sendandrecv(void * _socket,  const void *send_buf,  const int send_size,  const int send_port,
    void **recv_buf, int *recv_size) {
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    return shm_sendandrecv(socket->shm_socket, send_buf, send_size, send_port, recv_buf, recv_size);
    return shm_sendandrecv(socket->shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, NULL, 0);
}
int dgram_mod_get_port(void * _socket) {
int dgram_mod_sendandrecv_timeout(void * _socket,  const void *send_buf,  const int send_size,  const int send_port,
    void **recv_buf, int *recv_size,   int sec,  int nsec) {
    struct timespec timeout = {sec, nsec};
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    return socket->shm_socket->port;
    return shm_sendandrecv(socket->shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, &timeout, 0);
}
int dgram_mod_sendandrecv_nowait(void * _socket, const  void *send_buf, const  int send_size, const int send_port,
    void **recv_buf, int *recv_size) {
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    return shm_sendandrecv(socket->shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT);
}
void dgram_mod_free(void *buf) {
    free(buf);
}
// =================bus========================
int  dgram_mod_start_bus(void * _socket) {
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
@@ -136,17 +171,32 @@
/**
 * @port 总线端口
 */
int  dgram_mod_sub(void * _socket, void *topic, int size, int port) {
static int  _dgram_mod_sub_(void * _socket, void *topic, int size, int port,
    struct timespec *timeout, int flags) {
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    char buf[8192];
    snprintf(buf,  8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
    return shm_sendto(socket->shm_socket, buf, strlen(buf) + 1, port);
    return shm_sendto(socket->shm_socket, buf, strlen(buf) + 1, port, timeout, flags);
}
int  dgram_mod_sub(void * _socket, void *topic, int size, int port ) {
    return _dgram_mod_sub_(_socket, topic, size, port, NULL, 0);
}
int  dgram_mod_sub_timeout(void * _socket, void *topic, int size, int port, int sec, int nsec) {
    struct timespec timeout = {sec, nsec};
    return _dgram_mod_sub_(_socket, topic, size, port, &timeout, 0);
}
int  dgram_mod_sub_nowait(void * _socket, void *topic, int size, int port) {
    return _dgram_mod_sub_(_socket, topic, size, port, NULL,  (int)SHM_MSG_NOWAIT);
}
/**
 * @port 总线端口
 */
int  dgram_mod_pub(void * _socket, void *topic, int topic_size, void *content, int content_size, int port) {
static int  _dgram_mod_pub_(void * _socket, void *topic, int topic_size, void *content, int content_size, int port,
    struct timespec *timeout, int flags) {
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    int head_len;
@@ -154,10 +204,34 @@
    snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
    head_len = strlen(buf);
    memcpy(buf+head_len, content, content_size);
    return shm_sendto(socket->shm_socket, buf, head_len+content_size, port);
    return shm_sendto(socket->shm_socket, buf, head_len+content_size, port, timeout, flags);
}
int  dgram_mod_pub(void * _socket, void *topic, int topic_size, void *content, int content_size, int port) {
    return _dgram_mod_pub_(_socket, topic, topic_size, content, content_size, port, NULL, 0);
}
int  dgram_mod_pub_timeout(void * _socket, void *topic, int topic_size, void *content, int content_size, int port, int sec, int nsec) {
    struct timespec timeout = {sec, nsec};
    return _dgram_mod_pub_(_socket, topic, topic_size, content, content_size, port, &timeout, 0);
}
int  dgram_mod_pub_nowait(void * _socket, void *topic, int topic_size, void *content, int content_size, int port) {
    return _dgram_mod_pub_(_socket, topic, topic_size, content, content_size, port, NULL, (int)SHM_MSG_NOWAIT);
}
int dgram_mod_get_port(void * _socket) {
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    return socket->shm_socket->port;
}
void dgram_mod_free(void *buf) {
    free(buf);
}
//==========================================================================================================================
src/socket/include/dgram_mod_socket.h
@@ -36,7 +36,10 @@
 * @return 0 成功, 其他值 失败的错误码
 */
int dgram_mod_sendto(void *_socket, const void *buf, const int size, const int port);
// 发送信息超时返回。 @sec 秒 , @nsec 纳秒
int dgram_mod_sendto_timeout(void *_socket, const void *buf, const int size, const int port, int sec, int nsec);
// 发送信息立刻返回。
int dgram_mod_sendto_nowait(void *_socket, const void *buf, const int size, const int port);
/**
 * 接收信息
@@ -44,6 +47,9 @@
 * @return 0 成功, 其他值 失败的错误码
*/
int dgram_mod_recvfrom(void *_socket, void **buf, int *size, int *port);
// 接受信息超时返回。 @sec 秒 , @nsec 纳秒
int dgram_mod_recvfrom_timeout(void *_socket, void **buf, int *size, int *port, int sec, int nsec);
int dgram_mod_recvfrom_nowait(void *_socket, void **buf, int *size, int *port);
/**
 * 发送请求信息并等待接收应答
@@ -51,6 +57,9 @@
 * @return 0 成功, 其他值 失败的错误码
*/
int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
// 超时返回。 @sec 秒 , @nsec 纳秒
int dgram_mod_sendandrecv_timeout(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size, int sec, int nsec) ;
int dgram_mod_sendandrecv_nowait(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
/**
@@ -67,6 +76,11 @@
 * @port 总线端口
 */
int  dgram_mod_sub(void * _socket, void *topic, int size, int port);
// 超时返回。 @sec 秒 , @nsec 纳秒
int  dgram_mod_sub_timeout(void * _socket, void *topic, int size, int port, int sec, int nsec);
int  dgram_mod_sub_nowait(void * _socket, void *topic, int size, int port);
/**
 * 发布主题
@@ -75,6 +89,9 @@
 * @port 总线端口
 */
int  dgram_mod_pub(void * _socket, void *topic, int topic_size, void *content, int content_size, int port);
//  超时返回。 @sec 秒 , @nsec 纳秒
int  dgram_mod_pub_timeout(void * _socket, void *topic, int topic_size, void *content, int content_size, int port, int sec, int nsec);
int  dgram_mod_pub_nowait(void * _socket, void *topic, int topic_size, void *content, int content_size, int port);
/**
src/socket/include/shm_socket.h
@@ -5,9 +5,7 @@
#include "usg_typedef.h"
#include "shm_queue.h"
#ifdef __cplusplus
extern "C" {
#endif
enum shm_msg_type_t
{
@@ -16,6 +14,12 @@
    SHM_SOCKET_CLOSE = 3,
    SHM_COMMON_MSG = 4
    
};
enum shm_socket_flag_t
{
  SHM_MSG_TIMEOUT = 1,
  SHM_MSG_NOWAIT = 2
};
enum shm_socket_type_t
@@ -77,17 +81,16 @@
int shm_send(shm_socket_t * socket, const void *buf, const int size) ;
int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size);
int shm_recv(shm_socket_t * socket, void **buf, int *size) ;
int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port, const struct timespec * timeout = NULL);
int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port, const struct timespec * timeout = NULL, const int flags=0);
int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port);
int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port,   struct timespec * timeout = NULL,  int flags=0);
int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size,
    struct timespec * timeout = NULL,  int flags=0);
#ifdef __cplusplus
}
#endif
#endif
src/socket/shm_socket.c
@@ -5,17 +5,19 @@
static Logger logger = LoggerFactory::getLogger();
void print_msg(char *head, shm_msg_t &msg) {
  // err_msg(0, "%s: port=%d, type=%d\n", head, msg.port, msg.type);
}
void *_server_run_msg_rev(void *_socket);
static void *_server_run_msg_rev(void *_socket);
void *_client_run_msg_rev(void *_socket);
static void *_client_run_msg_rev(void *_socket);
int _shm_close_dgram_socket(shm_socket_t *socket);
static int _shm_close_dgram_socket(shm_socket_t *socket);
int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote);
static int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote);
static inline int  _shm_socket_check_key(shm_socket_t *socket) {
   void *tmp_ptr = mm_get_by_key(socket->port);
@@ -239,9 +241,10 @@
  }
}
// 短连接方式发送
int shm_sendto(shm_socket_t *socket, const void *buf, const int size,
               const int port, const struct timespec *timeout) {
               const int port, const struct timespec *timeout, const int flags) {
  if (socket->socket_type != SHM_SOCKET_DGRAM) {
    err_exit(0, "Can't invoke shm_sendto method in a %d type socket  which is "
                "not a SHM_SOCKET_DGRAM socket ",
@@ -273,12 +276,14 @@
  SHMQueue<shm_msg_t> *remoteQueue;
  if ((remoteQueue = _attach_remote_queue(port)) == NULL) {
      err_msg(0, "shm_sendto failed, then other end has been closed!");
      err_msg(0, "shm_sendto failed, the other end has been closed, or has not been opened!");
    return -1;
  }
  // printf("shm_sendto push before\n");
  bool rv;
  if(timeout != NULL) {
  if(flags & SHM_MSG_NOWAIT != 0) {
    rv = remoteQueue->push_nowait(dest);
  } else if(timeout != NULL) {
      rv = remoteQueue->push_timeout(dest, timeout);
  } else {
      rv = remoteQueue->push(dest);
@@ -296,7 +301,7 @@
}
// 短连接方式接受
int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port) {
int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port,  struct timespec *timeout,  int flags) {
  if (socket->socket_type != SHM_SOCKET_DGRAM) {
    err_exit(0, "Can't invoke shm_recvfrom method in a %d type socket  which "
                "is not a SHM_SOCKET_DGRAM socket ",
@@ -316,7 +321,16 @@
  shm_msg_t src;
  // printf("shm_recvfrom pop before\n");
  if (socket->queue->pop(src)) {
  bool rv;
   if(flags & SHM_MSG_NOWAIT != 0) {
    rv = socket->queue->pop_nowait(src);
  } else if(timeout != NULL) {
    rv = socket->queue->pop_timeout(src, timeout);
  } else {
    rv = socket->queue->pop(src);
  }
  if (rv) {
    void *_buf = malloc(src.size);
    memcpy(_buf, src.buf, src.size);
    *buf = _buf;
@@ -332,7 +346,7 @@
int shm_sendandrecv(shm_socket_t *socket, const void *send_buf,
                    const int send_size, const int send_port, void **recv_buf,
                    int *recv_size) {
                    int *recv_size,  struct timespec *timeout,  int flags) {
  if (socket->socket_type != SHM_SOCKET_DGRAM) {
    err_exit(0, "Can't invoke shm_sendandrecv method in a %d type socket  "
                "which is not a SHM_SOCKET_DGRAM socket ",
@@ -342,8 +356,8 @@
  int rv;
  shm_socket_t *tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM);
  if (shm_sendto(tmp_socket, send_buf, send_size, send_port) == 0) {
    rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_port);
  if (shm_sendto(tmp_socket, send_buf, send_size, send_port, timeout, flags) == 0) {
    rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_port, timeout, flags);
    shm_close_socket(tmp_socket);
    return rv;
  }
src/util/sem_util.c
@@ -72,7 +72,7 @@
/* Reserve semaphore - decrement it by 1 */
int SemUtil::dec(int semId) {
logger.debug("%d: SemUtil::dec\n", semId);
// logger.debug("%d: SemUtil::dec\n", semId);
  struct sembuf sops;
  sops.sem_num = 0;
@@ -81,7 +81,7 @@
  while (semop(semId, &sops, 1) == -1)
    if (errno != EINTR) {
      err_msg(errno, "SemUtil::dec");
     // err_msg(errno, "SemUtil::dec");
      return -1;
    }
@@ -97,7 +97,7 @@
  while (semop(semId, &sops, 1) == -1)
    if (errno != EINTR) {
      err_msg(errno, "SemUtil::dec_nowait");
     // err_msg(errno, "SemUtil::dec_nowait");
      return -1;
    }
@@ -113,7 +113,7 @@
  while (semtimedop(semId, &sops, 1, timeout) == -1)
    if (errno != EINTR) {
      err_msg(errno, "SemUtil::dec_timeout");
      //err_msg(errno, "SemUtil::dec_timeout");
      return -1;
    }
test_socket/Makefile
@@ -14,7 +14,7 @@
include $(ROOT)/Make.defines.$(PLATFORM)
PROGS =    dgram_mod_bus dgram_mod_survey dgram_mod_req_rep
PROGS =    dgram_mod_bus dgram_mod_survey dgram_mod_req_rep test_timeout
build: $(PROGS)
test_socket/dgram_mod_bus.c
@@ -55,15 +55,24 @@
    if(strcmp(action, "sub") == 0) {
      printf("Please input topic!\n");
      scanf("%s", topic);
      dgram_mod_sub(socket, topic, strlen(topic),  port);
      printf("Sub success!\n");
      if (dgram_mod_sub(socket, topic, strlen(topic),  port) == 0) {
         printf("Sub success!\n");
      } else {
        printf("Sub failture!\n");
        exit(0);
      }
    }
    else if(strcmp(action, "pub") == 0) {
      // printf("%s %s %s\n", action, topic, content);
      printf("Please input topic and content\n");
      scanf("%s %s", topic, content);
      dgram_mod_pub(socket, topic, strlen(topic)+1, content, strlen(content)+1,  port);
      printf("Pub success!\n");
      if(dgram_mod_pub(socket, topic, strlen(topic)+1, content, strlen(content)+1,  port) == 0){
        printf("Pub success!\n");
      } else {
        printf("Pub failture!\n");
      }
    } else if(strcmp(action, "quit") == 0) {
      break;
    } else {
test_socket/test_timeout.c
New file
@@ -0,0 +1,63 @@
#include "dgram_mod_socket.h"
#include "shm_mm.h"
#include "usg_common.h"
void server(int port) {
  void *socket = dgram_mod_open_socket();
  dgram_mod_bind(socket, port);
  int size;
  void *recvbuf;
  char sendbuf[512];
  int rv;
  int remote_port;
  if ( (rv = dgram_mod_recvfrom_timeout(socket, &recvbuf, &size, &remote_port, 5, 0) ) == 0) {
    printf( "RECEIVED HREARTBEAT FROM %d: %s\n", remote_port, recvbuf);
    free(recvbuf);
  } else {
    printf("RECEIVED failture, timeout\n");
  }
  sleep(100);
  dgram_mod_close_socket(socket);
}
void client(int port) {
  void *socket = dgram_mod_open_socket();
  int size;
  char sendbuf[512];
  long i = 0;
  while (true) {
    sprintf(sendbuf, "%d", i);
    if(dgram_mod_sendto_timeout(socket, sendbuf, strlen(sendbuf) + 1, port, 2, 0) == 0) {
       printf("SEND HEART:%s\n", sendbuf);
       printf("send success\n");
    } else {
       printf("send failture, timeout\n");
    }
    i++;
  }
  dgram_mod_close_socket(socket);
}
int main(int argc, char *argv[]) {
  shm_init(512);
  int port;
  if (argc < 3) {
    fprintf(stderr, "Usage: reqrep %s|%s <PORT> ...\n", "server", "client");
    return 1;
  }
  port = atoi(argv[2]);
  if (strcmp("server", argv[1]) == 0) {
    server(port);
  }
  if (strcmp("client", argv[1]) == 0)
    client(port);
  return 0;
}