lichao
2021-06-03 69f60d8bcc5121eb952b57277c94ad5cecb8d44a
client side update shm on center restart.
3个文件已修改
239 ■■■■■ 已修改文件
src/bh_api.cc 34 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.cpp 53 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 152 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.cc
@@ -1,9 +1,11 @@
#include "bh_api.h"
#include "defs.h"
#include "topic_node.h"
#include <chrono>
#include <cstdio>
#include <memory>
using namespace std::chrono_literals;
using namespace bhome_shm;
using namespace bhome_msg;
@@ -30,17 +32,33 @@
}
std::unique_ptr<TopicNode> &ProcNodePtr()
{
    // client side init here.
    static std::mutex mtx;
    std::lock_guard<std::mutex> lk(mtx);
    auto InitLog = []() {
        ns_log::AddLog(BHLogDir() + "bhshmq_node_" + GetProcExe() + ".log", true);
        return true;
    };
    static bool init_log = InitLog();
    static std::string shm_name;
    static std::unique_ptr<TopicNode> ptr;
    if (!ptr && GlobalInit(BHomeShm())) {
        auto InitLog = []() {
            ns_log::AddLog(BHLogDir() + "bhshmq_node_" + GetProcExe() + ".log");
            return true;
        };
        static bool init_log = InitLog();
        ptr.reset(new TopicNode(BHomeShm()));
    std::lock_guard<std::mutex> lk(mtx);
    if (shm_name != BHomeShmName()) {
        shm_name = BHomeShmName();
        LOG_INFO() << "using shm " << shm_name;
        ptr.reset();
        // must reset/stop node before call BHomeShm() which resets shm.
        auto &shm = BHomeShm();
        for (int i = 0; !ptr && i < 3; ++i) {
            if (GlobalInit(shm)) {
                ptr.reset(new TopicNode(shm));
            } else {
                std::this_thread::sleep_for(1s); // make sure shm init done.
            }
        }
    }
    return ptr;
}
src/defs.cpp
@@ -123,29 +123,28 @@
    return shm;
}
} // namespace
CenterInfo *GetCenterInfo(SharedMemory &shm)
{
    auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
    if (pmeta->tag_ == kMetaInfoTag) {
        return &pmeta->info_;
    }
    return nullptr;
}
ShmSocket &DefaultSender(SharedMemory &shm)
ShmSocket &ShmSender(SharedMemory &shm, const bool reset)
{
    typedef std::pair<void *, std::shared_ptr<ShmSocket>> Pair;
    static std::vector<Pair> store;
    static std::mutex s_mtx;
    thread_local Pair local_cache;
    if (local_cache.first == &shm) {
    std::lock_guard<std::mutex> lk(s_mtx);
    if (reset) {
        for (auto &kv : store) {
            if (kv.first == &shm) {
                auto &mq = GetCenterInfo(shm)->mq_sender_;
                kv.second.reset(new ShmSocket(mq.offset_, shm, mq.id_));
                local_cache = kv;
                return *local_cache.second;
            }
        }
    } else if (local_cache.first == &shm) {
        return *local_cache.second;
    }
    std::lock_guard<std::mutex> lk(s_mtx);
    for (auto &kv : store) {
        if (kv.first == &shm) {
            local_cache = kv;
@@ -157,6 +156,18 @@
    local_cache = store.back();
    return *local_cache.second;
}
} // namespace
CenterInfo *GetCenterInfo(SharedMemory &shm)
{
    auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
    if (pmeta->tag_ == kMetaInfoTag) {
        return &pmeta->info_;
    }
    return nullptr;
}
ShmSocket &DefaultSender(SharedMemory &shm) { return ShmSender(shm, false); }
BHomeMetaInfo *GetBHomeMeta()
{
@@ -271,8 +282,16 @@
SharedMemory &BHomeShm()
{
    static SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512);
    return shm;
    static std::unique_ptr<SharedMemory> shm_ptr;
    static std::string shm_name;
    if (!shm_ptr || shm_name != BHomeShmName()) {
        shm_name = BHomeShmName();
        if (shm_ptr) {
            ShmSender(*shm_ptr, true); // reset sender.
        }
        shm_ptr.reset(new SharedMemory(shm_name, 1024 * 1024 * 512));
    }
    return *shm_ptr;
}
bool GlobalInit(SharedMemory &shm) { return GetCenterInfo(shm); }
utest/api_test.cpp
@@ -111,25 +111,8 @@
using namespace std::chrono;
// using namespace std::chrono_literals;
BOOST_AUTO_TEST_CASE(ApiTest)
bool Register(const std::string &proc_id)
{
    auto max_time = std::chrono::steady_clock::time_point::max();
    auto dur = max_time.time_since_epoch();
    auto nsec = std::chrono::duration_cast<std::chrono::seconds>(dur).count();
    auto nmin = nsec / 60;
    auto nhour = nmin / 60;
    auto nday = nhour / 24;
    auto years = nday / 365;
    printf("seconds: %ld, hours: %ld , days:%ld, years: %ld\n",
           nsec, nhour, nday, years);
    std::chrono::steady_clock::duration a(123456);
    printf("nowsec: %ld\n", NowSec());
    printf("maxsec: %ld\n", CountSeconds(max_time));
    // BHCleanup();
    // return;
    const std::string proc_id = "demo_client";
    bool reg = false;
    for (int i = 0; i < 3 && !reg; ++i) {
        ProcInfo proc;
@@ -160,7 +143,96 @@
        BHFree(reply, reply_len);
        // Sleep(1s);
    }
    if (!reg) {
    return reg;
}
bool SyncRequest(const std::string &topic, const std::string &data)
{ // SyncRequest
    MsgRequestTopic req;
    req.set_topic(topic);
    req.set_data(data);
    std::string s(req.SerializeAsString());
    // Sleep(10ms, false);
    std::string dest(BHAddress().SerializeAsString());
    void *proc_id = 0;
    int proc_id_len = 0;
    DEFER1(BHFree(proc_id, proc_id_len););
    void *reply = 0;
    int reply_len = 0;
    DEFER1(BHFree(reply, reply_len));
    bool r = BHRequest(dest.data(), dest.size(), s.data(), s.size(), &proc_id, &proc_id_len, &reply, &reply_len, 100);
    if (!r) {
        int ec = 0;
        std::string msg;
        GetApiError(ec, msg);
        printf("request error: %d, %s\n", ec, msg.c_str());
    } else {
        MsgRequestTopicReply ret;
        ret.ParseFromArray(reply, reply_len);
        printf("request result: %s\n", ret.data().c_str());
    }
    return r;
}
BOOST_AUTO_TEST_CASE(RestartTest)
{
    const std::string proc_id = "demo_node";
    for (int i = 0; i < 5; ++i) {
        printf("loop: %d --------------------------\n", i);
        if (!Register(proc_id)) { continue; }
        const std::string topic_ = "topic_";
        { // Server Register Topics
            MsgTopicList topics;
            for (int i = 0; i < 10; ++i) {
                topics.add_topic_list(topic_ + std::to_string(i));
            }
            std::string s = topics.SerializeAsString();
            void *reply = 0;
            int reply_len = 0;
            bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000);
            DEFER1(BHFree(reply, reply_len));
        }
        BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
        for (int i = 0; i < 60; ++i) {
            bool r = SyncRequest(topic_ + std::to_string(0), "request_data_" + std::to_string(i));
            if (!r) {
                void *msg = 0;
                int msg_len = 0;
                DEFER1(BHFree(msg, msg_len));
                int ec = BHGetLastError(&msg, &msg_len);
                if (ec == eNotRegistered) {
                    printf("need re-register\n");
                    break;
                }
            }
            Sleep(1s);
        }
    }
    BHCleanup();
}
BOOST_AUTO_TEST_CASE(ApiTest)
{
    auto max_time = std::chrono::steady_clock::time_point::max();
    auto dur = max_time.time_since_epoch();
    auto nsec = std::chrono::duration_cast<std::chrono::seconds>(dur).count();
    auto nmin = nsec / 60;
    auto nhour = nmin / 60;
    auto nday = nhour / 24;
    auto years = nday / 365;
    printf("seconds: %ld, hours: %ld , days:%ld, years: %ld\n",
           nsec, nhour, nday, years);
    std::chrono::steady_clock::duration a(123456);
    printf("nowsec: %ld\n", NowSec());
    printf("maxsec: %ld\n", CountSeconds(max_time));
    const std::string proc_id = "demo_client";
    if (!Register(proc_id)) {
        return;
    }
@@ -176,21 +248,6 @@
        int reply_len = 0;
        bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000);
        DEFER1(BHFree(reply, reply_len));
    }
    { // Server Register Topics
        MsgTopicList topics;
        topics.add_topic_list("@should_fail");
        std::string s = topics.SerializeAsString();
        void *reply = 0;
        int reply_len = 0;
        bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000);
        DEFER1(BHFree(reply, reply_len));
        if (!r) {
            int ec = 0;
            std::string msg;
            GetApiError(ec, msg);
            printf("register rpc failed, %d, %s\n", ec, msg.c_str());
        }
    }
    auto PrintProcs = [](MsgQueryProcReply const &result) {
        printf("query proc result: %d\n", result.proc_list().size());
@@ -304,31 +361,6 @@
        }
    };
    auto SyncRequest = [&](int idx) { // SyncRequest
        MsgRequestTopic req;
        req.set_topic(topic_ + std::to_string(0));
        req.set_data("request_data_" + std::to_string(idx));
        std::string s(req.SerializeAsString());
        // Sleep(10ms, false);
        std::string dest(BHAddress().SerializeAsString());
        void *proc_id = 0;
        int proc_id_len = 0;
        DEFER1(BHFree(proc_id, proc_id_len););
        void *reply = 0;
        int reply_len = 0;
        DEFER1(BHFree(reply, reply_len));
        bool r = BHRequest(dest.data(), dest.size(), s.data(), s.size(), &proc_id, &proc_id_len, &reply, &reply_len, 1000);
        if (!r) {
            int ec = 0;
            std::string msg;
            GetApiError(ec, msg);
            printf("request error: %s\n", msg.c_str());
        } else {
            MsgRequestTopicReply ret;
            ret.ParseFromArray(reply, reply_len);
            printf("request result: %s\n", ret.data().c_str());
        }
    };
    {
        for (int i = 0; i < 1; ++i) {
            MsgPublish pub;
@@ -407,7 +439,7 @@
    const int64_t nreq = 1000 * 100;
    for (int i = 0; i < 10; ++i) {
        SyncRequest(i);
        SyncRequest(topic_ + std::to_string(0), "request_data_" + std::to_string(i));
    }
    for (int i = 0; i < ncli; ++i) {