lichao
2021-04-01 d26327b3cde043a9470dcd7fea8e704ea517fdae
add req/rep center;
2个文件已添加
13个文件已修改
554 ■■■■ 已修改文件
proto/source/bhome_msg.proto 28 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.cpp 68 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.h 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub.h 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub_center.cpp 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub_center.h 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/reqrep.cpp 112 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/reqrep.h 35 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/reqrep_center.cpp 121 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/reqrep_center.h 61 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.cpp 9 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/speed_test.cpp 30 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/utest.cpp 58 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/source/bhome_msg.proto
@@ -25,8 +25,11 @@
    kMsgTypePublish = 3;
    kMsgTypeSubscribe = 4;
    kMsgTypeUnsubscribe = 5;
    kMsgTypeQueryTopic = 6;
    kMsgTypeQueryTopicReply = 7;
    kMsgTypeProcQueryTopic = 6;
    kMsgTypeProcQueryTopicReply = 7;
    kMsgTypeProcRegisterTopics = 8;
    kMsgTypeProcHeartbeat = 9;
}
message DataPub {
@@ -47,10 +50,27 @@
    bytes data = 1; 
}
message DataQueryTopic {
message ProcInfo
{
    bytes name = 1;
    bytes info = 2;
}
message DataProcRegister
{
    ProcInfo proc = 1;
    repeated bytes topics = 2;
}
message DataProcHeartbeat
{
    ProcInfo proc = 1;
}
message DataProcQueryTopic {
    bytes topic = 1;
}
message DataQueryTopicReply {
message DataProcQueryTopicReply {
    BHAddress address = 1;
}
src/defs.h
@@ -24,8 +24,8 @@
typedef boost::uuids::uuid MQId;
const MQId kBHBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
const MQId kBHTopicRPCId = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff");
const MQId kBHTopicBus = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
const MQId kBHTopicReqRepCenter = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff");
const int kBHCenterPort = 24287;
const char kTopicSep = '.';
namespace bhome_shm
src/msg.cpp
@@ -24,36 +24,61 @@
const uint32_t kMsgTag = 0xf1e2d3c4;
const uint32_t kMsgPrefixLen = 4;
BHMsg InitMsg(MsgType type)
inline void AddRoute(BHMsg &msg, const MQId &id) { msg.add_route()->set_mq_id(&id, sizeof(id)); }
std::string RandId()
{
    boost::uuids::uuid id = boost::uuids::random_generator()();
    return std::string((char *) &id, sizeof(id));
}
BHMsg InitMsg(MsgType type, const std::string &msgid = RandId())
{
    BHMsg msg;
    msg.set_msg_id(msgid);
    msg.set_type(type);
    time_t tm = 0;
    msg.set_timestamp(time(&tm));
    return msg;
}
BHMsg MakeRequest(const MQId &src_id, const void *data, const size_t size)
{
    assert(data && size);
    BHMsg msg(InitMsg(kMsgTypeRequest));
    msg.set_body(data, size);
    msg.add_route()->set_mq_id(&src_id, sizeof(src_id));
    return msg;
}
BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size)
{
    BHMsg msg(InitMsg(kMsgTypeRequest));
    AddRoute(msg, src_id);
    DataRequest req;
    req.set_topic(topic);
    req.set_data(data, size);
    const std::string &body(req.SerializeAsString());
    return MakeRequest(src_id, body.data(), body.size());
    msg.set_body(req.SerializeAsString());
    return msg;
}
BHMsg MakeReply(const void *data, const size_t size)
BHMsg MakeRegister(const MQId &src_id, ProcInfo info, const std::vector<std::string> &topics)
{
    BHMsg msg(InitMsg(kMsgTypeProcRegisterTopics));
    AddRoute(msg, src_id);
    DataProcRegister reg;
    reg.mutable_proc()->Swap(&info);
    for (auto &t : topics) {
        reg.add_topics(t);
    }
    msg.set_body(reg.SerializeAsString());
    return msg;
}
BHMsg MakeHeartbeat(const MQId &src_id, ProcInfo info)
{
    BHMsg msg(InitMsg(kMsgTypeProcHeartbeat));
    AddRoute(msg, src_id);
    DataProcRegister reg;
    reg.mutable_proc()->Swap(&info);
    msg.set_body(reg.SerializeAsString());
    return msg;
}
BHMsg MakeReply(const std::string &src_msgid, const void *data, const size_t size)
{
    assert(data && size);
    BHMsg msg(InitMsg(kMsgTypeReply));
    BHMsg msg(InitMsg(kMsgTypeReply, src_msgid));
    DataReply reply;
    reply.set_data(data, size);
    msg.set_body(reply.SerializeAsString());
@@ -64,7 +89,7 @@
{
    assert(sub_unsub == kMsgTypeSubscribe || sub_unsub == kMsgTypeUnsubscribe);
    BHMsg msg(InitMsg(sub_unsub));
    msg.add_route()->set_mq_id(&client, sizeof(client));
    AddRoute(msg, client);
    DataSub subs;
    for (auto &t : topics) {
        subs.add_topics(t);
@@ -87,14 +112,23 @@
    return msg;
}
BHMsg MakeQueryTopic(const std::string &topic)
BHMsg MakeQueryTopic(const MQId &client, const std::string &topic)
{
    BHMsg msg(InitMsg(kMsgTypeQueryTopic));
    DataQueryTopic query;
    BHMsg msg(InitMsg(kMsgTypeProcQueryTopic));
    AddRoute(msg, client);
    DataProcQueryTopic query;
    query.set_topic(topic);
    msg.set_body(query.SerializeAsString());
    return msg;
}
BHMsg MakeQueryTopicReply(const std::string &mqid, const std::string &msgid)
{
    BHMsg msg(InitMsg(kMsgTypeProcQueryTopicReply, msgid));
    DataProcQueryTopicReply reply;
    reply.mutable_address()->set_mq_id(mqid);
    msg.set_body(reply.SerializeAsString());
    return msg;
}
void *Pack(SharedMemory &shm, const BHMsg &msg)
{
src/msg.h
@@ -59,10 +59,12 @@
    int num_ = 1;
};
BHMsg MakeQueryTopic(const std::string &topic);
BHMsg MakeRequest(const MQId &src_id, const void *data, const size_t size);
BHMsg MakeQueryTopic(const MQId &client, const std::string &topic);
BHMsg MakeQueryTopicReply(const std::string &mqid, const std::string &msgid);
BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size);
BHMsg MakeReply(const void *data, const size_t size);
BHMsg MakeReply(const std::string &src_msgid, const void *data, const size_t size);
BHMsg MakeRegister(const MQId &src_id, ProcInfo info, const std::vector<std::string> &topics);
BHMsg MakeHeartbeat(const MQId &src_id, ProcInfo info);
BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics);
BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics);
BHMsg MakePub(const std::string &topic, const void *data, const size_t size);
src/pubsub.cpp
@@ -30,7 +30,7 @@
            return false;
        }
        DEFER1(imsg.Release(shm()));
        return ShmMsgQueue::Send(shm(), kBHBusQueueId, imsg, timeout_ms);
        return ShmMsgQueue::Send(shm(), kBHTopicBus, imsg, timeout_ms);
    } catch (...) {
        return false;
    }
