wangzhengquan
2020-08-06 ff4991e1f942a3f1281330e21bf437b4b8558094
add remove_keys
1个文件已添加
5个文件已修改
183 ■■■■■ 已修改文件
src/socket/dgram_mod_socket.c 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/dmod_socket.c 89 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/include/dmod_socket.h 26 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test.c 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test/test_type.c 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/dgram_mod_bus.c 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/dgram_mod_socket.c
@@ -137,17 +137,17 @@
 */
int  dgram_mod_sub(void * _socket, void *topic, int size, int port){
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    return socket->m_socket->sub(topic,  size,  port);
    return socket->m_socket->sub((char *)topic,  size,  port);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int  dgram_mod_sub_timeout(void * _socket, void *topic, int size, int port, int sec, int nsec){
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    struct timespec timeout = {sec, nsec};
    return socket->m_socket->sub_timeout(topic,  size,  port, &timeout);
    return socket->m_socket->sub_timeout((char *)topic,  size,  port, &timeout);
}
int  dgram_mod_sub_nowait(void * _socket, void *topic, int size, int port){
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    return socket->m_socket->sub_nowait(topic,  size,  port);
    return socket->m_socket->sub_nowait((char *)topic,  size,  port);
}
@@ -160,17 +160,17 @@
 */
int  dgram_mod_desub(void * _socket, void *topic, int size, int port){
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    return socket->m_socket->desub(topic,  size,  port);
    return socket->m_socket->desub((char *)topic,  size,  port);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int  dgram_mod_desub_timeout(void * _socket, void *topic, int size, int port, int sec, int nsec){
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    struct timespec timeout = {sec, nsec};
    return socket->m_socket->desub_timeout(topic,  size,  port, &timeout);
    return socket->m_socket->desub_timeout((char *)topic,  size,  port, &timeout);
}
int  dgram_mod_desub_nowait(void * _socket, void *topic, int size, int port){
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    return socket->m_socket->desub_nowait(topic,  size,  port);
    return socket->m_socket->desub_nowait((char *)topic,  size,  port);
}
@@ -183,17 +183,17 @@
 */
int  dgram_mod_pub(void * _socket, void *topic, int topic_size, void *content, int content_size, int port){
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    return socket->m_socket->pub(topic, topic_size, content, content_size, port);
    return socket->m_socket->pub((char *)topic, topic_size, content, content_size, port);
}
//  超时返回。 @sec 秒 , @nsec 纳秒
int  dgram_mod_pub_timeout(void * _socket, void *topic, int topic_size, void *content, int content_size, int port, int sec, int nsec){
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    struct timespec timeout = {sec, nsec};
    return socket->m_socket->pub_timeout(topic, topic_size, content, content_size, port, &timeout);
    return socket->m_socket->pub_timeout((char *)topic, topic_size, content, content_size, port, &timeout);
}
int  dgram_mod_pub_nowait(void * _socket, void *topic, int topic_size, void *content, int content_size, int port){
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    return socket->m_socket->pub_nowait(topic, topic_size, content, content_size, port);
    return socket->m_socket->pub_nowait((char *)topic, topic_size, content, content_size, port);
}
src/socket/dmod_socket.c
@@ -1,7 +1,7 @@
#include "dmod_socket.h"
void DModSocket::foreach_subscripters(std::function<void(SHMKeySet *, SHMKeySet::iterator)>  cb) {
void DModSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb) {
    SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY);
    SHMKeySet *subscripter_set;
    SHMKeySet::iterator set_iter;
@@ -12,7 +12,7 @@
            subscripter_set = map_iter->second;
            if(subscripter_set != NULL) {
                for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
                    cb(subscripter_set, set_iter);
                    cb(subscripter_set, *set_iter);
                }
            }
        }
@@ -31,14 +31,38 @@
}
size_t DModSocket::remove_subscripters(int keys[], size_t length) {
    size_t count;
    foreach_subscripters([keys, length, &count](SHMKeySet *subscripter_set, SHMKeySet::iterator set_iter){
        if (include_in_keys(*set_iter, keys, length)) {
            subscripter_set->erase(set_iter);
            count++;
    size_t count = 0;
    int key;
    for(int i = 0; i < length; i++) {
        key = keys[i];
        SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY);
        SHMKeySet *subscripter_set;
        SHMKeySet::iterator set_iter;
        SHMTopicSubMap::iterator map_iter;
        if(topic_sub_map != NULL) {
            for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
                subscripter_set = map_iter->second;
                if(subscripter_set != NULL && (set_iter = subscripter_set->find(key) ) != subscripter_set->end()) {
                    subscripter_set->erase(set_iter);
// printf("remove_subscripter %s, %d\n", map_iter->first, key);
                    count++;
                }
            }
        }
    });
    }
    return count;
