From c338820e4db43ad32c20ff429a038b06bcb980f8 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期四, 08 四月 2021 18:13:25 +0800 Subject: [PATCH] BIG change, join center,bus; now msg is head+body. --- .vscode/tasks.json | 4 .gitignore | 4 src/socket.cpp | 73 + utest/utest.cpp | 75 + src/topic_node.cpp | 322 ++++++++++ src/center.cpp | 402 ++++++++++++ src/msg.cpp | 145 --- utest/speed_test.cpp | 68 + proto/source/bhome_msg_api.proto | 71 ++ src/center.h | 3 src/proto.cpp | 41 + src/socket.h | 97 ++ utest/simple_tests.cpp | 10 src/proto.h | 78 ++ .vscode/settings.json | 7 src/msg.h | 69 + proto/source/bhome_msg.proto | 84 -- proto/source/error_msg.proto | 5 src/shm_queue.cpp | 19 /dev/null | 108 --- .vscode/launch.json | 2 src/pubsub.h | 12 src/shm_queue.h | 10 src/pubsub.cpp | 56 + src/topic_node.h | 121 +++ 25 files changed, 1,406 insertions(+), 480 deletions(-) diff --git a/.gitignore b/.gitignore index 5c7daa3..8e81403 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,8 @@ *.un~ build/ +debug/ +release/ Makefile utest/utest +*.bak +gmon.out diff --git a/.vscode/launch.json b/.vscode/launch.json index 9eeb23e..ef42f7b 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -8,7 +8,7 @@ "name": "g++ - Build and debug active file", "type": "cppdbg", "request": "launch", - "program": "${workspaceFolder}/utest/utest", + "program": "${workspaceFolder}/debug/bin/utest", "args": [ "-t", "ReqRepTest" diff --git a/.vscode/settings.json b/.vscode/settings.json index d5005e9..7928bc8 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -55,7 +55,12 @@ "cinttypes": "cpp", "typeindex": "cpp", "typeinfo": "cpp", - "variant": "cpp" + "variant": "cpp", + "iomanip": "cpp", + "*.inc": "cpp", + "strstream": "cpp", + "unordered_set": "cpp", + "cfenv": "cpp" }, "files.exclude": { "**/*.un~": true diff --git a/.vscode/tasks.json b/.vscode/tasks.json index 84142bc..db457e5 100644 --- a/.vscode/tasks.json +++ b/.vscode/tasks.json @@ -6,10 +6,10 @@ "command": "ninja", "args": [ "-C", - "../build" + "debug" ], "options": { - "cwd": "${workspaceFolder}/utest" + "cwd": "${workspaceFolder}" }, "problemMatcher": [ "$gcc" diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto index 9827f17..b06b692 100644 --- a/proto/source/bhome_msg.proto +++ b/proto/source/bhome_msg.proto @@ -1,39 +1,21 @@ syntax = "proto3"; - option optimize_for = LITE_RUNTIME; -import "google/protobuf/descriptor.proto"; -import "error_msg.proto"; +// import "google/protobuf/descriptor.proto"; +import "bhome_msg_api.proto"; package bhome.msg; -// message format : header(BHMsgHead) + body(variable types) -message BHAddress { - bytes mq_id = 1; // mqid, uuid - bytes ip = 2; // - int32 port = 3; -} - -message ProcInfo -{ - bytes id = 1; // serial number, maybe managed - bytes name = 2; - bytes public_info = 3; - bytes private_info = 4; -} +// message format : head_len(4) + head(BHMsgHead) + body_len(4) + body(variable types) message BHMsgHead { bytes msg_id = 1; repeated BHAddress route = 2; // for reply and proxy. int64 timestamp = 3; int32 type = 4; - ProcInfo proc = 5; + bytes proc_id = 5; bytes topic = 6; // for request route -} - -message BHMsgBody { - bytes data = 1; } message BHMsg { // deprecated @@ -46,6 +28,7 @@ enum MsgType { kMsgTypeInvalid = 0; + kMsgTypeRawData = 1; kMsgTypeCommonReply = 2; @@ -57,57 +40,16 @@ kMsgTypeQueryTopicReply = 15; kMsgTypeRequestTopic = 16; kMsgTypeRequestTopicReply = 17; + kMsgTypeRegisterRPC = 18; + // reply - kMsgTypePublish = 100; - // kMsgTypePublishReply = 101; - kMsgTypeSubscribe = 102; - // kMsgTypeSubscribeReply = 103; - kMsgTypeUnsubscribe = 104; - // kMsgTypeUnsubscribeReply = 105; + kMsgTypePublish = 20; + // kMsgTypePublishReply = 21; + kMsgTypeSubscribe = 22; + // kMsgTypeSubscribeReply = 23; + kMsgTypeUnsubscribe = 24; + // kMsgTypeUnsubscribeReply = 25; -} - -message MsgPub { - bytes topic = 1; - bytes data = 2; -} - -message MsgSub { - repeated bytes topics = 1; -} - -message MsgCommonReply { - ErrorMsg errmsg = 1; -} - -message MsgRequestTopic { - bytes topic = 1; - bytes data = 2; -} - -message MsgRequestTopicReply { - ErrorMsg errmsg = 1; - bytes data = 2; -} - -message MsgRegister -{ - ProcInfo proc = 1; - repeated bytes topics = 2; -} - -message MsgHeartbeat -{ - ProcInfo proc = 1; -} - -message MsgQueryTopic { - bytes topic = 1; -} - -message MsgQueryTopicReply { - ErrorMsg errmsg = 1; - BHAddress address = 2; } service TopicRPC { diff --git a/proto/source/bhome_msg_api.proto b/proto/source/bhome_msg_api.proto new file mode 100644 index 0000000..82b8115 --- /dev/null +++ b/proto/source/bhome_msg_api.proto @@ -0,0 +1,71 @@ +syntax = "proto3"; +option optimize_for = LITE_RUNTIME; + +// public messages +import "error_msg.proto"; + +package bhome.msg; + +message BHAddress { + bytes mq_id = 1; // mqid, uuid + // bytes ip = 2; // + // int32 port = 3; +} + +message ProcInfo +{ + bytes proc_id = 1; // serial number, maybe managed + bytes name = 2; + bytes public_info = 3; // maybe json. + bytes private_info = 4; +} + +message MsgPublish { + bytes topic = 1; + bytes data = 2; +} + +message MsgSubscribe { + repeated bytes topics = 1; +} +message MsgUnsubscribe { + repeated bytes topics = 1; +} + +message MsgCommonReply { + ErrorMsg errmsg = 1; +} + +message MsgRequestTopic { + bytes topic = 1; + bytes data = 2; +} + +message MsgRequestTopicReply { + ErrorMsg errmsg = 1; + bytes data = 2; +} + +message MsgRegister +{ + ProcInfo proc = 1; +} + +message MsgRegisterRPC +{ + repeated bytes topics = 1; +} + +message MsgHeartbeat +{ + ProcInfo proc = 1; +} + +message MsgQueryTopic { + bytes topic = 1; +} + +message MsgQueryTopicReply { + ErrorMsg errmsg = 1; + BHAddress address = 2; +} diff --git a/proto/source/error_msg.proto b/proto/source/error_msg.proto index f283108..b85ddb3 100644 --- a/proto/source/error_msg.proto +++ b/proto/source/error_msg.proto @@ -8,6 +8,11 @@ eSuccess = 0; eError = 1; eInvalidInput = 2; + eNotRegistered = 3; + eNotFound = 4; + eOffline = 5; + eNoRespond = 6; + eAddressNotMatch = 7; } message ErrorMsg { diff --git a/src/center.cpp b/src/center.cpp index a3897fb..fe549b7 100644 --- a/src/center.cpp +++ b/src/center.cpp @@ -16,20 +16,387 @@ * ===================================================================================== */ #include "center.h" +#include "bh_util.h" #include "defs.h" -#include "pubsub_center.h" -#include "reqrep_center.h" #include "shm.h" +#include <set> using namespace bhome_shm; +using namespace bhome_msg; +using namespace bhome::msg; typedef BHCenter::MsgHandler Handler; -Handler Join(Handler h1, Handler h2) +namespace { - return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &imsg, bhome::msg::BHMsg &msg) { - return h1(socket, imsg, msg) || h2(socket, imsg, msg); +auto Now = []() { time_t t; return time(&t); }; + +//TODO check proc_id +class NodeCenter +{ +public: + typedef std::string ProcId; + typedef std::string Address; + typedef bhome::msg::ProcInfo ProcInfo; + +private: + enum { + kStateInvalid = 0, + kStateNormal = 1, + kStateNoRespond = 2, + kStateOffline = 3, + }; + + struct ProcState { + time_t timestamp_ = 0; + uint32_t flag_ = 0; // reserved + }; + typedef std::unordered_map<Address, std::set<Topic>> AddressTopics; + + struct NodeInfo { + ProcState state_; // state + Address addr_; // registered_mqid. + ProcInfo proc_; // + AddressTopics services_; // address: topics + AddressTopics subscriptions_; // address: topics + }; + typedef std::shared_ptr<NodeInfo> Node; + typedef std::weak_ptr<NodeInfo> WeakNode; + + struct TopicDest { + Address mq_; + WeakNode weak_node_; + bool operator<(const TopicDest &a) const { return mq_ < a.mq_; } + }; + const std::string &SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); } + +public: + typedef std::set<TopicDest> Clients; + + NodeCenter(const std::string &id = "#Center") : + id_(id) {} + const std::string &id() const { return id_; } // no need to lock. + + //TODO maybe just return serialized string. + MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg) + { + if (msg.proc().proc_id() != head.proc_id()) { + return MakeReply(eInvalidInput, "invalid proc id."); + } + + try { + Node node(new NodeInfo); + node->addr_ = SrcAddr(head); + node->proc_.Swap(msg.mutable_proc()); + node->state_.timestamp_ = Now(); + node->state_.flag_ = kStateNormal; + nodes_[node->proc_.proc_id()] = node; + return MakeReply(eSuccess); + } catch (...) { + return MakeReply(eError, "register node error."); + } + } + template <class OnSuccess, class OnError> + auto HandleMsg(const BHMsgHead &head, OnSuccess onOk, OnError onErr) + { + auto pos = nodes_.find(head.proc_id()); + if (pos == nodes_.end()) { + return onErr(eNotRegistered, "Node is not registered."); + } else { + auto node = pos->second; + if (head.type() == kMsgTypeHeartbeat && node->addr_ != SrcAddr(head)) { + return onErr(eAddressNotMatch, "Node address error."); + } else if (!Valid(*node)) { + return onErr(eNoRespond, "Node is not alive."); + } else { + return onOk(node); + } + } + } + + template <class Reply, class Func> + Reply HandleMsg(const BHMsgHead &head, Func const &op) + { + try { + auto onErr = [](const ErrorCode ec, const std::string &str) { return MakeReply<Reply>(ec, str); }; + return HandleMsg(head, op, onErr); + + auto pos = nodes_.find(head.proc_id()); + if (pos == nodes_.end()) { + return MakeReply<Reply>(eNotRegistered, "Node is not registered."); + } else { + auto node = pos->second; + if (node->addr_ != SrcAddr(head)) { + return MakeReply<Reply>(eAddressNotMatch, "Node address error."); + } else if (!Valid(*node)) { + return MakeReply<Reply>(eNoRespond, "Node is not alive."); + } else { + return op(node); + } + } + } catch (...) { + //TODO error log + return MakeReply<Reply>(eError, "internal error."); + } + } + template <class Func> + inline MsgCommonReply HandleMsg(const BHMsgHead &head, Func const &op) + { + return HandleMsg<MsgCommonReply, Func>(head, op); + } + + MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg) + { + return HandleMsg( + head, [&](Node node) -> MsgCommonReply { + auto &src = SrcAddr(head); + node->services_[src].insert(msg.topics().begin(), msg.topics().end()); + TopicDest dest = {src, node}; + for (auto &topic : msg.topics()) { + service_map_[topic].insert(dest); + } + return MakeReply(eSuccess); + }); + } + + MsgCommonReply Heartbeat(const BHMsgHead &head, const MsgHeartbeat &msg) + { + return HandleMsg(head, [&](Node node) { + NodeInfo &ni = *node; + ni.state_.timestamp_ = Now(); + auto &info = msg.proc(); + if (!info.public_info().empty()) { + ni.proc_.set_public_info(info.public_info()); + } + if (!info.private_info().empty()) { + ni.proc_.set_private_info(info.private_info()); + } + return MakeReply(eSuccess); + }); + } + + MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req) + { + typedef MsgQueryTopicReply Reply; + + auto query = [&](Node self) -> MsgQueryTopicReply { + auto pos = service_map_.find(req.topic()); + if (pos != service_map_.end() && !pos->second.empty()) { + // now just find first one. + const TopicDest &dest = *(pos->second.begin()); + Node dest_node(dest.weak_node_.lock()); + if (!dest_node) { + service_map_.erase(pos); + return MakeReply<Reply>(eOffline, "topic server offline."); + } else if (!Valid(*dest_node)) { + return MakeReply<Reply>(eNoRespond, "topic server not responding."); + } else { + MsgQueryTopicReply reply = MakeReply<Reply>(eSuccess); + reply.mutable_address()->set_mq_id(dest.mq_); + return reply; + } + + } else { + return MakeReply<Reply>(eNotFound, "topic server not found."); + } + }; + + return HandleMsg<Reply>(head, query); + } + + MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg) + { + return HandleMsg(head, [&](Node node) { + auto &src = SrcAddr(head); + node->subscriptions_[src].insert(msg.topics().begin(), msg.topics().end()); + TopicDest dest = {src, node}; + for (auto &topic : msg.topics()) { + subscribe_map_[topic].insert(dest); + } + return MakeReply(eSuccess); + }); + } + MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg) + { + return HandleMsg(head, [&](Node node) { + auto &src = SrcAddr(head); + auto pos = node->subscriptions_.find(src); + + auto RemoveSubTopicDestRecord = [this](const Topic &topic, const TopicDest &dest) { + auto pos = subscribe_map_.find(topic); + if (pos != subscribe_map_.end() && + pos->second.erase(dest) != 0 && + pos->second.empty()) { + subscribe_map_.erase(pos); + } + }; + + if (pos != node->subscriptions_.end()) { + const TopicDest &dest = {src, node}; + // clear node sub records; + for (auto &topic : msg.topics()) { + pos->second.erase(topic); + RemoveSubTopicDestRecord(topic, dest); + } + if (pos->second.empty()) { + node->subscriptions_.erase(pos); + } + } + return MakeReply(eSuccess); + }); + } + + Clients DoFindClients(const std::string &topic) + { + Clients dests; + auto Find1 = [&](const std::string &t) { + auto pos = subscribe_map_.find(topic); + if (pos != subscribe_map_.end()) { + auto &clients = pos->second; + for (auto &cli : clients) { + if (Valid(cli.weak_node_)) { + dests.insert(cli); + } + } + } + }; + Find1(topic); + + size_t pos = 0; + while (true) { + pos = topic.find(kTopicSep, pos); + if (pos == topic.npos || ++pos == topic.size()) { + // Find1(std::string()); // sub all. + break; + } else { + Find1(topic.substr(0, pos)); + } + } + return dests; + } + bool FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply) + { + bool ret = false; + HandleMsg(head, [&](Node node) { + DoFindClients(msg.topic()).swap(out); + ret = true; + return MakeReply(eSuccess); + }).Swap(&reply); + return ret; + } + +private: + bool Valid(const NodeInfo &node) + { + return node.state_.flag_ == kStateNormal; + } + bool Valid(const WeakNode &weak) + { + auto node = weak.lock(); + return node && Valid(*node); + } + void CheckAllNodes(); //TODO, call it in timer. + std::string id_; // center proc id; + + std::unordered_map<Topic, Clients> service_map_; + std::unordered_map<Topic, Clients> subscribe_map_; + std::unordered_map<ProcId, Node> nodes_; +}; + +template <class Body, class OnMsg, class Replyer> +inline void Dispatch(MsgI &msg, BHMsgHead &head, OnMsg const &onmsg, Replyer const &replyer) +{ + if (head.route_size() != 1) { return; } + Body body; + if (msg.ParseBody(body)) { + replyer(onmsg(body)); + } +} + +Handler Combine(const Handler &h1, const Handler &h2) +{ + return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome::msg::BHMsgHead &head) { + return h1(socket, msg, head) || h2(socket, msg, head); }; } +template <class... H> +Handler Combine(const Handler &h0, const Handler &h1, const Handler &h2, const H &...rest) +{ + return Combine(Combine(h0, h1), h2, rest...); +} + +#define CASE_ON_MSG_TYPE(MsgTag) \ + case kMsgType##MsgTag: \ + Dispatch<Msg##MsgTag>( \ + msg, head, [&](auto &body) { return center->MsgTag(head, body); }, replyer); \ + return true; + +bool InstallCenter() +{ + auto center_ptr = std::make_shared<Synced<NodeCenter>>(); + auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) { + return [&](auto &&rep_body) { + auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id())); + bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 10); + if (!r) { + printf("send reply failed.\n"); + } + //TODO resend failed. + }; + }; + + auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { + auto ¢er = *center_ptr; + auto replyer = MakeReplyer(socket, head, center->id()); + switch (head.type()) { + CASE_ON_MSG_TYPE(Register); + CASE_ON_MSG_TYPE(Heartbeat); + + CASE_ON_MSG_TYPE(RegisterRPC); + CASE_ON_MSG_TYPE(QueryTopic); + default: return false; + } + }; + + auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { + auto ¢er = *center_ptr; + auto replyer = MakeReplyer(socket, head, center->id()); + auto OnPublish = [&]() { + MsgPublish pub; + NodeCenter::Clients clients; + MsgCommonReply reply; + MsgI pubmsg; + if (head.route_size() != 1 || !msg.ParseBody(pub)) { + return; + } else if (!center->FindClients(head, pub, clients, reply)) { + // send error reply. + MakeReplyer(socket, head, center->id())(reply); + } else if (pubmsg.MakeRC(socket.shm(), msg)) { + DEFER1(pubmsg.Release(socket.shm())); + for (auto &cli : clients) { + auto node = cli.weak_node_.lock(); + if (node) { + socket.Send(cli.mq_.data(), pubmsg, 10); + } + } + } + }; + switch (head.type()) { + CASE_ON_MSG_TYPE(Subscribe); + CASE_ON_MSG_TYPE(Unsubscribe); + case kMsgTypePublish: OnPublish(); return true; + default: return false; + } + }; + + BHCenter::Install("#center.reg", OnCenter, BHTopicCenterAddress(), 1000); + BHCenter::Install("#center.bus", OnPubSub, BHTopicBusAddress(), 1000); + + return true; +} + +#undef CASE_ON_MSG_TYPE + +} // namespace SharedMemory &BHomeShm() { @@ -42,17 +409,24 @@ static CenterRecords rec; return rec; } + bool BHCenter::Install(const std::string &name, MsgHandler handler, const std::string &mqid, const int mq_len) { - CenterRecords()[name] = CenterInfo{name, handler, mqid, mq_len}; + Centers()[name] = CenterInfo{name, handler, mqid, mq_len}; + return true; +} +bool BHCenter::Install(const std::string &name, MsgHandler handler, const MQId &mqid, const int mq_len) +{ + return Install(name, handler, std::string((const char *) &mqid, sizeof(mqid)), mq_len); } BHCenter::BHCenter(Socket::Shm &shm) { - sockets_["center"] = std::make_shared<ShmSocket>(shm, &BHTopicCenterAddress(), 1000); - sockets_["bus"] = std::make_shared<ShmSocket>(shm, &BHTopicBusAddress(), 1000); + InstallCenter(); + for (auto &kv : Centers()) { - sockets_[kv.first] = std::make_shared<ShmSocket>(shm, kv.second.mqid_.data(), kv.second.mq_len_); + auto &info = kv.second; + sockets_[info.name_] = std::make_shared<ShmSocket>(shm, *(MQId *) info.mqid_.data(), info.mq_len_); } } @@ -61,16 +435,12 @@ bool BHCenter::Start() { - auto onCenter = MakeReqRepCenter(); - auto onBus = MakeBusCenter(); - sockets_["center"]->Start(onCenter); - sockets_["bus"]->Start(onBus); - for (auto &kv : Centers()) { - sockets_[kv.first]->Start(kv.second.handler_); + auto &info = kv.second; + sockets_[info.name_]->Start(info.handler_); } + return true; - // socket_.Start(Join(onCenter, onBus)); } bool BHCenter::Stop() diff --git a/src/center.h b/src/center.h index 02ec8f4..920addd 100644 --- a/src/center.h +++ b/src/center.h @@ -28,8 +28,9 @@ typedef ShmSocket Socket; public: - typedef std::function<bool(ShmSocket &socket, bhome_msg::MsgI &imsg, bhome::msg::BHMsg &msg)> MsgHandler; + typedef Socket::PartialRecvCB MsgHandler; static bool Install(const std::string &name, MsgHandler handler, const std::string &mqid, const int mq_len); + static bool Install(const std::string &name, MsgHandler handler, const MQId &mqid, const int mq_len); BHCenter(Socket::Shm &shm); BHCenter(); diff --git a/src/msg.cpp b/src/msg.cpp index 8752066..c353d84 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -25,140 +25,38 @@ center accept request and route.; //*/ const uint32_t kMsgTag = 0xf1e2d3c4; -const uint32_t kMsgPrefixLen = 4; -inline void AddRoute(BHMsg &msg, const MQId &id) { msg.add_route()->set_mq_id(&id, sizeof(id)); } - -std::string RandId() +void *MsgI::Pack(SharedMemory &shm, + const uint32_t head_len, const ToArray &headToArray, + const uint32_t body_len, const ToArray &bodyToArray) { - boost::uuids::uuid id = boost::uuids::random_generator()(); - return std::string((char *) &id, sizeof(id)); -} -BHMsg InitMsg(MsgType type, const std::string &msgid = RandId()) -{ - BHMsg msg; - msg.set_msg_id(msgid); - msg.set_type(type); - time_t tm = 0; - msg.set_timestamp(time(&tm)); - return msg; -} - -BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size) -{ - BHMsg msg(InitMsg(kMsgTypeRequestTopic)); - AddRoute(msg, src_id); - MsgRequestTopic req; - req.set_topic(topic); - req.set_data(data, size); - msg.set_body(req.SerializeAsString()); - return msg; -} - -BHMsg MakeRegister(const MQId &src_id, ProcInfo info, const std::vector<std::string> &topics) -{ - BHMsg msg(InitMsg(kMsgTypeRegister)); - AddRoute(msg, src_id); - MsgRegister reg; - reg.mutable_proc()->Swap(&info); - for (auto &t : topics) { - reg.add_topics(t); + void *addr = shm.Alloc(sizeof(head_len) + head_len + sizeof(body_len) + body_len); + if (addr) { + auto p = static_cast<char *>(addr); + auto Pack1 = [&p](auto len, auto &writer) { + Put32(p, len); + p += sizeof(len); + writer(p, len); + p += len; + }; + Pack1(head_len, headToArray); + Pack1(body_len, bodyToArray); } - msg.set_body(reg.SerializeAsString()); - return msg; + return addr; } -BHMsg MakeHeartbeat(const MQId &src_id, ProcInfo info) +bool MsgI::ParseHead(BHMsgHead &head) const { - BHMsg msg(InitMsg(kMsgTypeHeartbeat)); - AddRoute(msg, src_id); - MsgHeartbeat reg; - reg.mutable_proc()->Swap(&info); - msg.set_body(reg.SerializeAsString()); - return msg; -} - -BHMsg MakeReply(const std::string &src_msgid, const void *data, const size_t size) -{ - assert(data && size); - BHMsg msg(InitMsg(kMsgTypeRequestTopicReply, src_msgid)); - MsgRequestTopicReply reply; - reply.set_data(data, size); - msg.set_body(reply.SerializeAsString()); - return msg; -} - -BHMsg MakeSubUnsub(const MQId &client, const std::vector<std::string> &topics, const MsgType sub_unsub) -{ - assert(sub_unsub == kMsgTypeSubscribe || sub_unsub == kMsgTypeUnsubscribe); - BHMsg msg(InitMsg(sub_unsub)); - AddRoute(msg, client); - MsgSub subs; - for (auto &t : topics) { - subs.add_topics(t); - } - msg.set_body(subs.SerializeAsString()); - return msg; -} - -BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics) { return MakeSubUnsub(client, topics, kMsgTypeSubscribe); } -BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics) { return MakeSubUnsub(client, topics, kMsgTypeUnsubscribe); } - -BHMsg MakePub(const std::string &topic, const void *data, const size_t size) -{ - assert(data && size); - BHMsg msg(InitMsg(kMsgTypePublish)); - MsgPub pub; - pub.set_topic(topic); - pub.set_data(data, size); - msg.set_body(pub.SerializeAsString()); - return msg; -} - -BHMsg MakeQueryTopic(const MQId &client, const std::string &topic) -{ - BHMsg msg(InitMsg(kMsgTypeQueryTopic)); - AddRoute(msg, client); - MsgQueryTopic query; - query.set_topic(topic); - msg.set_body(query.SerializeAsString()); - return msg; -} -BHMsg MakeQueryTopicReply(const std::string &mqid, const std::string &msgid) -{ - BHMsg msg(InitMsg(kMsgTypeQueryTopicReply, msgid)); - MsgQueryTopicReply reply; - reply.mutable_address()->set_mq_id(mqid); - msg.set_body(reply.SerializeAsString()); - return msg; -} - -void *Pack(SharedMemory &shm, const BHMsg &msg) -{ - uint32_t msg_size = msg.ByteSizeLong(); - void *p = shm.Alloc(4 + msg_size); - if (p) { - Put32(p, msg_size); - if (!msg.SerializeToArray(static_cast<char *>(p) + kMsgPrefixLen, msg_size)) { - shm.Dealloc(p); - p = 0; - } - } - return p; -} - -bool MsgI::Unpack(BHMsg &msg) const -{ - void *p = ptr_.get(); + auto p = static_cast<char *>(ptr_.get()); assert(p); uint32_t msg_size = Get32(p); - return msg.ParseFromArray(static_cast<char *>(p) + kMsgPrefixLen, msg_size); + p += 4; + return head.ParseFromArray(p, msg_size); } // with ref count; -bool MsgI::MakeRC(SharedMemory &shm, const BHMsg &msg) +bool MsgI::MakeRC(SharedMemory &shm, void *p) { - void *p = Pack(shm, msg); if (!p) { return false; } @@ -171,9 +69,8 @@ return true; } -bool MsgI::Make(SharedMemory &shm, const BHMsg &msg) +bool MsgI::Make(SharedMemory &shm, void *p) { - void *p = Pack(shm, msg); if (!p) { return false; } diff --git a/src/msg.h b/src/msg.h index 30b3208..661d989 100644 --- a/src/msg.h +++ b/src/msg.h @@ -18,10 +18,12 @@ #ifndef MSG_5BILLZET #define MSG_5BILLZET -#include "bhome_msg.pb.h" +#include "bh_util.h" +#include "proto.h" #include "shm.h" #include <boost/interprocess/offset_ptr.hpp> #include <boost/uuid/uuid_generators.hpp> +#include <functional> #include <stdint.h> namespace bhome_msg @@ -59,16 +61,6 @@ int num_ = 1; }; -BHMsg MakeQueryTopic(const MQId &client, const std::string &topic); -BHMsg MakeQueryTopicReply(const std::string &mqid, const std::string &msgid); -BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size); -BHMsg MakeReply(const std::string &src_msgid, const void *data, const size_t size); -BHMsg MakeRegister(const MQId &src_id, ProcInfo info, const std::vector<std::string> &topics); -BHMsg MakeHeartbeat(const MQId &src_id, ProcInfo info); -BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics); -BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics); -BHMsg MakePub(const std::string &topic, const void *data, const size_t size); - // message content layout: header_size + header + data_size + data class MsgI { @@ -76,7 +68,22 @@ offset_ptr<void> ptr_; offset_ptr<RefCount> count_; - bool BuildSubOrUnsub(SharedMemory &shm, const std::vector<std::string> &topics, const MsgType sub_unsub); + typedef std::function<void(void *p, int len)> ToArray; + void *Pack(SharedMemory &shm, + const uint32_t head_len, const ToArray &headToArray, + const uint32_t body_len, const ToArray &bodyToArray); + + template <class Body> + void *Pack(SharedMemory &shm, const BHMsgHead &head, const Body &body) + { + return Pack( + shm, + uint32_t(head.ByteSizeLong()), [&](void *p, int len) { head.SerializeToArray(p, len); }, + uint32_t(body.ByteSizeLong()), [&](void *p, int len) { body.SerializeToArray(p, len); }); + } + + bool MakeRC(SharedMemory &shm, void *addr); + bool Make(SharedMemory &shm, void *addr); public: MsgI(void *p = 0, RefCount *c = 0) : @@ -97,9 +104,41 @@ int Count() const { return IsCounted() ? count_->Get() : 1; } bool IsCounted() const { return static_cast<bool>(count_); } - bool Make(SharedMemory &shm, const BHMsg &msg); - bool MakeRC(SharedMemory &shm, const BHMsg &msg); - bool Unpack(BHMsg &msg) const; + template <class Body> + bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body) + { + return Make(shm, Pack(shm, head, body)); + } + template <class Body> + bool MakeRC(SharedMemory &shm, const BHMsgHead &head, const Body &body) + { + return MakeRC(shm, Pack(shm, head, body)); + } + bool MakeRC(SharedMemory &shm, MsgI &a) + { + if (a.IsCounted()) { + *this = a; + AddRef(); + return true; + } else { + void *p = a.ptr_.get(); + a.ptr_ = 0; + return MakeRC(shm, p); + } + } + bool ParseHead(BHMsgHead &head) const; + template <class Body> + bool ParseBody(Body &body) const + { + auto p = static_cast<char *>(ptr_.get()); + assert(p); + uint32_t size = Get32(p); + p += 4; + p += size; + size = Get32(p); + p += 4; + return body.ParseFromArray(p, size); + } }; inline void swap(MsgI &m1, MsgI &m2) { m1.swap(m2); } diff --git a/src/proto.cpp b/src/proto.cpp new file mode 100644 index 0000000..0ec894f --- /dev/null +++ b/src/proto.cpp @@ -0,0 +1,41 @@ +/* + * ===================================================================================== + * + * Filename: proto.cpp + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�04鏈�07鏃� 17鏃�04鍒�36绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), lichao@aiotlink.com + * Organization: + * + * ===================================================================================== + */ +#include "proto.h" +#include <boost/uuid/uuid_generators.hpp> + +std::string RandId() +{ + boost::uuids::uuid id = boost::uuids::random_generator()(); + return std::string((char *) &id, sizeof(id)); +} + +BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id) +{ + return InitMsgHead(type, proc_id, RandId()); +} + +BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid) +{ + BHMsgHead msg; + msg.set_msg_id(msgid); + msg.set_type(type); + msg.set_proc_id(proc_id); + time_t tm = 0; + msg.set_timestamp(time(&tm)); + return msg; +} diff --git a/src/proto.h b/src/proto.h new file mode 100644 index 0000000..2057711 --- /dev/null +++ b/src/proto.h @@ -0,0 +1,78 @@ +/* + * ===================================================================================== + * + * Filename: proto.h + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�04鏈�07鏃� 13鏃�48鍒�51绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), lichao@aiotlink.com + * Organization: + * + * ===================================================================================== + */ +#ifndef PROTO_UA9UWKL1 +#define PROTO_UA9UWKL1 + +#include "bhome_msg.pb.h" + +using namespace bhome::msg; + +template <class Msg> +struct MsgToType { +}; + +#define BHOME_MAP_MSG_AND_TYPE(mSG, tYPE) \ + template <> \ + struct MsgToType<mSG> { \ + static const bhome::msg::MsgType value = tYPE; \ + }; + +#define BHOME_SIMPLE_MAP_MSG(name) BHOME_MAP_MSG_AND_TYPE(Msg##name, kMsgType##name) + +BHOME_SIMPLE_MAP_MSG(CommonReply); +BHOME_SIMPLE_MAP_MSG(Register); +BHOME_SIMPLE_MAP_MSG(RegisterRPC); +BHOME_SIMPLE_MAP_MSG(Heartbeat); +BHOME_SIMPLE_MAP_MSG(QueryTopic); +BHOME_SIMPLE_MAP_MSG(QueryTopicReply); +BHOME_SIMPLE_MAP_MSG(RequestTopic); +BHOME_SIMPLE_MAP_MSG(RequestTopicReply); +BHOME_SIMPLE_MAP_MSG(Publish); +BHOME_SIMPLE_MAP_MSG(Subscribe); +BHOME_SIMPLE_MAP_MSG(Unsubscribe); + +#undef BHOME_SIMPLE_MAP_MSG +#undef BHOME_MAP_MSG_AND_TYPE + +template <class Msg> +constexpr inline bhome::msg::MsgType GetType(const Msg &) +{ + return MsgToType<Msg>::value; +} + +inline void SetError(ErrorMsg &em, const ErrorCode err_code, const std::string &err_str = "") +{ + em.set_errcode(err_code); + if (!err_str.empty()) { + em.set_errstring(err_str); + } +} + +template <class Reply = MsgCommonReply> +inline Reply MakeReply(const ErrorCode err_code, const std::string &err_str = "") +{ + Reply msg; + SetError(*msg.mutable_errmsg(), err_code, err_str); + return msg; +} + +BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid); +BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id); +// inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); } + +#endif // end of include guard: PROTO_UA9UWKL1 diff --git a/src/pubsub.cpp b/src/pubsub.cpp index 0266c86..471c63c 100644 --- a/src/pubsub.cpp +++ b/src/pubsub.cpp @@ -22,24 +22,38 @@ using namespace std::chrono_literals; using namespace bhome_msg; -bool SocketPublish::Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms) +bool SocketPublish::Publish(const std::string &proc_id, const Topic &topic, const void *data, const size_t size, const int timeout_ms) { try { + MsgPublish pub; + pub.set_topic(topic); + pub.set_data(data, size); + BHMsgHead head(InitMsgHead(GetType(pub), proc_id)); MsgI imsg; - if (!imsg.MakeRC(shm(), MakePub(topic, data, size))) { - return false; + if (imsg.MakeRC(shm(), head, pub)) { + DEFER1(imsg.Release(shm())); + return ShmMsgQueue::Send(shm(), BHTopicBusAddress(), imsg, timeout_ms); } - DEFER1(imsg.Release(shm())); - return ShmMsgQueue::Send(shm(), BHTopicBusAddress(), imsg, timeout_ms); } catch (...) { - return false; } + return false; } +namespace +{ +inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); } -bool SocketSubscribe::Subscribe(const std::vector<Topic> &topics, const int timeout_ms) +} // namespace +bool SocketSubscribe::Subscribe(const std::string &proc_id, const std::vector<Topic> &topics, const int timeout_ms) { try { - return mq().Send(BHTopicBusAddress(), MakeSub(mq().Id(), topics), timeout_ms); + MsgSubscribe sub; + for (auto &topic : topics) { + sub.add_topics(topic); + } + BHMsgHead head(InitMsgHead(GetType(sub), proc_id)); + AddRoute(head, mq().Id()); + + return Send(&BHTopicBusAddress(), head, sub, timeout_ms); } catch (...) { return false; } @@ -47,11 +61,11 @@ bool SocketSubscribe::StartRecv(const TopicDataCB &tdcb, int nworker) { - auto AsyncRecvProc = [this, tdcb](BHMsg &msg) { - if (msg.type() == kMsgTypePublish) { - MsgPub d; - if (d.ParseFromString(msg.body())) { - tdcb(d.topic(), d.data()); + auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) { + if (head.type() == kMsgTypePublish) { + MsgPublish pub; + if (imsg.ParseBody(pub)) { + tdcb(head.proc_id(), pub.topic(), pub.data()); } } else { // ignored, or dropped @@ -61,14 +75,16 @@ return tdcb && Start(AsyncRecvProc, nworker); } -bool SocketSubscribe::RecvSub(Topic &topic, std::string &data, const int timeout_ms) +bool SocketSubscribe::RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms) { - BHMsg msg; - if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypePublish) { - MsgPub d; - if (d.ParseFromString(msg.body())) { - d.mutable_topic()->swap(topic); - d.mutable_data()->swap(data); + MsgI msg; + BHMsgHead head; + if (SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) { + MsgPublish pub; + if (msg.ParseBody(pub)) { + head.mutable_proc_id()->swap(proc_id); + pub.mutable_topic()->swap(topic); + pub.mutable_data()->swap(data); return true; } } diff --git a/src/pubsub.h b/src/pubsub.h index 3c3d4ad..bd60fcd 100644 --- a/src/pubsub.h +++ b/src/pubsub.h @@ -33,11 +33,7 @@ shm_(shm) {} SocketPublish() : SocketPublish(BHomeShm()) {} - bool Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms); - bool Publish(const Topic &topic, const std::string &data, const int timeout_ms) - { - return Publish(topic, data.data(), data.size(), timeout_ms); - } + bool Publish(const std::string &proc_id, const Topic &topic, const void *data, const size_t size, const int timeout_ms); }; // socket subscribe @@ -52,11 +48,11 @@ SocketSubscribe(BHomeShm()) {} ~SocketSubscribe() { Stop(); } - typedef std::function<void(const Topic &topic, const std::string &data)> TopicDataCB; + typedef std::function<void(const std::string &proc_id, const Topic &topic, const std::string &data)> TopicDataCB; bool StartRecv(const TopicDataCB &tdcb, int nworker = 2); bool Stop() { return Socket::Stop(); } - bool Subscribe(const std::vector<Topic> &topics, const int timeout_ms); - bool RecvSub(Topic &topic, std::string &data, const int timeout_ms); + bool Subscribe(const std::string &proc_id, const std::vector<Topic> &topics, const int timeout_ms); + bool RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms); }; #endif // end of include guard: PUBSUB_4KGRA997 diff --git a/src/pubsub_center.cpp b/src/pubsub_center.cpp deleted file mode 100644 index 698327e..0000000 --- a/src/pubsub_center.cpp +++ /dev/null @@ -1,147 +0,0 @@ -/* - * ===================================================================================== - * - * Filename: pubsub_center.cpp - * - * Description: pub/sub center/manager - * - * Version: 1.0 - * Created: 2021骞�04鏈�01鏃� 09鏃�29鍒�04绉� - * Revision: none - * Compiler: gcc - * - * Author: Li Chao (), - * Organization: - * - * ===================================================================================== - */ -#include "pubsub_center.h" -#include "bh_util.h" -using namespace bhome_shm; -namespace -{ -class BusCenter -{ - typedef std::set<MQId> Clients; - std::unordered_map<Topic, Clients> records_; - // todo cache data if send fail. - -public: - template <class Iter> - void SubScribe(const MQId &client, Iter topic_begin, Iter topic_end) - { - for (auto it = topic_begin; it != topic_end; ++it) { - records_[*it].insert(client); - } - } - template <class Iter> - void UnsubScribe(const MQId &client, Iter topic_begin, Iter topic_end) - { - for (auto it = topic_begin; it != topic_end; ++it) { - auto pos = records_.find(*it); - if (pos != records_.end()) { - if (pos->second.erase(client) && pos->second.empty()) { - records_.erase(pos); - } - } - } - }; - Clients FindClients(const std::string &topic) - { - Clients dests; - auto Find1 = [&](const std::string &t) { - auto pos = records_.find(topic); - if (pos != records_.end() && !pos->second.empty()) { - auto &clients = pos->second; - for (auto &cli : clients) { - dests.insert(cli); - } - } - }; - Find1(topic); - - //TODO check and adjust topic on client side sub/pub. - size_t pos = 0; - while (true) { - pos = topic.find(kTopicSep, pos); - if (pos == topic.npos || ++pos == topic.size()) { - // Find1(std::string()); // sub all. - break; - } else { - Find1(topic.substr(0, pos)); - } - } - return dests; - } -}; - -} // namespace - -BHCenter::MsgHandler MakeBusCenter() -{ - auto bus_ptr = std::make_shared<Synced<BusCenter>>(); - - return [bus_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) { -#ifndef NDEBUG - static std::atomic<time_t> last(0); - time_t now = 0; - time(&now); - if (last.exchange(now) < now) { - printf("bus queue size: %ld\n", socket.Pending()); - } -#endif - auto &bus = *bus_ptr; - auto &shm = socket.shm(); - - auto OnSubChange = [&](auto &&update) { - MsgSub sub; - if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) { - assert(sizeof(MQId) == msg.route(0).mq_id().size()); - MQId client; - memcpy(&client, msg.route(0).mq_id().data(), sizeof(client)); - update(client, sub.topics()); - } - }; - auto Sub = [&](const MQId &id, auto &topics) { bus->SubScribe(id, topics.begin(), topics.end()); }; - auto Unsub = [&](const MQId &id, auto &topics) { bus->UnsubScribe(id, topics.begin(), topics.end()); }; - - auto OnPublish = [&]() { - MsgPub pub; - if (!pub.ParseFromString(msg.body())) { - return; - } - auto Dispatch = [&](auto &&send1) { - const auto &clients(bus->FindClients(pub.topic())); - for (auto &cli : clients) { - send1(cli); - } - }; - - if (imsg.IsCounted()) { - Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm, cli, imsg, 10); }); - } else { - MsgI pubmsg; - if (!pubmsg.MakeRC(shm, msg)) { return; } - DEFER1(pubmsg.Release(shm)); - - Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm, cli, pubmsg, 10); }); - } - }; - - switch (msg.type()) { - case kMsgTypeSubscribe: OnSubChange(Sub); return true; - case kMsgTypeUnsubscribe: OnSubChange(Unsub); return true; - case kMsgTypePublish: OnPublish(); return true; - default: return false; - } - }; -} - -bool PubSubCenter::Start(const int nworker) -{ - auto handler = MakeBusCenter(); - printf("sizeof(pub/sub handler) = %ld\n", sizeof(handler)); - - const int kMaxWorker = 16; - return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker)); -} \ No newline at end of file diff --git a/src/pubsub_center.h b/src/pubsub_center.h deleted file mode 100644 index f81fa0e..0000000 --- a/src/pubsub_center.h +++ /dev/null @@ -1,45 +0,0 @@ -/* - * ===================================================================================== - * - * Filename: pubsub_center.h - * - * Description: - * - * Version: 1.0 - * Created: 2021骞�04鏈�01鏃� 09鏃�29鍒�39绉� - * Revision: none - * Compiler: gcc - * - * Author: Li Chao (), - * Organization: - * - * ===================================================================================== - */ -#ifndef PUBSUB_CENTER_MFSUZJU7 -#define PUBSUB_CENTER_MFSUZJU7 - -#include "center.h" -#include "defs.h" -#include "socket.h" -#include <mutex> -#include <set> -#include <unordered_map> - -BHCenter::MsgHandler MakeBusCenter(); - -// publish/subcribe manager. -class PubSubCenter -{ - ShmSocket socket_; - -public: - PubSubCenter(ShmSocket::Shm &shm) : - socket_(shm, &BHTopicBusAddress(), 1000) {} - PubSubCenter() : - PubSubCenter(BHomeShm()) {} - ~PubSubCenter() { Stop(); } - bool Start(const int nworker = 2); - bool Stop() { return socket_.Stop(); } -}; - -#endif // end of include guard: PUBSUB_CENTER_MFSUZJU7 diff --git a/src/reqrep_center.cpp b/src/reqrep_center.cpp deleted file mode 100644 index ce35d1c..0000000 --- a/src/reqrep_center.cpp +++ /dev/null @@ -1,173 +0,0 @@ -/* - * ===================================================================================== - * - * Filename: reqrep_center.cpp - * - * Description: topic request/reply center - * - * Version: 1.0 - * Created: 2021骞�04鏈�01鏃� 14鏃�08鍒�50绉� - * Revision: none - * Compiler: gcc - * - * Author: Li Chao (), - * Organization: - * - * ===================================================================================== - */ -#include "reqrep_center.h" -#include "bh_util.h" -#include "msg.h" -#include <chrono> -#include <memory> -#include <mutex> -#include <unordered_map> - -using namespace bhome_shm; - -namespace -{ -auto Now = []() { time_t t; return time(&t); }; - -class NodeCenter -{ -public: - typedef std::string ProcAddr; - typedef bhome::msg::ProcInfo ProcInfo; - - template <class Iter> - bool Register(ProcInfo &info, const ProcAddr &src_mq, Iter topics_begin, Iter topics_end) - { - try { - Node node(new NodeInfo); - node->addr_ = src_mq; - node->proc_.Swap(&info); - node->state_.timestamp_ = Now(); - nodes_[node->proc_.id()] = node; - for (auto it = topics_begin; it != topics_end; ++it) { - topic_map_[*it] = node; - } - return true; - } catch (...) { - return false; - } - } - void Heartbeat(ProcInfo &info, const ProcAddr &src_mq) - { - auto pos = nodes_.find(info.name()); - if (pos != nodes_.end() && pos->second->addr_ == src_mq) { // both name and mq should be the same. - NodeInfo &ni = *pos->second; - ni.state_.timestamp_ = Now(); - if (!info.public_info().empty()) { - ni.proc_.set_public_info(info.public_info()); - } - if (!info.private_info().empty()) { - ni.proc_.set_private_info(info.private_info()); - } - } - } - bool QueryTopic(const Topic &topic, ProcAddr &addr) - { - auto pos = topic_map_.find(topic); - if (pos != topic_map_.end()) { - Node node(pos->second.lock()); - if (node) { - addr = node->addr_; - return true; - } else { // dead, remove record. - topic_map_.erase(pos); - return false; - } - } else { - return false; - } - } - -private: - struct ProcState { - time_t timestamp_ = 0; - uint32_t flag_ = 0; // reserved - }; - typedef std::string ProcId; - struct NodeInfo { - ProcState state_; // state - ProcAddr addr_; // registered_mqid. - ProcInfo proc_; // - }; - typedef std::shared_ptr<NodeInfo> Node; - typedef std::weak_ptr<NodeInfo> WeakNode; - std::unordered_map<Topic, WeakNode> topic_map_; - std::unordered_map<ProcId, Node> nodes_; -}; - -} // namespace - -BHCenter::MsgHandler MakeReqRepCenter() -{ - auto center_ptr = std::make_shared<Synced<NodeCenter>>(); - return [center_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) { - auto ¢er = *center_ptr; - auto &shm = socket.shm(); - -#ifndef NDEBUG - static std::atomic<time_t> last(0); - time_t now = 0; - time(&now); - if (last.exchange(now) < now) { - printf("center queue size: %ld\n", socket.Pending()); - } -#endif - auto SrcMQ = [&]() { return msg.route(0).mq_id(); }; - - auto OnRegister = [&]() { - if (msg.route_size() != 1) { return; } - - MsgRegister reg; - if (reg.ParseFromString(msg.body()) && reg.has_proc()) { - center->Register(*reg.mutable_proc(), SrcMQ(), reg.topics().begin(), reg.topics().end()); - } - }; - - auto OnHeartbeat = [&]() { - if (msg.route_size() != 1) { return; } - auto &src_mq = msg.route(0).mq_id(); - - MsgHeartbeat hb; - if (hb.ParseFromString(msg.body()) && hb.has_proc()) { - center->Heartbeat(*hb.mutable_proc(), SrcMQ()); - } - }; - - auto OnQueryTopic = [&]() { - if (msg.route_size() != 1) { return; } - - MsgQueryTopic query; - NodeCenter::ProcAddr dest; - if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) { - MQId remote; - memcpy(&remote, SrcMQ().data(), sizeof(MQId)); - MsgI imsg; - if (!imsg.Make(shm, MakeQueryTopicReply(dest, msg.msg_id()))) { return; } - if (!ShmMsgQueue::Send(shm, remote, imsg, 100)) { - imsg.Release(shm); - } - } - }; - - switch (msg.type()) { - case kMsgTypeRegister: OnRegister(); return true; - case kMsgTypeHeartbeat: OnHeartbeat(); return true; - case kMsgTypeQueryTopic: OnQueryTopic(); return true; - default: return false; - } - }; -} - -bool ReqRepCenter::Start(const int nworker) -{ - auto handler = MakeReqRepCenter(); - printf("sizeof(rep/req handler) = %ld\n", sizeof(handler)); - - const int kMaxWorker = 16; - return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker)); -} diff --git a/src/reqrep_center.h b/src/reqrep_center.h deleted file mode 100644 index bdcdcad..0000000 --- a/src/reqrep_center.h +++ /dev/null @@ -1,40 +0,0 @@ -/* - * ===================================================================================== - * - * Filename: reqrep_center.h - * - * Description: - * - * Version: 1.0 - * Created: 2021骞�04鏈�01鏃� 14鏃�09鍒�13绉� - * Revision: none - * Compiler: gcc - * - * Author: Li Chao (), - * Organization: - * - * ===================================================================================== - */ -#ifndef REQREP_CENTER_US3RBM60 -#define REQREP_CENTER_US3RBM60 - -#include "center.h" -#include "defs.h" -#include "socket.h" - -BHCenter::MsgHandler MakeReqRepCenter(); -class ReqRepCenter -{ - ShmSocket socket_; - -public: - ReqRepCenter(ShmSocket::Shm &shm) : - socket_(shm, &BHTopicCenterAddress(), 1000) {} - ReqRepCenter() : - ReqRepCenter(BHomeShm()) {} - ~ReqRepCenter() { Stop(); } - bool Start(const int nworker = 2); - bool Stop() { return socket_.Stop(); } -}; - -#endif // end of include guard: REQREP_CENTER_US3RBM60 diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp index dcb5a9e..521f773 100644 --- a/src/shm_queue.cpp +++ b/src/shm_queue.cpp @@ -87,15 +87,14 @@ // 2) find remote queue first, then build msg; // 1 is about 50% faster than 2, maybe cache related. -bool ShmMsgQueue::Recv(BHMsg &msg, const int timeout_ms) -{ - MsgI imsg; - if (Read(imsg, timeout_ms)) { - DEFER1(imsg.Release(shm());); - return imsg.Unpack(msg); - } else { - return false; - } -} +// bool ShmMsgQueue::Recv(MsgI &imsg, BHMsgHead &head, const int timeout_ms) +// { +// if (Read(imsg, timeout_ms)) { +// // DEFER1(imsg.Release(shm());); +// return imsg.ParseHead(head); +// } else { +// return false; +// } +// } } // namespace bhome_shm diff --git a/src/shm_queue.h b/src/shm_queue.h index ab8a88c..32ccfae 100644 --- a/src/shm_queue.h +++ b/src/shm_queue.h @@ -131,7 +131,7 @@ ~ShmMsgQueue(); const MQId &Id() const { return id_; } - bool Recv(BHMsg &msg, const int timeout_ms); + // bool Recv(MsgI &msg, BHMsgHead &head, const int timeout_ms); bool Recv(MsgI &msg, const int timeout_ms) { return Read(msg, timeout_ms); } static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend); static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms); @@ -141,12 +141,11 @@ { return Send(shm(), remote_id, msg, timeout_ms, extra...); } - - template <class... Extra> - bool Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms, Extra const &...extra) + template <class Body, class... Extra> + bool Send(const MQId &remote_id, const BHMsgHead &head, const Body &body, const int timeout_ms, Extra const &...extra) { MsgI msg; - if (msg.Make(shm(), data)) { + if (msg.Make(shm(), head, body)) { if (Send(shm(), remote_id, msg, timeout_ms, extra...)) { return true; } else { @@ -155,6 +154,7 @@ } return false; } + size_t Pending() const { return data()->size(); } }; diff --git a/src/socket.cpp b/src/socket.cpp index b9def0c..f2b29f4 100644 --- a/src/socket.cpp +++ b/src/socket.cpp @@ -29,43 +29,53 @@ } // namespace -ShmSocket::ShmSocket(Shm &shm, const void *id, const int len) : - shm_(shm), run_(false) +ShmSocket::ShmSocket(Shm &shm, const MQId &id, const int len) : + shm_(shm), run_(false), mq_(id, shm, len) { - if (id && len > 0) { - mq_.reset(new Queue(*static_cast<const MQId *>(id), shm, len)); - } } ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) : - shm_(shm), run_(false) -{ - if (len > 0) { - mq_.reset(new Queue(shm_, len)); - } -} + shm_(shm), run_(false), mq_(shm, len) {} ShmSocket::~ShmSocket() { Stop(); //TODO should stop in sub class, incase thread access sub class data. } -bool ShmSocket::Start(const RecvCB &onData, const IdleCB &onIdle, int nworker) +bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle) { - if (!mq_ || !onData) { - return false; // TODO error code. - } + auto onRecv = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { + auto Find = [&](RecvCB &cb) { + std::lock_guard<std::mutex> lock(mutex()); + const std::string &msgid = head.msg_id(); + auto pos = async_cbs_.find(msgid); + if (pos != async_cbs_.end()) { + cb.swap(pos->second); + async_cbs_.erase(pos); + return true; + } else { + return false; + } + }; + + RecvCB cb; + if (Find(cb)) { + cb(socket, imsg, head); + } else if (onData) { + onData(socket, imsg, head); + } // else ignored, or dropped + }; std::lock_guard<std::mutex> lock(mutex_); StopNoLock(); - auto RecvProc = [this, onData, onIdle]() { + auto RecvProc = [this, onRecv, onIdle]() { while (run_) { try { MsgI imsg; - DEFER1(imsg.Release(shm_)); - if (mq_->Recv(imsg, 100)) { - BHMsg msg; - if (imsg.Unpack(msg)) { - onData(*this, imsg, msg); + if (mq().Recv(imsg, 10)) { + DEFER1(imsg.Release(shm())); + BHMsgHead head; + if (imsg.ParseHead(head)) { + onRecv(*this, imsg, head); } } else if (onIdle) { onIdle(*this); @@ -102,17 +112,18 @@ return false; } -bool ShmSocket::SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms) -{ - return mq_->Send(*static_cast<const MQId *>(id), msg, timeout_ms); -} - -bool ShmSocket::SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms) +bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms) { std::lock_guard<std::mutex> lock(mutex_); - if (!mq_ || RunningNoLock()) { + auto Recv = [&]() { + if (mq().Recv(msg, timeout_ms)) { + if (msg.ParseHead(head)) { + return true; + } else { + msg.Release(shm()); + } + } return false; - } else { - return mq_->Recv(msg, timeout_ms); - } + }; + return !RunningNoLock() && Recv(); } diff --git a/src/socket.h b/src/socket.h index 57d0ae4..7c4f83f 100644 --- a/src/socket.h +++ b/src/socket.h @@ -19,14 +19,18 @@ #ifndef SOCKET_GWTJHBPO #define SOCKET_GWTJHBPO +#include "defs.h" #include "shm_queue.h" #include <atomic> #include <boost/noncopyable.hpp> +#include <condition_variable> #include <functional> #include <memory> #include <mutex> #include <thread> #include <vector> + +using namespace bhome_msg; class ShmSocket : private boost::noncopyable { @@ -35,36 +39,88 @@ public: typedef bhome_shm::SharedMemory Shm; - typedef std::function<void(ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg)> RecvCB; - typedef std::function<void(bhome_msg::BHMsg &msg)> RecvBHMsgCB; + typedef std::function<void(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> RecvCB; + typedef std::function<bool(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> PartialRecvCB; typedef std::function<void(ShmSocket &sock)> IdleCB; - ShmSocket(Shm &shm, const void *id, const int len); + ShmSocket(Shm &shm, const MQId &id, const int len); ShmSocket(Shm &shm, const int len = 12); ~ShmSocket(); - + const MQId &id() const { return mq().Id(); } Shm &shm() { return shm_; } // start recv. - bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1); - bool Start(const RecvCB &onData, int nworker = 1) { return Start(onData, IdleCB(), nworker); } - bool Start(const RecvBHMsgCB &onData, const IdleCB &onIdle, int nworker = 1) - { - return Start([onData](ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg) { onData(msg); }, onIdle, nworker); - } - bool Start(const RecvBHMsgCB &onData, int nworker = 1) - { - return Start(onData, IdleCB(), nworker); - } + bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB()); + bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, onIdle); } + bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); } bool Stop(); - size_t Pending() const { return mq_ ? mq_->Pending() : 0; } + size_t Pending() const { return mq().Pending(); } - bool SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms); - bool SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms); + bool Send(const void *id, const MsgI &imsg, const int timeout_ms) + { + return mq().Send(*static_cast<const MQId *>(id), imsg, timeout_ms); + } + //TODO reimplment, using async. + bool SyncRecv(MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms); + + template <class Body> + bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb = RecvCB()) + { + assert(valid_remote); + try { + if (cb) { + auto RegisterCB = [&]() { + std::lock_guard<std::mutex> lock(mutex()); + async_cbs_.emplace(head.msg_id(), cb); + }; + return mq().Send(*static_cast<const MQId *>(valid_remote), head, body, timeout_ms, RegisterCB); + } else { + return mq().Send(*static_cast<const MQId *>(valid_remote), head, body, timeout_ms); + } + } catch (...) { + return false; + } + } + + template <class Body> + bool SendAndRecv(const void *remote, const BHMsgHead &head, const Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms) + { + struct State { + std::mutex mutex; + std::condition_variable cv; + bool canceled = false; + }; + + try { + std::shared_ptr<State> st(new State); + auto endtime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms); + + auto OnRecv = [st, &reply, &reply_head](ShmSocket &sock, MsgI &msg, BHMsgHead &head) { + std::unique_lock<std::mutex> lk(st->mutex); + if (!st->canceled) { + reply.swap(msg); + reply_head.Swap(&head); + st->cv.notify_one(); + } else { + } + }; + + std::unique_lock<std::mutex> lk(st->mutex); + bool sendok = Send(remote, head, body, timeout_ms, OnRecv); + if (sendok && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) { + return true; + } else { + st->canceled = true; + return false; + } + } catch (...) { + return false; + } + } protected: const Shm &shm() const { return shm_; } - Queue &mq() { return *mq_; } // programmer should make sure that mq_ is valid. - const Queue &mq() const { return *mq_; } + Queue &mq() { return mq_; } // programmer should make sure that mq_ is valid. + const Queue &mq() const { return mq_; } std::mutex &mutex() { return mutex_; } private: @@ -76,7 +132,8 @@ std::mutex mutex_; std::atomic<bool> run_; - std::unique_ptr<Queue> mq_; + Queue mq_; + std::unordered_map<std::string, RecvCB> async_cbs_; }; #endif // end of include guard: SOCKET_GWTJHBPO diff --git a/src/topic_node.cpp b/src/topic_node.cpp new file mode 100644 index 0000000..c6c9771 --- /dev/null +++ b/src/topic_node.cpp @@ -0,0 +1,322 @@ +/* + * ===================================================================================== + * + * Filename: topic_node.cpp + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�04鏈�07鏃� 09鏃�01鍒�48绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), + * Organization: + * + * ===================================================================================== + */ +#include "topic_node.h" +#include "bh_util.h" +#include <chrono> +#include <list> + +using namespace std::chrono; +using namespace std::chrono_literals; + +namespace +{ +inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); } + +struct SrcInfo { + std::vector<BHAddress> route; + std::string msg_id; +}; + +class ServerFailedQ +{ + struct FailedMsg { + steady_clock::time_point xpr; + std::string remote_; + BHMsgHead head_; + MsgRequestTopicReply body_; + FailedMsg(const std::string &addr, BHMsgHead &&head, MsgRequestTopicReply &&body) : + xpr(steady_clock::now() + 10s), remote_(addr), head_(std::move(head)), body_(std::move(body)) {} + bool Expired() { return steady_clock::now() > xpr; } + }; + typedef std::list<FailedMsg> Queue; + Synced<Queue> queue_; + +public: + void Push(const std::string &remote, BHMsgHead &&head, MsgRequestTopicReply &&body) + { + queue_->emplace_back(remote, std::move(head), std::move(body)); + } + void TrySend(ShmSocket &socket, const int timeout_ms = 0) + { + queue_.Apply([&](Queue &q) { + if (!q.empty()) { + auto it = q.begin(); + do { + if (it->Expired()) { + // it->msg_.Release(socket.shm()); + it = q.erase(it); + } else if (socket.Send(it->remote_.data(), it->head_, it->body_, timeout_ms)) { + it = q.erase(it); + } else { + ++it; + } + } while (it != q.end()); + } + }); + } +}; + +} // namespace +TopicNode::TopicNode(SharedMemory &shm) : + shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm) +{ + SockNode().Start(); +} +TopicNode::~TopicNode() +{ + StopAll(); + SockNode().Stop(); +} +void TopicNode::StopAll() +{ + ServerStop(); + ClientStopWorker(); +} + +bool TopicNode::Register(const MsgRegister &body, MsgCommonReply &reply_body, const int timeout_ms) +{ + auto head(InitMsgHead(GetType(body), body.proc().proc_id())); + AddRoute(head, SockNode().id()); + + MsgI reply; + DEFER1(reply.Release(shm_);); + BHMsgHead reply_head; + bool r = SockNode().SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); + r = r && reply_head.type() == kMsgTypeCommonReply; + r = r && reply.ParseBody(reply_body); + if (r) { + info_ = body; + } + return r; +} + +bool TopicNode::RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply_body, const int timeout_ms) +{ + //TODO check registered + + auto head(InitMsgHead(GetType(body), proc_id())); + AddRoute(head, SockReply().id()); + + MsgI reply; + DEFER1(reply.Release(shm_);); + BHMsgHead reply_head; + bool r = SockReply().SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); + r = r && reply_head.type() == kMsgTypeCommonReply; + r = r && reply.ParseBody(reply_body); + return r; +} + +bool TopicNode::ServerStart(const OnRequest &rcb, int nworker) +{ + //TODO check registered + + auto failed_q = std::make_shared<ServerFailedQ>(); + + auto onIdle = [failed_q](ShmSocket &socket) { failed_q->TrySend(socket); }; + + auto onRecv = [this, rcb, failed_q, onIdle](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { + if (head.type() == kMsgTypeRequestTopic && head.route_size() > 0) { + MsgRequestTopic req; + if (imsg.ParseBody(req)) { + std::string out; + if (rcb(req.topic(), req.data(), out)) { + MsgRequestTopicReply reply_body; + reply_body.set_data(std::move(out)); + BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), head.msg_id())); + + for (int i = 0; i < head.route_size() - 1; ++i) { + reply_head.add_route()->Swap(head.mutable_route(i)); + } + if (!sock.Send(head.route().rbegin()->mq_id().data(), reply_head, reply_body, 10)) { + failed_q->Push(head.route().rbegin()->mq_id(), std::move(reply_head), std::move(reply_body)); + } + } + } + } else { + // ignored, or dropped + } + + onIdle(sock); + }; + + return rcb && SockReply().Start(onRecv, onIdle, nworker); +} +bool TopicNode::ServerStop() { return SockReply().Stop(); } + +bool TopicNode::ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms) +{ + MsgI imsg; + BHMsgHead head; + if (SockReply().SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) { + MsgRequestTopic request; + if (imsg.ParseBody(request)) { + request.mutable_topic()->swap(topic); + request.mutable_data()->swap(data); + SrcInfo *p = new SrcInfo; + p->route.assign(head.route().begin(), head.route().end()); + p->msg_id = head.msg_id(); + src_info = p; + return true; + } + } + return false; +} + +bool TopicNode::ServerSendReply(void *src_info, const std::string &data, const int timeout_ms) +{ + SrcInfo *p = static_cast<SrcInfo *>(src_info); + DEFER1(delete p); + if (!p || p->route.empty()) { + return false; + } + MsgRequestTopicReply body; + body.set_data(data); + BHMsgHead head(InitMsgHead(GetType(body), proc_id(), p->msg_id)); + + for (unsigned i = 0; i < p->route.size() - 1; ++i) { + head.add_route()->Swap(&p->route[i]); + } + + return SockReply().Send(p->route.back().mq_id().data(), head, body, timeout_ms); +} + +bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker) +{ + if (!cb) { + return false; + } + auto onData = [this, cb](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { + if (head.type() == kMsgTypeRequestTopicReply) { + MsgRequestTopicReply reply; + if (imsg.ParseBody(reply)) { + cb(reply.data()); + } + } + }; + + return SockRequest().Start(onData, nworker); +} +bool TopicNode::ClientStopWorker() { return SockRequest().Stop(); } + +bool TopicNode::ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb) +{ + auto Call = [&](const void *remote) { + auto &sock = SockRequest(); + MsgRequestTopic req; + req.set_topic(topic); + req.set_data(data, size); + BHMsgHead head(InitMsgHead(GetType(req), proc_id())); + AddRoute(head, sock.id()); + + if (cb) { + auto onRecv = [cb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { + if (head.type() == kMsgTypeRequestTopicReply) { + MsgRequestTopicReply reply; + if (imsg.ParseBody(reply)) { + cb(reply.data()); + } + } + }; + return sock.Send(remote, head, req, timeout_ms, onRecv); + } else { + return sock.Send(remote, head, req, timeout_ms); + } + }; + + try { + BHAddress addr; + if (ClientQueryRPCTopic(topic, addr, timeout_ms)) { + return Call(addr.mq_id().data()); + } else { + return false; + } + } catch (...) { + return false; + } +} + +bool TopicNode::ClientSyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms) +{ + try { + auto &sock = SockRequest(); + BHAddress addr; + if (ClientQueryRPCTopic(topic, addr, timeout_ms)) { + + MsgRequestTopic req; + req.set_topic(topic); + req.set_data(data, size); + BHMsgHead head(InitMsgHead(GetType(req), proc_id())); + AddRoute(head, sock.id()); + + MsgI reply; + DEFER1(reply.Release(shm_);); + BHMsgHead reply_head; + + if (sock.SendAndRecv(addr.mq_id().data(), head, req, reply, reply_head, timeout_ms) && reply_head.type() == kMsgTypeRequestTopicReply) { + MsgRequestTopicReply dr; + if (reply.ParseBody(dr)) { + dr.mutable_data()->swap(out); + return true; + } else { + printf("error parse reply.\n"); + } + } else { + printf("error recv data. line: %d\n", __LINE__); + } + } else { + printf("error recv data. line: %d\n", __LINE__); + } + } catch (...) { + printf("error recv data. line: %d\n", __LINE__); + } + return false; +} + +bool TopicNode::ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms) +{ + auto &sock = SockRequest(); + if (topic_query_cache_.Find(topic, addr)) { + return true; + } + + MsgQueryTopic query; + query.set_topic(topic); + BHMsgHead head(InitMsgHead(GetType(query), proc_id())); + AddRoute(head, sock.id()); + + MsgI reply; + DEFER1(reply.Release(shm_)); + BHMsgHead reply_head; + + if (sock.SendAndRecv(&BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms)) { + if (reply_head.type() == kMsgTypeQueryTopicReply) { + MsgQueryTopicReply rep; + if (reply.ParseBody(rep)) { + addr = rep.address(); + if (addr.mq_id().empty()) { + return false; + } else { + topic_query_cache_.Update(topic, addr); + return true; + } + } + } + } else { + } + return false; +} \ No newline at end of file diff --git a/src/topic_node.h b/src/topic_node.h new file mode 100644 index 0000000..8852af1 --- /dev/null +++ b/src/topic_node.h @@ -0,0 +1,121 @@ +/* + * ===================================================================================== + * + * Filename: topic_node.h + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�04鏈�07鏃� 09鏃�05鍒�26绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), lichao@aiotlink.com + * Organization: + * + * ===================================================================================== + */ +#ifndef TOPIC_NODE_YVKWA6TF +#define TOPIC_NODE_YVKWA6TF + +#include "msg.h" +#include "pubsub.h" +#include "socket.h" +#include <memory> + +using namespace bhome_shm; +using namespace bhome_msg; + +// a node is a client. +class TopicNode +{ + SharedMemory &shm_; + MsgRegister info_; + +public: + TopicNode(SharedMemory &shm); + ~TopicNode(); + bool Register(const MsgRegister &body, MsgCommonReply &reply, const int timeout_ms); + bool RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply, const int timeout_ms); + + // topic rpc server + typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest; + bool ServerStart(OnRequest const &cb, const int nworker = 2); + bool ServerStop(); + bool ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms); + bool ServerSendReply(void *src_info, const std::string &data, const int timeout_ms); + + // topic client + typedef std::function<void(const std::string &data)> RequestResultCB; + bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2); + bool ClientStopWorker(); + bool ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB()); + bool ClientAsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB()) + { + return ClientAsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb); + } + bool ClientSyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms); + bool ClientSyncRequest(const Topic &topic, const std::string &data, std::string &out, const int timeout_ms) + { + return ClientSyncRequest(topic, data.data(), data.size(), out, timeout_ms); + } + + void StopAll(); + +private: + bool ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms); + const std::string &proc_id() { return info_.proc().proc_id(); } + + typedef bhome_msg::BHAddress Address; + class TopicQueryCache + { + class Impl + { + typedef std::unordered_map<Topic, Address> Store; + Store store_; + + public: + bool Find(const Topic &topic, Address &addr) + { + auto pos = store_.find(topic); + if (pos != store_.end()) { + addr = pos->second; + return true; + } else { + return false; + } + } + bool Update(const Topic &topic, const Address &addr) + { + store_[topic] = addr; + return true; + } + }; + Synced<Impl> impl_; + // Impl &impl() + // { + // thread_local Impl impl; + // return impl; + // } + + public: + bool Find(const Topic &topic, Address &addr) { return impl_->Find(topic, addr); } + bool Update(const Topic &topic, const Address &addr) { return impl_->Update(topic, addr); } + }; + + // some sockets may be the same one, using functions make it easy to change. + + auto &SockNode() { return sock_node_; } + auto &SockSub() { return sock_sub_; } + auto &SockRequest() { return sock_request_; } + auto &SockReply() { return sock_reply_; } + + ShmSocket sock_node_; + ShmSocket sock_request_; + ShmSocket sock_reply_; + SocketSubscribe sock_sub_; + + TopicQueryCache topic_query_cache_; +}; + +#endif // end of include guard: TOPIC_NODE_YVKWA6TF diff --git a/src/topic_reply.cpp b/src/topic_reply.cpp deleted file mode 100644 index 2ab75e6..0000000 --- a/src/topic_reply.cpp +++ /dev/null @@ -1,142 +0,0 @@ -/* - * ===================================================================================== - * - * Filename: topic_reply.cpp - * - * Description: - * - * Version: 1.0 - * Created: 2021骞�04鏈�06鏃� 14鏃�40鍒�52绉� - * Revision: none - * Compiler: gcc - * - * Author: Li Chao (), - * Organization: - * - * ===================================================================================== - */ -#include "topic_reply.h" -#include <chrono> -#include <list> - -using namespace bhome_msg; -using namespace std::chrono; -using namespace std::chrono_literals; - -namespace -{ -struct SrcInfo { - std::vector<BHAddress> route; - std::string msg_id; -}; - -class FailedQ -{ - struct FailedMsg { - steady_clock::time_point xpr; - std::string remote_; - BHMsg msg_; - FailedMsg(const std::string &addr, BHMsg &&msg) : - xpr(steady_clock::now() + 10s), remote_(addr), msg_(std::move(msg)) {} - bool Expired() { return steady_clock::now() > xpr; } - }; - typedef std::list<FailedMsg> Queue; - Synced<Queue> queue_; - -public: - void Push(const std::string &remote, BHMsg &&msg) - { - queue_->emplace_back(remote, std::move(msg)); - } - void TrySend(ShmSocket &socket, const int timeout_ms = 0) - { - queue_.Apply([&](Queue &q) { - if (!q.empty()) { - auto it = q.begin(); - do { - if (it->Expired() || socket.SyncSend(it->remote_.data(), it->msg_, timeout_ms)) { - it = q.erase(it); - } else { - ++it; - } - } while (it != q.end()); - } - }); - } -}; - -} // namespace - -bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms) -{ - //TODO check reply? - return SyncSend(&BHTopicCenterAddress(), MakeRegister(mq().Id(), proc_info, topics), timeout_ms); -} -bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms) -{ - return SyncSend(&BHTopicCenterAddress(), MakeHeartbeat(mq().Id(), proc_info), timeout_ms); -} -bool SocketReply::StartWorker(const OnRequest &rcb, int nworker) -{ - auto failed_q = std::make_shared<FailedQ>(); - - auto onIdle = [failed_q](ShmSocket &socket) { failed_q->TrySend(socket); }; - - auto onRecv = [this, rcb, failed_q, onIdle](BHMsg &msg) { - if (msg.type() == kMsgTypeRequestTopic && msg.route_size() > 0) { - MsgRequestTopic req; - if (req.ParseFromString(msg.body())) { - std::string out; - if (rcb(req.topic(), req.data(), out)) { - BHMsg msg_reply(MakeReply(msg.msg_id(), out.data(), out.size())); - for (int i = 0; i < msg.route_size() - 1; ++i) { - msg.add_route()->Swap(msg.mutable_route(i)); - } - if (!SyncSend(msg.route().rbegin()->mq_id().data(), msg_reply, 10)) { - failed_q->Push(msg.route().rbegin()->mq_id(), std::move(msg_reply)); - } - } - } - } else { - // ignored, or dropped - } - - onIdle(*this); - }; - - return rcb && Start(onRecv, onIdle, nworker); -} - -bool SocketReply::RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms) -{ - BHMsg msg; - if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypeRequestTopic) { - MsgRequestTopic request; - if (request.ParseFromString(msg.body())) { - request.mutable_topic()->swap(topic); - request.mutable_data()->swap(data); - SrcInfo *p = new SrcInfo; - p->route.assign(msg.route().begin(), msg.route().end()); - p->msg_id = msg.msg_id(); - src_info = p; - return true; - } - } - return false; -} - -bool SocketReply::SendReply(void *src_info, const std::string &data, const int timeout_ms) -{ - SrcInfo *p = static_cast<SrcInfo *>(src_info); - DEFER1(delete p); - if (!p || p->route.empty()) { - return false; - } - - BHMsg msg(MakeReply(p->msg_id, data.data(), data.size())); - for (unsigned i = 0; i < p->route.size() - 1; ++i) { - msg.add_route()->Swap(&p->route[i]); - } - - return SyncSend(p->route.back().mq_id().data(), msg, timeout_ms); -} \ No newline at end of file diff --git a/src/topic_reply.h b/src/topic_reply.h deleted file mode 100644 index 090ad88..0000000 --- a/src/topic_reply.h +++ /dev/null @@ -1,52 +0,0 @@ -/* - * ===================================================================================== - * - * Filename: topic_reply.h - * - * Description: - * - * Version: 1.0 - * Created: 2021骞�04鏈�06鏃� 14鏃�41鍒�12绉� - * Revision: none - * Compiler: gcc - * - * Author: Li Chao (), - * Organization: - * - * ===================================================================================== - */ -#ifndef TOPIC_REPLY_3RVYPPWI -#define TOPIC_REPLY_3RVYPPWI - -#include "bh_util.h" -#include "defs.h" -#include "msg.h" -#include "socket.h" -#include <deque> -#include <functional> - -using bhome::msg::ProcInfo; - -class SocketReply : private ShmSocket -{ - typedef ShmSocket Socket; - -public: - SocketReply(Socket::Shm &shm) : - Socket(shm, 64) {} - SocketReply() : - SocketReply(BHomeShm()) {} - ~SocketReply() { Stop(); } - - typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest; - bool StartWorker(const OnRequest &rcb, int nworker = 2); - bool Stop() { return Socket::Stop(); } - bool RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms); - bool SendReply(void *src_info, const std::string &data, const int timeout_ms); - bool Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms); - bool Heartbeat(const ProcInfo &proc_info, const int timeout_ms); - -private: -}; - -#endif // end of include guard: TOPIC_REPLY_3RVYPPWI diff --git a/src/topic_request.cpp b/src/topic_request.cpp deleted file mode 100644 index 382ce21..0000000 --- a/src/topic_request.cpp +++ /dev/null @@ -1,210 +0,0 @@ -/* - * ===================================================================================== - * - * Filename: topic_request.cpp - * - * Description: topic request sockets - * - * Version: 1.0 - * Created: 2021骞�04鏈�01鏃� 09鏃�35鍒�35绉� - * Revision: none - * Compiler: gcc - * - * Author: Li Chao (), - * Organization: - * - * ===================================================================================== - */ -#include "topic_request.h" -#include "bh_util.h" -#include "msg.h" -#include <chrono> -#include <condition_variable> - -using namespace bhome_msg; - -bool SocketRequest::StartWorker(const RequestResultCB &rrcb, int nworker) -{ - auto AsyncRecvProc = [this, rrcb](BHMsg &msg) { - auto Find = [&](RecvBHMsgCB &cb) { - std::lock_guard<std::mutex> lock(mutex()); - const std::string &msgid = msg.msg_id(); - auto pos = async_cbs_.find(msgid); - if (pos != async_cbs_.end()) { - cb.swap(pos->second); - async_cbs_.erase(pos); - return true; - } else { - return false; - } - }; - - RecvBHMsgCB cb; - if (Find(cb) && cb) { - cb(msg); - } else if (msg.type() == kMsgTypeRequestTopicReply && rrcb) { - MsgRequestTopicReply reply; - if (reply.ParseFromString(msg.body())) { - rrcb(reply.data()); - } - } else { - // ignored, or dropped - } - }; - - return Start(AsyncRecvProc, nworker); -} - -bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms) -{ - try { - BHAddress addr; - if (QueryRPCTopic(topic, addr, timeout_ms)) { - const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size)); - return AsyncSend(addr.mq_id().data(), &msg, timeout_ms); - } else { - return false; - } - } catch (...) { - return false; - } -} -bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb) -{ - auto Call = [&](const void *remote) { - const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size)); - auto onRecv = [cb](BHMsg &msg) { - if (msg.type() == kMsgTypeRequestTopicReply) { - MsgRequestTopicReply reply; - if (reply.ParseFromString(msg.body())) { - cb(reply.data()); - } - } - }; - return AsyncSend(remote, &msg, timeout_ms, onRecv); - }; - - try { - BHAddress addr; - if (QueryRPCTopic(topic, addr, timeout_ms)) { - return Call(addr.mq_id().data()); - } else { - return false; - } - } catch (...) { - return false; - } -} - -bool SocketRequest::SyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms) -{ - try { - BHAddress addr; - if (QueryRPCTopic(topic, addr, timeout_ms)) { - const BHMsg &req(MakeRequest(mq().Id(), topic, data, size)); - BHMsg reply; - if (SyncSendAndRecv(addr.mq_id().data(), &req, &reply, timeout_ms) && reply.type() == kMsgTypeRequestTopicReply) { - MsgRequestTopicReply dr; - if (dr.ParseFromString(reply.body())) { - dr.mutable_data()->swap(out); - return true; - } else { - printf("error parse reply.\n"); - } - } else { - printf("error recv data. line: %d\n", __LINE__); - } - } else { - printf("error recv data. line: %d\n", __LINE__); - } - } catch (...) { - printf("error recv data. line: %d\n", __LINE__); - } - return false; -} - -bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms) -{ - assert(remote && pmsg); - try { - const BHMsg &msg = *static_cast<const BHMsg *>(pmsg); - return mq().Send(*static_cast<const MQId *>(remote), msg, timeout_ms); - } catch (...) { - return false; - } -} -bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvBHMsgCB &cb) -{ - assert(remote && pmsg); - try { - const BHMsg &msg = *static_cast<const BHMsg *>(pmsg); - auto RegisterCB = [&]() { - std::lock_guard<std::mutex> lock(mutex()); - async_cbs_.emplace(msg.msg_id(), cb); - }; - - return mq().Send(*static_cast<const MQId *>(remote), msg, timeout_ms, RegisterCB); - } catch (...) { - return false; - } -} - -bool SocketRequest::SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms) -{ - struct State { - std::mutex mutex; - std::condition_variable cv; - bool canceled = false; - }; - - try { - std::shared_ptr<State> st(new State); - auto endtime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms); - - auto OnRecv = [=](BHMsg &msg) { - std::unique_lock<std::mutex> lk(st->mutex); - if (!st->canceled) { - static_cast<BHMsg *>(result)->Swap(&msg); - st->cv.notify_one(); - } else { - } - }; - - std::unique_lock<std::mutex> lk(st->mutex); - bool sendok = AsyncSend(remote, msg, timeout_ms, OnRecv); - if (sendok && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) { - return true; - } else { - st->canceled = true; - return false; - } - } catch (...) { - return false; - } -} - -bool SocketRequest::QueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms) -{ - if (topic_cache_.Find(topic, addr)) { - return true; - } - - BHMsg result; - const BHMsg &msg = MakeQueryTopic(mq().Id(), topic); - if (SyncSendAndRecv(&BHTopicCenterAddress(), &msg, &result, timeout_ms)) { - if (result.type() == kMsgTypeQueryTopicReply) { - MsgQueryTopicReply reply; - if (reply.ParseFromString(result.body())) { - addr = reply.address(); - if (addr.mq_id().empty()) { - return false; - } else { - topic_cache_.Update(topic, addr); - return true; - } - } - } - } else { - } - return false; -} diff --git a/src/topic_request.h b/src/topic_request.h deleted file mode 100644 index 6765dc2..0000000 --- a/src/topic_request.h +++ /dev/null @@ -1,108 +0,0 @@ -/* - * ===================================================================================== - * - * Filename: topic_request.h - * - * Description: topic request socket - * - * Version: 1.0 - * Created: 2021骞�04鏈�01鏃� 09鏃�36鍒�06绉� - * Revision: none - * Compiler: gcc - * - * Author: Li Chao (), - * Organization: - * - * ===================================================================================== - */ -#ifndef TOPIC_REQUEST_ACEH09NK -#define TOPIC_REQUEST_ACEH09NK - -#include "bh_util.h" -#include "defs.h" -#include "msg.h" -#include "socket.h" -#include <functional> -#include <unordered_map> - -using bhome::msg::ProcInfo; - -class SocketRequest : private ShmSocket -{ - typedef ShmSocket Socket; - -public: - SocketRequest(Socket::Shm &shm) : - Socket(shm, 64) { StartWorker(); } - SocketRequest() : - SocketRequest(BHomeShm()) {} - ~SocketRequest() { Stop(); } - - typedef std::function<void(const std::string &data)> RequestResultCB; - bool StartWorker(const RequestResultCB &rrcb, int nworker = 2); - bool StartWorker(int nworker = 2) { return StartWorker(RequestResultCB(), nworker); } - bool Stop() { return Socket::Stop(); } - bool AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb); - bool AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms); - - bool AsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb) - { - return AsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb); - } - bool AsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms) - { - return AsyncRequest(topic, data.data(), data.size(), timeout_ms); - } - bool SyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms); - bool SyncRequest(const Topic &topic, const std::string &data, std::string &out, const int timeout_ms) - { - return SyncRequest(topic, data.data(), data.size(), out, timeout_ms); - } - -private: - bool AsyncSend(const void *remote, const void *msg, const int timeout_ms, const RecvBHMsgCB &cb); - bool AsyncSend(const void *remote, const void *msg, const int timeout_ms); - bool SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms); - bool QueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms); - std::unordered_map<std::string, RecvBHMsgCB> async_cbs_; - - typedef bhome_msg::BHAddress Address; - class TopicCache - { - class Impl - { - typedef std::unordered_map<Topic, Address> Store; - Store store_; - - public: - bool Find(const Topic &topic, Address &addr) - { - auto pos = store_.find(topic); - if (pos != store_.end()) { - addr = pos->second; - return true; - } else { - return false; - } - } - bool Update(const Topic &topic, const Address &addr) - { - store_[topic] = addr; - return true; - } - }; - Synced<Impl> impl_; - // Impl &impl() - // { - // thread_local Impl impl; - // return impl; - // } - - public: - bool Find(const Topic &topic, Address &addr) { return impl_->Find(topic, addr); } - bool Update(const Topic &topic, const Address &addr) { return impl_->Update(topic, addr); } - }; - TopicCache topic_cache_; -}; - -#endif // end of include guard: TOPIC_REQUEST_ACEH09NK diff --git a/utest/simple_tests.cpp b/utest/simple_tests.cpp index 06093fd..cbbcc2a 100644 --- a/utest/simple_tests.cpp +++ b/utest/simple_tests.cpp @@ -36,7 +36,7 @@ BOOST_CHECK(!p); BOOST_CHECK(p.get() == 0); const char *str = "basic"; - p = str; + p = str; BOOST_CHECK(p); BOOST_CHECK(p.get() == str); p = 0; @@ -49,7 +49,7 @@ auto Code = [&](int id) { typedef ShmObject<s1000> Int; std::string name = std::to_string(id); - auto a0 = Avail(); + auto a0 = Avail(); Int i1(shm, name); auto a1 = Avail(); BOOST_CHECK_LT(a1, a0); @@ -64,7 +64,7 @@ { auto old = Avail(); - void *p = shm.Alloc(1024); + void *p = shm.Alloc(1024); shm.Dealloc(p); BOOST_CHECK_EQUAL(old, Avail()); } @@ -80,7 +80,7 @@ // boost::timer::auto_cpu_timer timer; ThreadManager threads; int nthread = 1; - int nloop = 1; + int nloop = 1; for (int i = 0; i < nthread; ++i) { threads.Launch(BasicTest, i, nloop); } @@ -114,7 +114,7 @@ int ms = i * 100; printf("Timeout Test %4d: ", ms); boost::timer::auto_cpu_timer timer; - BHMsg msg; + MsgI msg; bool r = q.Recv(msg, ms); BOOST_CHECK(!r); } diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp index 34f80d8..d777f91 100644 --- a/utest/speed_test.cpp +++ b/utest/speed_test.cpp @@ -28,14 +28,20 @@ MQId id = boost::uuids::random_generator()(); const int timeout = 100; const uint32_t data_size = 4000; + const std::string proc_id = "demo_proc"; auto Writer = [&](int writer_id, uint64_t n) { SharedMemory shm(shm_name, mem_size); ShmMsgQueue mq(shm, 64); std::string str(data_size, 'a'); MsgI msg; + MsgRequestTopic body; + body.set_topic("topic"); + body.set_data(str); + auto head(InitMsgHead(GetType(body), proc_id)); + msg.MakeRC(shm, head, body); DEFER1(msg.Release(shm);); - msg.MakeRC(shm, MakeRequest(mq.Id(), "topic", str.data(), str.size())); + for (uint64_t i = 0; i < n; ++i) { // mq.Send(id, str.data(), str.size(), timeout); mq.Send(id, msg, timeout); @@ -45,8 +51,10 @@ SharedMemory shm(shm_name, mem_size); ShmMsgQueue mq(id, shm, 1000); while (*run) { - BHMsg msg; + MsgI msg; + BHMsgHead head; if (mq.Recv(msg, timeout)) { + DEFER1(msg.Release(shm)); // ok } else if (isfork) { exit(0); // for forked quit after 1s. @@ -113,6 +121,8 @@ const size_t msg_length = 1000; std::string msg_content(msg_length, 'a'); msg_content[20] = '\0'; + const std::string client_proc_id = "client_proc"; + const std::string server_proc_id = "server_proc"; SharedMemory shm(shm_name, 1024 * 1024 * 50); auto Avail = [&]() { return shm.get_free_memory(); }; @@ -121,9 +131,18 @@ ShmMsgQueue cli(shm, qlen); MsgI request_rc; - request_rc.MakeRC(shm, MakeRequest(cli.Id(), "topic", msg_content.data(), msg_content.size())); + MsgRequestTopic req_body; + req_body.set_topic("topic"); + req_body.set_data(msg_content); + auto req_head(InitMsgHead(GetType(req_body), client_proc_id)); + request_rc.MakeRC(shm, req_head, req_body); + + MsgRequestTopic reply_body; + reply_body.set_topic("topic"); + reply_body.set_data(msg_content); + auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id)); MsgI reply_rc; - reply_rc.MakeRC(shm, MakeReply("fakemsgid", msg_content.data(), msg_content.size())); + reply_rc.MakeRC(shm, reply_head, reply_body); std::atomic<uint64_t> count(0); @@ -133,7 +152,11 @@ auto Client = [&](int cli_id, int nmsg) { for (int i = 0; i < nmsg; ++i) { auto Req = [&]() { - return cli.Send(srv.Id(), MakeRequest(cli.Id(), "topic", msg_content.data(), msg_content.size()), 100); + MsgRequestTopic req_body; + req_body.set_topic("topic"); + req_body.set_data(msg_content); + auto req_head(InitMsgHead(GetType(req_body), client_proc_id)); + return cli.Send(srv.Id(), req_head, req_body, 100); }; auto ReqRC = [&]() { return cli.Send(srv.Id(), request_rc, 1000); }; @@ -141,10 +164,12 @@ printf("********** client send error.\n"); continue; } - BHMsg msg; + MsgI msg; + BHMsgHead head; if (!cli.Recv(msg, 1000)) { printf("********** client recv error.\n"); } else { + DEFER1(msg.Release(shm)); ++count; auto cur = Now(); if (last_time.exchange(cur) < cur) { @@ -158,18 +183,27 @@ std::atomic<bool> stop(false); auto Server = [&]() { - BHMsg req; - while (!stop) { - if (srv.Recv(req, 100) && req.type() == kMsgTypeRequestTopic) { - auto &mqid = req.route()[0].mq_id(); - MQId src_id; - memcpy(&src_id, mqid.data(), sizeof(src_id)); - auto Reply = [&]() { - return srv.Send(src_id, MakeReply(req.msg_id(), msg_content.data(), msg_content.size()), 100); - }; - auto ReplyRC = [&]() { return srv.Send(src_id, reply_rc, 100); }; + MsgI req; + BHMsgHead req_head; - if (ReplyRC()) { + while (!stop) { + if (srv.Recv(req, 100)) { + DEFER1(req.Release(shm)); + if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) { + auto &mqid = req_head.route()[0].mq_id(); + MQId src_id; + memcpy(&src_id, mqid.data(), sizeof(src_id)); + auto Reply = [&]() { + MsgRequestTopic reply_body; + reply_body.set_topic("topic"); + reply_body.set_data(msg_content); + auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, req_head.msg_id())); + return srv.Send(src_id, reply_head, reply_body, 100); + }; + auto ReplyRC = [&]() { return srv.Send(src_id, reply_rc, 100); }; + + if (ReplyRC()) { + } } } } diff --git a/utest/utest.cpp b/utest/utest.cpp index 092455f..c925e22 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -1,11 +1,8 @@ #include "center.h" #include "defs.h" #include "pubsub.h" -#include "pubsub_center.h" -#include "reqrep_center.h" #include "socket.h" -#include "topic_reply.h" -#include "topic_request.h" +#include "topic_node.h" #include "util.h" #include <atomic> #include <boost/uuid/uuid_generators.hpp> @@ -15,6 +12,7 @@ #include <string> #include <thread> #include <vector> +using namespace bhome_msg; template <class A, class B> struct IsSameType { @@ -79,9 +77,11 @@ int *flag = shm.find_or_construct<int>("flag")(123); printf("flag = %d\n", *flag); ++*flag; + const std::string sub_proc_id = "subscriber"; + const std::string pub_proc_id = "publisher"; - PubSubCenter bus(shm); - bus.Start(); + BHCenter center(shm); + center.Start(); std::this_thread::sleep_for(100ms); @@ -93,12 +93,12 @@ const int timeout = 1000; auto Sub = [&](int id, const std::vector<std::string> &topics) { SocketSubscribe client(shm); - bool r = client.Subscribe(topics, timeout); + bool r = client.Subscribe(sub_proc_id, topics, timeout); std::mutex mutex; std::condition_variable cv; std::atomic<uint64_t> n(0); - auto OnTopicData = [&](const std::string &topic, const std::string &data) { + auto OnTopicData = [&](const std::string &proc_id, const std::string &topic, const std::string &data) { ++total_count; auto cur = Now(); @@ -123,7 +123,7 @@ for (unsigned i = 0; i < nmsg; ++i) { std::string data = topic + std::to_string(i) + std::string(1000, '-'); - bool r = provider.Publish(topic, data, timeout); + bool r = provider.Publish(pub_proc_id, topic, data.data(), data.size(), timeout); if (!r) { printf("pub ret: %s\n", r ? "ok" : "fail"); } @@ -150,9 +150,8 @@ std::cout << "end : " << Now(); printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n", total_count.load(), total_count - last_count.exchange(total_count), init_avail - Avail()); - - bus.Stop(); } + namespace { struct C { @@ -177,12 +176,24 @@ printf("flag = %d\n", *flag); ++*flag; + const std::string client_proc_id = "client_proc_"; + const std::string server_proc_id = "server_proc_"; + BHCenter center(shm); center.Start(); std::atomic<bool> run(true); auto Client = [&](const std::string &topic, const int nreq) { - SocketRequest client(shm); + TopicNode client(shm); + MsgRegister reg; + reg.mutable_proc()->set_proc_id(client_proc_id + topic); + MsgCommonReply reply_body; + + if (!client.Register(reg, reply_body, 1000)) { + printf("client register failed\n"); + return; + } + std::atomic<int> count(0); std::string reply; auto onRecv = [&](const std::string &rep) { @@ -191,40 +202,54 @@ printf("count: %d\n", count.load()); } }; - client.StartWorker(onRecv, 2); + client.ClientStartWorker(onRecv, 2); boost::timer::auto_cpu_timer timer; for (int i = 0; i < nreq; ++i) { - if (!client.AsyncRequest(topic, "data " + std::to_string(i), 1000)) { + if (!client.ClientAsyncRequest(topic, "data " + std::to_string(i), 1000)) { printf("client request failed\n"); + ++count; } // if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) { // printf("client request failed\n"); - // } else { - // ++count; // } + // ++count; } do { std::this_thread::yield(); } while (count.load() < nreq); - client.Stop(); + client.ClientStopWorker(); printf("request %s %d done ", topic.c_str(), count.load()); }; std::atomic_uint64_t server_msg_count(0); auto Server = [&](const std::string &name, const std::vector<std::string> &topics) { - SocketReply server(shm); - ProcInfo info; - info.set_id(name); - info.set_name(name); - if (!server.Register(info, topics, 100)) { - printf("register failed\n"); + TopicNode server(shm); + MsgRegister reg; + reg.mutable_proc()->set_proc_id(server_proc_id); + reg.mutable_proc()->set_name(name); + MsgCommonReply reply_body; + + if (!server.Register(reg, reply_body, 100)) { + printf("server register failed\n"); + return; } + auto onData = [&](const std::string &topic, const std::string &data, std::string &reply) { ++server_msg_count; reply = topic + ':' + data; return true; }; - server.StartWorker(onData); + server.ServerStart(onData); + + MsgRegisterRPC rpc; + for (auto &topic : topics) { + rpc.add_topics(topic); + } + if (!server.RegisterRPC(rpc, reply_body, 100)) { + printf("server register topic failed\n"); + return; + } + while (run) { std::this_thread::yield(); } @@ -234,7 +259,7 @@ servers.Launch(Server, "server", topics); std::this_thread::sleep_for(100ms); for (auto &t : topics) { - clients.Launch(Client, t, 1000 * 100); + clients.Launch(Client, t, 1000 * 1); } clients.WaitAll(); printf("clients done, server replyed: %d\n", server_msg_count.load()); -- Gitblit v1.8.0