lichao
2021-04-23 65a230ec6ccb61c3ce6816730da2106f07f40b4a
add api, Unregister, QueryTopicAddress.
11个文件已修改
253 ■■■■ 已修改文件
api/bhsgo/bhome_node.go 25 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/bhsgo/bhome_node_test.go 18 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.cpp 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/source/bhome_msg.proto 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/source/bhome_msg_api.proto 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.cpp 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.h 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/proto.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 124 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
api/bhsgo/bhome_node.go
@@ -8,8 +8,9 @@
import "C"
import (
    bh "basic.com/valib/bhshmq.git/proto/source/bhome_msg"
    "unsafe"
    bh "basic.com/valib/bhshmq.git/proto/source/bhome_msg"
)
func getPtr(n *[]byte) unsafe.Pointer {
@@ -46,8 +47,8 @@
    return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHSubscribeTopics), data, reply, timeout_ms)
}
func Heartbeat(topics *bh.ProcInfo, reply *bh.MsgCommonReply, timeout_ms int) bool {
    data, _ := topics.Marshal()
func Heartbeat(proc *bh.ProcInfo, reply *bh.MsgCommonReply, timeout_ms int) bool {
    data, _ := proc.Marshal()
    return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHHeartbeat), data, reply, timeout_ms)
}
@@ -55,6 +56,24 @@
    return C.BHHeartbeatEasy(C.int(timeout_ms)) > 0
}
func Unregister(proc *bh.ProcInfo, reply *bh.MsgCommonReply, timeout_ms int) bool {
    data, _ := proc.Marshal()
    return bhApiIn1Out1(C.FBHApiIn1Out1(C.BHUnregister), data, reply, timeout_ms)
}
func QueryTopicAddress(topic *bh.MsgQueryTopic, reply *bh.MsgQueryTopicReply, timeout_ms int) bool {
    data, _ := topic.Marshal()
    creply := unsafe.Pointer(nil)
    creply_len := C.int(0)
    defer C.BHFree(creply, creply_len)
    r := C.BHQueryTopicAddress(getPtr(&data), C.int(len(data)), &creply, &creply_len, C.int(timeout_ms)) > 0
    if r {
        reply.Unmarshal(C.GoBytes(creply, creply_len))
    }
    return r
}
func Publish(pub *bh.MsgPublish, timeout_ms int) bool {
    data, _ := pub.Marshal()
    return C.BHPublish(getPtr(&data), C.int(len(data)), C.int(timeout_ms)) > 0
api/bhsgo/bhome_node_test.go
@@ -1,11 +1,12 @@
package bhsgo
import (
    bh "basic.com/valib/bhshmq.git/proto/source/bhome_msg"
    "fmt"
    "testing"
    "time"
    "unsafe"
    bh "basic.com/valib/bhshmq.git/proto/source/bhome_msg"
)
func ServerCallback(src unsafe.Pointer, proc_id *string, req *bh.MsgRequestTopic) {
@@ -37,6 +38,21 @@
        t.Log("register error")
        return
    }
    r = Unregister(&proc, &reply, 1000)
    if r {
        fmt.Println("Unregister ok")
    } else {
        fmt.Println("Unregister failed")
    }
    r = Register(&proc, &reply, 1000)
    if r {
        fmt.Println("register ok")
    } else {
        fmt.Println("register failed")
        t.Log("register error")
        return
    }
    r = HeartbeatEasy(1000)
    if r {
box/center.cpp
@@ -166,6 +166,18 @@
        return HandleMsg<MsgCommonReply, Func>(head, op);
    }
    MsgCommonReply Unregister(const BHMsgHead &head, MsgUnregister &msg)
    {
        return HandleMsg(
            head, [&](Node node) -> MsgCommonReply {
                NodeInfo &ni = *node;
                auto now = NowSec(); // just set to offline.
                ni.state_.timestamp_ = now - offline_time_;
                ni.state_.UpdateState(now, offline_time_, kill_time_);
                return MakeReply(eSuccess);
            });
    }
    MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg)
    {
        return HandleMsg(
@@ -206,20 +218,17 @@
        auto query = [&](Node self) -> MsgQueryTopicReply {
            auto pos = service_map_.find(req.topic());
            if (pos != service_map_.end() && !pos->second.empty()) {
                // now just find first one.
                const TopicDest &dest = *(pos->second.begin());
                Node dest_node(dest.weak_node_.lock());
                if (!dest_node) {
                    service_map_.erase(pos);
                    return MakeReply<Reply>(eOffline, "topic server offline.");
                } else if (!Valid(*dest_node)) {
                    return MakeReply<Reply>(eNoRespond, "topic server not responding.");
                } else {
                    MsgQueryTopicReply reply = MakeReply<Reply>(eSuccess);
                    reply.mutable_address()->set_mq_id(dest.mq_);
                    return reply;
                auto &clients = pos->second;
                Reply reply = MakeReply<Reply>(eSuccess);
                for (auto &dest : clients) {
                    Node dest_node(dest.weak_node_.lock());
                    if (dest_node && Valid(*dest_node)) {
                        auto node_addr = reply.add_node_address();
                        node_addr->set_proc_id(dest_node->proc_.proc_id());
                        node_addr->mutable_addr()->set_mq_id(dest.mq_);
                    }
                }
                return reply;
            } else {
                return MakeReply<Reply>(eNotFound, "topic server not found.");
            }
@@ -433,6 +442,7 @@
        switch (head.type()) {
            CASE_ON_MSG_TYPE(Register);
            CASE_ON_MSG_TYPE(Heartbeat);
            CASE_ON_MSG_TYPE(Unregister);
            CASE_ON_MSG_TYPE(RegisterRPC);
            CASE_ON_MSG_TYPE(QueryTopic);
proto/source/bhome_msg.proto
@@ -44,6 +44,8 @@
    // kMsgTypeSubscribeReply = 23;
    kMsgTypeUnsubscribe = 24;
    // kMsgTypeUnsubscribeReply = 25;
    kMsgTypeUnregister = 26;
    // kMsgTypeUnregisterReply = 27;
}
proto/source/bhome_msg_api.proto
@@ -51,6 +51,11 @@
    repeated BHAddress addrs = 2;
}
message MsgUnregister
{
    ProcInfo proc = 1;
}
message MsgHeartbeat
{
    ProcInfo proc = 1;
@@ -62,5 +67,19 @@
message MsgQueryTopicReply {
    ErrorMsg errmsg = 1;
    BHAddress address = 2;
message BHNodeAddress {
    bytes proc_id = 1;
    BHAddress addr = 2;
}
    repeated BHNodeAddress node_address = 2;
}
message MsgQueryProc {
    bytes proc_id = 1;
}
message MsgQueryProcReply {
    ErrorMsg errmsg = 1;
    repeated ProcInfo proc = 2;
}
src/bh_api.cpp
@@ -104,6 +104,10 @@
{
    return BHApiIn1Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms);
}
int BHUnregister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
    return BHApiIn1Out1<ProcInfo>(&TopicNode::Unregister, proc_info, proc_info_len, reply, reply_len, timeout_ms);
}
int BHHeartbeatEasy(const int timeout_ms)
{
@@ -120,6 +124,10 @@
    return BHApiIn1Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms);
}
int BHQueryTopicAddress(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
    return BHApiIn1Out1<MsgQueryTopic, MsgQueryTopicReply>(&TopicNode::QueryTopicAddress, topics, topics_len, reply, reply_len, timeout_ms);
}
int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
    return BHApiIn1Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
src/bh_api.h
@@ -24,12 +24,24 @@
               int *reply_len,
               const int timeout_ms);
