| | |
| | | #include "bh_api.h" |
| | | #include "defs.h" |
| | | #include "topic_node.h" |
| | | #include <cstdio> |
| | | #include <memory> |
| | | |
| | | using namespace bhome_shm; |
| | |
| | | |
| | | namespace |
| | | { |
/// Returns a short identifier for the current process.
///
/// Reads the first bytes of /proc/self/stat and extracts the text enclosed in
/// parentheses (the command name field). The LAST ')' in the buffer is used as
/// the terminator so that a command name which itself contains ')' is not cut
/// short.
///
/// Falls back to the decimal process id when the file cannot be opened or
/// read, or when no non-empty, properly terminated name is found.
std::string GetProcExe()
{
	// The deleter only runs for a non-null handle, so a failed fopen is safe.
	std::unique_ptr<FILE, decltype(&fclose)> stat_file(fopen("/proc/self/stat", "rb"), &fclose);
	if (stat_file) {
		char buf[100] = {0};
		const size_t n = fread(buf, 1, sizeof(buf), stat_file.get());
		if (n > 0) {
			const std::string s(buf, n);
			const auto open = s.find('(');
			const auto close = s.rfind(')');
			// Require both delimiters and at least one character between them;
			// otherwise fall through to the pid instead of returning garbage.
			if (open != std::string::npos &&
			    close != std::string::npos &&
			    close > open + 1) {
				return s.substr(open + 1, close - open - 1);
			}
		}
	}
	return std::to_string(getpid());
}
| | | std::unique_ptr<TopicNode> &ProcNodePtr() |
| | | { |
| | | static bool init = GlobalInit(BHomeShm()); |
| | | auto InitLog = []() { |
| | | auto id = GetProcExe(); |
| | | char path[200] = {0}; |
| | | sprintf(path, "/tmp/bhshmq_node_%s.log", id.c_str()); |
| | | ns_log::AddLog(path); |
| | | return true; |
| | | }; |
| | | static bool init_log = InitLog(); |
| | | static std::unique_ptr<TopicNode> ptr(new TopicNode(BHomeShm())); |
| | | return ptr; |
| | | } |
| | | TopicNode &ProcNode() |
| | | { |
| | | static TopicNode node(BHomeShm()); |
| | | return node; |
| | | return *ProcNodePtr(); |
| | | } |
| | | |
| | | class TmpPtr : private boost::noncopyable |
| | |
| | | } |
| | | |
| | | template <class MsgIn, class MsgOut = MsgCommonReply> |
| | | bool BHApiIn1Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int), |
| | | const void *request, |
| | | const int request_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms) |
| | | bool BHApi_In1_Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int), |
| | | const void *request, const int request_len, |
| | | void **reply, int *reply_len, |
| | | const int timeout_ms) |
| | | { |
| | | MsgIn input; |
| | | if (!input.ParseFromArray(request, request_len)) { |
| | |
| | | return false; |
| | | } |
| | | MsgOut msg_reply; |
| | | if ((ProcNode().*mfunc)(input, msg_reply, timeout_ms)) { |
| | | return PackOutput(msg_reply, reply, reply_len); |
| | | return (ProcNode().*mfunc)(input, msg_reply, timeout_ms) && |
| | | PackOutput(msg_reply, reply, reply_len); |
| | | } |
| | | |
| | | } else { |
| | | template <class MsgIn0, class MsgIn1, class MsgOut = MsgCommonReply> |
| | | bool BHApi_In2_Out1(bool (TopicNode::*mfunc)(MsgIn0 &, MsgIn1 &, MsgOut &, const int), |
| | | const void *in0, const int in0_len, |
| | | const void *in1, const int in1_len, |
| | | void **reply, int *reply_len, |
| | | const int timeout_ms) |
| | | { |
| | | MsgIn0 input0; |
| | | MsgIn1 input1; |
| | | if (!input0.ParseFromArray(in0, in0_len) || |
| | | !input1.ParseFromArray(in1, in1_len)) { |
| | | SetLastError(eInvalidInput, "invalid input."); |
| | | return false; |
| | | } |
| | | MsgOut msg_reply; |
| | | return (ProcNode().*mfunc)(input0, input1, msg_reply, timeout_ms) && |
| | | PackOutput(msg_reply, reply, reply_len); |
| | | } |
| | | |
| | | } // namespace |
| | | |
| | | bool BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) |
| | | int BHApiIn1Out1Proxy(FBHApiIn1Out1 func, |
| | | const void *request, |
| | | const int request_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms) |
| | | { |
| | | return BHApiIn1Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms); |
| | | return (*func)(request, request_len, reply, reply_len, timeout_ms); |
| | | } |
| | | |
| | | bool BHHeartBeatEasy(const int timeout_ms) |
| | | int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | return BHApi_In1_Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms); |
| | | } |
| | | int BHUnregister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | return BHApi_In1_Out1<ProcInfo>(&TopicNode::Unregister, proc_info, proc_info_len, reply, reply_len, timeout_ms); |
| | | } |
| | | |
| | | int BHHeartbeatEasy(const int timeout_ms) |
| | | { |
| | | return ProcNode().Heartbeat(timeout_ms); |
| | | } |
| | | |
| | | bool BHHeartBeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) |
| | | int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | return BHApiIn1Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms); |
| | | return BHApi_In1_Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms); |
| | | } |
| | | |
| | | bool BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) |
| | | int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | return BHApiIn1Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms); |
| | | return BHApi_In1_Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms); |
| | | } |
| | | |
| | | bool BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) |
| | | int BHQueryTopicAddress(const void *remote, const int remote_len, |
| | | const void *topic, const int topic_len, |
| | | void **reply, int *reply_len, |
| | | const int timeout_ms) |
| | | { |
| | | return BHApiIn1Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms); |
| | | return BHApi_In2_Out1<BHAddress, MsgQueryTopic, MsgQueryTopicReply>( |
| | | &TopicNode::QueryTopicAddress, |
| | | remote, remote_len, topic, topic_len, reply, reply_len, timeout_ms); |
| | | } |
| | | |
| | | bool BHPublish(const void *msgpub, |
| | | const int msgpub_len, |
| | | const int timeout_ms) |
| | | int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | | return BHApi_In1_Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms); |
| | | } |
| | | |
| | | int BHPublish(const void *msgpub, |
| | | const int msgpub_len, |
| | | const int timeout_ms) |
| | | { |
| | | MsgPublish pub; |
| | | if (!pub.ParseFromArray(msgpub, msgpub_len)) { |
| | |
| | | return ProcNode().Publish(pub, timeout_ms); |
| | | } |
| | | |
| | | bool BHReadSub(void **proc_id, |
| | | int *proc_id_len, |
| | | void **msgpub, |
| | | int *msgpub_len, |
| | | const int timeout_ms) |
| | | int BHReadSub(void **proc_id, |
| | | int *proc_id_len, |
| | | void **msgpub, |
| | | int *msgpub_len, |
| | | const int timeout_ms) |
| | | { |
| | | std::string proc; |
| | | MsgPublish pub; |
| | |
| | | return false; |
| | | } |
| | | |
| | | bool BHAsyncRequest(const void *request, |
| | | const int request_len, |
| | | void **msg_id, |
| | | int *msg_id_len) |
| | | int BHAsyncRequest(const void *remote, |
| | | const int remote_len, |
| | | const void *request, |
| | | const int request_len, |
| | | void **msg_id, |
| | | int *msg_id_len) |
| | | { |
| | | BHAddress dest; |
| | | MsgRequestTopic req; |
| | | if (!req.ParseFromArray(request, request_len)) { |
| | | if (!dest.ParseFromArray(remote, remote_len) || |
| | | !req.ParseFromArray(request, request_len)) { |
| | | SetLastError(eInvalidInput, "invalid input."); |
| | | return false; |
| | | } |
| | | std::string str_msg_id; |
| | | MsgRequestTopicReply out_msg; |
| | | if (ProcNode().ClientAsyncRequest(req, str_msg_id)) { |
| | | if (ProcNode().ClientAsyncRequest(dest, req, str_msg_id)) { |
| | | if (!msg_id || !msg_id_len) { |
| | | return true; |
| | | } |
| | |
| | | return false; |
| | | } |
| | | |
| | | bool BHRequest(const void *request, |
| | | const int request_len, |
| | | void **proc_id, |
| | | int *proc_id_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms) |
| | | int BHRequest(const void *remote, |
| | | const int remote_len, |
| | | const void *request, |
| | | const int request_len, |
| | | void **proc_id, |
| | | int *proc_id_len, |
| | | void **reply, |
| | | int *reply_len, |
| | | const int timeout_ms) |
| | | { |
| | | BHAddress dest; |
| | | MsgRequestTopic req; |
| | | if (!req.ParseFromArray(request, request_len)) { |
| | | if (!dest.ParseFromArray(remote, remote_len) || |
| | | !req.ParseFromArray(request, request_len)) { |
| | | SetLastError(eInvalidInput, "invalid input."); |
| | | return false; |
| | | } |
| | | std::string proc; |
| | | MsgRequestTopicReply out_msg; |
| | | if (ProcNode().ClientSyncRequest(req, proc, out_msg, timeout_ms)) { |
| | | if (ProcNode().ClientSyncRequest(dest, req, proc, out_msg, timeout_ms)) { |
| | | TmpPtr pproc(proc); |
| | | if (pproc && PackOutput(out_msg, reply, reply_len)) { |
| | | pproc.ReleaseTo(proc_id, proc_id_len); |
| | |
| | | return false; |
| | | } |
| | | |
| | | bool BHReadRequest(void **proc_id, |
| | | int *proc_id_len, |
| | | void **request, |
| | | int *request_len, |
| | | void **src, |
| | | const int timeout_ms) |
| | | int BHReadRequest(void **proc_id, |
| | | int *proc_id_len, |
| | | void **request, |
| | | int *request_len, |
| | | void **src, |
| | | const int timeout_ms) |
| | | { |
| | | void *src_info = 0; |
| | | std::string proc; |
| | |
| | | return false; |
| | | } |
| | | |
| | | bool BHSendReply(void *src, |
| | | const void *reply, |
| | | const int reply_len) |
| | | int BHSendReply(void *src, |
| | | const void *reply, |
| | | const int reply_len) |
| | | { |
| | | MsgRequestTopicReply rep; |
| | | if (!rep.ParseFromArray(reply, reply_len)) { |
| | |
| | | return ProcNode().ServerSendReply(src, rep); |
| | | } |
| | | |
/// No-op cleanup entry point; always reports 0.
///
/// NOTE(review): a differently-cased BHCleanup() in this file actually
/// releases the process node. Confirm whether this spelling is still needed.
int BHCleanUp()
{
	return 0;
}
| | | |
namespace
{
/// Callable taking (buffer pointer, buffer length) and returning bool.
using ServerSender = std::function<bool(const void *, const int)>;
} // namespace
| | | |
| | | void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb) |
| | | { |
| | | TopicNode::ServerCB on_req; |
| | | TopicNode::ServerAsyncCB on_req; |
| | | TopicNode::SubDataCB on_sub; |
| | | TopicNode::RequestResultCB on_reply; |
| | | if (server_cb) { |
| | | on_req = [server_cb](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) { |
| | | on_req = [server_cb](void *src, std::string &proc_id, const MsgRequestTopic &request) { |
| | | std::string sreq(request.SerializeAsString()); |
| | | bool r = false; |
| | | ServerSender sender = [&](const void *p, const int len) { |
| | | r = reply.ParseFromArray(p, len); |
| | | return r; |
| | | }; |
| | | server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), (BHServerCallbackTag *) (&sender)); |
| | | return r; |
| | | server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), src); |
| | | }; |
| | | } |
| | | if (sub_cb) { |
| | |
| | | |
| | | ProcNode().Start(on_req, on_sub, on_reply); |
| | | } |
| | | bool BHServerCallbackReply(const BHServerCallbackTag *tag, |
| | | const void *data, |
| | | const int data_len) |
| | | { |
| | | auto &sender = *(const ServerSender *) (tag); |
| | | return sender(data, data_len); |
| | | } |
| | | |
/// Releases a buffer with free().
///
/// @param data Pointer to release; passed straight to free().
/// @param size Not used by this function.
void BHFree(void *data, int size)
{
	(void) size;
	free(data);
}
| | | |
| | | int BHCleanup() |
| | | { |
| | | ProcNodePtr().reset(); |
| | | return 0; |
| | | } |
| | | |
| | | int BHGetLastError(void **msg, int *msg_len) |
| | | { |
| | | int ec = 0; |