From 69f60d8bcc5121eb952b57277c94ad5cecb8d44a Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期四, 03 六月 2021 11:03:08 +0800 Subject: [PATCH] client side update shm on center restart. --- utest/api_test.cpp | 152 +++++++++++++++++++++++--------------- src/bh_api.cc | 34 ++++++-- src/defs.cpp | 53 +++++++++---- 3 files changed, 154 insertions(+), 85 deletions(-) diff --git a/src/bh_api.cc b/src/bh_api.cc index 8690d5f..3dafe7a 100644 --- a/src/bh_api.cc +++ b/src/bh_api.cc @@ -1,9 +1,11 @@ #include "bh_api.h" #include "defs.h" #include "topic_node.h" +#include <chrono> #include <cstdio> #include <memory> +using namespace std::chrono_literals; using namespace bhome_shm; using namespace bhome_msg; @@ -30,17 +32,33 @@ } std::unique_ptr<TopicNode> &ProcNodePtr() { + // client side init here. static std::mutex mtx; - std::lock_guard<std::mutex> lk(mtx); + auto InitLog = []() { + ns_log::AddLog(BHLogDir() + "bhshmq_node_" + GetProcExe() + ".log", true); + return true; + }; + static bool init_log = InitLog(); + static std::string shm_name; static std::unique_ptr<TopicNode> ptr; - if (!ptr && GlobalInit(BHomeShm())) { - auto InitLog = []() { - ns_log::AddLog(BHLogDir() + "bhshmq_node_" + GetProcExe() + ".log"); - return true; - }; - static bool init_log = InitLog(); - ptr.reset(new TopicNode(BHomeShm())); + + std::lock_guard<std::mutex> lk(mtx); + if (shm_name != BHomeShmName()) { + shm_name = BHomeShmName(); + LOG_INFO() << "using shm " << shm_name; + + ptr.reset(); + // must reset/stop node before call BHomeShm() which resets shm. + + auto &shm = BHomeShm(); + for (int i = 0; !ptr && i < 3; ++i) { + if (GlobalInit(shm)) { + ptr.reset(new TopicNode(shm)); + } else { + std::this_thread::sleep_for(1s); // make sure shm init done. + } + } } return ptr; } diff --git a/src/defs.cpp b/src/defs.cpp index 9402b27..22b43e4 100644 --- a/src/defs.cpp +++ b/src/defs.cpp @@ -123,29 +123,28 @@ return shm; } -} // namespace - -CenterInfo *GetCenterInfo(SharedMemory &shm) -{ - auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address())); - if (pmeta->tag_ == kMetaInfoTag) { - return &pmeta->info_; - } - return nullptr; -} - -ShmSocket &DefaultSender(SharedMemory &shm) +ShmSocket &ShmSender(SharedMemory &shm, const bool reset) { typedef std::pair<void *, std::shared_ptr<ShmSocket>> Pair; static std::vector<Pair> store; static std::mutex s_mtx; - thread_local Pair local_cache; - if (local_cache.first == &shm) { + + std::lock_guard<std::mutex> lk(s_mtx); + + if (reset) { + for (auto &kv : store) { + if (kv.first == &shm) { + auto &mq = GetCenterInfo(shm)->mq_sender_; + kv.second.reset(new ShmSocket(mq.offset_, shm, mq.id_)); + local_cache = kv; + return *local_cache.second; + } + } + } else if (local_cache.first == &shm) { return *local_cache.second; } - std::lock_guard<std::mutex> lk(s_mtx); for (auto &kv : store) { if (kv.first == &shm) { local_cache = kv; @@ -157,6 +156,18 @@ local_cache = store.back(); return *local_cache.second; } +} // namespace + +CenterInfo *GetCenterInfo(SharedMemory &shm) +{ + auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address())); + if (pmeta->tag_ == kMetaInfoTag) { + return &pmeta->info_; + } + return nullptr; +} + +ShmSocket &DefaultSender(SharedMemory &shm) { return ShmSender(shm, false); } BHomeMetaInfo *GetBHomeMeta() { @@ -271,8 +282,16 @@ SharedMemory &BHomeShm() { - static SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512); - return shm; + static std::unique_ptr<SharedMemory> shm_ptr; + static std::string shm_name; + if (!shm_ptr || shm_name != BHomeShmName()) { + shm_name = BHomeShmName(); + if (shm_ptr) { + ShmSender(*shm_ptr, true); // reset sender. + } + shm_ptr.reset(new SharedMemory(shm_name, 1024 * 1024 * 512)); + } + return *shm_ptr; } bool GlobalInit(SharedMemory &shm) { return GetCenterInfo(shm); } diff --git a/utest/api_test.cpp b/utest/api_test.cpp index fc1ad08..b4e0f4b 100644 --- a/utest/api_test.cpp +++ b/utest/api_test.cpp @@ -111,25 +111,8 @@ using namespace std::chrono; // using namespace std::chrono_literals; -BOOST_AUTO_TEST_CASE(ApiTest) +bool Register(const std::string &proc_id) { - auto max_time = std::chrono::steady_clock::time_point::max(); - auto dur = max_time.time_since_epoch(); - auto nsec = std::chrono::duration_cast<std::chrono::seconds>(dur).count(); - auto nmin = nsec / 60; - auto nhour = nmin / 60; - auto nday = nhour / 24; - auto years = nday / 365; - printf("seconds: %ld, hours: %ld , days:%ld, years: %ld\n", - nsec, nhour, nday, years); - std::chrono::steady_clock::duration a(123456); - printf("nowsec: %ld\n", NowSec()); - - printf("maxsec: %ld\n", CountSeconds(max_time)); - - // BHCleanup(); - // return; - const std::string proc_id = "demo_client"; bool reg = false; for (int i = 0; i < 3 && !reg; ++i) { ProcInfo proc; @@ -160,7 +143,96 @@ BHFree(reply, reply_len); // Sleep(1s); } - if (!reg) { + return reg; +} + +bool SyncRequest(const std::string &topic, const std::string &data) +{ // SyncRequest + MsgRequestTopic req; + req.set_topic(topic); + req.set_data(data); + std::string s(req.SerializeAsString()); + // Sleep(10ms, false); + std::string dest(BHAddress().SerializeAsString()); + void *proc_id = 0; + int proc_id_len = 0; + DEFER1(BHFree(proc_id, proc_id_len);); + void *reply = 0; + int reply_len = 0; + DEFER1(BHFree(reply, reply_len)); + bool r = BHRequest(dest.data(), dest.size(), s.data(), s.size(), &proc_id, &proc_id_len, &reply, &reply_len, 100); + if (!r) { + int ec = 0; + std::string msg; + GetApiError(ec, msg); + printf("request error: %d, %s\n", ec, msg.c_str()); + } else { + MsgRequestTopicReply ret; + ret.ParseFromArray(reply, reply_len); + printf("request result: %s\n", ret.data().c_str()); + } + return r; +} + +BOOST_AUTO_TEST_CASE(RestartTest) +{ + const std::string proc_id = "demo_node"; + for (int i = 0; i < 5; ++i) { + printf("loop: %d --------------------------\n", i); + if (!Register(proc_id)) { continue; } + + const std::string topic_ = "topic_"; + + { // Server Register Topics + MsgTopicList topics; + for (int i = 0; i < 10; ++i) { + topics.add_topic_list(topic_ + std::to_string(i)); + } + std::string s = topics.SerializeAsString(); + void *reply = 0; + int reply_len = 0; + bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000); + DEFER1(BHFree(reply, reply_len)); + } + + BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc); + + for (int i = 0; i < 60; ++i) { + bool r = SyncRequest(topic_ + std::to_string(0), "request_data_" + std::to_string(i)); + if (!r) { + void *msg = 0; + int msg_len = 0; + DEFER1(BHFree(msg, msg_len)); + int ec = BHGetLastError(&msg, &msg_len); + if (ec == eNotRegistered) { + printf("need re-register\n"); + break; + } + } + Sleep(1s); + } + } + BHCleanup(); +} + +BOOST_AUTO_TEST_CASE(ApiTest) +{ + auto max_time = std::chrono::steady_clock::time_point::max(); + auto dur = max_time.time_since_epoch(); + auto nsec = std::chrono::duration_cast<std::chrono::seconds>(dur).count(); + auto nmin = nsec / 60; + auto nhour = nmin / 60; + auto nday = nhour / 24; + auto years = nday / 365; + printf("seconds: %ld, hours: %ld , days:%ld, years: %ld\n", + nsec, nhour, nday, years); + std::chrono::steady_clock::duration a(123456); + printf("nowsec: %ld\n", NowSec()); + + printf("maxsec: %ld\n", CountSeconds(max_time)); + + const std::string proc_id = "demo_client"; + if (!Register(proc_id)) { return; } @@ -176,21 +248,6 @@ int reply_len = 0; bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000); DEFER1(BHFree(reply, reply_len)); - } - { // Server Register Topics - MsgTopicList topics; - topics.add_topic_list("@should_fail"); - std::string s = topics.SerializeAsString(); - void *reply = 0; - int reply_len = 0; - bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000); - DEFER1(BHFree(reply, reply_len)); - if (!r) { - int ec = 0; - std::string msg; - GetApiError(ec, msg); - printf("register rpc failed, %d, %s\n", ec, msg.c_str()); - } } auto PrintProcs = [](MsgQueryProcReply const &result) { printf("query proc result: %d\n", result.proc_list().size()); @@ -304,31 +361,6 @@ } }; - auto SyncRequest = [&](int idx) { // SyncRequest - MsgRequestTopic req; - req.set_topic(topic_ + std::to_string(0)); - req.set_data("request_data_" + std::to_string(idx)); - std::string s(req.SerializeAsString()); - // Sleep(10ms, false); - std::string dest(BHAddress().SerializeAsString()); - void *proc_id = 0; - int proc_id_len = 0; - DEFER1(BHFree(proc_id, proc_id_len);); - void *reply = 0; - int reply_len = 0; - DEFER1(BHFree(reply, reply_len)); - bool r = BHRequest(dest.data(), dest.size(), s.data(), s.size(), &proc_id, &proc_id_len, &reply, &reply_len, 1000); - if (!r) { - int ec = 0; - std::string msg; - GetApiError(ec, msg); - printf("request error: %s\n", msg.c_str()); - } else { - MsgRequestTopicReply ret; - ret.ParseFromArray(reply, reply_len); - printf("request result: %s\n", ret.data().c_str()); - } - }; { for (int i = 0; i < 1; ++i) { MsgPublish pub; @@ -407,7 +439,7 @@ const int64_t nreq = 1000 * 100; for (int i = 0; i < 10; ++i) { - SyncRequest(i); + SyncRequest(topic_ + std::to_string(0), "request_data_" + std::to_string(i)); } for (int i = 0; i < ncli; ++i) { -- Gitblit v1.8.0