lichao
2021-04-20 1f3729698a131b3f701f67adb6a1258aa1235dce
api server callback change tag to src; refactor.
12个文件已修改
241 ■■■■ 已修改文件
.vscode/settings.json 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/go/bhome_node.go 9 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.cpp 29 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.h 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.h 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 36 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.h 38 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/speed_test.cpp 74 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/utest.cpp 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/util.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.vscode/settings.json
@@ -65,8 +65,14 @@
    },
    "files.exclude": {
        "**/*.un~": true,
        "**/bhshmq_center": true,
        "**/bhshmq_status": true,
        "**/bhshmqbox": true,
        "**/gmon.out": true,
        "api/go/bhome_msg": true,
        "build/": true,
        "debug/": true
        "debug/": true,
        "utest/utest": true
    },
    "cmake.configureOnOpen": false,
    "C_Cpp.default.includePath": [
api/go/bhome_node.go
@@ -147,16 +147,11 @@
}
func ServerCallbackReply(tag unsafe.Pointer, rep *bh.MsgRequestTopicReply) bool {
    data, _ := rep.Marshal()
    return C.BHServerCallbackReply(tag, unsafe.Pointer(&data[0]), C.int(len(data))) > 0
}
type ServecCB func(proc_id *string, req *bh.MsgRequestTopic, reply *bh.MsgRequestTopicReply) bool
type ServecCB func(src unsafe.Pointer, proc_id *string, req *bh.MsgRequestTopic)
type SubDataCB func(proc_id *string, pub *bh.MsgPublish)
type ClientCB func(proc_id *string, msg_id *[]byte, reply *bh.MsgRequestTopicReply)
func cserver_callback(cpid *unsafe.Pointer, cpid_len unsafe.Pointer) {
func cserver_callback(cpid unsafe.Pointer, pid_len C.int, src unsafe.Pointer) {
}
func StartWorker(server_cb ServecCB, sub_cb SubDataCB, client_cb ClientCB) {
src/bh_api.cpp
@@ -246,31 +246,15 @@
    return ProcNode().ServerSendReply(src, rep);
}
int BHCleanUp()
{
    return 0;
}
namespace
{
typedef std::function<bool(const void *, const int)> ServerSender;
} // namespace
void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb)
{
    TopicNode::ServerCB on_req;
    TopicNode::ServerAsyncCB on_req;
    TopicNode::SubDataCB on_sub;
    TopicNode::RequestResultCB on_reply;
    if (server_cb) {
        on_req = [server_cb](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) {
        on_req = [server_cb](void *src, std::string &proc_id, const MsgRequestTopic &request) {
            std::string sreq(request.SerializeAsString());
            bool r = false;
            ServerSender sender = [&](const void *p, const int len) {
                r = reply.ParseFromArray(p, len);
                return r;
            };
            server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), &sender);
            return r;
            server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), src);
        };
    }
    if (sub_cb) {
@@ -289,13 +273,6 @@
    }
    ProcNode().Start(on_req, on_sub, on_reply);
}
int BHServerCallbackReply(const void *tag,
                          const void *data,
                          const int data_len)
{
    auto &sender = *(const ServerSender *) (tag);
    return sender(data, data_len);
}
void BHFree(void *data, int size)
src/bh_api.h
@@ -45,7 +45,7 @@
                                const int proc_id_len,
                                const void *data,
                                const int data_len,
                                const void *tag);
                                void *src);
typedef void (*FClientCallback)(const void *proc_id,
                                const int proc_id_len,
@@ -55,10 +55,6 @@
                                const int data_len);
void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb);
int BHServerCallbackReply(const void *tag,
                          const void *data,
                          const int data_len);