//     foreach_subscripters([keys, length, &count](SHMKeySet *subscripter_set, int key){
// printf("foreach===========\n");
//         if (include_in_keys(key, keys, length)) {
//             //subscripter_set->erase(key);
// printf("remove_subscripter %d\n", key);
//             count++;
//         }
//     });
// printf("remove_subscripters count = %d\n", count);
}
@@ -177,14 +201,14 @@
 * @size 主题长度
 * @port 总线端口
 */
int  DModSocket::sub( void *topic, int size, int port){
int  DModSocket::sub(char *topic, int size, int port){
    return _sub_( topic, size, port, NULL, 0);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int  DModSocket::sub_timeout(void *topic, int size, int port, struct timespec *timeout){
int  DModSocket::sub_timeout(char *topic, int size, int port, struct timespec *timeout){
    return _sub_(topic, size, port, timeout, 0);
}
int  DModSocket::sub_nowait(void *topic, int size, int port) {
int  DModSocket::sub_nowait(char *topic, int size, int port) {
    return _sub_(topic, size, port, NULL,  (int)SHM_MSG_NOWAIT);
}
@@ -196,14 +220,14 @@
 * @size 主题长度
 * @port 总线端口
 */
int  DModSocket::desub( void *topic, int size, int port){
int  DModSocket::desub(char *topic, int size, int port){
    return _desub_( topic, size, port, NULL, 0);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int  DModSocket::desub_timeout(void *topic, int size, int port, struct timespec *timeout){
int  DModSocket::desub_timeout(char *topic, int size, int port, struct timespec *timeout){
    return _desub_(topic, size, port, timeout, 0);
}
int  DModSocket::desub_nowait(void *topic, int size, int port) {
int  DModSocket::desub_nowait(char *topic, int size, int port) {
    return _desub_(topic, size, port, NULL,  (int)SHM_MSG_NOWAIT);
}
@@ -215,14 +239,14 @@
 * @content 主题内容
 * @port 总线端口
 */
int  DModSocket::pub(void *topic, int topic_size, void *content, int content_size, int port){
int  DModSocket::pub(char *topic, int topic_size, void *content, int content_size, int port){
        return _pub_(topic, topic_size, content, content_size, port, NULL, 0);
}
//  超时返回。 @sec 秒 , @nsec 纳秒
int  DModSocket::pub_timeout(void *topic, int topic_size, void *content, int content_size, int port, struct timespec * timeout){
int  DModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int port, struct timespec * timeout){
    return _pub_( topic, topic_size, content, content_size, port, timeout, 0);
}
int  DModSocket::pub_nowait(void *topic, int topic_size, void *content, int content_size, int port){
int  DModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int port){
    return _pub_(topic, topic_size, content, content_size, port, NULL, (int)SHM_MSG_NOWAIT);
}
@@ -240,11 +264,11 @@
/**
 * @port 总线端口
 */
int  DModSocket::_sub_( void *topic, int size, int port,
int  DModSocket::_sub_(char *topic, int size, int port,
    struct timespec *timeout, int flags) {
    char buf[8192];
    int rv;
    snprintf(buf,  8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
    snprintf(buf,  8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
    rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags);
    if(rv == 0) {
        bus_set->insert(port);
@@ -256,21 +280,24 @@
/**
 * @port 总线端口
 */
int  DModSocket::_desub_( void *topic, int size, int port,
int  DModSocket::_desub_(char *topic, int size, int port,
    struct timespec *timeout, int flags) {
    char buf[8192];
    snprintf(buf,  8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
    if(topic == NULL) {
        topic = "";
    }
    snprintf(buf,  8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER,  topic, TOPIC_RIDENTIFIER);
    return shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags);
}
/**
 * @port 总线端口
 */
int  DModSocket::_pub_( void *topic, int topic_size, void *content, int content_size, int port,
int  DModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int port,
    struct timespec *timeout, int flags) {
    int head_len;
    char buf[8192+content_size];
    snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, (char *)topic, TOPIC_RIDENTIFIER);
    snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
    head_len = strlen(buf);
    memcpy(buf+head_len, content, content_size);
    return shm_sendto(shm_socket, buf, head_len+content_size, port, timeout, flags);
@@ -284,7 +311,7 @@
    SHMTopicSubMap::iterator map_iter;
    SHMKeySet::iterator set_iter;
printf("_proxy_sub topic = %s\n", topic);
    if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
        subscripter_set = map_iter->second;
    } else {
@@ -306,7 +333,9 @@
    if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
        subscripter_set = map_iter->second;
        subscripter_set->erase(port);
printf("============ desub %d\n", port);
    }
}
@@ -321,6 +350,7 @@
    for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
            subscripter_set = map_iter->second;
            subscripter_set->erase(port);
printf("============ desub %d\n", port);
    }
}
@@ -376,20 +406,23 @@
    const char *topic_delim = ",";
// printf("run_pubsub_proxy server receive before\n");
    while(shm_recvfrom(shm_socket, (void **)&buf, &size, &port) == 0) {
// printf("run_pubsub_proxy server recv after: %s \n", buf);
//printf("run_pubsub_proxy server recv after: %s \n", buf);
        if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) {
// printf("run_pubsub_proxy  %s %s \n", action, topics);
            if(strcmp(action, "sub") == 0) {
                // 订阅支持多主题订阅
                topic = strtok(topics, topic_delim);
//printf("run_pubsub_proxy topic = %s\n", topic);
              while(topic) {
           _proxy_sub(trim(topic, 0), port);
            topic =  strtok(NULL, topic_delim);
              }
            } else if(strcmp(action, "desub") == 0) {
printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), ""));
                if(strcmp(trim(topics, 0), "") == 0) {
                    // 取消所有订阅
        printf("取消所有订阅");
        printf("====取消所有订阅\n");
                    _proxy_desub_all(port);
                } else {
                 
@@ -473,12 +506,12 @@
 char *topic = (char *)malloc(topic_len+1);
 strncpy(topic, topic_start_ptr, topic_len);
 *topic = '\0';
 *(topic+topic_len) = '\0';
 *_topic = topic;
 char *action = (char *)malloc(action_len+1);
 strncpy(action, action_start_ptr, action_len);
 *action = '\0';
 *(action+action_len) = '\0';
 *_action = action;
 *head_len = ptr-str;
src/socket/include/dmod_socket.h
@@ -46,14 +46,14 @@
    void _proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int port);
    void *run_pubsub_proxy();
    int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len );
    int _sub_( void *topic, int size, int port, struct timespec *timeout, int flags);
    int _pub_( void *topic, int topic_size, void *content, int content_size, int port, struct timespec *timeout, int flags);
    int _sub_( char *topic, int size, int port, struct timespec *timeout, int flags);
    int _pub_( char *topic, int topic_size, void *content, int content_size, int port, struct timespec *timeout, int flags);
    void _proxy_desub( char *topic, int port);
    void _proxy_desub_all(int port);
    int  _desub_( void *topic, int size, int port, struct timespec *timeout, int flags);
    int  _desub_( char *topic, int size, int port, struct timespec *timeout, int flags);
    static void foreach_subscripters(std::function<void(SHMKeySet *, SHMKeySet::iterator)>  cb);
    static void foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb);
    static bool include_in_keys(int key, int keys[], size_t length);
    static size_t remove_subscripters(int keys[], size_t length) ;
public:
@@ -119,10 +119,10 @@
     * @size 主题长度
     * @port 总线端口
     */
    int  sub(void *topic, int size, int port);
    int  sub(char *topic, int size, int port);
    // 超时返回。 @sec 秒 , @nsec 纳秒
    int  sub_timeout(void *topic, int size, int port,  struct timespec *timeout);
    int  sub_nowait(void *topic, int size, int port);
    int  sub_timeout(char *topic, int size, int port,  struct timespec *timeout);
    int  sub_nowait(char *topic, int size, int port);
     /**
@@ -131,10 +131,10 @@
     * @size 主题长度
     * @port 总线端口
     */
    int desub( void *topic, int size, int port);
    int desub( char *topic, int size, int port);
    // 超时返回。 @sec 秒 , @nsec 纳秒
    int desub_timeout(void *topic, int size, int port, struct timespec *timeout);
    int desub_nowait(void *topic, int size, int port) ;
    int desub_timeout(char *topic, int size, int port, struct timespec *timeout);
    int desub_nowait(char *topic, int size, int port) ;
    /**
     * 发布主题
@@ -142,10 +142,10 @@
     * @content 主题内容
     * @port 总线端口
     */
    int  pub(void *topic, int topic_size, void *content, int content_size, int port);
    int  pub(char *topic, int topic_size, void *content, int content_size, int port);
    //  超时返回。 @sec 秒 , @nsec 纳秒
    int  pub_timeout(void *topic, int topic_size, void *content, int content_size, int port,  struct timespec *timeout);
    int  pub_nowait(void *topic, int topic_size, void *content, int content_size, int port);
    int  pub_timeout(char *topic, int topic_size, void *content, int content_size, int port,  struct timespec *timeout);
    int  pub_nowait(char *topic, int topic_size, void *content, int content_size, int port);
    /**
test/test.c
@@ -2,4 +2,10 @@
#include "usg_typedef.h"
 
int main() {
    char buf[1024];
    sprintf(buf, "%s\n", (char*)"" );
    printf(buf);
    int d = strcmp(trim("", 0), "");
    printf("%d\n", d);
}
test/test_type.c
New file
@@ -0,0 +1,15 @@
#include "usg_common.h"
#include "usg_typedef.h"
#include "dgram_mod_socket.h"
#include "shm_mm.h"
#include "mm.h"
#include <typeinfo>
#include "lock_free_queue.h"
int main() {
    shm_init(512);
    LockFreeQueue<int> * queue = new LockFreeQueue<int>(16);
    void * tmp = (void *)queue;
    std::cout << typeid(queue).name() << std::endl;
    std::cout << typeid(tmp).name() << std::endl;
}
test_socket/dgram_mod_bus.c
@@ -10,7 +10,6 @@
}
void server(int port, bool restart) {
  signal(SIGINT,  sigint_handler);
  server_socket = dgram_mod_open_socket();
@@ -39,6 +38,8 @@
void client(int port) {
  void *socket = dgram_mod_open_socket();
  pthread_t tid;
  pthread_create(&tid, NULL, run_recv, socket);
  int size;
@@ -56,7 +57,7 @@
      printf("Please input topic!\n");
      scanf("%s", topic);
      if (dgram_mod_sub(socket, topic, strlen(topic),  port) == 0) {
         printf("Sub success!\n");
         printf("%d Sub success!\n", dgram_mod_get_port(socket));
      } else {
        printf("Sub failture!\n");
        exit(0);
@@ -66,24 +67,25 @@
      printf("Please input topic!\n");
      scanf("%s", topic);
      if (dgram_mod_desub(socket, topic, strlen(topic),  port) == 0) {
         printf("Desub success!\n");
         printf("%d Desub success!\n", dgram_mod_get_port(socket));
      } else {
        printf("Desub failture!\n");
        exit(0);
      }
     
    }
    else if(strcmp(action, "pub") == 0) {
    } else if(strcmp(action, "pub") == 0) {
      // printf("%s %s %s\n", action, topic, content);
      printf("Please input topic and content\n");
      scanf("%s %s", topic, content);
      if(dgram_mod_pub(socket, topic, strlen(topic)+1, content, strlen(content)+1,  port) == 0){
        printf("Pub success!\n");
        printf("%d Pub success!\n", dgram_mod_get_port(socket));
      } else {
        printf("Pub failture!\n");
      }
      
    } else if(strcmp(action, "quit") == 0) {
      printf("(%d) quit\n", dgram_mod_get_port(socket));
      dgram_mod_close_socket(socket);
      break;
    } else {
      printf("error input argument\n");
@@ -91,8 +93,7 @@
    }
   
  }
  printf("(%d) quit\n", dgram_mod_get_port(socket));
  dgram_mod_close_socket(socket);
}
 
@@ -101,7 +102,7 @@
  shm_init(512);
  int port;
  if (argc < 3) {
    fprintf(stderr, "Usage: %s %s|%s <PORT> ...\n", argv[0], "server", "client");
    fprintf(stderr, "Usage: %s %s|%s|rmkey <PORT> ...\n", argv[0], "server", "client");
    return 1;
  }
@@ -115,10 +116,16 @@
      server(port, false);
    }
    
  } else if (strcmp("client", argv[1]) == 0) {
    client(port);
  } else if(strcmp("rmkey", argv[1]) == 0) {
    for(int i = 2; i < argc; i++) {
      port = atoi(argv[i]);
      dgram_mod_remove_key(port);
      // printf("%d\n", port);
    }
  }
  if (strcmp("client", argv[1]) == 0)
    client(port);
  
  return 0;