From 8967e7f2f8b94dc032135707e16c8a9f233d0db6 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 03 六月 2021 13:32:16 +0800
Subject: [PATCH] rafactor, remove old todo, add some err msg.
---
src/defs.cpp | 297 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
1 files changed, 291 insertions(+), 6 deletions(-)
diff --git a/src/defs.cpp b/src/defs.cpp
index 4051196..8ebcc7d 100644
--- a/src/defs.cpp
+++ b/src/defs.cpp
@@ -16,15 +16,300 @@
* =====================================================================================
*/
#include "defs.h"
+#include "msg.h"
+#include "shm_msg_queue.h"
+#include "shm_socket.h"
+#include <boost/uuid/random_generator.hpp>
+#include <boost/uuid/string_generator.hpp>
+#include <boost/uuid/uuid.hpp>
+#include <sys/file.h>
+
namespace
{
-const MQId kBHTopicBus = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
-const MQId kBHTopicReqRepCenter = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff");
-const MQId kBHUniCenter = boost::uuids::string_generator()("87654321-89ab-cdef-8349-1234567890ff");
+struct LastError {
+ int ec_ = 0;
+ std::string msg_;
+};
+LastError &LastErrorStore()
+{
+ thread_local LastError le;
+ return le;
+}
+
+constexpr int64_t AllocSizeIndex[] = {
+ 16, 24, 32, 40, 48, 56, 64, 72,
+ 80, 88, 96, 104, 120, 136, 152, 168,
+ 184, 200, 224, 248, 272, 296, 328, 360,
+ 392, 432, 472, 520, 568, 624, 680, 744,
+ 816, 896, 984, 1080, 1184, 1296, 1416, 1544,
+ 1688, 1848, 2016, 2200, 2400, 2624, 2864, 3128,
+ 3416, 3728, 4072, 4448, 4856, 5304, 5792, 6320,
+ 6896, 7528, 8216, 8968, 9784, 10680, 11656, 12720,
+ 13880, 15144, 16520, 18024, 19664, 21456, 23408, 25536,
+ 27864, 30400, 33168, 36184, 39480, 43072, 46992, 51264,
+ 55928, 61016, 66568, 72624, 79232, 86440, 94304, 102880,
+ 112232, 122440, 133576, 145720, 158968, 173424, 189192, 206392,
+ 225160, 245632, 267968, 292328, 318904, 347896, 379528, 414032,
+ 451672, 492736, 537536, 586408, 639720, 697880, 761328, 830544,
+ 906048, 988416, 1078272, 1176296, 1283232, 1399896, 1527160, 1665992,
+ 1817448, 1982672, 2162920, 2359552, 2574056, 2808064, 3063344, 3341832,
+ 3645640, 3977064, 4338616, 4733040, 5163320, 5632712, 6144776, 6703392,
+ 7312792, 7977592, 8702832, 9494000, 10357096, 11298656, 12325808, 13446336,
+ 14668736, 16002264, 17457016, 19044024, 20775304, 22663968, 24724328, 26972000,
+ 29424000, 32098912, 35017000, 38200368, 41673128, 45461600, 49594472, 54103064,
+ 59021528, 64387128, 70240504, 76626008, 83592008, 91191288, 99481408, 108525176,
+ 118391104, 129153936, 140895208, 153703864, 167676944, 182920304, 199549424, 217690280,
+ 237480312, 259069432, 282621200, 308314040, 336342592, 366919192, 400275488, 436664168,
+ 476360912, 519666456, 566908864, 618446040, 674668408, 736001904, 802911168, 875903096,
+ 955530656, 1042397080, 1137160456, 1240538680, 1353314928, 1476343560, 1610556616, 1756970856,
+ 1916695480, 2090940528, 2281026032, 2488392040, 2714609504, 2961392192, 3230609664, 3524301456,
+ 3844692504, 4194210008, 4575501832, 4991456544, 5445225320, 5940245808, 6480268160, 7069383448,
+ 7712054672, 8413150552, 9177982424, 10012344464, 10922557600, 11915517384, 12998746240, 14180450448,
+ 15469582312, 16875907976, 18410081432, 20083725200, 21909518400, 23901292800, 26074137600, 28444513752,
+ 31030378640, 33851322152, 36928715080, 40285871000, 43948222912, 47943515904, 52302017352, 57056746208,
+ 62243723136, 67902243424, 74075174648, 80809281440, 88155579752, 96169723368, 104912425496, 114449918728,
+ 124854456800, 136204861968, 148587122152, 162095042352, 176830955296, 192906496688, 210443450936, 229574673752};
+
+const int kAllocIndexLen = sizeof(AllocSizeIndex) / sizeof(AllocSizeIndex[0]);
+static_assert(kAllocIndexLen == 256, "Make sure alloc 8 bit is enough.");
+static_assert(AllocSizeIndex[255] > uint32_t(-1), "Make sure alloc size correct.");
+
+const int64_t kCenterInfoFixedAddress = 1024 * 4;
+const int64_t kShmMetaInfoFixedAddress = 1024 * 16;
+
+const boost::uuids::uuid kMetaInfoTag = boost::uuids::string_generator()("fc5007bd-0e62-4d91-95dc-948cf1f02e5a");
+
+struct BHomeMetaInfo {
+ boost::uuids::uuid tag_;
+ std::atomic<uint64_t> shm_id_;
+ std::atomic<uint64_t> ssn_id_;
+};
+
+struct CenterMetaInfo {
+ boost::uuids::uuid tag_;
+ CenterInfo info_;
+};
+
+int64_t Addr(void *ptr) { return reinterpret_cast<int64_t>(ptr); }
+// void *Ptr(const int64_t offset) { return reinterpret_cast<void *>(offset); }
+template <class T = void>
+T *Ptr(const int64_t offset) { return reinterpret_cast<T *>(offset); }
+
+class FileLock
+{
+public:
+ FileLock(const std::string &path) :
+ fd_(Open(path))
+ {
+ if (fd_ == -1) { throw std::runtime_error("error open file:" + path); }
+ }
+ ~FileLock() { Close(fd_); }
+ bool try_lock() { return fd_ != -1 && (flock(fd_, LOCK_EX | LOCK_NB) == 0); }
+ void unlock() { flock(fd_, LOCK_UN); }
+
+private:
+ static int Open(const std::string &path) { return open(path.c_str(), O_RDONLY, 0666); }
+ static int Close(int fd) { return close(fd); }
+ int fd_;
+ std::mutex mtx_;
+};
+
+SharedMemory &BHomeMetaShm()
+{
+ static std::string name("bhshmq_meta_v0");
+ static SharedMemory shm(name, 1024 * 128);
+ return shm;
+}
+
+ShmSocket &ShmSender(SharedMemory &shm, const bool reset)
+{
+ typedef std::pair<void *, std::shared_ptr<ShmSocket>> Pair;
+ static std::vector<Pair> store;
+ static std::mutex s_mtx;
+ thread_local Pair local_cache;
+
+ std::lock_guard<std::mutex> lk(s_mtx);
+
+ if (reset) {
+ for (auto &kv : store) {
+ if (kv.first == &shm) {
+ auto &mq = GetCenterInfo(shm)->mq_sender_;
+ kv.second.reset(new ShmSocket(mq.offset_, shm, mq.id_));
+ local_cache = kv;
+ return *local_cache.second;
+ }
+ }
+ } else if (local_cache.first == &shm) {
+ return *local_cache.second;
+ }
+
+ for (auto &kv : store) {
+ if (kv.first == &shm) {
+ local_cache = kv;
+ return *local_cache.second;
+ }
+ }
+ auto &mq = GetCenterInfo(shm)->mq_sender_;
+ store.emplace_back(&shm, new ShmSocket(mq.offset_, shm, mq.id_));
+ local_cache = store.back();
+ return *local_cache.second;
+}
} // namespace
-const MQId &BHTopicBus() { return kBHTopicBus; }
-const MQId &BHTopicReqRepCenter() { return kBHTopicReqRepCenter; }
-const MQId &BHUniCenter() { return kBHUniCenter; }
+CenterInfo *GetCenterInfo(SharedMemory &shm)
+{
+ auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
+ if (pmeta->tag_ == kMetaInfoTag) {
+ return &pmeta->info_;
+ }
+ return nullptr;
+}
+
+ShmSocket &DefaultSender(SharedMemory &shm) { return ShmSender(shm, false); }
+
+BHomeMetaInfo *GetBHomeMeta()
+{
+ auto p = Ptr<BHomeMetaInfo>(kShmMetaInfoFixedAddress + Addr(BHomeMetaShm().get_address()));
+ return (p->tag_ == kMetaInfoTag) ? p : nullptr;
+}
+
+bool ShmMetaInit()
+{
+ SharedMemory &shm = BHomeMetaShm();
+
+ static FileLock fl("/dev/shm/" + shm.name());
+ if (!fl.try_lock()) { // single center instance only.
+ return false;
+ }
+
+ auto pmeta = GetBHomeMeta();
+ if (pmeta && pmeta->tag_ == kMetaInfoTag) {
+ // remove old shm
+ SharedMemory::Remove(BHomeShmName());
+ ++pmeta->shm_id_; // inc shm id
+ return true; // already exist.
+ } else {
+ Mutex *mutex = shm.FindOrCreate<Mutex>("bhshmq_meta_lock");
+ if (!mutex || !mutex->try_lock()) {
+ return false;
+ }
+ DEFER1(mutex->unlock());
+
+ auto base = Addr(shm.get_address());
+ auto offset = kShmMetaInfoFixedAddress;
+ void *p = shm.Alloc(offset * 2);
+ if (Addr(p) - base <= offset) {
+ pmeta = new (Ptr(offset + base)) BHomeMetaInfo;
+ pmeta->tag_ = kMetaInfoTag;
+ pmeta->shm_id_ = 100;
+ pmeta->ssn_id_ = 10000;
+ return true;
+ }
+ }
+ return false;
+}
+
+// put center info at fixed memory position.
+// as boost shm find object (find socket/mq by id, etc...) also locks inside,
+// which node might crash inside and cause deadlock.
+bool CenterInit()
+{
+ if (!ShmMetaInit()) { return false; }
+
+ SharedMemory &shm = BHomeShm();
+ Mutex *mutex = shm.FindOrCreate<Mutex>("shm_center_lock");
+ if (!mutex || !mutex->try_lock()) {
+ return false;
+ }
+ DEFER1(mutex->unlock());
+
+ auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
+ if (pmeta->tag_ == kMetaInfoTag) {
+ return true;
+ } else {
+ auto base = Addr(shm.get_address());
+ auto offset = kCenterInfoFixedAddress;
+ void *p = shm.Alloc(offset * 2);
+ if (Addr(p) - base <= offset) {
+ pmeta = new (Ptr(offset + base)) CenterMetaInfo;
+ auto &info = pmeta->info_;
+
+ auto InitMQ = [&](auto &mq, auto &&id) {
+ mq.id_ = id;
+ ShmSocket tmp(shm, id, eOpenOrCreate);
+ mq.offset_ = tmp.AbsAddr();
+ };
+
+ int id = 100;
+ auto NextId = [&]() { return ++id; };
+ InitMQ(info.mq_sender_, NextId());
+ InitMQ(info.mq_center_, NextId());
+ InitMQ(info.mq_bus_, NextId());
+
+ pmeta->tag_ = kMetaInfoTag;
+ return true;
+ }
+ }
+ return false;
+}
+
+const MQInfo &BHTopicCenterAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_center_; }
+const MQInfo &BHTopicBusAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_bus_; }
+bool BHNodeInit(SharedMemory &shm, const int64_t request, int64_t &reply)
+{
+ return GetCenterInfo(shm)->init_rr_.ClientRequest(request, reply);
+}
+void BHCenterHandleInit(SharedMemory &shm, std::function<int64_t(const int64_t)> const &onReq)
+{
+ GetCenterInfo(shm)->init_rr_.ServerProcess(onReq);
+}
+
+int64_t CalcAllocIndex(int64_t size)
+{
+ auto pos = std::lower_bound(AllocSizeIndex, AllocSizeIndex + kAllocIndexLen, size);
+ return (pos == AllocSizeIndex + kAllocIndexLen) ? -1 : pos - AllocSizeIndex;
+}
+
+int64_t GetAllocSize(int index) { return index < kAllocIndexLen ? AllocSizeIndex[index] : 0; }
+
+std::string BHomeShmName()
+{
+ auto bhome_meta = Ptr<BHomeMetaInfo>(kShmMetaInfoFixedAddress + Addr(BHomeMetaShm().get_address()));
+ return "bhshmq_sid_" + std::to_string(bhome_meta->shm_id_.load());
+}
+
+SharedMemory &BHomeShm()
+{
+ static std::unique_ptr<SharedMemory> shm_ptr;
+ static std::string shm_name;
+ if (!shm_ptr || shm_name != BHomeShmName()) {
+ shm_name = BHomeShmName();
+ if (shm_ptr) {
+ ShmSender(*shm_ptr, true); // reset sender.
+ }
+ shm_ptr.reset(new SharedMemory(shm_name, 1024 * 1024 * 512));
+ }
+ return *shm_ptr;
+}
+
+bool GlobalInit(SharedMemory &shm) { return GetCenterInfo(shm); }
+
+MQId NewSession() { return 10 * (++GetBHomeMeta()->ssn_id_); }
+
+void SetLastError(const int ec, const std::string &msg)
+{
+ LastErrorStore().ec_ = ec;
+ LastErrorStore().msg_ = msg;
+}
+
+void GetLastError(int &ec, std::string &msg)
+{
+ ec = LastErrorStore().ec_;
+ msg = LastErrorStore().msg_;
+}
+
+int NodeTimeoutSec() { return 60; }
+
+std::string BHLogDir() { return "/opt/vasystem/valog/"; }
--
Gitblit v1.8.0