wangzhengquan
2020-12-21 8b0a8c644f19e97606dfb06a865f56dbad15f95e
update
2个文件已删除
3个文件已添加
13个文件已修改
1311 ■■■■■ 已修改文件
Makefile 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/key_def.h 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket.c 353 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket.h 85 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket_wrapper.c 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket_wrapper.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/dgram_mod_socket.c 214 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/dgram_mod_socket.h 127 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_server_socket.c 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket.c 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket.h 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.c 312 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.h 44 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.c 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.h 37 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/socket_def.h 37 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/net_mod_socket.sh 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/test_net_mod_socket.c 40 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
Makefile
@@ -1,5 +1,5 @@
# debug "make --just-print"
DIRS = src test_net_socket test_socket
DIRS = src test_net_socket
TAR_NAME = shm_queue.tar.gz
all:
src/key_def.h
@@ -1,2 +1,7 @@
#ifndef _KEY_DEF_H_
#define _KEY_DEF_H_
#define BUS_MAP_KEY 1  
#define BUS_KEY 8
#define BUS_KEY 8
#endif
src/socket/bus_server_socket.c
New file
@@ -0,0 +1,353 @@
#include "bus_server_socket.h"
static Logger *logger = LoggerFactory::getLogger();
void BusServerSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb) {
    SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY);
    SHMKeySet *subscripter_set;
    SHMKeySet::iterator set_iter;
    SHMTopicSubMap::iterator map_iter;
    if(topic_sub_map != NULL) {
        for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
            subscripter_set = map_iter->second;
            if(subscripter_set != NULL) {
                for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
                    cb(subscripter_set, *set_iter);
                }
            }
        }
    }
}
// bool BusServerSocket::include_in_keys(int key, int keys[], size_t length) {
//     if(length == 0) {
//         return false;
//     }
//     for(int i = 0; i < length; i++) {
//         if(keys[i] == key)
//             return true;
//     }
//     return false;
// }
size_t BusServerSocket::remove_subscripters(int keys[], size_t length) {
    size_t count = 0;
    int key;
    for(int i = 0; i < length; i++) {
        key = keys[i];
        SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY);
        SHMKeySet *subscripter_set;
        SHMKeySet::iterator set_iter;
        SHMTopicSubMap::iterator map_iter;
        if(topic_sub_map != NULL) {
            for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
                subscripter_set = map_iter->second;
                if(subscripter_set != NULL && (set_iter = subscripter_set->find(key) ) != subscripter_set->end()) {
                    subscripter_set->erase(set_iter);
// printf("remove_subscripter %s, %d\n", map_iter->first, key);
                    count++;
                }
            }
        }
    }
    return count;
}
BusServerSocket::BusServerSocket() {
    shm_socket = shm_open_socket(SHM_SOCKET_DGRAM);
    topic_sub_map = NULL;
}
BusServerSocket::~BusServerSocket() {
// printf("BusServerSocket  destory 1\n");
    SHMKeySet *subscripter_set;
    SHMTopicSubMap::iterator map_iter;
    stop();
    sleep(2);
// printf("BusServerSocket  destory 2\n");
    if(topic_sub_map != NULL) {
        for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
            subscripter_set = map_iter->second;
// printf("BusServerSocket  destory 2-1\n");
            if(subscripter_set != NULL) {
// printf("BusServerSocket  destory 2-2\n");
                subscripter_set->clear();
// printf("BusServerSocket  destory 2-3\n");
                mm_free((void *)subscripter_set);
// printf("BusServerSocket  destory 2-4\n");
            }
        }
        topic_sub_map->clear();
        mem_pool_free_by_key(BUS_MAP_KEY);
    }
// printf("BusServerSocket  destory 3\n");
    // printf("=============close socket\n");
    shm_close_socket(shm_socket);
