From 377e395a5fdc6ad44bdd5a2d41d2930f45fc4384 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 30 四月 2021 18:25:33 +0800
Subject: [PATCH] add node init msg, alloc msgq on success.
---
src/socket.h | 2
utest/api_test.cpp | 2
box/center.cpp | 75 ++++++++++++++----
src/socket.cpp | 42 +++++++++
src/defs.h | 2
src/topic_node.h | 7 +
box/center.h | 3
src/topic_node.cpp | 63 ++++++++++++---
src/shm_msg_queue.cpp | 2
9 files changed, 159 insertions(+), 39 deletions(-)
diff --git a/box/center.cpp b/box/center.cpp
index 0952ca7..aa6f285 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -103,7 +103,22 @@
// center name, no relative to shm.
const std::string &id() const { return id_; }
+ void OnNodeInit(const int64_t msg)
+ {
+ MQId ssn = msg;
+ auto UpdateRegInfo = [&](Node &node) {
+ for (int i = 0; i < 10; ++i) {
+ node->addrs_.insert(ssn + i);
+ }
+ node->state_.timestamp_ = NowSec() - offline_time_;
+ node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
+ };
+ Node node(new NodeInfo);
+ UpdateRegInfo(node);
+ nodes_[ssn] = node;
+ printf("new node ssn (%ld) init\n", ssn);
+ }
MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg)
{
if (msg.proc().proc_id() != head.proc_id()) {
@@ -132,17 +147,19 @@
Node node(new NodeInfo);
UpdateRegInfo(node);
nodes_[ssn] = node;
+ }
+ printf("node (%s) ssn (%ld)\n", head.proc_id().c_str(), ssn);
- printf("new node (%s) ssn (%ld)\n", head.proc_id().c_str(), ssn);
- auto old = online_node_addr_map_.find(head.proc_id());
- if (old != online_node_addr_map_.end()) { // old session
- auto &old_ssn = old->second;
+ auto old = online_node_addr_map_.find(head.proc_id());
+ if (old != online_node_addr_map_.end()) { // old session
+ auto &old_ssn = old->second;
+ if (old_ssn != ssn) {
nodes_[old_ssn]->state_.PutOffline(offline_time_);
printf("put node (%s) ssn (%ld) offline\n", nodes_[old_ssn]->proc_.proc_id().c_str(), old->second);
old_ssn = ssn;
- } else {
- online_node_addr_map_.emplace(head.proc_id(), ssn);
}
+ } else {
+ online_node_addr_map_.emplace(head.proc_id(), ssn);
}
return MakeReply(eSuccess);
} catch (...) {
@@ -446,16 +463,24 @@
msg, head, [&](auto &body) { return center->MsgTag(head, body); }, replyer); \
return true;
-bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner)
+auto MakeReplyer(ShmSocket &socket, BHMsgHead &head, const std::string &proc_id)
{
- auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 2);
- auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
- return [&](auto &&rep_body) {
- auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.ssn_id(), head.msg_id()));
- auto remote = head.route(0).mq_id();
- socket.Send(remote, reply_head, rep_body);
- };
+ return [&](auto &&rep_body) {
+ auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.ssn_id(), head.msg_id()));
+ auto remote = head.route(0).mq_id();
+ socket.Send(remote, reply_head, rep_body);
};
+}
+
+bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr)
+{
+ auto OnNodeInit = [center_ptr](ShmSocket &socket, MsgI &msg) {
+ auto ¢er = *center_ptr;
+ center->OnNodeInit(msg.Offset());
+ };
+ auto Nothing = [](ShmSocket &socket) {};
+
+ BHCenter::Install("#centetr.Init", OnNodeInit, Nothing, BHInitAddress(), 16);
auto OnCenterIdle = [center_ptr](ShmSocket &socket) {
auto ¢er = *center_ptr;
@@ -475,6 +500,7 @@
default: return false;
}
};
+ BHCenter::Install("#center.main", OnCenter, OnCenterIdle, BHTopicCenterAddress(), 1000);
auto OnBusIdle = [=](ShmSocket &socket) {};
auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
@@ -515,7 +541,6 @@
}
};
- BHCenter::Install("#center.reg", OnCenter, OnCenterIdle, BHTopicCenterAddress(), 1000);
BHCenter::Install("#center.bus", OnPubSub, OnBusIdle, BHTopicBusAddress(), 1000);
return true;
@@ -533,7 +558,12 @@
bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len)
{
- Centers()[name] = CenterInfo{name, handler, idle, mqid, mq_len};
+ Centers()[name] = CenterInfo{name, handler, MsgIHandler(), idle, mqid, mq_len};
+ return true;
+}
+bool BHCenter::Install(const std::string &name, MsgIHandler handler, IdleHandler idle, const MQId mqid, const int mq_len)
+{
+ Centers()[name] = CenterInfo{name, MsgHandler(), handler, idle, mqid, mq_len};
return true;
}
@@ -541,10 +571,13 @@
{
auto gc = [&](const MQId id) {
auto r = ShmSocket::Remove(shm, id);
- printf("remove mq %ld : %s\n", id, (r ? "ok" : "failed"));
+ if (r) {
+ printf("remove mq %ld ok\n", id);
+ }
};
- AddCenter("#bhome_center", gc);
+ auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", gc, 6s, 6s * 2);
+ AddCenter(center_ptr);
for (auto &kv : Centers()) {
auto &info = kv.second;
@@ -556,7 +589,11 @@
{
for (auto &kv : Centers()) {
auto &info = kv.second;
- sockets_[info.name_]->Start(info.handler_, info.idle_);
+ if (info.handler_) {
+ sockets_[info.name_]->Start(info.handler_, info.idle_);
+ } else {
+ sockets_[info.name_]->Start(info.raw_handler_, info.idle_);
+ }
}
return true;
diff --git a/box/center.h b/box/center.h
index ab8b15f..4d71bc9 100644
--- a/box/center.h
+++ b/box/center.h
@@ -29,8 +29,10 @@
public:
typedef Socket::PartialRecvCB MsgHandler;
+ typedef Socket::RawRecvCB MsgIHandler;
typedef Socket::IdleCB IdleHandler;
static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len);
+ static bool Install(const std::string &name, MsgIHandler handler, IdleHandler idle, const MQId mqid, const int mq_len);
BHCenter(Socket::Shm &shm);
~BHCenter() { Stop(); }
@@ -41,6 +43,7 @@
struct CenterInfo {
std::string name_;
MsgHandler handler_;
+ MsgIHandler raw_handler_;
IdleHandler idle_;
MQId mqid_;
int mq_len_ = 0;
diff --git a/src/defs.h b/src/defs.h
index 43375bf..a95c81f 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -23,9 +23,11 @@
typedef uint64_t MQId;
+const MQId kBHNodeInit = 10;
const MQId kBHTopicCenter = 100;
const MQId kBHTopicBus = 101;
const MQId kBHUniCenter = 102;
+inline const MQId BHInitAddress() { return kBHNodeInit; }
inline const MQId BHTopicCenterAddress() { return kBHTopicCenter; }
inline const MQId BHTopicBusAddress() { return kBHTopicBus; }
inline const MQId BHUniCenterAddress() { return kBHUniCenter; }
diff --git a/src/shm_msg_queue.cpp b/src/shm_msg_queue.cpp
index cd8cd66..38c5f1c 100644
--- a/src/shm_msg_queue.cpp
+++ b/src/shm_msg_queue.cpp
@@ -34,7 +34,7 @@
ShmMsgQueue::MQId ShmMsgQueue::NewId()
{
static auto &id = GetData();
- return ++id;
+ return (++id) * 10;
}
// ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2
ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) :
diff --git a/src/socket.cpp b/src/socket.cpp
index 2127260..9580529 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -40,6 +40,44 @@
Stop();
}
+bool ShmSocket::Start(const RawRecvCB &onData, const IdleCB &onIdle, int nworker)
+{
+ auto ioProc = [this, onData, onIdle]() {
+ auto DoSend = [this]() { return send_buffer_.TrySend(mq()); };
+ auto DoRecv = [=] {
+ // do not recv if no cb is set.
+ if (!onData) {
+ return false;
+ }
+ auto onMsg = [&](MsgI &imsg) {
+ DEFER1(imsg.Release());
+ onData(*this, imsg);
+ };
+ MsgI imsg;
+ return mq().TryRecv(imsg) ? (onMsg(imsg), true) : false;
+ };
+
+ try {
+ bool more_to_send = DoSend();
+ bool more_to_recv = DoRecv();
+ if (onIdle) { onIdle(*this); }
+ if (!more_to_send && !more_to_recv) {
+ robust::QuickSleep();
+ }
+ } catch (...) {
+ }
+ };
+
+ std::lock_guard<std::mutex> lock(mutex_);
+ StopNoLock();
+
+ run_.store(true);
+ for (int i = 0; i < nworker; ++i) {
+ workers_.emplace_back([this, ioProc]() { while (run_) { ioProc(); } });
+ }
+ return true;
+}
+
bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle)
{
auto ioProc = [this, onData, onIdle]() {
@@ -74,9 +112,7 @@
bool more_to_recv = DoRecv();
if (onIdle) { onIdle(*this); }
if (!more_to_send && !more_to_recv) {
- std::this_thread::yield();
- using namespace std::chrono_literals;
- std::this_thread::sleep_for(10000ns);
+ robust::QuickSleep();
}
} catch (...) {
}
diff --git a/src/socket.h b/src/socket.h
index a5dd72c..bd85fec 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -42,6 +42,7 @@
public:
typedef ShmMsgQueue::MQId MQId;
typedef bhome_shm::SharedMemory Shm;
+ typedef std::function<void(ShmSocket &sock, MsgI &imsg)> RawRecvCB;
typedef std::function<void(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> RecvCB;
typedef std::function<bool(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> PartialRecvCB;
typedef std::function<void(ShmSocket &sock)> IdleCB;
@@ -53,6 +54,7 @@
bool Remove() { return Remove(shm(), id()); }
MQId id() const { return mq().Id(); }
// start recv.
+ bool Start(const RawRecvCB &onData, const IdleCB &onIdle, int nworker = 1);
bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB());
bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, onIdle); }
bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); }
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index d274c4b..f629597 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -37,30 +37,52 @@
} // namespace
TopicNode::TopicNode(SharedMemory &shm) :
- shm_(shm), sockets_(eSockEnd), state_(eStateUnregistered)
+ shm_(shm), state_(eStateUnregistered)
{
- for (int i = eSockStart; i < eSockEnd; ++i) {
- sockets_[i].reset(new ShmSocket(shm_, kMqLen));
- }
- // recv msgs to avoid memory leak.
- auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; };
- SockNode().Start(default_ignore_msg);
- // for (auto &p : sockets_) {
- // p->Start(default_ignore_msg);
- // }
+ Init();
}
TopicNode::~TopicNode()
{
+ printf("~TopicNode()\n");
Stop();
- SockNode().Stop();
- if (state() == eStateUnregistered) {
- for (auto &p : sockets_) { p->Remove(); }
+}
+
+bool TopicNode::Init()
+{
+ std::lock_guard<std::mutex> lk(mutex_);
+
+ if (Valid()) {
+ return true;
}
+
+ if (ssn_id_ == 0) {
+ ssn_id_ = ShmMsgQueue::NewId();
+ }
+ printf("Node Init, id %ld \n", ssn_id_);
+ MsgI msg;
+ msg.OffsetRef() = ssn_id_;
+ if (ShmMsgQueue::TrySend(shm(), BHInitAddress(), msg)) {
+ sockets_.resize(eSockEnd);
+ for (int i = eSockStart; i < eSockEnd; ++i) {
+ sockets_[i].reset(new ShmSocket(shm_, ssn_id_ + i, kMqLen));
+ }
+ // recv msgs to avoid memory leak.
+ auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; };
+ SockNode().Start(default_ignore_msg);
+ return true;
+ }
+ return false;
}
void TopicNode::Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker)
{
+ std::lock_guard<std::mutex> lk(mutex_);
+
+ if (!Init()) {
+ SetLastError(eError, "BHome Node Not Inited.");
+ return;
+ }
if (nworker < 1) {
nworker = 1;
} else if (nworker > 16) {
@@ -73,11 +95,18 @@
}
void TopicNode::Stop()
{
+ printf("Node Stopping\n");
for (auto &p : sockets_) { p->Stop(); }
+ printf("Node Stopped\n");
}
bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
+ if (!Init()) {
+ SetLastError(eError, "BHome Node Not Inited.");
+ return false;
+ }
+
info_ = proc;
auto &sock = SockNode();
@@ -123,6 +152,11 @@
}
bool TopicNode::Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
+ if (!IsOnline()) {
+ SetLastError(eNotRegistered, "Not Registered.");
+ return false;
+ }
+
info_.Clear();
state_cas(eStateOnline, eStateOffline);
@@ -404,8 +438,6 @@
reply_head.mutable_proc_id()->swap(out_proc_id);
return true;
}
- } else {
- SetLastError(eNotFound, "remote not found.");
}
} catch (...) {
SetLastError(eError, __func__ + std::string(" internal errer."));
@@ -448,6 +480,7 @@
return true;
}
}
+ SetLastError(eNotFound, "remote not found.");
return false;
}
diff --git a/src/topic_node.h b/src/topic_node.h
index 3c90e5b..afce4fc 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -22,6 +22,7 @@
#include "socket.h"
#include <atomic>
#include <memory>
+#include <mutex>
#include <vector>
using namespace bhome_shm;
@@ -137,7 +138,11 @@
void state(const State st) { state_.store(st); }
void state_cas(State expected, const State val) { state_.compare_exchange_strong(expected, val); }
State state() const { return state_.load(); }
- bool IsOnline() const { return state() == eStateOnline; }
+ bool IsOnline() { return Init() && state() == eStateOnline; }
+ bool Init();
+ bool Valid() const { return !sockets_.empty(); }
+ std::mutex mutex_;
+ MQId ssn_id_ = 0;
std::atomic<State> state_;
TopicQueryCache topic_query_cache_;
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index cf7baf9..38483eb 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -118,6 +118,8 @@
printf("maxsec: %ld\n", CountSeconds(max_time));
+ // BHCleanup();
+ // return;
bool reg = false;
for (int i = 0; i < 3 && !reg; ++i) {
ProcInfo proc;
--
Gitblit v1.8.0