wangzhengquan
2020-12-04 aa2f3b2a9968bb4928463bdae05fb026d16b60bb
固定bus key
3个文件已添加
6个文件已修改
183 ■■■■■ 已修改文件
src/key_def.h 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket_wrapper.c 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket_wrapper.h 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_server_socket.c 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket_wrapper.c 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket_wrapper.h 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.h 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/net_mod_socket.sh 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/test_net_mod_socket.c 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/key_def.h
New file
@@ -0,0 +1,2 @@
#define BUS_MAP_KEY 1
#define BUS_KEY 8
src/socket/bus_server_socket_wrapper.c
New file
@@ -0,0 +1,40 @@
#include "bus_server_socket_wrapper.h"
#include "key_def.h"
static Logger *logger = LoggerFactory::getLogger();
/**
 * 创建
 */
void * bus_server_socket_wrapper_open() {
    NetModSocket *sockt = new NetModSocket;
    return (void *)sockt;
}
/**
 * 关闭
 */
void bus_server_socket_wrapper_close(void *_socket) {
    NetModSocket *sockt = (NetModSocket *)_socket;
    delete sockt;
}
/**
 * 启动bus
 *
 * @return 0 成功, 其他值 失败的错误码
*/
int  bus_server_socket_wrapper_start_bus(void * _socket) {
    int ret;
    NetModSocket *sockt = (NetModSocket *)_socket;
    if( (ret = sockt->bind(BUS_KEY)) == 0) {
        sockt->start_bus();
        return 0;
    } else {
        logger->error("start bus failed");
        return -1;
    }
}
src/socket/bus_server_socket_wrapper.h
New file
@@ -0,0 +1,33 @@
#ifndef _BUS_SERVER_SOCKET_WRAPPER_H_
#define _BUS_SERVER_SOCKET_WRAPPER_H_
#include "net_mod_socket.h"
#ifdef __cplusplus
extern "C" {
#endif
/**
 * 创建
 */
void * bus_server_socket_wrapper_open();
/**
 * 关闭
 */
void bus_server_socket_wrapper_close(void *_sockt);
/**
 * 启动bus
 *
 * @return 0 成功, 其他值 失败的错误码
*/
int  bus_server_socket_wrapper_start_bus(void * _socket);
#ifdef __cplusplus
}
#endif
#endif
src/socket/net_mod_server_socket.c
@@ -3,6 +3,7 @@
#include "socket_io.h"
#include "net_mod_socket_io.h"
#include "net_mod_socket.h"
#include "key_def.h"
 
