wangzhengquan
2020-11-27 a856e56b3943041d64a22285c550f6dbb9d2e193
update
5个文件已修改
147 ■■■■ 已修改文件
Makefile 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.h 74 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.c 56 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_stream_mod_socket.h 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/net_mod_socket.sh 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
Makefile
@@ -11,6 +11,7 @@
    @for i in $(DIRS); do \
        (cd $$i && echo ">>>>>> cleaning $$i ..." && $(MAKE) clean) || exit 1; \
    done
tar:
    rm -f $(TAR_NAME)
src/socket/shm_mod_socket.h
@@ -41,17 +41,17 @@
    std::set<int> *bus_set;
private:
    inline int _recvfrom_(void **buf, int *size, int *port,  struct timespec *timeout, int flags);
    void _proxy_sub( char *topic, int port);
    void _proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int port);
    inline int _recvfrom_(void **buf, int *size, int *key,  struct timespec *timeout, int flags);
    void _proxy_sub( char *topic, int key);
    void _proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int key);
    void *run_pubsub_proxy();
    int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len );
    int _sub_( char *topic, int size, int port, struct timespec *timeout, int flags);
    int _pub_( char *topic, int topic_size, void *content, int content_size, int port, struct timespec *timeout, int flags);
    int _sub_( char *topic, int size, int key, struct timespec *timeout, int flags);
    int _pub_( char *topic, int topic_size, void *content, int content_size, int key, struct timespec *timeout, int flags);
    void _proxy_desub( char *topic, int port);
    void _proxy_desub_all(int port);
    int  _desub_( char *topic, int size, int port, struct timespec *timeout, int flags);
    void _proxy_desub( char *topic, int key);
    void _proxy_desub_all(int key);
    int  _desub_( char *topic, int size, int key, struct timespec *timeout, int flags);
    static void foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb);
    static bool include_in_keys(int key, int keys[], size_t length);
