Add a node-init message; allocate the node's message queues only after the init message has been sent successfully.
| | |
| | | |
| | | // Center name; it is not related to shm. |
| | | const std::string &id() const { return id_; } |
| | | // Registers a freshly initialised node; `msg` carries the node's session id (ssn). |
| | | void OnNodeInit(const int64_t msg) |
| | | { |
| | | const MQId ssn = msg; |
| | | Node node(new NodeInfo); |
| | | |
| | | // Record the ten consecutive addresses ssn .. ssn + 9 for this node. |
| | | for (int offset = 0; offset < 10; ++offset) { |
| | | node->addrs_.insert(ssn + offset); |
| | | } |
| | | // Back-date the timestamp by offline_time_, then let UpdateState() evaluate it. |
| | | node->state_.timestamp_ = NowSec() - offline_time_; |
| | | node->state_.UpdateState(NowSec(), offline_time_, kill_time_); |
| | | |
| | | // Stored under the session id; an entry already using that id is replaced. |
| | | nodes_[ssn] = node; |
| | | printf("new node ssn (%ld) init\n", ssn); |
| | | } |
| | | MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg) |
| | | { |
| | | if (msg.proc().proc_id() != head.proc_id()) { |
| | |
| | | Node node(new NodeInfo); |
| | | UpdateRegInfo(node); |
| | | nodes_[ssn] = node; |
| | | } |
| | | printf("node (%s) ssn (%ld)\n", head.proc_id().c_str(), ssn); |
| | | |
| | | printf("new node (%s) ssn (%ld)\n", head.proc_id().c_str(), ssn); |
| | | auto old = online_node_addr_map_.find(head.proc_id()); |
| | | if (old != online_node_addr_map_.end()) { // old session |
| | | auto &old_ssn = old->second; |
| | | auto old = online_node_addr_map_.find(head.proc_id()); |
| | | if (old != online_node_addr_map_.end()) { // old session |
| | | auto &old_ssn = old->second; |
| | | if (old_ssn != ssn) { |
| | | nodes_[old_ssn]->state_.PutOffline(offline_time_); |
| | | printf("put node (%s) ssn (%ld) offline\n", nodes_[old_ssn]->proc_.proc_id().c_str(), old->second); |
| | | old_ssn = ssn; |
| | | } else { |
| | | online_node_addr_map_.emplace(head.proc_id(), ssn); |
| | | } |
| | | } else { |
| | | online_node_addr_map_.emplace(head.proc_id(), ssn); |
| | | } |
| | | return MakeReply(eSuccess); |
| | | } catch (...) { |
| | |
| | | msg, head, [&](auto &body) { return center->MsgTag(head, body); }, replyer); \ |
| | | return true; |
| | | |
| | | bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner) |
| | | auto MakeReplyer(ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) |
| | | { |
| | | auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 2); |
| | | auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) { |
| | | return [&](auto &&rep_body) { |
| | | auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.ssn_id(), head.msg_id())); |
| | | auto remote = head.route(0).mq_id(); |
| | | socket.Send(remote, reply_head, rep_body); |
| | | }; |
| | | return [&](auto &&rep_body) { |
| | | auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.ssn_id(), head.msg_id())); |
| | | auto remote = head.route(0).mq_id(); |
| | | socket.Send(remote, reply_head, rep_body); |
| | | }; |
| | | } |
| | | |
| | | bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr) |
| | | { |
| | | // Raw-message handler: passes the offset carried by the message to the center's OnNodeInit(). |
| | | // center_ptr is captured by value so the handler keeps the center alive. |
| | | auto OnNodeInit = [center_ptr](ShmSocket &socket, MsgI &msg) { |
| | | auto &center = *center_ptr; |
| | | center->OnNodeInit(msg.Offset()); |
| | | }; |
| | | auto Nothing = [](ShmSocket &socket) {}; |
| | | |
| | | BHCenter::Install("#centetr.Init", OnNodeInit, Nothing, BHInitAddress(), 16); |
| | | |
| | | auto OnCenterIdle = [center_ptr](ShmSocket &socket) { |
| | | auto ¢er = *center_ptr; |
| | |
| | | default: return false; |
| | | } |
| | | }; |
| | | BHCenter::Install("#center.main", OnCenter, OnCenterIdle, BHTopicCenterAddress(), 1000); |
| | | |
| | | auto OnBusIdle = [=](ShmSocket &socket) {}; |
| | | auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { |
| | |
| | | } |
| | | }; |
| | | |
| | | BHCenter::Install("#center.reg", OnCenter, OnCenterIdle, BHTopicCenterAddress(), 1000); |
| | | BHCenter::Install("#center.bus", OnPubSub, OnBusIdle, BHTopicBusAddress(), 1000); |
| | | |
| | | return true; |
| | |
| | | |
| | | // Registers (or replaces) the center entry `name` for a handler that receives the parsed message head. |
| | | // The raw-handler slot of the entry is left empty. Always returns true. |
| | | bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len) |
| | | { |
| | | // CenterInfo fields in order: name, handler, raw handler, idle callback, queue id, queue length. |
| | | Centers()[name] = CenterInfo{name, handler, MsgIHandler(), idle, mqid, mq_len}; |
| | | return true; |
| | | } |
| | | // Raw-message overload: registers (or replaces) the center entry `name` with only the raw handler set; |
| | | // the head-aware handler slot stays empty. Always returns true. |
| | | bool BHCenter::Install(const std::string &name, MsgIHandler handler, IdleHandler idle, const MQId mqid, const int mq_len) |
| | | { |
| | | CenterInfo entry{name, MsgHandler(), handler, idle, mqid, mq_len}; |
| | | Centers()[name] = entry; |
| | | return true; |
| | | } |
| | | |
| | |
| | | { |
| | | auto gc = [&](const MQId id) { |
| | | auto r = ShmSocket::Remove(shm, id); |
| | | printf("remove mq %ld : %s\n", id, (r ? "ok" : "failed")); |
| | | if (r) { |
| | | printf("remove mq %ld ok\n", id); |
| | | } |
| | | }; |
| | | |
| | | AddCenter("#bhome_center", gc); |
| | | auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", gc, 6s, 6s * 2); |
| | | AddCenter(center_ptr); |
| | | |
| | | for (auto &kv : Centers()) { |
| | | auto &info = kv.second; |
| | |
| | | { |
| | | for (auto &kv : Centers()) { |
| | | auto &info = kv.second; |
| | | sockets_[info.name_]->Start(info.handler_, info.idle_); |
| | | if (info.handler_) { |
| | | sockets_[info.name_]->Start(info.handler_, info.idle_); |
| | | } else { |
| | | sockets_[info.name_]->Start(info.raw_handler_, info.idle_); |
| | | } |
| | | } |
| | | |
| | | return true; |
| | |
| | | |
| | | public: |
| | | typedef Socket::PartialRecvCB MsgHandler; |
| | | typedef Socket::RawRecvCB MsgIHandler; |
| | | typedef Socket::IdleCB IdleHandler; |
| | | static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len); |
| | | static bool Install(const std::string &name, MsgIHandler handler, IdleHandler idle, const MQId mqid, const int mq_len); |
| | | |
| | | BHCenter(Socket::Shm &shm); |
| | | ~BHCenter() { Stop(); } |
| | |
| | | struct CenterInfo { |
| | | std::string name_; |
| | | MsgHandler handler_; |
| | | MsgIHandler raw_handler_; |
| | | IdleHandler idle_; |
| | | MQId mqid_; |
| | | int mq_len_ = 0; |
| | |
| | | |
| | | typedef uint64_t MQId; /* integer id of a message queue */ |
| | | |
| | | const MQId kBHNodeInit = 10; /* well-known id, exposed through BHInitAddress() */ |
| | | const MQId kBHTopicCenter = 100; /* well-known id, exposed through BHTopicCenterAddress() */ |
| | | const MQId kBHTopicBus = 101; /* well-known id, exposed through BHTopicBusAddress() */ |
| | | const MQId kBHUniCenter = 102; /* well-known id, exposed through BHUniCenterAddress() */ |
| | | inline const MQId BHInitAddress() { return kBHNodeInit; } /* NOTE(review): const on a by-value return has no effect */ |
| | | inline const MQId BHTopicCenterAddress() { return kBHTopicCenter; } |
| | | inline const MQId BHTopicBusAddress() { return kBHTopicBus; } |
| | | inline const MQId BHUniCenterAddress() { return kBHUniCenter; } |
| | |
| | | // Returns a new base id that is a multiple of 10. |
| | | // Callers use the ids base .. base + 9, so bases must be spaced ten apart to keep those ranges disjoint. |
| | | // NOTE(review): assumes the counter behind GetData() yields a distinct value per increment - confirm. |
| | | ShmMsgQueue::MQId ShmMsgQueue::NewId() |
| | | { |
| | | static auto &id = GetData(); |
| | | return (++id) * 10; |
| | | } |
| | | // ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2 |
| | | ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) : |
| | |
| | | Stop(); |
| | | } |
| | | |
| | | // Starts `nworker` threads that repeatedly flush the send buffer, hand at most one received |
| | | // raw message to `onData`, and then call `onIdle` (when set). Any previous run is stopped first. |
| | | // Always returns true. |
| | | bool ShmSocket::Start(const RawRecvCB &onData, const IdleCB &onIdle, int nworker) |
| | | { |
| | | // One pass of the worker loop; the callbacks are copied into the closure. |
| | | auto ioProc = [this, onData, onIdle]() { |
| | | auto DoSend = [this]() { return send_buffer_.TrySend(mq()); }; |
| | | auto DoRecv = [=] { |
| | | // do not recv if no cb is set. |
| | | if (!onData) { |
| | | return false; |
| | | } |
| | | MsgI imsg; |
| | | if (!mq().TryRecv(imsg)) { |
| | | return false; |
| | | } |
| | | // Release the message when this scope ends, after the callback has run. |
| | | DEFER1(imsg.Release()); |
| | | onData(*this, imsg); |
| | | return true; |
| | | }; |
| | | |
| | | try { |
| | | const bool more_to_send = DoSend(); |
| | | const bool more_to_recv = DoRecv(); |
| | | if (onIdle) { onIdle(*this); } |
| | | // Nothing moved in either direction: back off briefly. |
| | | if (!(more_to_send || more_to_recv)) { |
| | | robust::QuickSleep(); |
| | | } |
| | | } catch (...) { |
| | | // Exceptions are deliberately swallowed so the worker keeps running. |
| | | } |
| | | }; |
| | | |
| | | std::lock_guard<std::mutex> lock(mutex_); |
| | | StopNoLock(); |
| | | |
| | | run_.store(true); |
| | | for (int i = 0; i < nworker; ++i) { |
| | | workers_.emplace_back([this, ioProc]() { |
| | | while (run_) { ioProc(); } |
| | | }); |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle) |
| | | { |
| | | auto ioProc = [this, onData, onIdle]() { |
| | |
| | | bool more_to_recv = DoRecv(); |
| | | if (onIdle) { onIdle(*this); } |
| | | if (!more_to_send && !more_to_recv) { |
| | | std::this_thread::yield(); |
| | | using namespace std::chrono_literals; |
| | | std::this_thread::sleep_for(10000ns); |
| | | robust::QuickSleep(); |
| | | } |
| | | } catch (...) { |
| | | } |
| | |
| | | public: |
| | | typedef ShmMsgQueue::MQId MQId; |
| | | typedef bhome_shm::SharedMemory Shm; |
| | | typedef std::function<void(ShmSocket &sock, MsgI &imsg)> RawRecvCB; |
| | | typedef std::function<void(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> RecvCB; |
| | | typedef std::function<bool(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> PartialRecvCB; |
| | | typedef std::function<void(ShmSocket &sock)> IdleCB; |
| | |
| | | bool Remove() { return Remove(shm(), id()); } |
| | | MQId id() const { return mq().Id(); } |
| | | // start recv. |
| | | bool Start(const RawRecvCB &onData, const IdleCB &onIdle, int nworker = 1); |
| | | bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB()); |
| | | bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, onIdle); } |
| | | bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); } |
| | |
| | | } // namespace |
| | | |
| | | // Builds the node in the unregistered state and attempts the init handshake right away. |
| | | // Sockets are created by Init(), not here: Init() returns early once sockets_ is non-empty, |
| | | // so filling sockets_ in the constructor would prevent the init message from being sent. |
| | | TopicNode::TopicNode(SharedMemory &shm) : |
| | | shm_(shm), state_(eStateUnregistered) |
| | | { |
| | | // The result is ignored on purpose; Start() and Register() call Init() again. |
| | | Init(); |
| | | } |
| | | |
| | | TopicNode::~TopicNode() |
| | | { |
| | | printf("~TopicNode()\n"); |
| | | Stop(); |
| | | SockNode().Stop(); |
| | | if (state() == eStateUnregistered) { |
| | | for (auto &p : sockets_) { p->Remove(); } |
| | | } |
| | | |
| | | // Performs the init handshake once: sends this node's session id to the init address and, |
| | | // only if that send succeeds, creates the node's sockets and starts the node socket. |
| | | // Returns true when the sockets already exist or were just created, false when the send failed. |
| | | // NOTE(review): this locks mutex_, and TopicNode::Start() appears to hold mutex_ when it |
| | | // calls Init() - confirm this cannot self-deadlock. |
| | | bool TopicNode::Init() |
| | | { |
| | | std::lock_guard<std::mutex> lk(mutex_); |
| | | |
| | | if (Valid()) { return true; } |
| | | |
| | | // The session id is chosen once and reused by later attempts. |
| | | if (ssn_id_ == 0) { ssn_id_ = ShmMsgQueue::NewId(); } |
| | | printf("Node Init, id %ld \n", ssn_id_); |
| | | |
| | | MsgI init_msg; |
| | | init_msg.OffsetRef() = ssn_id_; |
| | | if (!ShmMsgQueue::TrySend(shm(), BHInitAddress(), init_msg)) { |
| | | return false; |
| | | } |
| | | |
| | | // Socket i uses queue id ssn_id_ + i. |
| | | sockets_.resize(eSockEnd); |
| | | for (int i = eSockStart; i < eSockEnd; ++i) { |
| | | sockets_[i].reset(new ShmSocket(shm_, ssn_id_ + i, kMqLen)); |
| | | } |
| | | // recv msgs to avoid memory leak. |
| | | auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; }; |
| | | SockNode().Start(default_ignore_msg); |
| | | return true; |
| | | } |
| | | |
| | | void TopicNode::Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker) |
| | | { |
| | | std::lock_guard<std::mutex> lk(mutex_); |
| | | |
| | | if (!Init()) { |
| | | SetLastError(eError, "BHome Node Not Inited."); |
| | | return; |
| | | } |
| | | if (nworker < 1) { |
| | | nworker = 1; |
| | | } else if (nworker > 16) { |
| | |
| | | } |
| | | // Stops every socket held in sockets_, logging before and after. |
| | | void TopicNode::Stop() |
| | | { |
| | | printf("Node Stopping\n"); |
| | | for (auto it = sockets_.begin(); it != sockets_.end(); ++it) { |
| | | (*it)->Stop(); |
| | | } |
| | | printf("Node Stopped\n"); |
| | | } |
| | | |
| | | bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | if (!Init()) { |
| | | SetLastError(eError, "BHome Node Not Inited."); |
| | | return false; |
| | | } |
| | | |
| | | info_ = proc; |
| | | |
| | | auto &sock = SockNode(); |
| | |
| | | } |
| | | bool TopicNode::Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | return false; |
| | | } |
| | | |
| | | info_.Clear(); |
| | | state_cas(eStateOnline, eStateOffline); |
| | | |
| | |
| | | reply_head.mutable_proc_id()->swap(out_proc_id); |
| | | return true; |
| | | } |
| | | } else { |
| | | SetLastError(eNotFound, "remote not found."); |
| | | } |
| | | } catch (...) { |
| | | SetLastError(eError, __func__ + std::string(" internal errer.")); |
| | |
| | | return true; |
| | | } |
| | | } |
| | | SetLastError(eNotFound, "remote not found."); |
| | | return false; |
| | | } |
| | | |
| | |
| | | #include "socket.h" |
| | | #include <atomic> |
| | | #include <memory> |
| | | #include <mutex> |
| | | #include <vector> |
| | | |
| | | using namespace bhome_shm; |
| | |
| | | void state(const State st) { state_.store(st); } /* unconditional overwrite of the state */ |
| | | void state_cas(State expected, const State val) { state_.compare_exchange_strong(expected, val); } /* switches to val only if the state equals expected; the outcome is not reported to the caller */ |
| | | State state() const { return state_.load(); } |
| | | bool IsOnline() const { return state() == eStateOnline; } /* state check only */ |
| | | bool IsOnline() { return Init() && state() == eStateOnline; } /* non-const overload: runs Init() before checking the state */ |
| | | bool Init(); |
| | | bool Valid() const { return !sockets_.empty(); } /* true once sockets_ has been populated */ |
| | | std::mutex mutex_; |
| | | MQId ssn_id_ = 0; |
| | | std::atomic<State> state_; |
| | | |
| | | TopicQueryCache topic_query_cache_; |
| | |
| | | |
| | | printf("maxsec: %ld\n", CountSeconds(max_time)); |
| | | |
| | | // BHCleanup(); |
| | | // return; |
| | | bool reg = false; |
| | | for (int i = 0; i < 3 && !reg; ++i) { |
| | | ProcInfo proc; |