| | |
| | | return [&](auto &&rep_body) { |
| | | auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id())); |
| | | MQInfo remote = {head.route(0).mq_id(), head.route(0).abs_addr()}; |
| | | MsgI msg; |
| | | MsgI msg(socket.shm()); |
| | | if (msg.Make(reply_head, rep_body)) { |
| | | DEFER1(msg.Release();); |
| | | center->SendAllocMsg(socket, remote, msg); |
| | |
| | | }; |
| | | } |
| | | |
| | | bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr) |
| | | bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm) |
| | | { |
| | | // command |
| | | auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool { |
| | |
| | | auto onInit = [&](const int64_t request) { |
| | | return center->OnNodeInit(socket, request); |
| | | }; |
| | | BHCenterHandleInit(onInit); |
| | | BHCenterHandleInit(socket.shm(), onInit); |
| | | center->OnTimer(); |
| | | }; |
| | | |
| | |
| | | default: return false; |
| | | } |
| | | }; |
| | | BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(), 1000); |
| | | BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(shm), 1000); |
| | | |
| | | auto OnBusIdle = [=](ShmSocket &socket) {}; |
| | | auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; }; |
| | |
| | | } |
| | | }; |
| | | |
| | | BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(), 1000); |
| | | BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(shm), 1000); |
| | | |
| | | return true; |
| | | } |
| | |
| | | { |
| | | auto nsec = NodeTimeoutSec(); |
| | | auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs. |
| | | AddCenter(center_ptr); |
| | | AddCenter(center_ptr, shm); |
| | | |
| | | for (auto &kv : Centers()) { |
| | | auto &info = kv.second; |
| | |
| | | |
| | | topic_node_.reset(new CenterTopicNode(center_ptr, shm)); |
| | | } |
| | | |
| | | BHCenter::~BHCenter() { Stop(); } /* Destructor: calls Stop() when the BHCenter object is destroyed. */ |
| | | |
| | | bool BHCenter::Start() |