lichao
2021-05-14 9bf199a4770b08c03d553129757d960b605e598a
add center info at fixed address in shm.
12个文件已修改
201 ■■■■ 已修改文件
box/center.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center_main.cc 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.cpp 79 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.h 30 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.cpp 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm.h 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_msg_queue.cpp 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_msg_queue.h 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.cpp 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.h 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/simple_tests.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/speed_test.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.cpp
@@ -98,7 +98,7 @@
        if (now < time_to_clean_) {
            return;
        }
        LOG_FUNCTION;
        // LOG_FUNCTION;
        time_to_clean_ = now + 1;
        int64_t limit = std::max(10000ul, msgs_.size() / 10);
        int64_t n = 0;
@@ -109,7 +109,7 @@
                msg.Free();
                it = msgs_.erase(it);
                ++n;
            } else if (msg.timestamp() + 10 < NowSec()) {
            } else if (msg.timestamp() + 60 < NowSec()) {
                msg.Free();
                it = msgs_.erase(it);
                ++n;
box/center_main.cc
@@ -83,12 +83,6 @@
    std::atomic<bool> run_;
};
bool CenterInit(bhome_shm::SharedMemory &shm)
{
    ShmSocket create(shm, BHGlobalSenderAddress(), 16);
    return true;
}
} // namespace
int center_main(int argc, const char *argv[])
{
@@ -108,7 +102,10 @@
    if (strcasecmp(lvl.c_str(), "fatal") == 0) { ns_log::ResetLogLevel(ns_log::LogLevel::fatal); }
    auto &shm = BHomeShm();
    CenterInit(shm);
    if (!CenterInit(shm)) {
        LOG_FATAL() << "init memory error.";
        exit(0);
    }
    GlobalInit(shm);
    InstanceFlag inst(shm, kCenterRunningFlag);
src/defs.cpp
@@ -18,6 +18,10 @@
#include "defs.h"
#include "msg.h"
#include "shm_msg_queue.h"
#include "socket.h"
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/string_generator.hpp>
#include <boost/uuid/uuid.hpp>
namespace
{
@@ -70,7 +74,77 @@
const int kAllocIndexLen = sizeof(AllocSizeIndex) / sizeof(AllocSizeIndex[0]);
static_assert(kAllocIndexLen == 256, "Make sure alloc 8 bit is enough.");
static_assert(AllocSizeIndex[255] > uint32_t(-1), "Make sure alloc size correct.");
const int64_t kCenterInfoFixedAddress = 1024 * 4;
const boost::uuids::uuid kCenterInfoTag = boost::uuids::string_generator()("fc5007bd-0e62-4d91-95dc-948cf1f02e5a");
struct CenterMetaInfo {
    boost::uuids::uuid tag_;
    CenterInfo info_;
};
int64_t Addr(void *ptr) { return reinterpret_cast<int64_t>(ptr); }
// void *Ptr(const int64_t offset) { return reinterpret_cast<void *>(offset); }
template <class T = void>
T *Ptr(const int64_t offset) { return reinterpret_cast<T *>(offset); }
} // namespace
CenterInfo *GetCenterInfo(bhome_shm::SharedMemory &shm)
{
    auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
    if (pmeta->tag_ == kCenterInfoTag) {
        return &pmeta->info_;
    }
    return nullptr;
}
// put center info at fixed memory position.
// as boost shm find object (find socket/mq by id, etc...) also locks inside,
// which node might crash inside and cause deadlock.
bool CenterInit(bhome_shm::SharedMemory &shm)
{
    Mutex *mutex = shm.Create<Mutex>("shm_center_lock");
    if (!mutex || !mutex->try_lock()) {
        return false;
    }
    DEFER1(mutex->unlock());
    auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
    if (pmeta->tag_ == kCenterInfoTag) {
        return true;
    } else {
        auto base = Addr(shm.get_address());
        auto offset = kCenterInfoFixedAddress;
        void *p = shm.Alloc(offset * 2);
        if (Addr(p) - base <= offset) {
            pmeta = new (Ptr(offset + base)) CenterMetaInfo;
            auto &info = pmeta->info_;
            auto InitMQ = [&](auto &mq, auto &&id) {
                mq.id_ = id;
                ShmSocket tmp(shm, id, 16);
                mq.offset_ = tmp.AbsAddr();
            };
            int id = 100;
            auto NextId = [&]() { return ++id; };
            InitMQ(info.mq_sender_, NextId());
            InitMQ(info.mq_center_, NextId());
            InitMQ(info.mq_bus_, NextId());
            InitMQ(info.mq_init_, NextId());
            pmeta->tag_ = kCenterInfoTag;
            return true;
        }
    }
    return false;
}
uint64_t BHGlobalSenderAddress() { return GetCenterInfo(BHomeShm())->mq_sender_.id_; }
uint64_t BHTopicCenterAddress() { return GetCenterInfo(BHomeShm())->mq_center_.id_; }
uint64_t BHTopicBusAddress() { return GetCenterInfo(BHomeShm())->mq_bus_.id_; }
uint64_t BHCenterReplyAddress() { return GetCenterInfo(BHomeShm())->mq_init_.id_; }
int64_t CalcAllocIndex(int64_t size)
{
@@ -93,9 +167,8 @@
bool GlobalInit(bhome_shm::SharedMemory &shm)
{
    MsgI::BindShm(shm);
    typedef std::atomic<MQId> IdSrc;
    IdSrc *psrc = shm.FindOrCreate<IdSrc>("shmqIdSrc0", 100000);
    return psrc && ShmMsgQueue::SetData(*psrc);
    CenterInfo *pinfo = GetCenterInfo(shm);
    return pinfo && ShmMsgQueue::SetData(pinfo->mqid_);
}
void SetLastError(const int ec, const std::string &msg)
src/defs.h
@@ -19,19 +19,28 @@
#ifndef DEFS_KP8LKGD0
#define DEFS_KP8LKGD0
#include <atomic>
#include <string>
typedef uint64_t MQId;
const MQId kBHDefaultSender = 99;
const MQId kBHTopicCenter = 100;
const MQId kBHTopicBus = 101;
inline const MQId BHGlobalSenderAddress() { return kBHDefaultSender; }
inline const MQId BHTopicCenterAddress() { return kBHTopicCenter; }
inline const MQId BHTopicBusAddress() { return kBHTopicBus; }
int64_t CalcAllocIndex(int64_t size);
int64_t GetAllocSize(int index);
struct CenterInfo {
    struct MQInfo {
        int64_t id_ = 0;
        int64_t offset_ = 0;
    };
    MQInfo mq_center_;
    MQInfo mq_bus_;
    MQInfo mq_init_;
    MQInfo mq_sender_;
    std::atomic<MQId> mqid_;
    CenterInfo() :
        mqid_(100000) {}
};
const int kBHCenterPort = 24287;
const char kTopicSep = '.';
@@ -42,10 +51,17 @@
std::string BHomeShmName();
bhome_shm::SharedMemory &BHomeShm();
CenterInfo *GetCenterInfo(bhome_shm::SharedMemory &shm);
bool CenterInit(bhome_shm::SharedMemory &shm);
bool GlobalInit(bhome_shm::SharedMemory &shm);
typedef std::string Topic;
void SetLastError(const int ec, const std::string &msg);
void GetLastError(int &ec, std::string &msg);
//TODO center can check shm for previous crash.
uint64_t BHGlobalSenderAddress();
uint64_t BHTopicCenterAddress();
uint64_t BHTopicBusAddress();
uint64_t BHCenterReplyAddress();
#endif // end of include guard: DEFS_KP8LKGD0
src/msg.cpp
@@ -17,6 +17,7 @@
 */
#include "msg.h"
#include "bh_util.h"
#include "defs.h"
#include "socket.h"
namespace bhome_msg
@@ -24,7 +25,8 @@
ShmSocket &ShmMsg::Sender()
{
    static ShmSocket sender(shm(), false, BHGlobalSenderAddress(), 16);
    static auto &mq = GetCenterInfo(shm())->mq_sender_;
    static ShmSocket sender(mq.offset_, shm(), mq.id_);
    return sender;
}
@@ -38,7 +40,8 @@
        int64_t free_cmd = (id() << 4) | EncodeCmd(eCmdFree);
        Sender().Send(BHTopicCenterAddress(), free_cmd);
    } else if (n < 0) {
        throw -123;
        LOG_FATAL() << "error double release data.";
        throw std::runtime_error("double release msg.");
    }
    return n;
}
src/shm.h
@@ -192,6 +192,12 @@
            pdata_ = shm_.Find<Data>(ObjName(name_));
        }
    }
    ShmObject(const int64_t offset, ShmType &segment, const std::string &name) :
        shm_(segment), name_(name)
    {
        pdata_ = reinterpret_cast<Data *>(Addr(shm_.get_address()) + offset);
    }
    bool IsOk() const { return pdata_; }
    static bool Remove(SharedMemory &shm, const std::string &name) { return shm.destroy<Data>(ObjName(name).c_str()); }