// printf("BusServerSocket  destory 4\n");
}
int BusServerSocket::bind(int key) {
    return  shm_socket_bind(shm_socket, key);
}
/**
 * 强制绑定端口到socket, 适用于程序非正常关闭的情况下,重启程序绑定原来还没释放的key
 * @return 0 成功, 其他值 失败的错误码
*/
int BusServerSocket::force_bind(int key) {
    return shm_socket_force_bind(shm_socket, key);
}
/**
 * 启动bus
 *
 * @return 0 成功, 其他值 失败的错误码
*/
int  BusServerSocket::start(){
    topic_sub_map =    mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY);
    run_pubsub_proxy();
    // pthread_t tid;
    // pthread_create(&tid, NULL, run_accept_sub_request, _socket);
    return 0;
}
int  BusServerSocket::stop(){
    char buf[128];
    if( shm_socket->key <= 0) {
        return -1;
    }
    snprintf(buf, 128, "%sstop%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER);
    return shm_sendto(shm_socket, buf, strlen(buf), shm_socket->key, NULL, 0);
}
/*
 * 处理订阅
*/
void BusServerSocket::_proxy_sub( char *topic, int key) {
    SHMKeySet *subscripter_set;
    SHMTopicSubMap::iterator map_iter;
    SHMKeySet::iterator set_iter;
//printf("_proxy_sub topic = %s\n", topic);
    if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
        subscripter_set = map_iter->second;
    } else {
        void *set_ptr = mm_malloc(sizeof(SHMKeySet));
        subscripter_set = new(set_ptr) SHMKeySet;
        topic_sub_map->insert({topic, subscripter_set});
    }
    subscripter_set->insert(key);
}
/*
 * 处理取消订阅
*/
void BusServerSocket::_proxy_desub( char *topic, int key) {
    SHMKeySet *subscripter_set;
    SHMTopicSubMap::iterator map_iter;
    // SHMKeySet::iterator set_iter;
    if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
        subscripter_set = map_iter->second;
        subscripter_set->erase(key);
    }
}
/*
 * 处理取消所有订阅
*/
void BusServerSocket::_proxy_desub_all(int key) {
    SHMKeySet *subscripter_set;
    SHMTopicSubMap::iterator map_iter;
    // SHMKeySet::iterator set_iter;
    for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
            subscripter_set = map_iter->second;
            subscripter_set->erase(key);
    }
}
/*
 * 处理发布,代理转发
*/
void BusServerSocket::_proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int key) {
    SHMKeySet *subscripter_set;
    SHMTopicSubMap::iterator map_iter;
    SHMKeySet::iterator set_iter;
    std::vector<int> subscripter_to_del;
    std::vector<int>::iterator vector_iter;
    int send_key;
    struct timespec timeout = {1,0};
    if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
        subscripter_set = map_iter->second;
        for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
            send_key = *set_iter;
 // printf("_proxy_pub send before %d \n", send_key);
            if (shm_sendto(shm_socket, buf+head_len, size-head_len, send_key, &timeout) == SHM_SOCKET_ECONNFAILED ) {
                //对方已关闭的连接放到待删除队列里。如果直接删除会让iter指针出现错乱
                subscripter_to_del.push_back(send_key);
            } else {
// printf("_proxy_pub send after: %d \n", send_key);
            }
        }
        // 删除已关闭的端
        for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); vector_iter++) {
            if((set_iter = subscripter_set->find(*vector_iter)) != subscripter_set->end()) {
                subscripter_set->erase(set_iter);
                logger->debug("remove closed subscripter %d \n", send_key);
            }
        }
        subscripter_to_del.clear();
    }
}
void * BusServerSocket::run_pubsub_proxy() {
    // pthread_detach(pthread_self());
    int size;
    int key;
    char * action, *topic, *topics, *buf;
    size_t head_len;
    const char *topic_delim = ",";
// printf("run_pubsub_proxy server receive before\n");
    while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) {
//printf("run_pubsub_proxy server recv after: %s \n", buf);
        if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) {
 // printf("run_pubsub_proxy  %s %s \n", action, topics);
            if(strcmp(action, "sub") == 0) {
                // 订阅支持多主题订阅
                topic = strtok(topics, topic_delim);
//printf("run_pubsub_proxy topic = %s\n", topic);
              while(topic) {
           _proxy_sub(trim(topic, 0), key);
            topic =  strtok(NULL, topic_delim);
              }
            } else if(strcmp(action, "desub") == 0) {
// printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), ""));
                if(strcmp(trim(topics, 0), "") == 0) {
                    // 取消所有订阅
                    _proxy_desub_all(key);
                } else {
                    topic = strtok(topics, topic_delim);
                  while(topic) {
               _proxy_desub(trim(topic, 0), key);
                topic =  strtok(NULL, topic_delim);
                  }
                }
            } else if(strcmp(action, "pub") == 0) {
                _proxy_pub(topics, head_len, buf, size, key);
            }  else if(strcmp(action, "stop") == 0) {
                 logger->info( "Stopping Bus...");
                 // snprintf(buf, 128, "%sstop_finished%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, "", TOPIC_RIDENTIFIER);
                 // shm_sendto(shm_socket, const void *buf, const int size, key);
                 free(action);
                 free(topics);
                 free(buf);
                 break;
            } else {
                logger->error( "BusServerSocket::run_pubsub_proxy : unrecognized action");
            }
            free(action);
            free(topics);
        } else {
            logger->error( "BusServerSocket::run_pubsub_proxy : incorrect format msg");
        }
        free(buf);
    }
    return NULL;
}
/**
 * @str "<**sub**>{经济}"
 */
