| | |
| | | } // namespace |
| | | |
| | | TopicNode::TopicNode(SharedMemory &shm) : |
| | | shm_(shm), sockets_(eSockEnd), state_(eStateUnregistered) |
| | | shm_(shm), state_(eStateUnregistered) |
| | | { |
| | | for (int i = eSockStart; i < eSockEnd; ++i) { |
| | | sockets_[i].reset(new ShmSocket(shm_, kMqLen)); |
| | | } |
| | | // recv msgs to avoid memory leak. |
| | | auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; }; |
| | | SockNode().Start(default_ignore_msg); |
| | | // for (auto &p : sockets_) { |
| | | // p->Start(default_ignore_msg); |
| | | // } |
| | | Init(); |
| | | } |
| | | |
| | | TopicNode::~TopicNode() |
| | | { |
| | | printf("~TopicNode()\n"); |
| | | Stop(); |
| | | SockNode().Stop(); |
| | | if (state() == eStateUnregistered) { |
| | | for (auto &p : sockets_) { p->Remove(); } |
| | | } |
| | | |
| | | bool TopicNode::Init() |
| | | { |
| | | std::lock_guard<std::mutex> lk(mutex_); |
| | | |
| | | if (Valid()) { |
| | | return true; |
| | | } |
| | | |
| | | if (ssn_id_ == 0) { |
| | | ssn_id_ = ShmMsgQueue::NewId(); |
| | | } |
| | | printf("Node Init, id %ld \n", ssn_id_); |
| | | MsgI msg; |
| | | msg.OffsetRef() = ssn_id_; |
| | | if (ShmMsgQueue::TrySend(shm(), BHInitAddress(), msg)) { |
| | | sockets_.resize(eSockEnd); |
| | | for (int i = eSockStart; i < eSockEnd; ++i) { |
| | | sockets_[i].reset(new ShmSocket(shm_, ssn_id_ + i, kMqLen)); |
| | | } |
| | | // recv msgs to avoid memory leak. |
| | | auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; }; |
| | | SockNode().Start(default_ignore_msg); |
| | | return true; |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | void TopicNode::Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker) |
| | | { |
| | | std::lock_guard<std::mutex> lk(mutex_); |
| | | |
| | | if (!Init()) { |
| | | SetLastError(eError, "BHome Node Not Inited."); |
| | | return; |
| | | } |
| | | if (nworker < 1) { |
| | | nworker = 1; |
| | | } else if (nworker > 16) { |
| | |
| | | } |
| | | void TopicNode::Stop() |
| | | { |
| | | printf("Node Stopping\n"); |
| | | for (auto &p : sockets_) { p->Stop(); } |
| | | printf("Node Stopped\n"); |
| | | } |
| | | |
| | | bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | if (!Init()) { |
| | | SetLastError(eError, "BHome Node Not Inited."); |
| | | return false; |
| | | } |
| | | |
| | | info_ = proc; |
| | | |
| | | auto &sock = SockNode(); |
| | |
| | | } |
| | | bool TopicNode::Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | if (!IsOnline()) { |
| | | SetLastError(eNotRegistered, "Not Registered."); |
| | | return false; |
| | | } |
| | | |
| | | info_.Clear(); |
| | | state_cas(eStateOnline, eStateOffline); |
| | | |
| | |
| | | reply_head.mutable_proc_id()->swap(out_proc_id); |
| | | return true; |
| | | } |
| | | } else { |
| | | SetLastError(eNotFound, "remote not found."); |
| | | } |
| | | } catch (...) { |
| | | SetLastError(eError, __func__ + std::string(" internal errer.")); |
| | |
| | | return true; |
| | | } |
| | | } |
| | | SetLastError(eNotFound, "remote not found."); |
| | | return false; |
| | | } |
| | | |