#include "bh_api.h"
#include "defs.h"
#include "topic_node.h"
#include <cerrno>
#include <cstdlib>
#include <cstring>
#include <memory> // NOTE(review): the header name on this #include was missing in the reviewed text; <memory> is assumed - confirm.
#include <string>

using namespace bhome_shm;
using namespace bhome_msg;

namespace
{

// Returns the TopicNode shared by every API call in this file.
// It is a function-local static built from BHomeShm() on first use.
TopicNode &ProcNode()
{
    static TopicNode node(BHomeShm());
    return node;
}

// Scoped owner of a malloc()'d buffer that can be handed over to a C caller.
// Whatever is not released is free()'d by the destructor; released buffers are
// compatible with BHFree(), which calls free().
class TmpPtr : private boost::noncopyable
{
    void *ptr_ = nullptr;
    size_t size_ = 0;

public:
    // Allocates `size` bytes. At least one byte is always requested because
    // malloc(0) may return NULL, which would make an empty payload
    // indistinguishable from an allocation failure.
    explicit TmpPtr(const size_t size) :
        ptr_(malloc(size != 0 ? size : 1)), size_(size) {}

    // Allocates a buffer and copies the bytes of `str` into it (no terminator).
    explicit TmpPtr(const std::string &str) :
        TmpPtr(str.size())
    {
        if (ptr_ && !str.empty()) {
            memcpy(ptr_, str.data(), str.size());
        }
    }

    ~TmpPtr() { free(ptr_); }

    void *get() const { return ptr_; }

    // Gives up ownership; the caller becomes responsible for free().
    void *release()
    {
        void *tmp = ptr_;
        ptr_ = nullptr;
        return tmp;
    }

    // Logical payload size in bytes (may be 0 even though the buffer is valid).
    size_t size() const { return size_; }

    // True when the allocation succeeded.
    explicit operator bool() const { return ptr_ != nullptr; }

    // Transfers the buffer to (*pdata, *psize) when both pointers are given.
    // Returns false only when the allocation had failed; when the outputs are
    // not both supplied the buffer stays owned (and is freed) by this object.
    bool ReleaseTo(void **pdata, int *psize)
    {
        if (!ptr_) {
            return false;
        }
        if (pdata && psize) {
            *psize = static_cast<int>(size());
            *pdata = release();
        }
        return true;
    }
};

// Serializes `msg` into a freshly malloc()'d buffer returned via out/out_len.
// Returns true without doing anything when the caller did not supply both
// output pointers; returns false (last error ENOMEM) when allocation fails.
template <class Msg>
bool PackOutput(const Msg &msg, void **out, int *out_len)
{
    if (!out || !out_len) {
        return true; // not wanted.
    }
    const auto size = msg.ByteSizeLong();
    TmpPtr p(size);
    if (!p) {
        SetLastError(ENOMEM, "not enough memory.");
        return false;
    }
    msg.SerializePartialToArray(p.get(), size);
    p.ReleaseTo(out, out_len);
    return true;
}

// Common shape of "one message in, one message out" calls:
// parse `request` as MsgIn, invoke the TopicNode member, pack the MsgOut reply.
// Returns false with eInvalidInput when the request bytes do not parse.
template <class MsgIn, class MsgOut>
bool BHApiIn1Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int),
                  const void *request,
                  const int request_len,
                  void **reply,
                  int *reply_len,
                  const int timeout_ms)
{
    MsgIn input;
    if (!input.ParseFromArray(request, request_len)) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    MsgOut msg_reply;
    return (ProcNode().*mfunc)(input, msg_reply, timeout_ms) &&
           PackOutput(msg_reply, reply, reply_len);
}

} // namespace

// Forwards all arguments to `func` and returns its result.
int BHApiIn1Out1Proxy(FBHApiIn1Out1 func,
                      const void *request,
                      const int request_len,
                      void **reply,
                      int *reply_len,
                      const int timeout_ms)
{
    return (*func)(request, request_len, reply, reply_len, timeout_ms);
}

// Parses `proc_info` and passes it to TopicNode::Register; the serialized reply
// is returned through reply/reply_len when both are non-null.
// Returns nonzero on success, 0 on failure.
int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
    return BHApiIn1Out1(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms);
}

// Calls the timeout-only TopicNode::Heartbeat overload.
int BHHeartbeatEasy(const int timeout_ms)
{
    return ProcNode().Heartbeat(timeout_ms);
}

// Parses `proc_info` and passes it to the message-taking TopicNode::Heartbeat
// overload; reply handling is the same as BHRegister.
int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
    return BHApiIn1Out1(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms);
}

// Parses `topics` and passes it to TopicNode::ServerRegisterRPC.
int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
    return BHApiIn1Out1(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms);
}

// Parses `topics` and passes it to TopicNode::Subscribe.
int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
    return BHApiIn1Out1(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
}

// Parses `msgpub` as a MsgPublish and hands it to TopicNode::Publish.
// Returns 0 with eInvalidInput when the bytes do not parse.
int BHPublish(const void *msgpub, const int msgpub_len, const int timeout_ms)
{
    MsgPublish pub;
    if (!pub.ParseFromArray(msgpub, msgpub_len)) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    return ProcNode().Publish(pub, timeout_ms);
}

// Receives one subscribed message via TopicNode::RecvSub.
// On success the sender's proc id and the serialized MsgPublish are returned in
// malloc()'d buffers (release them with BHFree). Returns 0 on failure.
int BHReadSub(void **proc_id, int *proc_id_len, void **msgpub, int *msgpub_len, const int timeout_ms)
{
    std::string proc;
    MsgPublish pub;
    if (!ProcNode().RecvSub(proc, pub, timeout_ms)) {
        return false;
    }
    TmpPtr pproc(proc);
    if (pproc && PackOutput(pub, msgpub, msgpub_len)) {
        pproc.ReleaseTo(proc_id, proc_id_len);
        return true;
    }
    SetLastError(ENOMEM, "out of mem");
    return false;
}

