lichao
2021-05-18 cf05ea3d9f43e4e84d621e1f9d54cbef552b6e2b
src/bh_api.cpp
@@ -1,6 +1,7 @@
#include "bh_api.h"
#include "defs.h"
#include "topic_node.h"
#include <cstdio>
#include <memory>
using namespace bhome_shm;
@@ -8,10 +9,47 @@
namespace
{
std::string GetProcExe()
{
   auto f = fopen("/proc/self/stat", "rb");
   if (f) {
      DEFER1(fclose(f));
      char buf[100] = {0};
      int n = fread(buf, 1, sizeof(buf), f);
      if (n > 0) {
         std::string s(buf, n);
         auto start = s.find('(');
         if (start != std::string::npos) {
            ++start;
            auto end = s.find(')', start);
            return s.substr(start, end - start);
         }
      }
   }
   return std::to_string(getpid());
}
std::unique_ptr<TopicNode> &ProcNodePtr()
{
   static std::mutex mtx;
   std::lock_guard<std::mutex> lk(mtx);
   static std::unique_ptr<TopicNode> ptr;
   if (!ptr && GlobalInit(BHomeShm())) {
      auto InitLog = []() {
         auto id = GetProcExe();
         char path[200] = {0};
         sprintf(path, "/opt/vasystem/valog/bhshmq_node_%s.log", id.c_str());
         ns_log::AddLog(path);
         return true;
      };
      static bool init_log = InitLog();
      ptr.reset(new TopicNode(BHomeShm()));
   }
   return ptr;
}
TopicNode &ProcNode()
{
   static TopicNode node(BHomeShm());
   return node;
   return *ProcNodePtr();
}
class TmpPtr : private boost::noncopyable
@@ -70,12 +108,10 @@
}
template <class MsgIn, class MsgOut = MsgCommonReply>
bool BHApiIn1Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int),
                  const void *request,
                  const int request_len,
                  void **reply,
                  int *reply_len,
                  const int timeout_ms)
bool BHApi_In1_Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int),
                    const void *request, const int request_len,
                    void **reply, int *reply_len,
                    const int timeout_ms)
{
   MsgIn input;
   if (!input.ParseFromArray(request, request_len)) {
@@ -83,44 +119,89 @@
      return false;
   }
   MsgOut msg_reply;
   if ((ProcNode().*mfunc)(input, msg_reply, timeout_ms)) {
      return PackOutput(msg_reply, reply, reply_len);
   auto &ptr = ProcNodePtr();
   if (!ptr) {
      SetLastError(eNotFound, "center not started.");
      return 0;
   }
   } else {
   return (ProcNode().*mfunc)(input, msg_reply, timeout_ms) &&
          PackOutput(msg_reply, reply, reply_len);
}
template <class MsgIn0, class MsgIn1, class MsgOut = MsgCommonReply>
bool BHApi_In2_Out1(bool (TopicNode::*mfunc)(MsgIn0 &, MsgIn1 &, MsgOut &, const int),
                    const void *in0, const int in0_len,
                    const void *in1, const int in1_len,
                    void **reply, int *reply_len,
                    const int timeout_ms)
{
   MsgIn0 input0;
   MsgIn1 input1;
   if (!input0.ParseFromArray(in0, in0_len) ||
       !input1.ParseFromArray(in1, in1_len)) {
      SetLastError(eInvalidInput, "invalid input.");
      return false;
   }
   MsgOut msg_reply;
   return (ProcNode().*mfunc)(input0, input1, msg_reply, timeout_ms) &&
          PackOutput(msg_reply, reply, reply_len);
}
} // namespace
bool BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
int BHApiIn1Out1Proxy(FBHApiIn1Out1 func,
                      const void *request,
                      const int request_len,
                      void **reply,
                      int *reply_len,
                      const int timeout_ms)
{
   return BHApiIn1Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms);
   return (*func)(request, request_len, reply, reply_len, timeout_ms);
}
bool BHHeartBeatEasy(const int timeout_ms)
int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
   return BHApi_In1_Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms);
}
int BHUnregister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
   return BHApi_In1_Out1<ProcInfo>(&TopicNode::Unregister, proc_info, proc_info_len, reply, reply_len, timeout_ms);
}
int BHHeartbeatEasy(const int timeout_ms)
{
   return ProcNode().Heartbeat(timeout_ms);
}
bool BHHeartBeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
   return BHApiIn1Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms);
   return BHApi_In1_Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms);
}
bool BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
   return BHApiIn1Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms);
   return BHApi_In1_Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms);
}
bool BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
int BHQueryTopicAddress(const void *remote, const int remote_len,
                        const void *topic, const int topic_len,
                        void **reply, int *reply_len,
                        const int timeout_ms)
{
   return BHApiIn1Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
   return BHApi_In2_Out1<BHAddress, MsgQueryTopic, MsgQueryTopicReply>(
       &TopicNode::QueryTopicAddress,
       remote, remote_len, topic, topic_len, reply, reply_len, timeout_ms);
}
bool BHPublish(const void *msgpub,
               const int msgpub_len,
               const int timeout_ms)
