lichao
2021-04-15 c64c54d8e75b9354dc49a7b6b2d326e7dd59eb37
add api; fix send, socknode mem leak.
1个文件已添加
15个文件已修改
701 ■■■■ 已修改文件
.vscode/launch.json 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
.vscode/settings.json 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/app_arg.h 59 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.cpp 53 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center_main.cc 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.cpp 142 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.h 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/proto.cpp 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/proto.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/sendq.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.h 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 129 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.h 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 196 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/utest.cpp 28 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/util.h 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.vscode/launch.json
@@ -11,7 +11,7 @@
            "program": "${workspaceFolder}/debug/bin/utest",
            "args": [
                "-t",
                "SRTest"
                "ApiTest"
            ],
            "stopAtEntry": false,
            "cwd": "${workspaceFolder}",
.vscode/settings.json
@@ -60,7 +60,8 @@
        "*.inc": "cpp",
        "strstream": "cpp",
        "unordered_set": "cpp",
        "cfenv": "cpp"
        "cfenv": "cpp",
        "*.ipp": "cpp"
    },
    "files.exclude": {
        "**/*.un~": true,
box/app_arg.h
New file
@@ -0,0 +1,59 @@
#ifndef APP_ARG_OQMELZBX
#define APP_ARG_OQMELZBX
#include <map>
#include <string>
class AppArg
{
    typedef std::map<std::string, std::string> ArgMap;
public:
    AppArg(int argc, const char *argv[]) {
        Parse(argc, argv);
    }
    bool Has(const std::string &key) const {
        return Pos(key) != args.end();
    }
    std::string Get(const std::string &key, const std::string &def = "") const {
        ArgMap::const_iterator pos = Pos(key);
        if (pos != args.end()) {
            return pos->second;
        } else {
            return def;
        }
    }
private:
    void Parse(int argc, const char *argv[]) {
        for (int i = 1; i < argc; ++i) {
            std::string text(argv[i]);
            if (text.substr(0, 2) == "--") {
                text = text.substr(2);
                std::string::size_type sep = text.find('=');
                if (sep == std::string::npos) {
                    args[text].clear();
                } else {
                    args[text.substr(0, sep)] = text.substr(sep+1);
                }
            } else if (text.substr(0,1) == "-") {
                text = text.substr(1);
                args[text].clear();
                if (i+1 < argc) {
                    std::string next(argv[i+1]);
                    if (next.substr(0,1) != "-") {
                        args[text] = next;
                        ++i;
                    }
                }
            }
        }
    }
    ArgMap::const_iterator Pos(const std::string &key) const {
        return args.find(key);
    }
    ArgMap args;
};
#endif // end of include guard: APP_ARG_OQMELZBX
box/center.cpp
@@ -121,20 +121,18 @@
            };
            auto pos = nodes_.find(head.proc_id());
            if (pos == nodes_.end()) { // new client
                Node node(new NodeInfo);
                UpdateRegInfo(node);
                nodes_[node->proc_.proc_id()] = node;
            } else {
            if (pos != nodes_.end()) { // new client
                Node &node = pos->second;
                if (node->addrs_.find(SrcAddr(head)) == node->addrs_.end()) {
                    // node restarted, release old mq.
                    for (auto &addr : node->addrs_) {
                        cleaner_(addr);
                    }
                    node->addrs_.clear();
                    RemoveNode(node);
                    node.reset(new NodeInfo);
                }
                UpdateRegInfo(node);
            } else {
                Node node(new NodeInfo);
                UpdateRegInfo(node);
                nodes_[node->proc_.proc_id()] = node;
            }
            return MakeReply(eSuccess);
        } catch (...) {
@@ -334,11 +332,7 @@
            auto &cli = *it->second;
            cli.state_.UpdateState(now, offline_time_, kill_time_);
            if (cli.state_.flag_ == kStateKillme) {
                if (cleaner_) {
                    for (auto &addr : cli.addrs_) {
                        cleaner_(addr);
                    }
                }
                RemoveNode(it->second);
                it = nodes_.erase(it);
            } else {
                ++it;
@@ -357,6 +351,30 @@
    {
        auto node = weak.lock();
        return node && Valid(*node);
    }
    void RemoveNode(Node &node)
    {
        auto EraseMapRec = [&node](auto &rec_map, auto &node_rec) {
            for (auto &addr_topics : node_rec) {
                TopicDest dest{addr_topics.first, node};
                for (auto &topic : addr_topics.second) {
                    auto pos = rec_map.find(topic);
                    if (pos != rec_map.end()) {
                        pos->second.erase(dest);
                        if (pos->second.empty()) {
                            rec_map.erase(pos);
                        }
                    }
                }
            }
        };
        EraseMapRec(service_map_, node->services_);
        EraseMapRec(subscribe_map_, node->subscriptions_);
        for (auto &addr : node->addrs_) {
            cleaner_(addr);
        }
        node->addrs_.clear();
    }
    std::string id_; // center proc id;
@@ -403,11 +421,8 @@
    auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
        return [&](auto &&rep_body) {
            auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
            MsgI msg;
            if (msg.Make(socket.shm(), reply_head, rep_body)) {
                auto &remote = head.route(0).mq_id();
                bool r = socket.Send(remote.data(), msg);
            }
            auto &remote = head.route(0).mq_id();
            socket.Send(remote.data(), reply_head, rep_body);
        };
    };
box/center_main.cc
@@ -15,17 +15,40 @@
 *
 * =====================================================================================
 */
#include "app_arg.h"
#include "box.h"
#include "center.h"
#include "defs.h"
#include "signalhandle.h"
#include <chrono>
#include <thread>
using namespace std::chrono_literals;
int center_main(int argc, const char *argv[])
{
    AppArg args(argc, argv);
    if (args.Has("remove")) {
        BHomeShm().Remove();
        return 0;
    }
    bool run = true;
    auto showStatus = [&]() {
        auto init = BHomeShm().get_free_memory();
        uint64_t idx = 0;
        while (run) {
            std::this_thread::sleep_for(1s);
            printf("%8d shared memory: avail : %ld / %ld\n", ++idx, BHomeShm().get_free_memory(), init);
        }
    };
    std::thread t(showStatus);
    BHCenter center(BHomeShm());
    center.Start();
    printf("center started ...\n");
    WaitForSignals({SIGINT, SIGTERM});
    // BHomeShm().Remove(); // remove ?
    run = false;
    t.join();
    return 0;
}
src/bh_api.cpp
@@ -39,11 +39,25 @@
    }
    size_t size() const { return size_; }
    operator bool() const { return ptr_; }
    bool ReleaseTo(void **pdata, int *psize)
    {
        if (!ptr_) {
            return false;
        }
        if (pdata && psize) {
            *psize = size();
            *pdata = release();
        }
        return true;
    }
};
template <class Msg>
bool PackOutput(const Msg &msg, void **out, int *out_len)
{
    if (!out || !out_len) {
        return true; // not wanted.
    }
    auto size = msg.ByteSizeLong();
    TmpPtr p(size);
    if (!p) {
@@ -51,30 +65,37 @@
        return false;
    }
    msg.SerializePartialToArray(p.get(), size);
    *out = p.release();
    *out_len = size;
    p.ReleaseTo(out, out_len);
    return true;
}
template <class MsgIn, class MsgOut = MsgCommonReply>
bool BHApiIn1Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int),
                  const void *request,
                  const int request_len,
                  void **reply,
                  int *reply_len,
                  const int timeout_ms)
{
    MsgIn input;
    if (!input.ParseFromArray(request, request_len)) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    MsgOut msg_reply;
    if ((ProcNode().*mfunc)(input, msg_reply, timeout_ms)) {
        return PackOutput(msg_reply, reply, reply_len);
    } else {
        return false;
    }
}
} // namespace
bool BHRegister(const void *proc_info,
                const int proc_info_len,
                void **reply,
                int *reply_len,
                const int timeout_ms)