int BHHeartbeatEasy(const int timeout_ms);
int BHHeartbeat(const void *proc_info,
src/socket.cpp
@@ -52,7 +52,7 @@
        auto DoRecv = [=] {
            auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
                RecvCB cb;
                if (per_msg_cbs_->Find(head.msg_id(), cb)) {
                if (per_msg_cbs_->Pick(head.msg_id(), cb)) {
                    cb(socket, imsg, head);
                } else if (onData) {
                    onData(socket, imsg, head);
src/socket.h
@@ -66,17 +66,17 @@
    size_t Pending() const { return mq().Pending(); }
    template <class Body>
    bool Send(const void *valid_remote, BHMsgHead &head, Body &body, const RecvCB &cb = RecvCB())
    bool Send(const void *valid_remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
    {
        try {
            if (!cb) {
                return SendImpl(valid_remote, MsgI::Serialize(head, body));
            } else {
                std::string msg_id(head.msg_id());
                per_msg_cbs_->Add(msg_id, cb);
                per_msg_cbs_->Store(msg_id, std::move(cb));
                auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
                    RecvCB cb_no_use;
                    per_msg_cbs_->Find(msg_id, cb_no_use);
                    per_msg_cbs_->Pick(msg_id, cb_no_use);
                };
                return SendImpl(valid_remote, MsgI::Serialize(head, body), onExpireRemoveCB);
            }
@@ -117,7 +117,7 @@
            };
            std::unique_lock<std::mutex> lk(st->mutex);
            bool sendok = Send(remote, head, body, OnRecv);
            bool sendok = Send(remote, head, body, std::move(OnRecv));
            if (!sendok) {
                printf("send timeout\n");
            }
@@ -154,8 +154,8 @@
    public:
        bool empty() const { return store_.empty(); }
        bool Add(const std::string &id, const RecvCB &cb) { return store_.emplace(id, cb).second; }
        bool Find(const std::string &id, RecvCB &cb)
        bool Store(const std::string &id, RecvCB &&cb) { return store_.emplace(id, std::move(cb)).second; }
        bool Pick(const std::string &id, RecvCB &cb)
        {
            auto pos = store_.find(id);
            if (pos != store_.end()) {
src/topic_node.cpp
@@ -54,7 +54,7 @@
    }
}
void TopicNode::Start(ServerCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker)
void TopicNode::Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker)
{
    if (nworker < 1) {
        nworker = 1;
@@ -178,7 +178,7 @@
    }
}
bool TopicNode::ServerStart(const ServerCB &rcb, int nworker)
bool TopicNode::ServerStart(const ServerSyncCB &rcb, int nworker)
{
    auto onRecv = [this, rcb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
        if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; }
@@ -199,6 +199,23 @@
    auto &sock = SockServer();
    return rcb && sock.Start(onRecv, nworker);
}
bool TopicNode::ServerStart(const ServerAsyncCB &acb, int nworker)
{
    auto onRecv = [this, acb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
        if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; }
        MsgRequestTopic req;
        if (!imsg.ParseBody(req)) { return; }
        SrcInfo *p = new SrcInfo;
        p->route.assign(head.route().begin(), head.route().end());
        p->msg_id = head.msg_id();
        acb(p, *head.mutable_proc_id(), req);
    };
    auto &sock = SockServer();
    return acb && sock.Start(onRecv, nworker);
}
bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms)
@@ -296,13 +313,15 @@
    };
    try {
        auto &sock = SockClient();
        BHAddress addr;
        if (topic_query_cache_.Find(req.topic(), addr)) {
#if 1
        return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(addr, req, cb);
#else
        if (topic_query_cache_.Pick(req.topic(), addr)) {
            return SendTo(addr, req, cb);
        }
        auto &sock = SockClient();
        MsgQueryTopic query;
        query.set_topic(req.topic());
        BHMsgHead head(InitMsgHead(GetType(query), proc_id()));
@@ -313,12 +332,13 @@
            if (head.type() == kMsgTypeQueryTopicReply && imsg.ParseBody(rep)) {
                auto &addr = rep.address();
                if (!addr.mq_id().empty()) {
                    topic_query_cache_.Update(req.topic(), addr);
                    topic_query_cache_.Store(req.topic(), addr);
                    SendTo(addr, req, cb);
                }
            }
        };
        return sock.Send(&BHTopicCenterAddress(), head, query, onQueryResult);
        return sock.Send(&BHTopicCenterAddress(), head, query, std::move(onQueryResult));
#endif
    } catch (...) {
        SetLastError(eError, "internal error.");
@@ -391,7 +411,7 @@
                if (addr.mq_id().empty()) {
                    return false;
                } else {
                    topic_query_cache_.Update(topic, addr);
                    topic_query_cache_.Store(topic, addr);
                    return true;
                }
            }
src/topic_node.h
@@ -34,7 +34,6 @@
    SharedMemory &shm() { return shm_; }
public:
    typedef std::function<void(std::string &proc_id, const void *data, const int len)> DataCB;
    TopicNode(SharedMemory &shm);
    ~TopicNode();
@@ -44,8 +43,10 @@
    bool Heartbeat(const int timeout_ms);
    // topic rpc server
    typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerCB;
    bool ServerStart(ServerCB const &cb, const int nworker = 2);
    typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerSyncCB;
    typedef std::function<void(void *src_info, std::string &client_proc_id, MsgRequestTopic &request)> ServerAsyncCB;
    bool ServerStart(ServerSyncCB const &cb, const int nworker = 2);
    bool ServerStart(ServerAsyncCB const &cb, const int nworker = 2);
    bool ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms);
    bool ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms);
    bool ServerSendReply(void *src_info, const MsgRequestTopicReply &reply);
@@ -65,7 +66,7 @@
    bool Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms);
    bool RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms);
    void Start(ServerCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker = 2);
    void Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker = 2);
    void Stop();
