lichao
2021-04-09 4e5cb7960ce4e7e66d5190be67426aeca8b55c3d
add heartbeat, not tested yet.
14个文件已修改
237 ■■■■■ 已修改文件
proto/cpp/CMakeLists.txt 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/source/bhome_msg.proto 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/source/bhome_msg_api.proto 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/center.cpp 137 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/center.h 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/proto.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm.h 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.cpp 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 21 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.h 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/utest.cpp 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/util.h 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/cpp/CMakeLists.txt
@@ -9,4 +9,3 @@
add_library(${Target} STATIC ${PROTO_SRCS})
target_link_libraries(${Target} libprotobuf-lite.a)
proto/source/bhome_msg.proto
@@ -62,6 +62,16 @@
}
message MsgSubscribe {
    MsgTopicList topics = 1;
}
message MsgUnsubscribe {
    MsgTopicList topics = 1;
}
message MsgRegisterRPC {
    MsgTopicList topics = 1;
}
service TopicRPC {
    rpc Query (MsgQueryTopic) returns (MsgQueryTopicReply);
    rpc Request (MsgRequestTopic) returns (MsgQueryTopicReply);
proto/source/bhome_msg_api.proto
@@ -20,16 +20,14 @@
    bytes private_info = 4; 
}
message MsgTopicList {
    repeated bytes topic_list = 1;
}
message MsgPublish {
    bytes topic = 1;
    bytes data = 2; 
}
message MsgSubscribe {
    repeated bytes topics = 1;
}
message MsgUnsubscribe {
    repeated bytes topics = 1;
}
message MsgCommonReply {
@@ -49,11 +47,7 @@
message MsgRegister
{
    ProcInfo proc = 1;
}
message MsgRegisterRPC
{
    repeated bytes topics = 1;
    repeated BHAddress addrs = 2;
}
message MsgHeartbeat
src/center.cpp
@@ -19,7 +19,11 @@
#include "bh_util.h"
#include "defs.h"
#include "shm.h"
#include <chrono>
#include <set>
using namespace std::chrono;
using namespace std::chrono_literals;
using namespace bhome_shm;
using namespace bhome_msg;
@@ -28,7 +32,8 @@
namespace
{
auto Now = []() { time_t t; return time(&t); };
typedef steady_clock::time_point TimePoint;
inline TimePoint Now() { return steady_clock::now(); };
//TODO check proc_id
class NodeCenter
@@ -37,24 +42,39 @@
    typedef std::string ProcId;
    typedef std::string Address;
    typedef bhome::msg::ProcInfo ProcInfo;
    typedef std::function<void(Address const &)> Cleaner;
private:
    enum {
        kStateInvalid = 0,
        kStateNormal = 1,
        kStateNoRespond = 2,
        kStateOffline = 3,
        kStateInvalid,
        kStateNormal,
        kStateOffline,
        kStateKillme,
    };
    struct ProcState {
        time_t timestamp_ = 0;
        TimePoint timestamp_;
        uint32_t flag_ = 0; // reserved
        void UpdateState(TimePoint now)
        {
            const auto kOfflineTime = 60 * 10s;
            const auto kKillTime = 60 * 20s;
            auto diff = now - timestamp_;
            if (diff < kOfflineTime) {
                flag_ = kStateNormal;
            } else if (diff < kKillTime) {
                flag_ = kStateOffline;
            } else {
                flag_ = kStateKillme;
            }
        }
    };
    typedef std::unordered_map<Address, std::set<Topic>> AddressTopics;
    struct NodeInfo {
        ProcState state_;             // state
        Address addr_;                // registered_mqid.
        std::set<Address> addrs_;     // registered mqs
        ProcInfo proc_;               //
        AddressTopics services_;      // address: topics
        AddressTopics subscriptions_; // address: topics
@@ -67,13 +87,14 @@
        WeakNode weak_node_;
        bool operator<(const TopicDest &a) const { return mq_ < a.mq_; }
    };
    const std::string &SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); }
    inline const std::string &SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); }
    inline bool MatchAddr(std::set<Address> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); }
public:
    typedef std::set<TopicDest> Clients;
    NodeCenter(const std::string &id = "#Center") :
        id_(id) {}
    NodeCenter(const std::string &id, const Cleaner &cleaner) :
        id_(id), cleaner_(cleaner) {}
    const std::string &id() const { return id_; } // no need to lock.
    //TODO maybe just return serialized string.
