Refactor center; add async request without callback.
| | |
| | | */ |
| | | #include "center.h" |
| | | #include "defs.h" |
| | | #include "pubsub_center.h" |
| | | #include "reqrep_center.h" |
| | | #include "shm.h" |
| | | |
| | | using namespace bhome_shm; |
| | |
| | | static SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 64); |
| | | return shm; |
| | | } |
| | | |
| | | // Constructs the center, building its socket_ member from the given shared-memory object. |
| | | BHCenter::BHCenter(Socket::Shm &shm) : |
| | | socket_(shm) {} |
| | | |
| | | // Default constructor: delegates to the Shm-taking constructor, passing BHomeShm(). |
| | | BHCenter::BHCenter() : |
| | | BHCenter(BHomeShm()) {} |
| | | |
| | | // Currently a stub: performs no work and always returns false. |
| | | // NOTE(review): socket_ is never started here - confirm this is intentional. |
| | | bool BHCenter::Start() |
| | | { |
| | | return false; |
| | | } |
| | |
| | | #ifndef CENTER_TM9OUQTG |
| | | #define CENTER_TM9OUQTG |
| | | |
| | | #include "socket.h" |
| | | #include <functional> |
| | | |
| | | // Owns a ShmSocket and declares the handler signature used by the center message handlers. |
| | | class BHCenter |
| | | { |
| | | using Socket = ShmSocket; |
| | | |
| | | public: |
| | | // Callback taking the socket, a MsgI and a BHMsg, and returning bool. |
| | | using MsgHandler = std::function<bool(ShmSocket &socket, bhome_msg::MsgI &imsg, bhome::msg::BHMsg &msg)>; |
| | | |
| | | BHCenter(Socket::Shm &shm); |
| | | BHCenter(); |
| | | // Stops the socket when the center is destroyed. |
| | | ~BHCenter() { Stop(); } |
| | | bool Start(); |
| | | // Forwards to socket_.Stop() and returns its result. |
| | | bool Stop() { return socket_.Stop(); } |
| | | |
| | | private: |
| | | ShmSocket socket_; |
| | | }; |
| | | |
| | | #endif // end of include guard: CENTER_TM9OUQTG |
| | |
| | | |
| | | } // namespace |
| | | |
| | | bool PubSubCenter::Start(const int nworker) |
| | | BHCenter::MsgHandler MakeBusCenter() |
| | | { |
| | | auto bus_ptr = std::make_shared<Synced<BusCenter>>(); |
| | | |
| | | auto onRecv = [bus_ptr, this](MsgI &imsg) { |
| | | return [bus_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) { |
| | | #ifndef NDEBUG |
| | | static std::atomic<time_t> last(0); |
| | | time_t now = 0; |
| | | time(&now); |
| | | if (last.exchange(now) < now) { |
| | | printf("bus queue size: %ld\n", socket_.Pending()); |
| | | printf("bus queue size: %ld\n", socket.Pending()); |
| | | } |
| | | #endif |
| | | auto &bus = *bus_ptr; |
| | | |
| | | BHMsg msg; |
| | | if (!imsg.Unpack(msg)) { |
| | | return; |
| | | } |
| | | auto &shm = socket.shm(); |
| | | |
| | | auto OnSubChange = [&](auto &&update) { |
| | | DataSub sub; |
| | |
| | | update(client, sub.topics()); |
| | | } |
| | | }; |
| | | |
| | | auto Sub = [&](const MQId &id, auto &topics) { bus->SubScribe(id, topics.begin(), topics.end()); }; |
| | | auto Unsub = [&](const MQId &id, auto &topics) { bus->UnsubScribe(id, topics.begin(), topics.end()); }; |
| | | |
| | |
| | | }; |
| | | |
| | | if (imsg.IsCounted()) { |
| | | Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, imsg, 10); }); |
| | | Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm, cli, imsg, 10); }); |
| | | } else { |
| | | MsgI pubmsg; |
| | | if (!pubmsg.MakeRC(shm(), msg)) { return; } |
| | | DEFER1(pubmsg.Release(shm())); |
| | | if (!pubmsg.MakeRC(shm, msg)) { return; } |
| | | DEFER1(pubmsg.Release(shm)); |
| | | |
| | | Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, pubmsg, 10); }); |
| | | Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm, cli, pubmsg, 10); }); |
| | | } |
| | | }; |
| | | |
| | | switch (msg.type()) { |
| | | case kMsgTypeSubscribe: OnSubChange(Sub); break; |
| | | case kMsgTypeUnsubscribe: OnSubChange(Unsub); break; |
| | | case kMsgTypePublish: OnPublish(); break; |
| | | default: break; |
| | | case kMsgTypeSubscribe: OnSubChange(Sub); return true; |
| | | case kMsgTypeUnsubscribe: OnSubChange(Unsub); return true; |
| | | case kMsgTypePublish: OnPublish(); return true; |
| | | default: return false; |
| | | } |
| | | }; |
| | | } |
| | | |
| | | // Builds the pub/sub message handler and starts the socket's receive workers. |
| | | // nworker <= 0 selects the default of 2; the worker count is capped at kMaxWorker. |
| | | // Returns the result of socket_.Start(). |
| | | bool PubSubCenter::Start(const int nworker) |
| | | { |
| | | auto handler = MakeBusCenter(); |
| | | // sizeof yields size_t, so it is printed with %zu rather than %ld. |
| | | printf("sizeof(pub/sub handler) = %zu\n", sizeof(handler)); |
| | | |
| | | const int kMaxWorker = 16; |
| | | return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker)); |
| | | } |
| | |
| | | #ifndef PUBSUB_CENTER_MFSUZJU7 |
| | | #define PUBSUB_CENTER_MFSUZJU7 |
| | | |
| | | #include "center.h" |
| | | #include "defs.h" |
| | | #include "socket.h" |
| | | #include <mutex> |
| | | #include <set> |
| | | #include <unordered_map> |
| | | |
| | | BHCenter::MsgHandler MakeBusCenter(); |
| | | |
| | | // publish/subscribe manager. |
| | | class PubSubCenter |
| | | { |
| | | class SocketBus : public ShmSocket |
| | | { |
| | | public: |
| | | SocketBus(ShmSocket::Shm &shm) : |
| | | ShmSocket(shm, &kBHTopicBus, 1000) {} |
| | | using ShmSocket::shm; |
| | | }; |
| | | SocketBus socket_; |
| | | ShmSocket::Shm &shm() { return socket_.shm(); } |
| | | ShmSocket socket_; |
| | | |
| | | public: |
| | | PubSubCenter(ShmSocket::Shm &shm) : |
| | | socket_(shm) {} |
| | | socket_(shm, &kBHTopicBus, 1000) {} |
| | | PubSubCenter() : |
| | | PubSubCenter(BHomeShm()) {} |
| | | ~PubSubCenter() { Stop(); } |
| | |
| | | bool SocketRequest::StartWorker(const RequestResultCB &rrcb, int nworker) |
| | | { |
| | | auto AsyncRecvProc = [this, rrcb](BHMsg &msg) { |
| | | auto Find = [&](RecvCB &cb) { |
| | | auto Find = [&](RecvBHMsgCB &cb) { |
| | | std::lock_guard<std::mutex> lock(mutex()); |
| | | const std::string &msgid = msg.msg_id(); |
| | | auto pos = async_cbs_.find(msgid); |
| | |
| | | } |
| | | }; |
| | | |
| | | RecvCB cb; |
| | | if (Find(cb) && cb) { |
| | | RecvBHMsgCB cb; |
| | | if (Find(cb)) { |
| | | cb(msg); |
| | | } else if (rrcb && msg.type() == kMsgTypeReply) { |
| | | } else if (msg.type() == kMsgTypeReply) { |
| | | DataReply reply; |
| | | if (reply.ParseFromString(msg.body())) { |
| | | rrcb(reply.data()); |
| | |
| | | return Start(AsyncRecvProc, nworker); |
| | | } |
| | | |
| | | // Sends a request for `topic` without registering a reply callback. |
| | | // Looks up the topic's address with QueryRPCTopic, then passes the request to AsyncSend. |
| | | // Returns false when the lookup fails or any exception is thrown; otherwise returns AsyncSend's result. |
| | | bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms) |
| | | { |
| | | try { |
| | | BHAddress dest; |
| | | if (!QueryRPCTopic(topic, dest, timeout_ms)) { |
| | | return false; |
| | | } |
| | | const BHMsg &request(MakeRequest(mq().Id(), topic, data, size)); |
| | | return AsyncSend(dest.mq_id().data(), &request, timeout_ms); |
| | | } catch (...) { |
| | | return false; |
| | | } |
| | | } |
| | | bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb) |
| | | { |
| | | auto Call = [&](const void *remote) { |
| | |
| | | return false; |
| | | } |
| | | |
| | | bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb) |
| | | // Sends a message to a remote queue without registering a reply callback. |
| | | // `remote` is cast to const MQId* and `pmsg` to const BHMsg*; both must be non-null (asserted). |
| | | // Returns the result of mq().Send(), or false if an exception is thrown. |
| | | bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms) |
| | | { |
| | | assert(remote && pmsg); |
| | | try { |
| | | const MQId &dest = *static_cast<const MQId *>(remote); |
| | | const BHMsg &outgoing = *static_cast<const BHMsg *>(pmsg); |
| | | return mq().Send(dest, outgoing, timeout_ms); |
| | | } catch (...) { |
| | | return false; |
| | | } |
| | | } |
| | | bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvBHMsgCB &cb) |
| | | { |
| | | assert(remote && pmsg); |
| | | try { |
| | |
| | | // Starts the worker with a default-constructed RequestResultCB. |
| | | // NOTE(review): the reply branch of StartWorker(rrcb, nworker) appears to call rrcb without checking that it is set - confirm behaviour for this overload. |
| | | bool StartWorker(int nworker = 2) { return StartWorker(RequestResultCB(), nworker); } |
| | | // Forwards to Socket::Stop() and returns its result. |
| | | bool Stop() { return Socket::Stop(); } |
| | | bool AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb); |
| | | bool AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms); |
| | | |
| | | // String convenience overload: forwards data.data() and data.size() to the pointer/size overload that takes a result callback. |
| | | bool AsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb) |
| | | { |
| | | return AsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb); |
| | | } |
| | | // String convenience overload: forwards data.data() and data.size() to the pointer/size overload that takes no callback. |
| | | bool AsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms) |
| | | { |
| | | return AsyncRequest(topic, data.data(), data.size(), timeout_ms); |
| | | } |
| | | bool SyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms); |
| | | bool SyncRequest(const Topic &topic, const std::string &data, std::string &out, const int timeout_ms) |
| | |
| | | } |
| | | |
| | | private: |
| | | bool AsyncSend(const void *remote, const void *msg, const int timeout_ms, const RecvCB &cb); |
| | | bool AsyncSend(const void *remote, const void *msg, const int timeout_ms, const RecvBHMsgCB &cb); |
| | | bool AsyncSend(const void *remote, const void *msg, const int timeout_ms); |
| | | bool SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms); |
| | | bool QueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms); |
| | | std::unordered_map<std::string, RecvCB> async_cbs_; |
| | | std::unordered_map<std::string, RecvBHMsgCB> async_cbs_; |
| | | |
| | | typedef bhome_msg::BHAddress Address; |
| | | class TopicCache |
| | |
| | | std::unordered_map<Topic, WeakNode> topic_map_; |
| | | std::unordered_map<ProcId, Node> nodes_; |
| | | }; |
| | | |
| | | // Returns a reference to a function-local static Synced<NodeCenter>; every call yields the same object. |
| | | Synced<NodeCenter> &Center() |
| | | { |
| | | static Synced<NodeCenter> s; |
| | | return s; |
| | | } |
| | | |
| | | } // namespace |
| | | |
| | | bool ReqRepCenter::Start(const int nworker) |
| | | BHCenter::MsgHandler MakeReqRepCenter() |
| | | { |
| | | auto center_ptr = std::make_shared<Synced<NodeCenter>>(); |
| | | auto onRecv = [center_ptr, this](BHMsg &msg) { |
| | | return [center_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) { |
| | | auto ¢er = *center_ptr; |
| | | auto &shm = socket.shm(); |
| | | |
| | | #ifndef NDEBUG |
| | | static std::atomic<time_t> last(0); |
| | | time_t now = 0; |
| | | time(&now); |
| | | if (last.exchange(now) < now) { |
| | | printf("bus queue size: %ld\n", socket_.Pending()); |
| | | printf("bus queue size: %ld\n", socket.Pending()); |
| | | } |
| | | #endif |
| | | if (msg.route_size() == 0) { |
| | | return; |
| | | } |
| | | auto &src_mq = msg.route(0).mq_id(); |
| | | auto SrcMQ = [&]() { return msg.route(0).mq_id(); }; |
| | | |
| | | auto OnRegister = [&]() { |
| | | if (msg.route_size() != 1) { return; } |
| | | |
| | | DataProcRegister reg; |
| | | if (reg.ParseFromString(msg.body()) && reg.has_proc()) { |
| | | center->Register(*reg.mutable_proc(), src_mq, reg.topics().begin(), reg.topics().end()); |
| | | center->Register(*reg.mutable_proc(), SrcMQ(), reg.topics().begin(), reg.topics().end()); |
| | | } |
| | | }; |
| | | |
| | | auto OnHeartbeat = [&]() { |
| | | if (msg.route_size() != 1) { return; } |
| | | auto &src_mq = msg.route(0).mq_id(); |
| | | |
| | | DataProcHeartbeat hb; |
| | | if (hb.ParseFromString(msg.body()) && hb.has_proc()) { |
| | | center->Heartbeat(*hb.mutable_proc(), src_mq); |
| | | center->Heartbeat(*hb.mutable_proc(), SrcMQ()); |
| | | } |
| | | }; |
| | | |
| | | auto OnQueryTopic = [&]() { |
| | | if (msg.route_size() != 1) { return; } |
| | | |
| | | DataProcQueryTopic query; |
| | | NodeCenter::ProcAddr dest; |
| | | if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) { |
| | | MQId remote; |
| | | memcpy(&remote, msg.route().rbegin()->mq_id().data(), sizeof(remote)); |
| | | memcpy(&remote, SrcMQ().data(), sizeof(MQId)); |
| | | MsgI imsg; |
| | | if (!imsg.Make(shm(), MakeQueryTopicReply(dest, msg.msg_id()))) { return; } |
| | | if (!ShmMsgQueue::Send(shm(), remote, imsg, 100)) { |
| | | imsg.Release(shm()); |
| | | if (!imsg.Make(shm, MakeQueryTopicReply(dest, msg.msg_id()))) { return; } |
| | | if (!ShmMsgQueue::Send(shm, remote, imsg, 100)) { |
| | | imsg.Release(shm); |
| | | } |
| | | } |
| | | }; |
| | | |
| | | switch (msg.type()) { |
| | | case kMsgTypeProcRegisterTopics: OnRegister(); break; |
| | | case kMsgTypeProcHeartbeat: OnHeartbeat(); break; |
| | | case kMsgTypeProcQueryTopic: OnQueryTopic(); break; |
| | | default: break; |
| | | case kMsgTypeProcRegisterTopics: OnRegister(); return true; |
| | | case kMsgTypeProcHeartbeat: OnHeartbeat(); return true; |
| | | case kMsgTypeProcQueryTopic: OnQueryTopic(); return true; |
| | | default: return false; |
| | | } |
| | | }; |
| | | } |
| | | |
| | | // Builds the request/reply center message handler and starts the socket's receive workers. |
| | | // nworker <= 0 selects the default of 2; the worker count is capped at kMaxWorker. |
| | | // Returns the result of socket_.Start(). |
| | | bool ReqRepCenter::Start(const int nworker) |
| | | { |
| | | auto handler = MakeReqRepCenter(); |
| | | // sizeof yields size_t, so it is printed with %zu rather than %ld. |
| | | printf("sizeof(rep/req handler) = %zu\n", sizeof(handler)); |
| | | |
| | | const int kMaxWorker = 16; |
| | | return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker)); |
| | | } |
| | |
| | | #ifndef REQREP_CENTER_US3RBM60 |
| | | #define REQREP_CENTER_US3RBM60 |
| | | |
| | | #include "center.h" |
| | | #include "defs.h" |
| | | #include "socket.h" |
| | | |
| | | BHCenter::MsgHandler MakeReqRepCenter(); |
| | | class ReqRepCenter |
| | | { |
| | | class Socket : public ShmSocket |
| | | { |
| | | public: |
| | | Socket(ShmSocket::Shm &shm) : |
| | | ShmSocket(shm, &kBHTopicReqRepCenter, 1000) {} |
| | | using ShmSocket::shm; |
| | | }; |
| | | Socket socket_; |
| | | ShmSocket::Shm &shm() { return socket_.shm(); } |
| | | ShmSocket socket_; |
| | | |
| | | public: |
| | | ReqRepCenter(ShmSocket::Shm &shm) : |
| | | socket_(shm) {} |
| | | socket_(shm, &kBHTopicReqRepCenter, 1000) {} |
| | | ReqRepCenter() : |
| | | ReqRepCenter(BHomeShm()) {} |
| | | ~ReqRepCenter() { Stop(); } |
| | |
| | | Stop(); //TODO should stop in sub class, incase thread access sub class data. |
| | | } |
| | | |
| | | bool ShmSocket::StartRaw(const RecvRawCB &onData, int nworker) |
| | | bool ShmSocket::Start(const RecvCB &onData, int nworker) |
| | | { |
| | | if (!mq_) { |
| | | return false; |
| | |
| | | try { |
| | | MsgI imsg; |
| | | DEFER1(imsg.Release(shm_)); |
| | | if (mq_->Recv(imsg, 100)) { onData(imsg); } |
| | | if (mq_->Recv(imsg, 100)) { |
| | | BHMsg msg; |
| | | if (imsg.Unpack(msg)) { |
| | | onData(*this, imsg, msg); |
| | | } |
| | | } |
| | | } catch (...) { |
| | | } |
| | | } |
| | |
| | | workers_.emplace_back(RecvProc); |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | bool ShmSocket::Start(const RecvCB &onData, int nworker) |
| | | { |
| | | return StartRaw([this, onData](MsgI &imsg) { BHMsg m; if (imsg.Unpack(m)) { onData(m); } }, nworker); |
| | | } |
| | | |
| | | bool ShmSocket::Stop() |
| | |
| | | |
| | | public: |
| | | typedef bhome_shm::SharedMemory Shm; |
| | | typedef std::function<void(bhome_msg::BHMsg &msg)> RecvCB; |
| | | typedef std::function<void(bhome_msg::MsgI &imsg)> RecvRawCB; |
| | | typedef std::function<void(ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg)> RecvCB; |
| | | typedef std::function<void(bhome_msg::BHMsg &msg)> RecvBHMsgCB; |
| | | |
| | | ShmSocket(Shm &shm, const void *id, const int len); |
| | | ShmSocket(Shm &shm, const int len = 12); |
| | | ~ShmSocket(); |
| | | |
| | | Shm &shm() { return shm_; } |
| | | // start recv. |
| | | bool Start(const RecvCB &onData, int nworker = 1); |
| | | bool StartRaw(const RecvRawCB &onData, int nworker = 1); |
| | | // Adapter overload: wraps a BHMsg-only callback in a three-argument callable and starts it through the other Start overload. |
| | | // The socket and MsgI arguments are ignored by the wrapper. |
| | | bool Start(const RecvBHMsgCB &onData, int nworker = 1) |
| | | { |
| | | auto forward = [onData](ShmSocket &, bhome_msg::MsgI &, bhome_msg::BHMsg &msg) { onData(msg); }; |
| | | return Start(forward, nworker); |
| | | } |
| | | bool Stop(); |
| | | size_t Pending() const { return mq_ ? mq_->Pending() : 0; } |
| | | |
| | | protected: |
| | | ShmSocket(Shm &shm, const void *id, const int len); |
| | | Shm &shm() { return shm_; } |
| | | const Shm &shm() const { return shm_; } |
| | | Queue &mq() { return *mq_; } // programmer should make sure that mq_ is valid. |
| | | const Queue &mq() const { return *mq_; } |
| | |
| | | |
| | | auto Client = [&](const std::string &topic, const int nreq) { |
| | | SocketRequest client(shm); |
| | | std::atomic<int> count(0); |
| | | std::string reply; |
| | | auto onRecv = [&](const std::string &rep) { |
| | | reply = rep; |
| | | if (++count >= nreq) { |
| | | printf("count: %d\n", count.load()); |
| | | } |
| | | }; |
| | | client.StartWorker(onRecv, 1); |
| | | boost::timer::auto_cpu_timer timer; |
| | | for (int i = 0; i < nreq; ++i) { |
| | | if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) { |
| | | if (!client.AsyncRequest(topic, "data " + std::to_string(i), 1000)) { |
| | | printf("client request failed\n"); |
| | | } |
| | | // if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) { |
| | | // printf("client request failed\n"); |
| | | // } else { |
| | | // ++count; |
| | | // } |
| | | } |
| | | printf("request %s %d done ", topic.c_str(), nreq); |
| | | while (count.load() < nreq) { |
| | | std::this_thread::yield(); |
| | | } |
| | | client.Stop(); |
| | | }; |
| | | auto Server = [&](const std::string &name, const std::vector<std::string> &topics) { |
| | | SocketReply server(shm); |
| | |
| | | servers.Launch(Server, "server", topics); |
| | | std::this_thread::sleep_for(100ms); |
| | | for (auto &t : topics) { |
| | | clients.Launch(Client, t, 1000 * 100); |
| | | clients.Launch(Client, t, 1000 * 1000); |
| | | } |
| | | clients.WaitAll(); |
| | | run = false; |