Center allocates the node queues; nodes just find them.
| | |
| | | |
| | | // Name of the center; it is not related to the shared memory (shm). |
| | | const std::string &id() const { return id_; } |
| | | void OnNodeInit(const int64_t msg) | // NOTE(review): signature with no body, directly above the two-argument form; presumably the pre-change signature shown next to the new one — confirm against the repository.
| | | void OnNodeInit(SharedMemory &shm, const int64_t msg) | // Handles a node-init message: msg is taken as the node's ssn; an ssn already present in nodes_ is left untouched.
| | | { |
| | | MQId ssn = msg; |
| | | if (nodes_.find(ssn) != nodes_.end()) { |
| | | return; // ignore if this ssn already exists. |
| | | } |
| | | |
| | | auto UpdateRegInfo = [&](Node &node) { | // Fills in the node's addresses and state and creates its sockets; returns false if socket creation throws.
| | | for (int i = 0; i < 10; ++i) { |
| | | node->addrs_.insert(ssn + i); |
| | | } |
| | | node->state_.timestamp_ = NowSec() - offline_time_; | // presumably back-dates the timestamp so the new node does not start out as online — TODO confirm in UpdateState.
| | | node->state_.UpdateState(NowSec(), offline_time_, kill_time_); |
| | | |
| | | // create sockets; any exception makes this helper return false. |
| | | try { |
| | | auto CreateSocket = [](SharedMemory &shm, const MQId id) { |
| | | ShmSocket tmp(shm, true, id, 16); | // tmp goes out of scope when CreateSocket returns; presumably the queue it created stays in shm — confirm.
| | | }; |
| | | // ids are ssn + i for i in [-1, 4): alloc(-1), node, server, sub, request. |
| | | for (int i = -1; i < 4; ++i) { |
| | | CreateSocket(shm, ssn + i); |
| | | node->addrs_.insert(ssn + i); |
| | | } |
| | | return true; |
| | | } catch (...) { |
| | | return false; |
| | | } |
| | | }; |
| | | |
| | | Node node(new NodeInfo); |
| | | UpdateRegInfo(node); | // NOTE(review): this unconditional call/insert/log and the checked version just below do the same work twice; looks like old and new lines of one change shown together — confirm.
| | | nodes_[ssn] = node; |
| | | LOG_INFO() << "new node ssn (" << ssn << ") init"; |
| | | if (UpdateRegInfo(node)) { | // store and log the node only when the helper reported success.
| | | nodes_[ssn] = node; |
| | | LOG_INFO() << "new node ssn (" << ssn << ") init"; |
| | | } |
| | | } |
| | | |
| | | MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg) |
| | | { |
| | | if (msg.proc().proc_id() != head.proc_id()) { |
| | |
| | | { |
| | | auto OnNodeInit = [center_ptr](ShmSocket &socket, MsgI &msg) { |
| | | auto &center = *center_ptr; |
| | | center->OnNodeInit(msg.Offset()); |
| | | center->OnNodeInit(socket.shm(), msg.Offset()); |
| | | }; |
| | | auto Nothing = [](ShmSocket &socket) {}; |
| | | |
| | |
| | | MsgI::BindShm(shm); |
| | | typedef std::atomic<MQId> IdSrc; |
| | | IdSrc *psrc = shm.FindOrCreate<IdSrc>("shmqIdSrc0", 100000); |
| | | return ShmMsgQueue::SetData(*psrc); |
| | | return psrc && ShmMsgQueue::SetData(*psrc); |
| | | } |
| | | |
| | | void SetLastError(const int ec, const std::string &msg) |
| | |
| | | static auto &id = GetData(); |
| | | return (++id) * 10; |
| | | } |
| | | // ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2 |
| | | |
| | | ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) : | // Stores id and constructs queue_ from segment and the name MsgQIdToName(id_).
| | | id_(id), |
| | | queue_(segment, MsgQIdToName(id_)) // len is not used here; disabled arguments: , AdjustMQLength(len), segment.get_segment_manager()) |
| | | { |
| | | } |
| | | |
| | | ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) : |
| | | id_(NewId()), |
| | | queue_(segment, true, MsgQIdToName(id_)) //, AdjustMQLength(len), segment.get_segment_manager()) |
| | | ShmMsgQueue::ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len) : | // Constructs queue_ under the name MsgQIdToName(id_), passing create_or_else_find through (presumably true = create, false = look up an existing queue — confirm in the queue type); len is not used here.
| | | id_(id), |
| | | queue_(segment, create_or_else_find, MsgQIdToName(id_)) |
| | | { |
| | | if (!queue_.IsOk()) { |
| | | throw("error create msgq " + std::to_string(id_)); | // the thrown object is a std::string, not a std::exception subclass.
| | | throw("error create/find msgq " + std::to_string(id_)); | // NOTE(review): unreachable after the throw above; looks like old and new lines of one change shown together — confirm.
| | | } |
| | | } |
| | | ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) : | // Delegates to the (id, create_or_else_find, segment, len) constructor with an id from NewId() and create_or_else_find = true.
| | | ShmMsgQueue(NewId(), true, segment, len) {} |
| | | |
| | | ShmMsgQueue::~ShmMsgQueue() {} | // Empty body: no explicit cleanup is performed here.
| | | |
| | |
| | | static MQId NewId(); |
| | | |
| | | ShmMsgQueue(const MQId id, ShmType &segment, const int len); |
| | | ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len); |
| | | ShmMsgQueue(ShmType &segment, const int len); |
| | | ~ShmMsgQueue(); |
| | | static bool Remove(SharedMemory &shm, const MQId id); |
| | |
| | | { |
| | | Start(); |
| | | } |
| | | ShmSocket::ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len) : | // Initializes run_ to false, forwards (id, create_or_else_find, shm, len) to the queue member mq_, then calls Start().
| | | run_(false), mq_(id, create_or_else_find, shm, len) |
| | | { |
| | | Start(); |
| | | } |
| | | ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) : |
| | | run_(false), mq_(shm, len) |
| | | { |
| | |
| | | typedef std::function<void(ShmSocket &sock)> IdleCB; |
| | | |
| | | ShmSocket(Shm &shm, const MQId id, const int len); |
| | | ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len); |
| | | ShmSocket(Shm &shm, const int len = 12); |
| | | ~ShmSocket(); |
| | | static bool Remove(SharedMemory &shm, const MQId id) { return Queue::Remove(shm, id); } | // Forwards to Queue::Remove(shm, id) and returns its result.
| | |
| | | template <class... Rest> |
| | | bool SendImpl(const MQId remote, Rest &&...rest) | // Appends (remote, rest...) to send_buffer_ and returns true after the append.
| | | { |
| | | // TODO send alloc request, and pack later, higher bit means alloc? |
| | | send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...); | // decltype(rest) is Rest&&, so each argument is forwarded with its original value category.
| | | return true; |
| | | } |
| | |
| | | std::atomic<bool> run_; |
| | | |
| | | Queue mq_; |
| | | class AsyncCBs |
| | | template <class Key> |
| | | class CallbackRecords |
| | | { |
| | | std::unordered_map<std::string, RecvCB> store_; |
| | | std::unordered_map<Key, RecvCB> store_; |
| | | |
| | | public: |
| | | bool empty() const { return store_.empty(); } |
| | | bool Store(const std::string &id, RecvCB &&cb) { return store_.emplace(id, std::move(cb)).second; } |
| | | bool Pick(const std::string &id, RecvCB &cb) |
| | | bool Store(const Key &id, RecvCB &&cb) { return store_.emplace(id, std::move(cb)).second; } |
| | | bool Pick(const Key &id, RecvCB &cb) |
| | | { |
| | | auto pos = store_.find(id); |
| | | if (pos != store_.end()) { |
| | |
| | | } |
| | | }; |
| | | |
| | | Synced<AsyncCBs> per_msg_cbs_; |
| | | Synced<CallbackRecords<std::string>> per_msg_cbs_; |
| | | |
| | | SendQ send_buffer_; |
| | | // Synced<SendQ> send_buffer_; |
| | | }; |
| | | |
| | | #endif // end of include guard: SOCKET_GWTJHBPO |
| | |
| | | using namespace std::chrono; |
| | | using namespace std::chrono_literals; |
| | | |
| | | const char *const kErrMsgNotInit = "BHome node NOT initialized."; |
| | | const char *const kErrMsgNotRegistered = "BHome node NOT registered."; |
| | | |
| | | namespace |
| | | { |
| | | inline void AddRoute(BHMsgHead &head, const MQId id) { head.add_route()->set_mq_id(id); } | // Adds a new route entry to head and sets its mq_id to id.
| | |
| | | MsgI msg; |
| | | msg.OffsetRef() = ssn_id_; |
| | | if (ShmMsgQueue::TrySend(shm(), BHInitAddress(), msg)) { |
| | | sockets_.resize(eSockEnd); |
| | | for (int i = eSockStart; i < eSockEnd; ++i) { |
| | | sockets_[i].reset(new ShmSocket(shm_, ssn_id_ + i, kMqLen)); |
| | | |
| | | auto end_time = steady_clock::now() + 3s; |
| | | do { |
| | | try { |
| | | for (int i = eSockStart; i < eSockEnd; ++i) { |
| | | sockets_.emplace_back(new ShmSocket(shm_, false, ssn_id_ + i, kMqLen)); |
| | | } |
| | | break; |
| | | } catch (...) { |
| | | sockets_.clear(); |
| | | std::this_thread::sleep_for(100ms); |
| | | } |
| | | } while (steady_clock::now() < end_time); |
| | | if (!sockets_.empty()) { |
| | | // recv msgs to avoid memory leak. |
| | | auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; }; |
| | | SockNode().Start(default_ignore_msg); |
| | | return true; |
| | | } |
| | | // recv msgs to avoid memory leak. |
| | | auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; }; |
| | | SockNode().Start(default_ignore_msg); |
| | | return true; |
| | | } |
| | | return false; |
| | | } |
| | |
| | | void TopicNode::Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker) |
| | | { |
| | | if (!Init()) { |
| | | SetLastError(eError, "BHome Node Not Inited."); |
| | | SetLastError(eError, kErrMsgNotInit); |
| | | return; |
| | | } |
| | | if (nworker < 1) { |
| | |
| | | bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | if (!Init()) { |
| | | SetLastError(eError, "BHome Node Not Inited."); |
| | | SetLastError(eError, kErrMsgNotInit); |
| | | return false; |
| | | } |
| | | |
| | |
| | | bool TopicNode::Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | SetLastError(eNotRegistered, kErrMsgNotRegistered); |
| | | return false; |
| | | } |
| | | |
| | |
| | | bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | SetLastError(eNotRegistered, kErrMsgNotRegistered); |
| | | return false; |
| | | } |
| | | |
| | |
| | | bool TopicNode::QueryTopicAddress(BHAddress &dest, MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms) |
| | | { |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | SetLastError(eNotRegistered, kErrMsgNotRegistered); |
| | | return false; |
| | | } |
| | | auto &sock = SockNode(); |
| | |
| | | bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | SetLastError(eNotRegistered, kErrMsgNotRegistered); |
| | | return false; |
| | | } |
| | | |
| | |
| | | bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms) |
| | | { |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | SetLastError(eNotRegistered, kErrMsgNotRegistered); |
| | | return false; |
| | | } |
| | | |
| | |
| | | bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body) |
| | | { |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | SetLastError(eNotRegistered, kErrMsgNotRegistered); |
| | | return false; |
| | | } |
| | | |
| | |
| | | bool TopicNode::ClientAsyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb) |
| | | { |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | SetLastError(eNotRegistered, kErrMsgNotRegistered); |
| | | return false; |
| | | } |
| | | |
| | |
| | | bool TopicNode::ClientSyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms) |
| | | { |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | SetLastError(eNotRegistered, kErrMsgNotRegistered); |
| | | return false; |
| | | } |
| | | |
| | |
| | | bool TopicNode::ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms) |
| | | { |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | SetLastError(eNotRegistered, kErrMsgNotRegistered); |
| | | return false; |
| | | } |
| | | |
| | |
| | | bool TopicNode::Publish(const MsgPublish &pub, const int timeout_ms) |
| | | { |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | SetLastError(eNotRegistered, kErrMsgNotRegistered); |
| | | return false; |
| | | } |
| | | |
| | |
| | | bool TopicNode::Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | SetLastError(eNotRegistered, kErrMsgNotRegistered); |
| | | return false; |
| | | } |
| | | |
| | |
| | | bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms) |
| | | { |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | SetLastError(eNotRegistered, kErrMsgNotRegistered); |
| | | return false; |
| | | } |
| | | |
| | |
| | | eSockSub, |
| | | eSockEnd, |
| | | }; |
| | | std::vector<std::unique_ptr<ShmSocket>> sockets_; |
| | | std::vector<std::shared_ptr<ShmSocket>> sockets_; |
| | | |
| | | ShmSocket &SockNode() { return *sockets_[eSockNode]; } | // Dereferences the socket stored at index eSockNode of sockets_; no bounds or null check is done here.
| | | ShmSocket &SockPub() { return *sockets_[eSockPub]; } | // Same as above for index eSockPub.