lichao
2021-04-09 2197cf91e7a3bd5941327ba630a42946b88f069e
join pub/sub to node; refactor.
4个文件已删除
13个文件已修改
635 ■■■■■ 已修改文件
proto/source/bhome_msg.proto 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/center.cpp 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.cpp 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.h 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/proto.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub.cpp 92 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub.h 58 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.h 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.cpp 48 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.h 63 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 121 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.h 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_rpc.cpp 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_rpc.h 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/speed_test.cpp 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/utest.cpp 45 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/util.h 23 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/source/bhome_msg.proto
@@ -3,6 +3,7 @@
// import "google/protobuf/descriptor.proto";
import "bhome_msg_api.proto";
import "error_msg.proto";
package bhome.msg;
@@ -18,12 +19,21 @@
    bytes topic = 6; // for request route
}
message BHMsg { // deprecated
    bytes msg_id = 1;
    int64 timestamp = 2;
    int32 type = 3;
    repeated BHAddress route = 4; // for reply and proxy.
    bytes body = 5;
message MsgRequest {
    MsgType type = 1;
    // oneof body;
}
message MsgReply {
    ErrorMsg err_msg = 1;
    // oneof reply
}
message BHMsgBody {
    oneof reqrep {
        MsgRequest request = 1;
        MsgReply reply = 2;
    }
}
enum MsgType {
src/center.cpp
@@ -336,7 +336,7 @@
    auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
        return [&](auto &&rep_body) {
            auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
            bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 10);
            bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 100);
            if (!r) {
                printf("send reply failed.\n");
            }
@@ -364,18 +364,20 @@
            MsgPublish pub;
            NodeCenter::Clients clients;
            MsgCommonReply reply;
            MsgI pubmsg;
            if (head.route_size() != 1 || !msg.ParseBody(pub)) {
                return;
            } else if (!center->FindClients(head, pub, clients, reply)) {
                // send error reply.
                MakeReplyer(socket, head, center->id())(reply);
            } else if (pubmsg.MakeRC(socket.shm(), msg)) {
                DEFER1(pubmsg.Release(socket.shm()));
            } else {
                MakeReplyer(socket, head, center->id())(MakeReply(eSuccess));
                if (!msg.EnableRefCount(socket.shm())) { return; } // no memory?
                for (auto &cli : clients) {
                    auto node = cli.weak_node_.lock();
                    if (node) {
                        socket.Send(cli.mq_.data(), pubmsg, 10);
                        if (!socket.Send(cli.mq_.data(), msg, 100)) {
                            printf("center route publish failed. need resend.\n");
                        }
                    }
                }
            }
src/msg.cpp
@@ -78,6 +78,14 @@
    return true;
}
bool MsgI::EnableRefCount(SharedMemory &shm)
{
    if (!IsCounted()) {
        count_ = shm.New<RefCount>();
    }
    return IsCounted();
}
int MsgI::Release(SharedMemory &shm)
{
    if (IsCounted()) {
src/msg.h
@@ -105,27 +105,21 @@
    bool IsCounted() const { return static_cast<bool>(count_); }
    template <class Body>
    bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body)
    {
        return Make(shm, Pack(shm, head, body));
    }
    template <class Body>
    bool MakeRC(SharedMemory &shm, const BHMsgHead &head, const Body &body)
    inline bool MakeRC(SharedMemory &shm, const BHMsgHead &head, const Body &body)
    {
        return MakeRC(shm, Pack(shm, head, body));
    }
    bool MakeRC(SharedMemory &shm, MsgI &a)
    bool EnableRefCount(SharedMemory &shm);
    template <class Body>
    inline bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body)
    {
        if (a.IsCounted()) {
            *this = a;
            AddRef();
            return true;
        } else {
            void *p = a.ptr_.get();
            a.ptr_ = 0;
            return MakeRC(shm, p);
        }
        void *p = Pack(shm, head, body);
        auto NeedRefCount = [&]() { return head.type() == kMsgTypePublish; };
        return NeedRefCount() ? MakeRC(shm, p) : Make(shm, p);
    }
    bool ParseHead(BHMsgHead &head) const;
    template <class Body>
    bool ParseBody(Body &body) const