int BusServerSocket::parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ) {
 char *ptr = str;
 char *str_end_ptr = str + size;
 char *action_start_ptr;
 char *action_end_ptr;
 size_t action_len = 0;
 char *topic_start_ptr;
 char *topic_end_ptr;
 size_t topic_len = 0;
 // if (strlen(identifier) > strlen(str)) {
 //  return 0;
 // }
 if (strncmp(ptr, ACTION_LIDENTIFIER, strlen(ACTION_LIDENTIFIER)) == 0) {
  ptr += strlen(ACTION_LIDENTIFIER);
  action_start_ptr = ptr;
  while(strncmp(++ptr, ACTION_RIDENTIFIER, strlen(ACTION_RIDENTIFIER)) != 0) {
    if(ptr >= str_end_ptr) {
      return 0;
    }
  }
// printf("%s\n", ptr);
  action_end_ptr = ptr;
  action_len = action_end_ptr - action_start_ptr;
  ptr += strlen(ACTION_RIDENTIFIER);
// printf("%s\n", ptr);
// printf("%s\n", str_end_ptr-1);
  if(strncmp(ptr, TOPIC_LIDENTIFIER, strlen(TOPIC_LIDENTIFIER)) == 0 ) {
    topic_start_ptr = ptr+strlen(TOPIC_LIDENTIFIER);
    while(strncmp(++ptr, TOPIC_RIDENTIFIER, strlen(TOPIC_RIDENTIFIER)) != 0) {
      if(ptr >= str_end_ptr) {
        return 0;
      }
    }
    topic_end_ptr = ptr;
    topic_len = topic_end_ptr - topic_start_ptr;
    ptr += strlen(TOPIC_RIDENTIFIER);
  } else {
    return 0;
  }
 } else {
  return 0;
 }
 char *topic = (char *)malloc(topic_len+1);
 strncpy(topic, topic_start_ptr, topic_len);
 *(topic+topic_len) = '\0';
 *_topic = topic;
 char *action = (char *)malloc(action_len+1);
 strncpy(action, action_start_ptr, action_len);
 *(action+action_len) = '\0';
 *_action = action;
 *head_len = ptr-str;
 return 1;
}
src/socket/bus_server_socket.h
New file
@@ -0,0 +1,85 @@
#ifndef _BUS_SERVER_SOCKET_H_
#define _BUS_SERVER_SOCKET_H_
#include "usg_common.h"
#include "shm_socket.h"
#include "shm_allocator.h"
#include "mem_pool.h"
#include "hashtable.h"
#include "sem_util.h"
#include "logger_factory.h"
#include "key_def.h"
#include "socket_def.h"
#include <set>
//typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > SHMString;
typedef std::set<int,  std::less<int>, SHM_STL_Allocator<int> > SHMKeySet;
typedef std::map<SHMString, SHMKeySet *, std::less<SHMString>, SHM_STL_Allocator<std::pair<const SHMString, SHMKeySet *> > > SHMTopicSubMap;
class BusServerSocket {
private:
    shm_socket_t *shm_socket;
  // pthread_t recv_thread;
  // <主题, 订阅者>
    SHMTopicSubMap *topic_sub_map;
private:
    void _proxy_sub( char *topic, int key);
    void _proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int key);
    void *run_pubsub_proxy();
    int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len );
    void _proxy_desub( char *topic, int key);
    void _proxy_desub_all(int key);
    static void foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb);
    // static bool include_in_keys(int key, int keys[], size_t length);
public:
    static size_t remove_subscripters(int keys[], size_t length) ;
public:
    BusServerSocket();
    ~BusServerSocket();
    /**
     * 绑定端口到socket, 如果不绑定则系统自动分配一个
     * @return 0 成功, 其他值 失败的错误码
    */
    int bind(int key);
    /**
     * 强制绑定端口到socket, 适用于程序非正常关闭的情况下,重启程序绑定原来还没释放的key
     * @return 0 成功, 其他值 失败的错误码
    */
    int force_bind(int key);
    /**
     * 启动bus
     *
     * @return 0 成功, 其他值 失败的错误码
    */
    int  start();
    /**
     * 停止bus
     *
     * @return 0 成功, 其他值 失败的错误码
    */
    int  stop();
    /**
     * 获取soket key
     */
    int get_key() ;
};
#endif
src/socket/bus_server_socket_wrapper.c
@@ -8,7 +8,7 @@
 */
