wangzhengquan
2020-10-13 2a4e4619f34a742e36693e589e0431347a72979b
update
1个文件已添加
17个文件已修改
262 ■■■■ 已修改文件
src/libshm_queue.a 补丁 | 查看 | 原始文档 | blame | 历史
src/logger_factory.c 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/logger_factory.h 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/linked_lock_free_queue.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/lock_free_queue.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/mm.c 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/queue/shm_queue.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/mod_socket.c 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_server_socket.c 59 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_server_socket.h 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket.c 86 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/net_mod_socket.h 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.c 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_mod_socket.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.c 9 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/shm_socket.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/net_mod_req_rep.c 54 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
test_net_socket/net_mod_req_rep.sh 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/libshm_queue.a
Binary files differ
src/logger_factory.c
New file
@@ -0,0 +1,3 @@
#include "logger_factory.h"
Logger * LoggerFactory::logger = NULL;
src/logger_factory.h
@@ -3,11 +3,21 @@
#include "logger.h"
class LoggerFactory {
private:
    static Logger *logger;
public:
    static Logger getLogger() {
    static Logger* getLogger() {
//ERROR ALL DEBUG INFO WARN
        static Logger logger(Logger::WARN);
        if(logger != NULL)
            return logger;
        LoggerConfig config;
        config.level = Logger::DEBUG;
        config.logFile =  "softbus.log";
        config.console = 1;
        logger = new Logger(config);
        return logger;
    }
};
src/queue/linked_lock_free_queue.h
@@ -98,7 +98,7 @@
template <typename T>
LinkedLockFreeQueue<T>::~LinkedLockFreeQueue()
{
    LoggerFactory::getLogger().debug("LinkedLockFreeQueue destory");
    LoggerFactory::getLogger()->debug("LinkedLockFreeQueue destory");
    Node<T> * nodeptr;
    Pointer<T> tmp = Head.load(std::memory_order_relaxed);
    while((nodeptr = tmp.ptr) != NULL) {
src/queue/lock_free_queue.h
@@ -160,7 +160,7 @@
    template <typename T, typename AT> class Q_TYPE>
LockFreeQueue<ELEM_T, Allocator, Q_TYPE>::~LockFreeQueue()
{
    LoggerFactory::getLogger().debug("LockFreeQueue desctroy");
    LoggerFactory::getLogger()->debug("LockFreeQueue desctroy");
    SemUtil::remove(slots);
    SemUtil::remove(items);
    SemUtil::remove(mutex);
src/queue/mm.c
@@ -402,13 +402,13 @@
  
  if(shmctl(shmid, IPC_STAT, &shmid_ds) == 0) {
    //LoggerFactory::getLogger().debug("shm_nattch=%d\n", shmid_ds.shm_nattch);
    //LoggerFactory::getLogger()->debug("shm_nattch=%d\n", shmid_ds.shm_nattch);
    if(shmid_ds.shm_nattch == 0) {
      //remove shared memery
       if (shmctl(shmid, IPC_RMID, 0) == -1)
        err_exit(errno, "mm_destroy shmctl IPC_RMID");
       else 
         LoggerFactory::getLogger().debug("shared memory destroy\n");
         LoggerFactory::getLogger()->debug("shared memory destroy\n");
       SemUtil::inc(mutex);
       SemUtil::remove(mutex);
src/queue/shm_queue.h
@@ -115,7 +115,7 @@
    hashtable_put(hashtable, key, (void *)queue);
  }
  queue->reference++;
  LoggerFactory::getLogger().debug("SHMQueue constructor reference===%d", queue->reference.load());
  // LoggerFactory::getLogger()->debug("SHMQueue constructor reference===%d", queue->reference.load());
}
template <typename ELEM_T> SHMQueue<ELEM_T>::~SHMQueue() {
@@ -126,7 +126,7 @@
  SemUtil::dec(queue->mutex);
  queue->reference--;
  // LoggerFactory::getLogger().debug("SHMQueue destructor  reference===%d",
  // LoggerFactory::getLogger()->debug("SHMQueue destructor  reference===%d",
  if (queue->reference.load() == 0) {
      delete queue;
      queue = NULL;
src/socket/mod_socket.c
@@ -7,7 +7,7 @@
#include "sem_util.h"
#include "logger_factory.h"
static Logger logger = LoggerFactory::getLogger();
static Logger *logger = LoggerFactory::getLogger();
typedef struct mod_entry_t
{
src/socket/net_mod_server_socket.c
@@ -4,7 +4,7 @@
#include "net_mod_socket_io.h"
#include "net_mod_socket.h"
 
NetModServerSocket::NetModServerSocket(int port):max_buf(1024)
NetModServerSocket::NetModServerSocket(int port):max_buf(1024), max_topic_buf(256)
{
  char portstr[32];
@@ -17,12 +17,18 @@
  if(buf == NULL) {
    err_exit(errno, "process_client malloc");
  }
  topic_buf = malloc(max_topic_buf);
  if(topic_buf == NULL) {
    err_exit(errno, "process_client malloc");
  }
}
NetModServerSocket::~NetModServerSocket() {
   Close(listenfd);
   fee(buf);
   free(buf);
   free(topic_buf);
}
void NetModServerSocket::start() {
@@ -72,7 +78,7 @@
    {
      /* Add connected descriptor to the pool */
      pool.clientfd[i] = connfd;                 //line:conc:echoservers:beginaddclient
      Rio_readinitb(&pool.clientrio[i], connfd); //line:conc:echoservers:endaddclient
     // Rio_readinitb(&pool.clientrio[i], connfd); //line:conc:echoservers:endaddclient
      /* Add the descriptor to descriptor set */
      FD_SET(connfd, &pool.read_set); //line:conc:echoservers:addconnfd
@@ -90,8 +96,7 @@
/* $end add_client */
int NetModServerSocket::process_client(rio_t *rio, int connfd) {
  int n;
int NetModServerSocket::process_client(int connfd) {
  net_mod_request_head_t request_head;
  net_mod_response_head_t response_head;
  char request_head_bs[NET_MODE_REQUEST_HEAD_LENGTH];
@@ -99,15 +104,15 @@
  
  int recv_size;
  if(buf == NULL) {
    buf = malloc(max_buf);
    if(buf == NULL) {
      err_exit(errno, "process_client malloc");
    }
  }
  // if(buf == NULL) {
  //   buf = malloc(max_buf);
  //   if(buf == NULL) {
  //     err_exit(errno, "process_client malloc");
  //   }
  // }
  
 
  if (rio_readnb(rio, request_head_bs, NET_MODE_REQUEST_HEAD_LENGTH) !=  NET_MODE_REQUEST_HEAD_LENGTH)
  if (rio_readn(connfd, request_head_bs, NET_MODE_REQUEST_HEAD_LENGTH) !=  NET_MODE_REQUEST_HEAD_LENGTH)
  {
    return -1;
  }
@@ -118,11 +123,12 @@
    buf = realloc(buf, request_head.content_length);
    max_buf = request_head.content_length;
    if(buf == NULL) {
      err_exit(errno, "process_client realloc");
      LoggerFactory::getLogger()->error(errno, "process_client realloc");
      exit(1);
    }
  }  
  if ((n = rio_readnb(rio, buf, request_head.content_length)) != request_head.content_length ) {
  if (rio_readn(connfd, buf, request_head.content_length) != request_head.content_length ) {
    return -1;
  }
@@ -131,6 +137,21 @@
    response_head.content_length = recv_size;
    Rio_writen(connfd, NetModSocket::encode_response_head(response_head), NET_MODE_RESPONSE_HEAD_LENGTH);
    Rio_writen(connfd, recv_buf, recv_size);
  } else if(request_head.mod == BUS) {
    if(request_head.topic_length > max_topic_buf) {
      topic_buf = realloc(topic_buf, request_head.topic_length);
      max_topic_buf = request_head.topic_length;
      if(topic_buf == NULL) {
        LoggerFactory::getLogger()->error(errno, "process_client realloc");
        exit(1);
      }
    }
    if (rio_readn(connfd, topic_buf, request_head.topic_length) != request_head.topic_length ) {
      return -1;
    }
 LoggerFactory::getLogger()->debug("====server pub %s===\n", buf);
    shmModSocket.pub((char*)topic_buf, request_head.topic_length, buf, request_head.content_length, request_head.key);
  }
  return 0;
@@ -141,22 +162,22 @@
void  NetModServerSocket::check_clients()
{
  int i, connfd;
  rio_t *rio;
  //rio_t *rio;
  
  for (i = 0; (i <= pool.maxi) && (pool.nready > 0); i++)
  {
    connfd = pool.clientfd[i];
    rio = &pool.clientrio[i];
    //rio = &pool.clientrio[i];
    /* If the descriptor is ready, echo a text line from it */
    if ((connfd > 0) && (FD_ISSET(connfd, &pool.ready_set)))
    {
      pool.nready--;
      if(process_client(rio, connfd) != 0) {
      if(process_client(connfd) != 0) {
        Close(connfd); //line:conc:echoservers:closeconnfd
        FD_CLR(connfd, &pool.read_set); //line:conc:echoservers:beginremove
        pool.clientfd[i] = -1;          //line:conc:echoservers:endremove
        FD_CLR(connfd, &pool.read_set);
        pool.clientfd[i] = -1;
      }
    }
src/socket/net_mod_server_socket.h
@@ -23,7 +23,7 @@
      int nready;       /* Number of ready descriptors from select */
      int maxi;         /* Highwater index into client array */
      int clientfd[FD_SETSIZE];    /* Set of active descriptors */
      rio_t clientrio[FD_SETSIZE]; /* Set of active read buffers */
     // rio_t clientrio[FD_SETSIZE]; /* Set of active read buffers */
    } ; 
private:
@@ -31,13 +31,15 @@
    ShmModSocket shmModSocket;
    pool pool;
    void *buf = NULL;
    void *buf;
    void *topic_buf;
  size_t max_buf;
  size_t max_topic_buf;
    void init_pool(int listenfd);
    void add_client(int connfd);
    void check_clients();
    int process_client(rio_t *rio, int connfd);
    int process_client(int connfd);
public:
src/socket/net_mod_socket.c
@@ -12,7 +12,13 @@
NetModSocket::~NetModSocket() {
  rio_t * rio;
  for (auto map_iter = connectionMap.begin(); map_iter != connectionMap.end(); map_iter++) {
    rio = map_iter->second;
    if(rio != NULL) {
      free(rio);
    }
  }
}
int NetModSocket::sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
@@ -25,7 +31,7 @@
  void *recv_buf;
  int recv_size;
  char response_head_bs[NET_MODE_RESPONSE_HEAD_LENGTH];
  net_mod_request_head_t request_head;
  net_mod_request_head_t request_head = {};
  net_mod_response_head_t response_head;
  std::map<std::string, rio_t*>::iterator mapIter;
  rio_t *rio;
@@ -54,28 +60,34 @@
    request_head.mod = REQ_REP;
    request_head.key = node->key;
    request_head.content_length = send_size;
    request_head.topic_length = 0;
    if(rio_writen(rio->rio_fd, NetModSocket::encode_request_head(request_head), NET_MODE_REQUEST_HEAD_LENGTH) != NET_MODE_REQUEST_HEAD_LENGTH) {
      err_exit(errno, "NetModSocket::send head rio_writen");
      LoggerFactory::getLogger()->error(errno, "NetModSocket::send head rio_writen");
      exit(1);
    }
    if(rio_writen(rio->rio_fd, send_buf, send_size) != send_size ) {
       err_exit(errno, "NetModSocket::send conent rio_writen");
      LoggerFactory::getLogger()->error(errno, "NetModSocket::send conent rio_writen");
      exit(1);
    }
    if ( rio_readnb(rio, response_head_bs, NET_MODE_RESPONSE_HEAD_LENGTH) !=  NET_MODE_RESPONSE_HEAD_LENGTH) {
      err_exit(errno, "NetModSocket::send  rio_readnb");
      LoggerFactory::getLogger()->error(errno, "NetModSocket::send  rio_readnb");
      exit(1);
    }
    response_head =  NetModSocket::decode_response_head(response_head_bs);
    recv_buf = malloc(response_head.content_length);
    if(recv_buf == NULL) {
      err_exit(errno, "NetModSocket::send malloc");
      LoggerFactory::getLogger()->error(errno, "NetModSocket::send malloc");
      exit(1);
    }
    if ( (recv_size = rio_readnb(rio, recv_buf, response_head.content_length) ) !=  response_head.content_length) {
      err_exit(errno, "NetModSocket::send  rio_readnb");
      LoggerFactory::getLogger()->error(errno, "NetModSocket::send  rio_readnb");
      exit(1);
    }
LABEL_ARR_PUSH:
@@ -86,12 +98,70 @@
    ret_arr[i].content_length = recv_size;
  }
  *recv_arr = ret_arr;
  if(recv_arr_size != NULL) {
  *recv_arr_size = i;
  }
  return i;
     
}
// int  pub(char *topic, int topic_size, void *content, int content_size, int port);
int NetModSocket::pub(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size) {
  int i, n, clientfd;
  char portstr[32];
  net_node_t *node;
  char mapKey[256];
  void *recv_buf;
  int recv_size;
  char response_head_bs[NET_MODE_RESPONSE_HEAD_LENGTH];
  net_mod_request_head_t request_head;
  net_mod_response_head_t response_head;
  std::map<std::string, rio_t*>::iterator mapIter;
  rio_t *rio;
  for (i = 0; i< arrlen; i++) {
    node = &node_arr[i];
    if(node->host == NULL) {
      // 本地发送
      shmModSocket.pub(topic, topic_size, content, content_size, node->key);
    } else {
      sprintf(mapKey, "%s:%d", node->host, node->port);
      if( ( mapIter = connectionMap.find(mapKey)) != connectionMap.end()) {
        rio = mapIter->second;
      } else {
        rio = (rio_t *)malloc(sizeof(rio_t));
        sprintf(portstr, "%d", node->port);
        clientfd = Open_clientfd(node-> host, portstr);
        Rio_readinitb(rio, clientfd);
        connectionMap.insert({mapKey, rio});
      }
      request_head.mod = BUS;
      request_head.key = node->key;
      request_head.content_length = content_size;
      request_head.topic_length = strlen(topic) + 1;
      if(rio_writen(rio->rio_fd, NetModSocket::encode_request_head(request_head), NET_MODE_REQUEST_HEAD_LENGTH) != NET_MODE_REQUEST_HEAD_LENGTH) {
        LoggerFactory::getLogger()->error(errno, "NetModSocket::pub head rio_writen");
        exit(1);
      }
      if(rio_writen(rio->rio_fd, content, content_size) != content_size ) {
        LoggerFactory::getLogger()->error(errno, "NetModSocket::pub rio_writen conent ");
        exit(1);
      }
      if(rio_writen(rio->rio_fd, topic, request_head.topic_length) != request_head.topic_length ) {
        LoggerFactory::getLogger()->error(errno, "NetModSocket::pub rio_writen conent ");
        exit(1);
      }
    }
  }
  return i;
}
void NetModSocket::free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size) {
@@ -112,6 +182,7 @@
  PUT(head, htonl(request.mod));
  PUT(head + 4, htonl(request.key));
  PUT(head + 8, htonl(request.content_length));
  PUT(head + 12, htonl(request.topic_length));
  return head;
}
@@ -121,6 +192,7 @@
  head.mod = ntohl(GET(headbs));
  head.key = ntohl(GET(headbs + 4));
  head.content_length = ntohl(GET(headbs + 8));
  head.topic_length = ntohl(GET(headbs + 12));
  return head;
}
src/socket/net_mod_socket.h
@@ -7,7 +7,7 @@
#define GET(p)       (*(uint32_t *)(p))
#define PUT(p, val)  (*(uint32_t *)(p) = (val))
#define NET_MODE_REQUEST_HEAD_LENGTH 12
#define NET_MODE_REQUEST_HEAD_LENGTH 16
#define NET_MODE_RESPONSE_HEAD_LENGTH 4
struct net_node_t
@@ -22,6 +22,7 @@
    uint32_t mod;
    uint32_t key;
    uint32_t content_length;
    uint32_t topic_length;
};
struct net_mod_response_head_t {
@@ -49,9 +50,12 @@
public:
    
  NetModSocket();
  int sendandrecv(net_node_t *node_arr, int arrlen, void *send_buf, int send_size, 
      net_mod_recv_msg_t ** resp_arr, int *resp_arr_size);
 
  int pub(net_node_t *node_arr, int arrlen, char *topic, int topic_size, void *content, int content_size);
  ~NetModSocket();
  static void  free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size);
src/socket/shm_mod_socket.c
@@ -314,7 +314,7 @@
    SHMTopicSubMap::iterator map_iter;
    SHMKeySet::iterator set_iter;
printf("_proxy_sub topic = %s\n", topic);
//printf("_proxy_sub topic = %s\n", topic);
    if( (map_iter = topic_sub_map->find(topic) ) != topic_sub_map->end()) {
        subscripter_set = map_iter->second;
    } else {
src/socket/shm_mod_socket.h
@@ -14,7 +14,7 @@
#define TOPIC_LIDENTIFIER "{"
#define TOPIC_RIDENTIFIER "}"
static Logger logger = LoggerFactory::getLogger();
static Logger *logger = LoggerFactory::getLogger();
#define BUS_MAP_KEY 1
//typedef std::basic_string<char, std::char_traits<char>, SHM_STL_Allocator<char> > SHMString;
typedef std::set<int,  std::less<int>, SHM_STL_Allocator<int> > SHMKeySet;
src/socket/shm_socket.c
@@ -3,7 +3,7 @@
#include "logger_factory.h"
#include <map>
static Logger logger = LoggerFactory::getLogger();
static Logger *logger = LoggerFactory::getLogger();
@@ -43,7 +43,7 @@
  socket->force_bind = false;
  socket->dispatch_thread = 0;
  socket->status = SHM_CONN_CLOSED;
  socket->mutex = SemUtil::get(IPC_PRIVATE, 1);
  return socket;
}
@@ -258,6 +258,7 @@
  }
  hashtable_t *hashtable = mm_get_hashtable();
  SemUtil::dec(socket->mutex);
  if (socket->queue == NULL) {
    if (socket->port == -1) {
      socket->port = hashtable_alloc_key(hashtable);
@@ -268,6 +269,8 @@
    socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
  }
  SemUtil::inc(socket->mutex);
  if (port == socket->port) {
    err_msg(0, "can not send to your self!");
    return -1;
@@ -316,6 +319,7 @@
             socket->socket_type);
  }
  hashtable_t *hashtable = mm_get_hashtable();
  SemUtil::dec(socket->mutex);
  if (socket->queue == NULL) {
    if (socket->port == -1) {
      socket->port = hashtable_alloc_key(hashtable);
@@ -326,6 +330,7 @@
    socket->queue = new SHMQueue<shm_msg_t>(socket->port, 16);
  }
  SemUtil::inc(socket->mutex);
  shm_msg_t src;
  // printf("shm_recvfrom pop before\n");
src/socket/shm_socket.h
@@ -56,6 +56,7 @@
    // 本地port
    int port;
    bool force_bind;
    int mutex;
    shm_connection_status_t status;
    SHMQueue<shm_msg_t> *queue;
    SHMQueue<shm_msg_t> *remoteQueue;
test_net_socket/net_mod_req_rep.c
@@ -11,19 +11,45 @@
void client(int port ){
    NetModSocket client;
    char send_buf[MAXLINE];
    char content[MAXLINE];
    char action[512];
  char topic[512];
    net_mod_recv_msg_t *recv_arr;
    int recv_arr_size, i;
    int recv_arr_size, i, n;
    int node_arr_size = 3;
    //192.168.20.104
    net_node_t node_arr[] = {
        {"localhost", port, 11},
        {"localhost", port, 12},
        {"localhost", port, 13},
        {"localhost", port, 14}
        {"192.168.20.104", port, 11},
        {"192.168.20.104", port, 12},
        {"192.168.20.104", port, 13}
    };
  while (fgets(send_buf, MAXLINE, stdin) != NULL) {
    int pub_node_arr_size = 3;
    net_node_t pub_node_arr[] = {
        {"192.168.20.104", port, 8},
        {"192.168.20.104", port, 8},
        {"192.168.20.104", port, 8}
    };
  while (true) {
    //printf("Usage: pub <topic> [content] or sub <topic>\n");
    printf("Can I help you? pub, send or quit\n");
    scanf("%s",action);
    if(strcmp(action, "pub") == 0) {
        printf("Please input topic and content\n");
      scanf("%s %s", topic, content);
        n = client.pub(pub_node_arr, pub_node_arr_size, topic, strlen(topic)+1, content, strlen(content)+1);
        printf("pub %d\n", n);
    }
    else if(strcmp(action, "send") == 0) {
        getc(stdin);
        printf("Please input  content\n");
          if (fgets(content, MAXLINE, stdin) != NULL) {
      // 收到消息的节点即使没有对应的信息, 也要回复一个表示无的消息,否则会一直等待
    client.sendandrecv( node_arr, 4, send_buf, strlen(send_buf), &recv_arr, &recv_arr_size);
            n = client.sendandrecv( node_arr, node_arr_size, content, strlen(content), &recv_arr, &recv_arr_size);
    for(i=0; i<recv_arr_size; i++) {
        printf("host:%s, port: %d, key:%d, content: %s\n", 
            recv_arr[i].host,
@@ -36,6 +62,18 @@
    NetModSocket::free_recv_msg_arr(recv_arr, recv_arr_size);
  }
}
    else if(strcmp(action, "quit") == 0) {
      break;
    } else {
      printf("error input argument\n");
      continue;
    }
  }
}
int main(int argc, char *argv[]) {
    shm_init(512);
test_net_socket/net_mod_req_rep.sh
@@ -5,6 +5,8 @@
    ./dgram_mod_req_rep server 13 &
    ./dgram_mod_req_rep server 14 &
    ./dgram_mod_bus server 8 &
    ./net_mod_req_rep server 5000 &
}
@@ -14,7 +16,7 @@
}
function close() {
    ps -ef | grep -e "dgram_mod_req_rep" -e "net_mod_req_rep" | awk  '{print $2}' | xargs -i kill -9 {}
    ps -ef | grep -e "dgram_mod_req_rep" -e "net_mod_req_rep"  -e "dgram_mod_bus" | awk  '{print $2}' | xargs -i kill -9 {}
    ipcrm -a
}