@@ -201,11 +207,13 @@
    std::string name() const { return name_; }
    Data *data() { return pdata_; }
    const Data *data() const { return pdata_; }
    int64_t offset() const { return Addr(pdata_) - Addr(shm_.get_address()); }
    Data *operator->() { return data(); }
    const Data *operator->() const { return data(); }
    bool Remove() { return Remove(shm_, name_); }
private:
    static int64_t Addr(const void *p) { return reinterpret_cast<int64_t>(p); }
    ShmType &shm_;
    std::string name_;
    Data *pdata_ = nullptr;
src/shm_msg_queue.cpp
@@ -37,13 +37,13 @@
    return (++id) * 10;
}
ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) :
ShmMsgQueue::ShmMsgQueue(ShmType &segment, const MQId id, const int len) :
    id_(id),
    queue_(segment, MsgQIdToName(id_), len, segment.get_segment_manager())
{
}
ShmMsgQueue::ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len) :
ShmMsgQueue::ShmMsgQueue(ShmType &segment, const bool create_or_else_find, const MQId id, const int len) :
    id_(id),
    queue_(segment, create_or_else_find, MsgQIdToName(id_), len, segment.get_segment_manager())
{
@@ -51,8 +51,11 @@
        throw("error create/find msgq " + std::to_string(id_));
    }
}
ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) :
    ShmMsgQueue(NewId(), true, segment, len) {}
