lichao
2021-04-08 c338820e4db43ad32c20ff429a038b06bcb980f8
BIG change, join center,bus; now msg is head+body.
19个文件已修改
5个文件已添加
8个文件已删除
2695 ■■■■ 已修改文件
.gitignore 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.vscode/launch.json 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
.vscode/settings.json 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.vscode/tasks.json 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/source/bhome_msg.proto 84 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/source/bhome_msg_api.proto 71 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/source/error_msg.proto 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/center.cpp 402 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/center.h 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.cpp 145 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.h 69 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/proto.cpp 41 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/proto.h 78 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub.cpp 56 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub.h 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub_center.cpp 147 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub_center.h 45 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/reqrep_center.cpp 173 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/reqrep_center.h 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.cpp 19 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.h 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.cpp 73 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.h 97 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 322 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.h 121 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_reply.cpp 142 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_reply.h 52 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_request.cpp 210 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_request.h 108 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/simple_tests.cpp 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/speed_test.cpp 68 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/utest.cpp 75 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.gitignore
@@ -1,4 +1,8 @@
*.un~
build/
debug/
release/
Makefile
utest/utest
*.bak
gmon.out
.vscode/launch.json
@@ -8,7 +8,7 @@
            "name": "g++ - Build and debug active file",
            "type": "cppdbg",
            "request": "launch",
            "program": "${workspaceFolder}/utest/utest",
            "program": "${workspaceFolder}/debug/bin/utest",
            "args": [
                "-t",
                "ReqRepTest"
.vscode/settings.json
@@ -55,7 +55,12 @@
        "cinttypes": "cpp",
        "typeindex": "cpp",
        "typeinfo": "cpp",
        "variant": "cpp"
        "variant": "cpp",
        "iomanip": "cpp",
        "*.inc": "cpp",
        "strstream": "cpp",
        "unordered_set": "cpp",
        "cfenv": "cpp"
    },
    "files.exclude": {
        "**/*.un~": true
.vscode/tasks.json
@@ -6,10 +6,10 @@
            "command": "ninja",
            "args": [
                "-C",
                "../build"
                "debug"
            ],
            "options": {
                "cwd": "${workspaceFolder}/utest"
                "cwd": "${workspaceFolder}"
            },
            "problemMatcher": [
                "$gcc"
proto/source/bhome_msg.proto
@@ -1,39 +1,21 @@
syntax = "proto3";
option optimize_for = LITE_RUNTIME;
import "google/protobuf/descriptor.proto";
import "error_msg.proto";
// import "google/protobuf/descriptor.proto";
import "bhome_msg_api.proto";
package bhome.msg;
// message format : header(BHMsgHead) + body(variable types)
message BHAddress {
    bytes mq_id = 1; // mqid, uuid
    bytes ip = 2;   //
    int32 port = 3;
}
message ProcInfo
{
    bytes id = 1; // serial number, maybe managed
    bytes name = 2;
    bytes public_info = 3;
    bytes private_info = 4;
}
// message format : head_len(4) + head(BHMsgHead) + body_len(4) + body(variable types)
message BHMsgHead {
    bytes msg_id = 1;
    repeated BHAddress route = 2; // for reply and proxy.
    int64 timestamp = 3;
    int32 type = 4;
    ProcInfo proc = 5;
    bytes proc_id = 5;
    bytes topic = 6; // for request route
}
message BHMsgBody {
    bytes data = 1;
}
message BHMsg { // deprecated
@@ -46,6 +28,7 @@
enum MsgType {
    kMsgTypeInvalid = 0;
    kMsgTypeRawData = 1;
    kMsgTypeCommonReply = 2;
@@ -57,57 +40,16 @@
    kMsgTypeQueryTopicReply = 15;
    kMsgTypeRequestTopic = 16;
    kMsgTypeRequestTopicReply = 17;
    kMsgTypeRegisterRPC = 18;
    // reply
    kMsgTypePublish = 100;
    // kMsgTypePublishReply = 101;
    kMsgTypeSubscribe = 102;
    // kMsgTypeSubscribeReply = 103;
    kMsgTypeUnsubscribe = 104;
    // kMsgTypeUnsubscribeReply = 105;
    kMsgTypePublish = 20;
    // kMsgTypePublishReply = 21;
    kMsgTypeSubscribe = 22;
    // kMsgTypeSubscribeReply = 23;
    kMsgTypeUnsubscribe = 24;
    // kMsgTypeUnsubscribeReply = 25;
}
message MsgPub {
    bytes topic = 1;
    bytes data = 2;
}
message MsgSub {
    repeated bytes topics = 1;
}
message MsgCommonReply {
    ErrorMsg errmsg = 1;
}
message MsgRequestTopic {
    bytes topic = 1;
    bytes data = 2;
}
message MsgRequestTopicReply {
    ErrorMsg errmsg = 1;
    bytes data = 2;
}
message MsgRegister
{
    ProcInfo proc = 1;
    repeated bytes topics = 2;
}
message MsgHeartbeat
{
    ProcInfo proc = 1;
}
message MsgQueryTopic {
    bytes topic = 1;
}
message MsgQueryTopicReply {
    ErrorMsg errmsg = 1;
    BHAddress address = 2;
}
service TopicRPC {
proto/source/bhome_msg_api.proto
New file
@@ -0,0 +1,71 @@
syntax = "proto3";
option optimize_for = LITE_RUNTIME;
// public messages
import "error_msg.proto";
package bhome.msg;
message BHAddress {
    bytes mq_id = 1; // mqid, uuid
    // bytes ip = 2;   //
    // int32 port = 3;
}
message ProcInfo
{
    bytes proc_id = 1; // serial number, maybe managed
    bytes name = 2;
    bytes public_info = 3; // maybe json.
    bytes private_info = 4;
}
message MsgPublish {
    bytes topic = 1;
    bytes data = 2;
}
message MsgSubscribe {
    repeated bytes topics = 1;
}
message MsgUnsubscribe {
    repeated bytes topics = 1;
}
message MsgCommonReply {
    ErrorMsg errmsg = 1;
}
message MsgRequestTopic {
    bytes topic = 1;
    bytes data = 2;
}
message MsgRequestTopicReply {
    ErrorMsg errmsg = 1;
    bytes data = 2;
}
message MsgRegister
{
    ProcInfo proc = 1;
}
message MsgRegisterRPC
{
    repeated bytes topics = 1;
}
message MsgHeartbeat
{
    ProcInfo proc = 1;
}
message MsgQueryTopic {
    bytes topic = 1;
}
message MsgQueryTopicReply {
    ErrorMsg errmsg = 1;
    BHAddress address = 2;
}
proto/source/error_msg.proto
@@ -8,6 +8,11 @@
    eSuccess = 0;
    eError = 1;
    eInvalidInput = 2;
    eNotRegistered = 3;
    eNotFound = 4;
    eOffline = 5;
    eNoRespond = 6;
    eAddressNotMatch = 7;
}
message ErrorMsg {
src/center.cpp
@@ -16,20 +16,387 @@
 * =====================================================================================
 */
#include "center.h"
#include "bh_util.h"
#include "defs.h"
#include "pubsub_center.h"
#include "reqrep_center.h"
#include "shm.h"
#include <set>
using namespace bhome_shm;
using namespace bhome_msg;
using namespace bhome::msg;
typedef BHCenter::MsgHandler Handler;
Handler Join(Handler h1, Handler h2)
namespace
{
    return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &imsg, bhome::msg::BHMsg &msg) {
        return h1(socket, imsg, msg) || h2(socket, imsg, msg);
auto Now = []() { time_t t; return time(&t); };
//TODO check proc_id
class NodeCenter
{
public:
    typedef std::string ProcId;
    typedef std::string Address;
    typedef bhome::msg::ProcInfo ProcInfo;
private:
    enum {
        kStateInvalid = 0,
        kStateNormal = 1,
        kStateNoRespond = 2,
        kStateOffline = 3,
    };
    struct ProcState {
        time_t timestamp_ = 0;
        uint32_t flag_ = 0; // reserved
    };
    typedef std::unordered_map<Address, std::set<Topic>> AddressTopics;
    struct NodeInfo {
        ProcState state_;             // state
        Address addr_;                // registered_mqid.
        ProcInfo proc_;               //
        AddressTopics services_;      // address: topics
        AddressTopics subscriptions_; // address: topics
    };
    typedef std::shared_ptr<NodeInfo> Node;
    typedef std::weak_ptr<NodeInfo> WeakNode;
    struct TopicDest {
        Address mq_;
        WeakNode weak_node_;
        bool operator<(const TopicDest &a) const { return mq_ < a.mq_; }
    };
    const std::string &SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); }
public:
    typedef std::set<TopicDest> Clients;
    NodeCenter(const std::string &id = "#Center") :
        id_(id) {}
    const std::string &id() const { return id_; } // no need to lock.
    //TODO maybe just return serialized string.
    MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg)
    {
        if (msg.proc().proc_id() != head.proc_id()) {
            return MakeReply(eInvalidInput, "invalid proc id.");
        }
        try {
            Node node(new NodeInfo);
            node->addr_ = SrcAddr(head);
            node->proc_.Swap(msg.mutable_proc());
            node->state_.timestamp_ = Now();
            node->state_.flag_ = kStateNormal;
            nodes_[node->proc_.proc_id()] = node;
            return MakeReply(eSuccess);
        } catch (...) {
            return MakeReply(eError, "register node error.");
        }
    }
    template <class OnSuccess, class OnError>
    auto HandleMsg(const BHMsgHead &head, OnSuccess onOk, OnError onErr)
    {
        auto pos = nodes_.find(head.proc_id());
        if (pos == nodes_.end()) {
            return onErr(eNotRegistered, "Node is not registered.");
        } else {
            auto node = pos->second;
            if (head.type() == kMsgTypeHeartbeat && node->addr_ != SrcAddr(head)) {
                return onErr(eAddressNotMatch, "Node address error.");
            } else if (!Valid(*node)) {
                return onErr(eNoRespond, "Node is not alive.");
            } else {
                return onOk(node);
            }
        }
    }
    template <class Reply, class Func>
    Reply HandleMsg(const BHMsgHead &head, Func const &op)
    {
        try {
            auto onErr = [](const ErrorCode ec, const std::string &str) { return MakeReply<Reply>(ec, str); };
            return HandleMsg(head, op, onErr);
            auto pos = nodes_.find(head.proc_id());
            if (pos == nodes_.end()) {
                return MakeReply<Reply>(eNotRegistered, "Node is not registered.");
            } else {
                auto node = pos->second;
                if (node->addr_ != SrcAddr(head)) {
                    return MakeReply<Reply>(eAddressNotMatch, "Node address error.");
                } else if (!Valid(*node)) {
                    return MakeReply<Reply>(eNoRespond, "Node is not alive.");
                } else {
                    return op(node);
                }
            }
        } catch (...) {
            //TODO error log
            return MakeReply<Reply>(eError, "internal error.");
        }
    }
    template <class Func>
    inline MsgCommonReply HandleMsg(const BHMsgHead &head, Func const &op)
    {
        return HandleMsg<MsgCommonReply, Func>(head, op);
    }
    MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg)
    {
        return HandleMsg(
            head, [&](Node node) -> MsgCommonReply {
                auto &src = SrcAddr(head);
                node->services_[src].insert(msg.topics().begin(), msg.topics().end());
                TopicDest dest = {src, node};
                for (auto &topic : msg.topics()) {
                    service_map_[topic].insert(dest);
                }
                return MakeReply(eSuccess);
            });
    }
    MsgCommonReply Heartbeat(const BHMsgHead &head, const MsgHeartbeat &msg)
    {
        return HandleMsg(head, [&](Node node) {
            NodeInfo &ni = *node;
            ni.state_.timestamp_ = Now();
            auto &info = msg.proc();
            if (!info.public_info().empty()) {
                ni.proc_.set_public_info(info.public_info());
            }
            if (!info.private_info().empty()) {
                ni.proc_.set_private_info(info.private_info());
            }
            return MakeReply(eSuccess);
        });
    }
    MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req)
    {
        typedef MsgQueryTopicReply Reply;
        auto query = [&](Node self) -> MsgQueryTopicReply {
            auto pos = service_map_.find(req.topic());
            if (pos != service_map_.end() && !pos->second.empty()) {
                // now just find first one.
                const TopicDest &dest = *(pos->second.begin());
                Node dest_node(dest.weak_node_.lock());
                if (!dest_node) {
                    service_map_.erase(pos);
                    return MakeReply<Reply>(eOffline, "topic server offline.");
                } else if (!Valid(*dest_node)) {
                    return MakeReply<Reply>(eNoRespond, "topic server not responding.");
                } else {
                    MsgQueryTopicReply reply = MakeReply<Reply>(eSuccess);
                    reply.mutable_address()->set_mq_id(dest.mq_);
                    return reply;
                }
            } else {
                return MakeReply<Reply>(eNotFound, "topic server not found.");
            }
        };
        return HandleMsg<Reply>(head, query);
    }
    MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg)
    {
        return HandleMsg(head, [&](Node node) {
            auto &src = SrcAddr(head);
            node->subscriptions_[src].insert(msg.topics().begin(), msg.topics().end());
            TopicDest dest = {src, node};
            for (auto &topic : msg.topics()) {
                subscribe_map_[topic].insert(dest);
            }
            return MakeReply(eSuccess);
        });
    }
    MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg)
    {
        return HandleMsg(head, [&](Node node) {
            auto &src = SrcAddr(head);
            auto pos = node->subscriptions_.find(src);
            auto RemoveSubTopicDestRecord = [this](const Topic &topic, const TopicDest &dest) {
                auto pos = subscribe_map_.find(topic);
                if (pos != subscribe_map_.end() &&
                    pos->second.erase(dest) != 0 &&
                    pos->second.empty()) {
                    subscribe_map_.erase(pos);
                }
            };
            if (pos != node->subscriptions_.end()) {
                const TopicDest &dest = {src, node};
                // clear node sub records;
                for (auto &topic : msg.topics()) {
                    pos->second.erase(topic);
                    RemoveSubTopicDestRecord(topic, dest);
                }
                if (pos->second.empty()) {
                    node->subscriptions_.erase(pos);
                }
            }
            return MakeReply(eSuccess);
        });
    }
    Clients DoFindClients(const std::string &topic)
    {
        Clients dests;
        auto Find1 = [&](const std::string &t) {
            auto pos = subscribe_map_.find(topic);
            if (pos != subscribe_map_.end()) {
                auto &clients = pos->second;
                for (auto &cli : clients) {
                    if (Valid(cli.weak_node_)) {
                        dests.insert(cli);
                    }
                }
            }
        };
        Find1(topic);
        size_t pos = 0;
        while (true) {
            pos = topic.find(kTopicSep, pos);
            if (pos == topic.npos || ++pos == topic.size()) {
                // Find1(std::string()); // sub all.
                break;
            } else {
                Find1(topic.substr(0, pos));
            }
        }
        return dests;
    }
    bool FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply)
    {
        bool ret = false;
        HandleMsg(head, [&](Node node) {
            DoFindClients(msg.topic()).swap(out);
            ret = true;
            return MakeReply(eSuccess);
        }).Swap(&reply);
        return ret;
    }
