lichao
2021-05-18 3788226ee9332945e90066b58f2b85026c2a0460
change node init, no shm lock any more.
11个文件已修改
347 ■■■■■ 已修改文件
box/center.cpp 64 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/source/bhome_msg.proto 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.cpp 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.h 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.h 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/robust.h 134 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.cpp 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 69 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/robust_test.cpp 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.cpp
@@ -212,30 +212,27 @@
    // center name, no relative to shm.
    const std::string &id() const { return id_; }
    void OnNodeInit(ShmSocket &socket, const int64_t val)
    int64_t OnNodeInit(ShmSocket &socket, const int64_t val)
    {
        LOG_FUNCTION;
        SharedMemory &shm = socket.shm();
        MQId ssn = (val >> 4) & MaskBits(60);
        MQId ssn = (val >> 4) & MaskBits(56);
        int reply = EncodeCmd(eCmdNodeInitReply);
        if (nodes_.find(ssn) != nodes_.end()) {
            return; // ignore in exists.
            return reply; // ignore if exists.
        }
        auto UpdateRegInfo = [&](Node &node) {
            node->state_.timestamp_ = NowSec() - offline_time_;
            node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
            // create sockets.
            const int nsocks = 4;
            try {
                for (int i = 0; i < nsocks; ++i) {
                    ShmSocket tmp(shm, true, ssn + i, 16);
                    node->addrs_.emplace(ssn + i, tmp.AbsAddr());
                }
                ShmSocket tmp(shm, true, ssn, 16);
                node->addrs_.emplace(ssn, tmp.AbsAddr());
                return true;
            } catch (...) {
                for (int i = 0; i < nsocks; ++i) {
                    ShmSocket::Remove(shm, ssn + i);
                }
                return false;
            }
        };
@@ -243,25 +240,29 @@
        auto PrepareProcInit = [&](Node &node) {
            bool r = false;
            ShmMsg init_msg;
            if (init_msg.Make(GetAllocSize(CalcAllocIndex(900)))) {
                // 31bit pointer, 4bit cmd+flag
                int64_t reply = (init_msg.Offset() << 4) | EncodeCmd(eCmdNodeInitReply);
                r = SendAllocReply(socket, {ssn, node->addrs_[ssn]}, reply, init_msg);
            }
            return r;
            DEFER1(init_msg.Release());
            MsgProcInit body;
            auto head = InitMsgHead(GetType(body), id(), ssn);
            return init_msg.Make(GetAllocSize(CalcAllocIndex(900))) &&
                   init_msg.Fill(ShmMsg::Serialize(head, body)) &&
                   SendAllocMsg(socket, {ssn, node->addrs_[ssn]}, init_msg);
        };
        Node node(new NodeInfo);
        if (UpdateRegInfo(node) && PrepareProcInit(node)) {
            reply |= (node->addrs_[ssn] << 4);
            nodes_[ssn] = node;
            LOG_INFO() << "new node ssn (" << ssn << ") init";
        } else {
            for (int i = 0; i < 10; ++i) {
                ShmSocket::Remove(shm, ssn + i);
            ShmSocket::Remove(shm, ssn);
            }
        return reply;
        }
    void RecordMsg(const MsgI &msg)
    {
        msg.reset_managed(true);
        msgs_.RecordMsg(msg);
    }
    void RecordMsg(const MsgI &msg) { msgs_.RecordMsg(msg); }
    bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg)
    {
@@ -325,7 +326,6 @@
        assert(IsCmd(val));
        int cmd = DecodeCmd(val);
        switch (cmd) {
        case eCmdNodeInit: OnNodeInit(socket, val); break;
        case eCmdAllocRequest0: OnAlloc(socket, val); break;
        case eCmdFree: OnFree(socket, val); break;
        default: return false;
@@ -336,10 +336,28 @@
    MsgProcInitReply ProcInit(const BHMsgHead &head, MsgProcInit &msg)
    {
        LOG_DEBUG() << "center got proc init.";
        auto pos = nodes_.find(head.ssn_id());
        if (pos == nodes_.end()) {
            return MakeReply<MsgProcInitReply>(eNotFound, "Node Not Initialised");
        }
        auto index = procs_.Put(head.proc_id(), head.ssn_id());
        auto reply(MakeReply<MsgProcInitReply>(eSuccess));
        reply.set_proc_index(index);
        auto &node = pos->second;
        try {
            for (int i = 0; i < msg.extra_mq_num(); ++i) {
                ShmSocket tmp(BHomeShm(), true, head.ssn_id() + i + 1, 16);
                node->addrs_.emplace(tmp.id(), tmp.AbsAddr());
                auto addr = reply.add_extra_mqs();
                addr->set_mq_id(tmp.id());
                addr->set_abs_addr(tmp.AbsAddr());
            }
        return reply;
        } catch (...) {
            LOG_ERROR() << "proc init create mq error";
            return MakeReply<MsgProcInitReply>(eError, "Create mq failed.");
        }
    }
    MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg)
@@ -711,6 +729,10 @@
    // now we can talk.
    auto OnCenterIdle = [center_ptr](ShmSocket &socket) {
        auto &center = *center_ptr;
        auto onInit = [&](const int64_t request) {
            return center->OnNodeInit(socket, request);
        };
        BHCenterHandleInit(onInit);
        center->OnTimer();
    };
proto/source/bhome_msg.proto
@@ -62,11 +62,14 @@
    MsgTopicList topics = 1;
}
message MsgProcInit{ } // proc_id is in header.
message MsgProcInit{
    int32 extra_mq_num = 1;
} // proc_id is in header.
message MsgProcInitReply {
    ErrorMsg errmsg = 1;
    int32 proc_index = 2;
    repeated BHAddress extra_mqs = 3;
}
service TopicRPC {
src/defs.cpp
@@ -132,7 +132,6 @@
            InitMQ(info.mq_sender_, NextId());
            InitMQ(info.mq_center_, NextId());
            InitMQ(info.mq_bus_, NextId());
            InitMQ(info.mq_init_, NextId());
            pmeta->tag_ = kCenterInfoTag;
            return true;
@@ -144,7 +143,14 @@
const MQInfo &BHGlobalSenderAddress() { return GetCenterInfo(BHomeShm())->mq_sender_; }
const MQInfo &BHTopicCenterAddress() { return GetCenterInfo(BHomeShm())->mq_center_; }
const MQInfo &BHTopicBusAddress() { return GetCenterInfo(BHomeShm())->mq_bus_; }
const MQInfo &BHCenterReplyAddress() { return GetCenterInfo(BHomeShm())->mq_init_; }
bool BHNodeInit(const int64_t request, int64_t &reply)
{
    return GetCenterInfo(BHomeShm())->init_rr_.ClientRequest(request, reply);
}
void BHCenterHandleInit(std::function<int64_t(const int64_t)> const &onReq)
{
    GetCenterInfo(BHomeShm())->init_rr_.ServerProcess(onReq);
}
int64_t CalcAllocIndex(int64_t size)
{
src/defs.h
@@ -19,6 +19,7 @@
#ifndef DEFS_KP8LKGD0
#define DEFS_KP8LKGD0
#include "robust.h"
#include <atomic>
#include <string>
@@ -35,8 +36,8 @@
struct CenterInfo {
    MQInfo mq_center_;
    MQInfo mq_bus_;
    MQInfo mq_init_;
    MQInfo mq_sender_;
    robust::AtomicReqRep init_rr_;
    std::atomic<MQId> mqid_;
    CenterInfo() :
        mqid_(100000) {}
@@ -62,6 +63,7 @@
const MQInfo &BHGlobalSenderAddress();
const MQInfo &BHTopicCenterAddress();
const MQInfo &BHTopicBusAddress();
const MQInfo &BHCenterReplyAddress();
bool BHNodeInit(const int64_t request, int64_t &reply);
void BHCenterHandleInit(std::function<int64_t(const int64_t)> const &onReq);
#endif // end of include guard: DEFS_KP8LKGD0
src/msg.cpp
@@ -37,8 +37,12 @@
    }
    auto n = meta()->count_.Dec();
    if (n == 0) {
        if (meta()->managed_) {
        int64_t free_cmd = (id() << 4) | EncodeCmd(eCmdFree);
        Sender().Send(BHTopicCenterAddress(), free_cmd);
        } else {
            Free();
        }
    } else if (n < 0) {
        LOG_FATAL() << "error double release data.";
        throw std::runtime_error("double release msg.");
src/msg.h
@@ -36,7 +36,10 @@
class ShmMsg : private StaticDataRef<SharedMemory, ShmMsg>
{
public:
    static inline SharedMemory &shm() { return GetData(); }
private:
    static ShmSocket &Sender();
    // store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object.
@@ -74,6 +77,7 @@
        const uint32_t size_ = 0;
        const int64_t id_ = 0;
        std::atomic<int64_t> timestamp_;
        bool managed_ = false;
        Meta(uint32_t size) :
            size_(size), id_(NewId()), timestamp_(NowSec()) {}
    };
@@ -151,6 +155,10 @@
    int AddRef() const { return valid() ? meta()->count_.Inc() : 1; }
    int Release();
    void Free();
    void reset_managed(const bool val) const
    {
        if (valid()) { meta()->managed_ = val; }
    }
    template <class Body>
    inline bool Make(const BHMsgHead &head, const Body &body)
@@ -224,7 +232,7 @@
constexpr inline bool IsCmd(int64_t msg) { return (msg & 1) != 0; }
// int64_t pack format: cmd data ,3bit cmd, 1bit flag.
enum MsgCmd {
    eCmdNodeInit = 0,      // upto 59bit ssn id
    eCmdNodeInit = 0,      // upto 56bit ssn id
    eCmdNodeInitReply = 1, // 31bit proc index,
    eCmdAllocRequest0 = 2, // 8bit size, 4bit socket index, 16bit proc index, 28bit id
    eCmdAllocReply0 = 3,   // 31bit ptr, 28bit id,
src/robust.h
@@ -177,74 +177,6 @@
    Lock &l_;
};
template <class D, class Alloc = std::allocator<D>>
class CircularBuffer
{
    typedef uint32_t size_type;
    typedef uint32_t count_type;
    typedef uint64_t meta_type;
    static size_type Pos(meta_type meta) { return meta & 0xFFFFFFFF; }
    static count_type Count(meta_type meta) { return meta >> 32; }
    static meta_type Meta(meta_type count, size_type pos) { return (count << 32) | pos; }
public:
    typedef D Data;
    CircularBuffer(const size_type cap) :
        CircularBuffer(cap, Alloc()) {}
    CircularBuffer(const size_type cap, Alloc const &al) :
        capacity_(cap + 1), mhead_(0), mtail_(0), al_(al), buf(al_.allocate(capacity_))
    {
        if (!buf) {
            throw("robust CircularBuffer allocate error: alloc buffer failed, out of mem!");
        } else {
            memset(&buf[0], 0, sizeof(D) * capacity_);
        }
    }
    ~CircularBuffer() { al_.deallocate(buf, capacity_); }
    bool push_back(const Data d)
    {
        auto old = mtail();
        auto pos = Pos(old);
        auto full = ((capacity_ + pos + 1 - head()) % capacity_ == 0);
        if (!full) {
            buf[pos] = d;
            return mtail_.compare_exchange_strong(old, next(old));
        }
        return false;
    }
    bool pop_front(Data &d)
    {
        auto old = mhead();
        auto pos = Pos(old);
        if (!(pos == tail())) {
            d = buf[pos];
            return mhead_.compare_exchange_strong(old, next(old));
        } else {
            return false;
        }
    }
private:
    CircularBuffer(const CircularBuffer &);
    CircularBuffer(CircularBuffer &&);
    CircularBuffer &operator=(const CircularBuffer &) = delete;
    CircularBuffer &operator=(CircularBuffer &&) = delete;
    meta_type next(meta_type meta) const { return Meta(Count(meta) + 1, (Pos(meta) + 1) % capacity_); }
    size_type head() const { return Pos(mhead()); }
    size_type tail() const { return Pos(mtail()); }
    meta_type mhead() const { return mhead_.load(); }
    meta_type mtail() const { return mtail_.load(); }
    // data
    const size_type capacity_;
    std::atomic<meta_type> mhead_;
    std::atomic<meta_type> mtail_;
    Alloc al_;
    typename Alloc::pointer buf = nullptr;
};
template <unsigned PowerSize = 4, class Int = int64_t>
class AtomicQueue
{
@@ -339,5 +271,71 @@
    AData buf;
};
class AtomicReqRep
{
public:
    typedef int64_t Data;
    typedef std::function<Data(const Data)> Handler;
    bool ClientRequest(const Data request, Data &reply)
    {
        auto end_time = now() + 3s;
        do {
            Data cur = data_.load();
            if (GetState(cur) == eStateFree &&
                DataCas(cur, Encode(request, eStateRequest))) {
                do {
                    yield();
                    cur = data_.load();
                    if (GetState(cur) == eStateReply) {
                        DataCas(cur, Encode(0, eStateFree));
                        reply = Decode(cur);
                        return true;
                    }
                } while (now() < end_time);
            }
            yield();
        } while (now() < end_time);
        return false;
    }
    bool ServerProcess(Handler onReq)
    {
        Data cur = data_.load();
        switch (GetState(cur)) {
        case eStateRequest:
            if (DataCas(cur, Encode(onReq(Decode(cur)), eStateReply))) {
                timestamp_ = now();
                return true;
            }
            break;
        case eStateReply:
            if (timestamp_.load() + 3s < now()) {
                DataCas(cur, Encode(0, eStateFree));
            }
            break;
        case eStateFree:
        default: break;
        }
        return false;
    }
private:
    enum State {
        eStateFree,
        eStateRequest,
        eStateReply
    };
    static int GetState(Data d) { return d & MaskBits(3); }
    static Data Encode(Data d, State st) { return (d << 3) | st; }
    static Data Decode(Data d) { return d >> 3; }
    static void yield() { QuickSleep(); }
    typedef steady_clock::duration Duration;
    Duration now() { return steady_clock::now().time_since_epoch(); }
    bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); }
    std::atomic<Data> data_;
    std::atomic<Duration> timestamp_;
};
} // namespace robust
#endif // end of include guard: ROBUST_Q31RCWYU
src/shm_queue.h
@@ -30,7 +30,6 @@
template <class D>
using Circular = boost::circular_buffer<D, Allocator<D>>;
// using Circular = robust::CircularBuffer<D, Allocator<D>>;
template <class D>
class SharedQueue
src/socket.cpp
@@ -162,7 +162,7 @@
{
    size_t size = content.size();
    auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable {
        if (!msg.Fill(content)) { return; }
        if (!msg.Fill(content)) { return false; }
        try {
            if (!cb) {
@@ -175,12 +175,23 @@
                };
                Send(remote, msg, onExpireRemoveCB);
            }
            return true;
        } catch (...) {
            SetLastError(eError, "Send internal error.");
            return false;
        }
    };
