lichao
2021-04-21 3931f83205f153f2bc7fc36d1a894cdc3f14b4db
src/bh_api.cpp
@@ -83,44 +83,50 @@
      return false;
   }
   MsgOut msg_reply;
   if ((ProcNode().*mfunc)(input, msg_reply, timeout_ms)) {
      return PackOutput(msg_reply, reply, reply_len);
   } else {
      return false;
   }
   return (ProcNode().*mfunc)(input, msg_reply, timeout_ms) &&
          PackOutput(msg_reply, reply, reply_len);
}
} // namespace
bool BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
int BHApiIn1Out1Proxy(FBHApiIn1Out1 func,
                      const void *request,
                      const int request_len,
                      void **reply,
                      int *reply_len,
                      const int timeout_ms)
{
   return (*func)(request, request_len, reply, reply_len, timeout_ms);
}
int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
   return BHApiIn1Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms);
}
bool BHHeartBeatEasy(const int timeout_ms)
int BHHeartbeatEasy(const int timeout_ms)
{
   return ProcNode().Heartbeat(timeout_ms);
}
bool BHHeartBeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
   return BHApiIn1Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms);
}
bool BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
   return BHApiIn1Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms);
}
bool BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
   return BHApiIn1Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
}
bool BHPublish(const void *msgpub,
               const int msgpub_len,
               const int timeout_ms)
int BHPublish(const void *msgpub,
              const int msgpub_len,
              const int timeout_ms)
{
   MsgPublish pub;
   if (!pub.ParseFromArray(msgpub, msgpub_len)) {
@@ -130,11 +136,11 @@
   return ProcNode().Publish(pub, timeout_ms);
}
bool BHReadSub(void **proc_id,
               int *proc_id_len,
               void **msgpub,
               int *msgpub_len,
               const int timeout_ms)
int BHReadSub(void **proc_id,
              int *proc_id_len,
              void **msgpub,
              int *msgpub_len,
              const int timeout_ms)
{
   std::string proc;
   MsgPublish pub;
@@ -151,10 +157,10 @@
   return false;
}
bool BHAsyncRequest(const void *request,
                    const int request_len,
                    void **msg_id,
                    int *msg_id_len)
int BHAsyncRequest(const void *request,
                   const int request_len,
                   void **msg_id,
                   int *msg_id_len)
{
   MsgRequestTopic req;
   if (!req.ParseFromArray(request, request_len)) {
@@ -178,13 +184,13 @@
   return false;
}
bool BHRequest(const void *request,
               const int request_len,
               void **proc_id,
               int *proc_id_len,
               void **reply,
               int *reply_len,
               const int timeout_ms)
int BHRequest(const void *request,
              const int request_len,
              void **proc_id,
              int *proc_id_len,
              void **reply,
              int *reply_len,
              const int timeout_ms)
{
   MsgRequestTopic req;
   if (!req.ParseFromArray(request, request_len)) {
@@ -205,12 +211,12 @@
   return false;
}
bool BHReadRequest(void **proc_id,
                   int *proc_id_len,
                   void **request,
                   int *request_len,
                   void **src,
                   const int timeout_ms)
int BHReadRequest(void **proc_id,
                  int *proc_id_len,
                  void **request,
                  int *request_len,
                  void **src,
                  const int timeout_ms)
{
   void *src_info = 0;
   std::string proc;
@@ -228,9 +234,9 @@
   return false;
}
bool BHSendReply(void *src,
                 const void *reply,
                 const int reply_len)
int BHSendReply(void *src,
                const void *reply,
                const int reply_len)
{
   MsgRequestTopicReply rep;
   if (!rep.ParseFromArray(reply, reply_len)) {
@@ -240,31 +246,15 @@
   return ProcNode().ServerSendReply(src, rep);
}
int BHCleanUp()
{
   return 0;
}
namespace
{
typedef std::function<bool(const void *, const int)> ServerSender;
} // namespace
void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb)
{
   TopicNode::ServerCB on_req;
   TopicNode::ServerAsyncCB on_req;
   TopicNode::SubDataCB on_sub;
   TopicNode::RequestResultCB on_reply;
   if (server_cb) {
      on_req = [server_cb](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) {
      on_req = [server_cb](void *src, std::string &proc_id, const MsgRequestTopic &request) {
         std::string sreq(request.SerializeAsString());
         bool r = false;
         ServerSender sender = [&](const void *p, const int len) {
            r = reply.ParseFromArray(p, len);
            return r;
         };
         server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), (BHServerCallbackTag *) (&sender));
         return r;
         server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), src);
      };
   }
   if (sub_cb) {
@@ -283,13 +273,6 @@
   }
   ProcNode().Start(on_req, on_sub, on_reply);
}
bool BHServerCallbackReply(const BHServerCallbackTag *tag,
                           const void *data,
                           const int data_len)
{
   auto &sender = *(const ServerSender *) (tag);
   return sender(data, data_len);
}
void BHFree(void *data, int size)