void * bus_server_socket_wrapper_open() {
    
    NetModSocket *sockt = new NetModSocket;
    BusServerSocket *sockt = new BusServerSocket;
    return (void *)sockt;
}
@@ -16,7 +16,7 @@
 * 关闭
 */
void bus_server_socket_wrapper_close(void *_socket) {
    NetModSocket *sockt = (NetModSocket *)_socket;
    BusServerSocket *sockt = (BusServerSocket *)_socket;
    delete sockt;
}
@@ -27,10 +27,10 @@
*/
int  bus_server_socket_wrapper_start_bus(void * _socket) {
    int ret;
    NetModSocket *sockt = (NetModSocket *)_socket;
    BusServerSocket *sockt = (BusServerSocket *)_socket;
    if( (ret = sockt->bind(BUS_KEY)) == 0) {
        return sockt->start_bus();
        return sockt->start();
    } else {
        logger->error("start bus failed");
        return -1;
src/socket/bus_server_socket_wrapper.h
@@ -1,7 +1,7 @@
#ifndef _BUS_SERVER_SOCKET_WRAPPER_H_
#define _BUS_SERVER_SOCKET_WRAPPER_H_
#include "net_mod_socket.h"
#include "bus_server_socket.h"
#ifdef __cplusplus
extern "C" {
src/socket/dgram_mod_socket.c
File was deleted
src/socket/dgram_mod_socket.h
File was deleted
src/socket/net_mod_server_socket.c
@@ -215,6 +215,7 @@
    
   
  } else if(request_head.mod == BUS) {
    if(request_head.topic_length > max_topic_buf) {
      if( (topic_buf = realloc(topic_buf, request_head.topic_length)) == NULL ) {
         LoggerFactory::getLogger()->error(errno, "NetModServerSocket::process_client realloc topic_buf");
@@ -243,7 +244,7 @@
    else if(request_head.timeout == -1) {
      ret = shmModSocket.pub((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, BUS_KEY);
    }
printf("bus server pub ret=%d\n", ret);
    response_head.code = ret;
    response_head.content_length = 0;
    if( rio_writen(connfd, NetModSocket::encode_response_head(response_head), NET_MODE_RESPONSE_HEAD_LENGTH) != NET_MODE_RESPONSE_HEAD_LENGTH )
src/socket/net_mod_socket.c
@@ -560,15 +560,6 @@
/**
 * 启动bus
 *
 * @return 0 成功, 其他值 失败的错误码
*/
int  NetModSocket::start_bus() {
  return shmModSocket.start_bus();
}
/**
 * 订阅指定主题
 * @topic 主题
 * @size 主题长度
src/socket/net_mod_socket.h
@@ -4,13 +4,8 @@
#include "shm_mod_socket.h"
#include "socket_io.h"
#include <poll.h>
#include "socket_def.h"
#define GET(p)       (*(uint32_t *)(p))
#define PUT(p, val)  (*(uint32_t *)(p) = (val))
#define GET_INT32(p)       (*(int32_t *)(p))
#define PUT_INT32(p, val)  (*(int32_t *)(p) = (val))
class NetModServerSocket;
@@ -178,15 +173,7 @@
  int sendandrecv_timeout( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, int sec, int nsec) ;
  int sendandrecv_nowait( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size) ;
  /**
   * 启动bus
   *
   * @return 0 成功, 其他值 失败的错误码
  */
  int  start_bus();
   /**
   * 向node_arr 中的所有网络节点发布消息
   * @node_arr 网络节点组, @node_arr_len该数组长度
src/socket/shm_mod_socket.c
@@ -1,65 +1,10 @@
#include "shm_mod_socket.h"
#include "bus_server_socket.h"
static Logger *logger = LoggerFactory::getLogger();
void ShmModSocket::foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb) {
    SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY);
    SHMKeySet *subscripter_set;
    SHMKeySet::iterator set_iter;
    SHMTopicSubMap::iterator map_iter;
    if(topic_sub_map != NULL) {
        for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
            subscripter_set = map_iter->second;
            if(subscripter_set != NULL) {
                for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
                    cb(subscripter_set, *set_iter);
                }
            }
        }
    }
}
bool ShmModSocket::include_in_keys(int key, int keys[], size_t length) {
    if(length == 0) {
        return false;
    }
    for(int i = 0; i < length; i++) {
        if(keys[i] == key)
            return true;
    }
    return false;
}
size_t ShmModSocket::remove_subscripters(int keys[], size_t length) {
    size_t count = 0;
    int key;
    for(int i = 0; i < length; i++) {
        key = keys[i];
        SHMTopicSubMap *topic_sub_map = mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY);
        SHMKeySet *subscripter_set;
        SHMKeySet::iterator set_iter;
        SHMTopicSubMap::iterator map_iter;
        if(topic_sub_map != NULL) {
            for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
                subscripter_set = map_iter->second;
                if(subscripter_set != NULL && (set_iter = subscripter_set->find(key) ) != subscripter_set->end()) {
                    subscripter_set->erase(set_iter);
// printf("remove_subscripter %s, %d\n", map_iter->first, key);
                    count++;
                }
            }
        }
    }
    return count;
}
size_t ShmModSocket::remove_keys(int keys[], size_t length) {
    remove_subscripters(keys, length);
    BusServerSocket::remove_subscripters(keys, length);
    return shm_socket_remove_keys(keys, length);
}
@@ -67,42 +12,18 @@
    mod = (socket_mod_t)0;
    shm_socket = shm_open_socket(SHM_SOCKET_DGRAM);
    bus_set = new std::set<int>;
    topic_sub_map = NULL;
}
ShmModSocket::~ShmModSocket() {
// printf("ShmModSocket  destory 1\n");
    SHMKeySet *subscripter_set;
    SHMTopicSubMap::iterator map_iter;
  logger->debug("Destory ShmModSocket...\n");
    struct timespec timeout = {1, 0};
    if(bus_set != NULL) {
        for(auto bus_iter = bus_set->begin(); bus_iter != bus_set->end(); bus_iter++) {
// printf("ShmModSocket  desub_timeout before");
            desub_timeout(NULL, 0, *bus_iter, &timeout);
// printf("ShmModSocket  desub_timeout after %d\n", *bus_iter);
        }
        delete bus_set;
    }
// printf("ShmModSocket  destory 2\n");
    if(topic_sub_map != NULL) {
        for (map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
            subscripter_set = map_iter->second;
// printf("ShmModSocket  destory 2-1\n");
            if(subscripter_set != NULL) {
// printf("ShmModSocket  destory 2-2\n");
                subscripter_set->clear();
// printf("ShmModSocket  destory 2-3\n");
                mm_free((void *)subscripter_set);
// printf("ShmModSocket  destory 2-4\n");
            }
        }
        topic_sub_map->clear();
        mem_pool_free_by_key(BUS_MAP_KEY);
    }
// printf("ShmModSocket  destory 3\n");
    // printf("=============close socket\n");
    shm_close_socket(shm_socket);
// printf("ShmModSocket  destory 4\n");    
}
@@ -110,8 +31,6 @@
int ShmModSocket::bind(int key) {
    return  shm_socket_bind(shm_socket, key);
/**
 * 强制绑定端口到socket, 适用于程序非正常关闭的情况下,重启程序绑定原来还没释放的key
@@ -198,20 +117,6 @@
/**
 * 启动bus
 *
 * @return 0 成功, 其他值 失败的错误码
*/
int  ShmModSocket::start_bus(){
    mod = BUS;
    topic_sub_map =    mem_pool_attach<SHMTopicSubMap>(BUS_MAP_KEY);
    run_pubsub_proxy();
    // pthread_t tid;
    // pthread_create(&tid, NULL, run_accept_sub_request, _socket);
    return 0;
}
/**
 * 订阅指定主题
@@ -310,7 +215,9 @@
/**
 * @key 总线端口
 * @str "<**pub**>{经济}"
 */
int  ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int key,  
    struct timespec *timeout, int flags) {
    int head_len;
@@ -321,216 +228,7 @@
    return shm_sendto(shm_socket, buf, head_len+content_size, key, timeout, flags);
}
/*
 * 处理订阅
*/
void ShmModSocket::_proxy_sub( char *topic, int key) {
    SHMKeySet *subscripter_set;
    SHMTopicSubMap::iterator map_iter;
    SHMKeySet::iterator set_iter;
//printf("_proxy_sub topic = %s\n", topic);
    if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
        subscripter_set = map_iter->second;
    } else {
        void *set_ptr = mm_malloc(sizeof(SHMKeySet));
        subscripter_set = new(set_ptr) SHMKeySet;
        topic_sub_map->insert({topic, subscripter_set});
    }
    subscripter_set->insert(key);
}
/*
 * 处理取消订阅
*/
void ShmModSocket::_proxy_desub( char *topic, int key) {
    SHMKeySet *subscripter_set;
    SHMTopicSubMap::iterator map_iter;
    // SHMKeySet::iterator set_iter;
    if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
        subscripter_set = map_iter->second;
        subscripter_set->erase(key);
    }
}
/*
 * 处理取消所有订阅
*/
void ShmModSocket::_proxy_desub_all(int key) {
    SHMKeySet *subscripter_set;
    SHMTopicSubMap::iterator map_iter;
    // SHMKeySet::iterator set_iter;
    for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
            subscripter_set = map_iter->second;
            subscripter_set->erase(key);
    }
}
/*
 * 处理发布,代理转发
*/
void ShmModSocket::_proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int key) {
    SHMKeySet *subscripter_set;
    SHMTopicSubMap::iterator map_iter;
    SHMKeySet::iterator set_iter;
    std::vector<int> subscripter_to_del;
    std::vector<int>::iterator vector_iter;
    int send_key;
    struct timespec timeout = {1,0};
    if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
        subscripter_set = map_iter->second;
        for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
            send_key = *set_iter;
 // printf("_proxy_pub send before %d \n", send_key);
            if (shm_sendto(shm_socket, buf+head_len, size-head_len, send_key, &timeout) == SHM_SOCKET_ECONNFAILED ) {
                //对方已关闭的连接放到待删除队列里。如果直接删除会让iter指针出现错乱
                subscripter_to_del.push_back(send_key);
            } else {
// printf("_proxy_pub send after: %d \n", send_key);
            }
        }
        // 删除已关闭的端
        for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); vector_iter++) {
            if((set_iter = subscripter_set->find(*vector_iter)) != subscripter_set->end()) {
                subscripter_set->erase(set_iter);
                logger->debug("remove closed subscripter %d \n", send_key);
            }
        }
        subscripter_to_del.clear();
    }
}
void * ShmModSocket::run_pubsub_proxy() {
    // pthread_detach(pthread_self());
    int size;
    int key;
    char * action, *topic, *topics, *buf;
    size_t head_len;
    const char *topic_delim = ",";
// printf("run_pubsub_proxy server receive before\n");
    while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) {
//printf("run_pubsub_proxy server recv after: %s \n", buf);
        if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) {
// printf("run_pubsub_proxy  %s %s \n", action, topics);
            if(strcmp(action, "sub") == 0) {
                // 订阅支持多主题订阅
                topic = strtok(topics, topic_delim);
//printf("run_pubsub_proxy topic = %s\n", topic);
              while(topic) {
           _proxy_sub(trim(topic, 0), key);
            topic =  strtok(NULL, topic_delim);
              }
            } else if(strcmp(action, "desub") == 0) {
// printf("desub topic=%s,%s,%d\n", topics, trim(topics, 0), strcmp(trim(topics, 0), ""));
                if(strcmp(trim(topics, 0), "") == 0) {
                    // 取消所有订阅
                    _proxy_desub_all(key);
                } else {
                    topic = strtok(topics, topic_delim);
                  while(topic) {
               _proxy_desub(trim(topic, 0), key);
                topic =  strtok(NULL, topic_delim);
                  }
                }
            } else if(strcmp(action, "pub") == 0) {
                _proxy_pub(topics, head_len, buf, size, key);
            }
            free(action);
            free(topics);
        } else {
            logger->error( "ShmModSocket::run_pubsub_proxy : incorrect format msg");
        }
        free(buf);
    }
    return NULL;
}
/**
 * @str "<**sub**>{经济}"
 */
