lichao
2021-04-30 377e395a5fdc6ad44bdd5a2d41d2930f45fc4384
add node init msg, alloc msgq on success.
9个文件已修改
198 ■■■■ 已修改文件
box/center.cpp 75 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.h 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.h 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_msg_queue.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.cpp 42 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.h 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 63 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.h 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.cpp
@@ -103,7 +103,22 @@
    // center name, no relative to shm.
    const std::string &id() const { return id_; }
    void OnNodeInit(const int64_t msg)
    {
        MQId ssn = msg;
        auto UpdateRegInfo = [&](Node &node) {
            for (int i = 0; i < 10; ++i) {
                node->addrs_.insert(ssn + i);
            }
            node->state_.timestamp_ = NowSec() - offline_time_;
            node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
        };
        Node node(new NodeInfo);
        UpdateRegInfo(node);
        nodes_[ssn] = node;
        printf("new node ssn (%ld) init\n", ssn);
    }
    MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg)
    {
        if (msg.proc().proc_id() != head.proc_id()) {
@@ -132,17 +147,19 @@
                Node node(new NodeInfo);
                UpdateRegInfo(node);
                nodes_[ssn] = node;
            }
            printf("node (%s) ssn (%ld)\n", head.proc_id().c_str(), ssn);
                printf("new node (%s) ssn (%ld)\n", head.proc_id().c_str(), ssn);
                auto old = online_node_addr_map_.find(head.proc_id());
                if (old != online_node_addr_map_.end()) { // old session
                    auto &old_ssn = old->second;
            auto old = online_node_addr_map_.find(head.proc_id());
            if (old != online_node_addr_map_.end()) { // old session
                auto &old_ssn = old->second;
                if (old_ssn != ssn) {
                    nodes_[old_ssn]->state_.PutOffline(offline_time_);
                    printf("put node (%s) ssn (%ld) offline\n", nodes_[old_ssn]->proc_.proc_id().c_str(), old->second);
                    old_ssn = ssn;
                } else {
                    online_node_addr_map_.emplace(head.proc_id(), ssn);
                }
            } else {
                online_node_addr_map_.emplace(head.proc_id(), ssn);
            }
            return MakeReply(eSuccess);
        } catch (...) {
@@ -446,16 +463,24 @@
            msg, head, [&](auto &body) { return center->MsgTag(head, body); }, replyer); \
        return true;
bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner)
auto MakeReplyer(ShmSocket &socket, BHMsgHead &head, const std::string &proc_id)
{
    auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 60s, 60s * 2);
    auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
        return [&](auto &&rep_body) {
            auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.ssn_id(), head.msg_id()));
            auto remote = head.route(0).mq_id();
            socket.Send(remote, reply_head, rep_body);
        };
    return [&](auto &&rep_body) {
        auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.ssn_id(), head.msg_id()));
        auto remote = head.route(0).mq_id();
        socket.Send(remote, reply_head, rep_body);
    };
}
bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr)
{
    auto OnNodeInit = [center_ptr](ShmSocket &socket, MsgI &msg) {
        auto &center = *center_ptr;
        center->OnNodeInit(msg.Offset());
    };
    auto Nothing = [](ShmSocket &socket) {};
    BHCenter::Install("#centetr.Init", OnNodeInit, Nothing, BHInitAddress(), 16);
    auto OnCenterIdle = [center_ptr](ShmSocket &socket) {
        auto &center = *center_ptr;
@@ -475,6 +500,7 @@
        default: return false;
        }
    };
    BHCenter::Install("#center.main", OnCenter, OnCenterIdle, BHTopicCenterAddress(), 1000);
    auto OnBusIdle = [=](ShmSocket &socket) {};
    auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
@@ -515,7 +541,6 @@
        }
    };
    BHCenter::Install("#center.reg", OnCenter, OnCenterIdle, BHTopicCenterAddress(), 1000);
    BHCenter::Install("#center.bus", OnPubSub, OnBusIdle, BHTopicBusAddress(), 1000);
    return true;
