class MsgI, ShmMsgQueue, no bind to shm.
| | |
| | | return [&](auto &&rep_body) { |
| | | auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id())); |
| | | MQInfo remote = {head.route(0).mq_id(), head.route(0).abs_addr()}; |
| | | MsgI msg; |
| | | MsgI msg(socket.shm()); |
| | | if (msg.Make(reply_head, rep_body)) { |
| | | DEFER1(msg.Release();); |
| | | center->SendAllocMsg(socket, remote, msg); |
| | |
| | | }; |
| | | } |
| | | |
| | | bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr) |
| | | bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm) |
| | | { |
| | | // command |
| | | auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool { |
| | |
| | | auto onInit = [&](const int64_t request) { |
| | | return center->OnNodeInit(socket, request); |
| | | }; |
| | | BHCenterHandleInit(onInit); |
| | | BHCenterHandleInit(socket.shm(), onInit); |
| | | center->OnTimer(); |
| | | }; |
| | | |
| | |
| | | default: return false; |
| | | } |
| | | }; |
| | | BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(), 1000); |
| | | BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(shm), 1000); |
| | | |
| | | auto OnBusIdle = [=](ShmSocket &socket) {}; |
| | | auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; }; |
| | |
| | | } |
| | | }; |
| | | |
| | | BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(), 1000); |
| | | BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(shm), 1000); |
| | | |
| | | return true; |
| | | } |
| | |
| | | { |
| | | auto nsec = NodeTimeoutSec(); |
| | | auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs. |
| | | AddCenter(center_ptr); |
| | | AddCenter(center_ptr, shm); |
| | | |
| | | for (auto &kv : Centers()) { |
| | | auto &info = kv.second; |
| | |
| | | |
| | | topic_node_.reset(new CenterTopicNode(center_ptr, shm)); |
| | | } |
| | | |
| | | BHCenter::~BHCenter() { Stop(); } |
| | | |
| | | bool BHCenter::Start() |
| | |
| | | { |
| | | auto pos = msgs_.find(id); |
| | | if (pos != msgs_.end()) { |
| | | ShmMsg(pos->second).Free(); |
| | | pos->second.Free(); |
| | | msgs_.erase(pos); |
| | | } else { |
| | | LOG_TRACE() << "ignore late free request."; |
| | |
| | | int i = 0; |
| | | int total_count = 0; |
| | | for (auto &kv : msgs_) { |
| | | MsgI msg(kv.second); |
| | | auto &msg = kv.second; |
| | | total_count += msg.Count(); |
| | | LOG_TRACE() << " " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second << ", count: " << msg.Count() << ", size: " << msg.Size(); |
| | | LOG_TRACE() << " " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second.Offset() << ", count: " << msg.Count() << ", size: " << msg.Size(); |
| | | } |
| | | LOG_TRACE() << "total count: " << total_count; |
| | | } |
| | |
| | | |
| | | auto PrepareProcInit = [&](Node &node) { |
| | | bool r = false; |
| | | ShmMsg init_msg; |
| | | ShmMsg init_msg(shm); |
| | | DEFER1(init_msg.Release()); |
| | | MsgProcInit body; |
| | | auto head = InitMsgHead(GetType(body), id(), ssn); |
| | |
| | | if (!FindMq()) { return; } |
| | | |
| | | auto size = GetAllocSize((val >> 52) & MaskBits(8)); |
| | | MsgI new_msg; |
| | | MsgI new_msg(socket.shm()); |
| | | if (new_msg.Make(size)) { |
| | | // 31bit proc index, 28bit id, 4bit cmd+flag |
| | | int64_t reply = (new_msg.Offset() << 32) | (msg_id << 4) | EncodeCmd(eCmdAllocReply0); |
| | |
| | | pub.set_topic(topic); |
| | | pub.set_data(content); |
| | | BHMsgHead head(InitMsgHead(GetType(pub), id(), 0)); |
| | | MsgI msg; |
| | | MsgI msg(shm); |
| | | if (msg.Make(head, pub)) { |
| | | DEFER1(msg.Release()); |
| | | RecordMsg(msg); |
| | | |
| | | auto &mq = GetCenterInfo(shm)->mq_sender_; |
| | | ShmSocket sender(mq.offset_, shm, mq.id_); |
| | | |
| | | for (auto &cli : clients) { |
| | | auto node = cli.weak_node_.lock(); |
| | | if (node && node->state_.flag_ == kStateNormal) { |
| | | sender.Send({cli.mq_id_, cli.mq_abs_addr_}, msg); |
| | | DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg); |
| | | } |
| | | } |
| | | } |
| | |
| | | typedef int64_t Offset; |
| | | |
| | | public: |
| | | void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg.Offset()); } |
| | | void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg); } |
| | | void FreeMsg(MsgId id); |
| | | void AutoRemove(); |
| | | size_t size() const { return msgs_.size(); } |
| | | void DebugPrint() const; |
| | | |
| | | private: |
| | | std::unordered_map<MsgId, Offset> msgs_; |
| | | std::unordered_map<MsgId, MsgI> msgs_; |
| | | int64_t time_to_clean_ = 0; |
| | | }; |
| | | |
| | |
| | | |
| | | } // namespace |
| | | |
| | | CenterInfo *GetCenterInfo(bhome_shm::SharedMemory &shm) |
| | | CenterInfo *GetCenterInfo(SharedMemory &shm) |
| | | { |
| | | auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address())); |
| | | if (pmeta->tag_ == kCenterInfoTag) { |
| | |
| | | } |
| | | return nullptr; |
| | | } |
| | | // Returns a ShmSocket for `shm`, built from the mq_sender_ entry of that segment's CenterInfo. |
| | | // One socket is created per distinct SharedMemory address and kept in a function-local static |
| | | // list; each thread also remembers the last entry it used, so repeated calls for the same |
| | | // segment return before the mutex is taken. |
| | | // NOTE(review): the result of GetCenterInfo(shm) is dereferenced without a null check -- |
| | | // confirm callers only get here after the center info has been set up. |
| | | ShmSocket &DefaultSender(SharedMemory &shm) |
| | | { |
| | | typedef std::pair<void *, std::shared_ptr<ShmSocket>> Pair; |
| | | static std::vector<Pair> store; |
| | | static std::mutex s_mtx; |
| | | |
| | | // Per-thread cache of the last (segment, socket) pair handed out. |
| | | thread_local Pair local_cache; |
| | | if (local_cache.first == &shm) { |
| | | return *local_cache.second; |
| | | } |
| | | |
| | | std::lock_guard<std::mutex> lk(s_mtx); |
| | | for (auto &kv : store) { |
| | | if (kv.first == &shm) { |
| | | local_cache = kv; |
| | | return *local_cache.second; |
| | | } |
| | | } |
| | | auto &mq = GetCenterInfo(shm)->mq_sender_; |
| | | // make_shared gives the socket an owner before the vector can reallocate; a bare |
| | | // `new` expression here would leak the socket if emplace_back threw. |
| | | store.emplace_back(&shm, std::make_shared<ShmSocket>(mq.offset_, shm, mq.id_)); |
| | | local_cache = store.back(); |
| | | return *local_cache.second; |
| | | } |
| | | // Put the center info at a fixed memory position: |
| | | // boost shm object lookup (find socket/mq by id, etc.) also takes a lock internally, |
| | | // and a node that crashes while inside it would cause a deadlock. |
| | | bool CenterInit(bhome_shm::SharedMemory &shm) |
| | | bool CenterInit(SharedMemory &shm) |
| | | { |
| | | Mutex *mutex = shm.FindOrCreate<Mutex>("shm_center_lock"); |
| | | if (!mutex || !mutex->try_lock()) { |
| | |
| | | return false; |
| | | } |
| | | |
| | | const MQInfo &BHGlobalSenderAddress() { return GetCenterInfo(BHomeShm())->mq_sender_; } |
| | | const MQInfo &BHTopicCenterAddress() { return GetCenterInfo(BHomeShm())->mq_center_; } |
| | | const MQInfo &BHTopicBusAddress() { return GetCenterInfo(BHomeShm())->mq_bus_; } |
| | | bool BHNodeInit(const int64_t request, int64_t &reply) |
| | | const MQInfo &BHTopicCenterAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_center_; } |
| | | const MQInfo &BHTopicBusAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_bus_; } |
| | | bool BHNodeInit(SharedMemory &shm, const int64_t request, int64_t &reply) |
| | | { |
| | | return GetCenterInfo(BHomeShm())->init_rr_.ClientRequest(request, reply); |
| | | return GetCenterInfo(shm)->init_rr_.ClientRequest(request, reply); |
| | | } |
| | | void BHCenterHandleInit(std::function<int64_t(const int64_t)> const &onReq) |
| | | void BHCenterHandleInit(SharedMemory &shm, std::function<int64_t(const int64_t)> const &onReq) |
| | | { |
| | | GetCenterInfo(BHomeShm())->init_rr_.ServerProcess(onReq); |
| | | GetCenterInfo(shm)->init_rr_.ServerProcess(onReq); |
| | | } |
| | | |
| | | int64_t CalcAllocIndex(int64_t size) |
| | |
| | | { |
| | | return "bhome_default_shm_v0"; |
| | | } |
| | | bhome_shm::SharedMemory &BHomeShm() |
| | | SharedMemory &BHomeShm() |
| | | { |
| | | static bhome_shm::SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512); |
| | | static SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512); |
| | | return shm; |
| | | } |
| | | |
| | | bool GlobalInit(bhome_shm::SharedMemory &shm) |
| | | { |
| | | MsgI::BindShm(shm); |
| | | CenterInfo *pinfo = GetCenterInfo(shm); |
| | | return pinfo && ShmMsgQueue::SetData(pinfo->mqid_); |
| | | } |
| | | bool GlobalInit(SharedMemory &shm) { return GetCenterInfo(shm); } |
| | | |
| | | // Pre-increments the mqid_ counter stored in the CenterInfo of BHomeShm() and returns ten |
| | | // times the new value. |
| | | // NOTE(review): GetCenterInfo() is dereferenced without a null check, and the segment is |
| | | // always BHomeShm() because this function takes no SharedMemory argument -- confirm intended. |
| | | MQId NewSession() { return 10 * (++GetCenterInfo(BHomeShm())->mqid_); } |
| | | |
| | | void SetLastError(const int ec, const std::string &msg) |
| | | { |
| | |
| | | #include <atomic> |
| | | #include <string> |
| | | |
| | | class ShmSocket; |
| | | typedef uint64_t MQId; |
| | | |
| | | int64_t CalcAllocIndex(int64_t size); |
| | |
| | | class SharedMemory; |
| | | } // namespace bhome_shm |
| | | |
| | | using bhome_shm::SharedMemory; |
| | | |
| | | std::string BHomeShmName(); |
| | | bhome_shm::SharedMemory &BHomeShm(); |
| | | CenterInfo *GetCenterInfo(bhome_shm::SharedMemory &shm); |
| | | bool CenterInit(bhome_shm::SharedMemory &shm); |
| | | bool GlobalInit(bhome_shm::SharedMemory &shm); |
| | | SharedMemory &BHomeShm(); |
| | | CenterInfo *GetCenterInfo(SharedMemory &shm); |
| | | ShmSocket &DefaultSender(SharedMemory &shm); |
| | | |
| | | MQId NewSession(); |
| | | bool CenterInit(SharedMemory &shm); |
| | | bool GlobalInit(SharedMemory &shm); |
| | | typedef std::string Topic; |
| | | void SetLastError(const int ec, const std::string &msg); |
| | | void GetLastError(int &ec, std::string &msg); |
| | | //TODO center can check shm for previous crash. |
| | | |
| | | const MQInfo &BHGlobalSenderAddress(); |
| | | const MQInfo &BHTopicCenterAddress(); |
| | | const MQInfo &BHTopicBusAddress(); |
| | | bool BHNodeInit(const int64_t request, int64_t &reply); |
| | | void BHCenterHandleInit(std::function<int64_t(const int64_t)> const &onReq); |
| | | const MQInfo &BHTopicCenterAddress(SharedMemory &shm); |
| | | const MQInfo &BHTopicBusAddress(SharedMemory &shm); |
| | | bool BHNodeInit(SharedMemory &shm, const int64_t request, int64_t &reply); |
| | | void BHCenterHandleInit(SharedMemory &shm, std::function<int64_t(const int64_t)> const &onReq); |
| | | |
| | | // node mq is available within the timeout; after that it may get killed. |
| | | int NodeTimeoutSec(); |
| | |
| | | namespace bhome_msg |
| | | { |
| | | |
| | | // Returns the ShmSocket that ShmMsg sends through. Both the mq_sender_ reference and the |
| | | // socket are function-local statics, so they are built once, on the first call, from |
| | | // whatever shm() returns at that moment, and the same socket is returned afterwards. |
| | | // NOTE(review): GetCenterInfo(shm()) is dereferenced without a null check. |
| | | ShmSocket &ShmMsg::Sender() |
| | | { |
| | | static auto &mq = GetCenterInfo(shm())->mq_sender_; |
| | | static ShmSocket sender(mq.offset_, shm(), mq.id_); |
| | | return sender; |
| | | } |
| | | |
| | | int ShmMsg::Release() |
| | | { |
| | | if (!valid()) { |
| | |
| | | if (n == 0) { |
| | | if (meta()->managed_) { |
| | | int64_t free_cmd = (id() << 4) | EncodeCmd(eCmdFree); |
| | | Sender().Send(BHTopicCenterAddress(), free_cmd); |
| | | DefaultSender(shm()).Send(BHTopicCenterAddress(shm()), free_cmd); |
| | | } else { |
| | | Free(); |
| | | } |
| | |
| | | // ShmMsg is safe to be stored in shared memory, so POD data or offset_ptr is required. |
| | | // message content layout: (meta) / header_size + header + data_size + data |
| | | |
| | | class ShmMsg : private StaticDataRef<SharedMemory, ShmMsg> |
| | | class ShmMsg |
| | | { |
| | | public: |
| | | static inline SharedMemory &shm() { return GetData(); } |
| | | |
| | | private: |
| | | static ShmSocket &Sender(); |
| | | |
| | | // Stores the ref count; msgs sharing the same data should also hold a pointer to the same RefCount object. |
| | | class RefCount : private boost::noncopyable |
| | | { |
| | |
| | | typedef int64_t OffsetType; |
| | | static OffsetType Addr(void *ptr) { return reinterpret_cast<OffsetType>(ptr); } |
| | | static void *Ptr(const OffsetType offset) { return reinterpret_cast<void *>(offset); } |
| | | static inline OffsetType BaseAddr() |
| | | { |
| | | static const OffsetType base = Addr(shm().get_address()); // cache value. |
| | | return base; |
| | | } |
| | | OffsetType BaseAddr() const { return Addr(shm().get_address()); } |
| | | |
| | | static const uint32_t kMsgTag = 0xf1e2d3c4; |
| | | struct Meta { |
| | |
| | | capacity_(size), id_(NewId()), timestamp_(NowSec()) {} |
| | | }; |
| | | OffsetType offset_; |
| | | static void *Alloc(const size_t size) |
| | | SharedMemory *pshm_; |
| | | void *Alloc(const size_t size) |
| | | { |
| | | void *p = shm().Alloc(sizeof(Meta) + size); |
| | | if (p) { |
| | |
| | | if (!addr) { |
| | | return false; |
| | | } |
| | | ShmMsg(addr).swap(*this); |
| | | offset_ = Addr(addr) - BaseAddr(); |
| | | return true; |
| | | } |
| | | ShmMsg(void *p) : |
| | | offset_(p ? (Addr(p) - BaseAddr()) : 0) {} |
| | | |
| | | template <class T = void> |
| | | T *get() const { return offset_ != 0 ? static_cast<T *>(Ptr(offset_ + BaseAddr())) : nullptr; } |
| | | |
| | | public: |
| | | static bool BindShm(SharedMemory &shm) { return SetData(shm); } |
| | | ShmMsg() : |
| | | offset_(0) {} |
| | | explicit ShmMsg(const OffsetType offset) : |
| | | offset_(offset) {} |
| | | explicit ShmMsg(SharedMemory &shm) : |
| | | offset_(0), pshm_(&shm) {} |
| | | ShmMsg(const OffsetType offset, SharedMemory &shm) : |
| | | offset_(offset), pshm_(&shm) {} |
| | | OffsetType Offset() const { return offset_; } |
| | | OffsetType &OffsetRef() { return offset_; } |
| | | void swap(ShmMsg &a) { std::swap(offset_, a.offset_); } |
| | | SharedMemory &shm() const { return *pshm_; } |
| | | |
| | | void swap(ShmMsg &a) |
| | | { |
| | | std::swap(offset_, a.offset_); |
| | | std::swap(pshm_, a.pshm_); |
| | | } |
| | | bool valid() const { return offset_ != 0 && meta()->tag_ == kMsgTag; } |
| | | int64_t id() const { return valid() ? meta()->id_ : 0; } |
| | | int64_t timestamp() const { return valid() ? meta()->timestamp_.load() : 0; } |
| | |
| | | } else { |
| | | al.insert(al.begin(), Array())->emplace_back(std::move(tmp)); |
| | | } |
| | | count_in_.Count1(); |
| | | } catch (std::exception &e) { |
| | | LOG_ERROR() << "sendq error: " << e.what(); |
| | | throw e; |
| | | } |
| | | } |
| | | |
| | | int SendQ::DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr) |
| | | int SendQ::DoSend1Remote(const Remote remote, Array &arr) |
| | | { |
| | | auto FirstNotExpired = [](Array &l) { |
| | | auto Less = [](const TimedMsg &msg, const TimePoint &tp) { return msg.expire() < tp; }; |
| | |
| | | info.on_expire_(info.data_); |
| | | } |
| | | } |
| | | |
| | | while (pos != arr.end() && mq.TrySend(pos->data().mq_, pos->data().data_)) { |
| | | auto TrySend1 = [this](MsgInfo const &info) { return ShmMsgQueue::TrySend(shm_, info.mq_, info.data_); }; |
| | | while (pos != arr.end() && TrySend1(pos->data())) { |
| | | ++pos; |
| | | } |
| | | |
| | |
| | | return nprocessed; |
| | | } |
| | | |
| | | int SendQ::DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &al) |
| | | int SendQ::DoSend1Remote(const Remote remote, ArrayList &al) |
| | | { |
| | | int nsend = 0; |
| | | auto AllSent = [&](Array &arr) { |
| | | nsend += DoSend1Remote(mq, remote, arr); |
| | | nsend += DoSend1Remote(remote, arr); |
| | | return arr.empty(); |
| | | }; |
| | | for (auto it = al.begin(); it != al.end() && AllSent(*it); it = al.erase(it)) {} |
| | | return nsend; |
| | | } |
| | | |
| | | bool SendQ::TrySend(ShmMsgQueue &mq) |
| | | bool SendQ::TrySend() |
| | | { |
| | | std::unique_lock<std::mutex> lock(mutex_out_); |
| | | // if (TooFast()) { return false; } |
| | | |
| | | size_t nsend = 0; |
| | | if (!out_.empty()) { |
| | | auto rec = out_.begin(); |
| | | do { |
| | | nsend += DoSend1Remote(mq, rec->first, rec->second); |
| | | nsend += DoSend1Remote(rec->first, rec->second); |
| | | if (rec->second.empty()) { |
| | | rec = out_.erase(rec); |
| | | } else { |
| | |
| | | } |
| | | } while (rec != out_.end()); |
| | | } |
| | | count_out_.Count(nsend); |
| | | |
| | | auto Collect = [&]() { |
| | | std::unique_lock<std::mutex> lock(mutex_in_); |
| | |
| | | |
| | | return !out_.empty(); |
| | | } |
| | | |
| | | // Rate check: the call counter is reset whenever NowSec() has advanced past last_time_, |
| | | // then incremented; returns true once the counter exceeds 1000 * 100. |
| | | bool SendQ::TooFast() |
| | | { |
| | | const auto now_sec = NowSec(); |
| | | const bool tick_advanced = now_sec > last_time_; |
| | | if (tick_advanced) { |
| | | count_ = 0; |
| | | last_time_ = now_sec; |
| | | } |
| | | |
| | | count_ += 1; |
| | | return count_ > 1000 * 100; |
| | | } // not accurate in multi-thread. |
| | |
| | | typedef TimedData<MsgInfo> TimedMsg; |
| | | typedef TimedMsg::TimePoint TimePoint; |
| | | typedef TimedMsg::Duration Duration; |
| | | SendQ(SharedMemory &shm) : |
| | | shm_(shm) {} |
| | | |
| | | bool Append(const MQInfo &mq, MsgI msg) |
| | | { |
| | | msg.AddRef(); |
| | | auto onMsgExpire = [](const Data &d) { MsgI(d).Release(); }; |
| | | auto onMsgExpire = [msg](const Data &d) mutable { msg.Release(); }; |
| | | try { |
| | | AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire); |
| | | return true; |
| | |
| | | bool Append(const MQInfo &mq, MsgI msg, OnMsgEvent onExpire) |
| | | { |
| | | msg.AddRef(); |
| | | auto onMsgExpire = [onExpire](const Data &d) { |
| | | auto onMsgExpire = [onExpire, msg](const Data &d) mutable { |
| | | onExpire(d); |
| | | MsgI(d).Release(); |
| | | msg.Release(); |
| | | }; |
| | | try { |
| | | AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire); |
| | |
| | | return false; |
| | | } |
| | | } |
| | | bool TrySend(ShmMsgQueue &mq); |
| | | bool TrySend(); |
| | | |
| | | private: |
| | | static TimePoint Now() { return TimedMsg::Clock::now(); } |
| | |
| | | typedef std::list<Array> ArrayList; |
| | | typedef std::unordered_map<Remote, ArrayList> Store; |
| | | |
| | | int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr); |
| | | int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &arr); |
| | | int DoSend1Remote(const Remote remote, Array &arr); |
| | | int DoSend1Remote(const Remote remote, ArrayList &arr); |
| | | |
| | | bool TooFast(); |
| | | |
| | | SharedMemory &shm_; |
| | | std::mutex mutex_in_; |
| | | std::mutex mutex_out_; |
| | | Store in_; |
| | | Store out_; |
| | | |
| | | int64_t count_ = 0; |
| | | int64_t last_time_ = 0; |
| | | // Event counter keeping a running total (count_) and a subtotal (count_1sec_) that is |
| | | // zeroed whenever NowSec() has advanced past the stored last_time_. |
| | | struct Counter { |
| | | std::atomic<int64_t> count_; |
| | | std::atomic<int64_t> count_1sec_; |
| | | std::atomic<int64_t> last_time_; |
| | | Counter() : |
| | | count_(0), count_1sec_(0), last_time_(0) {} |
| | | // Record a single event. |
| | | void Count1() { Count(1); } |
| | | // Record n events: roll the subtotal over if needed, then add n to both counters. |
| | | void Count(int n) |
| | | { |
| | | CheckTime(); |
| | | count_1sec_.fetch_add(n); |
| | | count_.fetch_add(n); |
| | | } |
| | | // Zero the subtotal and remember the new time once NowSec() exceeds last_time_. |
| | | void CheckTime() |
| | | { |
| | | const auto now_sec = NowSec(); |
| | | if (now_sec > last_time_.load()) { |
| | | count_1sec_.store(0); |
| | | last_time_.store(now_sec); |
| | | } |
| | | } |
| | | // Running total of all recorded events. |
| | | int64_t GetCount() const { return count_; } |
| | | // Current value of the subtotal. |
| | | int64_t LastSec() const { return count_1sec_; } |
| | | }; |
| | | Counter count_in_; |
| | | Counter count_out_; |
| | | }; |
| | | |
| | | #endif // end of include guard: SENDQ_IWKMSK7M |
| | |
| | | |
| | | } // namespace |
| | | |
| | | ShmMsgQueue::MQId ShmMsgQueue::NewId() |
| | | { |
| | | static auto &id = GetData("Must init shared memory before use! Please make sure center is running."); |
| | | return (++id) * 10; |
| | | } |
| | | |
| | | ShmMsgQueue::ShmMsgQueue(ShmType &segment, const MQId id, const int len) : |
| | | id_(id), |
| | | queue_(segment, MsgQIdToName(id_), len, segment.get_segment_manager()) |
| | |
| | | if (IsCmd(val)) { |
| | | LOG_DEBUG() << "clsing queue " << id << ", has a cmd" << DecodeCmd(val); |
| | | } else { |
| | | MsgI(val).Release(); |
| | | MsgI(val, shm).Release(); |
| | | } |
| | | } |
| | | } |
| | |
| | | |
| | | #define BH_USE_ATOMIC_Q |
| | | |
| | | class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue> |
| | | class ShmMsgQueue |
| | | { |
| | | public: |
| | | typedef int64_t RawData; |
| | |
| | | typedef Shmq::Data Queue; |
| | | typedef Shmq::ShmType ShmType; |
| | | typedef uint64_t MQId; |
| | | |
| | | static MQId NewId(); |
| | | |
| | | ShmMsgQueue(ShmType &segment, const MQId id, const int len); |
| | | ShmMsgQueue(ShmType &segment, const bool create_or_else_find, const MQId id, const int len); |
| | |
| | | using namespace bhome_shm; |
| | | |
| | | ShmSocket::ShmSocket(Shm &shm, const MQId id, const int len) : |
| | | run_(false), mq_(shm, id, len), alloc_id_(0) { Start(); } |
| | | run_(false), mq_(shm, id, len), alloc_id_(0), send_buffer_(shm) { Start(); } |
| | | ShmSocket::ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len) : |
| | | run_(false), mq_(shm, create_or_else_find, id, len), alloc_id_(0) { Start(); } |
| | | run_(false), mq_(shm, create_or_else_find, id, len), alloc_id_(0), send_buffer_(shm) { Start(); } |
| | | ShmSocket::ShmSocket(int64_t abs_addr, Shm &shm, const MQId id) : |
| | | run_(false), mq_(abs_addr, shm, id), alloc_id_(0) { Start(); } |
| | | run_(false), mq_(abs_addr, shm, id), alloc_id_(0), send_buffer_(shm) { Start(); } |
| | | |
| | | ShmSocket::~ShmSocket() { Stop(); } |
| | | |
| | | bool ShmSocket::Start(int nworker, const RecvCB &onData, const RawRecvCB &onRaw, const IdleCB &onIdle) |
| | | { |
| | | auto ioProc = [this, onData, onRaw, onIdle]() { |
| | | auto DoSend = [this]() { return send_buffer_.TrySend(mq()); }; |
| | | auto DoSend = [this]() { return send_buffer_.TrySend(); }; |
| | | auto DoRecv = [=] { |
| | | // do not recv if no cb is set. |
| | | if (!onData && per_msg_cbs_->empty() && !onRaw && alloc_cbs_->empty()) { return false; } |
| | |
| | | if (IsCmd(val)) { |
| | | onCmdCB(*this, val); |
| | | } else { |
| | | MsgI imsg(val); |
| | | MsgI imsg(val, shm()); |
| | | DEFER1(imsg.Release()); |
| | | BHMsgHead head; |
| | | if (imsg.ParseHead(head)) { |
| | |
| | | while (run_) { ioProc(); } |
| | | // try send pending msgs. |
| | | auto end_time = steady_clock::now() + 3s; |
| | | while (send_buffer_.TrySend(mq()) && steady_clock::now() < end_time) { |
| | | while (send_buffer_.TrySend() && steady_clock::now() < end_time) { |
| | | // LOG_DEBUG() << "try send pending msgs."; |
| | | } |
| | | }; |
| | |
| | | }; |
| | | #if 0 |
| | | // self alloc |
| | | MsgI msg; |
| | | MsgI msg(shm()); |
| | | if (msg.Make(size)) { |
| | | DEFER1(msg.Release()); |
| | | return OnResult(msg); |
| | |
| | | (id << 4) | |
| | | EncodeCmd(eCmdAllocRequest0); |
| | | auto rawCB = [onResult](ShmSocket &sock, int64_t &val) { |
| | | MsgI msg((val >> 32) & MaskBits(31)); |
| | | MsgI msg(((val >> 32) & MaskBits(31)), sock.shm()); |
| | | DEFER1(msg.Release()); |
| | | onResult(msg); |
| | | return true; |
| | |
| | | alloc_cbs_->Pick(id, cb_no_use); |
| | | }; |
| | | |
| | | return Send(BHTopicCenterAddress(), cmd, onExpireRemoveCB); |
| | | return Send(BHTopicCenterAddress(shm()), cmd, onExpireRemoveCB); |
| | | } |
| | |
| | | bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); } |
| | | bool Stop(); |
| | | |
| | | // Builds a MsgI from `head` and `body` and hands it to Send() for `remote`. |
| | | // Returns false when Make() fails; otherwise returns what Send() returns. Any exception |
| | | // raised along the way is caught, reported through SetLastError(eError, ...), and turned |
| | | // into a false return. |
| | | template <class Body> |
| | | bool CenterSend(const MQInfo &remote, BHMsgHead &head, Body &body) |
| | | { |
| | | try { |
| | | //TODO alloc outsiez and use send. |
| | | MsgI msg; |
| | | if (!msg.Make(head, body)) { return false; } |
| | | // presumably DEFER1 runs msg.Release() when this scope exits -- confirm against the macro. |
| | | DEFER1(msg.Release()); |
| | | |
| | | return Send(remote, msg); |
| | | } catch (...) { |
| | | SetLastError(eError, "Send internal error."); |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult); |
| | | |
| | | template <class Body> |
| | |
| | | bool Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb = RecvCB()); |
| | | |
| | | template <class... Rest> |
| | | bool SendImpl(const MQInfo &remote, Rest &&...rest) |
| | | bool SendImpl(Rest &&...rest) |
| | | { |
| | | return send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...); |
| | | return send_buffer_.Append(std::forward<decltype(rest)>(rest)...); |
| | | } |
| | | |
| | | std::vector<std::thread> workers_; |
| | |
| | | |
| | | Synced<CallbackRecords<std::string, RecvCB>> per_msg_cbs_; |
| | | Synced<CallbackRecords<int, RawRecvCB>> alloc_cbs_; |
| | | SendQ send_buffer_; |
| | | |
| | | // node request center alloc memory. |
| | | int node_proc_index_ = -1; |
| | | int socket_index_ = -1; |
| | | std::atomic<int> alloc_id_; |
| | | |
| | | SendQ send_buffer_; |
| | | }; |
| | | |
| | | #endif // end of include guard: SHM_SOCKET_GWTJHBPO |
| | |
| | | } |
| | | |
| | | if (ssn_id_ == 0) { |
| | | ssn_id_ = ShmMsgQueue::NewId(); |
| | | ssn_id_ = NewSession(); |
| | | } |
| | | LOG_DEBUG() << "Node Init, id " << ssn_id_; |
| | | auto NodeInit = [&]() { |
| | | int64_t init_request = ssn_id_ << 4 | EncodeCmd(eCmdNodeInit); |
| | | int64_t reply = 0; |
| | | if (BHNodeInit(init_request, reply) && DecodeCmd(reply) == eCmdNodeInitReply) { |
| | | if (BHNodeInit(shm(), init_request, reply) && DecodeCmd(reply) == eCmdNodeInitReply) { |
| | | int64_t abs_addr = reply >> 4; |
| | | sockets_.emplace_back(new ShmSocket(abs_addr, shm_, ssn_id_)); |
| | | LOG_DEBUG() << "node init ok"; |
| | |
| | | auto head = InitMsgHead(GetType(body), info_.proc_id(), ssn_id_); |
| | | AddRoute(head, socket); |
| | | if (imsg.Fill(head, body)) { |
| | | socket.Send(BHTopicCenterAddress(), imsg); |
| | | socket.Send(CenterAddr(), imsg); |
| | | } |
| | | } break; |
| | | case kMsgTypeProcInitReply: { |
| | |
| | | MsgCommonReply body; |
| | | CheckResult(imsg, head, body); |
| | | }; |
| | | return sock.Send(BHTopicCenterAddress(), head, body, onResult); |
| | | return sock.Send(CenterAddr(), head, body, onResult); |
| | | } else { |
| | | MsgI reply; |
| | | MsgI reply(shm()); |
| | | DEFER1(reply.Release();); |
| | | BHMsgHead reply_head; |
| | | bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | bool r = sock.SendAndRecv(CenterAddr(), head, body, reply, reply_head, timeout_ms); |
| | | if (r) { |
| | | CheckResult(reply, reply_head, reply_body); |
| | | } |
| | |
| | | MsgCommonReply body; |
| | | CheckResult(imsg, head, body); |
| | | }; |
| | | return sock.Send(BHTopicCenterAddress(), head, body, onResult); |
| | | return sock.Send(CenterAddr(), head, body, onResult); |
| | | } else { |
| | | MsgI reply; |
| | | MsgI reply(shm()); |
| | | DEFER1(reply.Release();); |
| | | BHMsgHead reply_head; |
| | | bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | bool r = sock.SendAndRecv(CenterAddr(), head, body, reply, reply_head, timeout_ms); |
| | | return r && CheckResult(reply, reply_head, reply_body); |
| | | } |
| | | } |
| | |
| | | AddRoute(head, sock); |
| | | |
| | | if (timeout_ms == 0) { |
| | | return sock.Send(BHTopicCenterAddress(), head, body); |
| | | return sock.Send(CenterAddr(), head, body); |
| | | } else { |
| | | MsgI reply; |
| | | MsgI reply(shm()); |
| | | DEFER1(reply.Release();); |
| | | BHMsgHead reply_head; |
| | | bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | bool r = sock.SendAndRecv(CenterAddr(), head, body, reply, reply_head, timeout_ms); |
| | | r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); |
| | | return (r && IsSuccess(reply_body.errmsg().errcode())); |
| | | } |
| | |
| | | BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn())); |
| | | AddRoute(head, sock); |
| | | |
| | | MsgI reply; |
| | | MsgI reply(shm()); |
| | | DEFER1(reply.Release()); |
| | | BHMsgHead reply_head; |
| | | return (sock.SendAndRecv(BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) && |
| | | return (sock.SendAndRecv(CenterAddr(), head, query, reply, reply_head, timeout_ms) && |
| | | reply_head.type() == kMsgTypeQueryTopicReply && |
| | | reply.ParseBody(reply_body)); |
| | | } |
| | |
| | | BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn())); |
| | | AddRoute(head, sock); |
| | | |
| | | MsgI reply; |
| | | MsgI reply(shm()); |
| | | DEFER1(reply.Release()); |
| | | BHMsgHead reply_head; |
| | | return (sock.SendAndRecv(BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) && |
| | | return (sock.SendAndRecv(CenterAddr(), head, query, reply, reply_head, timeout_ms) && |
| | | reply_head.type() == kMsgTypeQueryProcReply && |
| | | reply.ParseBody(reply_body)); |
| | | } |
| | |
| | | AddRoute(head, sock); |
| | | |
| | | if (timeout_ms == 0) { |
| | | return sock.Send(BHTopicCenterAddress(), head, body); |
| | | return sock.Send(CenterAddr(), head, body); |
| | | } else { |
| | | MsgI reply; |
| | | MsgI reply(shm()); |
| | | DEFER1(reply.Release();); |
| | | BHMsgHead reply_head; |
| | | bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | bool r = sock.SendAndRecv(CenterAddr(), head, body, reply, reply_head, timeout_ms); |
| | | r = r && reply_head.type() == kMsgTypeCommonReply; |
| | | r = r && reply.ParseBody(reply_body); |
| | | return r; |
| | |
| | | AddRoute(head, sock); |
| | | head.set_topic(request.topic()); |
| | | |
| | | MsgI reply_msg; |
| | | MsgI reply_msg(shm()); |
| | | DEFER1(reply_msg.Release();); |
| | | BHMsgHead reply_head; |
| | | |
| | |
| | | AddRoute(head, sock); |
| | | |
| | | if (timeout_ms == 0) { |
| | | return sock.Send(BHTopicBusAddress(), head, pub); |
| | | return sock.Send(BusAddr(), head, pub); |
| | | } else { |
| | | MsgI reply; |
| | | MsgI reply(shm()); |
| | | DEFER1(reply.Release();); |
| | | BHMsgHead reply_head; |
| | | MsgCommonReply reply_body; |
| | | return sock.SendAndRecv(BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) && |
| | | return sock.SendAndRecv(BusAddr(), head, pub, reply, reply_head, timeout_ms) && |
| | | reply_head.type() == kMsgTypeCommonReply && |
| | | reply.ParseBody(reply_body) && |
| | | IsSuccess(reply_body.errmsg().errcode()); |
| | |
| | | BHMsgHead head(InitMsgHead(GetType(sub), proc_id(), ssn())); |
| | | AddRoute(head, sock); |
| | | if (timeout_ms == 0) { |
| | | return sock.Send(BHTopicBusAddress(), head, sub); |
| | | return sock.Send(BusAddr(), head, sub); |
| | | } else { |
| | | MsgI reply; |
| | | MsgI reply(shm()); |
| | | DEFER1(reply.Release();); |
| | | BHMsgHead reply_head; |
| | | return sock.SendAndRecv(BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) && |
| | | return sock.SendAndRecv(BusAddr(), head, sub, reply, reply_head, timeout_ms) && |
| | | reply_head.type() == kMsgTypeCommonReply && |
| | | reply.ParseBody(reply_body) && |
| | | IsSuccess(reply_body.errmsg().errcode()); |
| | |
| | | SharedMemory &shm_; |
| | | ProcInfo info_; |
| | | |
| | | SharedMemory &shm() { return shm_; } |
| | | SharedMemory &shm() const { return shm_; } |
| | | const MQInfo &CenterAddr() const { return BHTopicCenterAddress(shm()); } |
| | | const MQInfo &BusAddr() const { return BHTopicBusAddress(shm()); } |
| | | |
| | | public: |
| | | TopicNode(SharedMemory &shm); |
| | |
| | | { |
| | | SharedMemory &shm = TestShm(); |
| | | GlobalInit(shm); |
| | | ShmMsgQueue q(shm, ShmMsgQueue::NewId(), 64); |
| | | ShmMsgQueue q(shm, NewSession(), 64); |
| | | for (int i = 0; i < 2; ++i) { |
| | | int ms = i * 100; |
| | | printf("Timeout Test %4d: ", ms); |
| | | boost::timer::auto_cpu_timer timer; |
| | | MsgI msg; |
| | | MsgI msg(shm); |
| | | bool r = q.Recv(msg, ms); |
| | | BOOST_CHECK(!r); |
| | | } |
| | |
| | | typedef MsgI Msg; |
| | | GlobalInit(shm); |
| | | |
| | | Msg m0(1000); |
| | | Msg m0(1000, shm); |
| | | BOOST_CHECK(m0.valid()); |
| | | BOOST_CHECK_EQUAL(m0.Count(), 1); |
| | | Msg m1 = m0; |
| | |
| | | { |
| | | SharedMemory &shm = TestShm(); |
| | | GlobalInit(shm); |
| | | MQId server_id = ShmMsgQueue::NewId(); |
| | | MQId server_id = NewSession(); |
| | | ShmMsgQueue server(server_id, shm, 1000); |
| | | |
| | | const int timeout = 1000; |
| | |
| | | |
| | | std::string str(data_size, 'a'); |
| | | auto Writer = [&](int writer_id, uint64_t n) { |
| | | MQId cli_id = ShmMsgQueue::NewId(); |
| | | MQId cli_id = NewSession(); |
| | | |
| | | ShmMsgQueue mq(cli_id, shm, 64); |
| | | MsgI msg; |
| | | MsgI msg(shm); |
| | | MsgRequestTopic body; |
| | | body.set_topic("topic"); |
| | | body.set_data(str); |
| | |
| | | auto now = []() { return steady_clock::now(); }; |
| | | auto tm = now(); |
| | | while (*run) { |
| | | MsgI msg; |
| | | MsgI msg(shm); |
| | | BHMsgHead head; |
| | | if (mq.TryRecv(msg)) { |
| | | DEFER1(msg.Release()); |
| | |
| | | |
| | | auto Avail = [&]() { return shm.get_free_memory(); }; |
| | | auto init_avail = Avail(); |
| | | ShmSocket srv(shm, ShmMsgQueue::NewId(), qlen); |
| | | ShmSocket cli(shm, ShmMsgQueue::NewId(), qlen); |
| | | ShmSocket srv(shm, NewSession(), qlen); |
| | | ShmSocket cli(shm, NewSession(), qlen); |
| | | |
| | | int ncli = 1; |
| | | uint64_t nmsg = 1000 * 1000 * 1; |