From 330f78f3334bcdcdb4cc2ab2dbf66604e0224d71 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 21 五月 2021 16:21:45 +0800
Subject: [PATCH] Merge branch 'master' of http://192.168.5.5:10010/r/valib/bhshmq
---
src/defs.cpp | 62 ++++++++++++++++++++++---------
1 files changed, 44 insertions(+), 18 deletions(-)
diff --git a/src/defs.cpp b/src/defs.cpp
index 6e7a5fd..a2f05cc 100644
--- a/src/defs.cpp
+++ b/src/defs.cpp
@@ -18,7 +18,7 @@
#include "defs.h"
#include "msg.h"
#include "shm_msg_queue.h"
-#include "socket.h"
+#include "shm_socket.h"
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/string_generator.hpp>
#include <boost/uuid/uuid.hpp>
@@ -90,7 +90,7 @@
} // namespace
-CenterInfo *GetCenterInfo(bhome_shm::SharedMemory &shm)
+CenterInfo *GetCenterInfo(SharedMemory &shm)
{
auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
if (pmeta->tag_ == kCenterInfoTag) {
@@ -98,13 +98,35 @@
}
return nullptr;
}
+ShmSocket &DefaultSender(SharedMemory &shm)
+{
+ typedef std::pair<void *, std::shared_ptr<ShmSocket>> Pair;
+ static std::vector<Pair> store;
+ static std::mutex s_mtx;
+ thread_local Pair local_cache;
+ if (local_cache.first == &shm) {
+ return *local_cache.second;
+ }
+
+ std::lock_guard<std::mutex> lk(s_mtx);
+ for (auto &kv : store) {
+ if (kv.first == &shm) {
+ local_cache = kv;
+ return *local_cache.second;
+ }
+ }
+ auto &mq = GetCenterInfo(shm)->mq_sender_;
+ store.emplace_back(&shm, new ShmSocket(mq.offset_, shm, mq.id_));
+ local_cache = store.back();
+ return *local_cache.second;
+}
// put center info at fixed memory position.
// as boost shm find object (find socket/mq by id, etc...) also locks inside,
// which node might crash inside and cause deadlock.
-bool CenterInit(bhome_shm::SharedMemory &shm)
+bool CenterInit(SharedMemory &shm)
{
- Mutex *mutex = shm.Create<Mutex>("shm_center_lock");
+ Mutex *mutex = shm.FindOrCreate<Mutex>("shm_center_lock");
if (!mutex || !mutex->try_lock()) {
return false;
}
@@ -132,7 +154,6 @@
InitMQ(info.mq_sender_, NextId());
InitMQ(info.mq_center_, NextId());
InitMQ(info.mq_bus_, NextId());
- InitMQ(info.mq_init_, NextId());
pmeta->tag_ = kCenterInfoTag;
return true;
@@ -141,10 +162,16 @@
return false;
}
-const MQInfo &BHGlobalSenderAddress() { return GetCenterInfo(BHomeShm())->mq_sender_; }
-const MQInfo &BHTopicCenterAddress() { return GetCenterInfo(BHomeShm())->mq_center_; }
-const MQInfo &BHTopicBusAddress() { return GetCenterInfo(BHomeShm())->mq_bus_; }
-const MQInfo &BHCenterReplyAddress() { return GetCenterInfo(BHomeShm())->mq_init_; }
+const MQInfo &BHTopicCenterAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_center_; }
+const MQInfo &BHTopicBusAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_bus_; }
+bool BHNodeInit(SharedMemory &shm, const int64_t request, int64_t &reply)
+{
+ return GetCenterInfo(shm)->init_rr_.ClientRequest(request, reply);
+}
+void BHCenterHandleInit(SharedMemory &shm, std::function<int64_t(const int64_t)> const &onReq)
+{
+ GetCenterInfo(shm)->init_rr_.ServerProcess(onReq);
+}
int64_t CalcAllocIndex(int64_t size)
{
@@ -158,18 +185,15 @@
{
return "bhome_default_shm_v0";
}
-bhome_shm::SharedMemory &BHomeShm()
+SharedMemory &BHomeShm()
{
- static bhome_shm::SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512);
+ static SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512);
return shm;
}
-bool GlobalInit(bhome_shm::SharedMemory &shm)
-{
- MsgI::BindShm(shm);
- CenterInfo *pinfo = GetCenterInfo(shm);
- return pinfo && ShmMsgQueue::SetData(pinfo->mqid_);
-}
+bool GlobalInit(SharedMemory &shm) { return GetCenterInfo(shm); }
+
+MQId NewSession() { return 10 * (++GetCenterInfo(BHomeShm())->mqid_); }
void SetLastError(const int ec, const std::string &msg)
{
@@ -181,4 +205,6 @@
{
ec = LastErrorStore().ec_;
msg = LastErrorStore().msg_;
-}
\ No newline at end of file
+}
+
+int NodeTimeoutSec() { return 60; }
\ No newline at end of file
--
Gitblit v1.8.0