ShmMsgQueue::ShmMsgQueue(const int64_t abs_addr, ShmType &segment, const MQId id) :
    id_(id), queue_(abs_addr, segment, MsgQIdToName(id_))
{
    //TODO check some tag.
}
ShmMsgQueue::~ShmMsgQueue() {}
@@ -93,10 +96,11 @@
    return Shmq::Find(shm, MsgQIdToName(remote_id));
}
bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, int64_t val)
bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote, int64_t val)
{
    try {
        ShmMsgQueue dest(remote_id, false, shm, 1);
        //TODO find from center, or use offset.
        ShmMsgQueue dest(shm, false, remote, 1);
#ifndef BH_USE_ATOMIC_Q
        Guard lock(GetMutex(remote_id));
#endif
src/shm_msg_queue.h
@@ -47,13 +47,14 @@
    static MQId NewId();
    ShmMsgQueue(const MQId id, ShmType &segment, const int len);
    ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len);
    ShmMsgQueue(ShmType &segment, const int len);
    ShmMsgQueue(ShmType &segment, const MQId id, const int len);
    ShmMsgQueue(ShmType &segment, const bool create_or_else_find, const MQId id, const int len);
    ShmMsgQueue(const int64_t abs_addr, ShmType &segment, const MQId id);
    ~ShmMsgQueue();
    static bool Remove(ShmType &shm, const MQId id);
    MQId Id() const { return id_; }
    ShmType &shm() const { return queue_.shm(); }
    int64_t AbsAddr() const { return queue_.offset(); }
    bool Recv(RawData &val, const int timeout_ms)
    {
@@ -73,11 +74,9 @@
    bool Recv(MsgI &msg, const int timeout_ms) { return Recv(msg.OffsetRef(), timeout_ms); }
    bool TryRecv(MsgI &msg) { return TryRecv(msg.OffsetRef()); }
    static Queue *Find(ShmType &shm, const MQId remote_id);
    static bool TrySend(ShmType &shm, const MQId remote_id, const RawData val);
    static bool TrySend(ShmType &shm, const MQId remote_id, MsgI msg) { return TrySend(shm, remote_id, msg.Offset()); }
    bool TrySend(const MQId remote_id, const MsgI &msg) { return TrySend(shm(), remote_id, msg); }
    bool TrySend(const MQId remote_id, const RawData val) { return TrySend(shm(), remote_id, val); }
    static Queue *Find(ShmType &shm, const MQId remote);
    static bool TrySend(ShmType &shm, const MQId remote, const RawData val);
    bool TrySend(const MQId remote, const RawData val) { return TrySend(shm(), remote, val); }
private:
#ifndef BH_USE_ATOMIC_Q
src/socket.cpp
@@ -28,25 +28,13 @@
using namespace bhome_shm;
ShmSocket::ShmSocket(Shm &shm, const MQId id, const int len) :
    run_(false), mq_(id, shm, len), alloc_id_(0)
{
    Start();
}
    run_(false), mq_(shm, id, len), alloc_id_(0) { Start(); }
ShmSocket::ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len) :
    run_(false), mq_(id, create_or_else_find, shm, len), alloc_id_(0)
{
    Start();
}
ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) :
    run_(false), mq_(shm, len), alloc_id_(0)
{
    Start();
}
    run_(false), mq_(shm, create_or_else_find, id, len), alloc_id_(0) { Start(); }