bool BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
    ProcInfo pi;
    if (!pi.ParseFromArray(proc_info, proc_info_len)) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    MsgCommonReply msg_reply;
    if (ProcNode().Register(pi, msg_reply, timeout_ms)) {
        return PackOutput(msg_reply, reply, reply_len);
    } else {
        return false;
    }
    return BHApiIn1Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms);
}
bool BHHeartBeatEasy(const int timeout_ms)
@@ -82,23 +103,19 @@
    return ProcNode().Heartbeat(timeout_ms);
}
bool BHHeartBeat(const void *proc_info,
                 const int proc_info_len,
                 void **reply,
                 int *reply_len,
                 const int timeout_ms)
bool BHHeartBeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
    ProcInfo pi;
    if (!pi.ParseFromArray(proc_info, proc_info_len)) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    MsgCommonReply msg_reply;
    if (ProcNode().Heartbeat(pi, msg_reply, timeout_ms)) {
        return PackOutput(msg_reply, reply, reply_len);
    } else {
        return false;
    }
    return BHApiIn1Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms);
}
bool BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
    return BHApiIn1Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms);
}
bool BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
    return BHApiIn1Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
}
bool BHPublish(const void *msgpub,
@@ -125,8 +142,35 @@
    if (ProcNode().RecvSub(proc, pub, timeout_ms)) {
        TmpPtr pproc(proc);
        if (pproc && PackOutput(pub, msgpub, msgpub_len)) {
            *proc_id = pproc.release();
            *proc_id_len = pproc.size();
            pproc.ReleaseTo(proc_id, proc_id_len);
            return true;
        } else {
            SetLastError(ENOMEM, "out of mem");
        }
    }
    return false;
}
bool BHAsyncRequest(const void *request,
                    const int request_len,
                    void **msg_id,
                    int *msg_id_len)
{
    MsgRequestTopic req;
    if (!req.ParseFromArray(request, request_len)) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    std::string str_msg_id;
    MsgRequestTopicReply out_msg;
    if (ProcNode().ClientAsyncRequest(req, str_msg_id)) {
        if (!msg_id || !msg_id_len) {
            return true;
        }
        TmpPtr ptr(str_msg_id);
        if (ptr) {
            ptr.ReleaseTo(msg_id, msg_id_len);
            return true;
        } else {
            SetLastError(ENOMEM, "out of mem");
        }
@@ -152,8 +196,8 @@
    if (ProcNode().ClientSyncRequest(req, proc, out_msg, timeout_ms)) {
        TmpPtr pproc(proc);
        if (pproc && PackOutput(out_msg, reply, reply_len)) {
            *proc_id = pproc.release();
            *proc_id_len = pproc.size();
            pproc.ReleaseTo(proc_id, proc_id_len);
            return true;
        } else {
            SetLastError(ENOMEM, "out of mem");
        }
@@ -174,9 +218,9 @@
    if (ProcNode().ServerRecvRequest(src_info, proc, out_msg, timeout_ms)) {
        TmpPtr pproc(proc);
        if (pproc && PackOutput(out_msg, request, request_len)) {
            *proc_id = pproc.release();
            *proc_id_len = pproc.size();
            pproc.ReleaseTo(proc_id, proc_id_len);
            *src = src_info;
            return true;
        } else {
            SetLastError(ENOMEM, "out of mem");
        }
@@ -206,10 +250,11 @@
typedef std::function<bool(const void *, const int)> ServerSender;
} // namespace
void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb)
void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb)
{
    TopicNode::ServerCB on_req;
    TopicNode::SubDataCB on_sub;
    TopicNode::RequestResultCB on_reply;
    if (server_cb) {
        on_req = [server_cb](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) {
            std::string sreq(request.SerializeAsString());
@@ -228,8 +273,16 @@
            sub_cb(proc_id.data(), proc_id.size(), s.data(), s.size());
        };
    }
    if (client_cb) {
        on_reply = [client_cb](const BHMsgHead &head, const MsgRequestTopicReply &rep) {
            std::string s(rep.SerializeAsString());
            client_cb(head.proc_id().data(), head.proc_id().size(),
                      head.msg_id().data(), head.msg_id().size(),
                      s.data(), s.size());
        };
    }
    ProcNode().Start(on_req, on_sub);
    ProcNode().Start(on_req, on_sub, on_reply);
}
bool BHServerCallbackReply(const BHServerCallbackTag *tag,
                           const void *data,
@@ -251,10 +304,7 @@
        std::string err_msg;
        GetLastError(ec, err_msg);
        TmpPtr p(err_msg);
        if (p) {
            *msg = p.release();
            *msg_len = p.size();
        }
        p.ReleaseTo(msg, msg_len);
    }
    return ec;
}
src/bh_api.h
@@ -14,6 +14,18 @@
                int *reply_len,
                const int timeout_ms);