private:
@@ -77,45 +78,40 @@
    {
        class Impl
        {
            typedef std::unordered_map<Topic, Address> Store;
            Store store_;
            typedef std::unordered_map<Topic, Address> Records;
            Records records_;
        public:
            bool Find(const Topic &topic, Address &addr)
            {
                auto pos = store_.find(topic);
                if (pos != store_.end()) {
                auto pos = records_.find(topic);
                if (pos != records_.end()) {
                    addr = pos->second;
                    return true;
                } else {
                    return false;
                }
            }
            bool Update(const Topic &topic, const Address &addr)
            bool Store(const Topic &topic, const Address &addr)
            {
                store_[topic] = addr;
                records_[topic] = addr;
                return true;
            }
        };
        Synced<Impl> impl_;
        // Impl &impl()
        // {
        //     thread_local Impl impl;
        //     return impl;
        // }
    public:
        bool Find(const Topic &topic, Address &addr) { return impl_->Find(topic, addr); }
        bool Update(const Topic &topic, const Address &addr) { return impl_->Update(topic, addr); }
        bool Store(const Topic &topic, const Address &addr) { return impl_->Store(topic, addr); }
    };
    // some sockets may be the same one, using functions make it easy to change.
    auto &SockNode() { return sock_node_; }
    auto &SockPub() { return SockNode(); }
    auto &SockSub() { return sock_sub_; }
    auto &SockClient() { return sock_client_; }
    auto &SockServer() { return sock_server_; }
    ShmSocket &SockNode() { return sock_node_; }
    ShmSocket &SockPub() { return SockNode(); }
    ShmSocket &SockSub() { return sock_sub_; }
    ShmSocket &SockClient() { return sock_client_; }
    ShmSocket &SockServer() { return sock_server_; }
    bool IsRegistered() const { return registered_.load(); }
    ShmSocket sock_node_;
utest/api_test.cpp
@@ -66,7 +66,7 @@
                const int proc_id_len,
                const void *data,
                const int data_len,
                const void *tag)
                void *src)
{
    // printf("ServerProc: ");
    // DEFER1(printf("\n"););
@@ -76,7 +76,7 @@
        reply.set_data(" reply: " + request.data());
        std::string s(reply.SerializeAsString());
        // printf("%s", reply.data().c_str());
        BHServerCallbackReply(tag, s.data(), s.size());
        BHSendReply(src, s.data(), s.size());
        ++Status().nserved_;
    }
}
utest/speed_test.cpp
@@ -119,40 +119,38 @@
    const std::string shm_name("ShmSendRecv");
    ShmRemover auto_remove(shm_name);
    const int qlen = 64;
    const size_t msg_length = 1000;
    const size_t msg_length = 100;
    std::string msg_content(msg_length, 'a');
    msg_content[20] = '\0';
    const std::string client_proc_id = "client_proc";
    const std::string server_proc_id = "server_proc";
    SharedMemory shm(shm_name, 1024 * 1024 * 50);
    SharedMemory shm(shm_name, 1024 * 1024 * 512);
    auto Avail = [&]() { return shm.get_free_memory(); };
    auto init_avail = Avail();
    ShmSocket srv(shm, qlen);
    ShmSocket cli(shm, qlen);
    MsgI request_rc;
    MsgRequestTopic req_body;
    req_body.set_topic("topic");
    req_body.set_data(msg_content);
    auto req_head(InitMsgHead(GetType(req_body), client_proc_id));
    req_head.add_route()->set_mq_id(&cli.id(), cli.id().size());
    request_rc.MakeRC(shm, req_head, req_body);
    DEFER1(request_rc.Release(shm));
    MsgRequestTopic reply_body;
    reply_body.set_topic("topic");
    reply_body.set_data(msg_content);
    auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id));
    reply_head.add_route()->set_mq_id(&srv.id(), srv.id().size());
    MsgI reply_rc;
    reply_rc.MakeRC(shm, reply_head, reply_body);
    DEFER1(reply_rc.Release(shm));
    int ncli = 1;
    uint64_t nmsg = 1000 * 1000 * 1;
    std::atomic<uint64_t> count(0);
    std::atomic<ptime> last_time(Now() - seconds(1));
    std::atomic<int64_t> last_time(NowSec() - 1);
    std::atomic<uint64_t> last_count(0);
    auto PrintStatus = [&](int64_t cur) {
        std::cout << "time: " << cur;
        printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld\n",
               count.load(), count - last_count.exchange(count), init_avail - Avail());
    };
    auto onRecv = [&](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
        ++count;
        auto cur = NowSec();
        if (last_time.exchange(cur) < cur) {
            PrintStatus(cur);
        }
    };
    cli.Start(onRecv, 2);
    auto Client = [&](int cli_id, int nmsg) {
        for (int i = 0; i < nmsg; ++i) {
@@ -161,28 +159,11 @@
                req_body.set_topic("topic");
                req_body.set_data(msg_content);
                auto req_head(InitMsgHead(GetType(req_body), client_proc_id));
                req_head.add_route()->set_mq_id(&cli.id(), cli.id().size());
                return cli.Send(&srv.id(), req_head, req_body);
            };
            auto ReqRC = [&]() { return cli.Send(&srv.id(), request_rc); };
            if (!ReqRC()) {
                printf("********** client send error.\n");
                continue;
            }
            MsgI msg;
            BHMsgHead head;
            if (!cli.SyncRecv(msg, head, 1000)) {
                printf("********** client recv error.\n");
            } else {
                DEFER1(msg.Release(shm));
                ++count;
                auto cur = Now();
                if (last_time.exchange(cur) < cur) {
                    std::cout << "time: " << cur;
                    printf(", total msg:%10ld, speed:[%8ld/s], used mem:%8ld, refcount:%d\n",
                           count.load(), count - last_count.exchange(count), init_avail - Avail(), request_rc.Count());
                }
            }
            Req();
        }
    };