// Parses `request` as a MsgRequestTopic and calls TopicNode::ClientAsyncRequest.
// When msg_id/msg_id_len are both given, the message id string is returned in a
// malloc()'d buffer. Returns 0 on failure.
int BHAsyncRequest(const void *request, const int request_len, void **msg_id, int *msg_id_len)
{
    MsgRequestTopic req;
    if (!req.ParseFromArray(request, request_len)) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    std::string str_msg_id;
    if (!ProcNode().ClientAsyncRequest(req, str_msg_id)) {
        return false;
    }
    if (!msg_id || !msg_id_len) {
        return true; // caller does not want the id.
    }
    TmpPtr ptr(str_msg_id);
    if (ptr) {
        ptr.ReleaseTo(msg_id, msg_id_len);
        return true;
    }
    SetLastError(ENOMEM, "out of mem");
    return false;
}

// Parses `request` as a MsgRequestTopic and calls TopicNode::ClientSyncRequest.
// On success the responder's proc id and the serialized reply are returned in
// malloc()'d buffers. Returns 0 on failure.
int BHRequest(const void *request,
              const int request_len,
              void **proc_id,
              int *proc_id_len,
              void **reply,
              int *reply_len,
              const int timeout_ms)
{
    MsgRequestTopic req;
    if (!req.ParseFromArray(request, request_len)) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    std::string proc;
    MsgRequestTopicReply out_msg;
    if (!ProcNode().ClientSyncRequest(req, proc, out_msg, timeout_ms)) {
        return false;
    }
    TmpPtr pproc(proc);
    if (pproc && PackOutput(out_msg, reply, reply_len)) {
        pproc.ReleaseTo(proc_id, proc_id_len);
        return true;
    }
    SetLastError(ENOMEM, "out of mem");
    return false;
}

// Receives one request via TopicNode::ServerRecvRequest.
// On success the requester's proc id and the serialized request are returned in
// malloc()'d buffers and *src receives the opaque value produced by
// ServerRecvRequest (presumably the handle to pass to BHSendReply - confirm).
// `src` is mandatory; it is validated before receiving so a request is never
// consumed and then dropped because of a null output pointer.
int BHReadRequest(void **proc_id,
                  int *proc_id_len,
                  void **request,
                  int *request_len,
                  void **src,
                  const int timeout_ms)
{
    if (!src) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    void *src_info = nullptr;
    std::string proc;
    MsgRequestTopic out_msg;
    if (!ProcNode().ServerRecvRequest(src_info, proc, out_msg, timeout_ms)) {
        return false;
    }
    TmpPtr pproc(proc);
    if (pproc && PackOutput(out_msg, request, request_len)) {
        pproc.ReleaseTo(proc_id, proc_id_len);
        *src = src_info;
        return true;
    }
    SetLastError(ENOMEM, "out of mem");
    return false;
}

// Parses `reply` as a MsgRequestTopicReply and passes it, together with `src`,
// to TopicNode::ServerSendReply. Returns 0 with eInvalidInput on parse failure.
int BHSendReply(void *src, const void *reply, const int reply_len)
{
    MsgRequestTopicReply rep;
    if (!rep.ParseFromArray(reply, reply_len)) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    return ProcNode().ServerSendReply(src, rep);
}

// Wraps the supplied C callbacks in TopicNode callbacks and calls
// TopicNode::Start. A null C callback leaves the matching TopicNode callback
// empty. Each wrapper serializes the message to a temporary string, so the
// buffers passed to the C callback are only valid for the duration of the call.
void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb)
{
    TopicNode::ServerAsyncCB on_req;
    TopicNode::SubDataCB on_sub;
    TopicNode::RequestResultCB on_reply;

    if (server_cb) {
        on_req = [server_cb](void *src, std::string &proc_id, const MsgRequestTopic &request) {
            std::string sreq(request.SerializeAsString());
            server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), src);
        };
    }
    if (sub_cb) {
        on_sub = [sub_cb](const std::string &proc_id, const MsgPublish &pub) {
            std::string s(pub.SerializeAsString());
            sub_cb(proc_id.data(), proc_id.size(), s.data(), s.size());
        };
    }
    if (client_cb) {
        on_reply = [client_cb](const BHMsgHead &head, const MsgRequestTopicReply &rep) {
            std::string s(rep.SerializeAsString());
            client_cb(head.proc_id().data(), head.proc_id().size(),
                      head.msg_id().data(), head.msg_id().size(),
                      s.data(), s.size());
        };
    }

    ProcNode().Start(on_req, on_sub, on_reply);
}

// Releases a buffer previously returned by this API. The size is not needed
// because the buffers come from malloc().
void BHFree(void *data, int /*size*/)
{
    free(data);
}

// Returns the last error code. When msg/msg_len are both given, a malloc()'d
// copy of the error text is returned as well; on allocation failure the outputs
// are set to null/0. The code is fetched even when no message buffer is
// requested, so BHGetLastError(NULL, NULL) reports the real code.
int BHGetLastError(void **msg, int *msg_len)
{
    int ec = 0;
    std::string err_msg;
    GetLastError(ec, err_msg);
    if (msg && msg_len) {
        *msg = nullptr;
        *msg_len = 0;
        TmpPtr p(err_msg);
        p.ReleaseTo(msg, msg_len);
    }
    return ec;
}

#undef BH_SOCKET_MEMF_CALL