bool BHRegisterTopics(const void *topics,
                      const int topics_len,
                      void **reply,
                      int *reply_len,
                      const int timeout_ms);
bool BHSubscribeTopics(const void *topics,
                       const int topics_len,
                       void **reply,
                       int *reply_len,
                       const int timeout_ms);
typedef void (*FSubDataCallback)(const void *proc_id,
                                 const int proc_id_len,
                                 const void *data,
@@ -25,7 +37,14 @@
                                const int data_len,
                                BHServerCallbackTag *tag);
void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb);
typedef void (*FClientCallback)(const void *proc_id,
                                const int proc_id_len,
                                const void *msg_id,
                                const int msg_id_len,
                                const void *data,
                                const int data_len);
void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb);
bool BHServerCallbackReply(const BHServerCallbackTag *tag,
                           const void *data,
                           const int data_len);
@@ -47,6 +66,11 @@
               int *msgpub_len,
               const int timeout_ms);
bool BHAsyncRequest(const void *request,
                    const int request_len,
                    void **msg_id,
                    int *msg_id_len);
bool BHRequest(const void *request,
               const int request_len,
               void **proc_id,
src/proto.cpp
@@ -30,6 +30,8 @@
} // namespace
std::string NewMsgId() { return RandId(); }
BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id)
{
    return InitMsgHead(type, proc_id, RandId());
src/proto.h
@@ -72,7 +72,7 @@
    SetError(*msg.mutable_errmsg(), err_code, err_str);
    return msg;
}
std::string NewMsgId();
BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid);
BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id);
// inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); }
src/sendq.h
@@ -55,7 +55,7 @@
    void Append(const Remote &addr, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent())
    {
        using namespace std::chrono_literals;
        Append(addr, msg, Now() + 60s, onExpire);
        Append(addr, msg, Now() + 3s, onExpire);
    }
    bool TrySend(bhome_shm::ShmMsgQueue &mq);
    // bool empty() const { return store_.empty(); }