src/proto.h
@@ -74,5 +74,5 @@
BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid);
BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id);
// inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); }
inline bool IsSuccess(const ErrorCode ec) { return ec == eSuccess; }
#endif // end of include guard: PROTO_UA9UWKL1
src/pubsub.cpp
File was deleted
src/pubsub.h
File was deleted
src/shm_queue.h
@@ -136,25 +136,8 @@
    static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend);
    static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms);
    template <class... Extra>
    bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms, Extra const &...extra)
    {
        return Send(shm(), remote_id, msg, timeout_ms, extra...);
    }
    template <class Body, class... Extra>
    bool Send(const MQId &remote_id, const BHMsgHead &head, const Body &body, const int timeout_ms, Extra const &...extra)
    {
        MsgI msg;
        if (msg.Make(shm(), head, body)) {
            if (Send(shm(), remote_id, msg, timeout_ms, extra...)) {
                return true;
            } else {
                msg.Release(shm());
            }
        }
        return false;
    }
    template <class... Rest>
    bool Send(const MQId &remote_id, Rest const &...rest) { return Send(shm(), remote_id, rest...); }
    size_t Pending() const { return data()->size(); }
};
src/socket.cpp
@@ -43,51 +43,37 @@
bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle)
{
    auto onRecv = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
        auto Find = [&](RecvCB &cb) {
            std::lock_guard<std::mutex> lock(mutex());
            const std::string &msgid = head.msg_id();
            auto pos = async_cbs_.find(msgid);
            if (pos != async_cbs_.end()) {
                cb.swap(pos->second);
                async_cbs_.erase(pos);
                return true;
            } else {
                return false;
            }
        };
    auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
        RecvCB cb;
        if (Find(cb)) {
        if (async_cbs_->Find(head.msg_id(), cb)) {
            cb(socket, imsg, head);
        } else if (onData) {
            onData(socket, imsg, head);
        } // else ignored, or dropped
    };
    std::lock_guard<std::mutex> lock(mutex_);
    StopNoLock();
    auto RecvProc = [this, onRecv, onIdle]() {
        while (run_) {
            try {
                MsgI imsg;
                if (mq().Recv(imsg, 10)) {
                    DEFER1(imsg.Release(shm()));
                    BHMsgHead head;
                    if (imsg.ParseHead(head)) {
                        onRecv(*this, imsg, head);
                    }
                } else if (onIdle) {
                    onIdle(*this);
    auto recvLoopBody = [this, onRecvWithPerMsgCB, onIdle]() {
        try {
            MsgI imsg;
            if (mq().Recv(imsg, 10)) {
                DEFER1(imsg.Release(shm()));
                BHMsgHead head;
                if (imsg.ParseHead(head)) {
                    onRecvWithPerMsgCB(*this, imsg, head);
                }
            } catch (...) {
            } else if (onIdle) {
                onIdle(*this);
            }
        } catch (...) {
        }
    };
    std::lock_guard<std::mutex> lock(mutex_);
    StopNoLock();
    run_.store(true);
    for (int i = 0; i < nworker; ++i) {
        workers_.emplace_back(RecvProc);
        workers_.emplace_back([this, recvLoopBody]() { while (run_) { recvLoopBody(); } });
    }
    return true;
}
src/socket.h
@@ -19,6 +19,7 @@
#ifndef SOCKET_GWTJHBPO
#define SOCKET_GWTJHBPO
#include "bh_util.h"
#include "defs.h"
#include "shm_queue.h"
#include <atomic>
@@ -34,6 +35,15 @@
class ShmSocket : private boost::noncopyable
{
    template <class DoSend>
    inline bool SendImpl(MsgI &msg, const int timeout_ms, const DoSend &doSend)
    {
        bool r = false;
        DEFER1(if (msg.IsCounted() || !r) { msg.Release(shm()); });
        r = doSend(msg);
        return r;
    }
protected:
    typedef bhome_shm::ShmMsgQueue Queue;
@@ -55,30 +65,28 @@
    bool Stop();
    size_t Pending() const { return mq().Pending(); }
    bool Send(const void *id, const MsgI &imsg, const int timeout_ms)
    bool Send(const void *valid_remote, const MsgI &imsg, const int timeout_ms)
    {
        return mq().Send(*static_cast<const MQId *>(id), imsg, timeout_ms);
        assert(valid_remote);
        return mq().Send(*static_cast<const MQId *>(valid_remote), imsg, timeout_ms);
    }
    //TODO reimplment, using async.
    bool SyncRecv(MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms);
    template <class Body>
    bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb = RecvCB())
    bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb)
    {
        assert(valid_remote);
        try {
            if (cb) {
                auto RegisterCB = [&]() {
                    std::lock_guard<std::mutex> lock(mutex());
                    async_cbs_.emplace(head.msg_id(), cb);
                };
                return mq().Send(*static_cast<const MQId *>(valid_remote), head, body, timeout_ms, RegisterCB);
            } else {
                return mq().Send(*static_cast<const MQId *>(valid_remote), head, body, timeout_ms);
            }
        } catch (...) {
            return false;
        }
        auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms, [&]() { async_cbs_->Add(head.msg_id(), cb); }); };
        MsgI msg;
        return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend);
    }
    template <class Body>
    bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms)
    {
        auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms); };
        MsgI msg;
        return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend);
    }
    template <class Body>