private:
    bool Valid(const NodeInfo &node)
    {
        return node.state_.flag_ == kStateNormal;
    }
    bool Valid(const WeakNode &weak)
    {
        auto node = weak.lock();
        return node && Valid(*node);
    }
    void CheckAllNodes(); //TODO, call it in timer.
    std::string id_;      // center proc id;
    std::unordered_map<Topic, Clients> service_map_;
    std::unordered_map<Topic, Clients> subscribe_map_;
    std::unordered_map<ProcId, Node> nodes_;
};
template <class Body, class OnMsg, class Replyer>
inline void Dispatch(MsgI &msg, BHMsgHead &head, OnMsg const &onmsg, Replyer const &replyer)
{
    if (head.route_size() != 1) { return; }
    Body body;
    if (msg.ParseBody(body)) {
        replyer(onmsg(body));
    }
}
Handler Combine(const Handler &h1, const Handler &h2)
{
    return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome::msg::BHMsgHead &head) {
        return h1(socket, msg, head) || h2(socket, msg, head);
    };
}
template <class... H>
Handler Combine(const Handler &h0, const Handler &h1, const Handler &h2, const H &...rest)
{
    return Combine(Combine(h0, h1), h2, rest...);
}
#define CASE_ON_MSG_TYPE(MsgTag)                                                         \
    case kMsgType##MsgTag:                                                               \
        Dispatch<Msg##MsgTag>(                                                           \
            msg, head, [&](auto &body) { return center->MsgTag(head, body); }, replyer); \
        return true;