src/socket.h
@@ -36,7 +36,7 @@
class ShmSocket : private boost::noncopyable
{
    bool SendImpl(const void *valid_remote, const MsgI &imsg, SendQ::OnMsgEvent onExpire = SendQ::OnMsgEvent())
    bool SendImpl(const void *valid_remote, MsgI const &imsg, SendQ::OnMsgEvent onExpire = SendQ::OnMsgEvent())
    {
        // if (!mq().TrySend(*(MQId *) valid_remote, imsg)) {
        send_buffer_.Append(*static_cast<const MQId *>(valid_remote), imsg, onExpire);
@@ -69,7 +69,11 @@
    bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body)
    {
        MsgI msg;
        return msg.Make(shm(), head, body) && SendImpl(valid_remote, msg);
        if (msg.Make(shm(), head, body)) {
            DEFER1(if (msg.IsCounted()) { msg.Release(shm()); });
            return SendImpl(valid_remote, msg);
        }
        return false;
    }
    template <class Body>
@@ -78,6 +82,7 @@
        //TODO send_buffer_ need flag, and remove callback on expire.
        MsgI msg;
        if (msg.Make(shm(), head, body)) {
            DEFER1(if (msg.IsCounted()) { msg.Release(shm()); });
            std::string msg_id(head.msg_id());
            per_msg_cbs_->Add(msg_id, cb);
            auto onExpireRemoveCB = [this, msg_id](MsgI const &msg) {
@@ -85,6 +90,8 @@
                per_msg_cbs_->Find(msg_id, cb_no_use);
            };
            return SendImpl(valid_remote, msg, onExpireRemoveCB);
        } else {
            printf("out of mem?, avail: %ld\n", shm().get_free_memory());
        }
        return false;
    }
src/topic_node.cpp
@@ -35,35 +35,45 @@
} // namespace
TopicNode::TopicNode(SharedMemory &shm) :
    shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm)
    shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm), registered_(false)
{
    SockNode().Start();
    // recv msgs to avoid memory leak.
    auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; };
    SockNode().Start(default_ignore_msg);
}
TopicNode::~TopicNode()
{
    Stop();
    SockNode().Stop();
}
void TopicNode::Start(ServerCB const &server_cb, SubDataCB const &sub_cb)
void TopicNode::Start(ServerCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker)
{
    ServerStart(server_cb, 1);
    SubscribeStartWorker(sub_cb, 1);
    // SockClient().Start();
    if (nworker < 1) {
        nworker = 1;
    } else if (nworker > 16) {
        nworker = 16;
    }
    ServerStart(server_cb, nworker);
    SubscribeStartWorker(sub_cb, nworker);
    ClientStartWorker(client_cb, nworker);
}
void TopicNode::Stop()
{
    SockSub().Stop();
    SockServer().Stop();
    SockClient().Stop();
    SockNode().Stop();
}
bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
    info_ = proc;
    auto &sock = SockNode();
    MsgRegister body;
    *body.mutable_proc() = proc;
    body.mutable_proc()->Swap(&proc);
    auto AddId = [&](const MQId &id) { body.add_addrs()->set_mq_id(&id, sizeof(id)); };
    AddId(SockNode().id());
    AddId(SockServer().id());
