lichao
2021-04-02 83085f2ce99cca05d40a19482151873a55e6393a
refactor center; add async request no cb.
11个文件已修改
242 ■■■■■ 已修改文件
src/center.cpp 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/center.h 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub_center.cpp 37 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub_center.h 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/reqrep.cpp 34 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/reqrep.h 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/reqrep_center.cpp 54 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/reqrep_center.h 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.cpp 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.h 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/utest.cpp 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/center.cpp
@@ -17,6 +17,8 @@
 */
#include "center.h"
#include "defs.h"
#include "pubsub_center.h"
#include "reqrep_center.h"
#include "shm.h"
using namespace bhome_shm;
@@ -26,3 +28,14 @@
    static SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 64);
    return shm;
}
BHCenter::BHCenter(Socket::Shm &shm) :
    socket_(shm) {}
BHCenter::BHCenter() :
    BHCenter(BHomeShm()) {}
bool BHCenter::Start()
{
    return false;
}
src/center.h
@@ -18,8 +18,24 @@
#ifndef CENTER_TM9OUQTG
#define CENTER_TM9OUQTG
#include "socket.h"
#include <functional>
class BHCenter
{
    typedef ShmSocket Socket;
public:
    typedef std::function<bool(ShmSocket &socket, bhome_msg::MsgI &imsg, bhome::msg::BHMsg &msg)> MsgHandler;
    BHCenter(Socket::Shm &shm);
    BHCenter();
    ~BHCenter() { Stop(); }
    bool Start();
    bool Stop() { return socket_.Stop(); }
private:
    ShmSocket socket_;
};
#endif // end of include guard: CENTER_TM9OUQTG
src/pubsub_center.cpp
@@ -77,25 +77,21 @@
} // namespace
bool PubSubCenter::Start(const int nworker)
BHCenter::MsgHandler MakeBusCenter()
{
    auto bus_ptr = std::make_shared<Synced<BusCenter>>();
    auto onRecv = [bus_ptr, this](MsgI &imsg) {
    return [bus_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) {
#ifndef NDEBUG
        static std::atomic<time_t> last(0);
        time_t now = 0;
        time(&now);
        if (last.exchange(now) < now) {
            printf("bus queue size: %ld\n", socket_.Pending());
            printf("bus queue size: %ld\n", socket.Pending());
        }
#endif
        auto &bus = *bus_ptr;
        BHMsg msg;
        if (!imsg.Unpack(msg)) {
            return;
        }
        auto &shm = socket.shm();
        auto OnSubChange = [&](auto &&update) {
            DataSub sub;
@@ -106,7 +102,6 @@
                update(client, sub.topics());
            }
        };
        auto Sub = [&](const MQId &id, auto &topics) { bus->SubScribe(id, topics.begin(), topics.end()); };
        auto Unsub = [&](const MQId &id, auto &topics) { bus->UnsubScribe(id, topics.begin(), topics.end()); };
@@ -123,24 +118,30 @@
            };
            if (imsg.IsCounted()) {
                Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, imsg, 10); });
                Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm, cli, imsg, 10); });
            } else {
                MsgI pubmsg;
                if (!pubmsg.MakeRC(shm(), msg)) { return; }
                DEFER1(pubmsg.Release(shm()));
                if (!pubmsg.MakeRC(shm, msg)) { return; }
                DEFER1(pubmsg.Release(shm));
                Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, pubmsg, 10); });
                Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm, cli, pubmsg, 10); });
            }
        };
        switch (msg.type()) {
        case kMsgTypeSubscribe: OnSubChange(Sub); break;
        case kMsgTypeUnsubscribe: OnSubChange(Unsub); break;
        case kMsgTypePublish: OnPublish(); break;
        default: break;
        case kMsgTypeSubscribe: OnSubChange(Sub); return true;
        case kMsgTypeUnsubscribe: OnSubChange(Unsub); return true;
        case kMsgTypePublish: OnPublish(); return true;
        default: return false;
        }
    };
}
bool PubSubCenter::Start(const int nworker)
{
    auto handler = MakeBusCenter();
    printf("sizeof(pub/sub handler) = %ld\n", sizeof(handler));
    const int kMaxWorker = 16;
    return socket_.StartRaw(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
    return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
}
src/pubsub_center.h
@@ -18,28 +18,23 @@
#ifndef PUBSUB_CENTER_MFSUZJU7
#define PUBSUB_CENTER_MFSUZJU7
#include "center.h"
#include "defs.h"
#include "socket.h"
#include <mutex>
#include <set>
#include <unordered_map>
BHCenter::MsgHandler MakeBusCenter();
// publish/subcribe manager.
class PubSubCenter
{
    class SocketBus : public ShmSocket
    {
    public:
        SocketBus(ShmSocket::Shm &shm) :
            ShmSocket(shm, &kBHTopicBus, 1000) {}
        using ShmSocket::shm;
    };
    SocketBus socket_;
    ShmSocket::Shm &shm() { return socket_.shm(); }
    ShmSocket socket_;
public:
    PubSubCenter(ShmSocket::Shm &shm) :
        socket_(shm) {}
        socket_(shm, &kBHTopicBus, 1000) {}
    PubSubCenter() :
        PubSubCenter(BHomeShm()) {}
    ~PubSubCenter() { Stop(); }
src/reqrep.cpp
@@ -26,7 +26,7 @@
bool SocketRequest::StartWorker(const RequestResultCB &rrcb, int nworker)
{
    auto AsyncRecvProc = [this, rrcb](BHMsg &msg) {
        auto Find = [&](RecvCB &cb) {
        auto Find = [&](RecvBHMsgCB &cb) {
            std::lock_guard<std::mutex> lock(mutex());
            const std::string &msgid = msg.msg_id();
            auto pos = async_cbs_.find(msgid);
@@ -39,10 +39,10 @@
            }
        };
        RecvCB cb;
        if (Find(cb) && cb) {
        RecvBHMsgCB cb;
        if (Find(cb)) {
            cb(msg);
        } else if (rrcb && msg.type() == kMsgTypeReply) {
        } else if (msg.type() == kMsgTypeReply) {
            DataReply reply;
            if (reply.ParseFromString(msg.body())) {
                rrcb(reply.data());
@@ -55,6 +55,20 @@
    return Start(AsyncRecvProc, nworker);
}
bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms)
{
    try {
        BHAddress addr;
        if (QueryRPCTopic(topic, addr, timeout_ms)) {
            const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
            return AsyncSend(addr.mq_id().data(), &msg, timeout_ms);
        } else {
            return false;
        }
    } catch (...) {
        return false;
    }
}
bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
{
    auto Call = [&](const void *remote) {
@@ -103,7 +117,17 @@
    return false;
}
bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb)
bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms)
{
    assert(remote && pmsg);
    try {
        const BHMsg &msg = *static_cast<const BHMsg *>(pmsg);
        return mq().Send(*static_cast<const MQId *>(remote), msg, timeout_ms);
    } catch (...) {
        return false;
    }
}
bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvBHMsgCB &cb)
{
    assert(remote && pmsg);
    try {
src/reqrep.h
@@ -43,9 +43,15 @@
    bool StartWorker(int nworker = 2) { return StartWorker(RequestResultCB(), nworker); }
    bool Stop() { return Socket::Stop(); }
    bool AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb);
    bool AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms);
    bool AsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb)
    {
        return AsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb);
    }
    bool AsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms)
    {
        return AsyncRequest(topic, data.data(), data.size(), timeout_ms);
    }
    bool SyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms);
    bool SyncRequest(const Topic &topic, const std::string &data, std::string &out, const int timeout_ms)