int BHUnregister(const void *proc_info,
                 const int proc_info_len,
                 void **reply,
                 int *reply_len,
                 const int timeout_ms);
int BHRegisterTopics(const void *topics,
                     const int topics_len,
                     void **reply,
                     int *reply_len,
                     const int timeout_ms);
int BHQueryTopicAddress(const void *topics,
                        const int topics_len,
                        void **reply,
                        int *reply_len,
                        const int timeout_ms);
int BHSubscribeTopics(const void *topics,
                      const int topics_len,
                      void **reply,
src/proto.h
@@ -38,6 +38,7 @@
BHOME_SIMPLE_MAP_MSG(CommonReply);
BHOME_SIMPLE_MAP_MSG(Register);
BHOME_SIMPLE_MAP_MSG(Unregister);
BHOME_SIMPLE_MAP_MSG(RegisterRPC);
BHOME_SIMPLE_MAP_MSG(Heartbeat);
BHOME_SIMPLE_MAP_MSG(QueryTopic);
src/topic_node.cpp
@@ -120,6 +120,39 @@
        return IsOnline();
    }
}
bool TopicNode::Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
    info_.Clear();
    state_cas(eStateOnline, eStateOffline);
    auto &sock = SockNode();
    MsgUnregister body;
    body.mutable_proc()->Swap(&proc);
    auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
    AddRoute(head, sock.id());
    auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) {
        bool r = head.type() == kMsgTypeCommonReply &&
                 msg.ParseBody(rbody) &&
                 IsSuccess(rbody.errmsg().errcode());
        return r;
    };
    if (timeout_ms == 0) {
        auto onResult = [this, CheckResult](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
            MsgCommonReply body;
            CheckResult(imsg, head, body);
        };
        return sock.Send(&BHTopicCenterAddress(), head, body, onResult);
    } else {
        MsgI reply;
        DEFER1(reply.Release(););
        BHMsgHead reply_head;
        bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
        return r && CheckResult(reply, reply_head, reply_body);
    }
}
bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
@@ -152,6 +185,25 @@
    proc.set_proc_id(proc_id());
    MsgCommonReply reply_body;
    return Heartbeat(proc, reply_body, timeout_ms);
}
bool TopicNode::QueryTopicAddress(MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms)
{
    if (!IsOnline()) {
        SetLastError(eNotRegistered, "Not Registered.");
        return false;
    }
    auto &sock = SockNode();
    BHMsgHead head(InitMsgHead(GetType(query), proc_id()));
    AddRoute(head, sock.id());
    MsgI reply;
    DEFER1(reply.Release());
    BHMsgHead reply_head;
    return (sock.SendAndRecv(&BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) &&
            reply_head.type() == kMsgTypeQueryTopicReply &&
            reply.ParseBody(reply_body));
}
bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
@@ -317,32 +369,7 @@
    try {
        BHAddress addr;
#if 1
        return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(addr, req, cb);
#else
        if (topic_query_cache_.Pick(req.topic(), addr)) {
            return SendTo(addr, req, cb);
        }
        auto &sock = SockClient();
        MsgQueryTopic query;
        query.set_topic(req.topic());
        BHMsgHead head(InitMsgHead(GetType(query), proc_id()));
        AddRoute(head, sock.id());
        auto onQueryResult = [this, SendTo, req, cb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
            MsgQueryTopicReply rep;
            if (head.type() == kMsgTypeQueryTopicReply && imsg.ParseBody(rep)) {
                auto &addr = rep.address();
                if (!addr.mq_id().empty()) {
                    topic_query_cache_.Store(req.topic(), addr);
                    SendTo(addr, req, cb);
                }
            }
        };
        return sock.Send(&BHTopicCenterAddress(), head, query, std::move(onQueryResult));
#endif
    } catch (...) {
        SetLastError(eError, "internal error.");
        return false;
@@ -384,6 +411,22 @@
    return false;
}
int TopicNode::QueryRPCTopics(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms)
{
    int n = 0;
    MsgQueryTopic query;
    query.set_topic(topic);
    MsgQueryTopicReply rep;
    if (QueryTopicAddress(query, rep, timeout_ms)) {
        auto &ls = rep.node_address();
        n = ls.size();
        for (auto &na : ls) {
            addr.push_back(na);
        }
    }
    return n;
}
bool TopicNode::ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms)
{
    if (!IsOnline()) {
@@ -391,35 +434,16 @@
        return false;
    }
    auto &sock = SockClient();
    if (topic_query_cache_.Find(topic, addr)) {
        return true;
    }
    MsgQueryTopic query;
    query.set_topic(topic);
    BHMsgHead head(InitMsgHead(GetType(query), proc_id()));
    AddRoute(head, sock.id());
    MsgI reply;
    DEFER1(reply.Release());
    BHMsgHead reply_head;
    if (sock.SendAndRecv(&BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms)) {
        if (reply_head.type() == kMsgTypeQueryTopicReply) {
            MsgQueryTopicReply rep;
            if (reply.ParseBody(rep)) {
                addr = rep.address();
                if (addr.mq_id().empty()) {
                    return false;
                } else {
                    topic_query_cache_.Store(topic, addr);
                    return true;
                }
            }
    std::vector<NodeAddress> lst;
    if (QueryRPCTopics(topic, lst, timeout_ms)) {
        addr = lst.front().addr();
        if (!addr.mq_id().empty()) {
            topic_query_cache_.Store(topic, addr);
            return true;
        }
    } else {
    }
    return false;
}
src/topic_node.h
@@ -41,8 +41,10 @@
    // topic node
    bool Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
    bool Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
    bool Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
    bool Heartbeat(const int timeout_ms);
    bool QueryTopicAddress(MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms);
    // topic rpc server
    typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerSyncCB;
@@ -73,6 +75,8 @@
private:
    bool ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms);
    typedef MsgQueryTopicReply::BHNodeAddress NodeAddress;
    int QueryRPCTopics(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms);
    const std::string &proc_id() { return info_.proc_id(); }
    typedef BHAddress Address;
utest/api_test.cpp
@@ -219,7 +219,7 @@
        TLMutex mutex;
        // CasMutex mutex;
        auto Lock = [&]() {
            for (int i = 0; i < 1000 * 1000 * 10; ++i) {
            for (int i = 0; i < 10; ++i) {
                mutex.lock();
                mutex.unlock();
            }