@@ -66,49 +66,49 @@
     * 绑定端口到socket, 如果不绑定则系统自动分配一个
     * @return 0 成功, 其他值 失败的错误码
    */
    int bind(int port);
    int bind(int key);
    /**
     * 强制绑定端口到socket, 适用于程序非正常关闭的情况下,重启程序绑定原来还没释放的key
     * @return 0 成功, 其他值 失败的错误码
    */
    int force_bind(int port);
    int force_bind(int key);
    /**
     * 发送信息
     * @port 发送给谁
     * @key 发送给谁
     * @return 0 成功, 其他值 失败的错误码
     */
    int sendto(const void *buf, const int size, const int port);
    int sendto(const void *buf, const int size, const int key);
    // 发送信息超时返回。 @sec 秒 , @nsec 纳秒
    int sendto_timeout(const void *buf, const int size, const int port, const struct timespec *timeout);
    int sendto_timeout(const void *buf, const int size, const int key, const struct timespec *timeout);
    // 发送信息立刻返回。
    int sendto_nowait(const void *buf, const int size, const int port);
    int sendto_nowait(const void *buf, const int size, const int key);
    /**
     * 接收信息
     * @port 从谁哪里收到的信息
     * @key 从谁哪里收到的信息
     * @return 0 成功, 其他值 失败的错误码
    */
    int recvfrom(void **buf, int *size, int *port);
    int recvfrom(void **buf, int *size, int *key);
    // 接受信息超时返回。 @sec 秒 , @nsec 纳秒
    int recvfrom_timeout(void **buf, int *size, int *port,  struct timespec *timeout);
    int recvfrom_nowait(void **buf, int *size, int *port);
    int recvfrom_timeout(void **buf, int *size, int *key,  struct timespec *timeout);
    int recvfrom_nowait(void **buf, int *size, int *key);
    /**
     * 发送请求信息并等待接收应答
     * @port 发送给谁
     * @key 发送给谁
     * @return 0 成功, 其他值 失败的错误码
    */
    int sendandrecv(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
    int sendandrecv(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ;
    // 超时返回。 @sec 秒 , @nsec 纳秒
    int sendandrecv_timeout(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size,  struct timespec *timeout) ;
    int sendandrecv_nowait(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
    int sendandrecv_timeout(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size,  struct timespec *timeout) ;
    int sendandrecv_nowait(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ;
    int sendandrecv_unsafe(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
    int sendandrecv_unsafe(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ;
    // 超时返回。 @sec 秒 , @nsec 纳秒
    int sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size,  struct timespec *timeout) ;
    int sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
    int sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size,  struct timespec *timeout) ;
    int sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ;
    /**
@@ -122,35 +122,35 @@
     * 订阅指定主题
     * @topic 主题
     * @size 主题长度
     * @port 总线端口
     * @key 总线端口
     */
    int  sub(char *topic, int size, int port);
    int  sub(char *topic, int size, int key);
    // 超时返回。 @sec 秒 , @nsec 纳秒
    int  sub_timeout(char *topic, int size, int port,  struct timespec *timeout);
    int  sub_nowait(char *topic, int size, int port);
    int  sub_timeout(char *topic, int size, int key,  struct timespec *timeout);
    int  sub_nowait(char *topic, int size, int key);
     /**
     * 取消订阅指定主题
      * @topic 主题,主题为空时取消全部订阅
     * @size 主题长度
     * @port 总线端口
     * @key 总线端口
     */
    int desub( char *topic, int size, int port);
    int desub( char *topic, int size, int key);
    // 超时返回。 @sec 秒 , @nsec 纳秒
    int desub_timeout(char *topic, int size, int port, struct timespec *timeout);
    int desub_nowait(char *topic, int size, int port) ;
    int desub_timeout(char *topic, int size, int key, struct timespec *timeout);
    int desub_nowait(char *topic, int size, int key) ;
    /**
     * 发布主题
     * @topic 主题
     * @content 主题内容
     * @port 总线端口
     * @key 总线端口
     */
    int  pub(char *topic, int topic_size, void *content, int content_size, int port);
    int  pub(char *topic, int topic_size, void *content, int content_size, int key);
    //  超时返回。 @sec 秒 , @nsec 纳秒
    int  pub_timeout(char *topic, int topic_size, void *content, int content_size, int port,  struct timespec *timeout);
    int  pub_nowait(char *topic, int topic_size, void *content, int content_size, int port);
    int  pub_timeout(char *topic, int topic_size, void *content, int content_size, int key,  struct timespec *timeout);
    int  pub_nowait(char *topic, int topic_size, void *content, int content_size, int key);
    /**
src/socket/shm_socket.c
@@ -79,8 +79,9 @@
int shm_listen(shm_socket_t *socket) {
  if (socket->socket_type != SHM_SOCKET_STREAM) {
    err_exit(0, "can not invoke shm_listen method with a socket which is not a "
    logger->error("can not invoke shm_listen method with a socket which is not a "
                "SHM_SOCKET_STREAM socket");
    exit(1);
  }
  int key;
@@ -109,8 +110,9 @@
*/
shm_socket_t *shm_accept(shm_socket_t *socket) {
  if (socket->socket_type != SHM_SOCKET_STREAM) {
    err_exit(0, "can not invoke shm_accept method with a socket which is not a "
    logger->error("can not invoke shm_accept method with a socket which is not a "
                "SHM_SOCKET_STREAM socket");
    exit(1);
  }
  hashtable_t *hashtable = mm_get_hashtable();
  int client_key;
@@ -148,7 +150,7 @@
      client_socket->status = SHM_CONN_ESTABLISHED;
      return client_socket;
    } else {
      err_msg(0, "shm_accept: 发送open_reply失败");
      logger->error( "shm_accept: 发送open_reply失败");
      return NULL;
    }
@@ -161,12 +163,14 @@
int shm_connect(shm_socket_t *socket, int key) {
  if (socket->socket_type != SHM_SOCKET_STREAM) {
    err_exit(0, "can not invoke shm_connect method with a socket which is not "
    logger->error( "can not invoke shm_connect method with a socket which is not "
                "a SHM_SOCKET_STREAM socket");
    exit(1);
  }
  hashtable_t *hashtable = mm_get_hashtable();
  if (hashtable_get(hashtable, key) == NULL) {
    err_exit(0, "shm_connect:connect at key %d  failed!", key);
    logger->error("shm_connect:connect at key %d  failed!", key);
    exit(1);
  }
  if (socket->key == -1) {
@@ -178,7 +182,8 @@
  socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16);
  if ((socket->remoteQueue = _attach_remote_queue(key)) == NULL) {
    err_exit(0, "connect to %d failted", key);
    logger->error("connect to %d failted", key);
    exit(1);
  }
  socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
@@ -198,11 +203,13 @@
      pthread_create(&(socket->dispatch_thread), NULL, _client_run_msg_rev,
                     (void *)socket);
    } else {
      err_exit(0, "shm_connect: 不匹配的应答信息!");
      logger->error( "shm_connect: 不匹配的应答信息!");
      exit(1);
    }
  } else {
    err_exit(0, "connect failted!");
    logger->error( "connect failted!");
    exit(1);
  }
  return 0;
@@ -210,8 +217,9 @@
int shm_send(shm_socket_t *socket, const void *buf, const int size) {
  if (socket->socket_type != SHM_SOCKET_STREAM) {
    err_exit(0, "can not invoke shm_send method with a socket which is not a "
    logger->error("shm_socket.shm_send: can not invoke shm_send method with a socket which is not a "
                "SHM_SOCKET_STREAM socket");
    exit(1);
  }
  // hashtable_t *hashtable = mm_get_hashtable();
  // if(socket->remoteQueue == NULL) {
@@ -228,16 +236,17 @@
  if (socket->remoteQueue->push(dest)) {
    return 0;
  } else {
    err_msg(errno, "connection has been closed!");
    logger->error(errno, "connection has been closed!");
    return -1;
  }
}
int shm_recv(shm_socket_t *socket, void **buf, int *size) {
  if (socket->socket_type != SHM_SOCKET_STREAM) {
    err_exit(0, "can not invoke shm_recv method in a %d type socket  which is "
    logger->error( "shm_socket.shm_recv: can not invoke shm_recv method in a %d type socket  which is "
                "not a SHM_SOCKET_STREAM socket ",
             socket->socket_type);
    exit(1);
  }
  shm_msg_t src;
@@ -258,9 +267,10 @@
int shm_sendto(shm_socket_t *socket, const void *buf, const int size,
               const int key, const struct timespec *timeout, const int flags) {
  if (socket->socket_type != SHM_SOCKET_DGRAM) {
    err_exit(0, "Can't invoke shm_sendto method in a %d type socket  which is "
    logger->error( "shm_socket.shm_sendto: Can't invoke shm_sendto method in a %d type socket  which is "
                "not a SHM_SOCKET_DGRAM socket ",
             socket->socket_type);
    exit(0);
  }
  hashtable_t *hashtable = mm_get_hashtable();
@@ -278,13 +288,13 @@
  SemUtil::inc(socket->mutex);
  
  if (key == socket->key) {
    err_msg(0, "can not send to your self!");
    logger->error( "can not send to your self!");
    return -1;
  }
  SHMQueue<shm_msg_t> *remoteQueue;
  if ((remoteQueue = _attach_remote_queue(key)) == NULL) {
      err_msg(0, "shm_sendto failed, the other end has been closed, or has not been opened!");
      logger->error( "shm_sendto failed, the other end has been closed, or has not been opened!");
    return SHM_SOCKET_ECONNFAILED;
  }
@@ -312,7 +322,7 @@
  } else {
    delete remoteQueue;
    mm_free(dest.buf);
    err_msg(errno, "sendto key %d failed!", key);
    logger->error(errno, "sendto key %d failed!", key);
    return -1;
  }
}
@@ -320,9 +330,10 @@
// 短连接方式接受
int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key,  struct timespec *timeout,  int flags) {
  if (socket->socket_type != SHM_SOCKET_DGRAM) {
    err_exit(0, "Can't invoke shm_recvfrom method in a %d type socket  which "
    logger->error("shm_socket.shm_recvfrom: Can't invoke shm_recvfrom method in a %d type socket  which "
                "is not a SHM_SOCKET_DGRAM socket ",
             socket->socket_type);
    exit(1);
  }
  hashtable_t *hashtable = mm_get_hashtable();
  SemUtil::dec(socket->mutex);
@@ -367,9 +378,10 @@
                    const int send_size, const int send_key, void **recv_buf,
                    int *recv_size,  struct timespec *timeout,  int flags) {
  if (socket->socket_type != SHM_SOCKET_DGRAM) {
    err_exit(0, "Can't invoke shm_sendandrecv method in a %d type socket  "
    logger->error( "shm_socket.shm_sendandrecv: Can't invoke shm_sendandrecv method in a %d type socket  "
                "which is not a SHM_SOCKET_DGRAM socket ",
             socket->socket_type);
    exit(1);
  }
  int recv_key;
  int rv;
@@ -390,9 +402,10 @@
                    const int send_size, const int send_key, void **recv_buf,
                    int *recv_size,  struct timespec *timeout,  int flags) {
  if (socket->socket_type != SHM_SOCKET_DGRAM) {
    err_exit(0, "Can't invoke shm_sendandrecv method in a %d type socket  "
    logger->error( "shm_socket.shm_sendandrecv_unsafe : Can't invoke shm_sendandrecv method in a %d type socket  "
                "which is not a SHM_SOCKET_DGRAM socket ",
             socket->socket_type);
    exit(1);
  }
  int recv_key;
  int rv;
@@ -413,9 +426,10 @@
 * 绑定key到队列,但是并不会创建队列。如果没有对应指定key的队列提示错误并退出
 */
SHMQueue<shm_msg_t> *_attach_remote_queue(int key) {
  hashtable_t *hashtable = mm_get_hashtable();
  if (hashtable_get(hashtable, key) == NULL) {
    err_msg(0, "_remote_queue_attach:connet at key %d  failed!", key);
    logger->error("shm_socket._remote_queue_attach:connet at key %d  failed!", key);
    return NULL;
  }
@@ -467,7 +481,7 @@
      break;
    default:
      err_msg(0, "socket.__shm_rev__: undefined message type.");
       logger->error("shm_socket._server_run_msg_rev: undefined message type.");
    }
  }
@@ -498,7 +512,7 @@
      socket->messageQueue->push_timeout(src, &timeout);
      break;
    default:
      err_msg(0, "socket.__shm_rev__: undefined message type.");
       logger->error( "shm_socket._client_run_msg_rev: undefined message type.");
    }
  }
src/socket/shm_stream_mod_socket.h
@@ -33,7 +33,7 @@
 * 绑定端口到socket, 如果不绑定则系统自动分配一个
 * @return 0 成功, 其他值 失败的错误码
*/
int shm_stream_mod_socket_bind(void * _socket, int port);
int shm_stream_mod_socket_bind(void * _socket, int key);
 
/**
@@ -45,7 +45,7 @@
/**
 * 客户端发起连接请求
 */
int shm_stream_mod_socket_connect(void * _socket, int port);
int shm_stream_mod_socket_connect(void * _socket, int key);
/**
 * 发送信息
@@ -68,7 +68,7 @@
/**
 * 获取soket端口号
 */
int shm_stream_mod_socket_get_port(void * _socket);
int shm_stream_mod_socket_get_key(void * _socket);
#ifdef __cplusplus
}
test_net_socket/net_mod_socket.sh
@@ -1,12 +1,11 @@
function server() {
    
# 开启bus
    # 开启bus
 ./test_net_mod_socket --fun="start_bus_server" --key=8  & server_pid=$! &&  echo "pid: ${server_pid}"
# 开启网络转发代理
    # 开启网络转发代理
    ./test_net_mod_socket  --fun="start_net_proxy" --port=5000 & server_pid=$! && echo "pid: ${server_pid}" 
# 打开请求应答测试的接受端
    # 打开请求应答测试的接受端
    ./test_net_mod_socket --fun="start_reply" --key=11 & server_pid=$! &&  echo "pid: ${server_pid}" 
}
@@ -31,7 +30,7 @@
}
function close() {
    ps -ef | grep -e "dgram_mod_req_rep" -e "net_mod_socket"  -e "dgram_mod_bus" | awk  '{print $2}' | xargs -i kill -9 {}
    ps -ef | grep -e "test_net_mod_socket" -e "net_mod_socket"| awk  '{print $2}' | xargs -i kill -9 {}
    ipcrm -a
}
@@ -50,7 +49,6 @@
     client
  ;;
  "mclient")
    mclient
  ;;
  "close")