#include "bh_api.h"
#include "defs.h"
#include "topic_node.h"
#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <mutex>
#include <string>
#include <unistd.h>

using namespace bhome_shm;
using namespace bhome_msg;

namespace
{

// Returns the text between the first '(' and the following ')' of
// /proc/self/stat. Falls back to the decimal pid when the file cannot be
// opened or read, or when either parenthesis is missing.
std::string GetProcExe()
{
    auto f = fopen("/proc/self/stat", "rb");
    if (f) {
        DEFER1(fclose(f));
        char buf[100] = {0};
        const size_t n = fread(buf, 1, sizeof(buf), f);
        if (n > 0) {
            const std::string s(buf, n);
            auto start = s.find('(');
            if (start != std::string::npos) {
                ++start;
                const auto end = s.find(')', start);
                // Without a closing ')' the name cannot be delimited; use the pid instead
                // of returning the rest of the buffer.
                if (end != std::string::npos) {
                    return s.substr(start, end - start);
                }
            }
        }
    }
    return std::to_string(getpid());
}

// Owner of the per-process TopicNode. The node is created on the first call
// for which GlobalInit(BHomeShm()) succeeds; until then the returned pointer
// is empty. The mutex only covers the creation step inside this function.
std::unique_ptr<TopicNode> &ProcNodePtr()
{
    static std::mutex mtx;
    std::lock_guard<std::mutex> lk(mtx);

    static std::unique_ptr<TopicNode> ptr;
    if (!ptr && GlobalInit(BHomeShm())) {
        auto InitLog = []() {
            auto id = GetProcExe();
            char path[200] = {0};
            // snprintf keeps the write inside `path` whatever `id` contains.
            snprintf(path, sizeof(path), "/opt/vasystem/valog/bhshmq_node_%s.log", id.c_str());
            ns_log::AddLog(path);
            return true;
        };
        // Function-local static: the log sink is registered once per process.
        static bool init_log = InitLog();
        (void) init_log;
        ptr.reset(new TopicNode(BHomeShm()));
    }
    return ptr;
}

// Returns the process node, or nullptr after recording eNotFound when the
// node does not exist. Every API entry point goes through this accessor so
// that a missing node is reported instead of dereferencing an empty pointer.
TopicNode *AcquireProcNode()
{
    auto &ptr = ProcNodePtr();
    if (!ptr) {
        SetLastError(eNotFound, "center not started.");
        return nullptr;
    }
    return ptr.get();
}

// malloc-backed scratch buffer. Ownership is handed to the caller through
// ReleaseTo()/release(); anything not released is freed by the destructor.
class TmpPtr
{
    void *ptr_ = nullptr;
    size_t size_ = 0;

public:
    // Always allocates at least one byte: malloc(0) may return NULL, which
    // would make an empty payload look like an allocation failure.
    explicit TmpPtr(const size_t size) :
        ptr_(malloc(size ? size : 1)), size_(size) {}
    explicit TmpPtr(const std::string &str) :
        TmpPtr(str.size())
    {
        if (ptr_ && !str.empty()) {
            memcpy(ptr_, str.data(), str.size());
        }
    }
    TmpPtr(const TmpPtr &) = delete;
    TmpPtr &operator=(const TmpPtr &) = delete;
    ~TmpPtr() { free(ptr_); }

    void *get() const { return ptr_; }
    // Gives up ownership; the caller becomes responsible for free().
    void *release()
    {
        void *tmp = ptr_;
        ptr_ = nullptr;
        return tmp;
    }
    size_t size() const { return size_; }
    explicit operator bool() const { return ptr_ != nullptr; }