@@ -39,7 +39,7 @@
bool SocketSubscribe::Subscribe(const std::vector<std::string> &topics, const int timeout_ms)
{
    try {
        return mq().Send(kBHBusQueueId, MakeSub(mq().Id(), topics), timeout_ms);
        return mq().Send(kBHTopicBus, MakeSub(mq().Id(), topics), timeout_ms);
    } catch (...) {
        return false;
    }
src/pubsub.h
@@ -50,9 +50,11 @@
        Socket(shm, 64) {}
    SocketSubscribe() :
        SocketSubscribe(BHomeShm()) {}
    ~SocketSubscribe() { Stop(); }
    typedef std::function<void(const std::string &topic, const std::string &data)> TopicDataCB;
    bool StartRecv(const TopicDataCB &tdcb, int nworker = 2);
    bool Stop() { return Socket::Stop(); }
    bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms);
    bool RecvSub(std::string &topic, std::string &data, const int timeout_ms);
};
src/pubsub_center.cpp
@@ -17,9 +17,7 @@
 */
#include "pubsub_center.h"
#include "bh_util.h"
PubSubCenter::PubSubCenter(SharedMemory &shm) :
    socket_(shm) {}
using namespace bhome_shm;
bool PubSubCenter::Start(const int nworker)
{
src/pubsub_center.h
@@ -23,7 +23,6 @@
#include <mutex>
#include <set>
#include <unordered_map>
using namespace bhome_shm;
// publish/subcribe manager.
class PubSubCenter
@@ -31,18 +30,19 @@
    class SocketBus : public ShmSocket
    {
    public:
        SocketBus(SharedMemory &shm) :
            ShmSocket(shm, &kBHBusQueueId, 1000) {}
        SocketBus(ShmSocket::Shm &shm) :
            ShmSocket(shm, &kBHTopicBus, 1000) {}
        using ShmSocket::shm;
    };
    SocketBus socket_;
    ShmSocket::Shm &shm() { return socket_.shm(); }
    std::mutex mutex_;
    typedef std::set<MQId> Clients;
    std::unordered_map<std::string, Clients> records_;
    ShmSocket::Shm &shm() { return socket_.shm(); }
public:
    PubSubCenter(SharedMemory &shm);
    PubSubCenter(ShmSocket::Shm &shm) :
        socket_(shm) {}
    PubSubCenter() :
        PubSubCenter(BHomeShm()) {}
    ~PubSubCenter() { Stop(); }
src/reqrep.cpp
@@ -16,6 +16,7 @@
 * =====================================================================================
 */
#include "reqrep.h"
#include "bh_util.h"
#include "msg.h"
#include <chrono>
#include <condition_variable>
@@ -73,30 +74,33 @@
        BHAddress addr;
        if (QueryRPCTopic(topic, addr, timeout_ms)) {
            return Call(addr.mq_id().data());
        } else {
            return false;
        }
    } catch (...) {
        return false;
    }
}
bool SocketRequest::SyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out)
bool SocketRequest::SyncRequest(const std::string &topic, const void *data, const size_t size, std::string &out, const int timeout_ms)
{
    try {
        BHAddress addr;
        if (QueryRPCTopic(topic, addr, timeout_ms)) {
            const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
            const BHMsg &req(MakeRequest(mq().Id(), topic, data, size));
            BHMsg reply;
            if (SyncSendAndRecv(addr.mq_id().data(), &msg, &reply, timeout_ms) && reply.type() == kMsgTypeReply) {
            if (SyncSendAndRecv(addr.mq_id().data(), &req, &reply, timeout_ms) && reply.type() == kMsgTypeReply) {
                DataReply dr;
                if (dr.ParseFromString(msg.body())) {
                if (dr.ParseFromString(reply.body())) {
                    dr.mutable_data()->swap(out);
                    return true;
                }
            }
        } else {
        }
    } catch (...) {
        return false;
    }
    return false;
}
bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb)
@@ -132,11 +136,13 @@
            if (!st->canceled) {
                static_cast<BHMsg *>(result)->Swap(&msg);
                st->cv.notify_one();
            } // else result is no longer valid.
            } else {
            }
        };
        std::unique_lock<std::mutex> lk(st->mutex);
        if (AsyncSend(remote, msg, timeout_ms, OnRecv) && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) {
        bool sendok = AsyncSend(remote, msg, timeout_ms, OnRecv);
        if (sendok && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) {
            return true;
        } else {
            st->canceled = true;
@@ -149,16 +155,100 @@
bool SocketRequest::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
{
    if (tmp_cache_.first == topic) {
        addr = tmp_cache_.second;
        return true;
    }
    BHMsg result;
    const BHMsg &msg = MakeQueryTopic(topic);
    if (SyncSendAndRecv(&kBHTopicRPCId, &msg, &result, timeout_ms)) {
        if (result.type() == kMsgTypeQueryTopicReply) {
            DataQueryTopicReply reply;
    const BHMsg &msg = MakeQueryTopic(mq().Id(), topic);
    if (SyncSendAndRecv(&kBHTopicReqRepCenter, &msg, &result, timeout_ms)) {
        if (result.type() == kMsgTypeProcQueryTopicReply) {
            DataProcQueryTopicReply reply;
            if (reply.ParseFromString(result.body())) {
                addr = reply.address();
                tmp_cache_.first = topic;
                tmp_cache_.second = addr;
                return !addr.mq_id().empty();
            }
        }
    } else {
    }
    return false;
}
// reply socket
namespace
{
struct SrcInfo {
    std::vector<BHAddress> route;
    std::string msg_id;
};
} // namespace
bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms)
{
    //TODO check reply?
    return SyncSend(&kBHTopicReqRepCenter, MakeRegister(mq().Id(), proc_info, topics), timeout_ms);
}
bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms)
{
    return SyncSend(&kBHTopicReqRepCenter, MakeHeartbeat(mq().Id(), proc_info), timeout_ms);
}
bool SocketReply::StartWorker(const OnRequest &rcb, int nworker)
{
    auto onRecv = [this, rcb](BHMsg &msg) {
        if (msg.type() == kMsgTypeRequest && msg.route_size() > 0) {
            DataRequest req;
            if (req.ParseFromString(msg.body())) {
                std::string out;
                if (rcb(req.topic(), req.data(), out)) {
                    BHMsg msg_reply(MakeReply(msg.msg_id(), out.data(), out.size()));
                    for (int i = 0; i < msg.route_size() - 1; ++i) {
                        msg.add_route()->Swap(msg.mutable_route(i));
                    }
                    SyncSend(msg.route().rbegin()->mq_id().data(), msg_reply, 100);
                }
            }
        } else {
            // ignored, or dropped
        }
    };
    return rcb && Start(onRecv, nworker);
}
bool SocketReply::RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms)
{
    BHMsg msg;
    if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypeRequest) {
        DataRequest request;
        if (request.ParseFromString(msg.body())) {
            request.mutable_topic()->swap(topic);
            request.mutable_data()->swap(data);
            SrcInfo *p = new SrcInfo;
            p->route.assign(msg.route().begin(), msg.route().end());
            p->msg_id = msg.msg_id();
            src_info = p;
            return true;
        }
    }
    return false;
}
bool SocketReply::SendReply(void *src_info, const std::string &data, const int timeout_ms)
{
    SrcInfo *p = static_cast<SrcInfo *>(src_info);
    DEFER1(delete p);
    if (!p || p->route.empty()) {
        return false;
    }
    BHMsg msg(MakeReply(p->msg_id, data.data(), data.size()));
    for (unsigned i = 0; i < p->route.size() - 1; ++i) {
        msg.add_route()->Swap(&p->route[i]);
    }
    return SyncSend(p->route.back().mq_id().data(), msg, timeout_ms);
}
src/reqrep.h
@@ -19,9 +19,12 @@
#define REQREP_ACEH09NK
#include "defs.h"
#include "msg.h"
#include "socket.h"
#include <functional>
#include <unordered_map>
using bhome::msg::ProcInfo;
class SocketRequest : private ShmSocket
{
@@ -32,19 +35,21 @@
        Socket(shm, 64) { StartWorker(); }
    SocketRequest() :
        SocketRequest(BHomeShm()) {}
    ~SocketRequest() { Stop(); }
    typedef std::function<void(const std::string &data)> RequestResultCB;
    bool StartWorker(const RequestResultCB &rrcb, int nworker = 2);
    bool StartWorker(int nworker = 2) { return StartWorker(RequestResultCB(), nworker); }
    bool Stop() { return Socket::Stop(); }
    bool AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb);
    bool AsyncRequest(const std::string &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb)
    {
        return AsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb);
    }
    bool SyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out);
    bool SyncRequest(const std::string &topic, const std::string &data, const int timeout_ms, std::string &out)
    bool SyncRequest(const std::string &topic, const void *data, const size_t size, std::string &out, const int timeout_ms);
    bool SyncRequest(const std::string &topic, const std::string &data, std::string &out, const int timeout_ms)
    {
        return SyncRequest(topic, data.data(), data.size(), timeout_ms, out);
        return SyncRequest(topic, data.data(), data.size(), out, timeout_ms);
    }