@@ -85,7 +106,10 @@
        try {
            Node node(new NodeInfo);
            node->addr_ = SrcAddr(head);
            node->addrs_.insert(SrcAddr(head));
            for (auto &addr : msg.addrs()) {
                node->addrs_.insert(addr.mq_id());
            }
            node->proc_.Swap(msg.mutable_proc());
            node->state_.timestamp_ = Now();
            node->state_.flag_ = kStateNormal;
@@ -95,37 +119,17 @@
            return MakeReply(eError, "register node error.");
        }
    }
    template <class OnSuccess, class OnError>
    auto HandleMsg(const BHMsgHead &head, OnSuccess onOk, OnError onErr)
    {
        auto pos = nodes_.find(head.proc_id());
        if (pos == nodes_.end()) {
            return onErr(eNotRegistered, "Node is not registered.");
        } else {
            auto node = pos->second;
            if (head.type() == kMsgTypeHeartbeat && node->addr_ != SrcAddr(head)) {
                return onErr(eAddressNotMatch, "Node address error.");
            } else if (!Valid(*node)) {
                return onErr(eNoRespond, "Node is not alive.");
            } else {
                return onOk(node);
            }
        }
    }
    template <class Reply, class Func>
    Reply HandleMsg(const BHMsgHead &head, Func const &op)
    {
        try {
            auto onErr = [](const ErrorCode ec, const std::string &str) { return MakeReply<Reply>(ec, str); };
            return HandleMsg(head, op, onErr);
            auto pos = nodes_.find(head.proc_id());
            if (pos == nodes_.end()) {
                return MakeReply<Reply>(eNotRegistered, "Node is not registered.");
            } else {
                auto node = pos->second;
                if (node->addr_ != SrcAddr(head)) {
                if (!MatchAddr(node->addrs_, SrcAddr(head))) {
                    return MakeReply<Reply>(eAddressNotMatch, "Node address error.");
                } else if (!Valid(*node)) {
                    return MakeReply<Reply>(eNoRespond, "Node is not alive.");
@@ -149,9 +153,10 @@
        return HandleMsg(
            head, [&](Node node) -> MsgCommonReply {
                auto &src = SrcAddr(head);
                node->services_[src].insert(msg.topics().begin(), msg.topics().end());
                auto &topics = msg.topics().topic_list();
                node->services_[src].insert(topics.begin(), topics.end());
                TopicDest dest = {src, node};
                for (auto &topic : msg.topics()) {
                for (auto &topic : topics) {
                    service_map_[topic].insert(dest);
                }
                return MakeReply(eSuccess);
@@ -163,6 +168,7 @@
        return HandleMsg(head, [&](Node node) {
            NodeInfo &ni = *node;
            ni.state_.timestamp_ = Now();
            auto &info = msg.proc();
            if (!info.public_info().empty()) {
                ni.proc_.set_public_info(info.public_info());
@@ -207,9 +213,10 @@
    {
        return HandleMsg(head, [&](Node node) {
            auto &src = SrcAddr(head);
            node->subscriptions_[src].insert(msg.topics().begin(), msg.topics().end());
            auto &topics = msg.topics().topic_list();
            node->subscriptions_[src].insert(topics.begin(), topics.end());
            TopicDest dest = {src, node};
            for (auto &topic : msg.topics()) {
            for (auto &topic : topics) {
                subscribe_map_[topic].insert(dest);
            }
            return MakeReply(eSuccess);
@@ -232,8 +239,9 @@
            if (pos != node->subscriptions_.end()) {
                const TopicDest &dest = {src, node};
                auto &topics = msg.topics().topic_list();
                // clear node sub records;
                for (auto &topic : msg.topics()) {
                for (auto &topic : topics) {
                    pos->second.erase(topic);
                    RemoveSubTopicDestRecord(topic, dest);
                }
@@ -284,7 +292,30 @@
        return ret;
    }
    void OnTimer()
    {
        CheckNodes();
    }
private:
    void CheckNodes()
    {
        auto it = nodes_.begin();
        while (it != nodes_.end()) {
            auto &cli = *it->second;
            cli.state_.UpdateState(Now());
            if (cli.state_.flag_ == kStateKillme) {
                if (cleaner_) {
                    for (auto &addr : cli.addrs_) {
                        cleaner_(addr);
                    }
                }
                it = nodes_.erase(it);
            } else {
                ++it;
            }
        }
    }
    bool Valid(const NodeInfo &node)
    {
        return node.state_.flag_ == kStateNormal;
@@ -300,6 +331,7 @@
    std::unordered_map<Topic, Clients> service_map_;
    std::unordered_map<Topic, Clients> subscribe_map_;
    std::unordered_map<ProcId, Node> nodes_;
    Cleaner cleaner_; // remove mqs.
};
template <class Body, class OnMsg, class Replyer>
@@ -330,9 +362,11 @@
            msg, head, [&](auto &body) { return center->MsgTag(head, body); }, replyer); \
        return true;
bool InstallCenter()
bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner)
{
    auto center_ptr = std::make_shared<Synced<NodeCenter>>();
    auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner);
    auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
        return [&](auto &&rep_body) {
            auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
@@ -342,6 +376,11 @@
            }
            //TODO resend failed.
        };
    };
    auto OnCenterIdle = [center_ptr](ShmSocket &socket) {
        auto &center = *center_ptr;
        center->OnTimer();
    };
    auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
@@ -357,6 +396,7 @@
        }
    };
    auto OnBusIdle = [](ShmSocket &socket) {};
    auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
        auto &center = *center_ptr;
        auto replyer = MakeReplyer(socket, head, center->id());
@@ -390,8 +430,8 @@
        }
    };
    BHCenter::Install("#center.reg", OnCenter, BHTopicCenterAddress(), 1000);
    BHCenter::Install("#center.bus", OnPubSub, BHTopicBusAddress(), 1000);
    BHCenter::Install("#center.reg", OnCenter, OnCenterIdle, BHTopicCenterAddress(), 1000);
    BHCenter::Install("#center.bus", OnPubSub, OnBusIdle, BHTopicBusAddress(), 1000);
    return true;
}
@@ -412,19 +452,24 @@
    return rec;
}
bool BHCenter::Install(const std::string &name, MsgHandler handler, const std::string &mqid, const int mq_len)
bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const std::string &mqid, const int mq_len)
{
    Centers()[name] = CenterInfo{name, handler, mqid, mq_len};
    Centers()[name] = CenterInfo{name, handler, idle, mqid, mq_len};
    return true;
}
bool BHCenter::Install(const std::string &name, MsgHandler handler, const MQId &mqid, const int mq_len)
bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId &mqid, const int mq_len)
{
    return Install(name, handler, std::string((const char *) &mqid, sizeof(mqid)), mq_len);
    return Install(name, handler, idle, std::string((const char *) &mqid, sizeof(mqid)), mq_len);
}
BHCenter::BHCenter(Socket::Shm &shm)
{
    InstallCenter();
    auto gc = [&](const std::string &id) {
        auto r = ShmSocket::Remove(shm, *(MQId *) id.data());
        printf("remove mq : %s\n", r ? "ok" : "failed");
    };
    AddCenter("#center", gc);
    for (auto &kv : Centers()) {
        auto &info = kv.second;
src/center.h
@@ -29,8 +29,9 @@
public:
    typedef Socket::PartialRecvCB MsgHandler;
    static bool Install(const std::string &name, MsgHandler handler, const std::string &mqid, const int mq_len);
    static bool Install(const std::string &name, MsgHandler handler, const MQId &mqid, const int mq_len);
    typedef Socket::IdleCB IdleHandler;
    static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const std::string &mqid, const int mq_len);
    static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId &mqid, const int mq_len);
    BHCenter(Socket::Shm &shm);
    BHCenter();
@@ -42,6 +43,7 @@
    struct CenterInfo {
        std::string name_;
        MsgHandler handler_;
        IdleHandler idle_;
        std::string mqid_;
        int mq_len_ = 0;
    };
src/proto.h
@@ -19,6 +19,7 @@
#define PROTO_UA9UWKL1
#include "bhome_msg.pb.h"
#include "bhome_msg_api.pb.h"
using namespace bhome::msg;
src/shm.h
@@ -114,6 +114,7 @@
            throw("Error: Not enough memory, can not allocate \"" + name_ + "\"");
        }
    }
    static bool Remove(SharedMemory &shm, const std::string &name) { return shm.destroy<Data>(ObjName(name).c_str()); }
    static Data *Find(SharedMemory &shm, const std::string &name) { return shm.Find<Data>(ObjName(name)); }
    Data *Find(const std::string &name) { return Find(shm_, ObjName(name)); }
    virtual ~ShmObject() {}
@@ -122,7 +123,7 @@
    const Data *data() const { return pdata_; }
    Data *operator->() { return data(); }
    const Data *operator->() const { return data(); }
    bool Remove() { return shm_.destroy<Data>(ObjName(name_).c_str()); }
    bool Remove() { return Remove(shm_, name_); }
};
} // namespace bhome_shm
src/shm_queue.cpp
@@ -60,15 +60,13 @@
}
ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) :
    ShmMsgQueue(NewId(), segment, len)
{
}
    ShmMsgQueue(NewId(), segment, len) {}