@@ -74,27 +84,39 @@
    auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
    AddRoute(head, sock.id());
    auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) {
        bool ok = head.type() == kMsgTypeCommonReply &&
                  msg.ParseBody(rbody) &&
                  IsSuccess(rbody.errmsg().errcode());
        printf("async regisered %s\n", ok ? "ok" : "failed");
        registered_.store(ok);
    };
    if (timeout_ms == 0) {
        return sock.Send(&BHTopicCenterAddress(), head, body);
        auto onResult = [this, CheckResult](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
            MsgCommonReply body;
            CheckResult(imsg, head, body);
        };
        return sock.Send(&BHTopicCenterAddress(), head, body, onResult);
    } else {
        MsgI reply;
        DEFER1(reply.Release(shm_););
        BHMsgHead reply_head;
        bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
        r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
        if (r && IsSuccess(reply_body.errmsg().errcode())) {
            info_ = body;
            return true;
        if (r) {
            CheckResult(reply, reply_head, reply_body);
        }
        return false;
        return IsRegistered();
    }
}
bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
    if (!IsRegistered()) { return false; }
    auto &sock = SockNode();
    MsgHeartbeat body;
    *body.mutable_proc() = proc;
    body.mutable_proc()->Swap(&proc);
    auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
    AddRoute(head, sock.id());
@@ -120,7 +142,8 @@
bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
{
    //TODO check registered
    if (!IsRegistered()) { return false; }
    auto &sock = SockServer();
    MsgRegisterRPC body;
    body.mutable_topics()->Swap(&topics);
@@ -155,11 +178,8 @@
            for (int i = 0; i < head.route_size() - 1; ++i) {
                reply_head.add_route()->Swap(head.mutable_route(i));
            }
            MsgI msg;
            if (msg.Make(sock.shm(), reply_head, reply_body)) {
                auto &remote = head.route().rbegin()->mq_id();
                sock.Send(remote.data(), msg);
            }
            auto &remote = head.route().rbegin()->mq_id();
            sock.Send(remote.data(), reply_head, reply_body);
        }
    };
@@ -169,6 +189,8 @@
bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms)
{
    if (!IsRegistered()) { return false; }
    auto &sock = SockServer();
    MsgI imsg;
@@ -188,6 +210,8 @@
bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body)
{
    if (!IsRegistered()) { return false; }
    auto &sock = SockServer();
    SrcInfo *p = static_cast<SrcInfo *>(src_info);
@@ -211,7 +235,7 @@
        if (head.type() == kMsgTypeRequestTopicReply) {
            MsgRequestTopicReply reply;
            if (imsg.ParseBody(reply)) {
                cb(head.proc_id(), reply);
                cb(head, reply);
            }
        }
    };
