lichao
2021-04-02 dc12826dd61ce18fac3a9561c5843d30a0cf9660
add request topic cache; refactor req/rep center.
8个文件已修改
285 ■■■■ 已修改文件
proto/source/bhome_msg.proto 30 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_util.h 39 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/reqrep.cpp 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/reqrep.h 39 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/reqrep_center.cpp 136 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/reqrep_center.h 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/utest.cpp 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/source/bhome_msg.proto
@@ -4,13 +4,31 @@
package bhome.msg;
// message format : header(BHMsgHead) + body(variable types)
message BHAddress {
    bytes mq_id = 1; // mqid, uuid
    bytes ip = 2;   //
    int32 port = 3;
}
message BHMsg {
message ProcInfo
{
    bytes id = 1;
    bytes name = 2;
    bytes public_info = 3;
    bytes private_info = 4;
}
message BHMsgHead {
    bytes msg_id = 1;
    repeated BHAddress route = 2; // for reply and proxy.
    int64 timestamp = 3;
    int32 type = 4;
    ProcInfo proc = 5;
    bytes topic = 6; // for request route
}
message BHMsg { // deprecated
    bytes msg_id = 1;
    int64 timestamp = 2;
    int32 type = 3;
@@ -50,12 +68,6 @@
    bytes data = 1; 
}
message ProcInfo
{
    bytes name = 1;
    bytes info = 2;
}
message DataProcRegister
{
    ProcInfo proc = 1;
@@ -74,3 +86,7 @@
message DataProcQueryTopicReply {
    BHAddress address = 1;
}
service TopicRequestReplyService {
    rpc Request (DataRequest) returns (DataReply);
}
src/bh_util.h
@@ -19,6 +19,7 @@
#define BH_UTIL_SOXWOK67
#include <functional>
#include <mutex>
#include <stdint.h>
inline uint16_t Get8(const void *p)
@@ -104,6 +105,44 @@
    }
};
template <class D, class M, class G = std::unique_lock<M>>
class SyncedPtr
{
    G lock_;
    D *p_ = nullptr;
public:
    SyncedPtr(M &mtx, D &data) :
        lock_(mtx), p_(&data) {}
    SyncedPtr(SyncedPtr &&a)
    {
        lock_.swap(a.lock_);
        std::swap(p_, a.p_);
    }
    D *operator->() const { return p_; }
    D &operator*() const { return *p_; }
};
template <class T, class Mutex = std::mutex, class Lock = std::unique_lock<Mutex>>
class Synced
{
    typedef T Data;
    Mutex mutex_;
    Data data_;
    typedef SyncedPtr<Data, Mutex, Lock> Ptr;
public:
    template <class... P>
    explicit Synced(const P &...p) :
        data_(p...) {}
    Ptr operator->() { return Ptr(mutex_, data_); }
    auto Apply(const auto &f)
    {
        Lock lk(mutex_);
        return f(data_);
    }
};
// macro helper
#define JOIN_IMPL(a, b) a##b
#define JOIN(a, b) JOIN_IMPL(a, b)
src/msg.h
@@ -69,6 +69,7 @@
BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics);
BHMsg MakePub(const std::string &topic, const void *data, const size_t size);
// message content layout: header_size + header + data_size + data
class MsgI
{
private:
src/reqrep.cpp
@@ -155,8 +155,7 @@
bool SocketRequest::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
{
    if (tmp_cache_.first == topic) {
        addr = tmp_cache_.second;
    if (topic_cache_.Find(topic, addr)) {
        return true;
    }
@@ -167,9 +166,12 @@
            DataProcQueryTopicReply reply;
            if (reply.ParseFromString(result.body())) {
                addr = reply.address();
                tmp_cache_.first = topic;
                tmp_cache_.second = addr;
                return !addr.mq_id().empty();
                if (addr.mq_id().empty()) {
                    return false;
                } else {
                    topic_cache_.Update(topic, addr);
                    return true;
                }
            }
        }
    } else {
src/reqrep.h
@@ -18,6 +18,7 @@
#ifndef REQREP_ACEH09NK
#define REQREP_ACEH09NK
#include "bh_util.h"
#include "defs.h"
#include "msg.h"
#include "socket.h"
@@ -58,7 +59,43 @@
    bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
    std::unordered_map<std::string, RecvCB> async_cbs_;
    std::pair<std::string, bhome::msg::BHAddress> tmp_cache_;
    typedef bhome_msg::BHAddress Address;
    class TopicCache
    {
        class Impl
        {
            typedef std::unordered_map<std::string, Address> Store;
            Store store_;
        public:
            bool Find(const std::string &topic, Address &addr)
            {
                auto pos = store_.find(topic);
                if (pos != store_.end()) {
                    addr = pos->second;
                    return true;
                } else {
                    return false;
                }
            }
            bool Update(const std::string &topic, const Address &addr)
            {
                store_[topic] = addr;
                return true;
            }
        };
        Synced<Impl> impl_;
        // Impl &impl()
        // {
        //     thread_local Impl impl;
        //     return impl;
        // }
    public:
        bool Find(const std::string &topic, Address &addr) { return impl_->Find(topic, addr); }
        bool Update(const std::string &topic, const Address &addr) { return impl_->Update(topic, addr); }
    };
    TopicCache topic_cache_;
};
class SocketReply : private ShmSocket
src/reqrep_center.cpp
@@ -17,24 +17,96 @@
 */
#include "reqrep_center.h"
#include "bh_util.h"
using namespace bhome_shm;
#include "msg.h"
#include <chrono>
#include <memory>
#include <mutex>
#include <unordered_map>
struct A {
    void F(int){};
};
using namespace bhome_shm;
namespace
{
inline uint64_t Now()
{
    time_t t;
    return time(&t);
}
auto Now = []() { time_t t; return time(&t); };
class NodeCenter
{
public:
    typedef std::string ProcAddr;
    typedef bhome::msg::ProcInfo ProcInfo;
    template <class Iter>
    bool Register(ProcInfo &info, const ProcAddr &src_mq, Iter topics_begin, Iter topics_end)
    {
        try {
            Node node(new NodeInfo);
            node->addr_ = src_mq;
            node->proc_.Swap(&info);
            node->state_.timestamp_ = Now();
            nodes_[node->proc_.id()] = node;
            for (auto it = topics_begin; it != topics_end; ++it) {
                topic_map_[*it] = node;
            }
            return true;
        } catch (...) {
            return false;
        }
    }
    void Heartbeat(ProcInfo &info, const ProcAddr &src_mq)
    {
        auto pos = nodes_.find(info.name());
        if (pos != nodes_.end() && pos->second->addr_ == src_mq) { // both name and mq should be the same.
            NodeInfo &ni = *pos->second;
            ni.state_.timestamp_ = Now();
            if (!info.public_info().empty()) {
                ni.proc_.set_public_info(info.public_info());
            }
            if (!info.private_info().empty()) {
                ni.proc_.set_private_info(info.private_info());
            }
        }
    }
    bool QueryTopic(const std::string &topic, ProcAddr &addr)
    {
        auto pos = topic_map_.find(topic);
        if (pos != topic_map_.end()) {
            Node node(pos->second.lock());
            if (node) {
                addr = node->addr_;
                return true;
            } else { // dead, remove record.
                topic_map_.erase(pos);
                return false;
            }
        } else {
            return false;
        }
    }
private:
    struct ProcState {
        time_t timestamp_ = 0;
        uint32_t flag_ = 0; // reserved
    };
    typedef std::string ProcId;
    struct NodeInfo {
        ProcState state_; // state
        ProcAddr addr_;   // registered_mqid.
        ProcInfo proc_;   //
    };
    typedef std::shared_ptr<NodeInfo> Node;
    typedef std::weak_ptr<NodeInfo> WeakNode;
    std::unordered_map<std::string, WeakNode> topic_map_;
    std::unordered_map<ProcId, Node> nodes_;
};
} // namespace
bool ReqRepCenter::Start(const int nworker)
{
    auto onRecv = [&](BHMsg &msg) {
    auto center_ptr = std::make_shared<Synced<NodeCenter>>();
    auto onRecv = [center_ptr, this](BHMsg &msg) {
        auto &center = *center_ptr;
#ifndef NDEBUG
        static std::atomic<time_t> last(0);
        time_t now = 0;
@@ -50,54 +122,22 @@
        auto OnRegister = [&]() {
            DataProcRegister reg;
            if (!reg.ParseFromString(msg.body())) {
                return;
            if (reg.ParseFromString(msg.body()) && reg.has_proc()) {
                center->Register(*reg.mutable_proc(), src_mq, reg.topics().begin(), reg.topics().end());
            }
            ProcInfo pi;
            pi.server_mqid_ = src_mq;
            pi.proc_id_ = reg.proc().name();
            pi.ext_info_ = reg.proc().info();
            pi.timestamp_ = Now();
            std::lock_guard<std::mutex> lock(mutex_);
            for (auto &t : reg.topics()) {
                topic_mq_[t] = pi.server_mqid_;
            }
            procs_[pi.proc_id_] = pi;
        };
        auto OnHeartbeat = [&]() {
            DataProcHeartbeat hb;
            if (!hb.ParseFromString(msg.body())) {
                return;
            }
            std::lock_guard<std::mutex> lock(mutex_);
            auto pos = procs_.find(hb.proc().name());
            if (pos != procs_.end() && pos->second.server_mqid_ == src_mq) { // both name and mq should be the same.
                pos->second.timestamp_ = Now();
                pos->second.ext_info_ = hb.proc().info();
            if (hb.ParseFromString(msg.body()) && hb.has_proc()) {
                center->Heartbeat(*hb.mutable_proc(), src_mq);
            }
        };
        auto OnQueryTopic = [&]() {
            DataProcQueryTopic query;
            if (!query.ParseFromString(msg.body())) {
                return;
            }
            std::string dest;
            auto FindDest = [&]() {
                std::lock_guard<std::mutex> lock(mutex_);
                auto pos = topic_mq_.find(query.topic());
                if (pos != topic_mq_.end()) {
                    dest = pos->second;
                    return true;
                } else {
                    return false;
                }
            };
            if (FindDest()) {
            NodeCenter::ProcAddr dest;
            if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) {
                MQId remote;
                memcpy(&remote, msg.route().rbegin()->mq_id().data(), sizeof(remote));
                MsgI imsg;
src/reqrep_center.h
@@ -20,9 +20,6 @@
#include "defs.h"
#include "socket.h"
#include <chrono>
#include <mutex>
#include <set>
class ReqRepCenter
{
@@ -35,18 +32,6 @@
    };
    Socket socket_;
    ShmSocket::Shm &shm() { return socket_.shm(); }
    struct ProcInfo {
        std::string proc_id_; // unique name
        std::string server_mqid_;
        std::string ext_info_; // maybe json.
        uint64_t timestamp_ = 0;
    };
    typedef std::string Dests;
    std::mutex mutex_;
    std::unordered_map<std::string, Dests> topic_mq_;
    std::unordered_map<std::string, ProcInfo> procs_;
public:
    ReqRepCenter(ShmSocket::Shm &shm) :
utest/utest.cpp
@@ -151,6 +151,17 @@
    bus.Stop();
}
namespace
{
struct C {
    C() { printf("+C\n"); }
    C(const C &c) { printf("+C(const C&)\n"); }
    void F() { printf("C::F()\n"); }
    ~C() { printf("-C\n"); }
    char arr[100];
};
int F(C &c) { return printf(":::::::::::::F()\n"); }
} // namespace
BOOST_AUTO_TEST_CASE(ReqRepTest)
{
@@ -182,8 +193,8 @@
    auto Server = [&](const std::string &name, const std::vector<std::string> &topics) {
        SocketReply server(shm);
        ProcInfo info;
        info.set_id(name);
        info.set_name(name);
        info.set_info(name);
        if (!server.Register(info, topics, 100)) {
            printf("register failed\n");
        }