ShmSocket::ShmSocket(int64_t abs_addr, Shm &shm, const MQId id) :
    run_(false), mq_(abs_addr, shm, id), alloc_id_(0) { Start(); }
ShmSocket::~ShmSocket()
{
    Stop();
}
ShmSocket::~ShmSocket() { Stop(); }
bool ShmSocket::Start(int nworker, const RecvCB &onData, const RawRecvCB &onRaw, const IdleCB &onIdle)
{
src/socket.h
@@ -49,11 +49,12 @@
    ShmSocket(Shm &shm, const MQId id, const int len);
    ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len);
    ShmSocket(Shm &shm, const int len = 12);
    ShmSocket(int64_t offset, Shm &shm, const MQId id);
    ~ShmSocket();
    static bool Remove(SharedMemory &shm, const MQId id) { return Queue::Remove(shm, id); }
    bool Remove() { return Remove(shm(), id()); }
    MQId id() const { return mq().Id(); }
    int64_t AbsAddr() const { return mq().AbsAddr(); }
    void SetNodeProc(const int proc_index, const int socket_index)
    {
        node_proc_index_ = proc_index;
utest/simple_tests.cpp
@@ -108,7 +108,7 @@
{
    SharedMemory &shm = TestShm();
    GlobalInit(shm);
    ShmMsgQueue q(shm, 64);
    ShmMsgQueue q(shm, ShmMsgQueue::NewId(), 64);
    for (int i = 0; i < 2; ++i) {
        int ms = i * 100;
        printf("Timeout Test %4d: ", ms);
utest/speed_test.cpp
@@ -158,8 +158,8 @@
    auto Avail = [&]() { return shm.get_free_memory(); };
    auto init_avail = Avail();
    ShmSocket srv(shm, qlen);
    ShmSocket cli(shm, qlen);
    ShmSocket srv(shm, ShmMsgQueue::NewId(), qlen);
    ShmSocket cli(shm, ShmMsgQueue::NewId(), qlen);
    int ncli = 1;
    uint64_t nmsg = 1000 * 1000 * 1;