int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
   return BHApi_In1_Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
}
int BHPublish(const void *msgpub,
              const int msgpub_len,
              const int timeout_ms)
{
   MsgPublish pub;
   if (!pub.ParseFromArray(msgpub, msgpub_len)) {
@@ -130,11 +211,11 @@
   return ProcNode().Publish(pub, timeout_ms);
}
bool BHReadSub(void **proc_id,
               int *proc_id_len,
               void **msgpub,
               int *msgpub_len,
               const int timeout_ms)
int BHReadSub(void **proc_id,
              int *proc_id_len,
              void **msgpub,
              int *msgpub_len,
              const int timeout_ms)
{
   std::string proc;
   MsgPublish pub;
@@ -151,19 +232,23 @@
   return false;
}
bool BHAsyncRequest(const void *request,
                    const int request_len,
                    void **msg_id,
                    int *msg_id_len)
int BHAsyncRequest(const void *remote,
                   const int remote_len,
                   const void *request,
                   const int request_len,
                   void **msg_id,
                   int *msg_id_len)
{
   BHAddress dest;
   MsgRequestTopic req;
   if (!req.ParseFromArray(request, request_len)) {
   if (!dest.ParseFromArray(remote, remote_len) ||
       !req.ParseFromArray(request, request_len)) {
      SetLastError(eInvalidInput, "invalid input.");
      return false;
   }
   std::string str_msg_id;
   MsgRequestTopicReply out_msg;
   if (ProcNode().ClientAsyncRequest(req, str_msg_id)) {
   if (ProcNode().ClientAsyncRequest(dest, req, str_msg_id)) {
      if (!msg_id || !msg_id_len) {
         return true;
      }
@@ -178,22 +263,26 @@
   return false;
}
bool BHRequest(const void *request,
               const int request_len,
               void **proc_id,
               int *proc_id_len,
               void **reply,
               int *reply_len,
               const int timeout_ms)
int BHRequest(const void *remote,
              const int remote_len,
              const void *request,
              const int request_len,
              void **proc_id,
              int *proc_id_len,
              void **reply,
              int *reply_len,
              const int timeout_ms)
{
   BHAddress dest;
   MsgRequestTopic req;
   if (!req.ParseFromArray(request, request_len)) {
   if (!dest.ParseFromArray(remote, remote_len) ||
       !req.ParseFromArray(request, request_len)) {
      SetLastError(eInvalidInput, "invalid input.");
      return false;
   }
   std::string proc;
   MsgRequestTopicReply out_msg;
   if (ProcNode().ClientSyncRequest(req, proc, out_msg, timeout_ms)) {
   if (ProcNode().ClientSyncRequest(dest, req, proc, out_msg, timeout_ms)) {
      TmpPtr pproc(proc);
      if (pproc && PackOutput(out_msg, reply, reply_len)) {
         pproc.ReleaseTo(proc_id, proc_id_len);
@@ -205,12 +294,12 @@
   return false;
}
bool BHReadRequest(void **proc_id,
                   int *proc_id_len,
                   void **request,
                   int *request_len,
                   void **src,
                   const int timeout_ms)
int BHReadRequest(void **proc_id,
                  int *proc_id_len,
                  void **request,
                  int *request_len,
                  void **src,
                  const int timeout_ms)
{
   void *src_info = 0;
   std::string proc;
@@ -228,9 +317,9 @@
   return false;
}
bool BHSendReply(void *src,
                 const void *reply,
                 const int reply_len)
int BHSendReply(void *src,
                const void *reply,
                const int reply_len)
{
   MsgRequestTopicReply rep;
   if (!rep.ParseFromArray(reply, reply_len)) {
@@ -240,31 +329,15 @@
   return ProcNode().ServerSendReply(src, rep);
}
int BHCleanUp()
{
   return 0;
}
namespace
{
typedef std::function<bool(const void *, const int)> ServerSender;
} // namespace
void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb)
{
   TopicNode::ServerCB on_req;
   TopicNode::ServerAsyncCB on_req;
   TopicNode::SubDataCB on_sub;
   TopicNode::RequestResultCB on_reply;
   if (server_cb) {
      on_req = [server_cb](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) {
      on_req = [server_cb](void *src, std::string &proc_id, const MsgRequestTopic &request) {
         std::string sreq(request.SerializeAsString());
         bool r = false;
         ServerSender sender = [&](const void *p, const int len) {
            r = reply.ParseFromArray(p, len);
            return r;
         };
         server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), (BHServerCallbackTag *) (&sender));
         return r;
         server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), src);
      };
   }
   if (sub_cb) {
@@ -284,19 +357,18 @@
   ProcNode().Start(on_req, on_sub, on_reply);
}
bool BHServerCallbackReply(const BHServerCallbackTag *tag,
                           const void *data,
                           const int data_len)
{
   auto &sender = *(const ServerSender *) (tag);
   return sender(data, data_len);
}
void BHFree(void *data, int size)
{
   free(data);
}
int BHCleanup()
{
   ProcNodePtr().reset();
   return 0;
}
int BHGetLastError(void **msg, int *msg_len)
{
   int ec = 0;