int ShmModSocket::parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len ) {
 char *ptr = str;
 char *str_end_ptr = str + size;
 char *action_start_ptr;
 char *action_end_ptr;
 size_t action_len = 0;
 char *topic_start_ptr;
 char *topic_end_ptr;
 size_t topic_len = 0;
 // if (strlen(identifier) > strlen(str)) {
 //  return 0;
 // }
 if (strncmp(ptr, ACTION_LIDENTIFIER, strlen(ACTION_LIDENTIFIER)) == 0) {
  ptr += strlen(ACTION_LIDENTIFIER);
  action_start_ptr = ptr;
  while(strncmp(++ptr, ACTION_RIDENTIFIER, strlen(ACTION_RIDENTIFIER)) != 0) {
    if(ptr >= str_end_ptr) {
      return 0;
    }
  }
// printf("%s\n", ptr);
  action_end_ptr = ptr;
  action_len = action_end_ptr - action_start_ptr;
  ptr += strlen(ACTION_RIDENTIFIER);
// printf("%s\n", ptr);
// printf("%s\n", str_end_ptr-1);
  if(strncmp(ptr, TOPIC_LIDENTIFIER, strlen(TOPIC_LIDENTIFIER)) == 0 ) {
    topic_start_ptr = ptr+strlen(TOPIC_LIDENTIFIER);
    while(strncmp(++ptr, TOPIC_RIDENTIFIER, strlen(TOPIC_RIDENTIFIER)) != 0) {
      if(ptr >= str_end_ptr) {
        return 0;
      }
    }
    topic_end_ptr = ptr;
    topic_len = topic_end_ptr - topic_start_ptr;
    ptr += strlen(TOPIC_RIDENTIFIER);
  } else {
    return 0;
  }
 } else {
  return 0;
 }
 char *topic = (char *)malloc(topic_len+1);
 strncpy(topic, topic_start_ptr, topic_len);
 *(topic+topic_len) = '\0';
 *_topic = topic;
 char *action = (char *)malloc(action_len+1);
 strncpy(action, action_start_ptr, action_len);
 *(action+action_len) = '\0';
 *_action = action;
 *head_len = ptr-str;
 return 1;
}
 