    // Returns false when the allocation failed. Otherwise transfers the
    // buffer to (*pdata, *psize) if both are non-null and returns true; when
    // either is null the buffer stays owned here and is freed on destruction.
    bool ReleaseTo(void **pdata, int *psize)
    {
        if (!ptr_) {
            return false;
        }
        if (pdata && psize) {
            *psize = static_cast<int>(size());
            *pdata = release();
        }
        return true;
    }
};

// Serializes `msg` into a freshly malloc'ed buffer and hands it out through
// (*out, *out_len). Returns true without doing anything when the caller did
// not ask for the output; returns false with ENOMEM when allocation fails.
template <class Msg>
bool PackOutput(const Msg &msg, void **out, int *out_len)
{
    if (!out || !out_len) {
        return true; // not wanted.
    }
    auto size = msg.ByteSizeLong();
    TmpPtr p(size);
    if (!p) {
        SetLastError(ENOMEM, "not enough memory.");
        return false;
    }
    // NOTE(review): the result of SerializePartialToArray is not checked, as before.
    msg.SerializePartialToArray(p.get(), size);
    p.ReleaseTo(out, out_len);
    return true;
}

// Common shape "parse one message, call a TopicNode member, pack the reply".
// Fails with eInvalidInput when `request` does not parse and with eNotFound
// when the process node is missing.
template <class MsgIn, class MsgOut>
bool BHApi_In1_Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int),
                    const void *request,
                    const int request_len,
                    void **reply,
                    int *reply_len,
                    const int timeout_ms)
{
    MsgIn input;
    if (!input.ParseFromArray(request, request_len)) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    TopicNode *node = AcquireProcNode();
    if (!node) {
        return false;
    }
    MsgOut msg_reply;
    return (node->*mfunc)(input, msg_reply, timeout_ms) &&
           PackOutput(msg_reply, reply, reply_len);
}

// Same as BHApi_In1_Out1 for member functions that take two input messages.
template <class MsgIn0, class MsgIn1, class MsgOut>
bool BHApi_In2_Out1(bool (TopicNode::*mfunc)(MsgIn0 &, MsgIn1 &, MsgOut &, const int),
                    const void *in0,
                    const int in0_len,
                    const void *in1,
                    const int in1_len,
                    void **reply,
                    int *reply_len,
                    const int timeout_ms)
{
    MsgIn0 input0;
    MsgIn1 input1;
    if (!input0.ParseFromArray(in0, in0_len) ||
        !input1.ParseFromArray(in1, in1_len)) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    TopicNode *node = AcquireProcNode();
    if (!node) {
        return false;
    }
    MsgOut msg_reply;
    return (node->*mfunc)(input0, input1, msg_reply, timeout_ms) &&
           PackOutput(msg_reply, reply, reply_len);
}

} // namespace

// NOTE(review): the wrappers below let the compiler deduce the message types
// from the member-function pointer. If TopicNode overloads make a deduction
// ambiguous, spell the template arguments out — confirm against topic_node.h.

// Forwards the call to `func`; rejects a null function pointer.
int BHApiIn1Out1Proxy(FBHApiIn1Out1 func,
                      const void *request,
                      const int request_len,
                      void **reply,
                      int *reply_len,
                      const int timeout_ms)
{
    if (!func) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    return (*func)(request, request_len, reply, reply_len, timeout_ms);
}

// Parses `proc_info` and calls TopicNode::Register; the serialized reply is
// returned through (*reply, *reply_len) when both are non-null.
int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
    return BHApi_In1_Out1(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms);
}

// Parses `proc_info` and calls TopicNode::Unregister.
int BHUnregister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
    return BHApi_In1_Out1(&TopicNode::Unregister, proc_info, proc_info_len, reply, reply_len, timeout_ms);
}

// Calls the message-less TopicNode::Heartbeat overload.
int BHHeartbeatEasy(const int timeout_ms)
{
    TopicNode *node = AcquireProcNode();
    if (!node) {
        return false;
    }
    return node->Heartbeat(timeout_ms);
}

// Parses `proc_info` and calls the message-based TopicNode::Heartbeat overload.
int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
    return BHApi_In1_Out1(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms);
}

// Parses `topics` and calls TopicNode::ServerRegisterRPC.
int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
    return BHApi_In1_Out1(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms);
}