bool InstallCenter()
{
    auto center_ptr = std::make_shared<Synced<NodeCenter>>();
    auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
        return [&](auto &&rep_body) {
            auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
            bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 10);
            if (!r) {
                printf("send reply failed.\n");
            }
            //TODO resend failed.
        };
    };
    auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
        auto &center = *center_ptr;
        auto replyer = MakeReplyer(socket, head, center->id());
        switch (head.type()) {
            CASE_ON_MSG_TYPE(Register);
            CASE_ON_MSG_TYPE(Heartbeat);
            CASE_ON_MSG_TYPE(RegisterRPC);
            CASE_ON_MSG_TYPE(QueryTopic);
        default: return false;
        }
    };
    auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
        auto &center = *center_ptr;
        auto replyer = MakeReplyer(socket, head, center->id());
        auto OnPublish = [&]() {
            MsgPublish pub;
            NodeCenter::Clients clients;
            MsgCommonReply reply;
            MsgI pubmsg;
            if (head.route_size() != 1 || !msg.ParseBody(pub)) {
                return;
            } else if (!center->FindClients(head, pub, clients, reply)) {
                // send error reply.
                MakeReplyer(socket, head, center->id())(reply);
            } else if (pubmsg.MakeRC(socket.shm(), msg)) {
                DEFER1(pubmsg.Release(socket.shm()));
                for (auto &cli : clients) {
                    auto node = cli.weak_node_.lock();
                    if (node) {
                        socket.Send(cli.mq_.data(), pubmsg, 10);
                    }
                }
            }
        };
        switch (head.type()) {
            CASE_ON_MSG_TYPE(Subscribe);
            CASE_ON_MSG_TYPE(Unsubscribe);
        case kMsgTypePublish: OnPublish(); return true;
        default: return false;
        }
    };
    BHCenter::Install("#center.reg", OnCenter, BHTopicCenterAddress(), 1000);
    BHCenter::Install("#center.bus", OnPubSub, BHTopicBusAddress(), 1000);
    return true;
}
#undef CASE_ON_MSG_TYPE
} // namespace
SharedMemory &BHomeShm()
{
@@ -42,17 +409,24 @@
    static CenterRecords rec;
    return rec;
}
bool BHCenter::Install(const std::string &name, MsgHandler handler, const std::string &mqid, const int mq_len)
{
    CenterRecords()[name] = CenterInfo{name, handler, mqid, mq_len};
    Centers()[name] = CenterInfo{name, handler, mqid, mq_len};
    return true;
}
bool BHCenter::Install(const std::string &name, MsgHandler handler, const MQId &mqid, const int mq_len)
{
    return Install(name, handler, std::string((const char *) &mqid, sizeof(mqid)), mq_len);
}
BHCenter::BHCenter(Socket::Shm &shm)
{
    sockets_["center"] = std::make_shared<ShmSocket>(shm, &BHTopicCenterAddress(), 1000);
    sockets_["bus"] = std::make_shared<ShmSocket>(shm, &BHTopicBusAddress(), 1000);
    InstallCenter();
    for (auto &kv : Centers()) {
        sockets_[kv.first] = std::make_shared<ShmSocket>(shm, kv.second.mqid_.data(), kv.second.mq_len_);
        auto &info = kv.second;
        sockets_[info.name_] = std::make_shared<ShmSocket>(shm, *(MQId *) info.mqid_.data(), info.mq_len_);
    }
}
@@ -61,16 +435,12 @@
bool BHCenter::Start()
{
    auto onCenter = MakeReqRepCenter();
    auto onBus = MakeBusCenter();
    sockets_["center"]->Start(onCenter);
    sockets_["bus"]->Start(onBus);
    for (auto &kv : Centers()) {
        sockets_[kv.first]->Start(kv.second.handler_);
        auto &info = kv.second;
        sockets_[info.name_]->Start(info.handler_);
    }
    return true;
    // socket_.Start(Join(onCenter, onBus));
}
bool BHCenter::Stop()
src/center.h
@@ -28,8 +28,9 @@
    typedef ShmSocket Socket;
public:
    typedef std::function<bool(ShmSocket &socket, bhome_msg::MsgI &imsg, bhome::msg::BHMsg &msg)> MsgHandler;
    typedef Socket::PartialRecvCB MsgHandler;
    static bool Install(const std::string &name, MsgHandler handler, const std::string &mqid, const int mq_len);
    static bool Install(const std::string &name, MsgHandler handler, const MQId &mqid, const int mq_len);
    BHCenter(Socket::Shm &shm);
    BHCenter();
src/msg.cpp
@@ -25,140 +25,38 @@
    center accept request and route.;
