Add center info at a fixed address in shm.
| | |
| | | if (now < time_to_clean_) { |
| | | return; |
| | | } |
| | | LOG_FUNCTION; |
| | | // LOG_FUNCTION; |
| | | time_to_clean_ = now + 1; |
| | | int64_t limit = std::max(10000ul, msgs_.size() / 10); |
| | | int64_t n = 0; |
| | |
| | | msg.Free(); |
| | | it = msgs_.erase(it); |
| | | ++n; |
| | | } else if (msg.timestamp() + 10 < NowSec()) { |
| | | } else if (msg.timestamp() + 60 < NowSec()) { |
| | | msg.Free(); |
| | | it = msgs_.erase(it); |
| | | ++n; |
| | |
| | | std::atomic<bool> run_; |
| | | }; |
| | | |
| | | // Constructs a temporary ShmSocket in `shm` for the id returned by |
| | | // BHGlobalSenderAddress(), with a length argument of 16. Always returns true. |
| | | bool CenterInit(bhome_shm::SharedMemory &shm) |
| | | { |
| | | const auto sender_id = BHGlobalSenderAddress(); |
| | | ShmSocket create(shm, sender_id, 16); |
| | | return true; |
| | | } |
| | | |
| | | } // namespace |
| | | int center_main(int argc, const char *argv[]) |
| | | { |
| | |
| | | if (strcasecmp(lvl.c_str(), "fatal") == 0) { ns_log::ResetLogLevel(ns_log::LogLevel::fatal); } |
| | | |
| | | auto &shm = BHomeShm(); |
| | | CenterInit(shm); |
| | | if (!CenterInit(shm)) { |
| | | LOG_FATAL() << "init memory error."; |
| | | exit(0); |
| | | } |
| | | GlobalInit(shm); |
| | | |
| | | InstanceFlag inst(shm, kCenterRunningFlag); |
| | |
| | | #include "defs.h" |
| | | #include "msg.h" |
| | | #include "shm_msg_queue.h" |
| | | #include "socket.h" |
| | | #include <boost/uuid/random_generator.hpp> |
| | | #include <boost/uuid/string_generator.hpp> |
| | | #include <boost/uuid/uuid.hpp> |
| | | |
| | | namespace |
| | | { |
| | |
| | | const int kAllocIndexLen = sizeof(AllocSizeIndex) / sizeof(AllocSizeIndex[0]); |
| | | static_assert(kAllocIndexLen == 256, "Make sure alloc 8 bit is enough."); |
| | | static_assert(AllocSizeIndex[255] > uint32_t(-1), "Make sure alloc size correct."); |
| | | |
| | | // Distance in bytes from the start of the shm segment to the CenterMetaInfo record |
| | | // (GetCenterInfo/CenterInit add it to shm.get_address()). |
| | | const int64_t kCenterInfoFixedAddress = 4 * 1024; |
| | | |
| | | // Value compared against CenterMetaInfo::tag_ to decide whether the record is initialized. |
| | | const boost::uuids::uuid kCenterInfoTag = boost::uuids::string_generator()("fc5007bd-0e62-4d91-95dc-948cf1f02e5a"); |
| | | |
| | | // Record stored at the fixed offset: the tag followed by the center info itself. |
| | | struct CenterMetaInfo { |
| | | boost::uuids::uuid tag_; // equals kCenterInfoTag once CenterInit has finished |
| | | CenterInfo info_; |
| | | }; |
| | | |
| | | // Numeric value of a pointer, used for address arithmetic against the shm base. |
| | | int64_t Addr(void *ptr) |
| | | { |
| | | return reinterpret_cast<int64_t>(ptr); |
| | | } |
| | | |
| | | // Reinterprets an integer address as T* (T defaults to void). |
| | | // Although the parameter is named `offset`, the callers in this file pass base + offset. |
| | | template <class T = void> |
| | | T *Ptr(const int64_t offset) |
| | | { |
| | | return reinterpret_cast<T *>(offset); |
| | | } |
| | | |
| | | } // namespace |
| | | |
| | | // Returns the CenterInfo located kCenterInfoFixedAddress bytes past shm.get_address(), |
| | | // or nullptr when the tag stored there is not kCenterInfoTag. |
| | | CenterInfo *GetCenterInfo(bhome_shm::SharedMemory &shm) |
| | | { |
| | | const int64_t meta_addr = kCenterInfoFixedAddress + Addr(shm.get_address()); |
| | | CenterMetaInfo *meta = Ptr<CenterMetaInfo>(meta_addr); |
| | | return (meta->tag_ == kCenterInfoTag) ? &meta->info_ : nullptr; |
| | | } |
| | | |
| | | // Put the center info at a fixed position in shared memory. |
| | | // boost shm name lookups (finding a socket/mq by id, etc.) take a lock internally, |
| | | // and a node that crashes while holding that lock would deadlock the others. |
| | | // |
| | | // Returns true if the record at kCenterInfoFixedAddress already carries kCenterInfoTag |
| | | // or was initialized by this call. Returns false if the "shm_center_lock" mutex cannot |
| | | // be created or locked, or if the allocation does not cover the fixed offset. |
| | | bool CenterInit(bhome_shm::SharedMemory &shm) |
| | | { |
| | | // NOTE(review): this uses Create, not FindOrCreate; confirm it still returns the mutex once it exists. |
| | | Mutex *mutex = shm.Create<Mutex>("shm_center_lock"); |
| | | if (!mutex || !mutex->try_lock()) { |
| | | return false; |
| | | } |
| | | DEFER1(mutex->unlock()); |
| | | |
| | | const auto base = Addr(shm.get_address()); |
| | | const auto offset = kCenterInfoFixedAddress; |
| | | auto pmeta = Ptr<CenterMetaInfo>(offset + base); |
| | | if (pmeta->tag_ == kCenterInfoTag) { |
| | | return true; |
| | | } |
| | | |
| | | // Ask for twice the offset; the block only helps if it begins at or before the fixed offset. |
| | | void *p = shm.Alloc(offset * 2); |
| | | // A null result would make the signed distance below negative and slip past the range |
| | | // check, so reject it explicitly before constructing anything. |
| | | if (p == nullptr || Addr(p) - base > offset) { |
| | | return false; |
| | | } |
| | | |
| | | pmeta = new (Ptr(offset + base)) CenterMetaInfo; |
| | | auto &info = pmeta->info_; |
| | | |
| | | // Store the queue id, open a temporary ShmSocket for it (length argument 16) |
| | | // and keep the address value the socket reports. |
| | | auto InitMQ = [&](auto &mq, auto &&id) { |
| | | mq.id_ = id; |
| | | ShmSocket tmp(shm, id, 16); |
| | | mq.offset_ = tmp.AbsAddr(); |
| | | }; |
| | | |
| | | // Ids are handed out sequentially, starting at 101. |
| | | int id = 100; |
| | | auto NextId = [&]() { return ++id; }; |
| | | InitMQ(info.mq_sender_, NextId()); |
| | | InitMQ(info.mq_center_, NextId()); |
| | | InitMQ(info.mq_bus_, NextId()); |
| | | InitMQ(info.mq_init_, NextId()); |
| | | |
| | | // The tag is written last, after all queue fields have been filled in. |
| | | pmeta->tag_ = kCenterInfoTag; |
| | | return true; |
| | | } |
| | | |
| | | // Ids of the well-known queues, read from the CenterInfo record of BHomeShm(). |
| | | // NOTE(review): GetCenterInfo() returns nullptr when the tag does not match; |
| | | // these accessors dereference its result without checking. |
| | | uint64_t BHGlobalSenderAddress() |
| | | { |
| | | return GetCenterInfo(BHomeShm())->mq_sender_.id_; |
| | | } |
| | | uint64_t BHTopicCenterAddress() |
| | | { |
| | | return GetCenterInfo(BHomeShm())->mq_center_.id_; |
| | | } |
| | | uint64_t BHTopicBusAddress() |
| | | { |
| | | return GetCenterInfo(BHomeShm())->mq_bus_.id_; |
| | | } |
| | | uint64_t BHCenterReplyAddress() |
| | | { |
| | | return GetCenterInfo(BHomeShm())->mq_init_.id_; |
| | | } |
| | | |
| | | int64_t CalcAllocIndex(int64_t size) |
| | | { |
| | |
| | | bool GlobalInit(bhome_shm::SharedMemory &shm) |
| | | { |
| | | MsgI::BindShm(shm); |
| | | typedef std::atomic<MQId> IdSrc; |
| | | IdSrc *psrc = shm.FindOrCreate<IdSrc>("shmqIdSrc0", 100000); |
| | | return psrc && ShmMsgQueue::SetData(*psrc); |
| | | CenterInfo *pinfo = GetCenterInfo(shm); |
| | | return pinfo && ShmMsgQueue::SetData(pinfo->mqid_); |
| | | } |
| | | |
| | | void SetLastError(const int ec, const std::string &msg) |
| | |
| | | #ifndef DEFS_KP8LKGD0 |
| | | #define DEFS_KP8LKGD0 |
| | | |
| | | #include <atomic> |
| | | #include <string> |
| | | |
| | | typedef uint64_t MQId; |
| | | |
| | | const MQId kBHDefaultSender = 99; |
| | | const MQId kBHTopicCenter = 100; |
| | | const MQId kBHTopicBus = 101; |
| | | inline const MQId BHGlobalSenderAddress() { return kBHDefaultSender; } |
| | | inline const MQId BHTopicCenterAddress() { return kBHTopicCenter; } |
| | | inline const MQId BHTopicBusAddress() { return kBHTopicBus; } |
| | | |
| | | int64_t CalcAllocIndex(int64_t size); |
| | | int64_t GetAllocSize(int index); |
| | | |
| | | // Ids and segment-relative addresses of the center's message queues, plus the |
| | | // atomic counter that GlobalInit hands to ShmMsgQueue::SetData. |
| | | struct CenterInfo { |
| | | // One queue: its id and the address value reported by its socket; both start at 0. |
| | | struct MQInfo { |
| | | int64_t id_{0}; |
| | | int64_t offset_{0}; |
| | | }; |
| | | |
| | | MQInfo mq_center_; |
| | | MQInfo mq_bus_; |
| | | MQInfo mq_init_; |
| | | MQInfo mq_sender_; |
| | | // Starts at 100000. |
| | | std::atomic<MQId> mqid_{100000}; |
| | | }; |
| | | |
| | | const int kBHCenterPort = 24287; |
| | | const char kTopicSep = '.'; |
| | |
| | | |
| | | std::string BHomeShmName(); |
| | | bhome_shm::SharedMemory &BHomeShm(); |
| | | CenterInfo *GetCenterInfo(bhome_shm::SharedMemory &shm); |
| | | bool CenterInit(bhome_shm::SharedMemory &shm); |
| | | bool GlobalInit(bhome_shm::SharedMemory &shm); |
| | | typedef std::string Topic; |
| | | void SetLastError(const int ec, const std::string &msg); |
| | | void GetLastError(int &ec, std::string &msg); |
| | | //TODO center can check shm for previous crash. |
| | | |
| | | uint64_t BHGlobalSenderAddress(); |
| | | uint64_t BHTopicCenterAddress(); |
| | | uint64_t BHTopicBusAddress(); |
| | | uint64_t BHCenterReplyAddress(); |
| | | |
| | | #endif // end of include guard: DEFS_KP8LKGD0 |
| | |
| | | */ |
| | | #include "msg.h" |
| | | #include "bh_util.h" |
| | | #include "defs.h" |
| | | #include "socket.h" |
| | | |
| | | namespace bhome_msg |
| | |
| | | |
| | | ShmSocket &ShmMsg::Sender() |
| | | { |
| | | static ShmSocket sender(shm(), false, BHGlobalSenderAddress(), 16); |
| | | static auto &mq = GetCenterInfo(shm())->mq_sender_; |
| | | static ShmSocket sender(mq.offset_, shm(), mq.id_); |
| | | return sender; |
| | | } |
| | | |
| | |
| | | int64_t free_cmd = (id() << 4) | EncodeCmd(eCmdFree); |
| | | Sender().Send(BHTopicCenterAddress(), free_cmd); |
| | | } else if (n < 0) { |
| | | throw -123; |
| | | LOG_FATAL() << "error double release data."; |
| | | throw std::runtime_error("double release msg."); |
| | | } |
| | | return n; |
| | | } |
| | |
| | | pdata_ = shm_.Find<Data>(ObjName(name_)); |
| | | } |
| | | } |
| | | // Binds to an object that already lives `offset` bytes past the start of `segment`; |
| | | // no lookup by name is performed, `name` is only stored. |
| | | ShmObject(const int64_t offset, ShmType &segment, const std::string &name) : |
| | | shm_(segment), |
| | | name_(name), |
| | | pdata_(reinterpret_cast<Data *>(Addr(segment.get_address()) + offset)) |
| | | { |
| | | } |
| | | |
| | | bool IsOk() const { return pdata_; } |
| | | |
| | | static bool Remove(SharedMemory &shm, const std::string &name) { return shm.destroy<Data>(ObjName(name).c_str()); } |
| | |
| | | std::string name() const { return name_; } |
| | | Data *data() { return pdata_; } |
| | | const Data *data() const { return pdata_; } |
| | | int64_t offset() const { return Addr(pdata_) - Addr(shm_.get_address()); } |
| | | Data *operator->() { return data(); } |
| | | const Data *operator->() const { return data(); } |
| | | bool Remove() { return Remove(shm_, name_); } |
| | | |
| | | private: |
| | | static int64_t Addr(const void *p) { return reinterpret_cast<int64_t>(p); } |
| | | ShmType &shm_; |
| | | std::string name_; |
| | | Data *pdata_ = nullptr; |
| | |
| | | return (++id) * 10; |
| | | } |
| | | |
| | | ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) : |
| | | ShmMsgQueue::ShmMsgQueue(ShmType &segment, const MQId id, const int len) : |
| | | id_(id), |
| | | queue_(segment, MsgQIdToName(id_), len, segment.get_segment_manager()) |
| | | { |
| | | } |
| | | |
| | | ShmMsgQueue::ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len) : |
| | | ShmMsgQueue::ShmMsgQueue(ShmType &segment, const bool create_or_else_find, const MQId id, const int len) : |
| | | id_(id), |
| | | queue_(segment, create_or_else_find, MsgQIdToName(id_), len, segment.get_segment_manager()) |
| | | { |
| | |
| | | throw("error create/find msgq " + std::to_string(id_)); |
| | | } |
| | | } |
| | | ShmMsgQueue::ShmMsgQueue(ShmType &segment, const int len) : |
| | | ShmMsgQueue(NewId(), true, segment, len) {} |
| | | // Attaches to an existing queue by address instead of by name lookup. |
| | | // `abs_addr` is forwarded unchanged to queue_'s constructor together with the segment. |
| | | // The queue name is derived from id_, so id_ has to be initialized before queue_. |
| | | ShmMsgQueue::ShmMsgQueue(const int64_t abs_addr, ShmType &segment, const MQId id) : |
| | | id_(id), |
| | | queue_(abs_addr, segment, MsgQIdToName(id_)) |
| | | { |
| | | //TODO check some tag. |
| | | } |
| | | |
| | | ShmMsgQueue::~ShmMsgQueue() {} |
| | | |
| | |
| | | return Shmq::Find(shm, MsgQIdToName(remote_id)); |
| | | } |
| | | |
| | | bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, int64_t val) |
| | | bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote, int64_t val) |
| | | { |
| | | try { |
| | | ShmMsgQueue dest(remote_id, false, shm, 1); |
| | | //TODO find from center, or use offset. |
| | | ShmMsgQueue dest(shm, false, remote, 1); |
| | | #ifndef BH_USE_ATOMIC_Q |
| | | Guard lock(GetMutex(remote_id)); |
| | | #endif |
| | |
| | | |
| | | static MQId NewId(); |
| | | |
| | | ShmMsgQueue(const MQId id, ShmType &segment, const int len); |
| | | ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len); |
| | | ShmMsgQueue(ShmType &segment, const int len); |
| | | ShmMsgQueue(ShmType &segment, const MQId id, const int len); |
| | | ShmMsgQueue(ShmType &segment, const bool create_or_else_find, const MQId id, const int len); |
| | | ShmMsgQueue(const int64_t abs_addr, ShmType &segment, const MQId id); |
| | | ~ShmMsgQueue(); |
| | | static bool Remove(ShmType &shm, const MQId id); |
| | | MQId Id() const { return id_; } |
| | | ShmType &shm() const { return queue_.shm(); } |
| | | int64_t AbsAddr() const { return queue_.offset(); } |
| | | |
| | | bool Recv(RawData &val, const int timeout_ms) |
| | | { |
| | |
| | | |
| | | bool Recv(MsgI &msg, const int timeout_ms) { return Recv(msg.OffsetRef(), timeout_ms); } |
| | | bool TryRecv(MsgI &msg) { return TryRecv(msg.OffsetRef()); } |
| | | static Queue *Find(ShmType &shm, const MQId remote_id); |
| | | static bool TrySend(ShmType &shm, const MQId remote_id, const RawData val); |
| | | static bool TrySend(ShmType &shm, const MQId remote_id, MsgI msg) { return TrySend(shm, remote_id, msg.Offset()); } |
| | | bool TrySend(const MQId remote_id, const MsgI &msg) { return TrySend(shm(), remote_id, msg); } |
| | | bool TrySend(const MQId remote_id, const RawData val) { return TrySend(shm(), remote_id, val); } |
| | | static Queue *Find(ShmType &shm, const MQId remote); |
| | | static bool TrySend(ShmType &shm, const MQId remote, const RawData val); |
| | | bool TrySend(const MQId remote, const RawData val) { return TrySend(shm(), remote, val); } |
| | | |
| | | private: |
| | | #ifndef BH_USE_ATOMIC_Q |
| | |
| | | using namespace bhome_shm; |
| | | |
| | | ShmSocket::ShmSocket(Shm &shm, const MQId id, const int len) : |
| | | run_(false), mq_(id, shm, len), alloc_id_(0) |
| | | { |
| | | Start(); |
| | | } |
| | | run_(false), mq_(shm, id, len), alloc_id_(0) { Start(); } |
| | | ShmSocket::ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len) : |
| | | run_(false), mq_(id, create_or_else_find, shm, len), alloc_id_(0) |
| | | { |
| | | Start(); |
| | | } |
| | | ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) : |
| | | run_(false), mq_(shm, len), alloc_id_(0) |
| | | { |
| | | Start(); |
| | | } |
| | | run_(false), mq_(shm, create_or_else_find, id, len), alloc_id_(0) { Start(); } |
| | | ShmSocket::ShmSocket(int64_t abs_addr, Shm &shm, const MQId id) : |
| | | run_(false), mq_(abs_addr, shm, id), alloc_id_(0) { Start(); } |
| | | |
| | | ShmSocket::~ShmSocket() |
| | | { |
| | | Stop(); |
| | | } |
| | | ShmSocket::~ShmSocket() { Stop(); } |
| | | |
| | | bool ShmSocket::Start(int nworker, const RecvCB &onData, const RawRecvCB &onRaw, const IdleCB &onIdle) |
| | | { |
| | |
| | | |
| | | ShmSocket(Shm &shm, const MQId id, const int len); |
| | | ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len); |
| | | ShmSocket(Shm &shm, const int len = 12); |
| | | ShmSocket(int64_t offset, Shm &shm, const MQId id); |
| | | ~ShmSocket(); |
| | | static bool Remove(SharedMemory &shm, const MQId id) { return Queue::Remove(shm, id); } |
| | | bool Remove() { return Remove(shm(), id()); } |
| | | MQId id() const { return mq().Id(); } |
| | | int64_t AbsAddr() const { return mq().AbsAddr(); } |
| | | void SetNodeProc(const int proc_index, const int socket_index) |
| | | { |
| | | node_proc_index_ = proc_index; |
| | |
| | | { |
| | | SharedMemory &shm = TestShm(); |
| | | GlobalInit(shm); |
| | | ShmMsgQueue q(shm, 64); |
| | | ShmMsgQueue q(shm, ShmMsgQueue::NewId(), 64); |
| | | for (int i = 0; i < 2; ++i) { |
| | | int ms = i * 100; |
| | | printf("Timeout Test %4d: ", ms); |
| | |
| | | |
| | | auto Avail = [&]() { return shm.get_free_memory(); }; |
| | | auto init_avail = Avail(); |
| | | ShmSocket srv(shm, qlen); |
| | | ShmSocket cli(shm, qlen); |
| | | ShmSocket srv(shm, ShmMsgQueue::NewId(), qlen); |
| | | ShmSocket cli(shm, ShmMsgQueue::NewId(), qlen); |
| | | |
| | | int ncli = 1; |
| | | uint64_t nmsg = 1000 * 1000 * 1; |