#include "bh_api.h"
#include "defs.h"
#include "topic_node.h"
// NOTE(review): the original file had one more angle-bracket #include here whose
// target is not recoverable from this view; the standard headers this file
// visibly depends on are listed instead — confirm against version control.
#include <cerrno>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <string>

using namespace bhome_shm;
using namespace bhome_msg;

namespace
{

/// Returns the process-wide TopicNode, constructed on first use from BHomeShm().
TopicNode &ProcNode()
{
    static TopicNode node(BHomeShm());
    return node;
}

/// Scoped owner of a malloc()-ed buffer.
///
/// The buffer is free()-d on destruction unless release() was called, in which
/// case ownership passes to the caller (API users hand it back via BHFree()).
class TmpPtr : private boost::noncopyable
{
    void *ptr_ = nullptr;
    size_t size_ = 0;

public:
    /// Allocates `size` bytes. At least one byte is always requested, because
    /// malloc(0) is allowed to return a null pointer, which would otherwise be
    /// indistinguishable from an allocation failure for empty payloads.
    explicit TmpPtr(const size_t size) :
        ptr_(malloc(size != 0 ? size : 1)), size_(size) {}

    /// Allocates a buffer and copies the bytes of `str` into it (no terminator).
    explicit TmpPtr(const std::string &str) :
        TmpPtr(str.size())
    {
        if (ptr_) {
            memcpy(ptr_, str.data(), str.size());
        }
    }

    ~TmpPtr() { free(ptr_); }

    void *get() const { return ptr_; }

    /// Gives up ownership of the buffer; size() keeps reporting the logical size.
    void *release()
    {
        void *tmp = ptr_;
        ptr_ = nullptr;
        return tmp;
    }

    size_t size() const { return size_; }

    /// True while a buffer is held (i.e. allocation succeeded and not released).
    explicit operator bool() const { return ptr_ != nullptr; }
};

/// Serializes `msg` into a freshly malloc()-ed buffer.
///
/// @param msg      message to serialize.
/// @param out      receives the buffer pointer on success.
/// @param out_len  receives the serialized size in bytes on success.
/// @return true on success; false (with last error set to ENOMEM) when the
///         buffer cannot be allocated.
template <class Msg>
bool PackOutput(const Msg &msg, void **out, int *out_len)
{
    auto size = msg.ByteSizeLong();
    TmpPtr p(size);
    if (!p) {
        SetLastError(ENOMEM, "not enough memory.");
        return false;
    }
    msg.SerializePartialToArray(p.get(), size);
    *out = p.release();
    *out_len = size;
    return true;
}

/// Copies `proc` and serializes `msg` into two malloc()-ed output buffers.
///
/// The proc id buffer is only handed out once the message was packed as well,
/// so on failure no output is left owning memory.
///
/// @return true when both outputs were filled; false (last error ENOMEM) otherwise.
template <class Msg>
bool PackProcAndMsg(const std::string &proc, const Msg &msg,
                    void **proc_id, int *proc_id_len,
                    void **out, int *out_len)
{
    TmpPtr pproc(proc);
    if (pproc && PackOutput(msg, out, out_len)) {
        *proc_id = pproc.release();
        *proc_id_len = pproc.size();
        return true;
    }
    SetLastError(ENOMEM, "out of mem");
    return false;
}

} // namespace

/// Parses a serialized ProcInfo, registers it through the process node and
/// returns the serialized MsgCommonReply in a malloc()-ed buffer.
///
/// @return false when the input cannot be parsed (eInvalidInput), when
///         Register() fails, or when the reply cannot be packed.
bool BHRegister(const void *proc_info,
                const int proc_info_len,
                void **reply,
                int *reply_len,
                const int timeout_ms)
{
    ProcInfo pi;
    if (!pi.ParseFromArray(proc_info, proc_info_len)) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    MsgCommonReply msg_reply;
    if (ProcNode().Register(pi, msg_reply, timeout_ms)) {
        return PackOutput(msg_reply, reply, reply_len);
    } else {
        return false;
    }
}

/// Forwards to TopicNode::Heartbeat(timeout_ms) on the process node.
bool BHHeartBeatEasy(const int timeout_ms)
{
    return ProcNode().Heartbeat(timeout_ms);
}

/// Parses a serialized ProcInfo, sends a heartbeat with it and returns the
/// serialized MsgCommonReply in a malloc()-ed buffer.
///
/// @return false when the input cannot be parsed (eInvalidInput), when
///         Heartbeat() fails, or when the reply cannot be packed.
bool BHHeartBeat(const void *proc_info,
                 const int proc_info_len,
                 void **reply,
                 int *reply_len,
                 const int timeout_ms)
{
    ProcInfo pi;
    if (!pi.ParseFromArray(proc_info, proc_info_len)) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    MsgCommonReply msg_reply;
    if (ProcNode().Heartbeat(pi, msg_reply, timeout_ms)) {
        return PackOutput(msg_reply, reply, reply_len);
    } else {
        return false;
    }
}

/// Parses a serialized MsgPublish and publishes it through the process node.
///
/// @return false when the input cannot be parsed (eInvalidInput) or Publish() fails.
bool BHPublish(const void *msgpub,
               const int msgpub_len,
               const int timeout_ms)
{
    MsgPublish pub;
    if (!pub.ParseFromArray(msgpub, msgpub_len)) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    return ProcNode().Publish(pub, timeout_ms);
}

