lichao
2021-04-15 c64c54d8e75b9354dc49a7b6b2d326e7dd59eb37
src/bh_api.cpp
@@ -39,11 +39,25 @@
   }
   size_t size() const { return size_; }
   operator bool() const { return ptr_; }
   bool ReleaseTo(void **pdata, int *psize)
   {
      if (!ptr_) {
         return false;
      }
      if (pdata && psize) {
         *psize = size();
         *pdata = release();
      }
      return true;
   }
};
template <class Msg>
bool PackOutput(const Msg &msg, void **out, int *out_len)
{
   if (!out || !out_len) {
      return true; // not wanted.
   }
   auto size = msg.ByteSizeLong();
   TmpPtr p(size);
   if (!p) {
@@ -51,30 +65,37 @@
      return false;
   }
   msg.SerializePartialToArray(p.get(), size);
   *out = p.release();
   *out_len = size;
   p.ReleaseTo(out, out_len);
   return true;
}
} // namespace
bool BHRegister(const void *proc_info,
                const int proc_info_len,
template <class MsgIn, class MsgOut = MsgCommonReply>
bool BHApiIn1Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int),
                  const void *request,
                  const int request_len,
                void **reply,
                int *reply_len,
                const int timeout_ms)
{
   ProcInfo pi;
   if (!pi.ParseFromArray(proc_info, proc_info_len)) {
   MsgIn input;
   if (!input.ParseFromArray(request, request_len)) {
      SetLastError(eInvalidInput, "invalid input.");
      return false;
   }
   MsgCommonReply msg_reply;
   if (ProcNode().Register(pi, msg_reply, timeout_ms)) {
   MsgOut msg_reply;
   if ((ProcNode().*mfunc)(input, msg_reply, timeout_ms)) {
      return PackOutput(msg_reply, reply, reply_len);
   } else {
      return false;
   }
}
} // namespace
bool BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
   return BHApiIn1Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms);
}
bool BHHeartBeatEasy(const int timeout_ms)
@@ -82,23 +103,19 @@
   return ProcNode().Heartbeat(timeout_ms);
}
bool BHHeartBeat(const void *proc_info,
                 const int proc_info_len,
                 void **reply,
                 int *reply_len,
                 const int timeout_ms)
bool BHHeartBeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
   ProcInfo pi;
   if (!pi.ParseFromArray(proc_info, proc_info_len)) {
      SetLastError(eInvalidInput, "invalid input.");
      return false;
   return BHApiIn1Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms);
   }
   MsgCommonReply msg_reply;
   if (ProcNode().Heartbeat(pi, msg_reply, timeout_ms)) {
      return PackOutput(msg_reply, reply, reply_len);
   } else {
      return false;
bool BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
   return BHApiIn1Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms);
   }
bool BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
   return BHApiIn1Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
}
bool BHPublish(const void *msgpub,
@@ -125,8 +142,35 @@
   if (ProcNode().RecvSub(proc, pub, timeout_ms)) {
      TmpPtr pproc(proc);
      if (pproc && PackOutput(pub, msgpub, msgpub_len)) {
         *proc_id = pproc.release();
         *proc_id_len = pproc.size();
         pproc.ReleaseTo(proc_id, proc_id_len);
         return true;
      } else {
         SetLastError(ENOMEM, "out of mem");
      }
   }
   return false;
}
bool BHAsyncRequest(const void *request,
                    const int request_len,
                    void **msg_id,
                    int *msg_id_len)
{
   MsgRequestTopic req;
   if (!req.ParseFromArray(request, request_len)) {
      SetLastError(eInvalidInput, "invalid input.");
      return false;
   }
   std::string str_msg_id;
   MsgRequestTopicReply out_msg;
   if (ProcNode().ClientAsyncRequest(req, str_msg_id)) {
      if (!msg_id || !msg_id_len) {
         return true;
      }
      TmpPtr ptr(str_msg_id);
      if (ptr) {
         ptr.ReleaseTo(msg_id, msg_id_len);
         return true;
      } else {
         SetLastError(ENOMEM, "out of mem");
      }
@@ -152,8 +196,8 @@
   if (ProcNode().ClientSyncRequest(req, proc, out_msg, timeout_ms)) {
      TmpPtr pproc(proc);
      if (pproc && PackOutput(out_msg, reply, reply_len)) {
         *proc_id = pproc.release();
         *proc_id_len = pproc.size();
         pproc.ReleaseTo(proc_id, proc_id_len);
         return true;
      } else {
         SetLastError(ENOMEM, "out of mem");
      }
@@ -174,9 +218,9 @@
   if (ProcNode().ServerRecvRequest(src_info, proc, out_msg, timeout_ms)) {
      TmpPtr pproc(proc);
      if (pproc && PackOutput(out_msg, request, request_len)) {
         *proc_id = pproc.release();
         *proc_id_len = pproc.size();
         pproc.ReleaseTo(proc_id, proc_id_len);
         *src = src_info;
         return true;
      } else {
         SetLastError(ENOMEM, "out of mem");
      }
@@ -206,10 +250,11 @@
typedef std::function<bool(const void *, const int)> ServerSender;
} // namespace
void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb)
void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb)
{
   TopicNode::ServerCB on_req;
   TopicNode::SubDataCB on_sub;
   TopicNode::RequestResultCB on_reply;
   if (server_cb) {
      on_req = [server_cb](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) {
         std::string sreq(request.SerializeAsString());
@@ -228,8 +273,16 @@
         sub_cb(proc_id.data(), proc_id.size(), s.data(), s.size());
      };
   }
   if (client_cb) {
      on_reply = [client_cb](const BHMsgHead &head, const MsgRequestTopicReply &rep) {
         std::string s(rep.SerializeAsString());
         client_cb(head.proc_id().data(), head.proc_id().size(),
                   head.msg_id().data(), head.msg_id().size(),
                   s.data(), s.size());
      };
   }
   ProcNode().Start(on_req, on_sub);
   ProcNode().Start(on_req, on_sub, on_reply);
}
bool BHServerCallbackReply(const BHServerCallbackTag *tag,
                           const void *data,
@@ -251,10 +304,7 @@
      std::string err_msg;
      GetLastError(ec, err_msg);
      TmpPtr p(err_msg);
      if (p) {
         *msg = p.release();
         *msg_len = p.size();
      }
      p.ReleaseTo(msg, msg_len);
   }
   return ec;
}