@@ -133,7 +141,26 @@
    std::atomic<bool> run_;
    Queue mq_;
    std::unordered_map<std::string, RecvCB> async_cbs_;
    class AsyncCBs
    {
        std::unordered_map<std::string, RecvCB> store_;
    public:
        bool Add(const std::string &id, const RecvCB &cb) { return store_.emplace(id, cb).second; }
        bool Find(const std::string &id, RecvCB &cb)
        {
            auto pos = store_.find(id);
            if (pos != store_.end()) {
                cb.swap(pos->second);
                store_.erase(pos);
                return true;
            } else {
                return false;
            }
        }
    };
    Synced<AsyncCBs> async_cbs_;
};
#endif // end of include guard: SOCKET_GWTJHBPO
src/topic_node.cpp
@@ -76,29 +76,34 @@
    shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm)
{
    SockNode().Start();
    SockClient().Start();
    SockServer().Start();
}
TopicNode::~TopicNode()
{
    StopAll();
    SockNode().Stop();
}
void TopicNode::StopAll()
{
    ServerStop();
    ClientStopWorker();
    SockServer().Stop();
    SockClient().Stop();
    SockNode().Stop();
}
bool TopicNode::Register(const MsgRegister &body, MsgCommonReply &reply_body, const int timeout_ms)
{
    auto &sock = SockNode();
    auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
    AddRoute(head, SockNode().id());
    AddRoute(head, sock.id());
    MsgI reply;
    DEFER1(reply.Release(shm_););
    BHMsgHead reply_head;
    bool r = SockNode().SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
    r = r && reply_head.type() == kMsgTypeCommonReply;
    r = r && reply.ParseBody(reply_body);
    bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
    r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
    if (r) {
        info_ = body;
    }
@@ -108,14 +113,15 @@
bool TopicNode::RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply_body, const int timeout_ms)
{
    //TODO check registered
    auto &sock = SockServer();
    auto head(InitMsgHead(GetType(body), proc_id()));
    AddRoute(head, SockReply().id());
    AddRoute(head, sock.id());
    MsgI reply;
    DEFER1(reply.Release(shm_););
    BHMsgHead reply_head;
    bool r = SockReply().SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
    bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
    r = r && reply_head.type() == kMsgTypeCommonReply;
    r = r && reply.ParseBody(reply_body);
    return r;
@@ -154,15 +160,17 @@
        onIdle(sock);
    };
    return rcb && SockReply().Start(onRecv, onIdle, nworker);
    auto &sock = SockServer();
    return rcb && sock.Start(onRecv, onIdle, nworker);
}
bool TopicNode::ServerStop() { return SockReply().Stop(); }
bool TopicNode::ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms)
{
    auto &sock = SockServer();
    MsgI imsg;
    BHMsgHead head;
    if (SockReply().SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) {
    if (sock.SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) {
        MsgRequestTopic request;
        if (imsg.ParseBody(request)) {
            request.mutable_topic()->swap(topic);
@@ -179,6 +187,8 @@
bool TopicNode::ServerSendReply(void *src_info, const std::string &data, const int timeout_ms)
{
    auto &sock = SockServer();
    SrcInfo *p = static_cast<SrcInfo *>(src_info);
    DEFER1(delete p);
    if (!p || p->route.empty()) {
@@ -192,7 +202,7 @@
        head.add_route()->Swap(&p->route[i]);
    }
    return SockReply().Send(p->route.back().mq_id().data(), head, body, timeout_ms);
    return sock.Send(p->route.back().mq_id().data(), head, body, timeout_ms);
}
bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker)
@@ -211,12 +221,12 @@
    return SockRequest().Start(onData, nworker);
}
bool TopicNode::ClientStopWorker() { return SockRequest().Stop(); }
bool TopicNode::ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
{
    auto Call = [&](const void *remote) {
        auto &sock = SockRequest();
        MsgRequestTopic req;
        req.set_topic(topic);
        req.set_data(data, size);
@@ -254,6 +264,7 @@
{
    try {
        auto &sock = SockRequest();
        BHAddress addr;
        if (ClientQueryRPCTopic(topic, addr, timeout_ms)) {
@@ -290,6 +301,7 @@
bool TopicNode::ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
{
    auto &sock = SockRequest();
    if (topic_query_cache_.Find(topic, addr)) {
        return true;
    }
@@ -319,4 +331,85 @@
    } else {
    }
    return false;
}
// publish
bool TopicNode::Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms)
{
    try {
        auto &sock = SockPub();
        MsgPublish pub;
        pub.set_topic(topic);
        pub.set_data(data, size);
        BHMsgHead head(InitMsgHead(GetType(pub), proc_id()));
        AddRoute(head, sock.id());
        MsgI reply;
        DEFER1(reply.Release(shm()););
        BHMsgHead reply_head;
        MsgCommonReply reply_body;
        return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) &&
               reply_head.type() == kMsgTypeCommonReply &&
               reply.ParseBody(reply_body) &&
               IsSuccess(reply_body.errmsg().errcode());
    } catch (...) {
    }
    return false;
}
// subscribe
bool TopicNode::Subscribe(const std::vector<Topic> &topics, const int timeout_ms)
{
    try {
        auto &sock = SockSub();
        MsgSubscribe sub;
        for (auto &topic : topics) {
            sub.add_topics(topic);
        }
        BHMsgHead head(InitMsgHead(GetType(sub), proc_id()));
        AddRoute(head, sock.id());
        return sock.Send(&BHTopicBusAddress(), head, sub, timeout_ms);
    } catch (...) {
        return false;
    }
}
bool TopicNode::SubscribeStartWorker(const TopicDataCB &tdcb, int nworker)
{
    auto &sock = SockSub();
    auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) {
        if (head.type() == kMsgTypePublish) {
            MsgPublish pub;
            if (imsg.ParseBody(pub)) {
                tdcb(head.proc_id(), pub.topic(), pub.data());
            }
        } else {
            // ignored, or dropped
        }
    };
    return tdcb && sock.Start(AsyncRecvProc, nworker);
}
bool TopicNode::RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms)
{
    auto &sock = SockSub();
    MsgI msg;
    DEFER1(msg.Release(shm()););
    BHMsgHead head;
    if (sock.SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) {
        MsgPublish pub;
        if (msg.ParseBody(pub)) {
            head.mutable_proc_id()->swap(proc_id);
            pub.mutable_topic()->swap(topic);
            pub.mutable_data()->swap(data);
            return true;
        }
    }
    return false;
}
src/topic_node.h
@@ -19,7 +19,6 @@
#define TOPIC_NODE_YVKWA6TF
#include "msg.h"
#include "pubsub.h"
#include "socket.h"
#include <memory>
@@ -32,23 +31,26 @@
    SharedMemory &shm_;
    MsgRegister info_;
    SharedMemory &shm() { return shm_; }
public:
    TopicNode(SharedMemory &shm);
    ~TopicNode();
    void StopAll();
    // topic node
    bool Register(const MsgRegister &body, MsgCommonReply &reply, const int timeout_ms);
    bool RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply, const int timeout_ms);
    // topic rpc server
    typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest;
    bool ServerStart(OnRequest const &cb, const int nworker = 2);
    bool ServerStop();
    bool ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms);
    bool ServerSendReply(void *src_info, const std::string &data, const int timeout_ms);
    // topic client
    typedef std::function<void(const std::string &data)> RequestResultCB;
    bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2);
    bool ClientStopWorker();
    bool ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB());
    bool ClientAsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB())
    {
@@ -60,7 +62,14 @@
        return ClientSyncRequest(topic, data.data(), data.size(), out, timeout_ms);
    }
    void StopAll();
    // publish
    bool Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms);
    // subscribe
    typedef std::function<void(const std::string &proc_id, const Topic &topic, const std::string &data)> TopicDataCB;
    bool SubscribeStartWorker(const TopicDataCB &tdcb, int nworker = 2);
    bool Subscribe(const std::vector<Topic> &topics, const int timeout_ms);
    bool RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms);