@@ -54,10 +60,11 @@
    }
private:
    bool AsyncSend(const void *remote, const void *msg, const int timeout_ms, const RecvCB &cb);
    bool AsyncSend(const void *remote, const void *msg, const int timeout_ms, const RecvBHMsgCB &cb);
    bool AsyncSend(const void *remote, const void *msg, const int timeout_ms);
    bool SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms);
    bool QueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
    std::unordered_map<std::string, RecvCB> async_cbs_;
    std::unordered_map<std::string, RecvBHMsgCB> async_cbs_;
    typedef bhome_msg::BHAddress Address;
    class TopicCache
src/reqrep_center.cpp
@@ -99,63 +99,81 @@
    std::unordered_map<Topic, WeakNode> topic_map_;
    std::unordered_map<ProcId, Node> nodes_;
};
Synced<NodeCenter> &Center()
{
    static Synced<NodeCenter> s;
    return s;
}
} // namespace
bool ReqRepCenter::Start(const int nworker)
BHCenter::MsgHandler MakeReqRepCenter()
{
    auto center_ptr = std::make_shared<Synced<NodeCenter>>();
    auto onRecv = [center_ptr, this](BHMsg &msg) {
    return [center_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) {
        auto &center = *center_ptr;
        auto &shm = socket.shm();
#ifndef NDEBUG
        static std::atomic<time_t> last(0);
        time_t now = 0;
        time(&now);
        if (last.exchange(now) < now) {
            printf("bus queue size: %ld\n", socket_.Pending());
            printf("bus queue size: %ld\n", socket.Pending());
        }
#endif
        if (msg.route_size() == 0) {
            return;
        }
        auto &src_mq = msg.route(0).mq_id();
        auto SrcMQ = [&]() { return msg.route(0).mq_id(); };
        auto OnRegister = [&]() {
            if (msg.route_size() != 1) { return; }
            DataProcRegister reg;
            if (reg.ParseFromString(msg.body()) && reg.has_proc()) {
                center->Register(*reg.mutable_proc(), src_mq, reg.topics().begin(), reg.topics().end());
                center->Register(*reg.mutable_proc(), SrcMQ(), reg.topics().begin(), reg.topics().end());
            }
        };
        auto OnHeartbeat = [&]() {
            if (msg.route_size() != 1) { return; }
            auto &src_mq = msg.route(0).mq_id();
            DataProcHeartbeat hb;
            if (hb.ParseFromString(msg.body()) && hb.has_proc()) {
                center->Heartbeat(*hb.mutable_proc(), src_mq);
                center->Heartbeat(*hb.mutable_proc(), SrcMQ());
            }
        };
        auto OnQueryTopic = [&]() {
            if (msg.route_size() != 1) { return; }
            DataProcQueryTopic query;
            NodeCenter::ProcAddr dest;
            if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) {
                MQId remote;
                memcpy(&remote, msg.route().rbegin()->mq_id().data(), sizeof(remote));
                memcpy(&remote, SrcMQ().data(), sizeof(MQId));
                MsgI imsg;
                if (!imsg.Make(shm(), MakeQueryTopicReply(dest, msg.msg_id()))) { return; }
                if (!ShmMsgQueue::Send(shm(), remote, imsg, 100)) {
                    imsg.Release(shm());
                if (!imsg.Make(shm, MakeQueryTopicReply(dest, msg.msg_id()))) { return; }
                if (!ShmMsgQueue::Send(shm, remote, imsg, 100)) {
                    imsg.Release(shm);
                }
            }
        };
        switch (msg.type()) {
        case kMsgTypeProcRegisterTopics: OnRegister(); break;
        case kMsgTypeProcHeartbeat: OnHeartbeat(); break;
        case kMsgTypeProcQueryTopic: OnQueryTopic(); break;
        default: break;
        case kMsgTypeProcRegisterTopics: OnRegister(); return true;
        case kMsgTypeProcHeartbeat: OnHeartbeat(); return true;
        case kMsgTypeProcQueryTopic: OnQueryTopic(); return true;
        default: return false;
        }
    };
}
bool ReqRepCenter::Start(const int nworker)
{
    auto handler = MakeReqRepCenter();
    printf("sizeof(rep/req handler) = %ld\n", sizeof(handler));
    const int kMaxWorker = 16;
    return socket_.Start(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
    return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
}
src/reqrep_center.h
@@ -18,24 +18,18 @@
#ifndef REQREP_CENTER_US3RBM60
#define REQREP_CENTER_US3RBM60
#include "center.h"
#include "defs.h"
#include "socket.h"
BHCenter::MsgHandler MakeReqRepCenter();
class ReqRepCenter
{
    class Socket : public ShmSocket
    {
    public:
        Socket(ShmSocket::Shm &shm) :
            ShmSocket(shm, &kBHTopicReqRepCenter, 1000) {}
        using ShmSocket::shm;
    };
    Socket socket_;
    ShmSocket::Shm &shm() { return socket_.shm(); }
    ShmSocket socket_;
public:
    ReqRepCenter(ShmSocket::Shm &shm) :
        socket_(shm) {}
        socket_(shm, &kBHTopicReqRepCenter, 1000) {}
    ReqRepCenter() :
        ReqRepCenter(BHomeShm()) {}
    ~ReqRepCenter() { Stop(); }
src/socket.cpp
@@ -49,7 +49,7 @@
    Stop(); //TODO should stop in sub class, incase thread access sub class data.
}
bool ShmSocket::StartRaw(const RecvRawCB &onData, int nworker)
bool ShmSocket::Start(const RecvCB &onData, int nworker)
{
    if (!mq_) {
        return false;
@@ -62,7 +62,12 @@
            try {
                MsgI imsg;
                DEFER1(imsg.Release(shm_));
                if (mq_->Recv(imsg, 100)) { onData(imsg); }
                if (mq_->Recv(imsg, 100)) {
                    BHMsg msg;
                    if (imsg.Unpack(msg)) {
                        onData(*this, imsg, msg);
                    }
                }
            } catch (...) {
            }
        }
@@ -73,11 +78,6 @@
        workers_.emplace_back(RecvProc);
    }
    return true;
}
bool ShmSocket::Start(const RecvCB &onData, int nworker)
{
    return StartRaw([this, onData](MsgI &imsg) { BHMsg m; if (imsg.Unpack(m)) { onData(m); } }, nworker);
}
bool ShmSocket::Stop()
src/socket.h
@@ -35,21 +35,24 @@
public:
    typedef bhome_shm::SharedMemory Shm;
    typedef std::function<void(bhome_msg::BHMsg &msg)> RecvCB;
    typedef std::function<void(bhome_msg::MsgI &imsg)> RecvRawCB;
    typedef std::function<void(ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg)> RecvCB;
    typedef std::function<void(bhome_msg::BHMsg &msg)> RecvBHMsgCB;
    ShmSocket(Shm &shm, const void *id, const int len);
    ShmSocket(Shm &shm, const int len = 12);
    ~ShmSocket();
    Shm &shm() { return shm_; }
    // start recv.
    bool Start(const RecvCB &onData, int nworker = 1);
    bool StartRaw(const RecvRawCB &onData, int nworker = 1);
    bool Start(const RecvBHMsgCB &onData, int nworker = 1)
    {
        return Start([onData](ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg) { onData(msg); }, nworker);
    }
    bool Stop();
    size_t Pending() const { return mq_ ? mq_->Pending() : 0; }
protected:
    ShmSocket(Shm &shm, const void *id, const int len);
    Shm &shm() { return shm_; }
    const Shm &shm() const { return shm_; }
    Queue &mq() { return *mq_; } // programmer should make sure that mq_ is valid.
    const Queue &mq() const { return *mq_; }
utest/utest.cpp
@@ -181,14 +181,31 @@
    auto Client = [&](const std::string &topic, const int nreq) {
        SocketRequest client(shm);
        std::atomic<int> count(0);
        std::string reply;
        auto onRecv = [&](const std::string &rep) {
            reply = rep;
            if (++count >= nreq) {
                printf("count: %d\n", count.load());
            }
        };
        client.StartWorker(onRecv, 1);
        boost::timer::auto_cpu_timer timer;
        for (int i = 0; i < nreq; ++i) {
            if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) {
            if (!client.AsyncRequest(topic, "data " + std::to_string(i), 1000)) {
                printf("client request failed\n");
            }
            // if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) {
            //     printf("client request failed\n");
            // } else {
            //     ++count;
            // }
        }
        printf("request %s %d done ", topic.c_str(), nreq);
        while (count.load() < nreq) {
            std::this_thread::yield();
        }
        client.Stop();
    };
    auto Server = [&](const std::string &name, const std::vector<std::string> &topics) {
        SocketReply server(shm);
@@ -212,7 +229,7 @@
    servers.Launch(Server, "server", topics);
    std::this_thread::sleep_for(100ms);
    for (auto &t : topics) {
        clients.Launch(Client, t, 1000 * 100);
        clients.Launch(Client, t, 1000 * 1000);
    }
    clients.WaitAll();
    run = false;