private:
@@ -52,6 +57,30 @@
    bool SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms);
    bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
    std::unordered_map<std::string, RecvCB> async_cbs_;
    std::pair<std::string, bhome::msg::BHAddress> tmp_cache_;
};
class SocketReply : private ShmSocket
{
    typedef ShmSocket Socket;
public:
    SocketReply(Socket::Shm &shm) :
        Socket(shm, 64) {}
    SocketReply() :
        SocketReply(BHomeShm()) {}
    ~SocketReply() { Stop(); }
    typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest;
    bool StartWorker(const OnRequest &rcb, int nworker = 2);
    bool Stop() { return Socket::Stop(); }
    bool RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms);
    bool SendReply(void *src_info, const std::string &data, const int timeout_ms);
    bool Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms);
    bool Heartbeat(const ProcInfo &proc_info, const int timeout_ms);
private:
};
#endif // end of include guard: REQREP_ACEH09NK
src/reqrep_center.cpp
New file
@@ -0,0 +1,121 @@
/*
 * =====================================================================================
 *
 *       Filename:  reqrep_center.cpp
 *
 *    Description:  topic request/reply center
 *
 *        Version:  1.0
 *        Created:  2021年04月01日 14时08分50秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#include "reqrep_center.h"
#include "bh_util.h"
using namespace bhome_shm;
struct A {
    void F(int){};
};
namespace
{
inline uint64_t Now()
{
    time_t t;
    return time(&t);
}
} // namespace
bool ReqRepCenter::Start(const int nworker)
{
    auto onRecv = [&](BHMsg &msg) {
#ifndef NDEBUG
        static std::atomic<time_t> last(0);
        time_t now = 0;
        time(&now);
        if (last.exchange(now) < now) {
            printf("bus queue size: %ld\n", socket_.Pending());
        }
#endif
        if (msg.route_size() == 0) {
            return;
        }
        auto &src_mq = msg.route(0).mq_id();
        auto OnRegister = [&]() {
            DataProcRegister reg;
            if (!reg.ParseFromString(msg.body())) {
                return;
            }
            ProcInfo pi;
            pi.server_mqid_ = src_mq;
            pi.proc_id_ = reg.proc().name();
            pi.ext_info_ = reg.proc().info();
            pi.timestamp_ = Now();
            std::lock_guard<std::mutex> lock(mutex_);
            for (auto &t : reg.topics()) {
                topic_mq_[t] = pi.server_mqid_;
            }
            procs_[pi.proc_id_] = pi;
        };
        auto OnHeartbeat = [&]() {
            DataProcHeartbeat hb;
            if (!hb.ParseFromString(msg.body())) {
                return;
            }
            std::lock_guard<std::mutex> lock(mutex_);
            auto pos = procs_.find(hb.proc().name());
            if (pos != procs_.end() && pos->second.server_mqid_ == src_mq) { // both name and mq should be the same.
                pos->second.timestamp_ = Now();
                pos->second.ext_info_ = hb.proc().info();
            }
        };
        auto OnQueryTopic = [&]() {
            DataProcQueryTopic query;
            if (!query.ParseFromString(msg.body())) {
                return;
            }
            std::string dest;
            auto FindDest = [&]() {
                std::lock_guard<std::mutex> lock(mutex_);
                auto pos = topic_mq_.find(query.topic());
                if (pos != topic_mq_.end()) {
                    dest = pos->second;
                    return true;
                } else {
                    return false;
                }
            };
            if (FindDest()) {
                MQId remote;
                memcpy(&remote, msg.route().rbegin()->mq_id().data(), sizeof(remote));
                MsgI imsg;
                if (!imsg.Make(shm(), MakeQueryTopicReply(dest, msg.msg_id()))) { return; }
                if (!ShmMsgQueue::Send(shm(), remote, imsg, 100)) {
                    imsg.Release(shm());
                }
            }
        };
        switch (msg.type()) {
        case kMsgTypeProcRegisterTopics: OnRegister(); break;
        case kMsgTypeProcHeartbeat: OnHeartbeat(); break;
        case kMsgTypeProcQueryTopic: OnQueryTopic(); break;
        default: break;
        }
    };
    const int kMaxWorker = 16;
    return socket_.Start(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
}
src/reqrep_center.h
New file
@@ -0,0 +1,61 @@
/*
 * =====================================================================================
 *
 *       Filename:  reqrep_center.h
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年04月01日 14时09分13秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#ifndef REQREP_CENTER_US3RBM60
#define REQREP_CENTER_US3RBM60
#include "defs.h"
#include "socket.h"
#include <chrono>
#include <mutex>
#include <set>
class ReqRepCenter
{
    class Socket : public ShmSocket
    {
    public:
        Socket(ShmSocket::Shm &shm) :
            ShmSocket(shm, &kBHTopicReqRepCenter, 1000) {}
        using ShmSocket::shm;
    };
    Socket socket_;
    ShmSocket::Shm &shm() { return socket_.shm(); }
    struct ProcInfo {
        std::string proc_id_; // unique name
        std::string server_mqid_;
        std::string ext_info_; // maybe json.
        uint64_t timestamp_ = 0;
    };
    typedef std::string Dests;
    std::mutex mutex_;
    std::unordered_map<std::string, Dests> topic_mq_;
    std::unordered_map<std::string, ProcInfo> procs_;
public:
    ReqRepCenter(ShmSocket::Shm &shm) :
        socket_(shm) {}
    ReqRepCenter() :
        ReqRepCenter(BHomeShm()) {}
    ~ReqRepCenter() { Stop(); }
    bool Start(const int nworker = 2);
    bool Stop() { return socket_.Stop(); }
};
#endif // end of include guard: REQREP_CENTER_US3RBM60
src/socket.cpp
@@ -46,7 +46,7 @@
ShmSocket::~ShmSocket()
{
    Stop();
    Stop(); //TODO should stop in sub class, incase thread access sub class data.
}
bool ShmSocket::StartRaw(const RecvRawCB &onData, int nworker)
@@ -102,12 +102,7 @@
bool ShmSocket::SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms)
{
    std::lock_guard<std::mutex> lock(mutex_);
    if (!mq_ || RunningNoLock()) {
        return false;
    } else {
        return mq_->Send(*static_cast<const MQId *>(id), msg, timeout_ms);
    }
    return mq_->Send(*static_cast<const MQId *>(id), msg, timeout_ms);
}
bool ShmSocket::SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms)
utest/speed_test.cpp
@@ -24,9 +24,9 @@
{
    const std::string shm_name("ShmSpeed");
    ShmRemover auto_remove(shm_name);
    const int mem_size       = 1024 * 1024 * 50;
    MQId id                  = boost::uuids::random_generator()();
    const int timeout        = 100;
    const int mem_size = 1024 * 1024 * 50;
    MQId id = boost::uuids::random_generator()();
    const int timeout = 100;
    const uint32_t data_size = 4000;
    auto Writer = [&](int writer_id, uint64_t n) {
@@ -35,7 +35,7 @@
        std::string str(data_size, 'a');
        MsgI msg;
        DEFER1(msg.Release(shm););
        msg.MakeRC(shm, MakeRequest(mq.Id(), str.data(), str.size()));
        msg.MakeRC(shm, MakeRequest(mq.Id(), "topic", str.data(), str.size()));
        for (uint64_t i = 0; i < n; ++i) {
            // mq.Send(id, str.data(), str.size(), timeout);
            mq.Send(id, msg, timeout);
@@ -70,7 +70,7 @@
    auto Test = [&](auto &www, auto &rrr, bool isfork) {
        for (auto nreader : nreaders) {
            for (auto nwriter : nwriters) {
                const uint64_t nmsg      = 1000 * 1000 * 10 / nwriter;
                const uint64_t nmsg = 1000 * 1000 * 10 / nwriter;
                const uint64_t total_msg = nmsg * nwriter;
                std::atomic<bool> run(true);
                std::this_thread::sleep_for(10ms);
@@ -104,26 +104,26 @@
    run.store(false);
}
// Request Reply Test
BOOST_AUTO_TEST_CASE(RRTest)
// Send Recv Test
BOOST_AUTO_TEST_CASE(SRTest)
{
    const std::string shm_name("ShmReqRep");
    const std::string shm_name("ShmSendRecv");
    ShmRemover auto_remove(shm_name);
    const int qlen          = 64;
    const int qlen = 64;
    const size_t msg_length = 1000;
    std::string msg_content(msg_length, 'a');
    msg_content[20] = '\0';
    SharedMemory shm(shm_name, 1024 * 1024 * 50);
    auto Avail      = [&]() { return shm.get_free_memory(); };
    auto Avail = [&]() { return shm.get_free_memory(); };
    auto init_avail = Avail();
    ShmMsgQueue srv(shm, qlen);
    ShmMsgQueue cli(shm, qlen);
    MsgI request_rc;
    request_rc.MakeRC(shm, MakeRequest(cli.Id(), msg_content.data(), msg_content.size()));
    request_rc.MakeRC(shm, MakeRequest(cli.Id(), "topic", msg_content.data(), msg_content.size()));
    MsgI reply_rc;
    reply_rc.MakeRC(shm, MakeReply(msg_content.data(), msg_content.size()));
    reply_rc.MakeRC(shm, MakeReply("fakemsgid", msg_content.data(), msg_content.size()));
    std::atomic<uint64_t> count(0);
@@ -133,7 +133,7 @@
    auto Client = [&](int cli_id, int nmsg) {
        for (int i = 0; i < nmsg; ++i) {
            auto Req = [&]() {
                return cli.Send(srv.Id(), MakeRequest(cli.Id(), msg_content.data(), msg_content.size()), 100);
                return cli.Send(srv.Id(), MakeRequest(cli.Id(), "topic", msg_content.data(), msg_content.size()), 100);
            };
            auto ReqRC = [&]() { return cli.Send(srv.Id(), request_rc, 1000); };
@@ -165,7 +165,7 @@
                MQId src_id;
                memcpy(&src_id, mqid.data(), sizeof(src_id));
                auto Reply = [&]() {
                    return srv.Send(src_id, MakeReply(msg_content.data(), msg_content.size()), 100);
                    return srv.Send(src_id, MakeReply(req.msg_id(), msg_content.data(), msg_content.size()), 100);
                };
                auto ReplyRC = [&]() { return srv.Send(src_id, reply_rc, 100); };
@@ -180,7 +180,7 @@
    ThreadManager clients, servers;
    for (int i = 0; i < qlen; ++i) { servers.Launch(Server); }
    int ncli      = 100 * 1;
    int ncli = 100 * 1;
    uint64_t nmsg = 100 * 100 * 2;
    printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg);
    for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); }
utest/utest.cpp
@@ -1,6 +1,8 @@
#include "defs.h"
#include "pubsub.h"
#include "pubsub_center.h"
#include "reqrep.h"
#include "reqrep_center.h"
#include "socket.h"
#include "util.h"
#include <atomic>
@@ -150,6 +152,62 @@
    bus.Stop();
}
BOOST_AUTO_TEST_CASE(ReqRepTest)
{
    const std::string shm_name("ShmReqRep");
    ShmRemover auto_remove(shm_name);
    SharedMemory shm(shm_name, 1024 * 1024 * 50);
    auto Avail = [&]() { return shm.get_free_memory(); };
    auto init_avail = Avail();
    int *flag = shm.find_or_construct<int>("flag")(123);
    printf("flag = %d\n", *flag);
    ++*flag;
    ReqRepCenter center(shm);
    center.Start(2);
    std::atomic<bool> run(true);
    auto Client = [&](const std::string &topic, const int nreq) {
        SocketRequest client(shm);
        std::string reply;
        boost::timer::auto_cpu_timer timer;
        for (int i = 0; i < nreq; ++i) {
            if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) {
                printf("client request failed\n");
            }
        }
        printf("request %s %d done ", topic.c_str(), nreq);
    };
    auto Server = [&](const std::string &name, const std::vector<std::string> &topics) {
        SocketReply server(shm);
        ProcInfo info;
        info.set_name(name);
        info.set_info(name);
        if (!server.Register(info, topics, 100)) {
            printf("register failed\n");
        }
        auto onData = [](const std::string &topic, const std::string &data, std::string &reply) {
            reply = topic + ':' + data;
            return true;
        };
        server.StartWorker(onData);
        while (run) {
            std::this_thread::yield();
        }
    };
    ThreadManager clients, servers;
    std::vector<std::string> topics = {"topic1", "topic2"};
    servers.Launch(Server, "server", topics);
    std::this_thread::sleep_for(100ms);
    for (auto &t : topics) {
        clients.Launch(Client, t, 1000 * 100);
    }
    clients.WaitAll();
    run = false;
    servers.WaitAll();
}
inline int MyMin(int a, int b)
{
    printf("MyMin\n");