ShmMsgQueue::~ShmMsgQueue()
ShmMsgQueue::~ShmMsgQueue() {}
bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId &id)
{
    // It's not safe to remove, others may still holder pointers and write to it.
    // TODO use smart_ptr or garbage collection.
    //Remove();
    return Super::Remove(shm, MsgQIdToName(id));
}
bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend)
src/shm_queue.h
@@ -129,6 +129,7 @@
    ShmMsgQueue(const MQId &id, ShmType &segment, const int len);
    ShmMsgQueue(ShmType &segment, const int len);
    ~ShmMsgQueue();
    static bool Remove(SharedMemory &shm, const MQId &id);
    const MQId &Id() const { return id_; }
    // bool Recv(MsgI &msg, BHMsgHead &head, const int timeout_ms);
src/socket.h
@@ -56,6 +56,7 @@
    ShmSocket(Shm &shm, const MQId &id, const int len);
    ShmSocket(Shm &shm, const int len = 12);
    ~ShmSocket();
    static bool Remove(SharedMemory &shm, const MQId &id) { return Queue::Remove(shm, id); }
    const MQId &id() const { return mq().Id(); }
    Shm &shm() { return shm_; }
    // start recv.
src/topic_node.cpp
@@ -92,9 +92,17 @@
    SockNode().Stop();
}
bool TopicNode::Register(const MsgRegister &body, MsgCommonReply &reply_body, const int timeout_ms)
bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
    auto &sock = SockNode();
    MsgRegister body;
    *body.mutable_proc() = proc;
    auto AddId = [&](const MQId &id) { body.add_addrs()->set_mq_id(&id, sizeof(id)); };
    AddId(SockNode().id());
    AddId(SockServer().id());
    AddId(SockClient().id());
    AddId(SockSub().id());
    AddId(SockPub().id());
    auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
    AddRoute(head, sock.id());
