From 69f60d8bcc5121eb952b57277c94ad5cecb8d44a Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 03 六月 2021 11:03:08 +0800
Subject: [PATCH] client side update shm on center restart.
---
utest/api_test.cpp | 152 +++++++++++++++++++++++---------------
src/bh_api.cc | 34 ++++++--
src/defs.cpp | 53 +++++++++----
3 files changed, 154 insertions(+), 85 deletions(-)
diff --git a/src/bh_api.cc b/src/bh_api.cc
index 8690d5f..3dafe7a 100644
--- a/src/bh_api.cc
+++ b/src/bh_api.cc
@@ -1,9 +1,11 @@
#include "bh_api.h"
#include "defs.h"
#include "topic_node.h"
+#include <chrono>
#include <cstdio>
#include <memory>
+using namespace std::chrono_literals;
using namespace bhome_shm;
using namespace bhome_msg;
@@ -30,17 +32,33 @@
}
std::unique_ptr<TopicNode> &ProcNodePtr()
{
+ // client side init here.
static std::mutex mtx;
- std::lock_guard<std::mutex> lk(mtx);
+ auto InitLog = []() {
+ ns_log::AddLog(BHLogDir() + "bhshmq_node_" + GetProcExe() + ".log", true);
+ return true;
+ };
+ static bool init_log = InitLog();
+ static std::string shm_name;
static std::unique_ptr<TopicNode> ptr;
- if (!ptr && GlobalInit(BHomeShm())) {
- auto InitLog = []() {
- ns_log::AddLog(BHLogDir() + "bhshmq_node_" + GetProcExe() + ".log");
- return true;
- };
- static bool init_log = InitLog();
- ptr.reset(new TopicNode(BHomeShm()));
+
+ std::lock_guard<std::mutex> lk(mtx);
+ if (shm_name != BHomeShmName()) {
+ shm_name = BHomeShmName();
+ LOG_INFO() << "using shm " << shm_name;
+
+ ptr.reset();
+ // must reset/stop node before call BHomeShm() which resets shm.
+
+ auto &shm = BHomeShm();
+ for (int i = 0; !ptr && i < 3; ++i) {
+ if (GlobalInit(shm)) {
+ ptr.reset(new TopicNode(shm));
+ } else {
+ std::this_thread::sleep_for(1s); // make sure shm init done.
+ }
+ }
}
return ptr;
}
diff --git a/src/defs.cpp b/src/defs.cpp
index 9402b27..22b43e4 100644
--- a/src/defs.cpp
+++ b/src/defs.cpp
@@ -123,29 +123,28 @@
return shm;
}
-} // namespace
-
-CenterInfo *GetCenterInfo(SharedMemory &shm)
-{
- auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
- if (pmeta->tag_ == kMetaInfoTag) {
- return &pmeta->info_;
- }
- return nullptr;
-}
-
-ShmSocket &DefaultSender(SharedMemory &shm)
+ShmSocket &ShmSender(SharedMemory &shm, const bool reset)
{
typedef std::pair<void *, std::shared_ptr<ShmSocket>> Pair;
static std::vector<Pair> store;
static std::mutex s_mtx;
-
thread_local Pair local_cache;
- if (local_cache.first == &shm) {
+
+ std::lock_guard<std::mutex> lk(s_mtx);
+
+ if (reset) {
+ for (auto &kv : store) {
+ if (kv.first == &shm) {
+ auto &mq = GetCenterInfo(shm)->mq_sender_;
+ kv.second.reset(new ShmSocket(mq.offset_, shm, mq.id_));
+ local_cache = kv;
+ return *local_cache.second;
+ }
+ }
+ } else if (local_cache.first == &shm) {
return *local_cache.second;
}
- std::lock_guard<std::mutex> lk(s_mtx);
for (auto &kv : store) {
if (kv.first == &shm) {
local_cache = kv;
@@ -157,6 +156,18 @@
local_cache = store.back();
return *local_cache.second;
}
+} // namespace
+
+CenterInfo *GetCenterInfo(SharedMemory &shm)
+{
+ auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
+ if (pmeta->tag_ == kMetaInfoTag) {
+ return &pmeta->info_;
+ }
+ return nullptr;
+}
+
+ShmSocket &DefaultSender(SharedMemory &shm) { return ShmSender(shm, false); }
BHomeMetaInfo *GetBHomeMeta()
{
@@ -271,8 +282,16 @@
SharedMemory &BHomeShm()
{
- static SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512);
- return shm;
+ static std::unique_ptr<SharedMemory> shm_ptr;
+ static std::string shm_name;
+ if (!shm_ptr || shm_name != BHomeShmName()) {
+ shm_name = BHomeShmName();
+ if (shm_ptr) {
+ ShmSender(*shm_ptr, true); // reset sender.
+ }
+ shm_ptr.reset(new SharedMemory(shm_name, 1024 * 1024 * 512));
+ }
+ return *shm_ptr;
}
bool GlobalInit(SharedMemory &shm) { return GetCenterInfo(shm); }
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index fc1ad08..b4e0f4b 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -111,25 +111,8 @@
using namespace std::chrono;
// using namespace std::chrono_literals;
-BOOST_AUTO_TEST_CASE(ApiTest)
+bool Register(const std::string &proc_id)
{
- auto max_time = std::chrono::steady_clock::time_point::max();
- auto dur = max_time.time_since_epoch();
- auto nsec = std::chrono::duration_cast<std::chrono::seconds>(dur).count();
- auto nmin = nsec / 60;
- auto nhour = nmin / 60;
- auto nday = nhour / 24;
- auto years = nday / 365;
- printf("seconds: %ld, hours: %ld , days:%ld, years: %ld\n",
- nsec, nhour, nday, years);
- std::chrono::steady_clock::duration a(123456);
- printf("nowsec: %ld\n", NowSec());
-
- printf("maxsec: %ld\n", CountSeconds(max_time));
-
- // BHCleanup();
- // return;
- const std::string proc_id = "demo_client";
bool reg = false;
for (int i = 0; i < 3 && !reg; ++i) {
ProcInfo proc;
@@ -160,7 +143,96 @@
BHFree(reply, reply_len);
// Sleep(1s);
}
- if (!reg) {
+ return reg;
+}
+
+bool SyncRequest(const std::string &topic, const std::string &data)
+{ // SyncRequest
+ MsgRequestTopic req;
+ req.set_topic(topic);
+ req.set_data(data);
+ std::string s(req.SerializeAsString());
+ // Sleep(10ms, false);
+ std::string dest(BHAddress().SerializeAsString());
+ void *proc_id = 0;
+ int proc_id_len = 0;
+ DEFER1(BHFree(proc_id, proc_id_len););
+ void *reply = 0;
+ int reply_len = 0;
+ DEFER1(BHFree(reply, reply_len));
+ bool r = BHRequest(dest.data(), dest.size(), s.data(), s.size(), &proc_id, &proc_id_len, &reply, &reply_len, 100);
+ if (!r) {
+ int ec = 0;
+ std::string msg;
+ GetApiError(ec, msg);
+ printf("request error: %d, %s\n", ec, msg.c_str());
+ } else {
+ MsgRequestTopicReply ret;
+ ret.ParseFromArray(reply, reply_len);
+ printf("request result: %s\n", ret.data().c_str());
+ }
+ return r;
+}
+
+BOOST_AUTO_TEST_CASE(RestartTest)
+{
+ const std::string proc_id = "demo_node";
+ for (int i = 0; i < 5; ++i) {
+ printf("loop: %d --------------------------\n", i);
+ if (!Register(proc_id)) { continue; }
+
+ const std::string topic_ = "topic_";
+
+ { // Server Register Topics
+ MsgTopicList topics;
+ for (int i = 0; i < 10; ++i) {
+ topics.add_topic_list(topic_ + std::to_string(i));
+ }
+ std::string s = topics.SerializeAsString();
+ void *reply = 0;
+ int reply_len = 0;
+ bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000);
+ DEFER1(BHFree(reply, reply_len));
+ }
+
+ BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
+
+ for (int i = 0; i < 60; ++i) {
+ bool r = SyncRequest(topic_ + std::to_string(0), "request_data_" + std::to_string(i));
+ if (!r) {
+ void *msg = 0;
+ int msg_len = 0;
+ DEFER1(BHFree(msg, msg_len));
+ int ec = BHGetLastError(&msg, &msg_len);
+ if (ec == eNotRegistered) {
+ printf("need re-register\n");
+ break;
+ }
+ }
+ Sleep(1s);
+ }
+ }
+ BHCleanup();
+}
+
+BOOST_AUTO_TEST_CASE(ApiTest)
+{
+ auto max_time = std::chrono::steady_clock::time_point::max();
+ auto dur = max_time.time_since_epoch();
+ auto nsec = std::chrono::duration_cast<std::chrono::seconds>(dur).count();
+ auto nmin = nsec / 60;
+ auto nhour = nmin / 60;
+ auto nday = nhour / 24;
+ auto years = nday / 365;
+ printf("seconds: %ld, hours: %ld , days:%ld, years: %ld\n",
+ nsec, nhour, nday, years);
+ std::chrono::steady_clock::duration a(123456);
+ printf("nowsec: %ld\n", NowSec());
+
+ printf("maxsec: %ld\n", CountSeconds(max_time));
+
+ const std::string proc_id = "demo_client";
+ if (!Register(proc_id)) {
return;
}
@@ -176,21 +248,6 @@
int reply_len = 0;
bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000);
DEFER1(BHFree(reply, reply_len));
- }
- { // Server Register Topics
- MsgTopicList topics;
- topics.add_topic_list("@should_fail");
- std::string s = topics.SerializeAsString();
- void *reply = 0;
- int reply_len = 0;
- bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000);
- DEFER1(BHFree(reply, reply_len));
- if (!r) {
- int ec = 0;
- std::string msg;
- GetApiError(ec, msg);
- printf("register rpc failed, %d, %s\n", ec, msg.c_str());
- }
}
auto PrintProcs = [](MsgQueryProcReply const &result) {
printf("query proc result: %d\n", result.proc_list().size());
@@ -304,31 +361,6 @@
}
};
- auto SyncRequest = [&](int idx) { // SyncRequest
- MsgRequestTopic req;
- req.set_topic(topic_ + std::to_string(0));
- req.set_data("request_data_" + std::to_string(idx));
- std::string s(req.SerializeAsString());
- // Sleep(10ms, false);
- std::string dest(BHAddress().SerializeAsString());
- void *proc_id = 0;
- int proc_id_len = 0;
- DEFER1(BHFree(proc_id, proc_id_len););
- void *reply = 0;
- int reply_len = 0;
- DEFER1(BHFree(reply, reply_len));
- bool r = BHRequest(dest.data(), dest.size(), s.data(), s.size(), &proc_id, &proc_id_len, &reply, &reply_len, 1000);
- if (!r) {
- int ec = 0;
- std::string msg;
- GetApiError(ec, msg);
- printf("request error: %s\n", msg.c_str());
- } else {
- MsgRequestTopicReply ret;
- ret.ParseFromArray(reply, reply_len);
- printf("request result: %s\n", ret.data().c_str());
- }
- };
{
for (int i = 0; i < 1; ++i) {
MsgPublish pub;
@@ -407,7 +439,7 @@
const int64_t nreq = 1000 * 100;
for (int i = 0; i < 10; ++i) {
- SyncRequest(i);
+ SyncRequest(topic_ + std::to_string(0), "request_data_" + std::to_string(i));
}
for (int i = 0; i < ncli; ++i) {
--
Gitblit v1.8.0