lichao
2021-04-06 3e9f5b869dd32441fdd3d77091cb33ef4301f244
use BHCenter.
10个文件已修改
83 ■■■■ 已修改文件
src/center.cpp 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/center.h 16 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.cpp 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.h 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub_center.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/reqrep_center.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_reply.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_request.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/utest.cpp 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/center.cpp
@@ -37,8 +37,24 @@
    return shm;
}
BHCenter::BHCenter(Socket::Shm &shm) :
    socket_(shm, &BHUniCenter(), 1000) {}
BHCenter::CenterRecords &BHCenter::Centers()
{
    static CenterRecords rec;
    return rec;
}
bool BHCenter::Install(const std::string &name, MsgHandler handler, const std::string &mqid, const int mq_len)
{
    CenterRecords()[name] = CenterInfo{name, handler, mqid, mq_len};
}
BHCenter::BHCenter(Socket::Shm &shm)
{
    sockets_["center"] = std::make_shared<ShmSocket>(shm, &BHTopicCenterAddress(), 1000);
    sockets_["bus"] = std::make_shared<ShmSocket>(shm, &BHTopicBusAddress(), 1000);
    for (auto &kv : Centers()) {
        sockets_[kv.first] = std::make_shared<ShmSocket>(shm, kv.second.mqid_.data(), kv.second.mq_len_);
    }
}
BHCenter::BHCenter() :
    BHCenter(BHomeShm()) {}
@@ -47,6 +63,20 @@
{
    auto onCenter = MakeReqRepCenter();
    auto onBus = MakeBusCenter();
    sockets_["center"]->Start(onCenter);
    sockets_["bus"]->Start(onBus);
    socket_.Start(Join(onCenter, onBus));
    for (auto &kv : Centers()) {
        sockets_[kv.first]->Start(kv.second.handler_);
    }
    return true;
    // socket_.Start(Join(onCenter, onBus));
}
bool BHCenter::Stop()
{
    for (auto &kv : sockets_) {
        kv.second->Stop();
    }
    return true;
}
src/center.h
@@ -20,6 +20,8 @@
#include "socket.h"
#include <functional>
#include <map>
#include <memory>
class BHCenter
{
@@ -27,15 +29,25 @@
public:
    typedef std::function<bool(ShmSocket &socket, bhome_msg::MsgI &imsg, bhome::msg::BHMsg &msg)> MsgHandler;
    static bool Install(const std::string &name, MsgHandler handler, const std::string &mqid, const int mq_len);
    BHCenter(Socket::Shm &shm);
    BHCenter();
    ~BHCenter() { Stop(); }
    bool Start();
    bool Stop() { return socket_.Stop(); }
    bool Stop();
private:
    ShmSocket socket_;
    struct CenterInfo {
        std::string name_;
        MsgHandler handler_;
        std::string mqid_;
        int mq_len_ = 0;
    };
    typedef std::map<std::string, CenterInfo> CenterRecords;
    static CenterRecords &Centers();
    std::map<std::string, std::shared_ptr<ShmSocket>> sockets_;
};
#endif // end of include guard: CENTER_TM9OUQTG
src/defs.cpp
@@ -25,6 +25,6 @@
} // namespace
const MQId &BHTopicBus() { return kBHTopicBus; }
const MQId &BHTopicReqRepCenter() { return kBHTopicReqRepCenter; }
const MQId &BHUniCenter() { return kBHUniCenter; }
const MQId &BHTopicBusAddress() { return kBHTopicBus; }
const MQId &BHTopicCenterAddress() { return kBHTopicReqRepCenter; }
const MQId &BHUniCenterAddress() { return kBHUniCenter; }
src/defs.h
@@ -25,9 +25,9 @@
typedef boost::uuids::uuid MQId;
const MQId &BHTopicBus();
const MQId &BHTopicReqRepCenter();
const MQId &BHUniCenter();
const MQId &BHTopicBusAddress();
const MQId &BHTopicCenterAddress();
const MQId &BHUniCenterAddress();
const int kBHCenterPort = 24287;
const char kTopicSep = '.';
src/pubsub.cpp
@@ -30,7 +30,7 @@
            return false;
        }
        DEFER1(imsg.Release(shm()));
        return ShmMsgQueue::Send(shm(), BHTopicBus(), imsg, timeout_ms);
        return ShmMsgQueue::Send(shm(), BHTopicBusAddress(), imsg, timeout_ms);
    } catch (...) {
        return false;
    }
@@ -39,7 +39,7 @@
bool SocketSubscribe::Subscribe(const std::vector<Topic> &topics, const int timeout_ms)
{
    try {
        return mq().Send(BHTopicBus(), MakeSub(mq().Id(), topics), timeout_ms);
        return mq().Send(BHTopicBusAddress(), MakeSub(mq().Id(), topics), timeout_ms);
    } catch (...) {
        return false;
    }
src/pubsub_center.h
@@ -34,7 +34,7 @@
public:
    PubSubCenter(ShmSocket::Shm &shm) :
        socket_(shm, &BHTopicBus(), 1000) {}
        socket_(shm, &BHTopicBusAddress(), 1000) {}
    PubSubCenter() :
        PubSubCenter(BHomeShm()) {}
    ~PubSubCenter() { Stop(); }
src/reqrep_center.h
@@ -29,7 +29,7 @@
public:
    ReqRepCenter(ShmSocket::Shm &shm) :
        socket_(shm, &BHTopicReqRepCenter(), 1000) {}
        socket_(shm, &BHTopicCenterAddress(), 1000) {}
    ReqRepCenter() :
        ReqRepCenter(BHomeShm()) {}
    ~ReqRepCenter() { Stop(); }
src/topic_reply.cpp
@@ -70,11 +70,11 @@
bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms)
{
    //TODO check reply?
    return SyncSend(&BHTopicReqRepCenter(), MakeRegister(mq().Id(), proc_info, topics), timeout_ms);
    return SyncSend(&BHTopicCenterAddress(), MakeRegister(mq().Id(), proc_info, topics), timeout_ms);
}
bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms)
{
    return SyncSend(&BHTopicReqRepCenter(), MakeHeartbeat(mq().Id(), proc_info), timeout_ms);
    return SyncSend(&BHTopicCenterAddress(), MakeHeartbeat(mq().Id(), proc_info), timeout_ms);
}
bool SocketReply::StartWorker(const OnRequest &rcb, int nworker)
{
src/topic_request.cpp
@@ -191,7 +191,7 @@
    BHMsg result;
    const BHMsg &msg = MakeQueryTopic(mq().Id(), topic);
    if (SyncSendAndRecv(&BHTopicReqRepCenter(), &msg, &result, timeout_ms)) {
    if (SyncSendAndRecv(&BHTopicCenterAddress(), &msg, &result, timeout_ms)) {
        if (result.type() == kMsgTypeQueryTopicReply) {
            MsgQueryTopicReply reply;
            if (reply.ParseFromString(result.body())) {
utest/utest.cpp
@@ -1,3 +1,4 @@
#include "center.h"
#include "defs.h"
#include "pubsub.h"
#include "pubsub_center.h"
@@ -176,8 +177,8 @@
    printf("flag = %d\n", *flag);
    ++*flag;
    ReqRepCenter center(shm);
    center.Start(2);
    BHCenter center(shm);
    center.Start();
    std::atomic<bool> run(true);
    auto Client = [&](const std::string &topic, const int nreq) {