wangzhengquan
2020-11-26 72b7aebb0022f8e391c999348763acd5f7a16133
update
13个文件已修改
632 ■■■■■ 已修改文件
src/socket/dgram_mod_socket.c 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket.c 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket.h 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket_wrapper.c 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket_wrapper.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.c 178 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.h 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.c 120 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.h 20 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_stream_mod_socket.c 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/net_mod_socket.sh 13 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/test_net_mod_socket.c 227 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_socket/dgram_mod_bus.c 37 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/dgram_mod_socket.c
@@ -202,7 +202,7 @@
 */
int dgram_mod_get_port(void * _socket) {
    dgram_mod_socket_t * socket = (dgram_mod_socket_t *) _socket;
    return socket->m_socket->get_port();
    return socket->m_socket->get_key();
}
src/socket/net_mod_socket.c
@@ -498,8 +498,8 @@
/**
 * 获取soket端口号
 */
int NetModSocket::get_port() {
  return shmModSocket.get_port();
int NetModSocket::get_key() {
  return shmModSocket.get_key();
}
src/socket/net_mod_socket.h
@@ -222,7 +222,7 @@
  int  pub_nowait( char *topic, int topic_size, void *content, int content_size, int port);
   /**
   * 向node_arr 中的所有网络节点发布消息
   * @node_arr 网络节点组, @node_arr_len该数组长度
@@ -233,9 +233,9 @@
  int pub(net_node_t *node_arr, int node_arr_len, char *topic, int topic_size, void *content, int content_size);
  /**
   * 获取soket端口号
   * 获取soket key
   */
  int get_port() ;
  int get_key() ;
   /**
   * 销毁sendandrecv方法返回的消息组 
src/socket/net_mod_socket_wrapper.c
@@ -184,9 +184,9 @@
/**
 * 获取soket端口号
 */
int net_mod_socket_get_port(void * _socket) {
int net_mod_socket_get_key(void * _socket) {
    net_mod_socket_t *sockt = (net_mod_socket_t *)_socket;
    return sockt->sockt->get_port();
    return sockt->sockt->get_key();
}
src/socket/net_mod_socket_wrapper.h
@@ -121,7 +121,7 @@
 * @size 主题长度
 * @port 总线端口
 */
int  net_mod_socket_desub(void * _socket, void *topic, int size, int port);
int  net_mod_socket_desub(void * _socket, void *topic, int size, int key);
// 超时返回。 @sec 秒 , @nsec 纳秒
int  net_mod_socket_desub_timeout(void * _socket, void *topic, int size, int port, int sec, int nsec);
int  net_mod_socket_desub_nowait(void * _socket, void *topic, int size, int port);
@@ -131,7 +131,7 @@
/**
 * 获取soket端口号
 */
int net_mod_socket_get_port(void * _socket) ;
int net_mod_socket_get_key(void * _socket) ;
src/socket/shm_mod_socket.c
@@ -105,8 +105,8 @@
// printf("ShmModSocket  destory 4\n");    
}
int ShmModSocket::bind(int port) {
    return  shm_socket_bind(shm_socket, port);
int ShmModSocket::bind(int key) {
    return  shm_socket_bind(shm_socket, key);
@@ -115,82 +115,82 @@
 * 强制绑定端口到socket, 适用于程序非正常关闭的情况下,重启程序绑定原来还没释放的key
 * @return 0 成功, 其他值 失败的错误码
*/
int ShmModSocket::force_bind(int port) {
    return shm_socket_force_bind(shm_socket, port);
int ShmModSocket::force_bind(int key) {
    return shm_socket_force_bind(shm_socket, key);
}
/**
 * 发送信息
 * @port 发送给谁
 * @key 发送给谁
 * @return 0 成功, 其他值 失败的错误码
 */
int ShmModSocket::sendto(const void *buf, const int size, const int port) {
        return shm_sendto(shm_socket, buf, size, port, NULL, 0);
int ShmModSocket::sendto(const void *buf, const int size, const int key) {
        return shm_sendto(shm_socket, buf, size, key, NULL, 0);
}
// 发送信息超时返回。 @sec 秒 , @nsec 纳秒
int ShmModSocket::sendto_timeout(const void *buf, const int size, const int port, const struct timespec *timeout) {
    return shm_sendto(shm_socket, buf, size, port, timeout, 0);
int ShmModSocket::sendto_timeout(const void *buf, const int size, const int key, const struct timespec *timeout) {
    return shm_sendto(shm_socket, buf, size, key, timeout, 0);
}
// 发送信息立刻返回。
int ShmModSocket::sendto_nowait( const void *buf, const int size, const int port){
    return shm_sendto(shm_socket, buf, size, port, NULL, (int)SHM_MSG_NOWAIT);
int ShmModSocket::sendto_nowait( const void *buf, const int size, const int key){
    return shm_sendto(shm_socket, buf, size, key, NULL, (int)SHM_MSG_NOWAIT);
}
inline int ShmModSocket::_recvfrom_(void **buf, int *size, int *port,  struct timespec *timeout, int flags) {
inline int ShmModSocket::_recvfrom_(void **buf, int *size, int *key,  struct timespec *timeout, int flags) {
    if(mod == BUS) {
        err_exit(0, "Can not use method recvfrom in a Bus");
    }
// printf("dgram_mod_recvfrom  before\n");
    int rv = shm_recvfrom(shm_socket, buf, size, port, timeout, flags);
    int rv = shm_recvfrom(shm_socket, buf, size, key, timeout, flags);
// printf("dgram_mod_recvfrom  after\n");
    return rv;
}
/**
 * 接收信息
 * @port 从谁哪里收到的信息
 * @key 从谁哪里收到的信息
 * @return 0 成功, 其他值 失败的错误码
*/
int ShmModSocket::recvfrom(void **buf, int *size, int *port) {
int ShmModSocket::recvfrom(void **buf, int *size, int *key) {
        
        return  _recvfrom_( buf, size, port, NULL, 0);
        return  _recvfrom_( buf, size, key, NULL, 0);
}
// 接受信息超时返回。 @sec 秒 , @nsec 纳秒
int ShmModSocket::recvfrom_timeout( void **buf, int *size, int *port, struct timespec *timeout) {
    return _recvfrom_(buf, size, port, timeout, 0);
int ShmModSocket::recvfrom_timeout( void **buf, int *size, int *key, struct timespec *timeout) {
    return _recvfrom_(buf, size, key, timeout, 0);
}
int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *port){
    return _recvfrom_(buf, size, port, NULL, (int)SHM_MSG_NOWAIT);
int ShmModSocket::recvfrom_nowait( void **buf, int *size, int *key){
    return _recvfrom_(buf, size, key, NULL, (int)SHM_MSG_NOWAIT);
}
/**
 * 发送请求信息并等待接收应答
 * @port 发送给谁
 * @key 发送给谁
 * @return 0 成功, 其他值 失败的错误码
*/
int ShmModSocket::sendandrecv( const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size){
    return shm_sendandrecv(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, NULL, 0);
int ShmModSocket::sendandrecv( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
    return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int ShmModSocket::sendandrecv_timeout(const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size, struct timespec *timeout){
    return shm_sendandrecv(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, timeout, 0);
int ShmModSocket::sendandrecv_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, struct timespec *timeout){
    return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0);
}
int ShmModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size){
    return shm_sendandrecv(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT);
int ShmModSocket::sendandrecv_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
    return shm_sendandrecv(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT);
}
int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size){
    return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, NULL, 0);
int ShmModSocket::sendandrecv_unsafe( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
    return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, NULL, 0);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int ShmModSocket::sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size, struct timespec *timeout){
    return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, timeout, 0);
int ShmModSocket::sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, struct timespec *timeout){
    return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, timeout, 0);
}
int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size){
    return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_port, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT);
int ShmModSocket::sendandrecv_unsafe_nowait(const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size){
    return shm_sendandrecv_unsafe(shm_socket, send_buf, send_size, send_key, recv_buf, recv_size, 0, (int)SHM_MSG_NOWAIT);
}
@@ -214,17 +214,17 @@
 * 订阅指定主题
 * @topic 主题
 * @size 主题长度
 * @port 总线端口
 * @key 总线端口
 */
int  ShmModSocket::sub(char *topic, int size, int port){
    return _sub_( topic, size, port, NULL, 0);
int  ShmModSocket::sub(char *topic, int size, int key){
    return _sub_( topic, size, key, NULL, 0);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int  ShmModSocket::sub_timeout(char *topic, int size, int port, struct timespec *timeout){
    return _sub_(topic, size, port, timeout, 0);
int  ShmModSocket::sub_timeout(char *topic, int size, int key, struct timespec *timeout){
    return _sub_(topic, size, key, timeout, 0);
}
int  ShmModSocket::sub_nowait(char *topic, int size, int port) {
    return _sub_(topic, size, port, NULL,  (int)SHM_MSG_NOWAIT);
int  ShmModSocket::sub_nowait(char *topic, int size, int key) {
    return _sub_(topic, size, key, NULL,  (int)SHM_MSG_NOWAIT);
}
@@ -233,17 +233,17 @@
 * 取消订阅指定主题
 * @topic 主题
 * @size 主题长度
 * @port 总线端口
 * @key 总线端口
 */
int  ShmModSocket::desub(char *topic, int size, int port){
    return _desub_( topic, size, port, NULL, 0);
int  ShmModSocket::desub(char *topic, int size, int key){
    return _desub_( topic, size, key, NULL, 0);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int  ShmModSocket::desub_timeout(char *topic, int size, int port, struct timespec *timeout){
    return _desub_(topic, size, port, timeout, 0);
int  ShmModSocket::desub_timeout(char *topic, int size, int key, struct timespec *timeout){
    return _desub_(topic, size, key, timeout, 0);
}
int  ShmModSocket::desub_nowait(char *topic, int size, int port) {
    return _desub_(topic, size, port, NULL,  (int)SHM_MSG_NOWAIT);
int  ShmModSocket::desub_nowait(char *topic, int size, int key) {
    return _desub_(topic, size, key, NULL,  (int)SHM_MSG_NOWAIT);
}
@@ -252,76 +252,76 @@
 * 发布主题
 * @topic 主题
 * @content 主题内容
 * @port 总线端口
 * @key 总线端口
 */
int  ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int port){
        return _pub_(topic, topic_size, content, content_size, port, NULL, 0);
int  ShmModSocket::pub(char *topic, int topic_size, void *content, int content_size, int key){
        return _pub_(topic, topic_size, content, content_size, key, NULL, 0);
}
//  超时返回。 @sec 秒 , @nsec 纳秒
int  ShmModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int port, struct timespec * timeout){
    return _pub_( topic, topic_size, content, content_size, port, timeout, 0);
int  ShmModSocket::pub_timeout(char *topic, int topic_size, void *content, int content_size, int key, struct timespec * timeout){
    return _pub_( topic, topic_size, content, content_size, key, timeout, 0);
}
int  ShmModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int port){
    return _pub_(topic, topic_size, content, content_size, port, NULL, (int)SHM_MSG_NOWAIT);
int  ShmModSocket::pub_nowait(char *topic, int topic_size, void *content, int content_size, int key){
    return _pub_(topic, topic_size, content, content_size, key, NULL, (int)SHM_MSG_NOWAIT);
}
/**
 * 获取soket端口号
 * 获取soket key
 */
int ShmModSocket::get_port(){
    return shm_socket->port;
int ShmModSocket::get_key(){
    return shm_socket->key;
}
// =============================================================================
/**
 * @port 总线端口
 * @key 总线端口
 */
int  ShmModSocket::_sub_(char *topic, int size, int port,
int  ShmModSocket::_sub_(char *topic, int size, int key,
    struct timespec *timeout, int flags) {
    char buf[8192];
    int rv;
    snprintf(buf,  8192, "%ssub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
    rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags);
    rv = shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
    if(rv == 0) {
        bus_set->insert(port);
        bus_set->insert(key);
    }
    return rv;
}
/**
 * @port 总线端口
 * @key 总线端口
 */
int  ShmModSocket::_desub_(char *topic, int size, int port,
int  ShmModSocket::_desub_(char *topic, int size, int key,
    struct timespec *timeout, int flags) {
    char buf[8192];
    if(topic == NULL) {
        topic = "";
    }
    snprintf(buf,  8192, "%sdesub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER,  topic, TOPIC_RIDENTIFIER);
    return shm_sendto(shm_socket, buf, strlen(buf) + 1, port, timeout, flags);
    return shm_sendto(shm_socket, buf, strlen(buf) + 1, key, timeout, flags);
}
/**
 * @port 总线端口
 * @key 总线端口
 */
int  ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int port,
int  ShmModSocket::_pub_(char *topic, int topic_size, void *content, int content_size, int key,
    struct timespec *timeout, int flags) {
    int head_len;
    char buf[8192+content_size];
    snprintf(buf, 8192, "%spub%s%s%s%s", ACTION_LIDENTIFIER, ACTION_RIDENTIFIER, TOPIC_LIDENTIFIER, topic, TOPIC_RIDENTIFIER);
    head_len = strlen(buf);
    memcpy(buf+head_len, content, content_size);
    return shm_sendto(shm_socket, buf, head_len+content_size, port, timeout, flags);
    return shm_sendto(shm_socket, buf, head_len+content_size, key, timeout, flags);
}
/*
 * 处理订阅
*/
void ShmModSocket::_proxy_sub( char *topic, int port) {
void ShmModSocket::_proxy_sub( char *topic, int key) {
    SHMKeySet *subscripter_set;
    SHMTopicSubMap::iterator map_iter;
@@ -334,13 +334,13 @@
        subscripter_set = new(set_ptr) SHMKeySet;
        topic_sub_map->insert({topic, subscripter_set});
    }
    subscripter_set->insert(port);
    subscripter_set->insert(key);
}
/*
 * 处理取消订阅
*/
void ShmModSocket::_proxy_desub( char *topic, int port) {
void ShmModSocket::_proxy_desub( char *topic, int key) {
    SHMKeySet *subscripter_set;
    SHMTopicSubMap::iterator map_iter;
@@ -349,30 +349,30 @@
    if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
        subscripter_set = map_iter->second;
        subscripter_set->erase(port);
printf("============ desub %d\n", port);
        subscripter_set->erase(key);
printf("============ desub %d\n", key);
    }
}
/*
 * 处理取消所有订阅
*/
void ShmModSocket::_proxy_desub_all(int port) {
void ShmModSocket::_proxy_desub_all(int key) {
    SHMKeySet *subscripter_set;
    SHMTopicSubMap::iterator map_iter;
    // SHMKeySet::iterator set_iter;
    for (auto map_iter = topic_sub_map->begin(); map_iter != topic_sub_map->end(); map_iter++) {
            subscripter_set = map_iter->second;
            subscripter_set->erase(port);
printf("============ desub %d\n", port);
            subscripter_set->erase(key);
printf("============ desub %d\n", key);
    }
}
/*
 * 处理发布,代理转发
*/
void ShmModSocket::_proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int port) {
void ShmModSocket::_proxy_pub( char *topic, size_t head_len, void *buf, size_t size, int key) {
    SHMKeySet *subscripter_set;
    SHMTopicSubMap::iterator map_iter;
@@ -381,19 +381,19 @@
    std::vector<int> subscripter_to_del;
    std::vector<int>::iterator vector_iter;
    int send_port;
    int send_key;
    struct timespec timeout = {1,0};
    if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
        subscripter_set = map_iter->second;
        for(set_iter = subscripter_set->begin(); set_iter != subscripter_set->end(); set_iter++) {
            send_port = *set_iter;
 // printf("_proxy_pub send before %d \n", send_port);
            if (shm_sendto(shm_socket, buf+head_len, size-head_len, send_port, &timeout) == SHM_SOCKET_ECONNFAILED ) {
            send_key = *set_iter;
 // printf("_proxy_pub send before %d \n", send_key);
            if (shm_sendto(shm_socket, buf+head_len, size-head_len, send_key, &timeout) == SHM_SOCKET_ECONNFAILED ) {
                //对方已关闭的连接放到待删除队列里。如果直接删除会让iter指针出现错乱
                subscripter_to_del.push_back(send_port);
                subscripter_to_del.push_back(send_key);
            } else {
// printf("_proxy_pub send after: %d \n", send_port);
// printf("_proxy_pub send after: %d \n", send_key);
            }
            
@@ -403,7 +403,7 @@
        for(vector_iter = subscripter_to_del.begin(); vector_iter != subscripter_to_del.end(); vector_iter++) {
            if((set_iter = subscripter_set->find(*vector_iter)) != subscripter_set->end()) {
                subscripter_set->erase(set_iter);
                printf("remove closed subscripter %d \n", send_port);
                printf("remove closed subscripter %d \n", send_key);
            }
        }
        subscripter_to_del.clear();
@@ -414,13 +414,13 @@
void * ShmModSocket::run_pubsub_proxy() {
    // pthread_detach(pthread_self());
    int size;
    int port;
    int key;
    char * action, *topic, *topics, *buf;
    size_t head_len;
    const char *topic_delim = ",";
// printf("run_pubsub_proxy server receive before\n");
    while(shm_recvfrom(shm_socket, (void **)&buf, &size, &port) == 0) {
    while(shm_recvfrom(shm_socket, (void **)&buf, &size, &key) == 0) {
//printf("run_pubsub_proxy server recv after: %s \n", buf);
        if(parse_pubsub_topic(buf, size, &action, &topics, &head_len)) {
// printf("run_pubsub_proxy  %s %s \n", action, topics);
@@ -429,7 +429,7 @@
                topic = strtok(topics, topic_delim);
//printf("run_pubsub_proxy topic = %s\n", topic);
              while(topic) {
           _proxy_sub(trim(topic, 0), port);
           _proxy_sub(trim(topic, 0), key);
            topic =  strtok(NULL, topic_delim);
              }
@@ -438,12 +438,12 @@
                if(strcmp(trim(topics, 0), "") == 0) {
                    // 取消所有订阅
        printf("====取消所有订阅\n");
                    _proxy_desub_all(port);
                    _proxy_desub_all(key);
                } else {
                 
                    topic = strtok(topics, topic_delim);
                  while(topic) {
               _proxy_desub(trim(topic, 0), port);
               _proxy_desub(trim(topic, 0), key);
                topic =  strtok(NULL, topic_delim);
                  }
                }
@@ -451,7 +451,7 @@
                
            } else if(strcmp(action, "pub") == 0) {
                _proxy_pub(topics, head_len, buf, size, port);
                _proxy_pub(topics, head_len, buf, size, key);
            }  
            
            free(action);
src/socket/shm_mod_socket.h
@@ -104,6 +104,7 @@
    int sendandrecv_timeout(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size,  struct timespec *timeout) ;
    int sendandrecv_nowait(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
    int sendandrecv_unsafe(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size) ;
    // 超时返回。 @sec 秒 , @nsec 纳秒
    int sendandrecv_unsafe_timeout(const void *send_buf, const int send_size, const int port, void **recv_buf, int *recv_size,  struct timespec *timeout) ;
@@ -153,9 +154,9 @@
    /**
     * 获取soket端口号
     * 获取soket key
     */
    int get_port() ;
    int get_key() ;
};
src/socket/shm_socket.c
@@ -8,7 +8,7 @@
void print_msg(char *head, shm_msg_t &msg) {
  // err_msg(0, "%s: port=%d, type=%d\n", head, msg.port, msg.type);
  // err_msg(0, "%s: key=%d, type=%d\n", head, msg.key, msg.type);
}
static void *_server_run_msg_rev(void *_socket);
@@ -20,15 +20,15 @@
static int _shm_close_stream_socket(shm_socket_t *socket, bool notifyRemote);
static inline int  _shm_socket_check_key(shm_socket_t *socket) {
   void *tmp_ptr = mm_get_by_key(socket->port);
   void *tmp_ptr = mm_get_by_key(socket->key);
    if (tmp_ptr!= NULL && tmp_ptr != (void *)1 && !socket->force_bind ) {
      err_exit(0, "key %d has already been in used!", socket->port);
      err_exit(0, "key %d has already been in used!", socket->key);
      return 0;
    }
    return 1;
}
SHMQueue<shm_msg_t> *_attach_remote_queue(int port);
SHMQueue<shm_msg_t> *_attach_remote_queue(int key);
@@ -39,7 +39,7 @@
shm_socket_t *shm_open_socket(shm_socket_type_t socket_type) {
  shm_socket_t *socket = (shm_socket_t *)calloc(1, sizeof(shm_socket_t));
  socket->socket_type = socket_type;
  socket->port = -1;
  socket->key = -1;
  socket->force_bind = false;
  socket->dispatch_thread = 0;
  socket->status = SHM_CONN_CLOSED;
@@ -65,14 +65,14 @@
  return ret;
}
int shm_socket_bind(shm_socket_t *socket, int port) {
  socket->port = port;
int shm_socket_bind(shm_socket_t *socket, int key) {
  socket->key = key;
  return 0;
}
int shm_socket_force_bind(shm_socket_t *socket, int port) {
int shm_socket_force_bind(shm_socket_t *socket, int key) {
  socket->force_bind = true;
  socket->port = port;
  socket->key = key;
  return 0;
}
@@ -83,17 +83,17 @@
                "SHM_SOCKET_STREAM socket");
  }
  int port;
  int key;
  hashtable_t *hashtable = mm_get_hashtable();
  if (socket->port == -1) {
    port = hashtable_alloc_key(hashtable);
    socket->port = port;
  if (socket->key == -1) {
    key = hashtable_alloc_key(hashtable);
    socket->key = key;
  } else {
   _shm_socket_check_key(socket);
  }
  socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
  socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16);
  socket->acceptQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
  socket->clientSocketMap = new std::map<int, shm_socket_t *>;
  socket->status = SHM_CONN_LISTEN;
@@ -113,25 +113,25 @@
                "SHM_SOCKET_STREAM socket");
  }
  hashtable_t *hashtable = mm_get_hashtable();
  int client_port;
  int client_key;
  shm_socket_t *client_socket;
  shm_msg_t src;
  if (socket->acceptQueue->pop(src)) {
    // print_msg("===accept:", src);
    client_port = src.port;
    client_key = src.key;
    // client_socket = (shm_socket_t *)malloc(sizeof(shm_socket_t));
    client_socket = shm_open_socket(socket->socket_type);
    client_socket->port = socket->port;
    client_socket->key = socket->key;
    // client_socket->queue= socket->queue;
    //初始化消息queue
    client_socket->messageQueue =
        new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
    //连接到对方queue
    client_socket->remoteQueue = _attach_remote_queue(client_port);
    client_socket->remoteQueue = _attach_remote_queue(client_key);
    socket->clientSocketMap->insert({client_port, client_socket});
    socket->clientSocketMap->insert({client_key, client_socket});
    /*
* shm_accept 用户执行的方法
@@ -140,7 +140,7 @@
    //发送open_reply,回应客户端的connect请求
    struct timespec timeout = {1, 0};
    shm_msg_t msg;
    msg.port = socket->port;
    msg.key = socket->key;
    msg.size = 0;
    msg.type = SHM_SOCKET_OPEN_REPLY;
@@ -159,33 +159,33 @@
}
int shm_connect(shm_socket_t *socket, int port) {
int shm_connect(shm_socket_t *socket, int key) {
  if (socket->socket_type != SHM_SOCKET_STREAM) {
    err_exit(0, "can not invoke shm_connect method with a socket which is not "
                "a SHM_SOCKET_STREAM socket");
  }
  hashtable_t *hashtable = mm_get_hashtable();
  if (hashtable_get(hashtable, port) == NULL) {
    err_exit(0, "shm_connect:connect at port %d  failed!", port);
  if (hashtable_get(hashtable, key) == NULL) {
    err_exit(0, "shm_connect:connect at key %d  failed!", key);
  }
  if (socket->port == -1) {
    socket->port = hashtable_alloc_key(hashtable);
  if (socket->key == -1) {
    socket->key = hashtable_alloc_key(hashtable);
  } else {
    _shm_socket_check_key(socket);
  }
  socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
  socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16);
  if ((socket->remoteQueue = _attach_remote_queue(port)) == NULL) {
    err_exit(0, "connect to %d failted", port);
  if ((socket->remoteQueue = _attach_remote_queue(key)) == NULL) {
    err_exit(0, "connect to %d failted", key);
  }
  socket->messageQueue = new LockFreeQueue<shm_msg_t, DM_Allocator>(16);
  //发送open请求
  struct timespec timeout = {1, 0};
  shm_msg_t msg;
  msg.port = socket->port;
  msg.key = socket->key;
  msg.size = 0;
  msg.type = SHM_SOCKET_OPEN;
  socket->remoteQueue->push_timeout(msg, &timeout);
@@ -220,7 +220,7 @@
  // }
  shm_msg_t dest;
  dest.type = SHM_COMMON_MSG;
  dest.port = socket->port;
  dest.key = socket->key;
  dest.size = size;
  dest.buf = mm_malloc(size);
  memcpy(dest.buf, buf, size);
@@ -256,7 +256,7 @@
// 短连接方式发送
int shm_sendto(shm_socket_t *socket, const void *buf, const int size,
               const int port, const struct timespec *timeout, const int flags) {
               const int key, const struct timespec *timeout, const int flags) {
  if (socket->socket_type != SHM_SOCKET_DGRAM) {
    err_exit(0, "Can't invoke shm_sendto method in a %d type socket  which is "
                "not a SHM_SOCKET_DGRAM socket ",
@@ -266,31 +266,31 @@
  SemUtil::dec(socket->mutex);
  if (socket->queue == NULL) {
    if (socket->port == -1) {
      socket->port = hashtable_alloc_key(hashtable);
    if (socket->key == -1) {
      socket->key = hashtable_alloc_key(hashtable);
    } else {
     _shm_socket_check_key(socket);
    }
    socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
    socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16);
  }
  SemUtil::inc(socket->mutex);
  
  if (port == socket->port) {
  if (key == socket->key) {
    err_msg(0, "can not send to your self!");
    return -1;
  }
  SHMQueue<shm_msg_t> *remoteQueue;
  if ((remoteQueue = _attach_remote_queue(port)) == NULL) {
  if ((remoteQueue = _attach_remote_queue(key)) == NULL) {
      err_msg(0, "shm_sendto failed, the other end has been closed, or has not been opened!");
    return SHM_SOCKET_ECONNFAILED;
  }
  shm_msg_t dest;
  dest.type = SHM_COMMON_MSG;
  dest.port = socket->port;
  dest.key = socket->key;
  dest.size = size;
  dest.buf = mm_malloc(size);
  memcpy(dest.buf, buf, size);
@@ -312,13 +312,13 @@
  } else {
    delete remoteQueue;
    mm_free(dest.buf);
    err_msg(errno, "sendto port %d failed!", port);
    err_msg(errno, "sendto key %d failed!", key);
    return -1;
  }
}
// 短连接方式接受
int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port,  struct timespec *timeout,  int flags) {
int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key,  struct timespec *timeout,  int flags) {
  if (socket->socket_type != SHM_SOCKET_DGRAM) {
    err_exit(0, "Can't invoke shm_recvfrom method in a %d type socket  which "
                "is not a SHM_SOCKET_DGRAM socket ",
@@ -327,14 +327,14 @@
  hashtable_t *hashtable = mm_get_hashtable();
  SemUtil::dec(socket->mutex);
  if (socket->queue == NULL) {
    if (socket->port == -1) {
      socket->port = hashtable_alloc_key(hashtable);
    if (socket->key == -1) {
      socket->key = hashtable_alloc_key(hashtable);
    } else {
      _shm_socket_check_key(socket);
    }
    socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
    socket->queue = new SHMQueue<shm_msg_t>(socket->key, 16);
  }
  SemUtil::inc(socket->mutex);
@@ -354,7 +354,7 @@
    memcpy(_buf, src.buf, src.size);
    *buf = _buf;
    *size = src.size;
    *port = src.port;
    *key = src.key;
    mm_free(src.buf);
    // printf("shm_recvfrom pop after\n");
    return 0;
@@ -364,19 +364,19 @@
}
int shm_sendandrecv(shm_socket_t *socket, const void *send_buf,
                    const int send_size, const int send_port, void **recv_buf,
                    const int send_size, const int send_key, void **recv_buf,
                    int *recv_size,  struct timespec *timeout,  int flags) {
  if (socket->socket_type != SHM_SOCKET_DGRAM) {
    err_exit(0, "Can't invoke shm_sendandrecv method in a %d type socket  "
                "which is not a SHM_SOCKET_DGRAM socket ",
             socket->socket_type);
  }
  int recv_port;
  int recv_key;
  int rv;
  shm_socket_t *tmp_socket = shm_open_socket(SHM_SOCKET_DGRAM);
  if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_port, timeout, flags)) == 0) {
    rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_port, timeout, flags);
  if ((rv = shm_sendto(tmp_socket, send_buf, send_size, send_key, timeout, flags)) == 0) {
    rv = shm_recvfrom(tmp_socket, recv_buf, recv_size, &recv_key, timeout, flags);
    shm_close_socket(tmp_socket);
    return rv;
  } else {
@@ -387,19 +387,19 @@
}
int shm_sendandrecv_unsafe(shm_socket_t *socket, const void *send_buf,
                    const int send_size, const int send_port, void **recv_buf,
                    const int send_size, const int send_key, void **recv_buf,
                    int *recv_size,  struct timespec *timeout,  int flags) {
  if (socket->socket_type != SHM_SOCKET_DGRAM) {
    err_exit(0, "Can't invoke shm_sendandrecv method in a %d type socket  "
                "which is not a SHM_SOCKET_DGRAM socket ",
             socket->socket_type);
  }
  int recv_port;
  int recv_key;
  int rv;
 
  if ((rv = shm_sendto(socket, send_buf, send_size, send_port, timeout, flags)) == 0) {
    rv = shm_recvfrom(socket, recv_buf, recv_size, &recv_port, timeout, flags);
  if ((rv = shm_sendto(socket, send_buf, send_size, send_key, timeout, flags)) == 0) {
    rv = shm_recvfrom(socket, recv_buf, recv_size, &recv_key, timeout, flags);
    return rv;
  } else {
    return rv;
@@ -412,21 +412,21 @@
/**
 * 绑定key到队列,但是并不会创建队列。如果没有对应指定key的队列提示错误并退出
 */
SHMQueue<shm_msg_t> *_attach_remote_queue(int port) {
SHMQueue<shm_msg_t> *_attach_remote_queue(int key) {
  hashtable_t *hashtable = mm_get_hashtable();
  if (hashtable_get(hashtable, port) == NULL) {
    err_msg(0, "_remote_queue_attach:connet at port %d  failed!", port);
  if (hashtable_get(hashtable, key) == NULL) {
    err_msg(0, "_remote_queue_attach:connet at key %d  failed!", key);
    return NULL;
  }
  SHMQueue<shm_msg_t> *queue = new SHMQueue<shm_msg_t>(port, 0);
  SHMQueue<shm_msg_t> *queue = new SHMQueue<shm_msg_t>(key, 0);
  return queue;
}
void _server_close_conn_to_client(shm_socket_t *socket, int port) {
void _server_close_conn_to_client(shm_socket_t *socket, int key) {
  shm_socket_t *client_socket;
  std::map<int, shm_socket_t *>::iterator iter =
      socket->clientSocketMap->find(port);
      socket->clientSocketMap->find(key);
  if (iter != socket->clientSocketMap->end()) {
    client_socket = iter->second;
    free((void *)client_socket);
@@ -452,11 +452,11 @@
      socket->acceptQueue->push_timeout(src, &timeout);
      break;
    case SHM_SOCKET_CLOSE:
      _server_close_conn_to_client(socket, src.port);
      _server_close_conn_to_client(socket, src.key);
      break;
    case SHM_COMMON_MSG:
      iter = socket->clientSocketMap->find(src.port);
      iter = socket->clientSocketMap->find(src.key);
      if (iter != socket->clientSocketMap->end()) {
        client_socket = iter->second;
        // print_msg("_server_run_msg_rev push before", src);
@@ -511,7 +511,7 @@
  struct timespec timeout = {1, 0};
  shm_msg_t close_msg;
  close_msg.port = socket->port;
  close_msg.key = socket->key;
  close_msg.size = 0;
  close_msg.type = SHM_SOCKET_CLOSE;
  if (notifyRemote && socket->remoteQueue != NULL) {
src/socket/shm_socket.h
@@ -41,7 +41,7 @@
};
typedef struct shm_msg_t {
    int port;
    int key;
    shm_msg_type_t type;
    size_t size;
    void * buf;
@@ -53,8 +53,8 @@
typedef struct shm_socket_t {
    shm_socket_type_t socket_type;
    // 本地port
    int port;
    // 本地key
    int key;
    bool force_bind;
    int mutex;
    shm_connection_status_t status;
@@ -77,33 +77,33 @@
int shm_close_socket(shm_socket_t * socket) ;
int shm_socket_bind(shm_socket_t * socket, int port) ;
int shm_socket_bind(shm_socket_t * socket, int key) ;
int shm_socket_force_bind(shm_socket_t * socket, int port) ;
int shm_socket_force_bind(shm_socket_t * socket, int key) ;
int shm_listen(shm_socket_t * socket) ;
shm_socket_t* shm_accept(shm_socket_t* socket);
int shm_connect(shm_socket_t * socket, int port);
int shm_connect(shm_socket_t * socket, int key);
int shm_send(shm_socket_t * socket, const void *buf, const int size) ;
int shm_recv(shm_socket_t * socket, void **buf, int *size) ;
int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int port, const struct timespec * timeout = NULL, const int flags=0);
int shm_sendto(shm_socket_t *socket, const void *buf, const int size, const int key, const struct timespec * timeout = NULL, const int flags=0);
int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *port,   struct timespec * timeout = NULL,  int flags=0);
int shm_recvfrom(shm_socket_t *socket, void **buf, int *size, int *key,   struct timespec * timeout = NULL,  int flags=0);
int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size,
int shm_sendandrecv(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size,
    struct timespec * timeout = NULL,  int flags=0);
/**
 * 功能同shm_sendandrecv, 但是不是线程安全的
 */
int shm_sendandrecv_unsafe(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_port, void **recv_buf, int *recv_size,
int shm_sendandrecv_unsafe(shm_socket_t *socket, const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size,
    struct timespec * timeout = NULL,  int flags=0);
src/socket/shm_stream_mod_socket.c
@@ -63,9 +63,9 @@
int shm_stream_mod_socket_bind(void * _socket, int port){
int shm_stream_mod_socket_bind(void * _socket, int key){
    shm_stream_mod_socket_t * socket = (shm_stream_mod_socket_t *) _socket;
    return  shm_socket_bind(socket->shm_socket, port);
    return  shm_socket_bind(socket->shm_socket, key);
}
void * run_server_recv_client_msg(void *_socket) {
@@ -116,9 +116,9 @@
}
int shm_stream_mod_socket_connect(void * _socket, int port) {
int shm_stream_mod_socket_connect(void * _socket, int key) {
    shm_stream_mod_socket_t * socket = (shm_stream_mod_socket_t *) _socket;
    return shm_connect(socket->shm_socket, port);
    return shm_connect(socket->shm_socket, key);
}
@@ -189,9 +189,9 @@
    return -1;
}
int shm_stream_mod_socket_get_port(void * _socket) {
int shm_stream_mod_socket_get_key(void * _socket) {
    shm_stream_mod_socket_t * socket = (shm_stream_mod_socket_t *) _socket;
    return socket->shm_socket->port;
    return socket->shm_socket->key;
}
test_net_socket/net_mod_socket.sh
@@ -1,15 +1,16 @@
function server() {
     
# 打开请求应答的server
    ./dgram_mod_req_rep server 11 & server_pid=$! &&  echo ${server_pid}
# 打开请求应答的接受端
    ./test_net_mod_socket --fun="start_reply" --key=11 & server_pid=$! &&  echo "pid: ${server_pid}"
    
    
# 开启bus
    ./dgram_mod_bus server 8 & server_pid=$! &&  echo ${server_pid}
# 开启bus
 ./test_net_mod_socket --fun="start_bus_server" --key=8  & server_pid=$! &&  echo "pid: ${server_pid}"
     
# 开启网络server
    ./test_net_mod_socket server 5000 & server_pid=$! &&  echo ${server_pid}
# 开启网络转发代理
    ./test_net_mod_socket  --fun="start_net_proxy" --port=5000 & server_pid=$! && echo "pid: ${server_pid}"
     
}
test_net_socket/test_net_mod_socket.c
@@ -1,8 +1,8 @@
#include "net_mod_server_socket_wrapper.h"
#include "net_mod_socket_wrapper.h"
#include "shm_mm.h"
#include "dgram_mod_socket.h"
#include "usg_common.h"
#include <getopt.h>
typedef struct Targ {
    int port;
@@ -10,18 +10,20 @@
}Targ;
void server(int port) {
void start_net_proxy(int port) {
  printf("Start net proxy\n");
    void *serverSocket  = net_mod_server_socket_open(port);
    if(net_mod_server_socket_start(serverSocket) != 0) {
        err_exit(errno, "net_mod_server_socket_start");
    }
}
void client(int port ){
void start_net_client(int port ){
    void * client = net_mod_socket_open();
    char content[MAXLINE];
    char action[512];
  char topic[512];
  int buskey;
    
    int recv_arr_size, i, n;
@@ -75,6 +77,29 @@
            net_mod_socket_free_recv_msg_arr(recv_arr, recv_arr_size);
          }
    }
    else if(strcmp(action, "desub") == 0) {
      printf("Please input buskey topic!\n");
      scanf("%d %s", buskey, topic);
      if (net_mod_socket_desub(client, topic, strlen(topic),  buskey) == 0) {
         printf("%d Desub success!\n", net_mod_socket_get_key(client));
      } else {
        printf("Desub failture!\n");
        exit(0);
      }
    }
    else if(strcmp(action, "sub") == 0) {
      printf("Please input topic!\n");
      scanf("%s", topic);
      if (net_mod_socket_sub(client, topic, strlen(topic),  buskey) == 0) {
         printf("%d Sub success!\n", net_mod_socket_get_key(client));
      } else {
        printf("Sub failture!\n");
        exit(0);
      }
    }
    else if(strcmp(action, "quit") == 0) {
      break;
    } else {
@@ -138,7 +163,7 @@
  return (void *)i;
}
void mclient(int port) {
void start_net_mclient(int port) {
  int status, i = 0, processors = 1;
  void *res[processors];
@@ -175,27 +200,195 @@
  // fflush(stdout);
}
void start_bus_server(int key) {
  printf("Start bus server\n");
  void * server_socket = net_mod_socket_open();
  net_mod_socket_bind(server_socket, key);
  net_mod_socket_start_bus(server_socket);
}
void start_reply(int key) {
  void *socket = net_mod_socket_open();
  net_mod_socket_bind(socket, key);
  int size;
  void *recvbuf;
  char sendbuf[512];
  int rv;
  int remote_port;
  while ( (rv = net_mod_socket_recvfrom(socket, &recvbuf, &size, &remote_port) ) == 0) {
    printf( "server: RECEIVED REQUEST FROM PORT %d NAME %s\n", remote_port, recvbuf);
    sprintf(sendbuf, "RECEIVED  PORT %d NAME %s", remote_port, recvbuf);
    net_mod_socket_sendto(socket, sendbuf, strlen(sendbuf) + 1, remote_port);
    free(recvbuf);
  }
}
void usage(char *name)
{
  fprintf(stderr, "Usage: %s  [OPTIONS]  [ARG...]\n\n", name);
  fprintf(stderr, "Test net mod socket\n\n");
  fprintf(stderr, "Options:\n\n");
  #define fpe(str) fprintf(stderr, "  %s", str);
  fpe("-f, --funciton                       Function name\n");
  fpe("-p, --port                           TCP/IP Port\n");
  fpe("-k, --key                            SHM Key\n");
  fpe("\n");
}
struct argument_t {
  char *fun;
  int port;
  int key;
  char **cmd_arr;
  int cmd_arr_len;
};
argument_t parse_args (int argc, char *argv[])
{
  int c;
  if(argc < 2) {
    usage(argv[0]);
    exit(1);
  }
  if(argc == 2 && strcmp(argv[1], "--help") == 0) {
    usage(argv[0]);
    exit(0);
  }
  argument_t mopt = {};
  // mopt.volume_list_size = 0;
  opterr = 0;
  static struct option long_options[] =
  {
    /* These options set a flag. */
    {"fun",  required_argument, 0, 'f'},
    {"key",  required_argument, 0, 'k'},
    {"port",  required_argument, 0, 'p'},
    {0, 0, 0, 0}
  };
  /* getopt_long stores the option index here. */
  int option_index = 0;
  while (1)
  {
    c = getopt_long (argc, argv, "+f:k:p:", long_options, &option_index);
    /* Detect the end of the options. */
    if (c == -1)
      break;
    switch (c)
    {
    case 0:
      // printf("ffffffff\n");
      /* If this option set a flag, do nothing else now. */
      if (long_options[option_index].flag != 0)
        break;
      printf ("option %s", long_options[option_index].name);
      if (optarg)
        printf (" with arg %s", optarg);
      printf ("\n");
      break;
    case 'f':
      mopt.fun = optarg;
      break;
    case 'k':
      mopt.key = atoi(optarg);
      break;
    case 'p':
       // printf ("==name with value `%s'\n", optarg);
      mopt.port = atoi(optarg);
      break;
    case '?':
     // printf ("==? optopt=%c, %s, `%s', %d\n", optopt, optarg, argv[optind], optind);
      /* getopt_long already printed an error message. */
      usage(argv[0]);
      exit(1);
      break;
    default:
      //printf ("==default optopt=%c, %s, `%s'\n",optopt, optarg,  argv[optind]);
      break;
    }
  }
  // printf ("optind = %d, argc=%d \n", optind, argc);
  /* Print any remaining command line arguments (not options). */
  if (optind < argc)
  {
    mopt.cmd_arr = &argv[optind];
    mopt.cmd_arr_len = argc - optind;
    // printf ("non-option ARGV-elements: ");
    // while (optind < argc)
    //   printf ("%d, %d, %s \n", optind, argc, argv[optind++]);
    // putchar ('\n');
  }
  return mopt;
}
int main(int argc, char *argv[]) {
    shm_init(512);
  argument_t opt = parse_args(argc, argv);
    int port;
  if (argc < 3) {
    fprintf(stderr, "Usage: %s %s|%s <PORT> \n", argv[0],  "server", "client");
    return 1;
  }
  port = atoi(argv[2]);
 // port = atoi(argv[2]);
     
    if (strcmp("server", argv[1]) == 0 ) {
     server(port);
    if (strcmp("start_net_proxy", opt.fun) == 0 ) {
    if(opt.port == 0) {
      usage(argv[0]);
      exit(1);
    }
    start_net_proxy(opt.port);
  }
  else if (strcmp("start_bus_server", opt.fun) == 0) {
    if(opt.key == 0) {
      usage(argv[0]);
      exit(1);
    }
    start_bus_server(opt.key);
  }
  else if (strcmp("start_reply", opt.fun) == 0) {
    if(opt.key == 0) {
      usage(argv[0]);
      exit(1);
    }
    start_reply(opt.key);
  }
  if (strcmp("client", argv[1]) == 0)
     client(port);
  else if (strcmp("start_net_client", opt.fun) == 0) {
    if(opt.port == 0) {
      usage(argv[0]);
      exit(1);
    }
    start_net_client(opt.port);
  }
  else {
    usage(argv[0]);
    exit(1);
  if (strcmp("mclient", argv[1]) == 0)
    mclient(port);
  }
}
test_socket/dgram_mod_bus.c
@@ -9,14 +9,14 @@
   exit(0);
}
void server(int port, bool restart) {
void server(int key, bool restart) {
  server_socket = dgram_mod_open_socket();
  if(restart) {
    dgram_mod_force_bind(server_socket, port);
    dgram_mod_force_bind(server_socket, key);
  } else {
     dgram_mod_bind(server_socket, port);
     dgram_mod_bind(server_socket, key);
  }
 
   
@@ -28,18 +28,17 @@
  pthread_detach(pthread_self());
  void *recvbuf;
  int size;
  int port;
  while (dgram_mod_recvfrom( socket, &recvbuf, &size, &port) == 0) {
  int key;
  while (dgram_mod_recvfrom( socket, &recvbuf, &size, &key) == 0) {
    printf("收到订阅消息:%s\n", recvbuf);
    free(recvbuf);
  }
  
}
void client(int port) {
void client(int key) {
  void *socket = dgram_mod_open_socket();
  
  pthread_t tid;
  pthread_create(&tid, NULL, run_recv, socket);
  int size;
@@ -56,7 +55,7 @@
    if(strcmp(action, "sub") == 0) {
      printf("Please input topic!\n");
      scanf("%s", topic);
      if (dgram_mod_sub(socket, topic, strlen(topic),  port) == 0) {
      if (dgram_mod_sub(socket, topic, strlen(topic),  key) == 0) {
         printf("%d Sub success!\n", dgram_mod_get_port(socket));
      } else {
        printf("Sub failture!\n");
@@ -66,7 +65,7 @@
    } else if(strcmp(action, "desub") == 0) {
      printf("Please input topic!\n");
      scanf("%s", topic);
      if (dgram_mod_desub(socket, topic, strlen(topic),  port) == 0) {
      if (dgram_mod_desub(socket, topic, strlen(topic),  key) == 0) {
         printf("%d Desub success!\n", dgram_mod_get_port(socket));
      } else {
        printf("Desub failture!\n");
@@ -77,7 +76,7 @@
      // printf("%s %s %s\n", action, topic, content);
      printf("Please input topic and content\n");
      scanf("%s %s", topic, content);
      if(dgram_mod_pub(socket, topic, strlen(topic)+1, content, strlen(content)+1,  port) == 0){
      if(dgram_mod_pub(socket, topic, strlen(topic)+1, content, strlen(content)+1,  key) == 0){
        printf("%d Pub success!\n", dgram_mod_get_port(socket));
      } else {
        printf("Pub failture!\n");
@@ -100,29 +99,29 @@
int main(int argc, char *argv[]) {
  shm_init(512);
  int port;
  int key;
  if (argc < 3) {
    fprintf(stderr, "Usage: %s %s|%s|rmkey <PORT> ...\n", argv[0], "server", "client");
    fprintf(stderr, "Usage: %s %s|%s|rmkey <key> ...\n", argv[0], "server", "client");
    return 1;
  }
  port = atoi(argv[2]);
  key = atoi(argv[2]);
  if (strcmp("server", argv[1]) == 0) {
    if(argc >= 4 && strcmp("restart", argv[3]) == 0) {
      server(port, true);
      server(key, true);
    }
    else{
      server(port, false);
      server(key, false);
    }
    
  } else if (strcmp("client", argv[1]) == 0) {
    client(port);
    client(key);
  } else if(strcmp("rmkey", argv[1]) == 0) {
    for(int i = 2; i < argc; i++) {
      port = atoi(argv[i]);
      dgram_mod_remove_key(port);
      // printf("%d\n", port);
      key = atoi(argv[i]);
      dgram_mod_remove_key(key);
      // printf("%d\n", key);
    }
  }