private:
    bool ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
@@ -106,14 +115,17 @@
    // some sockets may be the same one, using functions make it easy to change.
    auto &SockNode() { return sock_node_; }
    auto &SockPub() { return SockNode(); }
    auto &SockSub() { return sock_sub_; }
    auto &SockRequest() { return sock_request_; }
    auto &SockClient() { return SockRequest(); }
    auto &SockReply() { return sock_reply_; }
    auto &SockServer() { return SockReply(); }
    ShmSocket sock_node_;
    ShmSocket sock_request_;
    ShmSocket sock_reply_;
    SocketSubscribe sock_sub_;
    ShmSocket sock_sub_;
    TopicQueryCache topic_query_cache_;
};
src/topic_rpc.cpp
File was deleted
src/topic_rpc.h
File was deleted
utest/speed_test.cpp
@@ -40,6 +40,7 @@
        body.set_data(str);
        auto head(InitMsgHead(GetType(body), proc_id));
        msg.MakeRC(shm, head, body);
        assert(msg.IsCounted());
        DEFER1(msg.Release(shm););
        for (uint64_t i = 0; i < n; ++i) {
@@ -127,8 +128,8 @@
    SharedMemory shm(shm_name, 1024 * 1024 * 50);
    auto Avail = [&]() { return shm.get_free_memory(); };
    auto init_avail = Avail();
    ShmMsgQueue srv(shm, qlen);
    ShmMsgQueue cli(shm, qlen);
    ShmSocket srv(shm, qlen);
    ShmSocket cli(shm, qlen);
    MsgI request_rc;
    MsgRequestTopic req_body;
@@ -156,9 +157,9 @@
                req_body.set_topic("topic");
                req_body.set_data(msg_content);
                auto req_head(InitMsgHead(GetType(req_body), client_proc_id));
                return cli.Send(srv.Id(), req_head, req_body, 100);
                return cli.Send(&srv.id(), req_head, req_body, 100);
            };
            auto ReqRC = [&]() { return cli.Send(srv.Id(), request_rc, 1000); };
            auto ReqRC = [&]() { return cli.Send(&srv.id(), request_rc, 1000); };
            if (!ReqRC()) {
                printf("********** client send error.\n");
@@ -166,7 +167,7 @@
            }
            MsgI msg;
            BHMsgHead head;
            if (!cli.Recv(msg, 1000)) {
            if (!cli.SyncRecv(msg, head, 1000)) {
                printf("********** client recv error.\n");
            } else {
                DEFER1(msg.Release(shm));
@@ -187,8 +188,9 @@
        BHMsgHead req_head;
        while (!stop) {
            if (srv.Recv(req, 100)) {
            if (srv.SyncRecv(req, req_head, 100)) {
                DEFER1(req.Release(shm));
                if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) {
                    auto &mqid = req_head.route()[0].mq_id();
                    MQId src_id;
@@ -198,9 +200,9 @@
                        reply_body.set_topic("topic");
                        reply_body.set_data(msg_content);
                        auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, req_head.msg_id()));
                        return srv.Send(src_id, reply_head, reply_body, 100);
                        return srv.Send(&src_id, reply_head, reply_body, 100);
                    };
                    auto ReplyRC = [&]() { return srv.Send(src_id, reply_rc, 100); };
                    auto ReplyRC = [&]() { return srv.Send(&src_id, reply_rc, 100); };
                    if (ReplyRC()) {
                    }
utest/utest.cpp
@@ -1,8 +1,5 @@
#include "center.h"
#include "defs.h"
#include "pubsub.h"
#include "socket.h"
#include "topic_node.h"
#include "util.h"
#include <atomic>
#include <boost/uuid/uuid_generators.hpp>
@@ -92,8 +89,12 @@
    const uint64_t nmsg = 100 * 2;
    const int timeout = 1000;
    auto Sub = [&](int id, const std::vector<std::string> &topics) {
        SocketSubscribe client(shm);
        bool r = client.Subscribe(sub_proc_id, topics, timeout);
        DemoNode client("client_" + std::to_string(id), shm);
        bool r = client.Subscribe(topics, timeout);
        if (!r) {
            printf("client subscribe failed.\n");
        }
        std::mutex mutex;
        std::condition_variable cv;
@@ -112,18 +113,19 @@
            }
            // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str());
        };
        client.StartRecv(OnTopicData, 1);
        client.SubscribeStartWorker(OnTopicData, 1);
        std::unique_lock<std::mutex> lk(mutex);
        cv.wait(lk);
    };
    auto Pub = [&](const std::string &topic) {
        SocketPublish provider(shm);
        DemoNode provider("server_" + topic, shm);
        for (unsigned i = 0; i < nmsg; ++i) {
            std::string data = topic + std::to_string(i) + std::string(1000, '-');
            bool r = provider.Publish(pub_proc_id, topic, data.data(), data.size(), timeout);
            bool r = provider.Publish(topic, data.data(), data.size(), timeout);
            if (!r) {
                printf("pub ret: %s\n", r ? "ok" : "fail");
            }
@@ -184,15 +186,7 @@
    std::atomic<bool> run(true);
    auto Client = [&](const std::string &topic, const int nreq) {
        TopicNode client(shm);
        MsgRegister reg;
        reg.mutable_proc()->set_proc_id(client_proc_id + topic);
        MsgCommonReply reply_body;
        if (!client.Register(reg, reply_body, 1000)) {
            printf("client register failed\n");
            return;
        }
        DemoNode client(client_proc_id + topic, shm);
        std::atomic<int> count(0);
        std::string reply;
@@ -218,21 +212,13 @@
        do {
            std::this_thread::yield();
        } while (count.load() < nreq);
        client.ClientStopWorker();
        client.StopAll();
        printf("request %s %d done ", topic.c_str(), count.load());
    };
    std::atomic_uint64_t server_msg_count(0);
    auto Server = [&](const std::string &name, const std::vector<std::string> &topics) {
        TopicNode server(shm);
        MsgRegister reg;
        reg.mutable_proc()->set_proc_id(server_proc_id);
        reg.mutable_proc()->set_name(name);
        MsgCommonReply reply_body;
        if (!server.Register(reg, reply_body, 100)) {
            printf("server register failed\n");
            return;
        }
        DemoNode server(name, shm);
        auto onData = [&](const std::string &topic, const std::string &data, std::string &reply) {
            ++server_msg_count;
@@ -245,6 +231,7 @@
        for (auto &topic : topics) {
            rpc.add_topics(topic);
        }
        MsgCommonReply reply_body;
        if (!server.RegisterRPC(rpc, reply_body, 100)) {
            printf("server register topic failed\n");
            return;
@@ -262,7 +249,7 @@
        clients.Launch(Client, t, 1000 * 1);
    }
    clients.WaitAll();
    printf("clients done, server replyed: %d\n", server_msg_count.load());
    printf("clients done, server replyed: %ld\n", server_msg_count.load());
    run = false;
    servers.WaitAll();
}
utest/util.h
@@ -20,9 +20,7 @@
#define UTIL_W8A0OA5U
#include "bh_util.h"
#include "msg.h"
#include "shm.h"
#include "shm_queue.h"
#include "topic_node.h"
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/noncopyable.hpp>
#include <boost/test/unit_test.hpp>
@@ -107,4 +105,23 @@
    ~ShmRemover() { SharedMemory::Remove(name_); }
};
class DemoNode : public TopicNode
{
    std::string id_;
public:
    DemoNode(const std::string &id, SharedMemory &shm) :
        TopicNode(shm), id_(id) { Init(); }
    void Init()
    {
        MsgRegister reg;
        reg.mutable_proc()->set_proc_id(id_);
        MsgCommonReply reply_body;
        if (!Register(reg, reply_body, 1000)) {
            printf("node %s register failed\n", id_.c_str());
        }
    }
};
#endif // end of include guard: UTIL_W8A0OA5U