From 993c556000a414011626770540678948f16eaa9e Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 02 六月 2021 17:40:50 +0800
Subject: [PATCH] center restart with new shm; set center node ssn.
---
box/center_main.cc | 75 +-----------------
src/robust.h | 4
utest/api_test.cpp | 5
box/center_topic_node.cpp | 2
src/defs.h | 5 -
utest/tcp_test.cpp | 4
src/topic_node.h | 2
src/topic_node.cpp | 4
src/defs.cpp | 96 ++++++++++++++++++++++-
9 files changed, 107 insertions(+), 90 deletions(-)
diff --git a/box/center_main.cc b/box/center_main.cc
index e7715f8..c7d67e3 100644
--- a/box/center_main.cc
+++ b/box/center_main.cc
@@ -27,63 +27,6 @@
using namespace std::chrono_literals;
using namespace bhome_shm;
-namespace
-{
-const std::string kCenterRunningFlag = "bh_center_single_flag_0";
-
-class InstanceFlag
-{
-
-public:
- InstanceFlag(SharedMemory &shm, const std::string &name) :
- shm_(shm), name_(name), run_(false) {}
- ~InstanceFlag() { Stop(); }
-
- bool TryStartAsFirstInstance()
- {
- if (run_) {
- return true;
- }
-
- auto mtx(shm_.FindOrCreate<Mutex>(name_ + "_mutex_0"));
- auto time_stamp(shm_.FindOrCreate<int64_t>(name_ + "_timestamp_0", 0));
-
- if (mtx && time_stamp) {
- Guard lock(*mtx);
- auto now = NowSec();
- LOG_DEBUG() << "old: " << *time_stamp << ", now: " << now;
- if (now > *time_stamp + 10) {
- *time_stamp = now;
- auto UpdateTime = [this, time_stamp]() {
- while (run_) {
- std::this_thread::sleep_for(1s);
- *time_stamp = NowSec();
- }
- };
- run_.store(true);
- std::thread(UpdateTime).swap(worker_);
- return true;
- }
- }
- return false;
- }
-
-private:
- void Stop()
- {
- run_.store(false);
- if (worker_.joinable()) {
- worker_.join();
- }
- }
-
- std::thread worker_;
- SharedMemory &shm_;
- std::string name_;
- std::atomic<bool> run_;
-};
-
-} // namespace
int center_main(int argc, const char *argv[])
{
AppArg args(argc, argv);
@@ -101,22 +44,14 @@
if (strcasecmp(lvl.c_str(), "error") == 0) { ns_log::ResetLogLevel(ns_log::LogLevel::error); }
if (strcasecmp(lvl.c_str(), "fatal") == 0) { ns_log::ResetLogLevel(ns_log::LogLevel::fatal); }
- auto &shm = BHomeShm();
- if (!CenterInit(shm)) {
- auto msg = "init memory error.";
+ if (!CenterInit()) {
+ auto msg = "init memory failed, or center is already running.";
LOG_FATAL() << msg;
printf("%s\n", msg);
exit(0);
}
+ auto &shm = BHomeShm();
GlobalInit(shm);
-
- InstanceFlag inst(shm, kCenterRunningFlag);
- if (!inst.TryStartAsFirstInstance()) {
- auto msg = "another instance is running, exit.";
- LOG_INFO() << msg;
- printf("%s\n", msg);
- return 0;
- }
if (args.Has("daemon") || args.Has("d")) {
int r = daemon(0, 0); // TODO center control msg to close itself.
@@ -125,9 +60,9 @@
BHCenter center(shm);
center.Start();
- auto msg = "center started ...";
+ auto msg = "center (" + shm.name() + ") started ...";
LOG_INFO() << msg;
- printf("%s\n", msg);
+ printf("%s\n", msg.c_str());
WaitForSignals({SIGINT, SIGTERM});
center.Stop();
LOG_INFO() << "center stopped.";
diff --git a/box/center_topic_node.cpp b/box/center_topic_node.cpp
index 749f4e6..8228992 100644
--- a/box/center_topic_node.cpp
+++ b/box/center_topic_node.cpp
@@ -56,7 +56,7 @@
} // namespace
CenterTopicNode::CenterTopicNode(CenterPtr center, SharedMemory &shm) :
- pscenter_(center), pnode_(new TopicNode(shm)), run_(false) {}
+ pscenter_(center), pnode_(new TopicNode(shm, 200)), run_(false) {}
CenterTopicNode::~CenterTopicNode() { Stop(); }
diff --git a/src/defs.cpp b/src/defs.cpp
index 57df1bc..694e2c5 100644
--- a/src/defs.cpp
+++ b/src/defs.cpp
@@ -22,6 +22,7 @@
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/string_generator.hpp>
#include <boost/uuid/uuid.hpp>
+#include <sys/file.h>
namespace
{
@@ -76,8 +77,16 @@
static_assert(AllocSizeIndex[255] > uint32_t(-1), "Make sure alloc size correct.");
const int64_t kCenterInfoFixedAddress = 1024 * 4;
+const int64_t kShmMetaInfoFixedAddress = 1024 * 16;
-const boost::uuids::uuid kCenterInfoTag = boost::uuids::string_generator()("fc5007bd-0e62-4d91-95dc-948cf1f02e5a");
+const boost::uuids::uuid kMetaInfoTag = boost::uuids::string_generator()("fc5007bd-0e62-4d91-95dc-948cf1f02e5a");
+
+struct BHomeMetaInfo {
+ boost::uuids::uuid tag_;
+ std::atomic<uint64_t> shm_id_;
+ std::atomic<uint64_t> ssn_id_;
+};
+
struct CenterMetaInfo {
boost::uuids::uuid tag_;
CenterInfo info_;
@@ -88,16 +97,43 @@
template <class T = void>
T *Ptr(const int64_t offset) { return reinterpret_cast<T *>(offset); }
+class FileLock
+{
+public:
+ FileLock(const std::string &path) :
+ fd_(Open(path))
+ {
+ if (fd_ == -1) { throw std::runtime_error("error open file:" + path); }
+ }
+ ~FileLock() { Close(fd_); }
+ bool try_lock() { return fd_ != -1 && (flock(fd_, LOCK_EX | LOCK_NB) == 0); }
+ void unlock() { flock(fd_, LOCK_UN); }
+
+private:
+ static int Open(const std::string &path) { return open(path.c_str(), O_RDONLY, 0666); }
+ static int Close(int fd) { return close(fd); }
+ int fd_;
+ std::mutex mtx_;
+};
+
+SharedMemory &BHomeMetaShm()
+{
+ static std::string name("bhshmq_meta_v0");
+ static SharedMemory shm(name, 1024 * 128);
+ return shm;
+}
+
} // namespace
CenterInfo *GetCenterInfo(SharedMemory &shm)
{
auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
- if (pmeta->tag_ == kCenterInfoTag) {
+ if (pmeta->tag_ == kMetaInfoTag) {
return &pmeta->info_;
}
return nullptr;
}
+
ShmSocket &DefaultSender(SharedMemory &shm)
{
typedef std::pair<void *, std::shared_ptr<ShmSocket>> Pair;
@@ -121,11 +157,55 @@
local_cache = store.back();
return *local_cache.second;
}
+
+BHomeMetaInfo *GetBHomeMeta()
+{
+ auto p = Ptr<BHomeMetaInfo>(kShmMetaInfoFixedAddress + Addr(BHomeMetaShm().get_address()));
+ return (p->tag_ == kMetaInfoTag) ? p : nullptr;
+}
+
+bool ShmMetaInit()
+{
+ SharedMemory &shm = BHomeMetaShm();
+
+ static FileLock fl("/dev/shm/" + shm.name());
+ if (!fl.try_lock()) { // single center instance only.
+ return false;
+ }
+
+ auto pmeta = GetBHomeMeta();
+ if (pmeta && pmeta->tag_ == kMetaInfoTag) {
+ ++pmeta->shm_id_; // inc shm id
+ return true; // already exist.
+ } else {
+ Mutex *mutex = shm.FindOrCreate<Mutex>("bhshmq_meta_lock");
+ if (!mutex || !mutex->try_lock()) {
+ return false;
+ }
+ DEFER1(mutex->unlock());
+
+ auto base = Addr(shm.get_address());
+ auto offset = kShmMetaInfoFixedAddress;
+ void *p = shm.Alloc(offset * 2);
+ if (Addr(p) - base <= offset) {
+ pmeta = new (Ptr(offset + base)) BHomeMetaInfo;
+ pmeta->tag_ = kMetaInfoTag;
+ pmeta->shm_id_ = 100;
+ pmeta->ssn_id_ = 10000;
+ return true;
+ }
+ }
+ return false;
+}
+
// put center info at fixed memory position.
// as boost shm find object (find socket/mq by id, etc...) also locks inside,
// which node might crash inside and cause deadlock.
-bool CenterInit(SharedMemory &shm)
+bool CenterInit()
{
+ if (!ShmMetaInit()) { return false; }
+
+ SharedMemory &shm = BHomeShm();
Mutex *mutex = shm.FindOrCreate<Mutex>("shm_center_lock");
if (!mutex || !mutex->try_lock()) {
return false;
@@ -133,7 +213,7 @@
DEFER1(mutex->unlock());
auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
- if (pmeta->tag_ == kCenterInfoTag) {
+ if (pmeta->tag_ == kMetaInfoTag) {
return true;
} else {
auto base = Addr(shm.get_address());
@@ -155,7 +235,7 @@
InitMQ(info.mq_center_, NextId());
InitMQ(info.mq_bus_, NextId());
- pmeta->tag_ = kCenterInfoTag;
+ pmeta->tag_ = kMetaInfoTag;
return true;
}
}
@@ -183,8 +263,10 @@
std::string BHomeShmName()
{
- return "bhome_default_shm_v0";
+ auto bhome_meta = Ptr<BHomeMetaInfo>(kShmMetaInfoFixedAddress + Addr(BHomeMetaShm().get_address()));
+ return "bhome_shmq_id_" + std::to_string(bhome_meta->shm_id_.load());
}
+
SharedMemory &BHomeShm()
{
static SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512);
@@ -193,7 +275,7 @@
bool GlobalInit(SharedMemory &shm) { return GetCenterInfo(shm); }
-MQId NewSession() { return 10 * (++GetCenterInfo(BHomeShm())->mqid_); }
+MQId NewSession() { return 10 * (++GetBHomeMeta()->ssn_id_); }
void SetLastError(const int ec, const std::string &msg)
{
diff --git a/src/defs.h b/src/defs.h
index 5f6fc16..56de8fa 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -39,9 +39,6 @@
MQInfo mq_bus_;
MQInfo mq_sender_;
robust::AtomicReqRep init_rr_;
- std::atomic<MQId> mqid_;
- CenterInfo() :
- mqid_(100000) {}
};
const int kBHCenterPort = 24287;
@@ -59,7 +56,7 @@
ShmSocket &DefaultSender(SharedMemory &shm);
MQId NewSession();
-bool CenterInit(SharedMemory &shm);
+bool CenterInit();
bool GlobalInit(SharedMemory &shm);
typedef std::string Topic;
void SetLastError(const int ec, const std::string &msg);
diff --git a/src/robust.h b/src/robust.h
index c46010a..1a3f430 100644
--- a/src/robust.h
+++ b/src/robust.h
@@ -67,6 +67,8 @@
typedef std::function<Data(const Data)> Handler;
bool ClientRequest(const Data request, Data &reply);
bool ServerProcess(Handler onReq);
+ AtomicReqRep() :
+ data_(0), timestamp_(now()) {}
private:
enum State {
@@ -79,7 +81,7 @@
static Data Decode(Data d) { return d >> 3; }
typedef std::chrono::steady_clock steady_clock;
typedef steady_clock::duration Duration;
- Duration now() { return steady_clock::now().time_since_epoch(); }
+ static Duration now() { return steady_clock::now().time_since_epoch(); }
bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); }
std::atomic<Data> data_;
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index b21f7ef..f592bff 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -50,8 +50,8 @@
} // namespace
-TopicNode::TopicNode(SharedMemory &shm) :
- shm_(shm), state_(eStateUninited)
+TopicNode::TopicNode(SharedMemory &shm, MQId ssn_id) :
+ shm_(shm), state_(eStateUninited), ssn_id_(ssn_id)
{
}
diff --git a/src/topic_node.h b/src/topic_node.h
index 3d6767b..8c08fcf 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -39,7 +39,7 @@
const MQInfo &BusAddr() const { return BHTopicBusAddress(shm()); }
public:
- TopicNode(SharedMemory &shm);
+ TopicNode(SharedMemory &shm, MQId ssn_id = 0);
~TopicNode();
// topic node
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index bddcbf7..fc1ad08 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -235,8 +235,9 @@
printf("query with ip set\n");
host.set_ip("127.0.0.1");
host.set_port(kBHCenterPort);
- host.set_mq_id(1000011);
- host.set_abs_addr(10296);
+ // center topic node address.
+ host.set_mq_id(201);
+ host.set_abs_addr(10072);
std::string dest(host.SerializeAsString());
void *proc_id = 0;
diff --git a/utest/tcp_test.cpp b/utest/tcp_test.cpp
index 86a0897..2aead7e 100644
--- a/utest/tcp_test.cpp
+++ b/utest/tcp_test.cpp
@@ -52,8 +52,8 @@
head.mutable_dest()->set_ip(connect_addr);
head.mutable_dest()->set_port(port);
- head.mutable_dest()->set_mq_id(1000011);
- head.mutable_dest()->set_abs_addr(10296);
+ head.mutable_dest()->set_mq_id(201);
+ head.mutable_dest()->set_abs_addr(10072);
return (MsgI::Serialize(head, req));
};
--
Gitblit v1.8.0