lichao
2021-04-06 70fec55c71f707358e6dba1b551d7836e93a5c78
refactor.
1个文件已添加
7个文件已修改
52 ■■■■ 已修改文件
src/center.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.cpp 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.h 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub_center.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/reqrep_center.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_reply.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_request.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/center.cpp
@@ -38,7 +38,7 @@
}
BHCenter::BHCenter(Socket::Shm &shm) :
    socket_(shm, &kBHUniCenter, 1000) {}
    socket_(shm, &BHUniCenter(), 1000) {}
BHCenter::BHCenter() :
    BHCenter(BHomeShm()) {}
src/defs.cpp
New file
@@ -0,0 +1,30 @@
/*
 * =====================================================================================
 *
 *       Filename:  defs.cpp
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年04月06日 19时23分14秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#include "defs.h"
namespace
{
const MQId kBHTopicBus = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
const MQId kBHTopicReqRepCenter = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff");
const MQId kBHUniCenter = boost::uuids::string_generator()("87654321-89ab-cdef-8349-1234567890ff");
} // namespace
const MQId &BHTopicBus() { return kBHTopicBus; }
const MQId &BHTopicReqRepCenter() { return kBHTopicReqRepCenter; }
const MQId &BHUniCenter() { return kBHUniCenter; }
src/defs.h
@@ -25,9 +25,9 @@
typedef boost::uuids::uuid MQId;
const MQId kBHTopicBus = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
const MQId kBHTopicReqRepCenter = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff");
const MQId kBHUniCenter = boost::uuids::string_generator()("87654321-89ab-cdef-8349-1234567890ff");
const MQId &BHTopicBus();
const MQId &BHTopicReqRepCenter();
const MQId &BHUniCenter();
const int kBHCenterPort = 24287;
const char kTopicSep = '.';
src/pubsub.cpp
@@ -30,7 +30,7 @@
            return false;
        }
        DEFER1(imsg.Release(shm()));
        return ShmMsgQueue::Send(shm(), kBHTopicBus, imsg, timeout_ms);
        return ShmMsgQueue::Send(shm(), BHTopicBus(), imsg, timeout_ms);
    } catch (...) {
        return false;
    }
@@ -39,7 +39,7 @@
bool SocketSubscribe::Subscribe(const std::vector<Topic> &topics, const int timeout_ms)
{
    try {
        return mq().Send(kBHTopicBus, MakeSub(mq().Id(), topics), timeout_ms);
        return mq().Send(BHTopicBus(), MakeSub(mq().Id(), topics), timeout_ms);
    } catch (...) {
        return false;
    }
src/pubsub_center.h
@@ -34,7 +34,7 @@
public:
    PubSubCenter(ShmSocket::Shm &shm) :
        socket_(shm, &kBHTopicBus, 1000) {}
        socket_(shm, &BHTopicBus(), 1000) {}
    PubSubCenter() :
        PubSubCenter(BHomeShm()) {}
    ~PubSubCenter() { Stop(); }
src/reqrep_center.h
@@ -29,7 +29,7 @@
public:
    ReqRepCenter(ShmSocket::Shm &shm) :
        socket_(shm, &kBHTopicReqRepCenter, 1000) {}
        socket_(shm, &BHTopicReqRepCenter(), 1000) {}
    ReqRepCenter() :
        ReqRepCenter(BHomeShm()) {}
    ~ReqRepCenter() { Stop(); }
src/topic_reply.cpp
@@ -70,11 +70,11 @@
bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms)
{
    //TODO check reply?
    return SyncSend(&kBHTopicReqRepCenter, MakeRegister(mq().Id(), proc_info, topics), timeout_ms);
    return SyncSend(&BHTopicReqRepCenter(), MakeRegister(mq().Id(), proc_info, topics), timeout_ms);
}
bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms)
{
    return SyncSend(&kBHTopicReqRepCenter, MakeHeartbeat(mq().Id(), proc_info), timeout_ms);
    return SyncSend(&BHTopicReqRepCenter(), MakeHeartbeat(mq().Id(), proc_info), timeout_ms);
}
bool SocketReply::StartWorker(const OnRequest &rcb, int nworker)
{
src/topic_request.cpp
@@ -191,7 +191,7 @@
    BHMsg result;
    const BHMsg &msg = MakeQueryTopic(mq().Id(), topic);
    if (SyncSendAndRecv(&kBHTopicReqRepCenter, &msg, &result, timeout_ms)) {
    if (SyncSendAndRecv(&BHTopicReqRepCenter(), &msg, &result, timeout_ms)) {
        if (result.type() == kMsgTypeQueryTopicReply) {
            MsgQueryTopicReply reply;
            if (reply.ParseFromString(result.body())) {