fujuntang
2021-10-23 bae3a4fd9406635608edf0c0d16c52cf7ca06a66
Optimize the code source.
2个文件已添加
6个文件已修改
260 ■■■■■ 已修改文件
src/bh_api.cpp 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg_trigger/msg_mgr.cpp 149 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg_trigger/msg_mgr.h 54 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket.cpp 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket.h 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket_wrapper.cpp 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/net/net_mod_socket_wrapper.h 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket/bus_server_socket.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.cpp
@@ -18,7 +18,6 @@
static int gRun_stat = 0;
static void *gNetmod_socket = NULL;
static std::map<std::string, int> gRecvbuf;
static pthread_mutex_t mutex;
@@ -1202,7 +1201,6 @@
  std::string str;
  std::string MsgID;
  int timeout_ms = 3000;
  std::map<std::string, int>::iterator recvIter;
  char data_buf[MAX_STR_LEN] = { 0x00 };
  char buf_temp[MAX_STR_LEN] = { 0x00 };
  char *topics_buf = NULL;
@@ -1265,11 +1263,10 @@
#endif 
 
  str = buf_temp;
  recvIter = gRecvbuf.find(str);
  if(recvIter != gRecvbuf.end()) {
  val = net_mod_socket_buf_data_get(gNetmod_socket, str);
  if(val > 0) {
    
    rv = 0;
    val = recvIter->second;
  } else {
    rv = net_mod_socket_reg(gNetmod_socket, buf_temp, strlen(buf_temp), &buf, &size, timeout_ms, PROC_QUE_STCS);
@@ -1280,7 +1277,7 @@
      val = atoi((char *)data_buf);
      if (val > 0) {
        str = buf_temp;
        gRecvbuf.insert({str, val});
        net_mod_socket_buf_data_set(gNetmod_socket, str, val);
      }
      free(buf);
@@ -1381,7 +1378,6 @@
  net_mod_err_t *errarr;
  int errarr_size = 0;
  char *errString = NULL;
  std::map<std::string, int>::iterator recvIter;
  char buf_temp[MAX_STR_LEN] = { 0x00 };
  char *topics_buf = NULL;
  
@@ -1450,11 +1446,10 @@
#endif 
 
  str = buf_temp;
  recvIter = gRecvbuf.find(str);
  if(recvIter != gRecvbuf.end()) {
  val = net_mod_socket_buf_data_get(gNetmod_socket, str);
  if(val > 0) {
    
    rv = 0;
    val = recvIter->second;
  } else {
    rv = net_mod_socket_reg(gNetmod_socket, buf_temp, strlen(buf_temp), &buf, &size, timeout_ms, PROC_QUE_STCS);
@@ -1465,7 +1460,7 @@
      val = atoi((char *)data_buf);
      if (val > 0) {
        str = buf_temp;
        gRecvbuf.insert({str, val});
        net_mod_socket_buf_data_set(gNetmod_socket, str, val);
      }
      free(buf);
src/msg_trigger/msg_mgr.cpp
New file
@@ -0,0 +1,149 @@
#include "sem_util.h"
#include "logger_factory.h"
#include <sys/sem.h>
#include <sys/shm.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <errno.h>
#include "msg_mgr.h"
static Logger *logger = LoggerFactory::getLogger();
static int sem_msg_id;
static Msg_info gMsg_buf[MAX_LOCK];
static int rsv_msg_id;
int msg_init(void)
{
  key_t key = ftok(MSG_PATH, 0x01);
  memset(gMsg_buf, 0x00, sizeof(gMsg_buf));
  sem_msg_id = msgget((key_t)key, 0666 | IPC_CREAT);
  if(sem_msg_id < 0) {
    logger->error("msgget failed with error: %d\n", sem_msg_id);
    return -1;
  }
  key_t key_rsv = ftok(MSG_RSV_PATH, 0x02);
  rsv_msg_id = msgget((key_t)key_rsv, 0666 | IPC_CREAT);
  if(rsv_msg_id == -1) {
    logger->error("msgget failed with error: %d\n", rsv_msg_id);
    return -1;
  }
  return 0;
}
void msg_distrib(int msg_id, Msg_info *message)
{
  int ret;
  Msg_info* msg_obj = message;
  switch(msg_id)
  {
    case SEM_TYPE_ID:
      msg_obj->mtype = MSG_TYPE_SEM;
      ret = msgsnd(sem_msg_id, msg_obj, sizeof(Msg_info) - sizeof(long), IPC_NOWAIT);
      if(ret < 0) {
        logger->error("msg send fail: %d\n", ret);
        return;
      }
      break;
    case RSV_TYPE_ID:
      msg_obj->mtype = MSG_TYPE_RSV;
      ret = msgsnd(rsv_msg_id, msg_obj, sizeof(Msg_info), 0);
      if(ret < 0) {
        logger->error("msg send fail: %d\n", ret);
        return;
      }
      break;
  default:
    logger->error("unknown msg type!\n");
  }
}
int get_msg_info(int Msgid, Msg_info *message)
{
  int ret = 0;
  Msg_info msg_obj;
  int msg_id;
  if (Msgid == SEM_TYPE_ID) {
    msg_id = sem_msg_id;
  } else if (Msgid == RSV_TYPE_ID) {
    msg_id = rsv_msg_id;
  }
  ret = msgrcv(msg_id, (void*)&msg_obj, sizeof(Msg_info) - sizeof(long), MSG_TYPE_SEM, 0);
  if(ret < 0) {
    logger->error("msg recv fail: %d\n", ret);
    return -1;
  }
  switch(Msgid) {
    case SEM_TYPE_ID:
      msg_info_set(SEM_TYPE_ID, msg_obj);
      break;
    case RSV_TYPE_ID:
      msg_info_set(SEM_TYPE_ID, msg_obj);
      break;
    default:
      logger->error("unknown msg type!\n");
  }
  return 0;
}
void msg_info_set(int index, Msg_info msg_obj)
{
  gMsg_buf[index].key = msg_obj.key;
  gMsg_buf[index].id = msg_obj.id;
  if (msg_obj.act == SEM_POST) {
    gMsg_buf[index].count++;
  } else if (msg_obj.act == SEM_GET) {
    gMsg_buf[index].count--;
  } else if (msg_obj.act == SEM_RESET) {
    gMsg_buf[index].count = 0;
  }
}
void *sem_msg_handler(void *skptr)
{
  int ret;
  Msg_info msg_obj;
  msg_init();
  while(true) {
    ret = get_msg_info(SEM_TYPE_ID, &msg_obj);
    if (ret == 0) {
      //process_msg(msg_obj);
    }
  }
}
src/msg_trigger/msg_mgr.h
New file
@@ -0,0 +1,54 @@
#ifndef __MSG_MGR_DEF_
#define __MSG_MGR_DEF_
#ifdef __cplusplus
extern "C" {
#endif
#define SEM_TYPE_ID   0
#define RSV_TYPE_ID   1
#define MSG_TYPE_SEM  1
#define MSG_TYPE_RSV  2
#define SEM_CREATE    0
#define SEM_GET       1
#define SEM_POST      2
#define SEM_RM        3
#define SEM_RESET     4
#define MSG_PATH      "/tmp/msgqueue"
#define MSG_RSV_PATH  "/tmp/msgqueue_rsv"
#define QUERY_MUTEX_KEY 0x8700
#define MAX_LOCK      100
#define MAX_LOCK_HOLD 10
#define SEM_WT_TIMEOUT  60
typedef struct _msg_info
{
  long mtype;
  int key;
  int id;
  int act;
  int count;
} Msg_info;
#ifdef __cplusplus
}
#endif
int msg_init(void);
void msg_distrib(int msg_id, Msg_info *message);
int get_msg_info(int msg_id, Msg_info *message);
void *sem_msg_handler(void *skptr);
void msg_info_set(int index, Msg_info msg_obj);
#endif  //end of file
src/net/net_mod_socket.cpp
@@ -341,6 +341,25 @@
     
}
void NetModSocket::buf_data_set(std::string str, int val) {
  recvbuf.insert({str, val});
}
int NetModSocket::buf_data_get(std::string str) {
  int i;
  int val = 0;
  std::map<std::string, int>::iterator recvIter;
  recvIter = recvbuf.find(str);
  if(recvIter != recvbuf.end()) {
    val = recvIter->second;
  }
  return val;
}
void NetModSocket::free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size) {
src/net/net_mod_socket.h
@@ -73,6 +73,7 @@
  ShmModSocket shmModSocket;
  int int_val;
  int svr_val;
  std::map<std::string, int> recvbuf;
  // pthread_mutex_t sendMutex;
  // request header 编码为网络传输的字节
@@ -180,9 +181,6 @@
  int sendandrecv_timeout( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size, int sec, int nsec) ;
  int sendandrecv_nowait( const void *send_buf, const int send_size, const int send_key, void **recv_buf, int *recv_size) ;
  /**
   * recvandsend
   */
@@ -250,7 +248,8 @@
   */
  static void free_recv_msg_arr(net_mod_recv_msg_t * arr, size_t size);
 
  void buf_data_set(std::string str, int val);
  int buf_data_get(std::string str);
};
 
#endif
src/net/net_mod_socket_wrapper.cpp
@@ -123,6 +123,16 @@
  return sockt->svr_get();
}
void net_mod_socket_buf_data_set(void * _socket, std::string str, int val) {
  NetModSocket *sockt = (NetModSocket *)_socket;
  sockt->buf_data_set(str, val);
}
int net_mod_socket_buf_data_get(void * _socket, std::string str) {
  NetModSocket *sockt = (NetModSocket *)_socket;
  return sockt->buf_data_get(str);
}
/**
 * 如果建立连接的节点没有接受到消息等待timeout的时间后返回
 * @timeout 等待时间,单位是千分之一秒
src/net/net_mod_socket_wrapper.h
@@ -132,6 +132,8 @@
void net_mod_socket_svr_set(void * _socket, int data);
int net_mod_socket_int_get(void * _socket);
int net_mod_socket_svr_get(void * _socket);
void net_mod_socket_buf_data_set(void * _socket, std::string str, int val);
int net_mod_socket_buf_data_get(void * _socket, std::string str);
/**
 * @brief 跨机器发送消息并接受返回的应答消息,直到发送完成才返回
src/socket/bus_server_socket.h
@@ -23,7 +23,7 @@
  int data;
  int data_fix;
  int count;
  _LinkNode *next; 
} LinkNode;