BIG change: join center and bus; a message is now head + body.
| | |
| | | *.un~ |
| | | build/ |
| | | debug/ |
| | | release/ |
| | | Makefile |
| | | utest/utest |
| | | *.bak |
| | | gmon.out |
| | |
| | | "name": "g++ - Build and debug active file", |
| | | "type": "cppdbg", |
| | | "request": "launch", |
| | | "program": "${workspaceFolder}/utest/utest", |
| | | "program": "${workspaceFolder}/debug/bin/utest", |
| | | "args": [ |
| | | "-t", |
| | | "ReqRepTest" |
| | |
| | | "cinttypes": "cpp", |
| | | "typeindex": "cpp", |
| | | "typeinfo": "cpp", |
| | | "variant": "cpp" |
| | | "variant": "cpp", |
| | | "iomanip": "cpp", |
| | | "*.inc": "cpp", |
| | | "strstream": "cpp", |
| | | "unordered_set": "cpp", |
| | | "cfenv": "cpp" |
| | | }, |
| | | "files.exclude": { |
| | | "**/*.un~": true |
| | |
| | | "command": "ninja", |
| | | "args": [ |
| | | "-C", |
| | | "../build" |
| | | "debug" |
| | | ], |
| | | "options": { |
| | | "cwd": "${workspaceFolder}/utest" |
| | | "cwd": "${workspaceFolder}" |
| | | }, |
| | | "problemMatcher": [ |
| | | "$gcc" |
| | |
| | | syntax = "proto3"; |
| | | |
| | | option optimize_for = LITE_RUNTIME; |
| | | |
| | | import "google/protobuf/descriptor.proto"; |
| | | import "error_msg.proto"; |
| | | // import "google/protobuf/descriptor.proto"; |
| | | import "bhome_msg_api.proto"; |
| | | |
| | | package bhome.msg; |
| | | |
| | | |
| | | // message format : header(BHMsgHead) + body(variable types) |
| | | message BHAddress { |
| | | bytes mq_id = 1; // mqid, uuid |
| | | bytes ip = 2; // |
| | | int32 port = 3; |
| | | } |
| | | |
| | | message ProcInfo |
| | | { |
| | | bytes id = 1; // serial number, maybe managed |
| | | bytes name = 2; |
| | | bytes public_info = 3; |
| | | bytes private_info = 4; |
| | | } |
| | | // message format : head_len(4) + head(BHMsgHead) + body_len(4) + body(variable types) |
| | | |
| | | message BHMsgHead { |
| | | bytes msg_id = 1; |
| | | repeated BHAddress route = 2; // for reply and proxy. |
| | | int64 timestamp = 3; |
| | | int32 type = 4; |
| | | ProcInfo proc = 5; |
| | | bytes proc_id = 5; |
| | | bytes topic = 6; // for request route |
| | | } |
| | | |
| | | message BHMsgBody { |
| | | bytes data = 1; |
| | | } |
| | | |
| | | message BHMsg { // deprecated |
| | |
| | | |
| | | enum MsgType { |
| | | kMsgTypeInvalid = 0; |
| | | kMsgTypeRawData = 1; |
| | | |
| | | kMsgTypeCommonReply = 2; |
| | | |
| | |
| | | kMsgTypeQueryTopicReply = 15; |
| | | kMsgTypeRequestTopic = 16; |
| | | kMsgTypeRequestTopicReply = 17; |
| | | kMsgTypeRegisterRPC = 18; |
| | | // reply |
| | | |
| | | kMsgTypePublish = 100; |
| | | // kMsgTypePublishReply = 101; |
| | | kMsgTypeSubscribe = 102; |
| | | // kMsgTypeSubscribeReply = 103; |
| | | kMsgTypeUnsubscribe = 104; |
| | | // kMsgTypeUnsubscribeReply = 105; |
| | | kMsgTypePublish = 20; |
| | | // kMsgTypePublishReply = 21; |
| | | kMsgTypeSubscribe = 22; |
| | | // kMsgTypeSubscribeReply = 23; |
| | | kMsgTypeUnsubscribe = 24; |
| | | // kMsgTypeUnsubscribeReply = 25; |
| | | |
| | | } |
| | | |
| | | message MsgPub { |
| | | bytes topic = 1; |
| | | bytes data = 2; |
| | | } |
| | | |
| | | message MsgSub { |
| | | repeated bytes topics = 1; |
| | | } |
| | | |
| | | message MsgCommonReply { |
| | | ErrorMsg errmsg = 1; |
| | | } |
| | | |
| | | message MsgRequestTopic { |
| | | bytes topic = 1; |
| | | bytes data = 2; |
| | | } |
| | | |
| | | message MsgRequestTopicReply { |
| | | ErrorMsg errmsg = 1; |
| | | bytes data = 2; |
| | | } |
| | | |
| | | message MsgRegister |
| | | { |
| | | ProcInfo proc = 1; |
| | | repeated bytes topics = 2; |
| | | } |
| | | |
| | | message MsgHeartbeat |
| | | { |
| | | ProcInfo proc = 1; |
| | | } |
| | | |
| | | message MsgQueryTopic { |
| | | bytes topic = 1; |
| | | } |
| | | |
| | | message MsgQueryTopicReply { |
| | | ErrorMsg errmsg = 1; |
| | | BHAddress address = 2; |
| | | } |
| | | |
| | | service TopicRPC { |
| New file |
| | |
| | | syntax = "proto3"; |
| | | option optimize_for = LITE_RUNTIME; |
| | | |
| | | // public messages |
| | | import "error_msg.proto"; |
| | | |
| | | package bhome.msg; |
| | | |
| | | // Address of a message queue; used as route entries in the message head
| | | // and as the destination returned by a topic query.
| | | message BHAddress {
| | | bytes mq_id = 1; // mqid, uuid
| | | // bytes ip = 2; //
| | | // int32 port = 3;
| | | }
| | | |
| | | // Description of a process (node) as sent in register and heartbeat messages.
| | | message ProcInfo
| | | {
| | | bytes proc_id = 1; // serial number, maybe managed
| | | bytes name = 2;
| | | bytes public_info = 3; // maybe json.
| | | bytes private_info = 4;
| | | }
| | | |
| | | // Body of a publish message: the topic to publish on and the payload.
| | | message MsgPublish {
| | | bytes topic = 1;
| | | bytes data = 2;
| | | }
| | | |
| | | // Body of a subscribe message: the topics the sender wants to receive.
| | | message MsgSubscribe {
| | | repeated bytes topics = 1;
| | | }
| | | // Body of an unsubscribe message: the topics the sender no longer wants.
| | | message MsgUnsubscribe {
| | | repeated bytes topics = 1;
| | | }
| | | |
| | | // Generic reply that carries only an error code/string.
| | | message MsgCommonReply {
| | | ErrorMsg errmsg = 1;
| | | }
| | | |
| | | // Body of a topic request: the requested topic and the request payload.
| | | message MsgRequestTopic {
| | | bytes topic = 1;
| | | bytes data = 2;
| | | }
| | | |
| | | // Reply to a topic request: error status plus the reply payload.
| | | message MsgRequestTopicReply {
| | | ErrorMsg errmsg = 1;
| | | bytes data = 2;
| | | }
| | | |
| | | // Body of a register message: the process info of the registering node.
| | | message MsgRegister
| | | {
| | | ProcInfo proc = 1;
| | | }
| | | |
| | | // Body of an RPC registration: the topics the sender offers to serve.
| | | message MsgRegisterRPC
| | | {
| | | repeated bytes topics = 1;
| | | }
| | | |
| | | // Body of a heartbeat: process info; non-empty public/private info
| | | // replaces the values stored by the center.
| | | message MsgHeartbeat
| | | {
| | | ProcInfo proc = 1;
| | | }
| | | |
| | | // Body of a topic query: the topic whose server address is wanted.
| | | message MsgQueryTopic {
| | | bytes topic = 1;
| | | }
| | | |
| | | // Reply to a topic query: error status and, on success, the server address.
| | | message MsgQueryTopicReply {
| | | ErrorMsg errmsg = 1;
| | | BHAddress address = 2;
| | | }
| | |
| | | eSuccess = 0; |
| | | eError = 1; |
| | | eInvalidInput = 2; |
| | | eNotRegistered = 3; |
| | | eNotFound = 4; |
| | | eOffline = 5; |
| | | eNoRespond = 6; |
| | | eAddressNotMatch = 7; |
| | | } |
| | | |
| | | message ErrorMsg { |
| | |
| | | * ===================================================================================== |
| | | */ |
| | | #include "center.h" |
| | | #include "bh_util.h" |
| | | #include "defs.h" |
| | | #include "pubsub_center.h" |
| | | #include "reqrep_center.h" |
| | | #include "shm.h" |
| | | #include <set> |
| | | |
| | | using namespace bhome_shm; |
| | | using namespace bhome_msg; |
| | | using namespace bhome::msg; |
| | | typedef BHCenter::MsgHandler Handler; |
| | | |
| | | Handler Join(Handler h1, Handler h2) |
| | | namespace |
| | | { |
| | | return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &imsg, bhome::msg::BHMsg &msg) { |
| | | return h1(socket, imsg, msg) || h2(socket, imsg, msg); |
| | | auto Now = []() { time_t t; return time(&t); }; |
| | | |
| | | //TODO check proc_id |
| | | class NodeCenter |
| | | { |
| | | public: |
| | | typedef std::string ProcId; |
| | | typedef std::string Address; |
| | | typedef bhome::msg::ProcInfo ProcInfo; |
| | | |
| | | private: |
| | | enum { |
| | | kStateInvalid = 0, |
| | | kStateNormal = 1, |
| | | kStateNoRespond = 2, |
| | | kStateOffline = 3, |
| | | }; |
| | | |
| | | struct ProcState { |
| | | time_t timestamp_ = 0; |
| | | uint32_t flag_ = 0; // reserved |
| | | }; |
| | | typedef std::unordered_map<Address, std::set<Topic>> AddressTopics; |
| | | |
| | | struct NodeInfo { |
| | | ProcState state_; // state |
| | | Address addr_; // registered_mqid. |
| | | ProcInfo proc_; // |
| | | AddressTopics services_; // address: topics |
| | | AddressTopics subscriptions_; // address: topics |
| | | }; |
| | | typedef std::shared_ptr<NodeInfo> Node; |
| | | typedef std::weak_ptr<NodeInfo> WeakNode; |
| | | |
| | | struct TopicDest { |
| | | Address mq_; |
| | | WeakNode weak_node_; |
| | | bool operator<(const TopicDest &a) const { return mq_ < a.mq_; } |
| | | }; |
| | | const std::string &SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); } |
| | | |
| | | public: |
| | | typedef std::set<TopicDest> Clients; |
| | | |
| | | NodeCenter(const std::string &id = "#Center") : |
| | | id_(id) {} |
| | | const std::string &id() const { return id_; } // no need to lock. |
| | | |
| | | //TODO maybe just return serialized string. |
| | | // Registers a node under the proc id carried in the message body.
| | | // The proc id in the head must equal the one in the body, otherwise
| | | // eInvalidInput is returned. An existing entry with the same proc id is
| | | // replaced. `msg` is modified: its ProcInfo is swapped into the new node.
| | | // Reads head.route(0), so the head must carry at least one route entry.
| | | MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg)
| | | {
| | |     const bool id_matches = (head.proc_id() == msg.proc().proc_id());
| | |     if (!id_matches) {
| | |         return MakeReply(eInvalidInput, "invalid proc id.");
| | |     }
| | |
| | |     try {
| | |         Node fresh(new NodeInfo);
| | |         fresh->addr_ = SrcAddr(head);
| | |         fresh->proc_.Swap(msg.mutable_proc());
| | |         fresh->state_.flag_ = kStateNormal;
| | |         fresh->state_.timestamp_ = Now();
| | |         const ProcId &key = fresh->proc_.proc_id();
| | |         nodes_[key] = fresh;
| | |         return MakeReply(eSuccess);
| | |     } catch (...) {
| | |         return MakeReply(eError, "register node error.");
| | |     }
| | | }
| | | // Looks up the sender's node by head.proc_id() and forwards to a callback.
| | | //   onErr(code, text) is called when the node is unknown, when a heartbeat
| | | //   arrives from an address other than the registered one, or when the
| | | //   node is not in the normal state.
| | | //   onOk(node) is called otherwise.
| | | // Both callbacks must return the same type (the return type is deduced).
| | | template <class OnSuccess, class OnError>
| | | auto HandleMsg(const BHMsgHead &head, OnSuccess onOk, OnError onErr)
| | | {
| | |     const auto found = nodes_.find(head.proc_id());
| | |     if (found == nodes_.end()) {
| | |         return onErr(eNotRegistered, "Node is not registered.");
| | |     }
| | |
| | |     auto node = found->second;
| | |     const bool is_heartbeat = (head.type() == kMsgTypeHeartbeat);
| | |     // The source address is only compared for heartbeats.
| | |     if (is_heartbeat && node->addr_ != SrcAddr(head)) {
| | |         return onErr(eAddressNotMatch, "Node address error.");
| | |     }
| | |     if (!Valid(*node)) {
| | |         return onErr(eNoRespond, "Node is not alive.");
| | |     }
| | |     return onOk(node);
| | | }
| | | |
| | | // Validates the sender via the three-argument HandleMsg and runs `op` on
| | | // its node. Validation failures are turned into a Reply carrying the
| | | // error code and text; any exception is turned into an eError Reply.
| | | //   Reply: reply message type; must provide mutable_errmsg().
| | | //   op:    callable taking a Node and returning Reply.
| | | template <class Reply, class Func>
| | | Reply HandleMsg(const BHMsgHead &head, Func const &op)
| | | {
| | |     try {
| | |         auto onErr = [](const ErrorCode ec, const std::string &str) { return MakeReply<Reply>(ec, str); };
| | |         return HandleMsg(head, op, onErr);
| | |     } catch (...) {
| | |         //TODO error log
| | |         return MakeReply<Reply>(eError, "internal error.");
| | |     }
| | | }
| | | // Convenience overload: same as HandleMsg<Reply, Func> with
| | | // Reply = MsgCommonReply.
| | | template <class Func>
| | | inline MsgCommonReply HandleMsg(const BHMsgHead &head, Func const &op)
| | | {
| | |     using Reply = MsgCommonReply;
| | |     return HandleMsg<Reply, Func>(head, op);
| | | }
| | | |
| | | // Records the topics served by the sender: they are added to the node's
| | | // own service list (keyed by source address) and to the topic -> server
| | | // map used by QueryTopic. Fails with the HandleMsg errors when the sender
| | | // is not a valid registered node.
| | | MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg)
| | | {
| | |     auto record_services = [&](Node node) -> MsgCommonReply {
| | |         const auto &sender = SrcAddr(head);
| | |         const auto &topics = msg.topics();
| | |         node->services_[sender].insert(topics.begin(), topics.end());
| | |         const TopicDest dest = {sender, node};
| | |         for (const auto &topic : topics) {
| | |             service_map_[topic].insert(dest);
| | |         }
| | |         return MakeReply(eSuccess);
| | |     };
| | |     return HandleMsg(head, record_services);
| | | }
| | | |
| | | // Refreshes the sender's timestamp. Non-empty public/private info in the
| | | // heartbeat replaces the stored values; empty fields leave them as is.
| | | MsgCommonReply Heartbeat(const BHMsgHead &head, const MsgHeartbeat &msg)
| | | {
| | |     auto refresh = [&](Node node) {
| | |         node->state_.timestamp_ = Now();
| | |         ProcInfo &stored = node->proc_;
| | |         const auto &reported = msg.proc();
| | |         if (!reported.public_info().empty()) {
| | |             stored.set_public_info(reported.public_info());
| | |         }
| | |         if (!reported.private_info().empty()) {
| | |             stored.set_private_info(reported.private_info());
| | |         }
| | |         return MakeReply(eSuccess);
| | |     };
| | |     return HandleMsg(head, refresh);
| | | }
| | | |
| | | // Resolves a topic to the address of a server registered for it.
| | | // Only the first registered server is considered. Replies:
| | | //   eNotFound  - no server is registered for the topic;
| | | //   eOffline   - the first server's node no longer exists;
| | | //   eNoRespond - the first server's node is not in the normal state;
| | | //   eSuccess   - reply.address().mq_id() holds the server's queue id.
| | | // Sender validation errors from HandleMsg are returned unchanged.
| | | MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req)
| | | {
| | |     typedef MsgQueryTopicReply Reply;
| | |
| | |     auto query = [&](Node /*self*/) -> MsgQueryTopicReply {
| | |         auto pos = service_map_.find(req.topic());
| | |         if (pos == service_map_.end() || pos->second.empty()) {
| | |             return MakeReply<Reply>(eNotFound, "topic server not found.");
| | |         }
| | |
| | |         // now just find first one.
| | |         auto &dests = pos->second;
| | |         auto first = dests.begin();
| | |         Node dest_node(first->weak_node_.lock());
| | |         if (!dest_node) {
| | |             // Drop only the stale destination; other servers registered
| | |             // for the same topic must stay reachable.
| | |             dests.erase(first);
| | |             if (dests.empty()) {
| | |                 service_map_.erase(pos);
| | |             }
| | |             return MakeReply<Reply>(eOffline, "topic server offline.");
| | |         }
| | |         if (!Valid(*dest_node)) {
| | |             return MakeReply<Reply>(eNoRespond, "topic server not responding.");
| | |         }
| | |
| | |         MsgQueryTopicReply reply = MakeReply<Reply>(eSuccess);
| | |         reply.mutable_address()->set_mq_id(first->mq_);
| | |         return reply;
| | |     };
| | |
| | |     return HandleMsg<Reply>(head, query);
| | | }
| | | |
| | | // Adds the sender as a subscriber of every topic in `msg`: the topics go
| | | // into the node's own subscription list (keyed by source address) and the
| | | // sender goes into the topic -> subscribers map.
| | | MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg)
| | | {
| | |     auto record_subscriptions = [&](Node node) {
| | |         const auto &sender = SrcAddr(head);
| | |         const auto &topics = msg.topics();
| | |         node->subscriptions_[sender].insert(topics.begin(), topics.end());
| | |         const TopicDest dest = {sender, node};
| | |         for (const auto &topic : topics) {
| | |             subscribe_map_[topic].insert(dest);
| | |         }
| | |         return MakeReply(eSuccess);
| | |     };
| | |     return HandleMsg(head, record_subscriptions);
| | | }
| | | // Removes the sender's subscriptions for the topics in `msg`. Does nothing
| | | // (and still replies eSuccess) when the sender has no subscriptions under
| | | // its source address. Empty per-topic and per-address records are erased.
| | | MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg)
| | | {
| | |     return HandleMsg(head, [&](Node node) {
| | |         const auto &sender = SrcAddr(head);
| | |         auto own = node->subscriptions_.find(sender);
| | |         if (own == node->subscriptions_.end()) {
| | |             return MakeReply(eSuccess);
| | |         }
| | |
| | |         const TopicDest dest = {sender, node};
| | |         for (const auto &topic : msg.topics()) {
| | |             // clear node sub records;
| | |             own->second.erase(topic);
| | |
| | |             // clear the topic -> subscribers record for this sender.
| | |             auto entry = subscribe_map_.find(topic);
| | |             if (entry == subscribe_map_.end()) {
| | |                 continue;
| | |             }
| | |             const bool removed = (entry->second.erase(dest) != 0);
| | |             if (removed && entry->second.empty()) {
| | |                 subscribe_map_.erase(entry);
| | |             }
| | |         }
| | |         if (own->second.empty()) {
| | |             node->subscriptions_.erase(own);
| | |         }
| | |         return MakeReply(eSuccess);
| | |     });
| | | }
| | | |
| | | // Collects the valid subscribers of `topic`.
| | | // Subscriptions are matched against the full topic and against every
| | | // proper prefix of it that ends just after an occurrence of kTopicSep
| | | // (presumably so that a subscription to a parent topic also receives
| | | // its sub-topics - confirm against the subscriber side).
| | | // Subscribers whose node is gone or not in the normal state are skipped.
| | | Clients DoFindClients(const std::string &topic)
| | | {
| | |     Clients dests;
| | |     auto Find1 = [&](const std::string &t) {
| | |         // Look up the key that was passed in, not the full topic; otherwise
| | |         // the prefix lookups below would all repeat the exact-match lookup.
| | |         auto pos = subscribe_map_.find(t);
| | |         if (pos != subscribe_map_.end()) {
| | |             auto &clients = pos->second;
| | |             for (auto &cli : clients) {
| | |                 if (Valid(cli.weak_node_)) {
| | |                     dests.insert(cli);
| | |                 }
| | |             }
| | |         }
| | |     };
| | |     Find1(topic);
| | |
| | |     size_t pos = 0;
| | |     while (true) {
| | |         pos = topic.find(kTopicSep, pos);
| | |         if (pos == topic.npos || ++pos == topic.size()) {
| | |             // Find1(std::string()); // sub all.
| | |             break;
| | |         } else {
| | |             Find1(topic.substr(0, pos));
| | |         }
| | |     }
| | |     return dests;
| | | }
| | | // Validates the publisher and looks up the subscribers of msg.topic().
| | | // Returns true and fills `out` when the publisher is a valid node;
| | | // returns false otherwise. `reply` always receives the resulting reply
| | | // (eSuccess or the validation error).
| | | bool FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply)
| | | {
| | |     bool found = false;
| | |     auto collect = [&](Node node) {
| | |         Clients matched = DoFindClients(msg.topic());
| | |         out.swap(matched);
| | |         found = true;
| | |         return MakeReply(eSuccess);
| | |     };
| | |     MsgCommonReply result = HandleMsg(head, collect);
| | |     reply.Swap(&result);
| | |     return found;
| | | }
| | | |
| | | private: |
| | | // A node is valid only while its state flag is kStateNormal.
| | | bool Valid(const NodeInfo &node)
| | | {
| | |     const auto flag = node.state_.flag_;
| | |     return flag == kStateNormal;
| | | }
| | | // A weak node reference is valid when the node still exists and is valid.
| | | bool Valid(const WeakNode &weak)
| | | {
| | |     if (auto locked = weak.lock()) {
| | |         return Valid(*locked);
| | |     }
| | |     return false;
| | | }
| | | void CheckAllNodes(); //TODO, call it in timer. |
| | | std::string id_; // center proc id; |
| | | |
| | | std::unordered_map<Topic, Clients> service_map_; |
| | | std::unordered_map<Topic, Clients> subscribe_map_; |
| | | std::unordered_map<ProcId, Node> nodes_; |
| | | }; |
| | | |
| | | // Parses the body of `msg` as `Body`, runs `onmsg` on it and hands the
| | | // result to `replyer`. Messages whose head does not carry exactly one
| | | // route entry, or whose body fails to parse, are silently dropped.
| | | template <class Body, class OnMsg, class Replyer>
| | | inline void Dispatch(MsgI &msg, BHMsgHead &head, OnMsg const &onmsg, Replyer const &replyer)
| | | {
| | |     const bool single_hop = (head.route_size() == 1);
| | |     if (!single_hop) {
| | |         return;
| | |     }
| | |     Body body;
| | |     if (!msg.ParseBody(body)) {
| | |         return;
| | |     }
| | |     replyer(onmsg(body));
| | | }
| | | |
| | | // Builds a handler that tries `h1` first and falls back to `h2` only when
| | | // `h1` reports the message as not handled. Both handlers are copied.
| | | Handler Combine(const Handler &h1, const Handler &h2)
| | | {
| | |     auto either = [h1, h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome::msg::BHMsgHead &head) {
| | |         return h1(socket, msg, head) || h2(socket, msg, head);
| | |     };
| | |     return either;
| | | }
| | | // Variadic form: chains three or more handlers, tried left to right.
| | | template <class... H>
| | | Handler Combine(const Handler &h0, const Handler &h1, const Handler &h2, const H &...rest)
| | | {
| | |     const Handler first_two = Combine(h0, h1);
| | |     return Combine(first_two, h2, rest...);
| | | }
| | | |
| | | // Expands to one `case` of a switch over head.type(): dispatches the body as
| | | // Msg<MsgTag> to center->MsgTag(head, body) and passes the result to `replyer`,
| | | // then returns true. Expects `msg`, `head`, `center` and `replyer` in scope.
| | | #define CASE_ON_MSG_TYPE(MsgTag) \
| | | case kMsgType##MsgTag: \
| | | Dispatch<Msg##MsgTag>( \
| | | msg, head, [&](auto &body) { return center->MsgTag(head, body); }, replyer); \
| | | return true;
| | | |
| | | // Creates the shared NodeCenter and installs the two built-in handlers:
| | | //   "#center.reg" - Register, Heartbeat, RegisterRPC, QueryTopic;
| | | //   "#center.bus" - Subscribe, Unsubscribe, Publish.
| | | // Each handler returns true when it recognised the message type and
| | | // false otherwise. Always returns true.
| | | bool InstallCenter()
| | | {
| | |     auto center_ptr = std::make_shared<Synced<NodeCenter>>();
| | |     // Builds a callable that sends a reply body back to the first route
| | |     // entry of `head`. The callable captures its arguments by reference,
| | |     // so it must only be used while the handler call is still running.
| | |     auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
| | |         return [&](auto &&rep_body) {
| | |             auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
| | |             bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 10);
| | |             if (!r) {
| | |                 printf("send reply failed.\n");
| | |             }
| | |             //TODO resend failed.
| | |         };
| | |     };
| | |
| | |     auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
| | |         auto &center = *center_ptr;
| | |         auto replyer = MakeReplyer(socket, head, center->id());
| | |         switch (head.type()) {
| | |             CASE_ON_MSG_TYPE(Register);
| | |             CASE_ON_MSG_TYPE(Heartbeat);
| | |
| | |             CASE_ON_MSG_TYPE(RegisterRPC);
| | |             CASE_ON_MSG_TYPE(QueryTopic);
| | |         default: return false;
| | |         }
| | |     };
| | |
| | |     auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
| | |         auto &center = *center_ptr;
| | |         auto replyer = MakeReplyer(socket, head, center->id());
| | |         // Forwards a published message to every subscriber of its topic;
| | |         // the publisher only gets a reply when validation fails.
| | |         auto OnPublish = [&]() {
| | |             MsgPublish pub;
| | |             NodeCenter::Clients clients;
| | |             MsgCommonReply reply;
| | |             MsgI pubmsg;
| | |             if (head.route_size() != 1 || !msg.ParseBody(pub)) {
| | |                 return;
| | |             } else if (!center->FindClients(head, pub, clients, reply)) {
| | |                 // send error reply.
| | |                 MakeReplyer(socket, head, center->id())(reply);
| | |             } else if (pubmsg.MakeRC(socket.shm(), msg)) {
| | |                 DEFER1(pubmsg.Release(socket.shm()));
| | |                 for (auto &cli : clients) {
| | |                     auto node = cli.weak_node_.lock();
| | |                     if (node) {
| | |                         socket.Send(cli.mq_.data(), pubmsg, 10);
| | |                     }
| | |                 }
| | |             }
| | |         };
| | |         switch (head.type()) {
| | |             CASE_ON_MSG_TYPE(Subscribe);
| | |             CASE_ON_MSG_TYPE(Unsubscribe);
| | |         case kMsgTypePublish: OnPublish(); return true;
| | |         default: return false;
| | |         }
| | |     };
| | |
| | |     BHCenter::Install("#center.reg", OnCenter, BHTopicCenterAddress(), 1000);
| | |     BHCenter::Install("#center.bus", OnPubSub, BHTopicBusAddress(), 1000);
| | |
| | |     return true;
| | | }
| | | |
| | | #undef CASE_ON_MSG_TYPE |
| | | |
| | | } // namespace |
| | | |
| | | SharedMemory &BHomeShm() |
| | | { |
| | |
| | | static CenterRecords rec; |
| | | return rec; |
| | | } |
| | | |
| | | bool BHCenter::Install(const std::string &name, MsgHandler handler, const std::string &mqid, const int mq_len) |
| | | { |
| | | CenterRecords()[name] = CenterInfo{name, handler, mqid, mq_len}; |
| | | Centers()[name] = CenterInfo{name, handler, mqid, mq_len}; |
| | | return true; |
| | | } |
| | | // Overload taking the queue id as an MQId: its raw bytes are copied into a
| | | // string and passed to the string-based Install.
| | | bool BHCenter::Install(const std::string &name, MsgHandler handler, const MQId &mqid, const int mq_len)
| | | {
| | |     const char *raw = reinterpret_cast<const char *>(&mqid);
| | |     const std::string mqid_bytes(raw, sizeof(mqid));
| | |     return Install(name, handler, mqid_bytes, mq_len);
| | | }
| | | |
| | | BHCenter::BHCenter(Socket::Shm &shm) |
| | | { |
| | | sockets_["center"] = std::make_shared<ShmSocket>(shm, &BHTopicCenterAddress(), 1000); |
| | | sockets_["bus"] = std::make_shared<ShmSocket>(shm, &BHTopicBusAddress(), 1000); |
| | | InstallCenter(); |
| | | |
| | | for (auto &kv : Centers()) { |
| | | sockets_[kv.first] = std::make_shared<ShmSocket>(shm, kv.second.mqid_.data(), kv.second.mq_len_); |
| | | auto &info = kv.second; |
| | | sockets_[info.name_] = std::make_shared<ShmSocket>(shm, *(MQId *) info.mqid_.data(), info.mq_len_); |
| | | } |
| | | } |
| | | |
| | |
| | | |
| | | bool BHCenter::Start() |
| | | { |
| | | auto onCenter = MakeReqRepCenter(); |
| | | auto onBus = MakeBusCenter(); |
| | | sockets_["center"]->Start(onCenter); |
| | | sockets_["bus"]->Start(onBus); |
| | | |
| | | for (auto &kv : Centers()) { |
| | | sockets_[kv.first]->Start(kv.second.handler_); |
| | | auto &info = kv.second; |
| | | sockets_[info.name_]->Start(info.handler_); |
| | | } |
| | | |
| | | return true; |
| | | // socket_.Start(Join(onCenter, onBus)); |
| | | } |
| | | |
| | | bool BHCenter::Stop() |
| | |
| | | typedef ShmSocket Socket; |
| | | |
| | | public: |
| | | typedef std::function<bool(ShmSocket &socket, bhome_msg::MsgI &imsg, bhome::msg::BHMsg &msg)> MsgHandler; |
| | | typedef Socket::PartialRecvCB MsgHandler; |
| | | static bool Install(const std::string &name, MsgHandler handler, const std::string &mqid, const int mq_len); |
| | | static bool Install(const std::string &name, MsgHandler handler, const MQId &mqid, const int mq_len); |
| | | |
| | | BHCenter(Socket::Shm &shm); |
| | | BHCenter(); |
| | |
| | | center accept request and route.; |
| | | //*/ |
| | | const uint32_t kMsgTag = 0xf1e2d3c4; |
| | | const uint32_t kMsgPrefixLen = 4; |
| | | |
| | | inline void AddRoute(BHMsg &msg, const MQId &id) { msg.add_route()->set_mq_id(&id, sizeof(id)); } |
| | | |
| | | std::string RandId() |
| | | void *MsgI::Pack(SharedMemory &shm, |
| | | const uint32_t head_len, const ToArray &headToArray, |
| | | const uint32_t body_len, const ToArray &bodyToArray) |
| | | { |
| | | boost::uuids::uuid id = boost::uuids::random_generator()(); |
| | | return std::string((char *) &id, sizeof(id)); |
| | | } |
| | | BHMsg InitMsg(MsgType type, const std::string &msgid = RandId()) |
| | | { |
| | | BHMsg msg; |
| | | msg.set_msg_id(msgid); |
| | | msg.set_type(type); |
| | | time_t tm = 0; |
| | | msg.set_timestamp(time(&tm)); |
| | | return msg; |
| | | } |
| | | |
| | | BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size) |
| | | { |
| | | BHMsg msg(InitMsg(kMsgTypeRequestTopic)); |
| | | AddRoute(msg, src_id); |
| | | MsgRequestTopic req; |
| | | req.set_topic(topic); |
| | | req.set_data(data, size); |
| | | msg.set_body(req.SerializeAsString()); |
| | | return msg; |
| | | } |
| | | |
| | | BHMsg MakeRegister(const MQId &src_id, ProcInfo info, const std::vector<std::string> &topics) |
| | | { |
| | | BHMsg msg(InitMsg(kMsgTypeRegister)); |
| | | AddRoute(msg, src_id); |
| | | MsgRegister reg; |
| | | reg.mutable_proc()->Swap(&info); |
| | | for (auto &t : topics) { |
| | | reg.add_topics(t); |
| | | void *addr = shm.Alloc(sizeof(head_len) + head_len + sizeof(body_len) + body_len); |
| | | if (addr) { |
| | | auto p = static_cast<char *>(addr); |
| | | auto Pack1 = [&p](auto len, auto &writer) { |
| | | Put32(p, len); |
| | | p += sizeof(len); |
| | | writer(p, len); |
| | | p += len; |
| | | }; |
| | | Pack1(head_len, headToArray); |
| | | Pack1(body_len, bodyToArray); |
| | | } |
| | | msg.set_body(reg.SerializeAsString()); |
| | | return msg; |
| | | return addr; |
| | | } |
| | | |
| | | BHMsg MakeHeartbeat(const MQId &src_id, ProcInfo info) |
| | | bool MsgI::ParseHead(BHMsgHead &head) const |
| | | { |
| | | BHMsg msg(InitMsg(kMsgTypeHeartbeat)); |
| | | AddRoute(msg, src_id); |
| | | MsgHeartbeat reg; |
| | | reg.mutable_proc()->Swap(&info); |
| | | msg.set_body(reg.SerializeAsString()); |
| | | return msg; |
| | | } |
| | | |
| | | BHMsg MakeReply(const std::string &src_msgid, const void *data, const size_t size) |
| | | { |
| | | assert(data && size); |
| | | BHMsg msg(InitMsg(kMsgTypeRequestTopicReply, src_msgid)); |
| | | MsgRequestTopicReply reply; |
| | | reply.set_data(data, size); |
| | | msg.set_body(reply.SerializeAsString()); |
| | | return msg; |
| | | } |
| | | |
| | | BHMsg MakeSubUnsub(const MQId &client, const std::vector<std::string> &topics, const MsgType sub_unsub) |
| | | { |
| | | assert(sub_unsub == kMsgTypeSubscribe || sub_unsub == kMsgTypeUnsubscribe); |
| | | BHMsg msg(InitMsg(sub_unsub)); |
| | | AddRoute(msg, client); |
| | | MsgSub subs; |
| | | for (auto &t : topics) { |
| | | subs.add_topics(t); |
| | | } |
| | | msg.set_body(subs.SerializeAsString()); |
| | | return msg; |
| | | } |
| | | |
| | | BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics) { return MakeSubUnsub(client, topics, kMsgTypeSubscribe); } |
| | | BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics) { return MakeSubUnsub(client, topics, kMsgTypeUnsubscribe); } |
| | | |
| | | BHMsg MakePub(const std::string &topic, const void *data, const size_t size) |
| | | { |
| | | assert(data && size); |
| | | BHMsg msg(InitMsg(kMsgTypePublish)); |
| | | MsgPub pub; |
| | | pub.set_topic(topic); |
| | | pub.set_data(data, size); |
| | | msg.set_body(pub.SerializeAsString()); |
| | | return msg; |
| | | } |
| | | |
| | | BHMsg MakeQueryTopic(const MQId &client, const std::string &topic) |
| | | { |
| | | BHMsg msg(InitMsg(kMsgTypeQueryTopic)); |
| | | AddRoute(msg, client); |
| | | MsgQueryTopic query; |
| | | query.set_topic(topic); |
| | | msg.set_body(query.SerializeAsString()); |
| | | return msg; |
| | | } |
| | | BHMsg MakeQueryTopicReply(const std::string &mqid, const std::string &msgid) |
| | | { |
| | | BHMsg msg(InitMsg(kMsgTypeQueryTopicReply, msgid)); |
| | | MsgQueryTopicReply reply; |
| | | reply.mutable_address()->set_mq_id(mqid); |
| | | msg.set_body(reply.SerializeAsString()); |
| | | return msg; |
| | | } |
| | | |
| | | void *Pack(SharedMemory &shm, const BHMsg &msg) |
| | | { |
| | | uint32_t msg_size = msg.ByteSizeLong(); |
| | | void *p = shm.Alloc(4 + msg_size); |
| | | if (p) { |
| | | Put32(p, msg_size); |
| | | if (!msg.SerializeToArray(static_cast<char *>(p) + kMsgPrefixLen, msg_size)) { |
| | | shm.Dealloc(p); |
| | | p = 0; |
| | | } |
| | | } |
| | | return p; |
| | | } |
| | | |
| | | bool MsgI::Unpack(BHMsg &msg) const |
| | | { |
| | | void *p = ptr_.get(); |
| | | auto p = static_cast<char *>(ptr_.get()); |
| | | assert(p); |
| | | uint32_t msg_size = Get32(p); |
| | | return msg.ParseFromArray(static_cast<char *>(p) + kMsgPrefixLen, msg_size); |
| | | p += 4; |
| | | return head.ParseFromArray(p, msg_size); |
| | | } |
| | | |
| | | // with ref count; |
| | | bool MsgI::MakeRC(SharedMemory &shm, const BHMsg &msg) |
| | | bool MsgI::MakeRC(SharedMemory &shm, void *p) |
| | | { |
| | | void *p = Pack(shm, msg); |
| | | if (!p) { |
| | | return false; |
| | | } |
| | |
| | | return true; |
| | | } |
| | | |
| | | bool MsgI::Make(SharedMemory &shm, const BHMsg &msg) |
| | | bool MsgI::Make(SharedMemory &shm, void *p) |
| | | { |
| | | void *p = Pack(shm, msg); |
| | | if (!p) { |
| | | return false; |
| | | } |
| | |
| | | #ifndef MSG_5BILLZET |
| | | #define MSG_5BILLZET |
| | | |
| | | #include "bhome_msg.pb.h" |
| | | #include "bh_util.h" |
| | | #include "proto.h" |
| | | #include "shm.h" |
| | | #include <boost/interprocess/offset_ptr.hpp> |
| | | #include <boost/uuid/uuid_generators.hpp> |
| | | #include <functional> |
| | | #include <stdint.h> |
| | | |
| | | namespace bhome_msg |
| | |
| | | int num_ = 1; |
| | | }; |
| | | |
| | | BHMsg MakeQueryTopic(const MQId &client, const std::string &topic); |
| | | BHMsg MakeQueryTopicReply(const std::string &mqid, const std::string &msgid); |
| | | BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size); |
| | | BHMsg MakeReply(const std::string &src_msgid, const void *data, const size_t size); |
| | | BHMsg MakeRegister(const MQId &src_id, ProcInfo info, const std::vector<std::string> &topics); |
| | | BHMsg MakeHeartbeat(const MQId &src_id, ProcInfo info); |
| | | BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics); |
| | | BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics); |
| | | BHMsg MakePub(const std::string &topic, const void *data, const size_t size); |
| | | |
| | | // message content layout: header_size + header + data_size + data |
| | | class MsgI |
| | | { |
| | |
| | | offset_ptr<void> ptr_; |
| | | offset_ptr<RefCount> count_; |
| | | |
| | | bool BuildSubOrUnsub(SharedMemory &shm, const std::vector<std::string> &topics, const MsgType sub_unsub); |
| | | typedef std::function<void(void *p, int len)> ToArray; |
| | | void *Pack(SharedMemory &shm, |
| | | const uint32_t head_len, const ToArray &headToArray, |
| | | const uint32_t body_len, const ToArray &bodyToArray); |
| | | |
| | | // Serializes `head` and `body` into a newly allocated shared-memory block
| | | // by delegating to the size/writer-based Pack. Returns that Pack's result.
| | | // The result of SerializeToArray is not checked.
| | | template <class Body>
| | | void *Pack(SharedMemory &shm, const BHMsgHead &head, const Body &body)
| | | {
| | |     const uint32_t head_size = uint32_t(head.ByteSizeLong());
| | |     auto write_head = [&](void *p, int len) { head.SerializeToArray(p, len); };
| | |     const uint32_t body_size = uint32_t(body.ByteSizeLong());
| | |     auto write_body = [&](void *p, int len) { body.SerializeToArray(p, len); };
| | |     return Pack(shm, head_size, write_head, body_size, write_body);
| | | }
| | | |
| | | bool MakeRC(SharedMemory &shm, void *addr); |
| | | bool Make(SharedMemory &shm, void *addr); |
| | | |
| | | public: |
| | | MsgI(void *p = 0, RefCount *c = 0) : |
| | |
| | | int Count() const { return IsCounted() ? count_->Get() : 1; } |
| | | bool IsCounted() const { return static_cast<bool>(count_); } |
| | | |
| | | bool Make(SharedMemory &shm, const BHMsg &msg); |
| | | bool MakeRC(SharedMemory &shm, const BHMsg &msg); |
| | | bool Unpack(BHMsg &msg) const; |
| | | // Packs head + body into shared memory and makes this MsgI refer to it
| | | // via the pointer-based Make.
| | | template <class Body>
| | | bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body)
| | | {
| | |     void *packed = Pack(shm, head, body);
| | |     return Make(shm, packed);
| | | }
| | | // Packs head + body into shared memory and makes this MsgI refer to it
| | | // via the pointer-based MakeRC (the ref-counted variant).
| | | template <class Body>
| | | bool MakeRC(SharedMemory &shm, const BHMsgHead &head, const Body &body)
| | | {
| | |     void *packed = Pack(shm, head, body);
| | |     return MakeRC(shm, packed);
| | | }
| | | // Makes this MsgI a ref-counted handle to the content of `a`.
| | | // If `a` is already counted, this becomes a copy of `a` and AddRef() is
| | | // called. Otherwise `a` gives up its data pointer (it is set to 0) and
| | | // the pointer is handed to the pointer-based MakeRC.
| | | bool MakeRC(SharedMemory &shm, MsgI &a)
| | | {
| | |     if (!a.IsCounted()) {
| | |         void *raw = a.ptr_.get();
| | |         a.ptr_ = 0;
| | |         return MakeRC(shm, raw);
| | |     }
| | |     *this = a;
| | |     AddRef();
| | |     return true;
| | | }
| | | bool ParseHead(BHMsgHead &head) const; |
| | | // Parses the body part of the packed message into `body`.
| | | // Layout read here: 4-byte head length, head bytes, 4-byte body length,
| | | // body bytes. The lengths are trusted; no bounds check is done.
| | | template <class Body>
| | | bool ParseBody(Body &body) const
| | | {
| | |     auto cursor = static_cast<char *>(ptr_.get());
| | |     assert(cursor);
| | |     const uint32_t head_len = Get32(cursor);
| | |     cursor += 4;        // skip the head length field
| | |     cursor += head_len; // skip the head itself
| | |     const uint32_t body_len = Get32(cursor);
| | |     cursor += 4;        // skip the body length field
| | |     return body.ParseFromArray(cursor, body_len);
| | | }
| | | }; |
| | | |
| | | inline void swap(MsgI &m1, MsgI &m2) { m1.swap(m2); } |
| New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: proto.cpp |
| | | * |
| | | * Description: |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021年04月07日 17时04分36秒 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: Li Chao (), lichao@aiotlink.com |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | #include "proto.h" |
| | | #include <boost/uuid/uuid_generators.hpp> |
| | | |
| | | // Returns the raw bytes of a freshly generated random UUID as a string.
| | | std::string RandId()
| | | {
| | |     boost::uuids::random_generator generate;
| | |     const boost::uuids::uuid id = generate();
| | |     const char *raw = reinterpret_cast<const char *>(&id);
| | |     return std::string(raw, sizeof(id));
| | | }
| | | |
| | | // Builds a message head with a newly generated random message id.
| | | BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id)
| | | {
| | |     const std::string fresh_id = RandId();
| | |     return InitMsgHead(type, proc_id, fresh_id);
| | | }
| | | |
| | | // Builds a message head with the given type, sender proc id and message
| | | // id, stamped with the current time as returned by time().
| | | BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid)
| | | {
| | |     BHMsgHead head;
| | |     head.set_msg_id(msgid);
| | |     head.set_type(type);
| | |     head.set_proc_id(proc_id);
| | |     const time_t now = time(nullptr);
| | |     head.set_timestamp(now);
| | |     return head;
| | | }
| New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: proto.h |
| | | * |
| | | * Description: |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021年04月07日 13时48分51秒 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: Li Chao (), lichao@aiotlink.com |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | #ifndef PROTO_UA9UWKL1 |
| | | #define PROTO_UA9UWKL1 |
| | | |
| | | #include "bhome_msg.pb.h" |
| | | |
| | | using namespace bhome::msg; |
| | | |
| | | /// Compile-time map from a message body class to its MsgType enumerator.
| | | /// The primary template has no `value` member, so asking for the type of a
| | | /// body class that is not mapped below fails to compile.
| | | template <class Msg>
| | | struct MsgToType {
| | | };
| | | 
| | | // Specializes MsgToType for one message class.
| | | // `value` is constexpr so it is usable in constant expressions (and, from
| | | // C++17 on, is implicitly inline, so odr-using it needs no extra definition).
| | | // The macro body has no trailing ';': the ';' written at each use site closes
| | | // the struct, which avoids a stray empty declaration.
| | | #define BHOME_MAP_MSG_AND_TYPE(mSG, tYPE) \
| | |     template <> \
| | |     struct MsgToType<mSG> { \
| | |         static constexpr bhome::msg::MsgType value = tYPE; \
| | |     }
| | | 
| | | // Maps class Msg<name> to enumerator kMsgType<name>.
| | | #define BHOME_SIMPLE_MAP_MSG(name) BHOME_MAP_MSG_AND_TYPE(Msg##name, kMsgType##name)
| | | 
| | | BHOME_SIMPLE_MAP_MSG(CommonReply);
| | | BHOME_SIMPLE_MAP_MSG(Register);
| | | BHOME_SIMPLE_MAP_MSG(RegisterRPC);
| | | BHOME_SIMPLE_MAP_MSG(Heartbeat);
| | | BHOME_SIMPLE_MAP_MSG(QueryTopic);
| | | BHOME_SIMPLE_MAP_MSG(QueryTopicReply);
| | | BHOME_SIMPLE_MAP_MSG(RequestTopic);
| | | BHOME_SIMPLE_MAP_MSG(RequestTopicReply);
| | | BHOME_SIMPLE_MAP_MSG(Publish);
| | | BHOME_SIMPLE_MAP_MSG(Subscribe);
| | | BHOME_SIMPLE_MAP_MSG(Unsubscribe);
| | | 
| | | #undef BHOME_SIMPLE_MAP_MSG
| | | #undef BHOME_MAP_MSG_AND_TYPE
| | | |
| | | /// Returns the MsgType mapped to the static type of the argument.
| | | /// Only the type is used; the argument value is not inspected.
| | | template <class Msg>
| | | constexpr bhome::msg::MsgType GetType(const Msg &)
| | | {
| | |     return MsgToType<Msg>::value;
| | | }
| | | |
| | | inline void SetError(ErrorMsg &em, const ErrorCode err_code, const std::string &err_str = "") |
| | | { |
| | | em.set_errcode(err_code); |
| | | if (!err_str.empty()) { |
| | | em.set_errstring(err_str); |
| | | } |
| | | } |
| | | |
| | | /// Creates a reply message of type `Reply` whose errmsg field is filled
| | | /// from `err_code` / `err_str` via SetError.
| | | template <class Reply = MsgCommonReply>
| | | inline Reply MakeReply(const ErrorCode err_code, const std::string &err_str = "")
| | | {
| | |     Reply reply;
| | |     auto *errmsg = reply.mutable_errmsg();
| | |     SetError(*errmsg, err_code, err_str);
| | |     return reply;
| | | }
| | | |
| | | BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid); |
| | | BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id); |
| | | // inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); } |
| | | |
| | | #endif // end of include guard: PROTO_UA9UWKL1 |
| | |
| | | using namespace std::chrono_literals; |
| | | using namespace bhome_msg; |
| | | |
| | | bool SocketPublish::Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms) |
| | | bool SocketPublish::Publish(const std::string &proc_id, const Topic &topic, const void *data, const size_t size, const int timeout_ms) |
| | | { |
| | | try { |
| | | MsgPublish pub; |
| | | pub.set_topic(topic); |
| | | pub.set_data(data, size); |
| | | BHMsgHead head(InitMsgHead(GetType(pub), proc_id)); |
| | | MsgI imsg; |
| | | if (!imsg.MakeRC(shm(), MakePub(topic, data, size))) { |
| | | return false; |
| | | if (imsg.MakeRC(shm(), head, pub)) { |
| | | DEFER1(imsg.Release(shm())); |
| | | return ShmMsgQueue::Send(shm(), BHTopicBusAddress(), imsg, timeout_ms); |
| | | } |
| | | DEFER1(imsg.Release(shm())); |
| | | return ShmMsgQueue::Send(shm(), BHTopicBusAddress(), imsg, timeout_ms); |
| | | } catch (...) { |
| | | return false; |
| | | } |
| | | return false; |
| | | } |
| | | namespace |
| | | { |
| | | inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); } |
| | | |
| | | bool SocketSubscribe::Subscribe(const std::vector<Topic> &topics, const int timeout_ms) |
| | | } // namespace |
| | | bool SocketSubscribe::Subscribe(const std::string &proc_id, const std::vector<Topic> &topics, const int timeout_ms) |
| | | { |
| | | try { |
| | | return mq().Send(BHTopicBusAddress(), MakeSub(mq().Id(), topics), timeout_ms); |
| | | MsgSubscribe sub; |
| | | for (auto &topic : topics) { |
| | | sub.add_topics(topic); |
| | | } |
| | | BHMsgHead head(InitMsgHead(GetType(sub), proc_id)); |
| | | AddRoute(head, mq().Id()); |
| | | |
| | | return Send(&BHTopicBusAddress(), head, sub, timeout_ms); |
| | | } catch (...) { |
| | | return false; |
| | | } |
| | |
| | | |
| | | bool SocketSubscribe::StartRecv(const TopicDataCB &tdcb, int nworker) |
| | | { |
| | | auto AsyncRecvProc = [this, tdcb](BHMsg &msg) { |
| | | if (msg.type() == kMsgTypePublish) { |
| | | MsgPub d; |
| | | if (d.ParseFromString(msg.body())) { |
| | | tdcb(d.topic(), d.data()); |
| | | auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) { |
| | | if (head.type() == kMsgTypePublish) { |
| | | MsgPublish pub; |
| | | if (imsg.ParseBody(pub)) { |
| | | tdcb(head.proc_id(), pub.topic(), pub.data()); |
| | | } |
| | | } else { |
| | | // ignored, or dropped |
| | |
| | | return tdcb && Start(AsyncRecvProc, nworker); |
| | | } |
| | | |
| | | bool SocketSubscribe::RecvSub(Topic &topic, std::string &data, const int timeout_ms) |
| | | bool SocketSubscribe::RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms) |
| | | { |
| | | BHMsg msg; |
| | | if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypePublish) { |
| | | MsgPub d; |
| | | if (d.ParseFromString(msg.body())) { |
| | | d.mutable_topic()->swap(topic); |
| | | d.mutable_data()->swap(data); |
| | | MsgI msg; |
| | | BHMsgHead head; |
| | | if (SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) { |
| | | MsgPublish pub; |
| | | if (msg.ParseBody(pub)) { |
| | | head.mutable_proc_id()->swap(proc_id); |
| | | pub.mutable_topic()->swap(topic); |
| | | pub.mutable_data()->swap(data); |
| | | return true; |
| | | } |
| | | } |
| | |
| | | shm_(shm) {} |
| | | SocketPublish() : |
| | | SocketPublish(BHomeShm()) {} |
| | | bool Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms); |
| | | bool Publish(const Topic &topic, const std::string &data, const int timeout_ms) |
| | | { |
| | | return Publish(topic, data.data(), data.size(), timeout_ms); |
| | | } |
| | | bool Publish(const std::string &proc_id, const Topic &topic, const void *data, const size_t size, const int timeout_ms); |
| | | }; |
| | | |
| | | // socket subscribe |
| | |
| | | SocketSubscribe(BHomeShm()) {} |
| | | ~SocketSubscribe() { Stop(); } |
| | | |
| | | typedef std::function<void(const Topic &topic, const std::string &data)> TopicDataCB; |
| | | typedef std::function<void(const std::string &proc_id, const Topic &topic, const std::string &data)> TopicDataCB; |
| | | bool StartRecv(const TopicDataCB &tdcb, int nworker = 2); |
| | | bool Stop() { return Socket::Stop(); } |
| | | bool Subscribe(const std::vector<Topic> &topics, const int timeout_ms); |
| | | bool RecvSub(Topic &topic, std::string &data, const int timeout_ms); |
| | | bool Subscribe(const std::string &proc_id, const std::vector<Topic> &topics, const int timeout_ms); |
| | | bool RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms); |
| | | }; |
| | | |
| | | #endif // end of include guard: PUBSUB_4KGRA997 |
| | |
| | | // 2) find remote queue first, then build msg; |
| | | // 1 is about 50% faster than 2, maybe cache related. |
| | | |
| | | bool ShmMsgQueue::Recv(BHMsg &msg, const int timeout_ms) |
| | | { |
| | | MsgI imsg; |
| | | if (Read(imsg, timeout_ms)) { |
| | | DEFER1(imsg.Release(shm());); |
| | | return imsg.Unpack(msg); |
| | | } else { |
| | | return false; |
| | | } |
| | | } |
| | | // bool ShmMsgQueue::Recv(MsgI &imsg, BHMsgHead &head, const int timeout_ms) |
| | | // { |
| | | // if (Read(imsg, timeout_ms)) { |
| | | // // DEFER1(imsg.Release(shm());); |
| | | // return imsg.ParseHead(head); |
| | | // } else { |
| | | // return false; |
| | | // } |
| | | // } |
| | | |
| | | } // namespace bhome_shm |
| | |
| | | ~ShmMsgQueue(); |
| | | const MQId &Id() const { return id_; } |
| | | |
| | | bool Recv(BHMsg &msg, const int timeout_ms); |
| | | // bool Recv(MsgI &msg, BHMsgHead &head, const int timeout_ms); |
| | | bool Recv(MsgI &msg, const int timeout_ms) { return Read(msg, timeout_ms); } |
| | | static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend); |
| | | static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms); |
| | |
| | | { |
| | | return Send(shm(), remote_id, msg, timeout_ms, extra...); |
| | | } |
| | | |
| | | template <class... Extra> |
| | | bool Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms, Extra const &...extra) |
| | | template <class Body, class... Extra> |
| | | bool Send(const MQId &remote_id, const BHMsgHead &head, const Body &body, const int timeout_ms, Extra const &...extra) |
| | | { |
| | | MsgI msg; |
| | | if (msg.Make(shm(), data)) { |
| | | if (msg.Make(shm(), head, body)) { |
| | | if (Send(shm(), remote_id, msg, timeout_ms, extra...)) { |
| | | return true; |
| | | } else { |
| | |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | size_t Pending() const { return data()->size(); } |
| | | }; |
| | | |
| | |
| | | |
| | | } // namespace |
| | | |
| | | ShmSocket::ShmSocket(Shm &shm, const void *id, const int len) : |
| | | shm_(shm), run_(false) |
| | | ShmSocket::ShmSocket(Shm &shm, const MQId &id, const int len) : |
| | | shm_(shm), run_(false), mq_(id, shm, len) |
| | | { |
| | | if (id && len > 0) { |
| | | mq_.reset(new Queue(*static_cast<const MQId *>(id), shm, len)); |
| | | } |
| | | } |
| | | ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) : |
| | | shm_(shm), run_(false) |
| | | { |
| | | if (len > 0) { |
| | | mq_.reset(new Queue(shm_, len)); |
| | | } |
| | | } |
| | | shm_(shm), run_(false), mq_(shm, len) {} |
| | | |
| | | ShmSocket::~ShmSocket() |
| | | { |
| | | Stop(); //TODO should stop in sub class, incase thread access sub class data. |
| | | } |
| | | |
| | | bool ShmSocket::Start(const RecvCB &onData, const IdleCB &onIdle, int nworker) |
| | | bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle) |
| | | { |
| | | if (!mq_ || !onData) { |
| | | return false; // TODO error code. |
| | | } |
| | | auto onRecv = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { |
| | | auto Find = [&](RecvCB &cb) { |
| | | std::lock_guard<std::mutex> lock(mutex()); |
| | | const std::string &msgid = head.msg_id(); |
| | | auto pos = async_cbs_.find(msgid); |
| | | if (pos != async_cbs_.end()) { |
| | | cb.swap(pos->second); |
| | | async_cbs_.erase(pos); |
| | | return true; |
| | | } else { |
| | | return false; |
| | | } |
| | | }; |
| | | |
| | | RecvCB cb; |
| | | if (Find(cb)) { |
| | | cb(socket, imsg, head); |
| | | } else if (onData) { |
| | | onData(socket, imsg, head); |
| | | } // else ignored, or dropped |
| | | }; |
| | | |
| | | std::lock_guard<std::mutex> lock(mutex_); |
| | | StopNoLock(); |
| | | auto RecvProc = [this, onData, onIdle]() { |
| | | auto RecvProc = [this, onRecv, onIdle]() { |
| | | while (run_) { |
| | | try { |
| | | MsgI imsg; |
| | | DEFER1(imsg.Release(shm_)); |
| | | if (mq_->Recv(imsg, 100)) { |
| | | BHMsg msg; |
| | | if (imsg.Unpack(msg)) { |
| | | onData(*this, imsg, msg); |
| | | if (mq().Recv(imsg, 10)) { |
| | | DEFER1(imsg.Release(shm())); |
| | | BHMsgHead head; |
| | | if (imsg.ParseHead(head)) { |
| | | onRecv(*this, imsg, head); |
| | | } |
| | | } else if (onIdle) { |
| | | onIdle(*this); |
| | |
| | | return false; |
| | | } |
| | | |
| | | bool ShmSocket::SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms) |
| | | { |
| | | return mq_->Send(*static_cast<const MQId *>(id), msg, timeout_ms); |
| | | } |
| | | |
| | | bool ShmSocket::SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms) |
| | | bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms) |
| | | { |
| | | std::lock_guard<std::mutex> lock(mutex_); |
| | | if (!mq_ || RunningNoLock()) { |
| | | auto Recv = [&]() { |
| | | if (mq().Recv(msg, timeout_ms)) { |
| | | if (msg.ParseHead(head)) { |
| | | return true; |
| | | } else { |
| | | msg.Release(shm()); |
| | | } |
| | | } |
| | | return false; |
| | | } else { |
| | | return mq_->Recv(msg, timeout_ms); |
| | | } |
| | | }; |
| | | return !RunningNoLock() && Recv(); |
| | | } |
| | |
| | | #ifndef SOCKET_GWTJHBPO |
| | | #define SOCKET_GWTJHBPO |
| | | |
| | | #include "defs.h" |
| | | #include "shm_queue.h" |
| | | #include <atomic> |
| | | #include <boost/noncopyable.hpp> |
| | | #include <condition_variable> |
| | | #include <functional> |
| | | #include <memory> |
| | | #include <mutex> |
| | | #include <thread> |
| | | #include <vector> |
| | | |
| | | using namespace bhome_msg; |
| | | |
| | | class ShmSocket : private boost::noncopyable |
| | | { |
| | |
| | | |
| | | public: |
| | | typedef bhome_shm::SharedMemory Shm; |
| | | typedef std::function<void(ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg)> RecvCB; |
| | | typedef std::function<void(bhome_msg::BHMsg &msg)> RecvBHMsgCB; |
| | | typedef std::function<void(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> RecvCB; |
| | | typedef std::function<bool(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> PartialRecvCB; |
| | | typedef std::function<void(ShmSocket &sock)> IdleCB; |
| | | |
| | | ShmSocket(Shm &shm, const void *id, const int len); |
| | | ShmSocket(Shm &shm, const MQId &id, const int len); |
| | | ShmSocket(Shm &shm, const int len = 12); |
| | | ~ShmSocket(); |
| | | |
| | | const MQId &id() const { return mq().Id(); } |
| | | Shm &shm() { return shm_; } |
| | | // start recv. |
| | | bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1); |
| | | bool Start(const RecvCB &onData, int nworker = 1) { return Start(onData, IdleCB(), nworker); } |
| | | bool Start(const RecvBHMsgCB &onData, const IdleCB &onIdle, int nworker = 1) |
| | | { |
| | | return Start([onData](ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg) { onData(msg); }, onIdle, nworker); |
| | | } |
| | | bool Start(const RecvBHMsgCB &onData, int nworker = 1) |
| | | { |
| | | return Start(onData, IdleCB(), nworker); |
| | | } |
| | | bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB()); |
| | | bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, onIdle); } |
| | | bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); } |
| | | bool Stop(); |
| | | size_t Pending() const { return mq_ ? mq_->Pending() : 0; } |
| | | size_t Pending() const { return mq().Pending(); } |
| | | |
| | | bool SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms); |
| | | bool SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms); |
| | | /// Sends an already built message to the queue whose MQId `id` points to.
| | | bool Send(const void *id, const MsgI &imsg, const int timeout_ms)
| | | {
| | |     const MQId &remote = *static_cast<const MQId *>(id);
| | |     return mq().Send(remote, imsg, timeout_ms);
| | | }
| | | //TODO reimplement, using async.
| | | bool SyncRecv(MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms); |
| | | |
| | | /// Sends `head` + `body` to the queue whose MQId `valid_remote` points to.
| | | /// When `cb` is set, a callback that stores `cb` in async_cbs_ under
| | | /// head.msg_id() is passed along to the queue's Send as extra argument.
| | | /// Any exception is swallowed and reported as false.
| | | template <class Body>
| | | bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb = RecvCB())
| | | {
| | |     assert(valid_remote);
| | |     try {
| | |         const MQId &remote = *static_cast<const MQId *>(valid_remote);
| | |         if (!cb) {
| | |             return mq().Send(remote, head, body, timeout_ms);
| | |         }
| | |         auto RegisterCB = [&]() {
| | |             std::lock_guard<std::mutex> lock(mutex());
| | |             async_cbs_.emplace(head.msg_id(), cb);
| | |         };
| | |         return mq().Send(remote, head, body, timeout_ms, RegisterCB);
| | |     } catch (...) {
| | |         return false;
| | |     }
| | | }
| | | |
| | | /// Sends `head` + `body` to `remote` and blocks until the matching reply has
| | | /// been delivered to the registered callback, or `timeout_ms` has elapsed.
| | | /// On success `reply` / `reply_head` hold the received message (swapped in by
| | | /// the callback). On send failure or timeout false is returned and the
| | | /// callback is marked canceled, so it will not touch `reply` / `reply_head`
| | | /// afterwards.
| | | /// Any exception is swallowed and reported as false.
| | | template <class Body>
| | | bool SendAndRecv(const void *remote, const BHMsgHead &head, const Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
| | | {
| | |     struct State {
| | |         std::mutex mutex;
| | |         std::condition_variable cv;
| | |         bool canceled = false; // set by the waiter when it gives up
| | |         bool received = false; // set by the callback once the reply is stored
| | |     };
| | | 
| | |     try {
| | |         // Shared with the callback, which may run after this call has returned.
| | |         auto st = std::make_shared<State>();
| | |         const auto endtime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
| | | 
| | |         auto OnRecv = [st, &reply, &reply_head](ShmSocket &, MsgI &msg, BHMsgHead &head) {
| | |             std::lock_guard<std::mutex> lk(st->mutex);
| | |             if (!st->canceled) {
| | |                 reply.swap(msg);
| | |                 reply_head.Swap(&head);
| | |                 st->received = true;
| | |                 st->cv.notify_one();
| | |             }
| | |         };
| | | 
| | |         std::unique_lock<std::mutex> lk(st->mutex);
| | |         const bool sendok = Send(remote, head, body, timeout_ms, OnRecv);
| | |         // Wait on the `received` flag rather than on the bare notification:
| | |         // a spurious wakeup must not be reported as a received reply.
| | |         if (sendok && st->cv.wait_until(lk, endtime, [&st] { return st->received; })) {
| | |             return true;
| | |         }
| | |         st->canceled = true;
| | |         return false;
| | |     } catch (...) {
| | |         return false;
| | |     }
| | | }
| | | |
| | | protected: |
| | | const Shm &shm() const { return shm_; } |
| | | Queue &mq() { return *mq_; } // programmer should make sure that mq_ is valid. |
| | | const Queue &mq() const { return *mq_; } |
| | | Queue &mq() { return mq_; } // programmer should make sure that mq_ is valid. |
| | | const Queue &mq() const { return mq_; } |
| | | std::mutex &mutex() { return mutex_; } |
| | | |
| | | private: |
| | |
| | | std::mutex mutex_; |
| | | std::atomic<bool> run_; |
| | | |
| | | std::unique_ptr<Queue> mq_; |
| | | Queue mq_; |
| | | std::unordered_map<std::string, RecvCB> async_cbs_; |
| | | }; |
| | | |
| | | #endif // end of include guard: SOCKET_GWTJHBPO |
| New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: topic_node.cpp |
| | | * |
| | | * Description: |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021年04月07日 09时01分48秒 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: Li Chao (), |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | #include "topic_node.h" |
| | | #include "bh_util.h" |
| | | #include <chrono> |
| | | #include <list> |
| | | |
| | | using namespace std::chrono; |
| | | using namespace std::chrono_literals; |
| | | |
| | | namespace
| | | {
| | | // Appends the raw bytes of `id` as a new route entry of `head`.
| | | inline void AddRoute(BHMsgHead &head, const MQId &id)
| | | {
| | |     head.add_route()->set_mq_id(&id, sizeof(id));
| | | }
| | | 
| | | // Route and message id copied from a received request head; created by
| | | // ServerRecvRequest and consumed by ServerSendReply.
| | | struct SrcInfo {
| | |     std::vector<BHAddress> route;
| | |     std::string msg_id;
| | | };
| | | 
| | | // Keeps replies that could not be sent so they can be retried later.
| | | // Entries are dropped once sent or once older than 10 seconds.
| | | class ServerFailedQ
| | | {
| | |     struct FailedMsg {
| | |         steady_clock::time_point xpr; // expiry time of this entry
| | |         std::string remote_;          // bytes of the destination queue id
| | |         BHMsgHead head_;
| | |         MsgRequestTopicReply body_;
| | |         FailedMsg(const std::string &addr, BHMsgHead &&head, MsgRequestTopicReply &&body) :
| | |             xpr(steady_clock::now() + 10s),
| | |             remote_(addr),
| | |             head_(std::move(head)),
| | |             body_(std::move(body))
| | |         {
| | |         }
| | |         bool Expired() { return steady_clock::now() > xpr; }
| | |     };
| | |     typedef std::list<FailedMsg> Queue;
| | |     Synced<Queue> queue_;
| | | 
| | | public:
| | |     // Queues a reply for a later retry.
| | |     void Push(const std::string &remote, BHMsgHead &&head, MsgRequestTopicReply &&body)
| | |     {
| | |         queue_->emplace_back(remote, std::move(head), std::move(body));
| | |     }
| | | 
| | |     // Walks the queue once: expired entries are removed, the others are
| | |     // sent through `socket` and removed when the send succeeds.
| | |     void TrySend(ShmSocket &socket, const int timeout_ms = 0)
| | |     {
| | |         queue_.Apply([&](Queue &q) {
| | |             for (auto it = q.begin(); it != q.end();) {
| | |                 // it->msg_.Release(socket.shm());
| | |                 const bool finished = it->Expired() ||
| | |                                       socket.Send(it->remote_.data(), it->head_, it->body_, timeout_ms);
| | |                 if (finished) {
| | |                     it = q.erase(it);
| | |                 } else {
| | |                     ++it;
| | |                 }
| | |             }
| | |         });
| | |     }
| | | };
| | | 
| | | } // namespace
| | | // Binds every socket to `shm` and starts the node socket with the default
| | | // arguments of ShmSocket::Start.
| | | TopicNode::TopicNode(SharedMemory &shm) :
| | |     shm_(shm),
| | |     sock_node_(shm),
| | |     sock_request_(shm),
| | |     sock_reply_(shm),
| | |     sock_sub_(shm)
| | | {
| | |     SockNode().Start();
| | | }
| | | // Stops the server and client workers first, then the node socket.
| | | TopicNode::~TopicNode()
| | | {
| | |     StopAll();
| | |     SockNode().Stop();
| | | }
| | | // Stops the RPC server side, then the client worker.
| | | void TopicNode::StopAll()
| | | {
| | |     ServerStop();
| | |     ClientStopWorker();
| | | }
| | | |
| | | // Sends `body` to the topic center through the node socket and waits for a
| | | // MsgCommonReply, which is parsed into `reply_body`.
| | | // On success `body` is remembered in info_ (the source of proc_id()).
| | | // Returns false when sending, receiving or parsing fails, or when the reply
| | | // is not of type kMsgTypeCommonReply.
| | | bool TopicNode::Register(const MsgRegister &body, MsgCommonReply &reply_body, const int timeout_ms)
| | | {
| | |     auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
| | |     AddRoute(head, SockNode().id());
| | | 
| | |     MsgI reply;
| | |     DEFER1(reply.Release(shm_););
| | |     BHMsgHead reply_head;
| | |     if (!SockNode().SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms)) {
| | |         return false;
| | |     }
| | |     if (reply_head.type() != kMsgTypeCommonReply) {
| | |         return false;
| | |     }
| | |     if (!reply.ParseBody(reply_body)) {
| | |         return false;
| | |     }
| | |     info_ = body;
| | |     return true;
| | | }
| | | |
| | | // Sends `body` to the topic center through the reply socket and waits for a
| | | // MsgCommonReply, which is parsed into `reply_body`.
| | | // Returns false when sending, receiving or parsing fails, or when the reply
| | | // is not of type kMsgTypeCommonReply.
| | | bool TopicNode::RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply_body, const int timeout_ms)
| | | {
| | |     //TODO check registered
| | | 
| | |     auto head(InitMsgHead(GetType(body), proc_id()));
| | |     AddRoute(head, SockReply().id());
| | | 
| | |     MsgI reply;
| | |     DEFER1(reply.Release(shm_););
| | |     BHMsgHead reply_head;
| | |     if (!SockReply().SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms)) {
| | |         return false;
| | |     }
| | |     if (reply_head.type() != kMsgTypeCommonReply) {
| | |         return false;
| | |     }
| | |     return reply.ParseBody(reply_body);
| | | }
| | | |
| | | // Starts `nworker` workers on the reply socket that serve topic requests.
| | | // For every kMsgTypeRequestTopic message with a usable return address, `rcb`
| | | // is called; when it returns true its output is sent back to the last route
| | | // entry of the request, with the remaining route entries carried over.
| | | // Replies that cannot be sent are queued and retried when the socket is idle
| | | // and after each handled message.
| | | // Returns false when `rcb` is empty or the socket cannot be started.
| | | bool TopicNode::ServerStart(const OnRequest &rcb, int nworker)
| | | {
| | |     //TODO check registered
| | | 
| | |     if (!rcb) {
| | |         return false;
| | |     }
| | | 
| | |     auto failed_q = std::make_shared<ServerFailedQ>();
| | | 
| | |     auto onIdle = [failed_q](ShmSocket &socket) { failed_q->TrySend(socket); };
| | | 
| | |     auto onRecv = [this, rcb, failed_q, onIdle](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
| | |         const int nroute = head.route_size();
| | |         // ShmSocket::Send reinterprets the address bytes as an MQId, so the
| | |         // return address must hold exactly sizeof(MQId) bytes; anything else
| | |         // would be read out of bounds.
| | |         const bool replyable = head.type() == kMsgTypeRequestTopic && nroute > 0 &&
| | |                                head.route(nroute - 1).mq_id().size() == sizeof(MQId);
| | |         MsgRequestTopic req;
| | |         if (replyable && imsg.ParseBody(req)) {
| | |             std::string out;
| | |             if (rcb(req.topic(), req.data(), out)) {
| | |                 MsgRequestTopicReply reply_body;
| | |                 reply_body.set_data(std::move(out));
| | |                 BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), head.msg_id()));
| | | 
| | |                 // Every route entry except the last (the reply target) is
| | |                 // moved into the reply head.
| | |                 for (int i = 0; i < nroute - 1; ++i) {
| | |                     reply_head.add_route()->Swap(head.mutable_route(i));
| | |                 }
| | |                 const std::string &remote = head.route(nroute - 1).mq_id();
| | |                 if (!sock.Send(remote.data(), reply_head, reply_body, 10)) {
| | |                     failed_q->Push(remote, std::move(reply_head), std::move(reply_body));
| | |                 }
| | |             }
| | |         } // else ignored, or dropped
| | | 
| | |         onIdle(sock);
| | |     };
| | | 
| | |     return SockReply().Start(onRecv, onIdle, nworker);
| | | }
| | | // Stops the reply socket; returns the result of its Stop().
| | | bool TopicNode::ServerStop()
| | | {
| | |     return SockReply().Stop();
| | | }
| | | |
| | | // Synchronously receives one topic request from the reply socket.
| | | // On success `topic` and `data` receive the request content and `src_info`
| | | // receives a heap-allocated SrcInfo (route + message id of the request) that
| | | // must be handed to ServerSendReply, which deletes it.
| | | // Returns false when nothing is received in time, the message is not a
| | | // kMsgTypeRequestTopic, or its body cannot be parsed.
| | | bool TopicNode::ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms)
| | | {
| | |     MsgI imsg;
| | |     BHMsgHead head;
| | |     if (!SockReply().SyncRecv(imsg, head, timeout_ms)) {
| | |         return false;
| | |     }
| | |     // SyncRecv does not release the message when it succeeds, so it has to be
| | |     // released here on every path, otherwise its shared memory is leaked.
| | |     DEFER1(imsg.Release(shm_););
| | | 
| | |     if (head.type() != kMsgTypeRequestTopic) {
| | |         return false;
| | |     }
| | |     MsgRequestTopic request;
| | |     if (!imsg.ParseBody(request)) {
| | |         return false;
| | |     }
| | | 
| | |     // Held by unique_ptr until fully built so an exception cannot leak it.
| | |     std::unique_ptr<SrcInfo> info(new SrcInfo);
| | |     info->route.assign(head.route().begin(), head.route().end());
| | |     info->msg_id = head.msg_id();
| | | 
| | |     request.mutable_topic()->swap(topic);
| | |     request.mutable_data()->swap(data);
| | |     src_info = info.release();
| | |     return true;
| | | }
| | | |
| | | // Sends `data` as the reply to a request obtained from ServerRecvRequest.
| | | // `src_info` is the SrcInfo returned by ServerRecvRequest; it is always
| | | // deleted here, whether or not the reply is sent.
| | | // The reply goes to the last route entry of the request; the other entries
| | | // are moved into the reply head.
| | | // Returns false when `src_info` is null, has no route, carries a malformed
| | | // return address, or the send fails.
| | | bool TopicNode::ServerSendReply(void *src_info, const std::string &data, const int timeout_ms)
| | | {
| | |     std::unique_ptr<SrcInfo> info(static_cast<SrcInfo *>(src_info));
| | |     if (!info || info->route.empty()) {
| | |         return false;
| | |     }
| | |     // ShmSocket::Send reinterprets the address bytes as an MQId, so they must
| | |     // be exactly sizeof(MQId) long; anything else would be read out of bounds.
| | |     const std::string &remote = info->route.back().mq_id();
| | |     if (remote.size() != sizeof(MQId)) {
| | |         return false;
| | |     }
| | | 
| | |     MsgRequestTopicReply body;
| | |     body.set_data(data);
| | |     BHMsgHead head(InitMsgHead(GetType(body), proc_id(), info->msg_id));
| | | 
| | |     const size_t last = info->route.size() - 1; // route is non-empty here
| | |     for (size_t i = 0; i < last; ++i) {
| | |         head.add_route()->Swap(&info->route[i]);
| | |     }
| | | 
| | |     return SockReply().Send(remote.data(), head, body, timeout_ms);
| | | }
| | | |
| | | // Starts `nworker` workers on the request socket. Each received
| | | // kMsgTypeRequestTopicReply whose body parses is passed to `cb`.
| | | // Returns false when `cb` is empty or the socket cannot be started.
| | | bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker)
| | | {
| | |     if (!cb) {
| | |         return false;
| | |     }
| | | 
| | |     auto onData = [this, cb](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
| | |         if (head.type() != kMsgTypeRequestTopicReply) {
| | |             return;
| | |         }
| | |         MsgRequestTopicReply reply;
| | |         if (imsg.ParseBody(reply)) {
| | |             cb(reply.data());
| | |         }
| | |     };
| | | 
| | |     return SockRequest().Start(onData, nworker);
| | | }
| | | // Stops the request socket; returns the result of its Stop().
| | | bool TopicNode::ClientStopWorker()
| | | {
| | |     return SockRequest().Stop();
| | | }
| | | |
| | | // Looks up the server of `topic` and sends it a request without waiting for
| | | // the reply. When `cb` is set it is registered for this request and called
| | | // with the reply data once a kMsgTypeRequestTopicReply with a parsable body
| | | // arrives.
| | | // Returns false when the topic cannot be resolved, the send fails, or an
| | | // exception is thrown.
| | | bool TopicNode::ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
| | | {
| | |     try {
| | |         BHAddress addr;
| | |         if (!ClientQueryRPCTopic(topic, addr, timeout_ms)) {
| | |             return false;
| | |         }
| | |         const void *remote = addr.mq_id().data();
| | | 
| | |         auto &sock = SockRequest();
| | |         MsgRequestTopic req;
| | |         req.set_topic(topic);
| | |         req.set_data(data, size);
| | |         BHMsgHead head(InitMsgHead(GetType(req), proc_id()));
| | |         AddRoute(head, sock.id());
| | | 
| | |         if (!cb) {
| | |             return sock.Send(remote, head, req, timeout_ms);
| | |         }
| | | 
| | |         auto onRecv = [cb](ShmSocket &, MsgI &imsg, BHMsgHead &reply_head) {
| | |             if (reply_head.type() != kMsgTypeRequestTopicReply) {
| | |                 return;
| | |             }
| | |             MsgRequestTopicReply reply;
| | |             if (imsg.ParseBody(reply)) {
| | |                 cb(reply.data());
| | |             }
| | |         };
| | |         return sock.Send(remote, head, req, timeout_ms, onRecv);
| | |     } catch (...) {
| | |         return false;
| | |     }
| | | }
| | | |
| | | // Looks up the server of `topic`, sends it a request and waits for the
| | | // reply, whose data is swapped into `out`.
| | | // Returns false (after printing a diagnostic) when the topic cannot be
| | | // resolved, no kMsgTypeRequestTopicReply arrives in time, the reply body
| | | // cannot be parsed, or an exception is thrown.
| | | bool TopicNode::ClientSyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms)
| | | {
| | |     try {
| | |         auto &sock = SockRequest();
| | |         BHAddress addr;
| | |         if (!ClientQueryRPCTopic(topic, addr, timeout_ms)) {
| | |             printf("error recv data. line: %d\n", __LINE__);
| | |             return false;
| | |         }
| | | 
| | |         MsgRequestTopic req;
| | |         req.set_topic(topic);
| | |         req.set_data(data, size);
| | |         BHMsgHead head(InitMsgHead(GetType(req), proc_id()));
| | |         AddRoute(head, sock.id());
| | | 
| | |         MsgI reply;
| | |         DEFER1(reply.Release(shm_););
| | |         BHMsgHead reply_head;
| | | 
| | |         const bool got_reply = sock.SendAndRecv(addr.mq_id().data(), head, req, reply, reply_head, timeout_ms) &&
| | |                                reply_head.type() == kMsgTypeRequestTopicReply;
| | |         if (!got_reply) {
| | |             printf("error recv data. line: %d\n", __LINE__);
| | |             return false;
| | |         }
| | | 
| | |         MsgRequestTopicReply dr;
| | |         if (!reply.ParseBody(dr)) {
| | |             printf("error parse reply.\n");
| | |             return false;
| | |         }
| | |         dr.mutable_data()->swap(out);
| | |         return true;
| | |     } catch (...) {
| | |         printf("error recv data. line: %d\n", __LINE__);
| | |     }
| | |     return false;
| | | }
| | | |
| | | // Resolves the address of the server registered for `topic`.
| | | // The local cache is consulted first; on a miss the topic center is queried
| | | // through the request socket and a valid answer is cached.
| | | // Returns false when the query fails, the reply is not a parsable
| | | // kMsgTypeQueryTopicReply, or the returned address is not a valid queue id.
| | | // Note: `addr` is overwritten with the reply's address even when that
| | | // address is then rejected.
| | | bool TopicNode::ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
| | | {
| | |     auto &sock = SockRequest();
| | |     if (topic_query_cache_.Find(topic, addr)) {
| | |         return true;
| | |     }
| | | 
| | |     MsgQueryTopic query;
| | |     query.set_topic(topic);
| | |     BHMsgHead head(InitMsgHead(GetType(query), proc_id()));
| | |     AddRoute(head, sock.id());
| | | 
| | |     MsgI reply;
| | |     DEFER1(reply.Release(shm_));
| | |     BHMsgHead reply_head;
| | | 
| | |     if (!sock.SendAndRecv(&BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms)) {
| | |         return false;
| | |     }
| | |     if (reply_head.type() != kMsgTypeQueryTopicReply) {
| | |         return false;
| | |     }
| | |     MsgQueryTopicReply rep;
| | |     if (!reply.ParseBody(rep)) {
| | |         return false;
| | |     }
| | | 
| | |     addr = rep.address();
| | |     // Callers pass mq_id().data() to ShmSocket::Send, which reinterprets it as
| | |     // an MQId. Accept (and cache) only addresses of exactly that size; this
| | |     // also rejects the empty address.
| | |     if (addr.mq_id().size() != sizeof(MQId)) {
| | |         return false;
| | |     }
| | |     topic_query_cache_.Update(topic, addr);
| | |     return true;
| | | }
| New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: topic_node.h |
| | | * |
| | | * Description: |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021年04月07日 09时05分26秒 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: Li Chao (), lichao@aiotlink.com |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | #ifndef TOPIC_NODE_YVKWA6TF |
| | | #define TOPIC_NODE_YVKWA6TF |
| | | |
| | | #include "msg.h" |
| | | #include "pubsub.h" |
| | | #include "socket.h" |
| | | #include <memory> |
| | | |
| | | using namespace bhome_shm; |
| | | using namespace bhome_msg; |
| | | |
| | | // a node is a client. |
| | | class TopicNode |
| | | { |
| | | SharedMemory &shm_; |
| | | MsgRegister info_; |
| | | |
| | | public: |
| | | TopicNode(SharedMemory &shm); |
| | | ~TopicNode(); |
| | | bool Register(const MsgRegister &body, MsgCommonReply &reply, const int timeout_ms); |
| | | bool RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply, const int timeout_ms); |
| | | |
| | | // topic rpc server |
| | | typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest; |
| | | bool ServerStart(OnRequest const &cb, const int nworker = 2); |
| | | bool ServerStop(); |
| | | bool ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms); |
| | | bool ServerSendReply(void *src_info, const std::string &data, const int timeout_ms); |
| | | |
| | | // topic client |
| | | typedef std::function<void(const std::string &data)> RequestResultCB; |
| | | bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2); |
| | | bool ClientStopWorker(); |
| | | bool ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB()); |
| | | bool ClientAsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB()) |
| | | { |
| | | return ClientAsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb); |
| | | } |
| | | bool ClientSyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms); |
| | | bool ClientSyncRequest(const Topic &topic, const std::string &data, std::string &out, const int timeout_ms) |
| | | { |
| | | return ClientSyncRequest(topic, data.data(), data.size(), out, timeout_ms); |
| | | } |
| | | |
| | | void StopAll(); |
| | | |
| | | private: |
| | | bool ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms); |
| | | const std::string &proc_id() { return info_.proc().proc_id(); } |
| | | |
| | | typedef bhome_msg::BHAddress Address; |
| | | // Cache that maps a Topic to the Address most recently stored for it via Update(). |
| | | // Every call is routed through the Synced<Impl> member (presumably a locking |
| | | // holder -- TODO confirm against Synced's definition). |
| | | class TopicQueryCache |
| | | { |
| | | // Map-backed storage; only reached through impl_ below. |
| | | class Impl |
| | | { |
| | | using Entries = std::unordered_map<Topic, Address>; |
| | | Entries entries_; |
| | | |
| | | public: |
| | | // Copies the cached address for `topic` into `addr` and returns true. |
| | | // Returns false and leaves `addr` untouched when the topic is not cached. |
| | | bool Find(const Topic &topic, Address &addr) |
| | | { |
| | | const auto it = entries_.find(topic); |
| | | if (it == entries_.end()) { |
| | | return false; |
| | | } |
| | | addr = it->second; |
| | | return true; |
| | | } |
| | | // Inserts or overwrites the address stored for `topic`; always returns true. |
| | | bool Update(const Topic &topic, const Address &addr) |
| | | { |
| | | entries_[topic] = addr; |
| | | return true; |
| | | } |
| | | }; |
| | | Synced<Impl> impl_; |
| | | // Disabled alternative kept for reference: one Impl per thread. |
| | | // Impl &impl() |
| | | // { |
| | | // thread_local Impl impl; |
| | | // return impl; |
| | | // } |
| | | |
| | | public: |
| | | // Forwards to Impl::Find through impl_. |
| | | bool Find(const Topic &topic, Address &addr) |
| | | { |
| | | return impl_->Find(topic, addr); |
| | | } |
| | | // Forwards to Impl::Update through impl_. |
| | | bool Update(const Topic &topic, const Address &addr) |
| | | { |
| | | return impl_->Update(topic, addr); |
| | | } |
| | | }; |
| | | |
| | | // Some of these sockets may turn out to be the same object; going through |
| | | // accessor functions makes that easy to change. |
| | | |
| | | // Returns a reference to sock_node_. |
| | | auto &SockNode() |
| | | { |
| | | return sock_node_; |
| | | } |
| | | // Returns a reference to sock_sub_. |
| | | auto &SockSub() |
| | | { |
| | | return sock_sub_; |
| | | } |
| | | // Returns a reference to sock_request_. |
| | | auto &SockRequest() |
| | | { |
| | | return sock_request_; |
| | | } |
| | | // Returns a reference to sock_reply_. |
| | | auto &SockReply() |
| | | { |
| | | return sock_reply_; |
| | | } |
| | | |
| | | ShmSocket sock_node_; |
| | | ShmSocket sock_request_; |
| | | ShmSocket sock_reply_; |
| | | SocketSubscribe sock_sub_; |
| | | |
| | | TopicQueryCache topic_query_cache_; |
| | | }; |
| | | |
| | | #endif // end of include guard: TOPIC_NODE_YVKWA6TF |
| | |
| | | BOOST_CHECK(!p); |
| | | BOOST_CHECK(p.get() == 0); |
| | | const char *str = "basic"; |
| | | p = str; |
| | | p = str; |
| | | BOOST_CHECK(p); |
| | | BOOST_CHECK(p.get() == str); |
| | | p = 0; |
| | |
| | | auto Code = [&](int id) { |
| | | typedef ShmObject<s1000> Int; |
| | | std::string name = std::to_string(id); |
| | | auto a0 = Avail(); |
| | | auto a0 = Avail(); |
| | | Int i1(shm, name); |
| | | auto a1 = Avail(); |
| | | BOOST_CHECK_LT(a1, a0); |
| | |
| | | |
| | | { |
| | | auto old = Avail(); |
| | | void *p = shm.Alloc(1024); |
| | | void *p = shm.Alloc(1024); |
| | | shm.Dealloc(p); |
| | | BOOST_CHECK_EQUAL(old, Avail()); |
| | | } |
| | |
| | | // boost::timer::auto_cpu_timer timer; |
| | | ThreadManager threads; |
| | | int nthread = 1; |
| | | int nloop = 1; |
| | | int nloop = 1; |
| | | for (int i = 0; i < nthread; ++i) { |
| | | threads.Launch(BasicTest, i, nloop); |
| | | } |
| | |
| | | int ms = i * 100; |
| | | printf("Timeout Test %4d: ", ms); |
| | | boost::timer::auto_cpu_timer timer; |
| | | BHMsg msg; |
| | | MsgI msg; |
| | | bool r = q.Recv(msg, ms); |
| | | BOOST_CHECK(!r); |
| | | } |
| | |
| | | MQId id = boost::uuids::random_generator()(); |
| | | const int timeout = 100; |
| | | const uint32_t data_size = 4000; |
| | | const std::string proc_id = "demo_proc"; |
| | | |
| | | auto Writer = [&](int writer_id, uint64_t n) { |
| | | SharedMemory shm(shm_name, mem_size); |
| | | ShmMsgQueue mq(shm, 64); |
| | | std::string str(data_size, 'a'); |
| | | MsgI msg; |
| | | MsgRequestTopic body; |
| | | body.set_topic("topic"); |
| | | body.set_data(str); |
| | | auto head(InitMsgHead(GetType(body), proc_id)); |
| | | msg.MakeRC(shm, head, body); |
| | | DEFER1(msg.Release(shm);); |
| | | msg.MakeRC(shm, MakeRequest(mq.Id(), "topic", str.data(), str.size())); |
| | | |
| | | for (uint64_t i = 0; i < n; ++i) { |
| | | // mq.Send(id, str.data(), str.size(), timeout); |
| | | mq.Send(id, msg, timeout); |
| | |
| | | SharedMemory shm(shm_name, mem_size); |
| | | ShmMsgQueue mq(id, shm, 1000); |
| | | while (*run) { |
| | | BHMsg msg; |
| | | MsgI msg; |
| | | BHMsgHead head; |
| | | if (mq.Recv(msg, timeout)) { |
| | | DEFER1(msg.Release(shm)); |
| | | // ok |
| | | } else if (isfork) { |
| | | exit(0); // for forked quit after 1s. |
| | |
| | | const size_t msg_length = 1000; |
| | | std::string msg_content(msg_length, 'a'); |
| | | msg_content[20] = '\0'; |
| | | const std::string client_proc_id = "client_proc"; |
| | | const std::string server_proc_id = "server_proc"; |
| | | |
| | | SharedMemory shm(shm_name, 1024 * 1024 * 50); |
| | | auto Avail = [&]() { return shm.get_free_memory(); }; |
| | |
| | | ShmMsgQueue cli(shm, qlen); |
| | | |
| | | MsgI request_rc; |
| | | request_rc.MakeRC(shm, MakeRequest(cli.Id(), "topic", msg_content.data(), msg_content.size())); |
| | | MsgRequestTopic req_body; |
| | | req_body.set_topic("topic"); |
| | | req_body.set_data(msg_content); |
| | | auto req_head(InitMsgHead(GetType(req_body), client_proc_id)); |
| | | request_rc.MakeRC(shm, req_head, req_body); |
| | | |
| | | MsgRequestTopic reply_body; |
| | | reply_body.set_topic("topic"); |
| | | reply_body.set_data(msg_content); |
| | | auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id)); |
| | | MsgI reply_rc; |
| | | reply_rc.MakeRC(shm, MakeReply("fakemsgid", msg_content.data(), msg_content.size())); |
| | | reply_rc.MakeRC(shm, reply_head, reply_body); |
| | | |
| | | std::atomic<uint64_t> count(0); |
| | | |
| | |
| | | auto Client = [&](int cli_id, int nmsg) { |
| | | for (int i = 0; i < nmsg; ++i) { |
| | | auto Req = [&]() { |
| | | return cli.Send(srv.Id(), MakeRequest(cli.Id(), "topic", msg_content.data(), msg_content.size()), 100); |
| | | MsgRequestTopic req_body; |
| | | req_body.set_topic("topic"); |
| | | req_body.set_data(msg_content); |
| | | auto req_head(InitMsgHead(GetType(req_body), client_proc_id)); |
| | | return cli.Send(srv.Id(), req_head, req_body, 100); |
| | | }; |
| | | auto ReqRC = [&]() { return cli.Send(srv.Id(), request_rc, 1000); }; |
| | | |
| | |
| | | printf("********** client send error.\n"); |
| | | continue; |
| | | } |
| | | BHMsg msg; |
| | | MsgI msg; |
| | | BHMsgHead head; |
| | | if (!cli.Recv(msg, 1000)) { |
| | | printf("********** client recv error.\n"); |
| | | } else { |
| | | DEFER1(msg.Release(shm)); |
| | | ++count; |
| | | auto cur = Now(); |
| | | if (last_time.exchange(cur) < cur) { |
| | |
| | | |
| | | std::atomic<bool> stop(false); |
| | | auto Server = [&]() { |
| | | BHMsg req; |
| | | while (!stop) { |
| | | if (srv.Recv(req, 100) && req.type() == kMsgTypeRequestTopic) { |
| | | auto &mqid = req.route()[0].mq_id(); |
| | | MQId src_id; |
| | | memcpy(&src_id, mqid.data(), sizeof(src_id)); |
| | | auto Reply = [&]() { |
| | | return srv.Send(src_id, MakeReply(req.msg_id(), msg_content.data(), msg_content.size()), 100); |
| | | }; |
| | | auto ReplyRC = [&]() { return srv.Send(src_id, reply_rc, 100); }; |
| | | MsgI req; |
| | | BHMsgHead req_head; |
| | | |
| | | if (ReplyRC()) { |
| | | while (!stop) { |
| | | if (srv.Recv(req, 100)) { |
| | | DEFER1(req.Release(shm)); |
| | | if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) { |
| | | auto &mqid = req_head.route()[0].mq_id(); |
| | | MQId src_id; |
| | | memcpy(&src_id, mqid.data(), sizeof(src_id)); |
| | | auto Reply = [&]() { |
| | | MsgRequestTopic reply_body; |
| | | reply_body.set_topic("topic"); |
| | | reply_body.set_data(msg_content); |
| | | auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, req_head.msg_id())); |
| | | return srv.Send(src_id, reply_head, reply_body, 100); |
| | | }; |
| | | auto ReplyRC = [&]() { return srv.Send(src_id, reply_rc, 100); }; |
| | | |
| | | if (ReplyRC()) { |
| | | } |
| | | } |
| | | } |
| | | } |
| | |
| | | #include "center.h" |
| | | #include "defs.h" |
| | | #include "pubsub.h" |
| | | #include "pubsub_center.h" |
| | | #include "reqrep_center.h" |
| | | #include "socket.h" |
| | | #include "topic_reply.h" |
| | | #include "topic_request.h" |
| | | #include "topic_node.h" |
| | | #include "util.h" |
| | | #include <atomic> |
| | | #include <boost/uuid/uuid_generators.hpp> |
| | |
| | | #include <string> |
| | | #include <thread> |
| | | #include <vector> |
| | | using namespace bhome_msg; |
| | | |
| | | template <class A, class B> |
| | | struct IsSameType { |
| | |
| | | int *flag = shm.find_or_construct<int>("flag")(123); |
| | | printf("flag = %d\n", *flag); |
| | | ++*flag; |
| | | const std::string sub_proc_id = "subscriber"; |
| | | const std::string pub_proc_id = "publisher"; |
| | | |
| | | PubSubCenter bus(shm); |
| | | bus.Start(); |
| | | BHCenter center(shm); |
| | | center.Start(); |
| | | |
| | | std::this_thread::sleep_for(100ms); |
| | | |
| | |
| | | const int timeout = 1000; |
| | | auto Sub = [&](int id, const std::vector<std::string> &topics) { |
| | | SocketSubscribe client(shm); |
| | | bool r = client.Subscribe(topics, timeout); |
| | | bool r = client.Subscribe(sub_proc_id, topics, timeout); |
| | | std::mutex mutex; |
| | | std::condition_variable cv; |
| | | |
| | | std::atomic<uint64_t> n(0); |
| | | auto OnTopicData = [&](const std::string &topic, const std::string &data) { |
| | | auto OnTopicData = [&](const std::string &proc_id, const std::string &topic, const std::string &data) { |
| | | ++total_count; |
| | | |
| | | auto cur = Now(); |
| | |
| | | for (unsigned i = 0; i < nmsg; ++i) { |
| | | std::string data = topic + std::to_string(i) + std::string(1000, '-'); |
| | | |
| | | bool r = provider.Publish(topic, data, timeout); |
| | | bool r = provider.Publish(pub_proc_id, topic, data.data(), data.size(), timeout); |
| | | if (!r) { |
| | | printf("pub ret: %s\n", r ? "ok" : "fail"); |
| | | } |
| | |
| | | std::cout << "end : " << Now(); |
| | | printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n", |
| | | total_count.load(), total_count - last_count.exchange(total_count), init_avail - Avail()); |
| | | |
| | | bus.Stop(); |
| | | } |
| | | |
| | | namespace |
| | | { |
| | | struct C { |
| | |
| | | printf("flag = %d\n", *flag); |
| | | ++*flag; |
| | | |
| | | const std::string client_proc_id = "client_proc_"; |
| | | const std::string server_proc_id = "server_proc_"; |
| | | |
| | | BHCenter center(shm); |
| | | center.Start(); |
| | | std::atomic<bool> run(true); |
| | | |
| | | auto Client = [&](const std::string &topic, const int nreq) { |
| | | SocketRequest client(shm); |
| | | TopicNode client(shm); |
| | | MsgRegister reg; |
| | | reg.mutable_proc()->set_proc_id(client_proc_id + topic); |
| | | MsgCommonReply reply_body; |
| | | |
| | | if (!client.Register(reg, reply_body, 1000)) { |
| | | printf("client register failed\n"); |
| | | return; |
| | | } |
| | | |
| | | std::atomic<int> count(0); |
| | | std::string reply; |
| | | auto onRecv = [&](const std::string &rep) { |
| | |
| | | printf("count: %d\n", count.load()); |
| | | } |
| | | }; |
| | | client.StartWorker(onRecv, 2); |
| | | client.ClientStartWorker(onRecv, 2); |
| | | boost::timer::auto_cpu_timer timer; |
| | | for (int i = 0; i < nreq; ++i) { |
| | | if (!client.AsyncRequest(topic, "data " + std::to_string(i), 1000)) { |
| | | if (!client.ClientAsyncRequest(topic, "data " + std::to_string(i), 1000)) { |
| | | printf("client request failed\n"); |
| | | ++count; |
| | | } |
| | | |
| | | // if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) { |
| | | // printf("client request failed\n"); |
| | | // } else { |
| | | // ++count; |
| | | // } |
| | | // ++count; |
| | | } |
| | | do { |
| | | std::this_thread::yield(); |
| | | } while (count.load() < nreq); |
| | | client.Stop(); |
| | | client.ClientStopWorker(); |
| | | printf("request %s %d done ", topic.c_str(), count.load()); |
| | | }; |
| | | std::atomic_uint64_t server_msg_count(0); |
| | | auto Server = [&](const std::string &name, const std::vector<std::string> &topics) { |
| | | SocketReply server(shm); |
| | | ProcInfo info; |
| | | info.set_id(name); |
| | | info.set_name(name); |
| | | if (!server.Register(info, topics, 100)) { |
| | | printf("register failed\n"); |
| | | TopicNode server(shm); |
| | | MsgRegister reg; |
| | | reg.mutable_proc()->set_proc_id(server_proc_id); |
| | | reg.mutable_proc()->set_name(name); |
| | | MsgCommonReply reply_body; |
| | | |
| | | if (!server.Register(reg, reply_body, 100)) { |
| | | printf("server register failed\n"); |
| | | return; |
| | | } |
| | | |
| | | auto onData = [&](const std::string &topic, const std::string &data, std::string &reply) { |
| | | ++server_msg_count; |
| | | reply = topic + ':' + data; |
| | | return true; |
| | | }; |
| | | server.StartWorker(onData); |
| | | server.ServerStart(onData); |
| | | |
| | | MsgRegisterRPC rpc; |
| | | for (auto &topic : topics) { |
| | | rpc.add_topics(topic); |
| | | } |
| | | if (!server.RegisterRPC(rpc, reply_body, 100)) { |
| | | printf("server register topic failed\n"); |
| | | return; |
| | | } |
| | | |
| | | while (run) { |
| | | std::this_thread::yield(); |
| | | } |
| | |
| | | servers.Launch(Server, "server", topics); |
| | | std::this_thread::sleep_for(100ms); |
| | | for (auto &t : topics) { |
| | | clients.Launch(Client, t, 1000 * 100); |
| | | clients.Launch(Client, t, 1000 * 1); |
| | | } |
| | | clients.WaitAll(); |
| | | printf("clients done, server replyed: %d\n", server_msg_count.load()); |