From ae17d1439b35b55212c3a30712e0a60b1d6a99c0 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期三, 30 六月 2021 11:15:53 +0800
Subject: [PATCH] support tcp pub/sub.
---
src/defs.cpp | 181 +++++++++++++++++++++++++++++++++++++++------
1 files changed, 156 insertions(+), 25 deletions(-)
diff --git a/src/defs.cpp b/src/defs.cpp
index 6e7a5fd..8ebcc7d 100644
--- a/src/defs.cpp
+++ b/src/defs.cpp
@@ -18,10 +18,11 @@
#include "defs.h"
#include "msg.h"
#include "shm_msg_queue.h"
-#include "socket.h"
+#include "shm_socket.h"
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/string_generator.hpp>
#include <boost/uuid/uuid.hpp>
+#include <sys/file.h>
namespace
{
@@ -76,8 +77,16 @@
static_assert(AllocSizeIndex[255] > uint32_t(-1), "Make sure alloc size correct.");
const int64_t kCenterInfoFixedAddress = 1024 * 4;
+const int64_t kShmMetaInfoFixedAddress = 1024 * 16;
-const boost::uuids::uuid kCenterInfoTag = boost::uuids::string_generator()("fc5007bd-0e62-4d91-95dc-948cf1f02e5a");
+const boost::uuids::uuid kMetaInfoTag = boost::uuids::string_generator()("fc5007bd-0e62-4d91-95dc-948cf1f02e5a");
+
+struct BHomeMetaInfo {
+ boost::uuids::uuid tag_;
+ std::atomic<uint64_t> shm_id_;
+ std::atomic<uint64_t> ssn_id_;
+};
+
struct CenterMetaInfo {
boost::uuids::uuid tag_;
CenterInfo info_;
@@ -88,30 +97,136 @@
template <class T = void>
T *Ptr(const int64_t offset) { return reinterpret_cast<T *>(offset); }
+class FileLock
+{
+public:
+ FileLock(const std::string &path) :
+ fd_(Open(path))
+ {
+ if (fd_ == -1) { throw std::runtime_error("error open file:" + path); }
+ }
+ ~FileLock() { Close(fd_); }
+ bool try_lock() { return fd_ != -1 && (flock(fd_, LOCK_EX | LOCK_NB) == 0); }
+ void unlock() { flock(fd_, LOCK_UN); }
+
+private:
+ static int Open(const std::string &path) { return open(path.c_str(), O_RDONLY, 0666); }
+ static int Close(int fd) { return close(fd); }
+ int fd_;
+ std::mutex mtx_;
+};
+
+SharedMemory &BHomeMetaShm()
+{
+ static std::string name("bhshmq_meta_v0");
+ static SharedMemory shm(name, 1024 * 128);
+ return shm;
+}
+
+ShmSocket &ShmSender(SharedMemory &shm, const bool reset)
+{
+ typedef std::pair<void *, std::shared_ptr<ShmSocket>> Pair;
+ static std::vector<Pair> store;
+ static std::mutex s_mtx;
+ thread_local Pair local_cache;
+
+ std::lock_guard<std::mutex> lk(s_mtx);
+
+ if (reset) {
+ for (auto &kv : store) {
+ if (kv.first == &shm) {
+ auto &mq = GetCenterInfo(shm)->mq_sender_;
+ kv.second.reset(new ShmSocket(mq.offset_, shm, mq.id_));
+ local_cache = kv;
+ return *local_cache.second;
+ }
+ }
+ } else if (local_cache.first == &shm) {
+ return *local_cache.second;
+ }
+
+ for (auto &kv : store) {
+ if (kv.first == &shm) {
+ local_cache = kv;
+ return *local_cache.second;
+ }
+ }
+ auto &mq = GetCenterInfo(shm)->mq_sender_;
+ store.emplace_back(&shm, new ShmSocket(mq.offset_, shm, mq.id_));
+ local_cache = store.back();
+ return *local_cache.second;
+}
} // namespace
-CenterInfo *GetCenterInfo(bhome_shm::SharedMemory &shm)
+CenterInfo *GetCenterInfo(SharedMemory &shm)
{
auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
- if (pmeta->tag_ == kCenterInfoTag) {
+ if (pmeta->tag_ == kMetaInfoTag) {
return &pmeta->info_;
}
return nullptr;
}
+ShmSocket &DefaultSender(SharedMemory &shm) { return ShmSender(shm, false); }
+
+BHomeMetaInfo *GetBHomeMeta()
+{
+ auto p = Ptr<BHomeMetaInfo>(kShmMetaInfoFixedAddress + Addr(BHomeMetaShm().get_address()));
+ return (p->tag_ == kMetaInfoTag) ? p : nullptr;
+}
+
+bool ShmMetaInit()
+{
+ SharedMemory &shm = BHomeMetaShm();
+
+ static FileLock fl("/dev/shm/" + shm.name());
+ if (!fl.try_lock()) { // single center instance only.
+ return false;
+ }
+
+ auto pmeta = GetBHomeMeta();
+ if (pmeta && pmeta->tag_ == kMetaInfoTag) {
+ // remove old shm
+ SharedMemory::Remove(BHomeShmName());
+ ++pmeta->shm_id_; // inc shm id
+ return true; // already exist.
+ } else {
+ Mutex *mutex = shm.FindOrCreate<Mutex>("bhshmq_meta_lock");
+ if (!mutex || !mutex->try_lock()) {
+ return false;
+ }
+ DEFER1(mutex->unlock());
+
+ auto base = Addr(shm.get_address());
+ auto offset = kShmMetaInfoFixedAddress;
+ void *p = shm.Alloc(offset * 2);
+ if (Addr(p) - base <= offset) {
+ pmeta = new (Ptr(offset + base)) BHomeMetaInfo;
+ pmeta->tag_ = kMetaInfoTag;
+ pmeta->shm_id_ = 100;
+ pmeta->ssn_id_ = 10000;
+ return true;
+ }
+ }
+ return false;
+}
+
// put center info at fixed memory position.
// as boost shm find object (find socket/mq by id, etc...) also locks inside,
// which node might crash inside and cause deadlock.
-bool CenterInit(bhome_shm::SharedMemory &shm)
+bool CenterInit()
{
- Mutex *mutex = shm.Create<Mutex>("shm_center_lock");
+ if (!ShmMetaInit()) { return false; }
+
+ SharedMemory &shm = BHomeShm();
+ Mutex *mutex = shm.FindOrCreate<Mutex>("shm_center_lock");
if (!mutex || !mutex->try_lock()) {
return false;
}
DEFER1(mutex->unlock());
auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
- if (pmeta->tag_ == kCenterInfoTag) {
+ if (pmeta->tag_ == kMetaInfoTag) {
return true;
} else {
auto base = Addr(shm.get_address());
@@ -123,7 +238,7 @@
auto InitMQ = [&](auto &mq, auto &&id) {
mq.id_ = id;
- ShmSocket tmp(shm, id, 16);
+ ShmSocket tmp(shm, id, eOpenOrCreate);
mq.offset_ = tmp.AbsAddr();
};
@@ -132,19 +247,24 @@
InitMQ(info.mq_sender_, NextId());
InitMQ(info.mq_center_, NextId());
InitMQ(info.mq_bus_, NextId());
- InitMQ(info.mq_init_, NextId());
- pmeta->tag_ = kCenterInfoTag;
+ pmeta->tag_ = kMetaInfoTag;
return true;
}
}
return false;
}
-const MQInfo &BHGlobalSenderAddress() { return GetCenterInfo(BHomeShm())->mq_sender_; }
-const MQInfo &BHTopicCenterAddress() { return GetCenterInfo(BHomeShm())->mq_center_; }
-const MQInfo &BHTopicBusAddress() { return GetCenterInfo(BHomeShm())->mq_bus_; }
-const MQInfo &BHCenterReplyAddress() { return GetCenterInfo(BHomeShm())->mq_init_; }
+const MQInfo &BHTopicCenterAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_center_; }
+const MQInfo &BHTopicBusAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_bus_; }
+bool BHNodeInit(SharedMemory &shm, const int64_t request, int64_t &reply)
+{
+ return GetCenterInfo(shm)->init_rr_.ClientRequest(request, reply);
+}
+void BHCenterHandleInit(SharedMemory &shm, std::function<int64_t(const int64_t)> const &onReq)
+{
+ GetCenterInfo(shm)->init_rr_.ServerProcess(onReq);
+}
int64_t CalcAllocIndex(int64_t size)
{
@@ -156,20 +276,27 @@
std::string BHomeShmName()
{
- return "bhome_default_shm_v0";
-}
-bhome_shm::SharedMemory &BHomeShm()
-{
- static bhome_shm::SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512);
- return shm;
+ auto bhome_meta = Ptr<BHomeMetaInfo>(kShmMetaInfoFixedAddress + Addr(BHomeMetaShm().get_address()));
+ return "bhshmq_sid_" + std::to_string(bhome_meta->shm_id_.load());
}
-bool GlobalInit(bhome_shm::SharedMemory &shm)
+SharedMemory &BHomeShm()
{
- MsgI::BindShm(shm);
- CenterInfo *pinfo = GetCenterInfo(shm);
- return pinfo && ShmMsgQueue::SetData(pinfo->mqid_);
+ static std::unique_ptr<SharedMemory> shm_ptr;
+ static std::string shm_name;
+ if (!shm_ptr || shm_name != BHomeShmName()) {
+ shm_name = BHomeShmName();
+ if (shm_ptr) {
+ ShmSender(*shm_ptr, true); // reset sender.
+ }
+ shm_ptr.reset(new SharedMemory(shm_name, 1024 * 1024 * 512));
+ }
+ return *shm_ptr;
}
+
+bool GlobalInit(SharedMemory &shm) { return GetCenterInfo(shm); }
+
+MQId NewSession() { return 10 * (++GetBHomeMeta()->ssn_id_); }
void SetLastError(const int ec, const std::string &msg)
{
@@ -181,4 +308,8 @@
{
ec = LastErrorStore().ec_;
msg = LastErrorStore().msg_;
-}
\ No newline at end of file
+}
+
+int NodeTimeoutSec() { return 60; }
+
+std::string BHLogDir() { return "/opt/vasystem/valog/"; }
--
Gitblit v1.8.0