@@ -206,10 +187,7 @@
                        auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, req_head.msg_id()));
                        return srv.Send(&src_id, reply_head, reply_body);
                    };
                    auto ReplyRC = [&]() { return srv.Send(&src_id, reply_rc); };
                    if (ReplyRC()) {
                    }
                    Reply();
                }
            }
        }
@@ -219,13 +197,15 @@
    DEFER1(printf("Request Reply Test:"););
    ThreadManager clients, servers;
    for (int i = 0; i < qlen; ++i) { servers.Launch(Server); }
    int ncli = 100 * 1;
    uint64_t nmsg = 100 * 100 * 2;
    for (int i = 0; i < 2; ++i) { servers.Launch(Server); }
    printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg);
    for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); }
    clients.WaitAll();
    printf("request ok: %ld\n", count.load());
    do {
        std::this_thread::sleep_for(100ms);
    } while (count.load() < ncli * nmsg);
    PrintStatus(NowSec());
    stop = true;
    servers.WaitAll();
    // BOOST_CHECK_THROW(reply.Count(), int);
utest/utest.cpp
@@ -102,7 +102,7 @@
    Sleep(100ms);
    std::atomic<uint64_t> total_count(0);
    std::atomic<ptime> last_time(Now() - seconds(1));
    std::atomic<int64_t> last_time(NowSec() - 1);
    std::atomic<uint64_t> last_count(0);
    const uint64_t nmsg = 100 * 2;
@@ -125,7 +125,7 @@
        auto OnTopicData = [&](const std::string &proc_id, const MsgPublish &pub) {
            ++total_count;
            auto cur = Now();
            auto cur = NowSec();
            if (last_time.exchange(cur) < cur) {
                std::cout << "time: " << cur;
                printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
@@ -177,7 +177,7 @@
    threads.Launch(Pub, "some_else");
    threads.WaitAll();
    std::cout << "end : " << Now();
    printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
           total_count.load(), total_count - last_count.exchange(total_count), init_avail - Avail());
}
@@ -227,7 +227,9 @@
        MsgRequestTopic req;
        req.set_topic(topic);
        req.set_data("data " + std::string(100, 'a'));
        client.ClientStartWorker(onRecv, 2);
        boost::timer::auto_cpu_timer timer;
        for (int i = 0; i < nreq; ++i) {
            std::string msg_id;
@@ -244,7 +246,7 @@
            // ++count;
        }
        do {
            std::this_thread::yield();
            std::this_thread::sleep_for(100ms);
        } while (count.load() < nreq);
        client.Stop();
        printf("request %s %d done ", topic.c_str(), count.load());
@@ -254,12 +256,18 @@
    auto Server = [&](const std::string &name, const std::vector<std::string> &topics) {
        DemoNode server(name, shm);
        auto onData = [&](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) {
        auto onDataSync = [&](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) {
            ++server_msg_count;
            reply.set_data(request.topic() + ':' + request.data());
            return true;
        };
        server.ServerStart(onData);
        auto onDataAsync = [&](void *src, std::string &proc_id, MsgRequestTopic &request) {
            ++server_msg_count;
            MsgRequestTopicReply reply;
            reply.set_data(request.topic() + ':' + request.data());
            server.ServerSendReply(src, reply);
        };
        server.ServerStart(onDataAsync);
        MsgTopicList rpc;
        for (auto &topic : topics) {
@@ -272,7 +280,7 @@
        }
        while (run) {
            std::this_thread::yield();
            std::this_thread::sleep_for(100ms);
        }
    };
    ThreadManager clients, servers;
utest/util.h
@@ -34,7 +34,6 @@
#include <vector>
using namespace boost::posix_time;
inline ptime Now() { return second_clock::universal_time(); };
using namespace std::chrono_literals;