src/socket/shm_mod_socket.h
@@ -9,53 +9,23 @@
#include "logger_factory.h"
#include "key_def.h"
#include <set>
#define ACTION_LIDENTIFIER "<**"
#define ACTION_RIDENTIFIER "**>"
#define TOPIC_LIDENTIFIER "{"
#define TOPIC_RIDENTIFIER "}"
//typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > SHMString;
typedef std::set<int,  std::less<int>, SHM_STL_Allocator<int> > SHMKeySet;
typedef std::map<SHMString, SHMKeySet *, std::less<SHMString>, SHM_STL_Allocator<std::pair<const SHMString, SHMKeySet *> > > SHMTopicSubMap;
enum socket_mod_t
{
    PULL_PUSH = 1,
    REQ_REP = 2,
    PAIR = 3,
    PUB_SUB = 4,
    SURVEY = 5,
    BUS = 6
};
#include "socket_def.h"
class ShmModSocket {
private:
    shm_socket_t *shm_socket;
  socket_mod_t mod;
  // pthread_t recv_thread;
  // <主题, 订阅者>
    SHMTopicSubMap *topic_sub_map;
    std::set<int> *bus_set;
private:
    inline int _recvfrom_(void **buf, int *size, int *key,  struct timespec *timeout, int flags);
    void _proxy_sub( char *topic, int key);
    void _proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int key);
    void *run_pubsub_proxy();
    int parse_pubsub_topic(char *str, size_t size, char **_action, char **_topic, size_t *head_len );
    int _sub_( char *topic, int size, int key, struct timespec *timeout, int flags);
    int _pub_( char *topic, int topic_size, void *content, int content_size, int key, struct timespec *timeout, int flags);
    void _proxy_desub( char *topic, int key);
    void _proxy_desub_all(int key);
    int  _desub_( char *topic, int size, int key, struct timespec *timeout, int flags);
    static void foreach_subscripters(std::function<void(SHMKeySet *, int)>  cb);
    static bool include_in_keys(int key, int keys[], size_t length);
    static size_t remove_subscripters(int keys[], size_t length) ;
