lichao
2021-04-06 bb9a7e348892eb5c4fccb063380aa6fcd9612b71
server resend failed; rename msgs; refactor.
3个文件已添加
2 文件已重命名
14个文件已修改
615 ■■■■■ 已修改文件
.vscode/launch.json 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.vscode/settings.json 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.vscode/tasks.json 28 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/source/bhome_msg.proto 70 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/source/error_msg.proto 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.cpp 33 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub_center.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/reqrep_center.cpp 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.cpp 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.h 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.cpp 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.h 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_reply.cpp 142 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_reply.h 52 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_request.cpp 106 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_request.h 32 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/speed_test.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/utest.cpp 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.vscode/launch.json
@@ -9,7 +9,10 @@
            "type": "cppdbg",
            "request": "launch",
            "program": "${workspaceFolder}/utest/utest",
            "args": [],
            "args": [
                "-t",
                "ReqRepTest"
            ],
            "stopAtEntry": false,
            "cwd": "${workspaceFolder}",
            "environment": [],
.vscode/settings.json
@@ -56,5 +56,9 @@
        "typeindex": "cpp",
        "typeinfo": "cpp",
        "variant": "cpp"
    }
    },
    "files.exclude": {
        "**/*.un~": true
    },
    "cmake.configureOnOpen": false
}
.vscode/tasks.json
@@ -3,32 +3,13 @@
        {
            "type": "cppbuild",
            "label": "C/C++: g++ build active file",
            "command": "/usr/bin/g++",
            "command": "ninja",
            "args": [
                "-g",
                "${file}",
                "-o",
                "${fileDirname}/${fileBasenameNoExtension}"
                "-C",
                "../build"
            ],
            "options": {
                "cwd": "${workspaceFolder}"
            },
            "problemMatcher": [
                "$gcc"
            ],
            "group": {
                "kind": "build",
                "isDefault": true
            },
            "detail": "Task generated by Debugger."
        },
        {
            "type": "cppbuild",
            "label": "C/C++: g++ build active file",
            "command": "make",
            "args": ["build"],
            "options": {
                "cwd": "${workspaceFolder}"
                "cwd": "${workspaceFolder}/utest"
            },
            "problemMatcher": [
                "$gcc"
@@ -36,7 +17,6 @@
            "group": "build",
            "detail": "compiler: /usr/bin/g++"
        }
    ],
    "version": "2.0.0"
}
proto/source/bhome_msg.proto
@@ -2,7 +2,11 @@
option optimize_for = LITE_RUNTIME;
import "google/protobuf/descriptor.proto";
import "error_msg.proto";
package bhome.msg;
// message format : header(BHMsgHead) + body(variable types)
message BHAddress {
@@ -13,7 +17,7 @@
message ProcInfo
{
    bytes id = 1;
    bytes id = 1; // serial number, maybe managed
    bytes name = 2;
    bytes public_info = 3;
    bytes private_info = 4;
@@ -28,6 +32,10 @@
    bytes topic = 6; // for request route
}
message BHMsgBody {
    bytes data = 1;
}
message BHMsg { // deprecated
    bytes msg_id = 1;
    int64 timestamp = 2;
@@ -38,55 +46,71 @@
enum MsgType {
    kMsgTypeInvalid = 0;
    kMsgTypeRequest = 1;
    kMsgTypeReply = 2;
    kMsgTypePublish = 3;
    kMsgTypeSubscribe = 4;
    kMsgTypeUnsubscribe = 5;
    kMsgTypeProcQueryTopic = 6;
    kMsgTypeProcQueryTopicReply = 7;
    kMsgTypeProcRegisterTopics = 8;
    kMsgTypeProcHeartbeat = 9;
    kMsgTypeCommonReply = 2;
    kMsgTypeRegister= 10;
    // kMsgTypeRegisterReply= 11;
    kMsgTypeHeartbeat = 12;
    // kMsgTypeHeartbeatReply = 13;
    kMsgTypeQueryTopic = 14;
    kMsgTypeQueryTopicReply = 15;
    kMsgTypeRequestTopic = 16;
    kMsgTypeRequestTopicReply = 17;
    kMsgTypePublish = 100;
    // kMsgTypePublishReply = 101;
    kMsgTypeSubscribe = 102;
    // kMsgTypeSubscribeReply = 103;
    kMsgTypeUnsubscribe = 104;
    // kMsgTypeUnsubscribeReply = 105;
}
message DataPub {
message MsgPub {
    bytes topic = 1;
    bytes data = 2; 
}
message DataSub {
message MsgSub {
    repeated bytes topics = 1;
}
message DataRequest {
message MsgCommonReply {
    ErrorMsg errmsg = 1;
}
message MsgRequestTopic {
    bytes topic = 1;
    bytes data = 2; 
}
message DataReply {
    bytes data = 1;
message MsgRequestTopicReply {
    ErrorMsg errmsg = 1;
    bytes data = 2;
}
message DataProcRegister
message MsgRegister
{
    ProcInfo proc = 1;
    repeated bytes topics = 2;
}
message DataProcHeartbeat
message MsgHeartbeat
{
    ProcInfo proc = 1;
}
message DataProcQueryTopic {
message MsgQueryTopic {
    bytes topic = 1;
}
message DataProcQueryTopicReply {
    BHAddress address = 1;
message MsgQueryTopicReply {
    ErrorMsg errmsg = 1;
    BHAddress address = 2;
}
service TopicRequestReplyService {
    rpc Request (DataRequest) returns (DataReply);
}
service TopicRPC {
    rpc Query (MsgQueryTopic) returns (MsgQueryTopicReply);
    rpc Request (MsgRequestTopic) returns (MsgQueryTopicReply);
}
proto/source/error_msg.proto
New file
@@ -0,0 +1,16 @@
syntax = "proto3";
option optimize_for = LITE_RUNTIME;
package bhome.msg;
enum ErrorCode {
    eSuccess = 0;
    eError = 1;
    eInvalidInput = 2;
}
message ErrorMsg {
    ErrorCode errCode = 1;
    bytes errString = 2;
}
src/msg.cpp
@@ -20,7 +20,10 @@
namespace bhome_msg
{
/*TODO change msg format, header has proc info;
reply has errer msg
    center accept request and route.;
//*/
const uint32_t kMsgTag = 0xf1e2d3c4;
const uint32_t kMsgPrefixLen = 4;
@@ -43,9 +46,9 @@
BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size)
{
    BHMsg msg(InitMsg(kMsgTypeRequest));
    BHMsg msg(InitMsg(kMsgTypeRequestTopic));
    AddRoute(msg, src_id);
    DataRequest req;
    MsgRequestTopic req;
    req.set_topic(topic);
    req.set_data(data, size);
    msg.set_body(req.SerializeAsString());
@@ -54,9 +57,9 @@
BHMsg MakeRegister(const MQId &src_id, ProcInfo info, const std::vector<std::string> &topics)
{
    BHMsg msg(InitMsg(kMsgTypeProcRegisterTopics));
    BHMsg msg(InitMsg(kMsgTypeRegister));
    AddRoute(msg, src_id);
    DataProcRegister reg;
    MsgRegister reg;
    reg.mutable_proc()->Swap(&info);
    for (auto &t : topics) {
        reg.add_topics(t);
@@ -67,9 +70,9 @@
BHMsg MakeHeartbeat(const MQId &src_id, ProcInfo info)
{
    BHMsg msg(InitMsg(kMsgTypeProcHeartbeat));
    BHMsg msg(InitMsg(kMsgTypeHeartbeat));
    AddRoute(msg, src_id);
    DataProcRegister reg;
    MsgHeartbeat reg;
    reg.mutable_proc()->Swap(&info);
    msg.set_body(reg.SerializeAsString());
    return msg;
@@ -78,8 +81,8 @@
BHMsg MakeReply(const std::string &src_msgid, const void *data, const size_t size)
{
    assert(data && size);
    BHMsg msg(InitMsg(kMsgTypeReply, src_msgid));
    DataReply reply;
    BHMsg msg(InitMsg(kMsgTypeRequestTopicReply, src_msgid));
    MsgRequestTopicReply reply;
    reply.set_data(data, size);
    msg.set_body(reply.SerializeAsString());
    return msg;
@@ -90,7 +93,7 @@
    assert(sub_unsub == kMsgTypeSubscribe || sub_unsub == kMsgTypeUnsubscribe);
    BHMsg msg(InitMsg(sub_unsub));
    AddRoute(msg, client);
    DataSub subs;
    MsgSub subs;
    for (auto &t : topics) {
        subs.add_topics(t);
    }
@@ -105,7 +108,7 @@
{
    assert(data && size);
    BHMsg msg(InitMsg(kMsgTypePublish));
    DataPub pub;
    MsgPub pub;
    pub.set_topic(topic);
    pub.set_data(data, size);
    msg.set_body(pub.SerializeAsString());
@@ -114,17 +117,17 @@
BHMsg MakeQueryTopic(const MQId &client, const std::string &topic)
{
    BHMsg msg(InitMsg(kMsgTypeProcQueryTopic));
    BHMsg msg(InitMsg(kMsgTypeQueryTopic));
    AddRoute(msg, client);
    DataProcQueryTopic query;
    MsgQueryTopic query;
    query.set_topic(topic);
    msg.set_body(query.SerializeAsString());
    return msg;
}
BHMsg MakeQueryTopicReply(const std::string &mqid, const std::string &msgid)
{
    BHMsg msg(InitMsg(kMsgTypeProcQueryTopicReply, msgid));
    DataProcQueryTopicReply reply;
    BHMsg msg(InitMsg(kMsgTypeQueryTopicReply, msgid));
    MsgQueryTopicReply reply;
    reply.mutable_address()->set_mq_id(mqid);
    msg.set_body(reply.SerializeAsString());
    return msg;
src/pubsub.cpp
@@ -49,7 +49,7 @@
{
    auto AsyncRecvProc = [this, tdcb](BHMsg &msg) {
        if (msg.type() == kMsgTypePublish) {
            DataPub d;
            MsgPub d;
            if (d.ParseFromString(msg.body())) {
                tdcb(d.topic(), d.data());
            }
@@ -65,7 +65,7 @@
{
    BHMsg msg;
    if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypePublish) {
        DataPub d;
        MsgPub d;
        if (d.ParseFromString(msg.body())) {
            d.mutable_topic()->swap(topic);
            d.mutable_data()->swap(data);
src/pubsub_center.cpp
@@ -94,7 +94,7 @@
        auto &shm = socket.shm();
        auto OnSubChange = [&](auto &&update) {
            DataSub sub;
            MsgSub sub;
            if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) {
                assert(sizeof(MQId) == msg.route(0).mq_id().size());
                MQId client;
@@ -106,7 +106,7 @@
        auto Unsub = [&](const MQId &id, auto &topics) { bus->UnsubScribe(id, topics.begin(), topics.end()); };
        auto OnPublish = [&]() {
            DataPub pub;
            MsgPub pub;
            if (!pub.ParseFromString(msg.body())) {
                return;
            }
src/reqrep_center.cpp
@@ -100,12 +100,6 @@
    std::unordered_map<ProcId, Node> nodes_;
};
Synced<NodeCenter> &Center()
{
    static Synced<NodeCenter> s;
    return s;
}
} // namespace
BHCenter::MsgHandler MakeReqRepCenter()
@@ -120,7 +114,7 @@
        time_t now = 0;
        time(&now);
        if (last.exchange(now) < now) {
            printf("bus queue size: %ld\n", socket.Pending());
            printf("center queue size: %ld\n", socket.Pending());
        }
#endif
        auto SrcMQ = [&]() { return msg.route(0).mq_id(); };
@@ -128,7 +122,7 @@
        auto OnRegister = [&]() {
            if (msg.route_size() != 1) { return; }
            DataProcRegister reg;
            MsgRegister reg;
            if (reg.ParseFromString(msg.body()) && reg.has_proc()) {
                center->Register(*reg.mutable_proc(), SrcMQ(), reg.topics().begin(), reg.topics().end());
            }
@@ -138,7 +132,7 @@
            if (msg.route_size() != 1) { return; }
            auto &src_mq = msg.route(0).mq_id();
            DataProcHeartbeat hb;
            MsgHeartbeat hb;
            if (hb.ParseFromString(msg.body()) && hb.has_proc()) {
                center->Heartbeat(*hb.mutable_proc(), SrcMQ());
            }
@@ -147,7 +141,7 @@
        auto OnQueryTopic = [&]() {
            if (msg.route_size() != 1) { return; }
            DataProcQueryTopic query;
            MsgQueryTopic query;
            NodeCenter::ProcAddr dest;
            if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) {
                MQId remote;
@@ -161,9 +155,9 @@
        };
        switch (msg.type()) {
        case kMsgTypeProcRegisterTopics: OnRegister(); return true;
        case kMsgTypeProcHeartbeat: OnHeartbeat(); return true;
        case kMsgTypeProcQueryTopic: OnQueryTopic(); return true;
        case kMsgTypeRegister: OnRegister(); return true;
        case kMsgTypeHeartbeat: OnHeartbeat(); return true;
        case kMsgTypeQueryTopic: OnQueryTopic(); return true;
        default: return false;
        }
    };
@@ -176,4 +170,4 @@
    const int kMaxWorker = 16;
    return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
}
}
src/shm_queue.cpp
@@ -76,24 +76,16 @@
    Queue *remote = Find(shm, MsgQIdToName(remote_id));
    return remote && remote->Write(msg, timeout_ms, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
}
bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms)
{
    Queue *remote = Find(shm, MsgQIdToName(remote_id));
    return remote && remote->Write(msg, timeout_ms, [](const MsgI &msg) { msg.AddRef(); });
}
// Test shows that in the 2 cases:
// 1) build msg first, then find remote queue;
// 2) find remote queue first, then build msg;
// 1 is about 50% faster than 2, maybe cache related.
bool ShmMsgQueue::Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms, const std::function<void()> &onsend)
{
    MsgI msg;
    if (msg.Make(shm(), data)) {
        if (Send(remote_id, msg, timeout_ms, onsend)) {
            return true;
        } else {
            msg.Release(shm());
        }
    }
    return false;
}
bool ShmMsgQueue::Recv(BHMsg &msg, const int timeout_ms)
{
src/shm_queue.h
@@ -134,22 +134,26 @@
    bool Recv(BHMsg &msg, const int timeout_ms);
    bool Recv(MsgI &msg, const int timeout_ms) { return Read(msg, timeout_ms); }
    static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend);
    static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms)
    static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms);
    template <class... Extra>
    bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms, Extra const &...extra)
    {
        return Send(shm, remote_id, msg, timeout_ms, []() {});
        return Send(shm(), remote_id, msg, timeout_ms, extra...);
    }
    bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms, OnSend const &onsend);
    bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms)
    template <class... Extra>
    bool Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms, Extra const &...extra)
    {
        return Send(remote_id, msg, timeout_ms, []() {});
    }
    bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend)
    {
        return Send(shm(), remote_id, msg, timeout_ms, onsend);
    }
    bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms)
    {
        return Send(shm(), remote_id, msg, timeout_ms);
        MsgI msg;
        if (msg.Make(shm(), data)) {
            if (Send(shm(), remote_id, msg, timeout_ms, extra...)) {
                return true;
            } else {
                msg.Release(shm());
            }
        }
        return false;
    }
    size_t Pending() const { return data()->size(); }
};
src/socket.cpp
@@ -49,15 +49,15 @@
    Stop(); //TODO should stop in sub class, incase thread access sub class data.
}
bool ShmSocket::Start(const RecvCB &onData, int nworker)
bool ShmSocket::Start(const RecvCB &onData, const IdleCB &onIdle, int nworker)
{
    if (!mq_) {
        return false;
    if (!mq_ || !onData) {
        return false; // TODO error code.
    }
    std::lock_guard<std::mutex> lock(mutex_);
    StopNoLock();
    auto RecvProc = [this, onData]() {
    auto RecvProc = [this, onData, onIdle]() {
        while (run_) {
            try {
                MsgI imsg;
@@ -67,6 +67,8 @@
                    if (imsg.Unpack(msg)) {
                        onData(*this, imsg, msg);
                    }
                } else if (onIdle) {
                    onIdle(*this);
                }
            } catch (...) {
            }
src/socket.h
@@ -37,6 +37,7 @@
    typedef bhome_shm::SharedMemory Shm;
    typedef std::function<void(ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg)> RecvCB;
    typedef std::function<void(bhome_msg::BHMsg &msg)> RecvBHMsgCB;
    typedef std::function<void(ShmSocket &sock)> IdleCB;
    ShmSocket(Shm &shm, const void *id, const int len);
    ShmSocket(Shm &shm, const int len = 12);
@@ -44,22 +45,27 @@
    Shm &shm() { return shm_; }
    // start recv.
    bool Start(const RecvCB &onData, int nworker = 1);
    bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1);
    bool Start(const RecvCB &onData, int nworker = 1) { return Start(onData, IdleCB(), nworker); }
    bool Start(const RecvBHMsgCB &onData, const IdleCB &onIdle, int nworker = 1)
    {
        return Start([onData](ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg) { onData(msg); }, onIdle, nworker);
    }
    bool Start(const RecvBHMsgCB &onData, int nworker = 1)
    {
        return Start([onData](ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg) { onData(msg); }, nworker);
        return Start(onData, IdleCB(), nworker);
    }
    bool Stop();
    size_t Pending() const { return mq_ ? mq_->Pending() : 0; }
    bool SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms);
    bool SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms);
protected:
    const Shm &shm() const { return shm_; }
    Queue &mq() { return *mq_; } // programmer should make sure that mq_ is valid.
    const Queue &mq() const { return *mq_; }
    std::mutex &mutex() { return mutex_; }
    bool SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms);
    bool SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms);
private:
    bool StopNoLock();
src/topic_reply.cpp
New file
@@ -0,0 +1,142 @@
/*
 * =====================================================================================
 *
 *       Filename:  topic_reply.cpp
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年04月06日 14时40分52秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#include "topic_reply.h"
#include <chrono>
#include <list>
using namespace bhome_msg;
using namespace std::chrono;
using namespace std::chrono_literals;
namespace
{
struct SrcInfo {
    std::vector<BHAddress> route;
    std::string msg_id;
};
class FailedQ
{
    struct FailedMsg {
        steady_clock::time_point xpr;
        std::string remote_;
        BHMsg msg_;
        FailedMsg(const std::string &addr, BHMsg &&msg) :
            xpr(steady_clock::now() + 10s), remote_(addr), msg_(std::move(msg)) {}
        bool Expired() { return steady_clock::now() > xpr; }
    };
    typedef std::list<FailedMsg> Queue;
    Synced<Queue> queue_;
public:
    void Push(const std::string &remote, BHMsg &&msg)
    {
        queue_->emplace_back(remote, std::move(msg));
    }
    void TrySend(ShmSocket &socket, const int timeout_ms = 0)
    {
        queue_.Apply([&](Queue &q) {
            if (!q.empty()) {
                auto it = q.begin();
                do {
                    if (it->Expired() || socket.SyncSend(it->remote_.data(), it->msg_, timeout_ms)) {
                        it = q.erase(it);
                    } else {
                        ++it;
                    }
                } while (it != q.end());
            }
        });
    }
};
} // namespace
bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms)
{
    //TODO check reply?
    return SyncSend(&kBHTopicReqRepCenter, MakeRegister(mq().Id(), proc_info, topics), timeout_ms);
}
bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms)
{
    return SyncSend(&kBHTopicReqRepCenter, MakeHeartbeat(mq().Id(), proc_info), timeout_ms);
}
bool SocketReply::StartWorker(const OnRequest &rcb, int nworker)
{
    auto failed_q = std::make_shared<FailedQ>();
    auto onIdle = [failed_q](ShmSocket &socket) { failed_q->TrySend(socket); };
    auto onRecv = [this, rcb, failed_q, onIdle](BHMsg &msg) {
        if (msg.type() == kMsgTypeRequestTopic && msg.route_size() > 0) {
            MsgRequestTopic req;
            if (req.ParseFromString(msg.body())) {
                std::string out;
                if (rcb(req.topic(), req.data(), out)) {
                    BHMsg msg_reply(MakeReply(msg.msg_id(), out.data(), out.size()));
                    for (int i = 0; i < msg.route_size() - 1; ++i) {
                        msg.add_route()->Swap(msg.mutable_route(i));
                    }
                    if (!SyncSend(msg.route().rbegin()->mq_id().data(), msg_reply, 10)) {
                        failed_q->Push(msg.route().rbegin()->mq_id(), std::move(msg_reply));
                    }
                }
            }
        } else {
            // ignored, or dropped
        }
        onIdle(*this);
    };
    return rcb && Start(onRecv, onIdle, nworker);
}
bool SocketReply::RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms)
{
    BHMsg msg;
    if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypeRequestTopic) {
        MsgRequestTopic request;
        if (request.ParseFromString(msg.body())) {
            request.mutable_topic()->swap(topic);
            request.mutable_data()->swap(data);
            SrcInfo *p = new SrcInfo;
            p->route.assign(msg.route().begin(), msg.route().end());
            p->msg_id = msg.msg_id();
            src_info = p;
            return true;
        }
    }
    return false;
}
bool SocketReply::SendReply(void *src_info, const std::string &data, const int timeout_ms)
{
    SrcInfo *p = static_cast<SrcInfo *>(src_info);
    DEFER1(delete p);
    if (!p || p->route.empty()) {
        return false;
    }
    BHMsg msg(MakeReply(p->msg_id, data.data(), data.size()));
    for (unsigned i = 0; i < p->route.size() - 1; ++i) {
        msg.add_route()->Swap(&p->route[i]);
    }
    return SyncSend(p->route.back().mq_id().data(), msg, timeout_ms);
}
src/topic_reply.h
New file
@@ -0,0 +1,52 @@
/*
 * =====================================================================================
 *
 *       Filename:  topic_reply.h
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年04月06日 14时41分12秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#ifndef TOPIC_REPLY_3RVYPPWI
#define TOPIC_REPLY_3RVYPPWI
#include "bh_util.h"
#include "defs.h"
#include "msg.h"
#include "socket.h"
#include <deque>
#include <functional>
using bhome::msg::ProcInfo;
class SocketReply : private ShmSocket
{
    typedef ShmSocket Socket;
public:
    SocketReply(Socket::Shm &shm) :
        Socket(shm, 64) {}
    SocketReply() :
        SocketReply(BHomeShm()) {}
    ~SocketReply() { Stop(); }
    typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest;
    bool StartWorker(const OnRequest &rcb, int nworker = 2);
    bool Stop() { return Socket::Stop(); }
    bool RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms);
    bool SendReply(void *src_info, const std::string &data, const int timeout_ms);
    bool Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms);
    bool Heartbeat(const ProcInfo &proc_info, const int timeout_ms);
private:
};
#endif // end of include guard: TOPIC_REPLY_3RVYPPWI
src/topic_request.cpp
File was renamed from src/reqrep.cpp
@@ -1,9 +1,9 @@
/*
 * =====================================================================================
 *
 *       Filename:  reqrep.cpp
 *       Filename:  topic_request.cpp
 *
 *    Description:  topic request/reply sockets
 *    Description:  topic request sockets
 *
 *        Version:  1.0
 *        Created:  2021年04月01日 09时35分35秒
@@ -15,7 +15,7 @@
 *
 * =====================================================================================
 */
#include "reqrep.h"
#include "topic_request.h"
#include "bh_util.h"
#include "msg.h"
#include <chrono>
@@ -40,10 +40,10 @@
        };
        RecvBHMsgCB cb;
        if (Find(cb)) {
        if (Find(cb) && cb) {
            cb(msg);
        } else if (msg.type() == kMsgTypeReply) {
            DataReply reply;
        } else if (msg.type() == kMsgTypeRequestTopicReply && rrcb) {
            MsgRequestTopicReply reply;
            if (reply.ParseFromString(msg.body())) {
                rrcb(reply.data());
            }
@@ -74,8 +74,8 @@
    auto Call = [&](const void *remote) {
        const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
        auto onRecv = [cb](BHMsg &msg) {
            if (msg.type() == kMsgTypeReply) {
                DataReply reply;
            if (msg.type() == kMsgTypeRequestTopicReply) {
                MsgRequestTopicReply reply;
                if (reply.ParseFromString(msg.body())) {
                    cb(reply.data());
                }
@@ -103,16 +103,22 @@
        if (QueryRPCTopic(topic, addr, timeout_ms)) {
            const BHMsg &req(MakeRequest(mq().Id(), topic, data, size));
            BHMsg reply;
            if (SyncSendAndRecv(addr.mq_id().data(), &req, &reply, timeout_ms) && reply.type() == kMsgTypeReply) {
                DataReply dr;
            if (SyncSendAndRecv(addr.mq_id().data(), &req, &reply, timeout_ms) && reply.type() == kMsgTypeRequestTopicReply) {
                MsgRequestTopicReply dr;
                if (dr.ParseFromString(reply.body())) {
                    dr.mutable_data()->swap(out);
                    return true;
                } else {
                    printf("error parse reply.\n");
                }
            } else {
                printf("error recv data. line: %d\n", __LINE__);
            }
        } else {
            printf("error recv data. line: %d\n", __LINE__);
        }
    } catch (...) {
        printf("error recv data. line: %d\n", __LINE__);
    }
    return false;
}
@@ -186,8 +192,8 @@
    BHMsg result;
    const BHMsg &msg = MakeQueryTopic(mq().Id(), topic);
    if (SyncSendAndRecv(&kBHTopicReqRepCenter, &msg, &result, timeout_ms)) {
        if (result.type() == kMsgTypeProcQueryTopicReply) {
            DataProcQueryTopicReply reply;
        if (result.type() == kMsgTypeQueryTopicReply) {
            MsgQueryTopicReply reply;
            if (reply.ParseFromString(result.body())) {
                addr = reply.address();
                if (addr.mq_id().empty()) {
@@ -202,79 +208,3 @@
    }
    return false;
}
// reply socket
namespace
{
struct SrcInfo {
    std::vector<BHAddress> route;
    std::string msg_id;
};
} // namespace
bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms)
{
    //TODO check reply?
    return SyncSend(&kBHTopicReqRepCenter, MakeRegister(mq().Id(), proc_info, topics), timeout_ms);
}
bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms)
{
    return SyncSend(&kBHTopicReqRepCenter, MakeHeartbeat(mq().Id(), proc_info), timeout_ms);
}
bool SocketReply::StartWorker(const OnRequest &rcb, int nworker)
{
    auto onRecv = [this, rcb](BHMsg &msg) {
        if (msg.type() == kMsgTypeRequest && msg.route_size() > 0) {
            DataRequest req;
            if (req.ParseFromString(msg.body())) {
                std::string out;
                if (rcb(req.topic(), req.data(), out)) {
                    BHMsg msg_reply(MakeReply(msg.msg_id(), out.data(), out.size()));
                    for (int i = 0; i < msg.route_size() - 1; ++i) {
                        msg.add_route()->Swap(msg.mutable_route(i));
                    }
                    SyncSend(msg.route().rbegin()->mq_id().data(), msg_reply, 100);
                }
            }
        } else {
            // ignored, or dropped
        }
    };
    return rcb && Start(onRecv, nworker);
}
bool SocketReply::RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms)
{
    BHMsg msg;
    if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypeRequest) {
        DataRequest request;
        if (request.ParseFromString(msg.body())) {
            request.mutable_topic()->swap(topic);
            request.mutable_data()->swap(data);
            SrcInfo *p = new SrcInfo;
            p->route.assign(msg.route().begin(), msg.route().end());
            p->msg_id = msg.msg_id();
            src_info = p;
            return true;
        }
    }
    return false;
}
bool SocketReply::SendReply(void *src_info, const std::string &data, const int timeout_ms)
{
    SrcInfo *p = static_cast<SrcInfo *>(src_info);
    DEFER1(delete p);
    if (!p || p->route.empty()) {
        return false;
    }
    BHMsg msg(MakeReply(p->msg_id, data.data(), data.size()));
    for (unsigned i = 0; i < p->route.size() - 1; ++i) {
        msg.add_route()->Swap(&p->route[i]);
    }
    return SyncSend(p->route.back().mq_id().data(), msg, timeout_ms);
}
src/topic_request.h
File was renamed from src/reqrep.h
@@ -1,9 +1,9 @@
/*
 * =====================================================================================
 *
 *       Filename:  reqrep.h
 *       Filename:  topic_request.h
 *
 *    Description:  topic request/reply sockets
 *    Description:  topic request socket
 *
 *        Version:  1.0
 *        Created:  2021年04月01日 09时36分06秒
@@ -15,8 +15,8 @@
 *
 * =====================================================================================
 */
#ifndef REQREP_ACEH09NK
#define REQREP_ACEH09NK
#ifndef TOPIC_REQUEST_ACEH09NK
#define TOPIC_REQUEST_ACEH09NK
#include "bh_util.h"
#include "defs.h"
@@ -105,26 +105,4 @@
    TopicCache topic_cache_;
};
class SocketReply : private ShmSocket
{
    typedef ShmSocket Socket;
public:
    SocketReply(Socket::Shm &shm) :
        Socket(shm, 64) {}
    SocketReply() :
        SocketReply(BHomeShm()) {}
    ~SocketReply() { Stop(); }
    typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest;
    bool StartWorker(const OnRequest &rcb, int nworker = 2);
    bool Stop() { return Socket::Stop(); }
    bool RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms);
    bool SendReply(void *src_info, const std::string &data, const int timeout_ms);
    bool Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms);
    bool Heartbeat(const ProcInfo &proc_info, const int timeout_ms);
private:
};
#endif // end of include guard: REQREP_ACEH09NK
#endif // end of include guard: TOPIC_REQUEST_ACEH09NK
utest/speed_test.cpp
@@ -160,7 +160,7 @@
    auto Server = [&]() {
        BHMsg req;
        while (!stop) {
            if (srv.Recv(req, 100) && req.type() == kMsgTypeRequest) {
            if (srv.Recv(req, 100) && req.type() == kMsgTypeRequestTopic) {
                auto &mqid = req.route()[0].mq_id();
                MQId src_id;
                memcpy(&src_id, mqid.data(), sizeof(src_id));
utest/utest.cpp
@@ -1,9 +1,10 @@
#include "defs.h"
#include "pubsub.h"
#include "pubsub_center.h"
#include "reqrep.h"
#include "reqrep_center.h"
#include "socket.h"
#include "topic_reply.h"
#include "topic_request.h"
#include "util.h"
#include <atomic>
#include <boost/uuid/uuid_generators.hpp>
@@ -189,24 +190,26 @@
                printf("count: %d\n", count.load());
            }
        };
        client.StartWorker(onRecv, 1);
        client.StartWorker(onRecv, 2);
        boost::timer::auto_cpu_timer timer;
        for (int i = 0; i < nreq; ++i) {
            if (!client.AsyncRequest(topic, "data " + std::to_string(i), 1000)) {
                printf("client request failed\n");
            }
            // if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) {
            //     printf("client request failed\n");
            // } else {
            //     ++count;
            // }
        }
        printf("request %s %d done ", topic.c_str(), nreq);
        while (count.load() < nreq) {
        do {
            std::this_thread::yield();
        }
        } while (count.load() < nreq);
        client.Stop();
        printf("request %s %d done ", topic.c_str(), count.load());
    };
    std::atomic_uint64_t server_msg_count(0);
    auto Server = [&](const std::string &name, const std::vector<std::string> &topics) {
        SocketReply server(shm);
        ProcInfo info;
@@ -215,7 +218,8 @@
        if (!server.Register(info, topics, 100)) {
            printf("register failed\n");
        }
        auto onData = [](const std::string &topic, const std::string &data, std::string &reply) {
        auto onData = [&](const std::string &topic, const std::string &data, std::string &reply) {
            ++server_msg_count;
            reply = topic + ':' + data;
            return true;
        };
@@ -229,9 +233,10 @@
    servers.Launch(Server, "server", topics);
    std::this_thread::sleep_for(100ms);
    for (auto &t : topics) {
        clients.Launch(Client, t, 1000 * 1000);
        clients.Launch(Client, t, 1000 * 100);
    }
    clients.WaitAll();
    printf("clients done, server replyed: %d\n", server_msg_count.load());
    run = false;
    servers.WaitAll();
}