@@ -219,37 +243,60 @@
    return SockRequest().Start(onData, nworker);
}
bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, const RequestResultCB &cb)
bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb)
{
    auto Call = [&](const void *remote) {
        auto &sock = SockRequest();
    if (!IsRegistered()) { return false; }
        BHMsgHead head(InitMsgHead(GetType(req), proc_id()));
    const std::string &msg_id(NewMsgId());
    out_msg_id = msg_id;
    auto SendTo = [this, msg_id](const BHAddress &addr, const MsgRequestTopic &req, const RequestResultCB &cb) {
        auto &sock = SockClient();
        BHMsgHead head(InitMsgHead(GetType(req), proc_id(), msg_id));
        AddRoute(head, sock.id());
        head.set_topic(req.topic());
        if (cb) {
            auto onRecv = [cb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
                if (head.type() == kMsgTypeRequestTopicReply) {
                    MsgRequestTopicReply reply;
                    if (imsg.ParseBody(reply)) {
                        cb(head.proc_id(), reply);
                        cb(head, reply);
                    }
                }
            };
            return sock.Send(remote, head, req, onRecv);
            return sock.Send(addr.mq_id().data(), head, req, onRecv);
        } else {
            return sock.Send(remote, head, req);
            return sock.Send(addr.mq_id().data(), head, req);
        }
    };
    try {
        auto &sock = SockClient();
        BHAddress addr;
        if (ClientQueryRPCTopic(req.topic(), addr, 1000)) {
            return Call(addr.mq_id().data());
        } else {
            SetLastError(eNotFound, "remote not found.");
            return false;
        if (topic_query_cache_.Find(req.topic(), addr)) {
            return SendTo(addr, req, cb);
        }
        MsgQueryTopic query;
        query.set_topic(req.topic());
        BHMsgHead head(InitMsgHead(GetType(query), proc_id()));
        AddRoute(head, sock.id());
        auto onQueryResult = [this, SendTo, req, cb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
            MsgQueryTopicReply rep;
            if (head.type() == kMsgTypeQueryTopicReply && imsg.ParseBody(rep)) {
                auto &addr = rep.address();
                if (!addr.mq_id().empty()) {
                    topic_query_cache_.Update(req.topic(), addr);
                    SendTo(addr, req, cb);
                }
            }
        };
        return sock.Send(&BHTopicCenterAddress(), head, query, onQueryResult);
    } catch (...) {
        return false;
    }
@@ -257,6 +304,8 @@
bool TopicNode::ClientSyncRequest(const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms)
{
    if (!IsRegistered()) { return false; }
    try {
        auto &sock = SockRequest();
@@ -264,6 +313,7 @@
        if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) {
            BHMsgHead head(InitMsgHead(GetType(request), proc_id()));
            AddRoute(head, sock.id());
            head.set_topic(request.topic());
            MsgI reply_msg;
            DEFER1(reply_msg.Release(shm_););
@@ -288,6 +338,8 @@
bool TopicNode::ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
{
    if (!IsRegistered()) { return false; }
    auto &sock = SockRequest();
    if (topic_query_cache_.Find(topic, addr)) {
@@ -325,6 +377,8 @@
bool TopicNode::Publish(const MsgPublish &pub, const int timeout_ms)
{
    if (!IsRegistered()) { return false; }
    try {
        auto &sock = SockPub();
        BHMsgHead head(InitMsgHead(GetType(pub), proc_id()));
@@ -349,8 +403,10 @@
// subscribe
bool TopicNode::Subscribe(MsgTopicList &topics, const int timeout_ms)
bool TopicNode::Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
{
    if (!IsRegistered()) { return false; }
    try {
        auto &sock = SockSub();
        MsgSubscribe sub;
@@ -364,7 +420,6 @@
            MsgI reply;
            DEFER1(reply.Release(shm()););
            BHMsgHead reply_head;
            MsgCommonReply reply_body;
            return sock.SendAndRecv(&BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) &&
                   reply_head.type() == kMsgTypeCommonReply &&
                   reply.ParseBody(reply_body) &&
@@ -396,6 +451,8 @@
bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms)
{
    if (!IsRegistered()) { return false; }
    auto &sock = SockSub();
    MsgI msg;
    DEFER1(msg.Release(shm()););
src/topic_node.h
@@ -29,7 +29,7 @@
class TopicNode
{
    SharedMemory &shm_;
    MsgRegister info_;
    ProcInfo info_;
    SharedMemory &shm() { return shm_; }
@@ -51,9 +51,9 @@
    bool ServerSendReply(void *src_info, const MsgRequestTopicReply &reply);
    // topic client
    typedef std::function<void(const std::string &proc_id, const MsgRequestTopicReply &reply)> RequestResultCB;
    typedef std::function<void(const BHMsgHead &head, const MsgRequestTopicReply &reply)> RequestResultCB;
    bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2);
    bool ClientAsyncRequest(const MsgRequestTopic &request, const RequestResultCB &rrcb = RequestResultCB());
    bool ClientAsyncRequest(const MsgRequestTopic &request, std::string &msg_id, const RequestResultCB &rrcb = RequestResultCB());
    bool ClientSyncRequest(const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms);
    // publish
@@ -62,15 +62,15 @@
    // subscribe
    typedef std::function<void(const std::string &proc_id, const MsgPublish &data)> SubDataCB;
    bool SubscribeStartWorker(const SubDataCB &tdcb, int nworker = 2);
    bool Subscribe(MsgTopicList &topics, const int timeout_ms);
    bool Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms);
    bool RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms);
    void Start(ServerCB const &server_cb, SubDataCB const &sub_cb);
    void Start(ServerCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker = 2);
    void Stop();
private:
    bool ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
    const std::string &proc_id() { return info_.proc().proc_id(); }
    const std::string &proc_id() { return info_.proc_id(); }
    typedef bhome_msg::BHAddress Address;
    class TopicQueryCache
@@ -118,7 +118,9 @@
    auto &SockClient() { return SockRequest(); }
    auto &SockReply() { return sock_reply_; }
    auto &SockServer() { return SockReply(); }
    bool IsRegistered() const { return registered_.load(); }
    std::atomic<bool> registered_;
    ShmSocket sock_node_;
    ShmSocket sock_request_;
    ShmSocket sock_reply_;
utest/api_test.cpp
@@ -17,11 +17,73 @@
 */
#include "bh_api.h"
#include "util.h"
#include <atomic>
class DemoClient
using namespace bhome::msg;
namespace
{
public:
typedef std::atomic<uint64_t> Number;
struct MsgStatus {
    Number nrequest_;
    Number nreply_;
    Number nserved_;
    MsgStatus() :
        nrequest_(0), nreply_(0), nserved_(0) {}
};
MsgStatus &Status()
{
    static MsgStatus st;
    return st;
}
} // namespace
void SubRecvProc(const void *proc_id,
                 const int proc_id_len,
                 const void *data,
                 const int data_len)
{
    std::string proc((const char *) proc_id, proc_id_len);
    MsgPublish pub;
    pub.ParseFromArray(data, data_len);
    // printf("Sub data, %s : %s\n", pub.topic().c_str(), pub.data().c_str());
}
void ServerProc(const void *proc_id,
                const int proc_id_len,
                const void *data,
                const int data_len,
                BHServerCallbackTag *tag)
{
    // printf("ServerProc: ");
    // DEFER1(printf("\n"););
    MsgRequestTopic request;
    if (request.ParseFromArray(data, data_len)) {
        MsgRequestTopicReply reply;
        reply.set_data(" reply: " + request.data());
        std::string s(reply.SerializeAsString());
        // printf("%s", reply.data().c_str());
        BHServerCallbackReply(tag, s.data(), s.size());
        ++Status().nserved_;
    }
}
void ClientProc(const void *proc_id,
                const int proc_id_len,
                const void *msg_id,
                const int msg_id_len,
                const void *data,
                const int data_len)
{
    std::string proc((const char *) proc_id, proc_id_len);
    MsgRequestTopicReply reply;
    if (reply.ParseFromArray(data, data_len)) {
        ++Status().nreply_;
    }
    // printf("client Recv reply : %s\n", reply.data().c_str());
}
BOOST_AUTO_TEST_CASE(ApiTest)
{
@@ -36,19 +98,125 @@
           nsec, nhour, nday, years);
    std::chrono::steady_clock::duration a(123456);
    printf("nowsec: %ld\n", NowSec());
    // for (int i = 0; i < 5; ++i) {
    //     std::this_thread::sleep_for(1s);
    //     printf("nowsec: %ld\n", NowSec());
    // }
    printf("maxsec: %ld\n", CountSeconds(max_time));
    ProcInfo proc;
    proc.set_proc_id("demo_client");
    proc.set_public_info("public info of demo_client. etc...");
    std::string proc_buf(proc.SerializeAsString());
    void *reply = 0;
    int reply_len = 0;
    bool r = BHRegister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 1000);
    printf("register %s\n", r ? "ok" : "failed");
    bool reg = false;
    for (int i = 0; i < 10 && !reg; ++i) {
        ProcInfo proc;
        proc.set_proc_id("demo_client");
        proc.set_public_info("public info of demo_client. etc...");
        std::string proc_buf(proc.SerializeAsString());
        void *reply = 0;
        int reply_len = 0;
        reg = BHRegister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 2000);
        printf("register %s\n", reg ? "ok" : "failed");
        BHFree(reply, reply_len);
        Sleep(1s);
    }
    const std::string topic_ = "topic_";
    {
        MsgTopicList topics;
        for (int i = 0; i < 10; ++i) {
            topics.add_topic_list(topic_ + std::to_string(i));
        }
        std::string s = topics.SerializeAsString();
        void *reply = 0;
        int reply_len = 0;
        bool r = BHRegisterTopics(s.data(), s.size(), &reply, &reply_len, 1000);
        BHFree(reply, reply_len);
        // printf("register topic : %s\n", r ? "ok" : "failed");
        Sleep(1s);
    }
    {
        MsgTopicList topics;
        for (int i = 0; i < 10; ++i) {
            topics.add_topic_list(topic_ + std::to_string(i * 2));
        }
        std::string s = topics.SerializeAsString();
        void *reply = 0;
        int reply_len = 0;
        bool r = BHSubscribeTopics(s.data(), s.size(), &reply, &reply_len, 1000);
        BHFree(reply, reply_len);
        printf("subscribe topic : %s\n", r ? "ok" : "failed");
    }
    BHStartWorker(&ServerProc, &SubRecvProc, &ClientProc);
    {
        for (int i = 0; i < 1; ++i) {
            MsgPublish pub;
            pub.set_topic(topic_ + std::to_string(i));
            pub.set_data("pub_data_" + std::string(1024 * 1024, 'a'));
            std::string s(pub.SerializeAsString());
            BHPublish(s.data(), s.size(), 0);
            // Sleep(1s);
        }
    }
    auto asyncRequest = [&](uint64_t nreq) {
        for (uint64_t i = 0; i < nreq; ++i) {
            MsgRequestTopic req;
            req.set_topic(topic_ + std::to_string(0));
            req.set_data("request_data_" + std::to_string(i));
            std::string s(req.SerializeAsString());
            void *msg_id = 0;
            int len = 0;
            bool r = BHAsyncRequest(s.data(), s.size(), 0, 0);
            DEFER1(BHFree(msg_id, len););
            if (r) {
                ++Status().nrequest_;
            } else {
                printf("request topic : %s\n", r ? "ok" : "failed");
            }
        }
    };
    auto showStatus = [](std::atomic<bool> *run) {
        int64_t last = 0;
        while (*run) {
            auto &st = Status();
            std::this_thread::sleep_for(1s);
            int cur = st.nreply_.load();
            printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld, speed %8ld\n", st.nrequest_.load(), st.nserved_.load(), cur, cur - last);
            last = cur;
        }
    };
    auto hb = [](std::atomic<bool> *run) {
        while (*run) {
            BHHeartBeatEasy(0);
            std::this_thread::sleep_for(1s);
        }
    };
    std::atomic<bool> run(true);
    ThreadManager threads;
    boost::timer::auto_cpu_timer timer;
    threads.Launch(hb, &run);
    // threads.Launch(showStatus, &run);
    int ncli = 10;
    const uint64_t nreq = 1000 * 100;
    for (int i = 0; i < ncli; ++i) {
        threads.Launch(asyncRequest, nreq);
    }
    int same = 0;
    int64_t last = 0;
    while (last < nreq * ncli && same < 3) {
        Sleep(1s);
        auto cur = Status().nreply_.load();
        if (last == cur) {
            ++same;
        } else {
            last = cur;
            same = 0;
        }
    }
    run = false;
    threads.WaitAll();
    auto &st = Status();
    printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld\n", st.nrequest_.load(), st.nserved_.load(), st.nreply_.load());
}
utest/utest.cpp
@@ -99,7 +99,7 @@
    BHCenter center(shm);
    center.Start();
    std::this_thread::sleep_for(100ms);
    Sleep(100ms);
    std::atomic<uint64_t> total_count(0);
    std::atomic<ptime> last_time(Now() - seconds(1));
@@ -113,7 +113,8 @@
        for (auto &t : topics) {
            tlist.add_topic_list(t);
        }
        bool r = client.Subscribe(tlist, timeout);
        MsgCommonReply reply_body;
        bool r = client.Subscribe(tlist, reply_body, timeout);
        if (!r) {
            printf("client subscribe failed.\n");
        }
@@ -149,7 +150,7 @@
            MsgPublish pub;
            pub.set_topic(topic);
            pub.set_data(data);
            bool r = provider.Publish(pub, timeout);
            bool r = provider.Publish(pub, 0);
            if (!r) {
                static std::atomic<int> an(0);
                int n = ++an;
@@ -169,7 +170,7 @@
        part.push_back(topics[i]);
        threads.Launch(Sub, i, topics);
    }
    std::this_thread::sleep_for(100ms);
    Sleep(100ms);
    for (auto &topic : topics) {
        threads.Launch(Pub, topic);
    }
@@ -217,7 +218,7 @@
        std::atomic<int> count(0);
        std::string reply;
        auto onRecv = [&](const std::string &proc_id, const MsgRequestTopicReply &msg) {
        auto onRecv = [&](const BHMsgHead &head, const MsgRequestTopicReply &msg) {
            reply = msg.data();
            if (++count >= nreq) {
                printf("count: %d\n", count.load());
@@ -229,7 +230,8 @@
            MsgRequestTopic req;
            req.set_topic(topic);
            req.set_data("data " + std::to_string(i));
            if (!client.ClientAsyncRequest(req)) {
            std::string msg_id;
            if (!client.ClientAsyncRequest(req, msg_id)) {
                printf("client request failed\n");
                ++count;
            }
@@ -274,9 +276,9 @@
    ThreadManager clients, servers;
    std::vector<Topic> topics = {"topic1", "topic2"};
    servers.Launch(Server, "server", topics);
    std::this_thread::sleep_for(100ms);
    Sleep(100ms);
    for (auto &t : topics) {
        clients.Launch(Client, t, 1000 * 1);
        clients.Launch(Client, t, 1000 * 100);
    }
    clients.WaitAll();
    printf("clients done, server replyed: %ld\n", server_msg_count.load());
@@ -302,18 +304,16 @@
        };
        Check();
        for (int i = 0; i < 3; ++i) {
            std::this_thread::sleep_for(1s);
            Sleep(1s);
            Check();
        }
        printf("sleep 4\n");
        std::this_thread::sleep_for(4s);
        Sleep(4s);
        for (int i = 0; i < 2; ++i) {
            std::this_thread::sleep_for(1s);
            Sleep(1s);
            Check();
        }
    }
    printf("sleep 8\n");
    std::this_thread::sleep_for(8s);
    Sleep(8s);
}
inline int MyMin(int a, int b)
{
utest/util.h
@@ -38,6 +38,13 @@
using namespace std::chrono_literals;
template <class D>
inline void Sleep(D d)
{
    printf("sleep for %ld ms\n", std::chrono::duration_cast<std::chrono::milliseconds>(d).count());
    std::this_thread::sleep_for(d);
}
typedef std::function<void(void)> FuncVV;
class ScopeCall : private boost::noncopyable