public:
    static size_t remove_keys(int keys[], size_t length);
public:
@@ -109,14 +79,6 @@
    // 超时返回。 @sec 秒 , @nsec 纳秒
    int sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size,  struct timespec *timeout) ;
    int sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int key, void **recv_buf, int *recv_size) ;
    /**
     * 启动bus
     *
     * @return 0 成功, 其他值 失败的错误码
    */
    int  start_bus();
    /**
     * 订阅指定主题
src/socket/shm_socket.c
@@ -298,10 +298,10 @@
  }
  SemUtil::inc(socket->mutex);
  
  if (key == socket->key) {
    logger->error( "can not send to your self!");
    return -1;
  }
  // if (key == socket->key) {
  //   logger->error( "can not send to your self!");
  //   return -1;
  // }
  SHMQueue<shm_msg_t> *remoteQueue;
  if ((remoteQueue = _attach_remote_queue(key)) == NULL) {
src/socket/shm_socket.h
@@ -5,29 +5,13 @@
#include "usg_typedef.h"
#include "shm_queue.h"
enum shm_msg_type_t
{
    SHM_SOCKET_OPEN = 1,
    SHM_SOCKET_OPEN_REPLY = 2,
    SHM_SOCKET_CLOSE = 3,
    SHM_COMMON_MSG = 4
};
enum shm_socket_flag_t
{
  SHM_MSG_TIMEOUT = 1,
  SHM_MSG_NOWAIT = 2
};
enum shm_socket_type_t
{
    SHM_SOCKET_STREAM = 1,
    SHM_SOCKET_DGRAM = 2
};
enum shm_socket_error_type_t {
    SHM_SOCKET_ECONNFAILED = 1,
@@ -38,6 +22,22 @@
    SHM_CONN_CLOSED=1,
    SHM_CONN_LISTEN=2,
    SHM_CONN_ESTABLISHED=3
};
enum shm_socket_type_t
{
    SHM_SOCKET_STREAM = 1,
    SHM_SOCKET_DGRAM = 2
};
enum shm_msg_type_t
{
    SHM_SOCKET_OPEN = 1,
    SHM_SOCKET_OPEN_REPLY = 2,
    SHM_SOCKET_CLOSE = 3,
    SHM_COMMON_MSG = 4
};
typedef struct shm_msg_t {
@@ -93,6 +93,9 @@
int shm_recv(shm_socket_t * socket, void **buf, int *size) ;
/**
 * @flags : SHM_MSG_NOWAIT
 */