// Parses `remote` and `topic` and calls TopicNode::QueryTopicAddress.
int BHQueryTopicAddress(const void *remote, const int remote_len, const void *topic, const int topic_len, void **reply, int *reply_len, const int timeout_ms)
{
    return BHApi_In2_Out1(
        &TopicNode::QueryTopicAddress, remote, remote_len, topic, topic_len, reply, reply_len, timeout_ms);
}

// Parses `remote` and `query` and calls TopicNode::QueryProcs.
int BHQueryProcs(const void *remote, const int remote_len, const void *query, const int query_len, void **reply, int *reply_len, const int timeout_ms)
{
    return BHApi_In2_Out1(
        &TopicNode::QueryProcs, remote, remote_len, query, query_len, reply, reply_len, timeout_ms);
}

// Parses `topics` and calls TopicNode::Subscribe.
int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
    return BHApi_In1_Out1(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
}

// Parses `msgpub` as a MsgPublish and passes it to TopicNode::Publish.
int BHPublish(const void *msgpub, const int msgpub_len, const int timeout_ms)
{
    MsgPublish pub;
    if (!pub.ParseFromArray(msgpub, msgpub_len)) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    TopicNode *node = AcquireProcNode();
    if (!node) {
        return false;
    }
    return node->Publish(pub, timeout_ms);
}

// Receives one subscribed message via TopicNode::RecvSub. On success the
// sender id and the serialized MsgPublish are returned in malloc'ed buffers
// (release them with BHFree).
int BHReadSub(void **proc_id, int *proc_id_len, void **msgpub, int *msgpub_len, const int timeout_ms)
{
    TopicNode *node = AcquireProcNode();
    if (!node) {
        return false;
    }
    std::string proc;
    MsgPublish pub;
    if (node->RecvSub(proc, pub, timeout_ms)) {
        TmpPtr pproc(proc);
        if (pproc && PackOutput(pub, msgpub, msgpub_len)) {
            pproc.ReleaseTo(proc_id, proc_id_len);
            return true;
        } else {
            SetLastError(ENOMEM, "out of mem");
        }
    }
    return false;
}

// Parses `remote` and `request` and calls TopicNode::ClientAsyncRequest.
// The generated message id is copied out only when both `msg_id` and
// `msg_id_len` are non-null.
int BHAsyncRequest(const void *remote, const int remote_len, const void *request, const int request_len, void **msg_id, int *msg_id_len)
{
    BHAddress dest;
    MsgRequestTopic req;
    if (!dest.ParseFromArray(remote, remote_len) ||
        !req.ParseFromArray(request, request_len)) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    TopicNode *node = AcquireProcNode();
    if (!node) {
        return false;
    }
    std::string str_msg_id;
    if (node->ClientAsyncRequest(dest, req, str_msg_id)) {
        if (!msg_id || !msg_id_len) {
            return true;
        }
        TmpPtr ptr(str_msg_id);
        if (ptr) {
            ptr.ReleaseTo(msg_id, msg_id_len);
            return true;
        } else {
            SetLastError(ENOMEM, "out of mem");
        }
    }
    return false;
}

// Parses `remote` and `request` and calls TopicNode::ClientSyncRequest. On
// success the replying process id and the serialized reply are returned in
// malloc'ed buffers (release them with BHFree).
int BHRequest(const void *remote, const int remote_len, const void *request, const int request_len, void **proc_id, int *proc_id_len, void **reply, int *reply_len, const int timeout_ms)
{
    BHAddress dest;
    MsgRequestTopic req;
    if (!dest.ParseFromArray(remote, remote_len) ||
        !req.ParseFromArray(request, request_len)) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    TopicNode *node = AcquireProcNode();
    if (!node) {
        return false;
    }
    std::string proc;
    MsgRequestTopicReply out_msg;
    if (node->ClientSyncRequest(dest, req, proc, out_msg, timeout_ms)) {
        TmpPtr pproc(proc);
        if (pproc && PackOutput(out_msg, reply, reply_len)) {
            pproc.ReleaseTo(proc_id, proc_id_len);
            return true;
        } else {
            SetLastError(ENOMEM, "out of mem");
        }
    }
    return false;
}

