lichao
2021-05-08 36e6a35a886252516f168b90f7a9a7c1c5177312
center alloc node queue; node just find them.
8个文件已修改
124 ■■■■■ 已修改文件
box/center.cpp 31 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_msg_queue.cpp 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_msg_queue.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.cpp 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.h 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 56 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.cpp
@@ -102,22 +102,43 @@
    // center name, no relative to shm.
    const std::string &id() const { return id_; }
    void OnNodeInit(const int64_t msg)
    void OnNodeInit(SharedMemory &shm, const int64_t msg)
    {
        MQId ssn = msg;
        if (nodes_.find(ssn) != nodes_.end()) {
            return; // ignore in exists.
        }
        auto UpdateRegInfo = [&](Node &node) {
            for (int i = 0; i < 10; ++i) {
                node->addrs_.insert(ssn + i);
            }
            node->state_.timestamp_ = NowSec() - offline_time_;
            node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
            // create sockets.
            try {
                auto CreateSocket = [](SharedMemory &shm, const MQId id) {
                    ShmSocket tmp(shm, true, id, 16);
                };
                // alloc(-1), node, server, sub, request,
                for (int i = -1; i < 4; ++i) {
                    CreateSocket(shm, ssn + i);
                    node->addrs_.insert(ssn + i);
                }
                return true;
            } catch (...) {
                return false;
            }
        };
        Node node(new NodeInfo);
        UpdateRegInfo(node);
        nodes_[ssn] = node;
        LOG_INFO() << "new node ssn (" << ssn << ") init";
        if (UpdateRegInfo(node)) {
            nodes_[ssn] = node;
            LOG_INFO() << "new node ssn (" << ssn << ") init";
        }
    }
    MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg)
    {
        if (msg.proc().proc_id() != head.proc_id()) {
@@ -475,7 +496,7 @@
{
    auto OnNodeInit = [center_ptr](ShmSocket &socket, MsgI &msg) {
        auto &center = *center_ptr;
        center->OnNodeInit(msg.Offset());
        center->OnNodeInit(socket.shm(), msg.Offset());
    };
    auto Nothing = [](ShmSocket &socket) {};
src/defs.cpp
@@ -50,7 +50,7 @@
    MsgI::BindShm(shm);
    typedef std::atomic<MQId> IdSrc;
    IdSrc *psrc = shm.FindOrCreate<IdSrc>("shmqIdSrc0", 100000);
    return ShmMsgQueue::SetData(*psrc);
    return psrc && ShmMsgQueue::SetData(*psrc);
}
void SetLastError(const int ec, const std::string &msg)
src/shm_msg_queue.cpp
@@ -36,21 +36,23 @@
    static auto &id = GetData();
    return (++id) * 10;
}
// ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2
ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) :
    id_(id),
    queue_(segment, MsgQIdToName(id_)) //, AdjustMQLength(len), segment.get_segment_manager())
{
}
ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) :
    id_(NewId()),
    queue_(segment, true, MsgQIdToName(id_)) //, AdjustMQLength(len), segment.get_segment_manager())
ShmMsgQueue::ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len) :
    id_(id),
    queue_(segment, create_or_else_find, MsgQIdToName(id_))
{
    if (!queue_.IsOk()) {
        throw("error create msgq " + std::to_string(id_));
        throw("error create/find msgq " + std::to_string(id_));
    }
}
ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) :
    ShmMsgQueue(NewId(), true, segment, len) {}
ShmMsgQueue::~ShmMsgQueue() {}
src/shm_msg_queue.h
@@ -38,6 +38,7 @@
    static MQId NewId();
    ShmMsgQueue(const MQId id, ShmType &segment, const int len);
    ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len);
    ShmMsgQueue(ShmType &segment, const int len);
    ~ShmMsgQueue();
    static bool Remove(SharedMemory &shm, const MQId id);