@@ -533,7 +558,12 @@
bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len)
{
    Centers()[name] = CenterInfo{name, handler, idle, mqid, mq_len};
    Centers()[name] = CenterInfo{name, handler, MsgIHandler(), idle, mqid, mq_len};
    return true;
}
bool BHCenter::Install(const std::string &name, MsgIHandler handler, IdleHandler idle, const MQId mqid, const int mq_len)
{
    Centers()[name] = CenterInfo{name, MsgHandler(), handler, idle, mqid, mq_len};
    return true;
}
@@ -541,10 +571,13 @@
{
    auto gc = [&](const MQId id) {
        auto r = ShmSocket::Remove(shm, id);
        printf("remove mq %ld : %s\n", id, (r ? "ok" : "failed"));
        if (r) {
            printf("remove mq %ld ok\n", id);
        }
    };
    AddCenter("#bhome_center", gc);
    auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", gc, 6s, 6s * 2);
    AddCenter(center_ptr);
    for (auto &kv : Centers()) {
        auto &info = kv.second;
@@ -556,7 +589,11 @@
{
    for (auto &kv : Centers()) {
        auto &info = kv.second;
        sockets_[info.name_]->Start(info.handler_, info.idle_);
        if (info.handler_) {
            sockets_[info.name_]->Start(info.handler_, info.idle_);
        } else {
            sockets_[info.name_]->Start(info.raw_handler_, info.idle_);
        }
    }
    return true;
box/center.h
@@ -29,8 +29,10 @@
public:
    typedef Socket::PartialRecvCB MsgHandler;
    typedef Socket::RawRecvCB MsgIHandler;
    typedef Socket::IdleCB IdleHandler;
    static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len);
    static bool Install(const std::string &name, MsgIHandler handler, IdleHandler idle, const MQId mqid, const int mq_len);
    BHCenter(Socket::Shm &shm);
    ~BHCenter() { Stop(); }
@@ -41,6 +43,7 @@
    struct CenterInfo {
        std::string name_;
        MsgHandler handler_;
        MsgIHandler raw_handler_;
        IdleHandler idle_;
        MQId mqid_;
        int mq_len_ = 0;
src/defs.h
@@ -23,9 +23,11 @@
typedef uint64_t MQId;
const MQId kBHNodeInit = 10;
const MQId kBHTopicCenter = 100;
const MQId kBHTopicBus = 101;
const MQId kBHUniCenter = 102;
inline const MQId BHInitAddress() { return kBHNodeInit; }
inline const MQId BHTopicCenterAddress() { return kBHTopicCenter; }
inline const MQId BHTopicBusAddress() { return kBHTopicBus; }
inline const MQId BHUniCenterAddress() { return kBHUniCenter; }
src/shm_msg_queue.cpp
@@ -34,7 +34,7 @@
ShmMsgQueue::MQId ShmMsgQueue::NewId()
{
    static auto &id = GetData();
    return ++id;
    return (++id) * 10;
}
// ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2
ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) :
src/socket.cpp
@@ -40,6 +40,44 @@
    Stop();
}
bool ShmSocket::Start(const RawRecvCB &onData, const IdleCB &onIdle, int nworker)
{
    auto ioProc = [this, onData, onIdle]() {
        auto DoSend = [this]() { return send_buffer_.TrySend(mq()); };
        auto DoRecv = [=] {
            // do not recv if no cb is set.
            if (!onData) {
                return false;
            }
            auto onMsg = [&](MsgI &imsg) {
                DEFER1(imsg.Release());
                onData(*this, imsg);
            };
            MsgI imsg;
            return mq().TryRecv(imsg) ? (onMsg(imsg), true) : false;
        };
        try {
            bool more_to_send = DoSend();
            bool more_to_recv = DoRecv();
            if (onIdle) { onIdle(*this); }
            if (!more_to_send && !more_to_recv) {
                robust::QuickSleep();
            }
        } catch (...) {
        }
    };
    std::lock_guard<std::mutex> lock(mutex_);
    StopNoLock();
    run_.store(true);
    for (int i = 0; i < nworker; ++i) {
        workers_.emplace_back([this, ioProc]() { while (run_) { ioProc(); } });
    }
    return true;
}
bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle)
{
    auto ioProc = [this, onData, onIdle]() {
@@ -74,9 +112,7 @@
            bool more_to_recv = DoRecv();
            if (onIdle) { onIdle(*this); }
            if (!more_to_send && !more_to_recv) {
                std::this_thread::yield();
                using namespace std::chrono_literals;
                std::this_thread::sleep_for(10000ns);
                robust::QuickSleep();
            }
        } catch (...) {
        }
src/socket.h
@@ -42,6 +42,7 @@
public:
    typedef ShmMsgQueue::MQId MQId;
    typedef bhome_shm::SharedMemory Shm;
    typedef std::function<void(ShmSocket &sock, MsgI &imsg)> RawRecvCB;
    typedef std::function<void(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> RecvCB;
    typedef std::function<bool(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> PartialRecvCB;
    typedef std::function<void(ShmSocket &sock)> IdleCB;
@@ -53,6 +54,7 @@
    bool Remove() { return Remove(shm(), id()); }
    MQId id() const { return mq().Id(); }
    // start recv.
    bool Start(const RawRecvCB &onData, const IdleCB &onIdle, int nworker = 1);
    bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB());
    bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, onIdle); }
    bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); }