#if 0
    // self alloc
    MsgI msg;
    if (msg.Make(size)) {
        DEFER1(msg.Release());
        return OnResult(msg);
    }
#else
    // center alloc
    return RequestAlloc(size, OnResult);
#endif
}
bool ShmSocket::RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult)
src/topic_node.cpp
@@ -70,69 +70,50 @@
    }
    LOG_DEBUG() << "Node Init, id " << ssn_id_;
    auto NodeInit = [&]() {
        auto SendInitCmd = [&]() {
            int64_t init_cmd = ssn_id_ << 4 | EncodeCmd(eCmdNodeInit);
            auto end_time = steady_clock::now() + 3s;
            bool r = false;
            do {
                r = ShmMsgQueue::TrySend(shm(), BHTopicCenterAddress(), init_cmd);
            } while (!r && steady_clock::now() < end_time);
            return r;
        };
        if (SendInitCmd()) {
            LOG_DEBUG() << "node send init ok";
            auto end_time = steady_clock::now() + 3s;
            do {
                try {
                    //TODO recv offset, avoid query.
                    for (int i = eSockStart; i < eSockEnd; ++i) {
                        sockets_.emplace_back(new ShmSocket(shm_, false, ssn_id_ + i, kMqLen));
                    }
                    break;
                } catch (...) {
                    sockets_.clear();
                    std::this_thread::sleep_for(100ms);
                }
            } while (steady_clock::now() < end_time);
        int64_t init_request = ssn_id_ << 4 | EncodeCmd(eCmdNodeInit);
        int64_t reply = 0;
        if (BHNodeInit(init_request, reply) && DecodeCmd(reply) == eCmdNodeInitReply) {
            int64_t abs_addr = reply >> 4;
            sockets_.emplace_back(new ShmSocket(abs_addr, shm_, ssn_id_));
            LOG_DEBUG() << "node init ok";
        } else {
            LOG_ERROR() << "Node Init Error";
        }
    };
    if (sockets_.empty()) {
        NodeInit();
    }
    if (!sockets_.empty()) {
        auto onNodeCmd = [this](ShmSocket &socket, int64_t &val) {
            LOG_DEBUG() << "node recv cmd: " << DecodeCmd(val);
            switch (DecodeCmd(val)) {
            case eCmdNodeInitReply: {
                MsgI msg(val >> 4);
                DEFER1(msg.Release());
        auto onMsg = [this](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
            LOG_DEBUG() << "node recv type: " << head.type();
            switch (head.type()) {
            case kMsgTypeProcInit: {
                // reuse msg to send proc init.
                MsgProcInit body;
                body.set_extra_mq_num(eSockEnd - eSockStart - 1);
                auto head = InitMsgHead(GetType(body), info_.proc_id(), ssn_id_);
                AddRoute(head, socket);
                if (msg.Fill(head, body)) {
                    socket.Send(BHTopicCenterAddress(), msg);
                if (imsg.Fill(head, body)) {
                    socket.Send(BHTopicCenterAddress(), imsg);
                }
            } break;
            default:
                break;
            }
            return true;
        };
        // recv msgs to avoid memory leak.
        auto onMsg = [this](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
            LOG_DEBUG() << "node recv type: " << head.type();
            if (head.type() == kMsgTypeProcInitReply) {
            case kMsgTypeProcInitReply: {
                LOG_DEBUG() << "got proc init reply";
                MsgProcInitReply reply;
                if (imsg.ParseBody(reply)) {
                if (imsg.ParseBody(reply) && IsSuccess(reply.errmsg().errcode())) {
                    for (auto &addr : reply.extra_mqs()) {
                        LOG_DEBUG() << "add socket " << addr.abs_addr() << ", id:" << addr.mq_id();
                        sockets_.emplace_back(new ShmSocket(addr.abs_addr(), shm(), addr.mq_id()));
                    }
                    SetProcIndex(reply.proc_index());
                    this->state_ = eStateUnregistered;
                }
            } break;
            default: break;
            }
            return true;
        };
        SockNode().Start(1, onMsg, onNodeCmd);
        SockNode().Start(1, onMsg);
        return true;
    }
    return false;
utest/robust_test.cpp
@@ -16,6 +16,35 @@
/////////////////////////////////////////////////////////////////////////////////////////
BOOST_AUTO_TEST_CASE(InitTest)
{
    AtomicReqRep rr;
    auto client = [&]() {
        for (int i = 0; i < 20; ++i) {
            int64_t reply = 0;
            bool r = rr.ClientRequest(i, reply);
            printf("init request %d, %s, reply %d\n", i, (r ? "ok" : "failed"), reply);
        }
    };
    bool run = true;
    auto server = [&]() {
        auto onReq = [](int64_t req) { return req + 100; };
        while (run) {
            rr.ServerProcess(onReq);
        }
    };
    ThreadManager clients, servers;
    servers.Launch(server);
    for (int i = 0; i < 2; ++i) {
        clients.Launch(client);
    }
    clients.WaitAll();
    run = false;
    servers.WaitAll();
}
BOOST_AUTO_TEST_CASE(QueueTest)
{
    const int nthread = 100;