wangzhengquan
2020-08-06 203df24a403a8c0cd8e93d0f33eaf10de2788969
add desub
5个文件已修改
165 ■■■■ 已修改文件
src/socket/dgram_mod_socket.c 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/dmod_socket.c 100 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/include/dgram_mod_socket.h 16 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/include/dmod_socket.h 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/dgram_mod_bus.c 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/dgram_mod_socket.c
@@ -144,6 +144,29 @@
/**
 * 取消订阅指定主题
 * @topic 主题
 * @size 主题长度
 * @port 总线端口
 */
int  dgram_mod_desub(void * _socket, void *topic, int size, int port){
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    return socket->m_socket->desub(topic,  size,  port);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int  dgram_mod_desub_timeout(void * _socket, void *topic, int size, int port, int sec, int nsec){
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    struct timespec timeout = {sec, nsec};
    return socket->m_socket->desub_timeout(topic,  size,  port, &timeout);
}
int  dgram_mod_desub_nowait(void * _socket, void *topic, int size, int port){
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    return socket->m_socket->desub_nowait(topic,  size,  port);
}
/**
 * 发布主题
 * @topic 主题
 * @content 主题内容
src/socket/dmod_socket.c
@@ -135,15 +135,7 @@
    // pthread_create(&tid, NULL, run_accept_sub_request, _socket);
    return 0;
}
/**
 * @port 总线端口
 */
int  DModSocket::_sub_( void *topic, int size, int port,
    struct timespec *timeout, int flags) {
    char buf[8192];
    snprintf(buf,  8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
    return shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags);
}
/**
 * 订阅指定主题
 * @topic 主题
@@ -162,19 +154,25 @@
}
/**
 * 取消订阅指定主题
 * @topic 主题
 * @size 主题长度
 * @port 总线端口
 */
int  DModSocket::_pub_( void *topic, int topic_size, void *content, int content_size, int port,
    struct timespec *timeout, int flags) {
    int head_len;
    char buf[8192+content_size];
    snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
    head_len = strlen(buf);
    memcpy(buf+head_len, content, content_size);
    return shm_sendto(shm_socket, buf, head_len+content_size, port, timeout, flags);
int  DModSocket::desub( void *topic, int size, int port){
    return _desub_( topic, size, port, NULL, 0);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int  DModSocket::desub_timeout(void *topic, int size, int port, struct timespec *timeout){
    return _desub_(topic, size, port, timeout, 0);
}
int  DModSocket::desub_nowait(void *topic, int size, int port) {
    return _desub_(topic, size, port, NULL,  (int)SHM_MSG_NOWAIT);
}
/**
 * 发布主题
@@ -203,9 +201,41 @@
// ========================================================
// =============================================================================
/**
 * @port 总线端口
 */
int  DModSocket::_sub_( void *topic, int size, int port,
    struct timespec *timeout, int flags) {
    char buf[8192];
    snprintf(buf,  8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
    return shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags);
}
/**
 * @port 总线端口
 */
int  DModSocket::_desub_( void *topic, int size, int port,
    struct timespec *timeout, int flags) {
    char buf[8192];
    snprintf(buf,  8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
    return shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags);
}
/**
 * @port 总线端口
 */
int  DModSocket::_pub_( void *topic, int topic_size, void *content, int content_size, int port,
    struct timespec *timeout, int flags) {
    int head_len;
    char buf[8192+content_size];
    snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
    head_len = strlen(buf);
    memcpy(buf+head_len, content, content_size);
    return shm_sendto(shm_socket, buf, head_len+content_size, port, timeout, flags);
}
/*
 * 处理订阅
*/
@@ -223,6 +253,22 @@
        topic_sub_map->insert({topic, subscripter_set});
    }
    subscripter_set->insert(port);
}
/*
 * 处理取消订阅
*/
void DModSocket::_proxy_desub( char *topic, int port) {
    SHMKeySet *subscripter_set;
    SHMTopicSubMap::iterator map_iter;
    // SHMKeySet::iterator set_iter;
    if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
        subscripter_set = map_iter->second;
        subscripter_set->erase(port);
    }
}
/*
@@ -281,15 +327,23 @@
        if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) {
            if(strcmp(action, "sub") == 0) {
                // 订阅支持多主题订阅
                topic = trim(strtok(topics, topic_delim), NULL);
                topic = strtok(topics, topic_delim);
              while(topic) {
           _proxy_sub( topic, port);
            topic = trim(strtok(NULL, topic_delim), NULL);
           _proxy_sub(trim(topic, 0), port);
            topic =  strtok(NULL, topic_delim);
              }
            } else if(strcmp(action, "desub") == 0) {
                // 订阅支持多主题订阅
                topic = strtok(topics, topic_delim);
              while(topic) {
           _proxy_desub(trim(topic, 0), port);
            topic =  strtok(NULL, topic_delim);
              }
            } else if(strcmp(action, "pub") == 0) {
                _proxy_pub(topics, head_len, buf, size, port);
            }
            }
            
            free(action);
            free(topics);
src/socket/include/dgram_mod_socket.h
@@ -56,10 +56,10 @@
 * @port 发送给谁
 * @return 0 成功, 其他值 失败的错误码
*/
int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
int dgram_mod_sendandrecv(void * _socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size) ;
// 超时返回。 @sec 秒 , @nsec 纳秒
int dgram_mod_sendandrecv_timeout(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size, int sec, int nsec) ;
int dgram_mod_sendandrecv_nowait(void * _socket, const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
int dgram_mod_sendandrecv_timeout(void * _socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size, int sec, int nsec) ;
int dgram_mod_sendandrecv_nowait(void * _socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size) ;
/**
@@ -81,6 +81,16 @@
int  dgram_mod_sub_nowait(void * _socket, void *topic, int size, int port);
/**
 * 取消订阅指定主题
 * @topic 主题
 * @size 主题长度
 * @port 总线端口
 */
int  dgram_mod_desub(void * _socket, void *topic, int size, int port);
// 超时返回。 @sec 秒 , @nsec 纳秒
int  dgram_mod_desub_timeout(void * _socket, void *topic, int size, int port, int sec, int nsec);
int  dgram_mod_desub_nowait(void * _socket, void *topic, int size, int port);
/**
 * 发布主题
src/socket/include/dmod_socket.h
@@ -48,6 +48,8 @@
    int _sub_( void *topic, int size, int port, struct timespec *timeout, int flags);
    int _pub_( void *topic, int topic_size, void *content, int content_size, int port, struct timespec *timeout, int flags);
    void _proxy_desub( char *topic, int port);
    int  _desub_( void *topic, int size, int port, struct timespec *timeout, int flags);
    static void  foreach_subscripters(std::function<void(SHMKeySet *, SHMKeySet::iterator)>  cb) ;
public:
    DModSocket();
@@ -116,6 +118,16 @@
    int  sub_nowait(void *topic, int size, int port);
     /**
     * 取消订阅指定主题
     * @topic 主题
     * @size 主题长度
     * @port 总线端口
     */
    int desub( void *topic, int size, int port);
    // 超时返回。 @sec 秒 , @nsec 纳秒
    int desub_timeout(void *topic, int size, int port, struct timespec *timeout);
    int desub_nowait(void *topic, int size, int port) ;
    /**
     * 发布主题
test_socket/dgram_mod_bus.c
@@ -49,7 +49,7 @@
  long i = 0;
  while (true) {
    //printf("Usage: pub <topic> [content] or sub <topic>\n");
    printf("Can I help you? sub, pub or quit\n");
    printf("Can I help you? sub, pub, desub or quit\n");
    scanf("%s",action);
    
    if(strcmp(action, "sub") == 0) {
@@ -59,6 +59,16 @@
         printf("Sub success!\n");
      } else {
        printf("Sub failture!\n");
        exit(0);
      }
    } else if(strcmp(action, "desub") == 0) {
      printf("Please input topic!\n");
      scanf("%s", topic);
      if (dgram_mod_desub(socket, topic, strlen(topic),  port) == 0) {
         printf("Desub success!\n");
      } else {
        printf("Desub failture!\n");
        exit(0);
      }
     
@@ -76,7 +86,7 @@
    } else if(strcmp(action, "quit") == 0) {
      break;
    } else {
      printf("error input\n");
      printf("error input argument\n");
      continue;
    }