static  Logger * logger = LoggerFactory::getLogger();
@@ -229,18 +230,18 @@
// LoggerFactory::getLogger()->debug("====server pub %s===\n", buf);
    memcpy(response_head.host, request_head.host, NI_MAXHOST);
    response_head.port = request_head.port;
    response_head.key = request_head.key;
    // response_head.key = request_head.key;
    if(request_head.timeout > 0) {
      timeout.tv_sec = request_head.timeout / 1000;
      timeout.tv_nsec = (request_head.timeout - timeout.tv_sec * 1000) * 10e6;
      ret = shmModSocket.pub_timeout((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, request_head.key, &timeout);
      ret = shmModSocket.pub_timeout((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, BUS_KEY, &timeout);
    }
    else if(request_head.timeout == 0) {
      ret = shmModSocket.pub_nowait((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, request_head.key);
      ret = shmModSocket.pub_nowait((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, BUS_KEY);
    }
    else if(request_head.timeout == -1) {
      ret = shmModSocket.pub((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, request_head.key);
      ret = shmModSocket.pub((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, BUS_KEY);
    }
   
    response_head.code = ret;
src/socket/net_mod_socket_wrapper.c
@@ -137,15 +137,6 @@
/**
 * 启动bus
 *
 * @return 0 成功, 其他值 失败的错误码
*/
int  net_mod_socket_start_bus(void * _socket) {
    net_mod_socket_t *sockt = (net_mod_socket_t *)_socket;
    return sockt->sockt->start_bus();
}
/**
 * 订阅指定主题
@@ -153,18 +144,18 @@
 * @size 主题长度
 * @port 总线端口
 */
int  net_mod_socket_sub(void * _socket, void *topic, int size, int port) {
int  net_mod_socket_sub(void * _socket, void *topic, int size) {
    net_mod_socket_t *sockt = (net_mod_socket_t *)_socket;
    return sockt->sockt->sub((char *)topic,  size,  port);
    return sockt->sockt->sub((char *)topic,  size,  BUS_KEY);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int  net_mod_socket_sub_timeout(void * _socket, void *topic, int size, int port, int sec, int nsec){
int  net_mod_socket_sub_timeout(void * _socket, void *topic, int size, int sec, int nsec){
    net_mod_socket_t *sockt = (net_mod_socket_t *)_socket;
    return sockt->sockt->sub_timeout((char *)topic,  size,  port, sec, nsec);
    return sockt->sockt->sub_timeout((char *)topic,  size,  BUS_KEY, sec, nsec);
}
int  net_mod_socket_sub_nowait(void * _socket, void *topic, int size, int port){
int  net_mod_socket_sub_nowait(void * _socket, void *topic, int size){
    net_mod_socket_t *sockt = (net_mod_socket_t *)_socket;
    return sockt->sockt->sub_nowait((char *)topic,  size,  port);
    return sockt->sockt->sub_nowait((char *)topic,  size,  BUS_KEY);
}
@@ -174,18 +165,18 @@
 * @size 主题长度
 * @port 总线端口
 */
int  net_mod_socket_desub(void * _socket, void *topic, int size, int port) {
int  net_mod_socket_desub(void * _socket, void *topic, int size) {
    net_mod_socket_t *sockt = (net_mod_socket_t *)_socket;
    return sockt->sockt->desub((char *)topic,  size,  port);
    return sockt->sockt->desub((char *)topic,  size,  BUS_KEY);
}
// 超时返回。 @sec 秒 , @nsec 纳秒
int  net_mod_socket_desub_timeout(void * _socket, void *topic, int size, int port, int sec, int nsec) {
int  net_mod_socket_desub_timeout(void * _socket, void *topic, int size, int sec, int nsec) {
    net_mod_socket_t *sockt = (net_mod_socket_t *)_socket;
    return sockt->sockt->desub_timeout((char *)topic,  size,  port, sec, nsec);
    return sockt->sockt->desub_timeout((char *)topic,  size,  BUS_KEY, sec, nsec);
}
int  net_mod_socket_desub_nowait(void * _socket, void *topic, int size, int port){
int  net_mod_socket_desub_nowait(void * _socket, void *topic, int size){
    net_mod_socket_t *sockt = (net_mod_socket_t *)_socket;
    return sockt->sockt->desub_nowait((char *)topic,  size,  port);
    return sockt->sockt->desub_nowait((char *)topic,  size,  BUS_KEY);
}
src/socket/net_mod_socket_wrapper.h
@@ -84,13 +84,6 @@
  net_mod_recv_msg_t ** recv_arr, int *recv_arr_size) ;
/**
 * 启动bus
 *
 * @return 0 成功, 其他值 失败的错误码
*/
int  net_mod_socket_start_bus(void * _socket);
 /**
 * 向node_arr 中的所有网络节点发布消息
@@ -112,10 +105,10 @@
 * @size 主题长度
 * @key 总线端口
 */
int  net_mod_socket_sub(void * _socket, void *topic, int size, int key);
int  net_mod_socket_sub(void * _socket, void *topic, int size);
// 超时返回。 @sec 秒 , @nsec 纳秒
int  net_mod_socket_sub_timeout(void * _socket, void *topic, int size, int key, int sec, int nsec);
int  net_mod_socket_sub_nowait(void * _socket, void *topic, int size, int key);
int  net_mod_socket_sub_timeout(void * _socket, void *topic, int size,  int sec, int nsec);
int  net_mod_socket_sub_nowait(void * _socket, void *topic, int size);
/**
@@ -124,10 +117,10 @@
 * @size 主题长度
 * @key 总线端口
 */
int  net_mod_socket_desub(void * _socket, void *topic, int size, int key);
int  net_mod_socket_desub(void * _socket, void *topic, int size);
// 超时返回。 @sec 秒 , @nsec 纳秒
int  net_mod_socket_desub_timeout(void * _socket, void *topic, int size, int key, int sec, int nsec);
int  net_mod_socket_desub_nowait(void * _socket, void *topic, int size, int key);
int  net_mod_socket_desub_timeout(void * _socket, void *topic, int size, int sec, int nsec);
int  net_mod_socket_desub_nowait(void * _socket, void *topic, int size);
/**
src/socket/shm_mod_socket.h
@@ -7,6 +7,7 @@
#include "hashtable.h"
#include "sem_util.h"
#include "logger_factory.h"
#include "key_def.h"
#include <set>
#define ACTION_LIDENTIFIER "<**"
@@ -14,7 +15,7 @@
#define TOPIC_LIDENTIFIER "{"
#define TOPIC_RIDENTIFIER "}"
#define BUS_MAP_KEY 1
//typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > SHMString;
typedef std::set<int,  std::less<int>, SHM_STL_Allocator<int> > SHMKeySet;
typedef std::map<SHMString, SHMKeySet *, std::less<SHMString>, SHM_STL_Allocator<std::pair<const SHMString, SHMKeySet *> > > SHMTopicSubMap;
test_net_socket/net_mod_socket.sh
@@ -18,7 +18,7 @@
    ./test_net_mod_socket --fun="start_net_client" \
     --sendlist="localhost:5000:100" \
     --publist="localhost:5000:8"
     --publist="localhost:5000"
     
}
@@ -31,7 +31,7 @@
function mpub() {
    ./test_net_mod_socket --fun="test_net_pub_threads" \
     --publist="localhost:5000:8, localhost:5000:8"
     --publist="localhost:5000, localhost:5000"
     
}
test_net_socket/test_net_mod_socket.c
@@ -1,5 +1,7 @@
#include "net_mod_server_socket_wrapper.h"
#include "net_mod_socket_wrapper.h"
#include "bus_server_socket_wrapper.h"
#include "shm_mm_wraper.h"
#include "usg_common.h"
#include <getopt.h>
@@ -53,13 +55,13 @@
}
void start_bus_server(int key) {
void start_bus_server() {
  printf("Start bus server\n");
  void * server_socket = net_mod_socket_open();
  net_mod_socket_bind(server_socket, key);
  net_mod_socket_start_bus(server_socket);
  void * server_socket = bus_server_socket_wrapper_open();
  if(bus_server_socket_wrapper_start_bus(server_socket) != 0) {
    printf("start bus failed\n");
    exit(1);
  }
}
 
@@ -139,10 +141,10 @@
          }
    }
    else if(strcmp(action, "desub") == 0) {
      printf("Please input buskey and topic!\n");
      printf("Please input topic!\n");
       
      scanf("%d %s", &buskey, topic);
      if (net_mod_socket_desub(client, topic, strlen(topic),  buskey) == 0) {
      scanf("%s", topic);
      if (net_mod_socket_desub(client, topic, strlen(topic)) == 0) {
         printf("%d Desub success!\n", net_mod_socket_get_key(client));
      } else {
        printf("Desub failture!\n");
@@ -151,11 +153,10 @@
     
    } 
    else if(strcmp(action, "sub") == 0) {
      printf("Please input buskey and topic!\n");
      scanf("%d %s",&buskey, topic);
      printf("Please input topic!\n");
      scanf("%s",topic);
      printf("===%d %s\n",buskey, topic);
      if (net_mod_socket_sub(client, topic, strlen(topic),  buskey) == 0) {
      if (net_mod_socket_sub(client, topic, strlen(topic)) == 0) {
         printf("%d Sub success!\n", net_mod_socket_get_key(client));
      } else {
        printf("Sub failture!\n");
@@ -361,7 +362,7 @@
      usage(argv[0]);
      exit(1);
    }
    start_bus_server(opt.key);
    start_bus_server();
  }
  else if (strcmp("start_reply", opt.fun) == 0) {
    if(opt.key == 0) {
@@ -557,10 +558,15 @@
  for(i = 0; i < entry_arr_len; i++) {
    property_arr_len = str_split(entry_arr[i], ":", &property_arr);
  // printf("%s, %s, %s\n", property_arr[0], property_arr[1], property_arr[2]);
    node_arr[i] = {trim(property_arr[0], 0), atoi(property_arr[1]), atoi(property_arr[2])};
    free(entry_arr[i]);
    node_arr[i] = {trim(property_arr[0], 0), atoi(property_arr[1]), 0};
    free(property_arr[1]);
    free(property_arr[2]);
    if(property_arr_len == 3) {
      node_arr[i].key = atoi(property_arr[2]);
      free(property_arr[2]);
    }
    free(entry_arr[i]);
  }
  *node_arr_addr = node_arr;