int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int key, const struct timespec * timeout = NULL, const int flags=0);
int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key,   struct timespec * timeout = NULL,  int flags=0);
src/socket/socket_def.h
New file
@@ -0,0 +1,37 @@
#ifndef _SOCKET_DEF_H_
#define _SOCKET_DEF_H_
#define GET(p)       (*(uint32_t *)(p))
#define PUT(p, val)  (*(uint32_t *)(p) = (val))
#define GET_INT32(p)       (*(int32_t *)(p))
#define PUT_INT32(p, val)  (*(int32_t *)(p) = (val))
enum socket_mod_t
{
    PULL_PUSH = 1,
    REQ_REP = 2,
    PAIR = 3,
    PUB_SUB = 4,
    SURVEY = 5,
    BUS = 6
};
// typedef struct shm_bus_msg_t {
//     void *topic;
//     int topic_length;
// } shm_bus_msg_t;
#define ACTION_LIDENTIFIER "<**"
#define ACTION_RIDENTIFIER "**>"
#define TOPIC_LIDENTIFIER "{"
#define TOPIC_RIDENTIFIER "}"
#endif
test_net_socket/net_mod_socket.sh
@@ -1,7 +1,7 @@
function server() {
    
    # 开启bus 
 ./test_net_mod_socket --fun="start_bus_server" --key=8  & server_pid=$! &&  echo "pid: ${server_pid}"
 ./test_net_mod_socket --fun="start_bus_server"  & server_pid=$! &&  echo "pid: ${server_pid}"
    # 开启网络转发代理
    ./test_net_mod_socket  --fun="start_net_proxy" --port=5000 & server_pid=$! && echo "pid: ${server_pid}" 
@@ -49,6 +49,7 @@
case ${1} in
  "server")
    close
    sleep 2
  server
  ;;
  "client")
@@ -65,12 +66,13 @@
  ;;
  "")
    close
    sleep 2
    server 
    client
  ;;
  *)
  echo "error input"
  echo "argument input error"
  exit 1
  ;;
esac
test_net_socket/test_net_mod_socket.c
@@ -55,9 +55,35 @@
}
void *bus_handler(void *sockt) {
  pthread_detach(pthread_self());
  char action[512];
  while ( true) {
    printf("Input action: Close?\n");
    if(scanf("%s",action) < 1) {
      printf("Invalide action\n");
      continue;
    }
    if(strcmp(action, "close") == 0) {
      bus_server_socket_wrapper_close(sockt);
      break;
    } else {
      printf("Invalide action\n");
    }
  }
}
void start_bus_server() {
  printf("Start bus server\n");
  void * server_socket = bus_server_socket_wrapper_open();
  pthread_t tid;
  // 创建一个线程,可以关闭bus
  pthread_create(&tid, NULL, bus_handler, server_socket);
  if(bus_server_socket_wrapper_start_bus(server_socket) != 0) {
    printf("start bus failed\n");
    exit(1);
@@ -177,11 +203,6 @@
  
}
void *_run_sendandrecv_(void *arg) {
  Targ *targ = (Targ *)arg;
@@ -349,6 +370,10 @@
 // port = atoi(argv[2]);
     
  if(opt.fun == NULL) {
    usage(argv[0]);
    exit(1);
  }
    if (strcmp("start_net_proxy", opt.fun) == 0 ) {
    if(opt.port == 0) {
@@ -359,10 +384,7 @@
    
  }
  else if (strcmp("start_bus_server", opt.fun) == 0) {
    if(opt.key == 0) {
      usage(argv[0]);
      exit(1);
    }
    start_bus_server();
  }
  else if (strcmp("start_reply", opt.fun) == 0) {