//*/
const uint32_t kMsgTag = 0xf1e2d3c4;
const uint32_t kMsgPrefixLen = 4;
inline void AddRoute(BHMsg &msg, const MQId &id) { msg.add_route()->set_mq_id(&id, sizeof(id)); }
std::string RandId()
void *MsgI::Pack(SharedMemory &shm,
                 const uint32_t head_len, const ToArray &headToArray,
                 const uint32_t body_len, const ToArray &bodyToArray)
{
    boost::uuids::uuid id = boost::uuids::random_generator()();
    return std::string((char *) &id, sizeof(id));
}
BHMsg InitMsg(MsgType type, const std::string &msgid = RandId())
{
    BHMsg msg;
    msg.set_msg_id(msgid);
    msg.set_type(type);
    time_t tm = 0;
    msg.set_timestamp(time(&tm));
    return msg;
}
BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size)
{
    BHMsg msg(InitMsg(kMsgTypeRequestTopic));
    AddRoute(msg, src_id);
    MsgRequestTopic req;
    req.set_topic(topic);
    req.set_data(data, size);
    msg.set_body(req.SerializeAsString());
    return msg;
}
BHMsg MakeRegister(const MQId &src_id, ProcInfo info, const std::vector<std::string> &topics)
{
    BHMsg msg(InitMsg(kMsgTypeRegister));
    AddRoute(msg, src_id);
    MsgRegister reg;
    reg.mutable_proc()->Swap(&info);
    for (auto &t : topics) {
        reg.add_topics(t);
    void *addr = shm.Alloc(sizeof(head_len) + head_len + sizeof(body_len) + body_len);
    if (addr) {
        auto p = static_cast<char *>(addr);
        auto Pack1 = [&p](auto len, auto &writer) {
            Put32(p, len);
            p += sizeof(len);
            writer(p, len);
            p += len;
        };
        Pack1(head_len, headToArray);
        Pack1(body_len, bodyToArray);
    }
    msg.set_body(reg.SerializeAsString());
    return msg;
    return addr;
}
BHMsg MakeHeartbeat(const MQId &src_id, ProcInfo info)
bool MsgI::ParseHead(BHMsgHead &head) const
{
    BHMsg msg(InitMsg(kMsgTypeHeartbeat));
    AddRoute(msg, src_id);
    MsgHeartbeat reg;
    reg.mutable_proc()->Swap(&info);
    msg.set_body(reg.SerializeAsString());
    return msg;
}
BHMsg MakeReply(const std::string &src_msgid, const void *data, const size_t size)
{
    assert(data && size);
    BHMsg msg(InitMsg(kMsgTypeRequestTopicReply, src_msgid));
    MsgRequestTopicReply reply;
    reply.set_data(data, size);
    msg.set_body(reply.SerializeAsString());
    return msg;
}
BHMsg MakeSubUnsub(const MQId &client, const std::vector<std::string> &topics, const MsgType sub_unsub)
{
    assert(sub_unsub == kMsgTypeSubscribe || sub_unsub == kMsgTypeUnsubscribe);
    BHMsg msg(InitMsg(sub_unsub));
    AddRoute(msg, client);
    MsgSub subs;
    for (auto &t : topics) {
        subs.add_topics(t);
    }
    msg.set_body(subs.SerializeAsString());
    return msg;
}
BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics) { return MakeSubUnsub(client, topics, kMsgTypeSubscribe); }
BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics) { return MakeSubUnsub(client, topics, kMsgTypeUnsubscribe); }
BHMsg MakePub(const std::string &topic, const void *data, const size_t size)
{
    assert(data && size);
    BHMsg msg(InitMsg(kMsgTypePublish));
    MsgPub pub;
    pub.set_topic(topic);
    pub.set_data(data, size);
    msg.set_body(pub.SerializeAsString());
    return msg;
}
BHMsg MakeQueryTopic(const MQId &client, const std::string &topic)
{
    BHMsg msg(InitMsg(kMsgTypeQueryTopic));
    AddRoute(msg, client);
    MsgQueryTopic query;
    query.set_topic(topic);
    msg.set_body(query.SerializeAsString());
    return msg;
}
BHMsg MakeQueryTopicReply(const std::string &mqid, const std::string &msgid)
{
    BHMsg msg(InitMsg(kMsgTypeQueryTopicReply, msgid));
    MsgQueryTopicReply reply;
    reply.mutable_address()->set_mq_id(mqid);
    msg.set_body(reply.SerializeAsString());
    return msg;
}
void *Pack(SharedMemory &shm, const BHMsg &msg)
{
    uint32_t msg_size = msg.ByteSizeLong();
    void *p = shm.Alloc(4 + msg_size);
    if (p) {
        Put32(p, msg_size);
        if (!msg.SerializeToArray(static_cast<char *>(p) + kMsgPrefixLen, msg_size)) {
            shm.Dealloc(p);
            p = 0;
        }
    }
    return p;
}
bool MsgI::Unpack(BHMsg &msg) const
{
    void *p = ptr_.get();
    auto p = static_cast<char *>(ptr_.get());
    assert(p);
    uint32_t msg_size = Get32(p);
    return msg.ParseFromArray(static_cast<char *>(p) + kMsgPrefixLen, msg_size);
    p += 4;
    return head.ParseFromArray(p, msg_size);
}
// with ref count;
bool MsgI::MakeRC(SharedMemory &shm, const BHMsg &msg)
bool MsgI::MakeRC(SharedMemory &shm, void *p)
{
    void *p = Pack(shm, msg);
    if (!p) {
        return false;
    }
@@ -171,9 +69,8 @@
    return true;
}
bool MsgI::Make(SharedMemory &shm, const BHMsg &msg)
bool MsgI::Make(SharedMemory &shm, void *p)
{
    void *p = Pack(shm, msg);
    if (!p) {
        return false;
    }
src/msg.h
@@ -18,10 +18,12 @@
#ifndef MSG_5BILLZET
#define MSG_5BILLZET
#include "bhome_msg.pb.h"
#include "bh_util.h"
#include "proto.h"
#include "shm.h"
#include <boost/interprocess/offset_ptr.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <functional>
#include <stdint.h>
namespace bhome_msg
@@ -59,16 +61,6 @@
    int num_ = 1;
};
BHMsg MakeQueryTopic(const MQId &client, const std::string &topic);
BHMsg MakeQueryTopicReply(const std::string &mqid, const std::string &msgid);
BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size);
BHMsg MakeReply(const std::string &src_msgid, const void *data, const size_t size);
BHMsg MakeRegister(const MQId &src_id, ProcInfo info, const std::vector<std::string> &topics);
BHMsg MakeHeartbeat(const MQId &src_id, ProcInfo info);
BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics);
BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics);
BHMsg MakePub(const std::string &topic, const void *data, const size_t size);
// message content layout: header_size + header + data_size + data
class MsgI
{
@@ -76,7 +68,22 @@
    offset_ptr<void> ptr_;
    offset_ptr<RefCount> count_;
    bool BuildSubOrUnsub(SharedMemory &shm, const std::vector<std::string> &topics, const MsgType sub_unsub);
    typedef std::function<void(void *p, int len)> ToArray;
    void *Pack(SharedMemory &shm,
               const uint32_t head_len, const ToArray &headToArray,
               const uint32_t body_len, const ToArray &bodyToArray);
    template <class Body>
    void *Pack(SharedMemory &shm, const BHMsgHead &head, const Body &body)
    {
        return Pack(
            shm,
            uint32_t(head.ByteSizeLong()), [&](void *p, int len) { head.SerializeToArray(p, len); },
            uint32_t(body.ByteSizeLong()), [&](void *p, int len) { body.SerializeToArray(p, len); });
    }
    bool MakeRC(SharedMemory &shm, void *addr);
    bool Make(SharedMemory &shm, void *addr);
public:
    MsgI(void *p = 0, RefCount *c = 0) :
@@ -97,9 +104,41 @@
    int Count() const { return IsCounted() ? count_->Get() : 1; }
    bool IsCounted() const { return static_cast<bool>(count_); }
    bool Make(SharedMemory &shm, const BHMsg &msg);
    bool MakeRC(SharedMemory &shm, const BHMsg &msg);
    bool Unpack(BHMsg &msg) const;
    template <class Body>
    bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body)
    {
        return Make(shm, Pack(shm, head, body));
    }
    template <class Body>
    bool MakeRC(SharedMemory &shm, const BHMsgHead &head, const Body &body)
    {
        return MakeRC(shm, Pack(shm, head, body));
    }
    bool MakeRC(SharedMemory &shm, MsgI &a)
    {
        if (a.IsCounted()) {
            *this = a;
            AddRef();
            return true;
        } else {
            void *p = a.ptr_.get();
            a.ptr_ = 0;
            return MakeRC(shm, p);
        }
    }
    bool ParseHead(BHMsgHead &head) const;
    template <class Body>
    bool ParseBody(Body &body) const
    {
        auto p = static_cast<char *>(ptr_.get());
        assert(p);
        uint32_t size = Get32(p);
        p += 4;
        p += size;
        size = Get32(p);
        p += 4;
        return body.ParseFromArray(p, size);
    }
};
inline void swap(MsgI &m1, MsgI &m2) { m1.swap(m2); }
src/proto.cpp
New file
@@ -0,0 +1,41 @@
/*
 * =====================================================================================
 *
 *       Filename:  proto.cpp
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年04月07日 17时04分36秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#include "proto.h"
#include <boost/uuid/uuid_generators.hpp>
std::string RandId()
{
    boost::uuids::uuid id = boost::uuids::random_generator()();
    return std::string((char *) &id, sizeof(id));
}
BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id)
{
    return InitMsgHead(type, proc_id, RandId());
}
BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid)
{
    BHMsgHead msg;
    msg.set_msg_id(msgid);
    msg.set_type(type);
    msg.set_proc_id(proc_id);
    time_t tm = 0;
    msg.set_timestamp(time(&tm));
    return msg;
}
src/proto.h
New file
@@ -0,0 +1,78 @@
/*
 * =====================================================================================
 *
 *       Filename:  proto.h
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年04月07日 13时48分51秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#ifndef PROTO_UA9UWKL1
#define PROTO_UA9UWKL1
#include "bhome_msg.pb.h"
using namespace bhome::msg;
template <class Msg>
struct MsgToType {
};
#define BHOME_MAP_MSG_AND_TYPE(mSG, tYPE)              \
    template <>                                        \
    struct MsgToType<mSG> {                            \
        static const bhome::msg::MsgType value = tYPE; \
    };
#define BHOME_SIMPLE_MAP_MSG(name) BHOME_MAP_MSG_AND_TYPE(Msg##name, kMsgType##name)
BHOME_SIMPLE_MAP_MSG(CommonReply);
BHOME_SIMPLE_MAP_MSG(Register);
BHOME_SIMPLE_MAP_MSG(RegisterRPC);
BHOME_SIMPLE_MAP_MSG(Heartbeat);
BHOME_SIMPLE_MAP_MSG(QueryTopic);
BHOME_SIMPLE_MAP_MSG(QueryTopicReply);
BHOME_SIMPLE_MAP_MSG(RequestTopic);
BHOME_SIMPLE_MAP_MSG(RequestTopicReply);
BHOME_SIMPLE_MAP_MSG(Publish);
BHOME_SIMPLE_MAP_MSG(Subscribe);
BHOME_SIMPLE_MAP_MSG(Unsubscribe);
#undef BHOME_SIMPLE_MAP_MSG
#undef BHOME_MAP_MSG_AND_TYPE
template <class Msg>
constexpr inline bhome::msg::MsgType GetType(const Msg &)
{
    return MsgToType<Msg>::value;
}
inline void SetError(ErrorMsg &em, const ErrorCode err_code, const std::string &err_str = "")
{
    em.set_errcode(err_code);
    if (!err_str.empty()) {
        em.set_errstring(err_str);
    }
}
template <class Reply = MsgCommonReply>
inline Reply MakeReply(const ErrorCode err_code, const std::string &err_str = "")
{
    Reply msg;
    SetError(*msg.mutable_errmsg(), err_code, err_str);
    return msg;
}
BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid);
BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id);
// inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); }
#endif // end of include guard: PROTO_UA9UWKL1
src/pubsub.cpp
@@ -22,24 +22,38 @@
using namespace std::chrono_literals;
using namespace bhome_msg;
bool SocketPublish::Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms)
bool SocketPublish::Publish(const std::string &proc_id, const Topic &topic, const void *data, const size_t size, const int timeout_ms)
{
    try {
        MsgPublish pub;
        pub.set_topic(topic);
        pub.set_data(data, size);
        BHMsgHead head(InitMsgHead(GetType(pub), proc_id));
        MsgI imsg;
        if (!imsg.MakeRC(shm(), MakePub(topic, data, size))) {
            return false;
        if (imsg.MakeRC(shm(), head, pub)) {
            DEFER1(imsg.Release(shm()));
            return ShmMsgQueue::Send(shm(), BHTopicBusAddress(), imsg, timeout_ms);
        }
        DEFER1(imsg.Release(shm()));
        return ShmMsgQueue::Send(shm(), BHTopicBusAddress(), imsg, timeout_ms);
    } catch (...) {
        return false;
    }
    return false;
}
namespace
{
inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); }
bool SocketSubscribe::Subscribe(const std::vector<Topic> &topics, const int timeout_ms)
} // namespace
bool SocketSubscribe::Subscribe(const std::string &proc_id, const std::vector<Topic> &topics, const int timeout_ms)
{
    try {
        return mq().Send(BHTopicBusAddress(), MakeSub(mq().Id(), topics), timeout_ms);
        MsgSubscribe sub;
        for (auto &topic : topics) {
            sub.add_topics(topic);
        }
        BHMsgHead head(InitMsgHead(GetType(sub), proc_id));
        AddRoute(head, mq().Id());
        return Send(&BHTopicBusAddress(), head, sub, timeout_ms);
    } catch (...) {
        return false;
    }
@@ -47,11 +61,11 @@
bool SocketSubscribe::StartRecv(const TopicDataCB &tdcb, int nworker)
{
    auto AsyncRecvProc = [this, tdcb](BHMsg &msg) {
        if (msg.type() == kMsgTypePublish) {
            MsgPub d;
            if (d.ParseFromString(msg.body())) {
                tdcb(d.topic(), d.data());
    auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) {
        if (head.type() == kMsgTypePublish) {
            MsgPublish pub;
            if (imsg.ParseBody(pub)) {
                tdcb(head.proc_id(), pub.topic(), pub.data());
            }
        } else {
            // ignored, or dropped
@@ -61,14 +75,16 @@
    return tdcb && Start(AsyncRecvProc, nworker);
}
bool SocketSubscribe::RecvSub(Topic &topic, std::string &data, const int timeout_ms)
bool SocketSubscribe::RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms)
{
    BHMsg msg;
    if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypePublish) {
        MsgPub d;
        if (d.ParseFromString(msg.body())) {
            d.mutable_topic()->swap(topic);
            d.mutable_data()->swap(data);
    MsgI msg;
    BHMsgHead head;
    if (SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) {
        MsgPublish pub;
        if (msg.ParseBody(pub)) {
            head.mutable_proc_id()->swap(proc_id);
            pub.mutable_topic()->swap(topic);
            pub.mutable_data()->swap(data);
            return true;
        }
    }
src/pubsub.h
@@ -33,11 +33,7 @@
        shm_(shm) {}
    SocketPublish() :
        SocketPublish(BHomeShm()) {}
    bool Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms);
    bool Publish(const Topic &topic, const std::string &data, const int timeout_ms)
    {
        return Publish(topic, data.data(), data.size(), timeout_ms);
    }
    bool Publish(const std::string &proc_id, const Topic &topic, const void *data, const size_t size, const int timeout_ms);
};
// socket subscribe
@@ -52,11 +48,11 @@
        SocketSubscribe(BHomeShm()) {}
    ~SocketSubscribe() { Stop(); }
    typedef std::function<void(const Topic &topic, const std::string &data)> TopicDataCB;
    typedef std::function<void(const std::string &proc_id, const Topic &topic, const std::string &data)> TopicDataCB;
    bool StartRecv(const TopicDataCB &tdcb, int nworker = 2);
    bool Stop() { return Socket::Stop(); }
    bool Subscribe(const std::vector<Topic> &topics, const int timeout_ms);
    bool RecvSub(Topic &topic, std::string &data, const int timeout_ms);
    bool Subscribe(const std::string &proc_id, const std::vector<Topic> &topics, const int timeout_ms);
    bool RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms);
};
#endif // end of include guard: PUBSUB_4KGRA997
src/pubsub_center.cpp
File was deleted
src/pubsub_center.h
File was deleted
src/reqrep_center.cpp
File was deleted
src/reqrep_center.h
File was deleted
src/shm_queue.cpp
@@ -87,15 +87,14 @@
// 2) find remote queue first, then build msg;
// 1 is about 50% faster than 2, maybe cache related.
bool ShmMsgQueue::Recv(BHMsg &msg, const int timeout_ms)
{
    MsgI imsg;
    if (Read(imsg, timeout_ms)) {
        DEFER1(imsg.Release(shm()););
        return imsg.Unpack(msg);
    } else {
        return false;
    }
}
// bool ShmMsgQueue::Recv(MsgI &imsg, BHMsgHead &head, const int timeout_ms)
// {
//     if (Read(imsg, timeout_ms)) {
//         // DEFER1(imsg.Release(shm()););
//         return imsg.ParseHead(head);
//     } else {
//         return false;
//     }
// }
} // namespace bhome_shm
src/shm_queue.h
@@ -131,7 +131,7 @@
    ~ShmMsgQueue();
    const MQId &Id() const { return id_; }
    bool Recv(BHMsg &msg, const int timeout_ms);
    // bool Recv(MsgI &msg, BHMsgHead &head, const int timeout_ms);
    bool Recv(MsgI &msg, const int timeout_ms) { return Read(msg, timeout_ms); }
    static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend);
    static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms);
@@ -141,12 +141,11 @@
    {
        return Send(shm(), remote_id, msg, timeout_ms, extra...);
    }
    template <class... Extra>
    bool Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms, Extra const &...extra)
    template <class Body, class... Extra>
    bool Send(const MQId &remote_id, const BHMsgHead &head, const Body &body, const int timeout_ms, Extra const &...extra)
    {
        MsgI msg;
        if (msg.Make(shm(), data)) {
        if (msg.Make(shm(), head, body)) {
            if (Send(shm(), remote_id, msg, timeout_ms, extra...)) {
                return true;
            } else {
@@ -155,6 +154,7 @@
        }
        return false;
    }
    size_t Pending() const { return data()->size(); }
};
src/socket.cpp
@@ -29,43 +29,53 @@
} // namespace
ShmSocket::ShmSocket(Shm &shm, const void *id, const int len) :
    shm_(shm), run_(false)
ShmSocket::ShmSocket(Shm &shm, const MQId &id, const int len) :
    shm_(shm), run_(false), mq_(id, shm, len)
{
    if (id && len > 0) {
        mq_.reset(new Queue(*static_cast<const MQId *>(id), shm, len));
    }
}
ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) :
    shm_(shm), run_(false)
{
    if (len > 0) {
        mq_.reset(new Queue(shm_, len));
    }
}
    shm_(shm), run_(false), mq_(shm, len) {}
ShmSocket::~ShmSocket()
{
    Stop(); //TODO should stop in sub class, incase thread access sub class data.
}
bool ShmSocket::Start(const RecvCB &onData, const IdleCB &onIdle, int nworker)
bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle)
{
    if (!mq_ || !onData) {
        return false; // TODO error code.
    }
    auto onRecv = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
        auto Find = [&](RecvCB &cb) {
            std::lock_guard<std::mutex> lock(mutex());
            const std::string &msgid = head.msg_id();
            auto pos = async_cbs_.find(msgid);
            if (pos != async_cbs_.end()) {
                cb.swap(pos->second);
                async_cbs_.erase(pos);
                return true;
            } else {
                return false;
            }
        };
        RecvCB cb;
        if (Find(cb)) {
            cb(socket, imsg, head);
        } else if (onData) {
            onData(socket, imsg, head);
        } // else ignored, or dropped
    };
    std::lock_guard<std::mutex> lock(mutex_);
    StopNoLock();
    auto RecvProc = [this, onData, onIdle]() {
    auto RecvProc = [this, onRecv, onIdle]() {
        while (run_) {
            try {
                MsgI imsg;
                DEFER1(imsg.Release(shm_));
                if (mq_->Recv(imsg, 100)) {
                    BHMsg msg;
                    if (imsg.Unpack(msg)) {
                        onData(*this, imsg, msg);
                if (mq().Recv(imsg, 10)) {
                    DEFER1(imsg.Release(shm()));
                    BHMsgHead head;
                    if (imsg.ParseHead(head)) {
                        onRecv(*this, imsg, head);
                    }
                } else if (onIdle) {
                    onIdle(*this);
@@ -102,17 +112,18 @@
    return false;
}
bool ShmSocket::SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms)
{
    return mq_->Send(*static_cast<const MQId *>(id), msg, timeout_ms);
}
bool ShmSocket::SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms)
bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms)
{
    std::lock_guard<std::mutex> lock(mutex_);
    if (!mq_ || RunningNoLock()) {
    auto Recv = [&]() {
        if (mq().Recv(msg, timeout_ms)) {
            if (msg.ParseHead(head)) {
                return true;
            } else {
                msg.Release(shm());
            }
        }
        return false;
    } else {
        return mq_->Recv(msg, timeout_ms);
    }
    };
    return !RunningNoLock() && Recv();
}
src/socket.h
@@ -19,14 +19,18 @@
#ifndef SOCKET_GWTJHBPO
#define SOCKET_GWTJHBPO
#include "defs.h"
#include "shm_queue.h"
#include <atomic>
#include <boost/noncopyable.hpp>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
using namespace bhome_msg;
class ShmSocket : private boost::noncopyable
{
@@ -35,36 +39,88 @@
public:
    typedef bhome_shm::SharedMemory Shm;
    typedef std::function<void(ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg)> RecvCB;
    typedef std::function<void(bhome_msg::BHMsg &msg)> RecvBHMsgCB;
    typedef std::function<void(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> RecvCB;
    typedef std::function<bool(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> PartialRecvCB;
    typedef std::function<void(ShmSocket &sock)> IdleCB;
    ShmSocket(Shm &shm, const void *id, const int len);
    ShmSocket(Shm &shm, const MQId &id, const int len);
    ShmSocket(Shm &shm, const int len = 12);
    ~ShmSocket();
    const MQId &id() const { return mq().Id(); }
    Shm &shm() { return shm_; }
    // start recv.
    bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1);
    bool Start(const RecvCB &onData, int nworker = 1) { return Start(onData, IdleCB(), nworker); }
    bool Start(const RecvBHMsgCB &onData, const IdleCB &onIdle, int nworker = 1)
    {
        return Start([onData](ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg) { onData(msg); }, onIdle, nworker);
    }
    bool Start(const RecvBHMsgCB &onData, int nworker = 1)
    {
        return Start(onData, IdleCB(), nworker);
    }
    bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB());
    bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, onIdle); }
    bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); }
    bool Stop();
    size_t Pending() const { return mq_ ? mq_->Pending() : 0; }
    size_t Pending() const { return mq().Pending(); }
    bool SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms);
    bool SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms);
    bool Send(const void *id, const MsgI &imsg, const int timeout_ms)
    {
        return mq().Send(*static_cast<const MQId *>(id), imsg, timeout_ms);
    }
    //TODO reimplment, using async.
    bool SyncRecv(MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms);
    template <class Body>
    bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb = RecvCB())
    {
        assert(valid_remote);
        try {
            if (cb) {
                auto RegisterCB = [&]() {
                    std::lock_guard<std::mutex> lock(mutex());
                    async_cbs_.emplace(head.msg_id(), cb);
                };
                return mq().Send(*static_cast<const MQId *>(valid_remote), head, body, timeout_ms, RegisterCB);
            } else {
                return mq().Send(*static_cast<const MQId *>(valid_remote), head, body, timeout_ms);
            }
        } catch (...) {
            return false;
        }
    }
    template <class Body>
    bool SendAndRecv(const void *remote, const BHMsgHead &head, const Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
    {
        struct State {
            std::mutex mutex;
            std::condition_variable cv;
            bool canceled = false;
        };
        try {
            std::shared_ptr<State> st(new State);
            auto endtime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
            auto OnRecv = [st, &reply, &reply_head](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
                std::unique_lock<std::mutex> lk(st->mutex);
                if (!st->canceled) {
                    reply.swap(msg);
                    reply_head.Swap(&head);
                    st->cv.notify_one();
                } else {
                }
            };
            std::unique_lock<std::mutex> lk(st->mutex);
            bool sendok = Send(remote, head, body, timeout_ms, OnRecv);
            if (sendok && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) {
                return true;
            } else {
                st->canceled = true;
                return false;
            }
        } catch (...) {
            return false;
        }
    }
protected:
    const Shm &shm() const { return shm_; }
    Queue &mq() { return *mq_; } // programmer should make sure that mq_ is valid.
    const Queue &mq() const { return *mq_; }
    Queue &mq() { return mq_; } // programmer should make sure that mq_ is valid.
    const Queue &mq() const { return mq_; }
    std::mutex &mutex() { return mutex_; }
private:
@@ -76,7 +132,8 @@
    std::mutex mutex_;
    std::atomic<bool> run_;
    std::unique_ptr<Queue> mq_;
    Queue mq_;
    std::unordered_map<std::string, RecvCB> async_cbs_;
};
#endif // end of include guard: SOCKET_GWTJHBPO
src/topic_node.cpp
New file
@@ -0,0 +1,322 @@
/*
 * =====================================================================================
 *
 *       Filename:  topic_node.cpp
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年04月07日 09时01分48秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#include "topic_node.h"
#include "bh_util.h"
#include <chrono>
#include <list>
using namespace std::chrono;
using namespace std::chrono_literals;
namespace
{
inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); }
struct SrcInfo {
    std::vector<BHAddress> route;
    std::string msg_id;
};
class ServerFailedQ
{
    struct FailedMsg {
        steady_clock::time_point xpr;
        std::string remote_;
        BHMsgHead head_;
        MsgRequestTopicReply body_;
        FailedMsg(const std::string &addr, BHMsgHead &&head, MsgRequestTopicReply &&body) :
            xpr(steady_clock::now() + 10s), remote_(addr), head_(std::move(head)), body_(std::move(body)) {}
        bool Expired() { return steady_clock::now() > xpr; }
    };
    typedef std::list<FailedMsg> Queue;
    Synced<Queue> queue_;
public:
    void Push(const std::string &remote, BHMsgHead &&head, MsgRequestTopicReply &&body)
    {
        queue_->emplace_back(remote, std::move(head), std::move(body));
    }
    void TrySend(ShmSocket &socket, const int timeout_ms = 0)
    {
        queue_.Apply([&](Queue &q) {
            if (!q.empty()) {
                auto it = q.begin();
                do {
                    if (it->Expired()) {
                        // it->msg_.Release(socket.shm());
                        it = q.erase(it);
                    } else if (socket.Send(it->remote_.data(), it->head_, it->body_, timeout_ms)) {
                        it = q.erase(it);
                    } else {
                        ++it;
                    }
                } while (it != q.end());
            }
        });
    }
};
} // namespace
TopicNode::TopicNode(SharedMemory &shm) :
    shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm)
{
    SockNode().Start();
}
TopicNode::~TopicNode()
{
    StopAll();
    SockNode().Stop();
}
void TopicNode::StopAll()
{
    ServerStop();
    ClientStopWorker();
}
bool TopicNode::Register(const MsgRegister &body, MsgCommonReply &reply_body, const int timeout_ms)
{
    auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
    AddRoute(head, SockNode().id());
    MsgI reply;
    DEFER1(reply.Release(shm_););
    BHMsgHead reply_head;
    bool r = SockNode().SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
    r = r && reply_head.type() == kMsgTypeCommonReply;
    r = r && reply.ParseBody(reply_body);
    if (r) {
        info_ = body;
    }
    return r;
}
bool TopicNode::RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply_body, const int timeout_ms)
{
    //TODO check registered
    auto head(InitMsgHead(GetType(body), proc_id()));
    AddRoute(head, SockReply().id());
    MsgI reply;
    DEFER1(reply.Release(shm_););
    BHMsgHead reply_head;
    bool r = SockReply().SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
    r = r && reply_head.type() == kMsgTypeCommonReply;
    r = r && reply.ParseBody(reply_body);
    return r;
}
bool TopicNode::ServerStart(const OnRequest &rcb, int nworker)
{
    //TODO check registered
    auto failed_q = std::make_shared<ServerFailedQ>();
    auto onIdle = [failed_q](ShmSocket &socket) { failed_q->TrySend(socket); };
    auto onRecv = [this, rcb, failed_q, onIdle](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
        if (head.type() == kMsgTypeRequestTopic && head.route_size() > 0) {
            MsgRequestTopic req;
            if (imsg.ParseBody(req)) {
                std::string out;
                if (rcb(req.topic(), req.data(), out)) {
                    MsgRequestTopicReply reply_body;
                    reply_body.set_data(std::move(out));
                    BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), head.msg_id()));
                    for (int i = 0; i < head.route_size() - 1; ++i) {
                        reply_head.add_route()->Swap(head.mutable_route(i));
                    }
                    if (!sock.Send(head.route().rbegin()->mq_id().data(), reply_head, reply_body, 10)) {
                        failed_q->Push(head.route().rbegin()->mq_id(), std::move(reply_head), std::move(reply_body));
                    }
                }
            }
        } else {
            // ignored, or dropped
        }
        onIdle(sock);
    };
    return rcb && SockReply().Start(onRecv, onIdle, nworker);
}
bool TopicNode::ServerStop() { return SockReply().Stop(); }
bool TopicNode::ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms)
{
    MsgI imsg;
    BHMsgHead head;
    if (SockReply().SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) {
        MsgRequestTopic request;
        if (imsg.ParseBody(request)) {
            request.mutable_topic()->swap(topic);
            request.mutable_data()->swap(data);
            SrcInfo *p = new SrcInfo;
            p->route.assign(head.route().begin(), head.route().end());
            p->msg_id = head.msg_id();
            src_info = p;
            return true;
        }
    }
    return false;
}
bool TopicNode::ServerSendReply(void *src_info, const std::string &data, const int timeout_ms)
{
    SrcInfo *p = static_cast<SrcInfo *>(src_info);
    DEFER1(delete p);
    if (!p || p->route.empty()) {
        return false;
    }
    MsgRequestTopicReply body;
    body.set_data(data);
    BHMsgHead head(InitMsgHead(GetType(body), proc_id(), p->msg_id));
    for (unsigned i = 0; i < p->route.size() - 1; ++i) {
        head.add_route()->Swap(&p->route[i]);
    }
    return SockReply().Send(p->route.back().mq_id().data(), head, body, timeout_ms);
}
bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker)
{
    if (!cb) {
        return false;
    }
    auto onData = [this, cb](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
        if (head.type() == kMsgTypeRequestTopicReply) {
            MsgRequestTopicReply reply;
            if (imsg.ParseBody(reply)) {
                cb(reply.data());
            }
        }
    };
    return SockRequest().Start(onData, nworker);
}
bool TopicNode::ClientStopWorker() { return SockRequest().Stop(); }
bool TopicNode::ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
{
    auto Call = [&](const void *remote) {
        auto &sock = SockRequest();
        MsgRequestTopic req;
        req.set_topic(topic);
        req.set_data(data, size);
        BHMsgHead head(InitMsgHead(GetType(req), proc_id()));
        AddRoute(head, sock.id());
        if (cb) {
            auto onRecv = [cb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
                if (head.type() == kMsgTypeRequestTopicReply) {
                    MsgRequestTopicReply reply;
                    if (imsg.ParseBody(reply)) {
                        cb(reply.data());
                    }
                }
            };
            return sock.Send(remote, head, req, timeout_ms, onRecv);
        } else {
            return sock.Send(remote, head, req, timeout_ms);
        }
    };
    try {
        BHAddress addr;
        if (ClientQueryRPCTopic(topic, addr, timeout_ms)) {
            return Call(addr.mq_id().data());
        } else {
            return false;
        }
    } catch (...) {
        return false;
    }
}
bool TopicNode::ClientSyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms)
{
    try {
        auto &sock = SockRequest();
        BHAddress addr;
        if (ClientQueryRPCTopic(topic, addr, timeout_ms)) {
            MsgRequestTopic req;
            req.set_topic(topic);
            req.set_data(data, size);
            BHMsgHead head(InitMsgHead(GetType(req), proc_id()));
            AddRoute(head, sock.id());
            MsgI reply;
            DEFER1(reply.Release(shm_););
            BHMsgHead reply_head;
            if (sock.SendAndRecv(addr.mq_id().data(), head, req, reply, reply_head, timeout_ms) && reply_head.type() == kMsgTypeRequestTopicReply) {
                MsgRequestTopicReply dr;
                if (reply.ParseBody(dr)) {
                    dr.mutable_data()->swap(out);
                    return true;
                } else {
                    printf("error parse reply.\n");
                }
            } else {
                printf("error recv data. line: %d\n", __LINE__);
            }
        } else {
            printf("error recv data. line: %d\n", __LINE__);
        }
    } catch (...) {
        printf("error recv data. line: %d\n", __LINE__);
    }
    return false;
}
bool TopicNode::ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
{
    auto &sock = SockRequest();
    if (topic_query_cache_.Find(topic, addr)) {
        return true;
    }
    MsgQueryTopic query;
    query.set_topic(topic);
    BHMsgHead head(InitMsgHead(GetType(query), proc_id()));
    AddRoute(head, sock.id());
    MsgI reply;
    DEFER1(reply.Release(shm_));
    BHMsgHead reply_head;
    if (sock.SendAndRecv(&BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms)) {
        if (reply_head.type() == kMsgTypeQueryTopicReply) {
            MsgQueryTopicReply rep;
            if (reply.ParseBody(rep)) {
                addr = rep.address();
                if (addr.mq_id().empty()) {
                    return false;
                } else {
                    topic_query_cache_.Update(topic, addr);
                    return true;
                }
            }
        }
    } else {
    }
    return false;
}
src/topic_node.h
New file
@@ -0,0 +1,121 @@
/*
 * =====================================================================================
 *
 *       Filename:  topic_node.h
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年04月07日 09时05分26秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#ifndef TOPIC_NODE_YVKWA6TF
#define TOPIC_NODE_YVKWA6TF
#include "msg.h"
#include "pubsub.h"
#include "socket.h"
#include <memory>
using namespace bhome_shm;
using namespace bhome_msg;
// a node is a client.
class TopicNode
{
    SharedMemory &shm_;
    MsgRegister info_;
public:
    TopicNode(SharedMemory &shm);
    ~TopicNode();
    bool Register(const MsgRegister &body, MsgCommonReply &reply, const int timeout_ms);
    bool RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply, const int timeout_ms);
    // topic rpc server
    typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest;
    bool ServerStart(OnRequest const &cb, const int nworker = 2);
    bool ServerStop();
    bool ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms);
    bool ServerSendReply(void *src_info, const std::string &data, const int timeout_ms);
    // topic client
    typedef std::function<void(const std::string &data)> RequestResultCB;
    bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2);
    bool ClientStopWorker();
    bool ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB());
    bool ClientAsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB())
    {
        return ClientAsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb);
    }
    bool ClientSyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms);
    bool ClientSyncRequest(const Topic &topic, const std::string &data, std::string &out, const int timeout_ms)
    {
        return ClientSyncRequest(topic, data.data(), data.size(), out, timeout_ms);
    }
    void StopAll();
private:
    bool ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
    const std::string &proc_id() { return info_.proc().proc_id(); }
    typedef bhome_msg::BHAddress Address;
    class TopicQueryCache
    {
        class Impl
        {
            typedef std::unordered_map<Topic, Address> Store;
            Store store_;
        public:
            bool Find(const Topic &topic, Address &addr)
            {
                auto pos = store_.find(topic);
                if (pos != store_.end()) {
                    addr = pos->second;
                    return true;
                } else {
                    return false;
                }
            }
            bool Update(const Topic &topic, const Address &addr)
            {
                store_[topic] = addr;
                return true;
            }
        };
        Synced<Impl> impl_;
        // Impl &impl()
        // {
        //     thread_local Impl impl;
        //     return impl;
        // }
    public:
        bool Find(const Topic &topic, Address &addr) { return impl_->Find(topic, addr); }
        bool Update(const Topic &topic, const Address &addr) { return impl_->Update(topic, addr); }
    };
    // some sockets may be the same one, using functions make it easy to change.
    auto &SockNode() { return sock_node_; }
    auto &SockSub() { return sock_sub_; }
    auto &SockRequest() { return sock_request_; }
    auto &SockReply() { return sock_reply_; }
    ShmSocket sock_node_;
    ShmSocket sock_request_;
    ShmSocket sock_reply_;
    SocketSubscribe sock_sub_;
    TopicQueryCache topic_query_cache_;
};
#endif // end of include guard: TOPIC_NODE_YVKWA6TF
src/topic_reply.cpp
File was deleted
src/topic_reply.h
File was deleted
src/topic_request.cpp
File was deleted
src/topic_request.h
File was deleted
utest/simple_tests.cpp
@@ -36,7 +36,7 @@
    BOOST_CHECK(!p);
    BOOST_CHECK(p.get() == 0);
    const char *str = "basic";
    p               = str;
    p = str;
    BOOST_CHECK(p);
    BOOST_CHECK(p.get() == str);
    p = 0;
@@ -49,7 +49,7 @@
        auto Code = [&](int id) {
            typedef ShmObject<s1000> Int;
            std::string name = std::to_string(id);
            auto a0          = Avail();
            auto a0 = Avail();
            Int i1(shm, name);
            auto a1 = Avail();
            BOOST_CHECK_LT(a1, a0);
@@ -64,7 +64,7 @@
            {
                auto old = Avail();
                void *p  = shm.Alloc(1024);
                void *p = shm.Alloc(1024);
                shm.Dealloc(p);
                BOOST_CHECK_EQUAL(old, Avail());
            }
@@ -80,7 +80,7 @@
    // boost::timer::auto_cpu_timer timer;
    ThreadManager threads;
    int nthread = 1;
    int nloop   = 1;
    int nloop = 1;
    for (int i = 0; i < nthread; ++i) {
        threads.Launch(BasicTest, i, nloop);
    }
@@ -114,7 +114,7 @@
        int ms = i * 100;
        printf("Timeout Test %4d: ", ms);
        boost::timer::auto_cpu_timer timer;
        BHMsg msg;
        MsgI msg;
        bool r = q.Recv(msg, ms);
        BOOST_CHECK(!r);
    }
utest/speed_test.cpp
@@ -28,14 +28,20 @@
    MQId id = boost::uuids::random_generator()();
    const int timeout = 100;
    const uint32_t data_size = 4000;
    const std::string proc_id = "demo_proc";
    auto Writer = [&](int writer_id, uint64_t n) {
        SharedMemory shm(shm_name, mem_size);
        ShmMsgQueue mq(shm, 64);
        std::string str(data_size, 'a');
        MsgI msg;
        MsgRequestTopic body;
        body.set_topic("topic");
        body.set_data(str);
        auto head(InitMsgHead(GetType(body), proc_id));
        msg.MakeRC(shm, head, body);
        DEFER1(msg.Release(shm););
        msg.MakeRC(shm, MakeRequest(mq.Id(), "topic", str.data(), str.size()));
        for (uint64_t i = 0; i < n; ++i) {
            // mq.Send(id, str.data(), str.size(), timeout);
            mq.Send(id, msg, timeout);
@@ -45,8 +51,10 @@
        SharedMemory shm(shm_name, mem_size);
        ShmMsgQueue mq(id, shm, 1000);
        while (*run) {
            BHMsg msg;
            MsgI msg;
            BHMsgHead head;
            if (mq.Recv(msg, timeout)) {
                DEFER1(msg.Release(shm));
                // ok
            } else if (isfork) {
                exit(0); // for forked quit after 1s.
@@ -113,6 +121,8 @@
    const size_t msg_length = 1000;
    std::string msg_content(msg_length, 'a');
    msg_content[20] = '\0';
    const std::string client_proc_id = "client_proc";
    const std::string server_proc_id = "server_proc";
    SharedMemory shm(shm_name, 1024 * 1024 * 50);
    auto Avail = [&]() { return shm.get_free_memory(); };
@@ -121,9 +131,18 @@
    ShmMsgQueue cli(shm, qlen);
    MsgI request_rc;
    request_rc.MakeRC(shm, MakeRequest(cli.Id(), "topic", msg_content.data(), msg_content.size()));
    MsgRequestTopic req_body;
    req_body.set_topic("topic");
    req_body.set_data(msg_content);
    auto req_head(InitMsgHead(GetType(req_body), client_proc_id));
    request_rc.MakeRC(shm, req_head, req_body);
    MsgRequestTopic reply_body;
    reply_body.set_topic("topic");
    reply_body.set_data(msg_content);
    auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id));
    MsgI reply_rc;
    reply_rc.MakeRC(shm, MakeReply("fakemsgid", msg_content.data(), msg_content.size()));
    reply_rc.MakeRC(shm, reply_head, reply_body);
    std::atomic<uint64_t> count(0);
@@ -133,7 +152,11 @@
    auto Client = [&](int cli_id, int nmsg) {
        for (int i = 0; i < nmsg; ++i) {
            auto Req = [&]() {
                return cli.Send(srv.Id(), MakeRequest(cli.Id(), "topic", msg_content.data(), msg_content.size()), 100);
                MsgRequestTopic req_body;
                req_body.set_topic("topic");
                req_body.set_data(msg_content);
                auto req_head(InitMsgHead(GetType(req_body), client_proc_id));
                return cli.Send(srv.Id(), req_head, req_body, 100);
            };
            auto ReqRC = [&]() { return cli.Send(srv.Id(), request_rc, 1000); };
@@ -141,10 +164,12 @@
                printf("********** client send error.\n");
                continue;
            }
            BHMsg msg;
            MsgI msg;
            BHMsgHead head;
            if (!cli.Recv(msg, 1000)) {
                printf("********** client recv error.\n");
            } else {
                DEFER1(msg.Release(shm));
                ++count;
                auto cur = Now();
                if (last_time.exchange(cur) < cur) {
@@ -158,18 +183,27 @@
    std::atomic<bool> stop(false);
    auto Server = [&]() {
        BHMsg req;
        while (!stop) {
            if (srv.Recv(req, 100) && req.type() == kMsgTypeRequestTopic) {
                auto &mqid = req.route()[0].mq_id();
                MQId src_id;
                memcpy(&src_id, mqid.data(), sizeof(src_id));
                auto Reply = [&]() {
                    return srv.Send(src_id, MakeReply(req.msg_id(), msg_content.data(), msg_content.size()), 100);
                };
                auto ReplyRC = [&]() { return srv.Send(src_id, reply_rc, 100); };
        MsgI req;
        BHMsgHead req_head;
                if (ReplyRC()) {
        while (!stop) {
            if (srv.Recv(req, 100)) {
                DEFER1(req.Release(shm));
                if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) {
                    auto &mqid = req_head.route()[0].mq_id();
                    MQId src_id;
                    memcpy(&src_id, mqid.data(), sizeof(src_id));
                    auto Reply = [&]() {
                        MsgRequestTopic reply_body;
                        reply_body.set_topic("topic");
                        reply_body.set_data(msg_content);
                        auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, req_head.msg_id()));
                        return srv.Send(src_id, reply_head, reply_body, 100);
                    };
                    auto ReplyRC = [&]() { return srv.Send(src_id, reply_rc, 100); };
                    if (ReplyRC()) {
                    }
                }
            }
        }
utest/utest.cpp
@@ -1,11 +1,8 @@
#include "center.h"
#include "defs.h"
#include "pubsub.h"
#include "pubsub_center.h"
#include "reqrep_center.h"
#include "socket.h"
#include "topic_reply.h"
#include "topic_request.h"
#include "topic_node.h"
#include "util.h"
#include <atomic>
#include <boost/uuid/uuid_generators.hpp>
@@ -15,6 +12,7 @@
#include <string>
#include <thread>
#include <vector>
using namespace bhome_msg;
template <class A, class B>
struct IsSameType {
@@ -79,9 +77,11 @@
    int *flag = shm.find_or_construct<int>("flag")(123);
    printf("flag = %d\n", *flag);
    ++*flag;
    const std::string sub_proc_id = "subscriber";
    const std::string pub_proc_id = "publisher";
    PubSubCenter bus(shm);
    bus.Start();
    BHCenter center(shm);
    center.Start();
    std::this_thread::sleep_for(100ms);
@@ -93,12 +93,12 @@
    const int timeout = 1000;
    auto Sub = [&](int id, const std::vector<std::string> &topics) {
        SocketSubscribe client(shm);
        bool r = client.Subscribe(topics, timeout);
        bool r = client.Subscribe(sub_proc_id, topics, timeout);
        std::mutex mutex;
        std::condition_variable cv;
        std::atomic<uint64_t> n(0);
        auto OnTopicData = [&](const std::string &topic, const std::string &data) {
        auto OnTopicData = [&](const std::string &proc_id, const std::string &topic, const std::string &data) {
            ++total_count;
            auto cur = Now();
@@ -123,7 +123,7 @@
        for (unsigned i = 0; i < nmsg; ++i) {
            std::string data = topic + std::to_string(i) + std::string(1000, '-');
            bool r = provider.Publish(topic, data, timeout);
            bool r = provider.Publish(pub_proc_id, topic, data.data(), data.size(), timeout);
            if (!r) {
                printf("pub ret: %s\n", r ? "ok" : "fail");
            }
@@ -150,9 +150,8 @@
    std::cout << "end : " << Now();
    printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
           total_count.load(), total_count - last_count.exchange(total_count), init_avail - Avail());
    bus.Stop();
}
namespace
{
struct C {
@@ -177,12 +176,24 @@
    printf("flag = %d\n", *flag);
    ++*flag;
    const std::string client_proc_id = "client_proc_";
    const std::string server_proc_id = "server_proc_";
    BHCenter center(shm);
    center.Start();
    std::atomic<bool> run(true);
    auto Client = [&](const std::string &topic, const int nreq) {
        SocketRequest client(shm);
        TopicNode client(shm);
        MsgRegister reg;
        reg.mutable_proc()->set_proc_id(client_proc_id + topic);
        MsgCommonReply reply_body;
        if (!client.Register(reg, reply_body, 1000)) {
            printf("client register failed\n");
            return;
        }
        std::atomic<int> count(0);
        std::string reply;
        auto onRecv = [&](const std::string &rep) {
@@ -191,40 +202,54 @@
                printf("count: %d\n", count.load());
            }
        };
        client.StartWorker(onRecv, 2);
        client.ClientStartWorker(onRecv, 2);
        boost::timer::auto_cpu_timer timer;
        for (int i = 0; i < nreq; ++i) {
            if (!client.AsyncRequest(topic, "data " + std::to_string(i), 1000)) {
            if (!client.ClientAsyncRequest(topic, "data " + std::to_string(i), 1000)) {
                printf("client request failed\n");
                ++count;
            }
            // if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) {
            //     printf("client request failed\n");
            // } else {
            //     ++count;
            // }
            //     ++count;
        }
        do {
            std::this_thread::yield();
        } while (count.load() < nreq);
        client.Stop();
        client.ClientStopWorker();
        printf("request %s %d done ", topic.c_str(), count.load());
    };
    std::atomic_uint64_t server_msg_count(0);
    auto Server = [&](const std::string &name, const std::vector<std::string> &topics) {
        SocketReply server(shm);
        ProcInfo info;
        info.set_id(name);
        info.set_name(name);
        if (!server.Register(info, topics, 100)) {
            printf("register failed\n");
        TopicNode server(shm);
        MsgRegister reg;
        reg.mutable_proc()->set_proc_id(server_proc_id);
        reg.mutable_proc()->set_name(name);
        MsgCommonReply reply_body;
        if (!server.Register(reg, reply_body, 100)) {
            printf("server register failed\n");
            return;
        }
        auto onData = [&](const std::string &topic, const std::string &data, std::string &reply) {
            ++server_msg_count;
            reply = topic + ':' + data;
            return true;
        };
        server.StartWorker(onData);
        server.ServerStart(onData);
        MsgRegisterRPC rpc;
        for (auto &topic : topics) {
            rpc.add_topics(topic);
        }
        if (!server.RegisterRPC(rpc, reply_body, 100)) {
            printf("server register topic failed\n");
            return;
        }
        while (run) {
            std::this_thread::yield();
        }
@@ -234,7 +259,7 @@
    servers.Launch(Server, "server", topics);
    std::this_thread::sleep_for(100ms);
    for (auto &t : topics) {
        clients.Launch(Client, t, 1000 * 100);
        clients.Launch(Client, t, 1000 * 1);
    }
    clients.WaitAll();
    printf("clients done, server replyed: %d\n", server_msg_count.load());