lichao
2021-04-23 628c1c21ffb19d8c96ed9ce89531595f9870ab1a
src/bh_api.cpp
@@ -10,6 +10,7 @@
{
TopicNode &ProcNode()
{
   static bool init_bind_msg_shm = MsgI::BindShm(BHomeShm());
   static TopicNode node(BHomeShm());
   return node;
}
@@ -157,19 +158,23 @@
   return false;
}
int BHAsyncRequest(const void *request,
int BHAsyncRequest(const void *remote,
                   const int remote_len,
                   const void *request,
                   const int request_len,
                   void **msg_id,
                   int *msg_id_len)
{
   BHAddress dest;
   MsgRequestTopic req;
   if (!req.ParseFromArray(request, request_len)) {
   if (!dest.ParseFromArray(remote, remote_len) ||
       !req.ParseFromArray(request, request_len)) {
      SetLastError(eInvalidInput, "invalid input.");
      return false;
   }
   std::string str_msg_id;
   MsgRequestTopicReply out_msg;
   if (ProcNode().ClientAsyncRequest(req, str_msg_id)) {
   if (ProcNode().ClientAsyncRequest(dest, req, str_msg_id)) {
      if (!msg_id || !msg_id_len) {
         return true;
      }
@@ -184,7 +189,9 @@
   return false;
}
int BHRequest(const void *request,
int BHRequest(const void *remote,
              const int remote_len,
              const void *request,
              const int request_len,
              void **proc_id,
              int *proc_id_len,
@@ -192,14 +199,16 @@
              int *reply_len,
              const int timeout_ms)
{
   BHAddress dest;
   MsgRequestTopic req;
   if (!req.ParseFromArray(request, request_len)) {
   if (!dest.ParseFromArray(remote, remote_len) ||
       !req.ParseFromArray(request, request_len)) {
      SetLastError(eInvalidInput, "invalid input.");
      return false;
   }
   std::string proc;
   MsgRequestTopicReply out_msg;
   if (ProcNode().ClientSyncRequest(req, proc, out_msg, timeout_ms)) {
   if (ProcNode().ClientSyncRequest(dest, req, proc, out_msg, timeout_ms)) {
      TmpPtr pproc(proc);
      if (pproc && PackOutput(out_msg, reply, reply_len)) {
         pproc.ReleaseTo(proc_id, proc_id_len);
@@ -246,31 +255,15 @@
   return ProcNode().ServerSendReply(src, rep);
}
int BHCleanUp()
{
   return 0;
}
namespace
{
typedef std::function<bool(const void *, const int)> ServerSender;
} // namespace
void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb)
{
   TopicNode::ServerCB on_req;
   TopicNode::ServerAsyncCB on_req;
   TopicNode::SubDataCB on_sub;
   TopicNode::RequestResultCB on_reply;
   if (server_cb) {
      on_req = [server_cb](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) {
      on_req = [server_cb](void *src, std::string &proc_id, const MsgRequestTopic &request) {
         std::string sreq(request.SerializeAsString());
         bool r = false;
         ServerSender sender = [&](const void *p, const int len) {
            r = reply.ParseFromArray(p, len);
            return r;
         };
         server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), &sender);
         return r;
         server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), src);
      };
   }
   if (sub_cb) {
@@ -289,13 +282,6 @@
   }
   ProcNode().Start(on_req, on_sub, on_reply);
}
int BHServerCallbackReply(const void *tag,
                          const void *data,
                          const int data_len)
{
   auto &sender = *(const ServerSender *) (tag);
   return sender(data, data_len);
}
void BHFree(void *data, int size)