From 101b5cf85397ef9350aaedd12cfcf9fd3d07a565 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 20 五月 2021 12:41:51 +0800
Subject: [PATCH] refactor node center.
---
src/defs.cpp | 89 ++++++++++++++++++++++++++++++++++++++++++--
1 files changed, 85 insertions(+), 4 deletions(-)
diff --git a/src/defs.cpp b/src/defs.cpp
index 450349e..2715911 100644
--- a/src/defs.cpp
+++ b/src/defs.cpp
@@ -18,6 +18,10 @@
#include "defs.h"
#include "msg.h"
#include "shm_msg_queue.h"
+#include "shm_socket.h"
+#include <boost/uuid/random_generator.hpp>
+#include <boost/uuid/string_generator.hpp>
+#include <boost/uuid/uuid.hpp>
namespace
{
@@ -70,7 +74,83 @@
const int kAllocIndexLen = sizeof(AllocSizeIndex) / sizeof(AllocSizeIndex[0]);
static_assert(kAllocIndexLen == 256, "Make sure alloc 8 bit is enough.");
static_assert(AllocSizeIndex[255] > uint32_t(-1), "Make sure alloc size correct.");
+
+const int64_t kCenterInfoFixedAddress = 1024 * 4;
+
+const boost::uuids::uuid kCenterInfoTag = boost::uuids::string_generator()("fc5007bd-0e62-4d91-95dc-948cf1f02e5a");
+struct CenterMetaInfo {
+ boost::uuids::uuid tag_;
+ CenterInfo info_;
+};
+
+int64_t Addr(void *ptr) { return reinterpret_cast<int64_t>(ptr); }
+// void *Ptr(const int64_t offset) { return reinterpret_cast<void *>(offset); }
+template <class T = void>
+T *Ptr(const int64_t offset) { return reinterpret_cast<T *>(offset); }
+
} // namespace
+
+CenterInfo *GetCenterInfo(bhome_shm::SharedMemory &shm)
+{
+ auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
+ if (pmeta->tag_ == kCenterInfoTag) {
+ return &pmeta->info_;
+ }
+ return nullptr;
+}
+
+// put center info at fixed memory position.
+// as boost shm find object (find socket/mq by id, etc...) also locks inside,
+// which node might crash inside and cause deadlock.
+bool CenterInit(bhome_shm::SharedMemory &shm)
+{
+ Mutex *mutex = shm.FindOrCreate<Mutex>("shm_center_lock");
+ if (!mutex || !mutex->try_lock()) {
+ return false;
+ }
+ DEFER1(mutex->unlock());
+
+ auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
+ if (pmeta->tag_ == kCenterInfoTag) {
+ return true;
+ } else {
+ auto base = Addr(shm.get_address());
+ auto offset = kCenterInfoFixedAddress;
+ void *p = shm.Alloc(offset * 2);
+ if (Addr(p) - base <= offset) {
+ pmeta = new (Ptr(offset + base)) CenterMetaInfo;
+ auto &info = pmeta->info_;
+
+ auto InitMQ = [&](auto &mq, auto &&id) {
+ mq.id_ = id;
+ ShmSocket tmp(shm, id, 16);
+ mq.offset_ = tmp.AbsAddr();
+ };
+
+ int id = 100;
+ auto NextId = [&]() { return ++id; };
+ InitMQ(info.mq_sender_, NextId());
+ InitMQ(info.mq_center_, NextId());
+ InitMQ(info.mq_bus_, NextId());
+
+ pmeta->tag_ = kCenterInfoTag;
+ return true;
+ }
+ }
+ return false;
+}
+
+const MQInfo &BHGlobalSenderAddress() { return GetCenterInfo(BHomeShm())->mq_sender_; }
+const MQInfo &BHTopicCenterAddress() { return GetCenterInfo(BHomeShm())->mq_center_; }
+const MQInfo &BHTopicBusAddress() { return GetCenterInfo(BHomeShm())->mq_bus_; }
+bool BHNodeInit(const int64_t request, int64_t &reply)
+{
+ return GetCenterInfo(BHomeShm())->init_rr_.ClientRequest(request, reply);
+}
+void BHCenterHandleInit(std::function<int64_t(const int64_t)> const &onReq)
+{
+ GetCenterInfo(BHomeShm())->init_rr_.ServerProcess(onReq);
+}
int64_t CalcAllocIndex(int64_t size)
{
@@ -93,9 +173,8 @@
bool GlobalInit(bhome_shm::SharedMemory &shm)
{
MsgI::BindShm(shm);
- typedef std::atomic<MQId> IdSrc;
- IdSrc *psrc = shm.FindOrCreate<IdSrc>("shmqIdSrc0", 100000);
- return psrc && ShmMsgQueue::SetData(*psrc);
+ CenterInfo *pinfo = GetCenterInfo(shm);
+ return pinfo && ShmMsgQueue::SetData(pinfo->mqid_);
}
void SetLastError(const int ec, const std::string &msg)
@@ -108,4 +187,6 @@
{
ec = LastErrorStore().ec_;
msg = LastErrorStore().msg_;
-}
\ No newline at end of file
+}
+
+int NodeTimeoutSec() { return 60; }
\ No newline at end of file
--
Gitblit v1.8.0