src/socket.cpp
@@ -29,6 +29,11 @@
{
    Start();
}
ShmSocket::ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len) :
    run_(false), mq_(id, create_or_else_find, shm, len)
{
    Start();
}
ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) :
    run_(false), mq_(shm, len)
{
src/socket.h
@@ -48,6 +48,7 @@
    typedef std::function<void(ShmSocket &sock)> IdleCB;
    ShmSocket(Shm &shm, const MQId id, const int len);
    ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len);
    ShmSocket(Shm &shm, const int len = 12);
    ~ShmSocket();
    static bool Remove(SharedMemory &shm, const MQId id) { return Queue::Remove(shm, id); }
@@ -142,6 +143,7 @@
    template <class... Rest>
    bool SendImpl(const MQId remote, Rest &&...rest)
    {
        // TODO send alloc request, and pack later, higher bit means alloc?
        send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...);
        return true;
    }
@@ -151,14 +153,15 @@
    std::atomic<bool> run_;
    Queue mq_;
    class AsyncCBs
    template <class Key>
    class CallbackRecords
    {
        std::unordered_map<std::string, RecvCB> store_;
        std::unordered_map<Key, RecvCB> store_;
    public:
        bool empty() const { return store_.empty(); }
        bool Store(const std::string &id, RecvCB &&cb) { return store_.emplace(id, std::move(cb)).second; }
        bool Pick(const std::string &id, RecvCB &cb)
        bool Store(const Key &id, RecvCB &&cb) { return store_.emplace(id, std::move(cb)).second; }
        bool Pick(const Key &id, RecvCB &cb)
        {
            auto pos = store_.find(id);
            if (pos != store_.end()) {
@@ -171,9 +174,9 @@
        }
    };
    Synced<AsyncCBs> per_msg_cbs_;
    Synced<CallbackRecords<std::string>> per_msg_cbs_;
    SendQ send_buffer_;
    // Synced<SendQ> send_buffer_;
};
#endif // end of include guard: SOCKET_GWTJHBPO
src/topic_node.cpp
@@ -23,6 +23,9 @@
using namespace std::chrono;
using namespace std::chrono_literals;
const char *const kErrMsgNotInit = "BHome node NOT initialized.";
const char *const kErrMsgNotRegistered = "BHome node NOT registered.";
namespace
{
inline void AddRoute(BHMsgHead &head, const MQId id) { head.add_route()->set_mq_id(id); }
@@ -63,14 +66,25 @@
    MsgI msg;
    msg.OffsetRef() = ssn_id_;
    if (ShmMsgQueue::TrySend(shm(), BHInitAddress(), msg)) {
        sockets_.resize(eSockEnd);
        for (int i = eSockStart; i < eSockEnd; ++i) {
            sockets_[i].reset(new ShmSocket(shm_, ssn_id_ + i, kMqLen));
        auto end_time = steady_clock::now() + 3s;
        do {
            try {
                for (int i = eSockStart; i < eSockEnd; ++i) {
                    sockets_.emplace_back(new ShmSocket(shm_, false, ssn_id_ + i, kMqLen));
                }
                break;
            } catch (...) {
                sockets_.clear();
                std::this_thread::sleep_for(100ms);
            }
        } while (steady_clock::now() < end_time);
        if (!sockets_.empty()) {
            // recv msgs to avoid memory leak.
            auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; };
            SockNode().Start(default_ignore_msg);
            return true;
        }
        // recv msgs to avoid memory leak.
        auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; };
        SockNode().Start(default_ignore_msg);
        return true;
    }
    return false;
}
@@ -78,7 +92,7 @@
void TopicNode::Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker)
{
    if (!Init()) {
        SetLastError(eError, "BHome Node Not Inited.");
        SetLastError(eError, kErrMsgNotInit);
        return;
    }
    if (nworker < 1) {
@@ -101,7 +115,7 @@
bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
    if (!Init()) {
        SetLastError(eError, "BHome Node Not Inited.");
        SetLastError(eError, kErrMsgNotInit);
        return false;
    }
@@ -151,7 +165,7 @@
bool TopicNode::Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
    if (!IsOnline()) {
        SetLastError(eNotRegistered, "Not Registered.");
        SetLastError(eNotRegistered, kErrMsgNotRegistered);
        return false;
    }
@@ -190,7 +204,7 @@
bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
    if (!IsOnline()) {
        SetLastError(eNotRegistered, "Not Registered.");
        SetLastError(eNotRegistered, kErrMsgNotRegistered);
        return false;
    }
@@ -223,7 +237,7 @@
bool TopicNode::QueryTopicAddress(BHAddress &dest, MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms)
{
    if (!IsOnline()) {
        SetLastError(eNotRegistered, "Not Registered.");
        SetLastError(eNotRegistered, kErrMsgNotRegistered);
        return false;
    }
    auto &sock = SockNode();
@@ -242,7 +256,7 @@
bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
{
    if (!IsOnline()) {
        SetLastError(eNotRegistered, "Not Registered.");
        SetLastError(eNotRegistered, kErrMsgNotRegistered);
        return false;
    }
@@ -309,7 +323,7 @@
bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms)
{
    if (!IsOnline()) {
        SetLastError(eNotRegistered, "Not Registered.");
        SetLastError(eNotRegistered, kErrMsgNotRegistered);
        return false;
    }
@@ -333,7 +347,7 @@
bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body)
{
    if (!IsOnline()) {
        SetLastError(eNotRegistered, "Not Registered.");
        SetLastError(eNotRegistered, kErrMsgNotRegistered);
        return false;
    }
@@ -371,7 +385,7 @@
bool TopicNode::ClientAsyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &req, std::string &out_msg_id, const RequestResultCB &cb)
{
    if (!IsOnline()) {
        SetLastError(eNotRegistered, "Not Registered.");
        SetLastError(eNotRegistered, kErrMsgNotRegistered);
        return false;
    }
@@ -412,7 +426,7 @@
bool TopicNode::ClientSyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &request, std::string &out_proc_id, MsgRequestTopicReply &out_reply, const int timeout_ms)
{
    if (!IsOnline()) {
        SetLastError(eNotRegistered, "Not Registered.");
        SetLastError(eNotRegistered, kErrMsgNotRegistered);
        return false;
    }
@@ -463,7 +477,7 @@
bool TopicNode::ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms)
{
    if (!IsOnline()) {
        SetLastError(eNotRegistered, "Not Registered.");
        SetLastError(eNotRegistered, kErrMsgNotRegistered);
        return false;
    }
@@ -487,7 +501,7 @@
bool TopicNode::Publish(const MsgPublish &pub, const int timeout_ms)
{
    if (!IsOnline()) {
        SetLastError(eNotRegistered, "Not Registered.");
        SetLastError(eNotRegistered, kErrMsgNotRegistered);
        return false;
    }
@@ -518,7 +532,7 @@
bool TopicNode::Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
{
    if (!IsOnline()) {
        SetLastError(eNotRegistered, "Not Registered.");
        SetLastError(eNotRegistered, kErrMsgNotRegistered);
        return false;
    }
@@ -567,7 +581,7 @@
bool TopicNode::RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms)
{
    if (!IsOnline()) {
        SetLastError(eNotRegistered, "Not Registered.");
        SetLastError(eNotRegistered, kErrMsgNotRegistered);
        return false;
    }
src/topic_node.h
@@ -122,7 +122,7 @@
           eSockSub,
           eSockEnd,
    };
    std::vector<std::unique_ptr<ShmSocket>> sockets_;
    std::vector<std::shared_ptr<ShmSocket>> sockets_;
    ShmSocket &SockNode() { return *sockets_[eSockNode]; }
    ShmSocket &SockPub() { return *sockets_[eSockPub]; }