src/topic_node.cpp
@@ -37,30 +37,52 @@
} // namespace
TopicNode::TopicNode(SharedMemory &shm) :
    shm_(shm), sockets_(eSockEnd), state_(eStateUnregistered)
    shm_(shm), state_(eStateUnregistered)
{
    for (int i = eSockStart; i < eSockEnd; ++i) {
        sockets_[i].reset(new ShmSocket(shm_, kMqLen));
    }
    // recv msgs to avoid memory leak.
    auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; };
    SockNode().Start(default_ignore_msg);
    // for (auto &p : sockets_) {
    //     p->Start(default_ignore_msg);
    // }
    Init();
}
TopicNode::~TopicNode()
{
    printf("~TopicNode()\n");
    Stop();
    SockNode().Stop();
    if (state() == eStateUnregistered) {
        for (auto &p : sockets_) { p->Remove(); }
}
bool TopicNode::Init()
{
    std::lock_guard<std::mutex> lk(mutex_);
    if (Valid()) {
        return true;
    }
    if (ssn_id_ == 0) {
        ssn_id_ = ShmMsgQueue::NewId();
    }
    printf("Node Init, id %ld \n", ssn_id_);
    MsgI msg;
    msg.OffsetRef() = ssn_id_;
    if (ShmMsgQueue::TrySend(shm(), BHInitAddress(), msg)) {
        sockets_.resize(eSockEnd);
        for (int i = eSockStart; i < eSockEnd; ++i) {
            sockets_[i].reset(new ShmSocket(shm_, ssn_id_ + i, kMqLen));
        }
        // recv msgs to avoid memory leak.
        auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; };
        SockNode().Start(default_ignore_msg);
        return true;
    }
    return false;
}
void TopicNode::Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker)
{
    std::lock_guard<std::mutex> lk(mutex_);
    if (!Init()) {
        SetLastError(eError, "BHome Node Not Inited.");
        return;
    }
    if (nworker < 1) {
        nworker = 1;
    } else if (nworker > 16) {
@@ -73,11 +95,18 @@
}
void TopicNode::Stop()
{
    printf("Node Stopping\n");
    for (auto &p : sockets_) { p->Stop(); }
    printf("Node Stopped\n");
}
bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
    if (!Init()) {
        SetLastError(eError, "BHome Node Not Inited.");
        return false;
    }
    info_ = proc;
    auto &sock = SockNode();
@@ -123,6 +152,11 @@
}
bool TopicNode::Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
    if (!IsOnline()) {
        SetLastError(eNotRegistered, "Not Registered.");
        return false;
    }
    info_.Clear();
    state_cas(eStateOnline, eStateOffline);
@@ -404,8 +438,6 @@
                reply_head.mutable_proc_id()->swap(out_proc_id);
                return true;
            }
        } else {
            SetLastError(eNotFound, "remote not found.");
        }
    } catch (...) {
        SetLastError(eError, __func__ + std::string(" internal errer."));
@@ -448,6 +480,7 @@
            return true;
        }
    }
    SetLastError(eNotFound, "remote not found.");
    return false;
}
src/topic_node.h
@@ -22,6 +22,7 @@
#include "socket.h"
#include <atomic>
#include <memory>
#include <mutex>
#include <vector>
using namespace bhome_shm;
@@ -137,7 +138,11 @@
    void state(const State st) { state_.store(st); }
    void state_cas(State expected, const State val) { state_.compare_exchange_strong(expected, val); }
    State state() const { return state_.load(); }
    bool IsOnline() const { return state() == eStateOnline; }
    bool IsOnline() { return Init() && state() == eStateOnline; }
    bool Init();
    bool Valid() const { return !sockets_.empty(); }
    std::mutex mutex_;
    MQId ssn_id_ = 0;
    std::atomic<State> state_;
    TopicQueryCache topic_query_cache_;
utest/api_test.cpp
@@ -118,6 +118,8 @@
    printf("maxsec: %ld\n", CountSeconds(max_time));
    // BHCleanup();
    // return;
    bool reg = false;
    for (int i = 0; i < 3 && !reg; ++i) {
        ProcInfo proc;