/// Receives one subscribed publication and returns the sender's proc id and the
/// serialized MsgPublish in two malloc()-ed buffers.
///
/// @return true when both outputs were filled; false when RecvSub() fails or
///         the outputs cannot be allocated (last error ENOMEM).
bool BHReadSub(void **proc_id,
               int *proc_id_len,
               void **msgpub,
               int *msgpub_len,
               const int timeout_ms)
{
    std::string proc;
    MsgPublish pub;

    if (!ProcNode().RecvSub(proc, pub, timeout_ms)) {
        return false;
    }
    // Previously this function fell through to `return false` even after the
    // outputs had been filled; success is now reported as such.
    return PackProcAndMsg(proc, pub, proc_id, proc_id_len, msgpub, msgpub_len);
}

/// Parses a serialized MsgRequestTopic, performs a synchronous client request
/// and returns the replier's proc id and the serialized reply in two
/// malloc()-ed buffers.
///
/// @return true when both outputs were filled; false when the input cannot be
///         parsed (eInvalidInput), ClientSyncRequest() fails, or the outputs
///         cannot be allocated (last error ENOMEM).
bool BHRequest(const void *request,
               const int request_len,
               void **proc_id,
               int *proc_id_len,
               void **reply,
               int *reply_len,
               const int timeout_ms)
{
    MsgRequestTopic req;
    if (!req.ParseFromArray(request, request_len)) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    std::string proc;
    MsgRequestTopicReply out_msg;
    if (!ProcNode().ClientSyncRequest(req, proc, out_msg, timeout_ms)) {
        return false;
    }
    // Report success once the outputs are filled (was: always false).
    return PackProcAndMsg(proc, out_msg, proc_id, proc_id_len, reply, reply_len);
}

/// Receives one incoming request on the server side and returns the requester's
/// proc id and the serialized MsgRequestTopic in two malloc()-ed buffers.
/// `*src` receives the opaque source handle produced by ServerRecvRequest(),
/// which is what BHSendReply() takes as its `src` argument.
///
/// @return true when all outputs were filled; false when ServerRecvRequest()
///         fails or the outputs cannot be allocated (last error ENOMEM).
bool BHReadRequest(void **proc_id,
                   int *proc_id_len,
                   void **request,
                   int *request_len,
                   void **src,
                   const int timeout_ms)
{
    void *src_info = nullptr;
    std::string proc;
    MsgRequestTopic out_msg;
    if (!ProcNode().ServerRecvRequest(src_info, proc, out_msg, timeout_ms)) {
        return false;
    }
    if (!PackProcAndMsg(proc, out_msg, proc_id, proc_id_len, request, request_len)) {
        return false;
    }
    // Only publish the source handle together with the other outputs.
    *src = src_info;
    return true; // was: always false
}

/// Parses a serialized MsgRequestTopicReply and sends it to the requester
/// identified by `src`.
///
/// @return false when the input cannot be parsed (eInvalidInput) or
///         ServerSendReply() fails.
bool BHSendReply(void *src,
                 const void *reply,
                 const int reply_len,
                 const int timeout_ms)
{
    MsgRequestTopicReply rep;
    if (!rep.ParseFromArray(reply, reply_len)) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    return ProcNode().ServerSendReply(src, rep, timeout_ms);
}

/// Currently does nothing and returns 0.
int BHCleanUp()
{
    return 0;
}

namespace
{
/// Callable that parses reply bytes handed over by a server callback; a pointer
/// to it travels through the C API disguised as a BHServerCallbackTag.
using ServerSender = std::function<bool(const void *, int)>;
} // namespace

/// Starts the process node's worker with adapters that bridge the C callbacks
/// to TopicNode's callback types. Null callbacks leave the corresponding
/// TopicNode callback empty.
void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb)
{
    TopicNode::ServerCB on_req;
    TopicNode::SubDataCB on_sub;
    if (server_cb) {
        on_req = [server_cb](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) {
            std::string sreq(request.SerializeAsString());
            bool r = false;
            // `sender` lives on this stack frame and captures by reference, so
            // the tag is only valid for the duration of the server_cb call.
            ServerSender sender = [&](const void *p, const int len) {
                r = reply.ParseFromArray(p, len);
                return r;
            };
            server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(),
                      reinterpret_cast<BHServerCallbackTag *>(&sender));
            // Stays false when the callback never replied or the reply did not parse.
            return r;
        };
    }
    if (sub_cb) {
        on_sub = [sub_cb](const std::string &proc_id, const MsgPublish &pub) {
            std::string s(pub.SerializeAsString());
            sub_cb(proc_id.data(), proc_id.size(), s.data(), s.size());
        };
    }

    ProcNode().Start(on_req, on_sub);
}

/// Delivers reply bytes from inside a server callback. `tag` must be the value
/// passed to that callback by BHStartWorker's adapter.
///
/// @return the result of parsing `data` as the reply message.
bool BHServerCallbackReply(const BHServerCallbackTag *tag,
                           const void *data,
                           const int data_len)
{
    auto &sender = *reinterpret_cast<const ServerSender *>(tag);
    return sender(data, data_len);
}

/// Releases a buffer returned by this API (they are allocated with malloc()).
/// `size` is not used.
void BHFree(void *data, int size)
{
    free(data);
}

/// Returns the last error code and, through `msg`/`msg_len`, a malloc()-ed copy
/// of the error message (not NUL-terminated).
///
/// NOTE(review): when either pointer is null the error is not queried at all
/// and 0 is returned; the outputs are also left untouched if the copy cannot
/// be allocated — confirm whether callers rely on this.
int BHGetLastError(void **msg, int *msg_len)
{
    int ec = 0;
    if (msg && msg_len) {
        std::string err_msg;
        GetLastError(ec, err_msg);
        TmpPtr p(err_msg);
        if (p) {
            *msg = p.release();
            *msg_len = p.size();
        }
    }
    return ec;
}

#undef BH_SOCKET_MEMF_CALL