// Receives one request via TopicNode::ServerRecvRequest. `src` receives the
// opaque handle that BHSendReply takes, so it must not be null.
int BHReadRequest(void **proc_id, int *proc_id_len, void **request, int *request_len, void **src, const int timeout_ms)
{
    // Checked before receiving so that no request is consumed and then dropped.
    if (!src) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    TopicNode *node = AcquireProcNode();
    if (!node) {
        return false;
    }
    void *src_info = nullptr;
    std::string proc;
    MsgRequestTopic out_msg;
    if (node->ServerRecvRequest(src_info, proc, out_msg, timeout_ms)) {
        TmpPtr pproc(proc);
        if (pproc && PackOutput(out_msg, request, request_len)) {
            pproc.ReleaseTo(proc_id, proc_id_len);
            *src = src_info;
            return true;
        } else {
            // NOTE(review): src_info is not handed out on this path; confirm
            // whether it needs to be released.
            SetLastError(ENOMEM, "out of mem");
        }
    }
    return false;
}

// Parses `reply` as a MsgRequestTopicReply and passes it, together with the
// `src` handle, to TopicNode::ServerSendReply.
int BHSendReply(void *src, const void *reply, const int reply_len)
{
    MsgRequestTopicReply rep;
    if (!rep.ParseFromArray(reply, reply_len)) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    TopicNode *node = AcquireProcNode();
    if (!node) {
        return false;
    }
    return node->ServerSendReply(src, rep);
}

// Wraps the non-null C callbacks into TopicNode callbacks, each of which
// re-serializes the protobuf message before invoking the C callback, then
// calls TopicNode::Start. Does nothing besides recording eNotFound when the
// process node is missing.
void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb)
{
    TopicNode *node = AcquireProcNode();
    if (!node) {
        return;
    }

    TopicNode::ServerAsyncCB on_req;
    TopicNode::SubDataCB on_sub;
    TopicNode::RequestResultCB on_reply;
    if (server_cb) {
        on_req = [server_cb](void *src, std::string &proc_id, const MsgRequestTopic &request) {
            std::string sreq(request.SerializeAsString());
            server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), src);
        };
    }
    if (sub_cb) {
        on_sub = [sub_cb](const std::string &proc_id, const MsgPublish &pub) {
            std::string s(pub.SerializeAsString());
            sub_cb(proc_id.data(), proc_id.size(), s.data(), s.size());
        };
    }
    if (client_cb) {
        on_reply = [client_cb](const BHMsgHead &head, const MsgRequestTopicReply &rep) {
            std::string s(rep.SerializeAsString());
            client_cb(head.proc_id().data(), head.proc_id().size(),
                      head.msg_id().data(), head.msg_id().size(),
                      s.data(), s.size());
        };
    }

    node->Start(on_req, on_sub, on_reply);
}

// Releases a buffer handed out by this API; `size` is unused.
void BHFree(void *data, int size)
{
    (void) size;
    free(data);
}

// Destroys the process node, if any. Always returns 0.
int BHCleanup()
{
    ProcNodePtr().reset();
    return 0;
}

// Returns the last error code. When both `msg` and `msg_len` are non-null the
// error text is copied into a malloc'ed buffer (release it with BHFree); if
// that allocation fails the outputs are set to null / 0.
int BHGetLastError(void **msg, int *msg_len)
{
    int ec = 0;
    std::string err_msg;
    // Fetched unconditionally so the code is reported even when the caller
    // does not want the text.
    GetLastError(ec, err_msg);
    if (msg && msg_len) {
        TmpPtr p(err_msg);
        if (!p.ReleaseTo(msg, msg_len)) {
            *msg = nullptr;
            *msg_len = 0;
        }
    }
    return ec;
}

#undef BH_SOCKET_MEMF_CALL