@@ -110,10 +118,12 @@
    return r;
}
bool TopicNode::RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply_body, const int timeout_ms)
bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
{
    //TODO check registered
    auto &sock = SockServer();
    MsgRegisterRPC body;
    body.mutable_topics()->Swap(&topics);
    auto head(InitMsgHead(GetType(body), proc_id()));
    AddRoute(head, sock.id());
@@ -361,14 +371,13 @@
// subscribe
bool TopicNode::Subscribe(const std::vector<Topic> &topics, const int timeout_ms)
bool TopicNode::Subscribe(MsgTopicList &topics, const int timeout_ms)
{
    try {
        auto &sock = SockSub();
        MsgSubscribe sub;
        for (auto &topic : topics) {
            sub.add_topics(topic);
        }
        sub.mutable_topics()->Swap(&topics);
        BHMsgHead head(InitMsgHead(GetType(sub), proc_id()));
        AddRoute(head, sock.id());
src/topic_node.h
@@ -39,12 +39,12 @@
    void StopAll();
    // topic node
    bool Register(const MsgRegister &body, MsgCommonReply &reply, const int timeout_ms);
    bool RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply, const int timeout_ms);
    bool Register(ProcInfo &body, MsgCommonReply &reply, const int timeout_ms);
    // topic rpc server
    typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest;
    bool ServerStart(OnRequest const &cb, const int nworker = 2);
    bool ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms);
    bool ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms);
    bool ServerSendReply(void *src_info, const std::string &data, const int timeout_ms);
@@ -68,7 +68,7 @@
    // subscribe
    typedef std::function<void(const std::string &proc_id, const Topic &topic, const std::string &data)> TopicDataCB;
    bool SubscribeStartWorker(const TopicDataCB &tdcb, int nworker = 2);
    bool Subscribe(const std::vector<Topic> &topics, const int timeout_ms);
    bool Subscribe(MsgTopicList &topics, const int timeout_ms);
    bool RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms);
private:
utest/utest.cpp
@@ -9,6 +9,7 @@
#include <string>
#include <thread>
#include <vector>
using namespace bhome_msg;
template <class A, class B>
@@ -90,8 +91,11 @@
    const int timeout = 1000;
    auto Sub = [&](int id, const std::vector<std::string> &topics) {
        DemoNode client("client_" + std::to_string(id), shm);
        bool r = client.Subscribe(topics, timeout);
        MsgTopicList tlist;
        for (auto &t : topics) {
            tlist.add_topic_list(t);
        }
        bool r = client.Subscribe(tlist, timeout);
        if (!r) {
            printf("client subscribe failed.\n");
        }
@@ -227,12 +231,12 @@
        };
        server.ServerStart(onData);
        MsgRegisterRPC rpc;
        MsgTopicList rpc;
        for (auto &topic : topics) {
            rpc.add_topics(topic);
            rpc.add_topic_list(topic);
        }
        MsgCommonReply reply_body;
        if (!server.RegisterRPC(rpc, reply_body, 100)) {
        if (!server.ServerRegisterRPC(rpc, reply_body, 100)) {
            printf("server register topic failed\n");
            return;
        }
utest/util.h
@@ -114,11 +114,11 @@
        TopicNode(shm), id_(id) { Init(); }
    void Init()
    {
        MsgRegister reg;
        reg.mutable_proc()->set_proc_id(id_);
        ProcInfo proc;
        proc.set_proc_id(id_);
        MsgCommonReply reply_body;
        if (!Register(reg, reply_body, 1000)) {
        if (!Register(proc, reply_body, 1000)) {
            printf("node %s register failed\n", id_.c_str());
        }
    }