From 9bf199a4770b08c03d553129757d960b605e598a Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 14 五月 2021 18:05:21 +0800
Subject: [PATCH] add center info at fixed address in shm.
---
box/center_main.cc | 11 +-
utest/speed_test.cpp | 4
src/shm.h | 8 ++
src/socket.h | 3
utest/simple_tests.cpp | 2
box/center.cpp | 4
src/shm_msg_queue.h | 15 +--
src/socket.cpp | 22 +----
src/defs.h | 30 +++++-
src/shm_msg_queue.cpp | 16 ++-
src/defs.cpp | 79 +++++++++++++++++++
src/msg.cpp | 7 +
12 files changed, 145 insertions(+), 56 deletions(-)
diff --git a/box/center.cpp b/box/center.cpp
index 445d307..7d51f2f 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -98,7 +98,7 @@
if (now < time_to_clean_) {
return;
}
- LOG_FUNCTION;
+ // LOG_FUNCTION;
time_to_clean_ = now + 1;
int64_t limit = std::max(10000ul, msgs_.size() / 10);
int64_t n = 0;
@@ -109,7 +109,7 @@
msg.Free();
it = msgs_.erase(it);
++n;
- } else if (msg.timestamp() + 10 < NowSec()) {
+ } else if (msg.timestamp() + 60 < NowSec()) {
msg.Free();
it = msgs_.erase(it);
++n;
diff --git a/box/center_main.cc b/box/center_main.cc
index 6795e41..5eb21b9 100644
--- a/box/center_main.cc
+++ b/box/center_main.cc
@@ -83,12 +83,6 @@
std::atomic<bool> run_;
};
-bool CenterInit(bhome_shm::SharedMemory &shm)
-{
- ShmSocket create(shm, BHGlobalSenderAddress(), 16);
- return true;
-}
-
} // namespace
int center_main(int argc, const char *argv[])
{
@@ -108,7 +102,10 @@
if (strcasecmp(lvl.c_str(), "fatal") == 0) { ns_log::ResetLogLevel(ns_log::LogLevel::fatal); }
auto &shm = BHomeShm();
- CenterInit(shm);
+ if (!CenterInit(shm)) {
+ LOG_FATAL() << "init memory error.";
+ exit(0);
+ }
GlobalInit(shm);
InstanceFlag inst(shm, kCenterRunningFlag);
diff --git a/src/defs.cpp b/src/defs.cpp
index 450349e..b812b65 100644
--- a/src/defs.cpp
+++ b/src/defs.cpp
@@ -18,6 +18,10 @@
#include "defs.h"
#include "msg.h"
#include "shm_msg_queue.h"
+#include "socket.h"
+#include <boost/uuid/random_generator.hpp>
+#include <boost/uuid/string_generator.hpp>
+#include <boost/uuid/uuid.hpp>
namespace
{
@@ -70,7 +74,77 @@
const int kAllocIndexLen = sizeof(AllocSizeIndex) / sizeof(AllocSizeIndex[0]);
static_assert(kAllocIndexLen == 256, "Make sure alloc 8 bit is enough.");
static_assert(AllocSizeIndex[255] > uint32_t(-1), "Make sure alloc size correct.");
+
+const int64_t kCenterInfoFixedAddress = 1024 * 4;
+
+const boost::uuids::uuid kCenterInfoTag = boost::uuids::string_generator()("fc5007bd-0e62-4d91-95dc-948cf1f02e5a");
+struct CenterMetaInfo {
+ boost::uuids::uuid tag_;
+ CenterInfo info_;
+};
+
+int64_t Addr(void *ptr) { return reinterpret_cast<int64_t>(ptr); }
+// void *Ptr(const int64_t offset) { return reinterpret_cast<void *>(offset); }
+template <class T = void>
+T *Ptr(const int64_t offset) { return reinterpret_cast<T *>(offset); }
+
} // namespace
+
+CenterInfo *GetCenterInfo(bhome_shm::SharedMemory &shm)
+{
+ auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
+ if (pmeta->tag_ == kCenterInfoTag) {
+ return &pmeta->info_;
+ }
+ return nullptr;
+}
+
+// put center info at fixed memory position.
+// as boost shm find object (find socket/mq by id, etc...) also locks inside,
+// which node might crash inside and cause deadlock.
+bool CenterInit(bhome_shm::SharedMemory &shm)
+{
+ Mutex *mutex = shm.Create<Mutex>("shm_center_lock");
+ if (!mutex || !mutex->try_lock()) {
+ return false;
+ }
+ DEFER1(mutex->unlock());
+
+ auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
+ if (pmeta->tag_ == kCenterInfoTag) {
+ return true;
+ } else {
+ auto base = Addr(shm.get_address());
+ auto offset = kCenterInfoFixedAddress;
+ void *p = shm.Alloc(offset * 2);
+ if (Addr(p) - base <= offset) {
+ pmeta = new (Ptr(offset + base)) CenterMetaInfo;
+ auto &info = pmeta->info_;
+
+ auto InitMQ = [&](auto &mq, auto &&id) {
+ mq.id_ = id;
+ ShmSocket tmp(shm, id, 16);
+ mq.offset_ = tmp.AbsAddr();
+ };
+
+ int id = 100;
+ auto NextId = [&]() { return ++id; };
+ InitMQ(info.mq_sender_, NextId());
+ InitMQ(info.mq_center_, NextId());
+ InitMQ(info.mq_bus_, NextId());
+ InitMQ(info.mq_init_, NextId());
+
+ pmeta->tag_ = kCenterInfoTag;
+ return true;
+ }
+ }
+ return false;
+}
+
+uint64_t BHGlobalSenderAddress() { return GetCenterInfo(BHomeShm())->mq_sender_.id_; }
+uint64_t BHTopicCenterAddress() { return GetCenterInfo(BHomeShm())->mq_center_.id_; }
+uint64_t BHTopicBusAddress() { return GetCenterInfo(BHomeShm())->mq_bus_.id_; }
+uint64_t BHCenterReplyAddress() { return GetCenterInfo(BHomeShm())->mq_init_.id_; }
int64_t CalcAllocIndex(int64_t size)
{
@@ -93,9 +167,8 @@
bool GlobalInit(bhome_shm::SharedMemory &shm)
{
MsgI::BindShm(shm);
- typedef std::atomic<MQId> IdSrc;
- IdSrc *psrc = shm.FindOrCreate<IdSrc>("shmqIdSrc0", 100000);
- return psrc && ShmMsgQueue::SetData(*psrc);
+ CenterInfo *pinfo = GetCenterInfo(shm);
+ return pinfo && ShmMsgQueue::SetData(pinfo->mqid_);
}
void SetLastError(const int ec, const std::string &msg)
diff --git a/src/defs.h b/src/defs.h
index f0a0d49..5c770a7 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -19,19 +19,28 @@
#ifndef DEFS_KP8LKGD0
#define DEFS_KP8LKGD0
+#include <atomic>
#include <string>
typedef uint64_t MQId;
-const MQId kBHDefaultSender = 99;
-const MQId kBHTopicCenter = 100;
-const MQId kBHTopicBus = 101;
-inline const MQId BHGlobalSenderAddress() { return kBHDefaultSender; }
-inline const MQId BHTopicCenterAddress() { return kBHTopicCenter; }
-inline const MQId BHTopicBusAddress() { return kBHTopicBus; }
-
int64_t CalcAllocIndex(int64_t size);
int64_t GetAllocSize(int index);
+
+struct CenterInfo {
+ struct MQInfo {
+ int64_t id_ = 0;
+ int64_t offset_ = 0;
+ };
+
+ MQInfo mq_center_;
+ MQInfo mq_bus_;
+ MQInfo mq_init_;
+ MQInfo mq_sender_;
+ std::atomic<MQId> mqid_;
+ CenterInfo() :
+ mqid_(100000) {}
+};
const int kBHCenterPort = 24287;
const char kTopicSep = '.';
@@ -42,10 +51,17 @@
std::string BHomeShmName();
bhome_shm::SharedMemory &BHomeShm();
+CenterInfo *GetCenterInfo(bhome_shm::SharedMemory &shm);
+bool CenterInit(bhome_shm::SharedMemory &shm);
bool GlobalInit(bhome_shm::SharedMemory &shm);
typedef std::string Topic;
void SetLastError(const int ec, const std::string &msg);
void GetLastError(int &ec, std::string &msg);
//TODO center can check shm for previous crash.
+uint64_t BHGlobalSenderAddress();
+uint64_t BHTopicCenterAddress();
+uint64_t BHTopicBusAddress();
+uint64_t BHCenterReplyAddress();
+
#endif // end of include guard: DEFS_KP8LKGD0
diff --git a/src/msg.cpp b/src/msg.cpp
index a4777d2..edffff1 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -17,6 +17,7 @@
*/
#include "msg.h"
#include "bh_util.h"
+#include "defs.h"
#include "socket.h"
namespace bhome_msg
@@ -24,7 +25,8 @@
ShmSocket &ShmMsg::Sender()
{
- static ShmSocket sender(shm(), false, BHGlobalSenderAddress(), 16);
+ static auto &mq = GetCenterInfo(shm())->mq_sender_;
+ static ShmSocket sender(mq.offset_, shm(), mq.id_);
return sender;
}
@@ -38,7 +40,8 @@
int64_t free_cmd = (id() << 4) | EncodeCmd(eCmdFree);
Sender().Send(BHTopicCenterAddress(), free_cmd);
} else if (n < 0) {
- throw -123;
+ LOG_FATAL() << "error double release data.";
+ throw std::runtime_error("double release msg.");
}
return n;
}
diff --git a/src/shm.h b/src/shm.h
index 269df44..b5ec2ea 100644
--- a/src/shm.h
+++ b/src/shm.h
@@ -192,6 +192,12 @@
pdata_ = shm_.Find<Data>(ObjName(name_));
}
}
+ ShmObject(const int64_t offset, ShmType &segment, const std::string &name) :
+ shm_(segment), name_(name)
+ {
+ pdata_ = reinterpret_cast<Data *>(Addr(shm_.get_address()) + offset);
+ }
+
bool IsOk() const { return pdata_; }
static bool Remove(SharedMemory &shm, const std::string &name) { return shm.destroy<Data>(ObjName(name).c_str()); }
@@ -201,11 +207,13 @@
std::string name() const { return name_; }
Data *data() { return pdata_; }
const Data *data() const { return pdata_; }
+ int64_t offset() const { return Addr(pdata_) - Addr(shm_.get_address()); }
Data *operator->() { return data(); }
const Data *operator->() const { return data(); }
bool Remove() { return Remove(shm_, name_); }
private:
+ static int64_t Addr(const void *p) { return reinterpret_cast<int64_t>(p); }
ShmType &shm_;
std::string name_;
Data *pdata_ = nullptr;
diff --git a/src/shm_msg_queue.cpp b/src/shm_msg_queue.cpp
index d96c511..663da1e 100644
--- a/src/shm_msg_queue.cpp
+++ b/src/shm_msg_queue.cpp
@@ -37,13 +37,13 @@
return (++id) * 10;
}
-ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) :
+ShmMsgQueue::ShmMsgQueue(ShmType &segment, const MQId id, const int len) :
id_(id),
queue_(segment, MsgQIdToName(id_), len, segment.get_segment_manager())
{
}
-ShmMsgQueue::ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len) :
+ShmMsgQueue::ShmMsgQueue(ShmType &segment, const bool create_or_else_find, const MQId id, const int len) :
id_(id),
queue_(segment, create_or_else_find, MsgQIdToName(id_), len, segment.get_segment_manager())
{
@@ -51,8 +51,11 @@
throw("error create/find msgq " + std::to_string(id_));
}
}
-ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) :
- ShmMsgQueue(NewId(), true, segment, len) {}
+ShmMsgQueue::ShmMsgQueue(const int64_t abs_addr, ShmType &segment, const MQId id) :
+ id_(id), queue_(abs_addr, segment, MsgQIdToName(id_))
+{
+ //TODO check some tag.
+}
ShmMsgQueue::~ShmMsgQueue() {}
@@ -93,10 +96,11 @@
return Shmq::Find(shm, MsgQIdToName(remote_id));
}
-bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, int64_t val)
+bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote, int64_t val)
{
try {
- ShmMsgQueue dest(remote_id, false, shm, 1);
+ //TODO find from center, or use offset.
+ ShmMsgQueue dest(shm, false, remote, 1);
#ifndef BH_USE_ATOMIC_Q
Guard lock(GetMutex(remote_id));
#endif
diff --git a/src/shm_msg_queue.h b/src/shm_msg_queue.h
index 56ea076..eead739 100644
--- a/src/shm_msg_queue.h
+++ b/src/shm_msg_queue.h
@@ -47,13 +47,14 @@
static MQId NewId();
- ShmMsgQueue(const MQId id, ShmType &segment, const int len);
- ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len);
- ShmMsgQueue(ShmType &segment, const int len);
+ ShmMsgQueue(ShmType &segment, const MQId id, const int len);
+ ShmMsgQueue(ShmType &segment, const bool create_or_else_find, const MQId id, const int len);
+ ShmMsgQueue(const int64_t abs_addr, ShmType &segment, const MQId id);
~ShmMsgQueue();
static bool Remove(ShmType &shm, const MQId id);
MQId Id() const { return id_; }
ShmType &shm() const { return queue_.shm(); }
+ int64_t AbsAddr() const { return queue_.offset(); }
bool Recv(RawData &val, const int timeout_ms)
{
@@ -73,11 +74,9 @@
bool Recv(MsgI &msg, const int timeout_ms) { return Recv(msg.OffsetRef(), timeout_ms); }
bool TryRecv(MsgI &msg) { return TryRecv(msg.OffsetRef()); }
- static Queue *Find(ShmType &shm, const MQId remote_id);
- static bool TrySend(ShmType &shm, const MQId remote_id, const RawData val);
- static bool TrySend(ShmType &shm, const MQId remote_id, MsgI msg) { return TrySend(shm, remote_id, msg.Offset()); }
- bool TrySend(const MQId remote_id, const MsgI &msg) { return TrySend(shm(), remote_id, msg); }
- bool TrySend(const MQId remote_id, const RawData val) { return TrySend(shm(), remote_id, val); }
+ static Queue *Find(ShmType &shm, const MQId remote);
+ static bool TrySend(ShmType &shm, const MQId remote, const RawData val);
+ bool TrySend(const MQId remote, const RawData val) { return TrySend(shm(), remote, val); }
private:
#ifndef BH_USE_ATOMIC_Q
diff --git a/src/socket.cpp b/src/socket.cpp
index 0704174..4f09517 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -28,25 +28,13 @@
using namespace bhome_shm;
ShmSocket::ShmSocket(Shm &shm, const MQId id, const int len) :
- run_(false), mq_(id, shm, len), alloc_id_(0)
-{
- Start();
-}
+ run_(false), mq_(shm, id, len), alloc_id_(0) { Start(); }
ShmSocket::ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len) :
- run_(false), mq_(id, create_or_else_find, shm, len), alloc_id_(0)
-{
- Start();
-}
-ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) :
- run_(false), mq_(shm, len), alloc_id_(0)
-{
- Start();
-}
+ run_(false), mq_(shm, create_or_else_find, id, len), alloc_id_(0) { Start(); }
+ShmSocket::ShmSocket(int64_t abs_addr, Shm &shm, const MQId id) :
+ run_(false), mq_(abs_addr, shm, id), alloc_id_(0) { Start(); }
-ShmSocket::~ShmSocket()
-{
- Stop();
-}
+ShmSocket::~ShmSocket() { Stop(); }
bool ShmSocket::Start(int nworker, const RecvCB &onData, const RawRecvCB &onRaw, const IdleCB &onIdle)
{
diff --git a/src/socket.h b/src/socket.h
index d69b8d4..8e9db69 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -49,11 +49,12 @@
ShmSocket(Shm &shm, const MQId id, const int len);
ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len);
- ShmSocket(Shm &shm, const int len = 12);
+ ShmSocket(int64_t offset, Shm &shm, const MQId id);
~ShmSocket();
static bool Remove(SharedMemory &shm, const MQId id) { return Queue::Remove(shm, id); }
bool Remove() { return Remove(shm(), id()); }
MQId id() const { return mq().Id(); }
+ int64_t AbsAddr() const { return mq().AbsAddr(); }
void SetNodeProc(const int proc_index, const int socket_index)
{
node_proc_index_ = proc_index;
diff --git a/utest/simple_tests.cpp b/utest/simple_tests.cpp
index e14a1cd..e1f1d2f 100644
--- a/utest/simple_tests.cpp
+++ b/utest/simple_tests.cpp
@@ -108,7 +108,7 @@
{
SharedMemory &shm = TestShm();
GlobalInit(shm);
- ShmMsgQueue q(shm, 64);
+ ShmMsgQueue q(shm, ShmMsgQueue::NewId(), 64);
for (int i = 0; i < 2; ++i) {
int ms = i * 100;
printf("Timeout Test %4d: ", ms);
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index 8950bbf..66e5179 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -158,8 +158,8 @@
auto Avail = [&]() { return shm.get_free_memory(); };
auto init_avail = Avail();
- ShmSocket srv(shm, qlen);
- ShmSocket cli(shm, qlen);
+ ShmSocket srv(shm, ShmMsgQueue::NewId(), qlen);
+ ShmSocket cli(shm, ShmMsgQueue::NewId(), qlen);
int ncli = 1;
uint64_t nmsg = 1000 * 1000 * 1;
--
Gitblit v1.8.0