lichao
2021-06-02 993c556000a414011626770540678948f16eaa9e
center restart with new shm; set center node ssn.
9个文件已修改
197 ■■■■ 已修改文件
box/center_main.cc 75 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center_topic_node.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.cpp 96 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.h 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/robust.h 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.h 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/tcp_test.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center_main.cc
@@ -27,63 +27,6 @@
using namespace std::chrono_literals;
using namespace bhome_shm;
namespace
{
const std::string kCenterRunningFlag = "bh_center_single_flag_0";
class InstanceFlag
{
public:
    InstanceFlag(SharedMemory &shm, const std::string &name) :
        shm_(shm), name_(name), run_(false) {}
    ~InstanceFlag() { Stop(); }
    bool TryStartAsFirstInstance()
    {
        if (run_) {
            return true;
        }
        auto mtx(shm_.FindOrCreate<Mutex>(name_ + "_mutex_0"));
        auto time_stamp(shm_.FindOrCreate<int64_t>(name_ + "_timestamp_0", 0));
        if (mtx && time_stamp) {
            Guard lock(*mtx);
            auto now = NowSec();
            LOG_DEBUG() << "old: " << *time_stamp << ", now: " << now;
            if (now > *time_stamp + 10) {
                *time_stamp = now;
                auto UpdateTime = [this, time_stamp]() {
                    while (run_) {
                        std::this_thread::sleep_for(1s);
                        *time_stamp = NowSec();
                    }
                };
                run_.store(true);
                std::thread(UpdateTime).swap(worker_);
                return true;
            }
        }
        return false;
    }
private:
    void Stop()
    {
        run_.store(false);
        if (worker_.joinable()) {
            worker_.join();
        }
    }
    std::thread worker_;
    SharedMemory &shm_;
    std::string name_;
    std::atomic<bool> run_;
};
} // namespace
int center_main(int argc, const char *argv[])
{
    AppArg args(argc, argv);
@@ -101,22 +44,14 @@
    if (strcasecmp(lvl.c_str(), "error") == 0) { ns_log::ResetLogLevel(ns_log::LogLevel::error); }
    if (strcasecmp(lvl.c_str(), "fatal") == 0) { ns_log::ResetLogLevel(ns_log::LogLevel::fatal); }
    auto &shm = BHomeShm();
    if (!CenterInit(shm)) {
        auto msg = "init memory error.";
    if (!CenterInit()) {
        auto msg = "init memory failed, or center is already running.";
        LOG_FATAL() << msg;
        printf("%s\n", msg);
        exit(0);
    }
    auto &shm = BHomeShm();
    GlobalInit(shm);
    InstanceFlag inst(shm, kCenterRunningFlag);
    if (!inst.TryStartAsFirstInstance()) {
        auto msg = "another instance is running, exit.";
        LOG_INFO() << msg;
        printf("%s\n", msg);
        return 0;
    }
    if (args.Has("daemon") || args.Has("d")) {
        int r = daemon(0, 0); // TODO center control msg to close itself.
@@ -125,9 +60,9 @@
    BHCenter center(shm);
    center.Start();
    auto msg = "center started ...";
    auto msg = "center (" + shm.name() + ") started ...";
    LOG_INFO() << msg;
    printf("%s\n", msg);
    printf("%s\n", msg.c_str());
    WaitForSignals({SIGINT, SIGTERM});
    center.Stop();
    LOG_INFO() << "center stopped.";
box/center_topic_node.cpp
@@ -56,7 +56,7 @@
} // namespace
CenterTopicNode::CenterTopicNode(CenterPtr center, SharedMemory &shm) :
    pscenter_(center), pnode_(new TopicNode(shm)), run_(false) {}
    pscenter_(center), pnode_(new TopicNode(shm, 200)), run_(false) {}
CenterTopicNode::~CenterTopicNode() { Stop(); }
src/defs.cpp
@@ -22,6 +22,7 @@
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/string_generator.hpp>
#include <boost/uuid/uuid.hpp>
#include <sys/file.h>
namespace
{
@@ -76,8 +77,16 @@
static_assert(AllocSizeIndex[255] > uint32_t(-1), "Make sure alloc size correct.");
const int64_t kCenterInfoFixedAddress = 1024 * 4;
const int64_t kShmMetaInfoFixedAddress = 1024 * 16;
const boost::uuids::uuid kCenterInfoTag = boost::uuids::string_generator()("fc5007bd-0e62-4d91-95dc-948cf1f02e5a");
const boost::uuids::uuid kMetaInfoTag = boost::uuids::string_generator()("fc5007bd-0e62-4d91-95dc-948cf1f02e5a");
struct BHomeMetaInfo {
    boost::uuids::uuid tag_;
    std::atomic<uint64_t> shm_id_;
    std::atomic<uint64_t> ssn_id_;
};
struct CenterMetaInfo {
    boost::uuids::uuid tag_;
    CenterInfo info_;
@@ -88,16 +97,43 @@
template <class T = void>
T *Ptr(const int64_t offset) { return reinterpret_cast<T *>(offset); }
class FileLock
{
public:
    FileLock(const std::string &path) :
        fd_(Open(path))
    {
        if (fd_ == -1) { throw std::runtime_error("error open file:" + path); }
    }
    ~FileLock() { Close(fd_); }
    bool try_lock() { return fd_ != -1 && (flock(fd_, LOCK_EX | LOCK_NB) == 0); }
    void unlock() { flock(fd_, LOCK_UN); }
private:
    static int Open(const std::string &path) { return open(path.c_str(), O_RDONLY, 0666); }
    static int Close(int fd) { return close(fd); }
    int fd_;
    std::mutex mtx_;
};
SharedMemory &BHomeMetaShm()
{
    static std::string name("bhshmq_meta_v0");
    static SharedMemory shm(name, 1024 * 128);
    return shm;
}
} // namespace
CenterInfo *GetCenterInfo(SharedMemory &shm)
{
    auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
    if (pmeta->tag_ == kCenterInfoTag) {
    if (pmeta->tag_ == kMetaInfoTag) {
        return &pmeta->info_;
    }
    return nullptr;
}
ShmSocket &DefaultSender(SharedMemory &shm)
{
    typedef std::pair<void *, std::shared_ptr<ShmSocket>> Pair;
@@ -121,11 +157,55 @@
    local_cache = store.back();
    return *local_cache.second;
}
BHomeMetaInfo *GetBHomeMeta()
{
    auto p = Ptr<BHomeMetaInfo>(kShmMetaInfoFixedAddress + Addr(BHomeMetaShm().get_address()));
    return (p->tag_ == kMetaInfoTag) ? p : nullptr;
}
bool ShmMetaInit()
{
    SharedMemory &shm = BHomeMetaShm();
    static FileLock fl("/dev/shm/" + shm.name());
    if (!fl.try_lock()) { // single center instance only.
        return false;
    }
    auto pmeta = GetBHomeMeta();
    if (pmeta && pmeta->tag_ == kMetaInfoTag) {
        ++pmeta->shm_id_; // inc shm id
        return true;      // already exist.
    } else {
        Mutex *mutex = shm.FindOrCreate<Mutex>("bhshmq_meta_lock");
        if (!mutex || !mutex->try_lock()) {
            return false;
        }
        DEFER1(mutex->unlock());
        auto base = Addr(shm.get_address());
        auto offset = kShmMetaInfoFixedAddress;
        void *p = shm.Alloc(offset * 2);
        if (Addr(p) - base <= offset) {
            pmeta = new (Ptr(offset + base)) BHomeMetaInfo;
            pmeta->tag_ = kMetaInfoTag;
            pmeta->shm_id_ = 100;
            pmeta->ssn_id_ = 10000;
            return true;
        }
    }
    return false;
}
// put center info at fixed memory position.
// as boost shm find object (find socket/mq by id, etc...) also locks inside,
// which node might crash inside and cause deadlock.
bool CenterInit(SharedMemory &shm)
bool CenterInit()
{
    if (!ShmMetaInit()) { return false; }
    SharedMemory &shm = BHomeShm();
    Mutex *mutex = shm.FindOrCreate<Mutex>("shm_center_lock");
    if (!mutex || !mutex->try_lock()) {
        return false;
@@ -133,7 +213,7 @@
    DEFER1(mutex->unlock());
    auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
    if (pmeta->tag_ == kCenterInfoTag) {
    if (pmeta->tag_ == kMetaInfoTag) {
        return true;
    } else {
        auto base = Addr(shm.get_address());
@@ -155,7 +235,7 @@
            InitMQ(info.mq_center_, NextId());
            InitMQ(info.mq_bus_, NextId());
            pmeta->tag_ = kCenterInfoTag;
            pmeta->tag_ = kMetaInfoTag;
            return true;
        }
    }
@@ -183,8 +263,10 @@
std::string BHomeShmName()
{
    return "bhome_default_shm_v0";
    auto bhome_meta = Ptr<BHomeMetaInfo>(kShmMetaInfoFixedAddress + Addr(BHomeMetaShm().get_address()));
    return "bhome_shmq_id_" + std::to_string(bhome_meta->shm_id_.load());
}
SharedMemory &BHomeShm()
{
    static SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512);
@@ -193,7 +275,7 @@
bool GlobalInit(SharedMemory &shm) { return GetCenterInfo(shm); }
MQId NewSession() { return 10 * (++GetCenterInfo(BHomeShm())->mqid_); }
MQId NewSession() { return 10 * (++GetBHomeMeta()->ssn_id_); }
void SetLastError(const int ec, const std::string &msg)
{
src/defs.h
@@ -39,9 +39,6 @@
    MQInfo mq_bus_;
    MQInfo mq_sender_;
    robust::AtomicReqRep init_rr_;
    std::atomic<MQId> mqid_;
    CenterInfo() :
        mqid_(100000) {}
};
const int kBHCenterPort = 24287;
@@ -59,7 +56,7 @@
ShmSocket &DefaultSender(SharedMemory &shm);
MQId NewSession();
bool CenterInit(SharedMemory &shm);
bool CenterInit();
bool GlobalInit(SharedMemory &shm);
typedef std::string Topic;
void SetLastError(const int ec, const std::string &msg);
src/robust.h
@@ -67,6 +67,8 @@
    typedef std::function<Data(const Data)> Handler;
    bool ClientRequest(const Data request, Data &reply);
    bool ServerProcess(Handler onReq);
    AtomicReqRep() :
        data_(0), timestamp_(now()) {}
private:
    enum State {
@@ -79,7 +81,7 @@
    static Data Decode(Data d) { return d >> 3; }
    typedef std::chrono::steady_clock steady_clock;
    typedef steady_clock::duration Duration;
    Duration now() { return steady_clock::now().time_since_epoch(); }
    static Duration now() { return steady_clock::now().time_since_epoch(); }
    bool DataCas(Data expected, Data val) { return data_.compare_exchange_strong(expected, val); }
    std::atomic<Data> data_;
src/topic_node.cpp
@@ -50,8 +50,8 @@
} // namespace
TopicNode::TopicNode(SharedMemory &shm) :
    shm_(shm), state_(eStateUninited)
TopicNode::TopicNode(SharedMemory &shm, MQId ssn_id) :
    shm_(shm), state_(eStateUninited), ssn_id_(ssn_id)
{
}
src/topic_node.h
@@ -39,7 +39,7 @@
    const MQInfo &BusAddr() const { return BHTopicBusAddress(shm()); }
public:
    TopicNode(SharedMemory &shm);
    TopicNode(SharedMemory &shm, MQId ssn_id = 0);
    ~TopicNode();
    // topic node
utest/api_test.cpp
@@ -235,8 +235,9 @@
        printf("query with ip set\n");
        host.set_ip("127.0.0.1");
        host.set_port(kBHCenterPort);
        host.set_mq_id(1000011);
        host.set_abs_addr(10296);
        // center topic node address.
        host.set_mq_id(201);
        host.set_abs_addr(10072);
        std::string dest(host.SerializeAsString());
        void *proc_id = 0;
utest/tcp_test.cpp
@@ -52,8 +52,8 @@
        head.mutable_dest()->set_ip(connect_addr);
        head.mutable_dest()->set_port(port);
        head.mutable_dest()->set_mq_id(1000011);
        head.mutable_dest()->set_abs_addr(10296);
        head.mutable_dest()->set_mq_id(201);
        head.mutable_dest()->set_abs_addr(10072);
        return (MsgI::Serialize(head, req));
    };