lichao
2021-05-21 b2484c8bd77a9d21bcf1827f554444535196953d
center save shm on each node, no bind to shm.
4个文件已修改
44 ■■■■ 已修改文件
box/center.cpp 9 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/node_center.cpp 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/node_center.h 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.cpp
@@ -165,15 +165,8 @@
BHCenter::BHCenter(Socket::Shm &shm)
{
    auto gc = [&](const MQId id) {
        auto r = ShmSocket::Remove(shm, id);
        if (r) {
            LOG_DEBUG() << "remove mq " << id << " ok\n";
        }
    };
    auto nsec = NodeTimeoutSec();
    auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", gc, nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
    auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
    AddCenter(center_ptr);
    for (auto &kv : Centers()) {
box/node_center.cpp
@@ -116,7 +116,7 @@
    Json json;
    json.put("proc_id", proc_.proc_id());
    center_.Publish(kTopicNodeOffline, json.dump());
    center_.Publish(shm_, kTopicNodeOffline, json.dump());
}
void NodeCenter::NodeInfo::UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time)
@@ -127,7 +127,7 @@
        if (proc_.proc_id().empty()) { return; } // node init, ignore.
        Json json;
        json.put("proc_id", proc_.proc_id());
        center_.Publish(topic, json.dump());
        center_.Publish(shm_, topic, json.dump());
    };
    LOG_TRACE() << "node " << proc_.proc_id() << " timeout count: " << diff;
@@ -182,7 +182,7 @@
               SendAllocMsg(socket, {ssn, node->addrs_[ssn]}, init_msg);
    };
    Node node(new NodeInfo(*this));
    Node node(new NodeInfo(*this, shm));
    if (UpdateRegInfo(node) && PrepareProcInit(node)) {
        reply |= (node->addrs_[ssn] << 4);
        nodes_[ssn] = node;
@@ -281,7 +281,7 @@
    auto &node = pos->second;
    try {
        for (int i = 0; i < msg.extra_mq_num(); ++i) {
            ShmSocket tmp(BHomeShm(), true, head.ssn_id() + i + 1, 16);
            ShmSocket tmp(node->shm_, true, head.ssn_id() + i + 1, 16);
            node->addrs_.emplace(tmp.id(), tmp.AbsAddr());
            auto addr = reply.add_extra_mqs();
            addr->set_mq_id(tmp.id());
@@ -593,13 +593,15 @@
    }
    for (auto &addr : node->addrs_) {
        cleaner_(addr.first);
        auto &id = addr.first;
        auto r = ShmSocket::Remove(node->shm_, id);
        LOG_DEBUG() << "remove mq " << id << (r ? " ok" : " failed");
    }
    node->addrs_.clear();
}
void NodeCenter::Publish(const Topic &topic, const std::string &content)
void NodeCenter::Publish(SharedMemory &shm, const Topic &topic, const std::string &content)
{
    try {
        // LOG_DEBUG() << "center publish: " << topic << ": " << content;
@@ -615,8 +617,8 @@
            DEFER1(msg.Release());
            RecordMsg(msg);
            auto &mq = GetCenterInfo(BHomeShm())->mq_sender_;
            ShmSocket sender(mq.offset_, BHomeShm(), mq.id_);
            auto &mq = GetCenterInfo(shm)->mq_sender_;
            ShmSocket sender(mq.offset_, shm, mq.id_);
            for (auto &cli : clients) {
                auto node = cli.weak_node_.lock();
box/node_center.h
@@ -85,13 +85,14 @@
    struct NodeInfo {
        NodeCenter &center_;
        SharedMemory &shm_;
        ProcState state_;               // state
        std::map<MQId, int64_t> addrs_; // registered mqs
        ProcInfo proc_;                 //
        AddressTopics services_;        // address: topics
        AddressTopics subscriptions_;   // address: topics
        NodeInfo(NodeCenter &center) :
            center_(center) {}
        NodeInfo(NodeCenter &center, SharedMemory &shm) :
            center_(center), shm_(shm) {}
        void PutOffline(const int64_t offline_time);
        void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time);
    };
@@ -112,8 +113,8 @@
public:
    typedef std::set<TopicDest> Clients;
    NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time_sec, const int64_t kill_time_sec) :
        id_(id), cleaner_(cleaner), offline_time_(offline_time_sec), kill_time_(kill_time_sec), last_check_time_(0) {}
    NodeCenter(const std::string &id, const int64_t offline_time_sec, const int64_t kill_time_sec) :
        id_(id), offline_time_(offline_time_sec), kill_time_(kill_time_sec), last_check_time_(0) {}
    // center name, no relative to shm.
    const std::string &id() const { return id_; }
@@ -174,7 +175,7 @@
private:
    void CheckNodes();
    bool CanHeartbeat(const NodeInfo &node) { return Valid(node) || node.state_.flag_ == kStateOffline; }
    void Publish(const Topic &topic, const std::string &content);
    void Publish(SharedMemory &shm, const Topic &topic, const std::string &content);
    bool Valid(const NodeInfo &node) { return node.state_.flag_ == kStateNormal; }
    bool Valid(const WeakNode &weak)
    {
@@ -191,7 +192,6 @@
    ProcRecords procs_; // To get a short index for msg alloc.
    MsgRecords msgs_;   // record all msgs alloced.
    Cleaner cleaner_; // remove mqs.
    int64_t offline_time_;
    int64_t kill_time_;
    int64_t last_check_time_;
utest/api_test.cpp
@@ -373,7 +373,7 @@
    threads.Launch(hb, &run);
    threads.Launch(showStatus, &run);
    int ncli = 10;
    const int64_t nreq = 10; //00 * 100;
    const int64_t nreq = 1000 * 100;
    for (int i = 0; i < 10; ++i) {
        SyncRequest(i);
@@ -397,10 +397,11 @@
        }
    }
    Sleep(1s);
    run = false;
    threads.WaitAll();
    auto &st = Status();
    Sleep(1s);
    printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld\n", st.nrequest_.load(), st.nserved_.load(), st.nreply_.load());
    BHCleanup();
    printf("after cleanup\n");