From 1ff714838c03cba1a18884d5b48a20ee6c4275ac Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期五, 21 五月 2021 15:00:53 +0800 Subject: [PATCH] class MsgI, ShmMsgQueue, no bind to shm. --- utest/simple_tests.cpp | 6 box/center.cpp | 13 +- src/msg.h | 37 ++--- src/shm_msg_queue.h | 4 src/defs.h | 23 ++- src/sendq.cpp | 28 +--- box/node_center.cpp | 17 +- src/topic_node.cpp | 52 ++++---- src/shm_msg_queue.cpp | 8 - src/defs.cpp | 52 +++++-- src/msg.cpp | 9 - utest/speed_test.cpp | 12 +- src/shm_socket.cpp | 18 +- src/shm_socket.h | 23 --- src/topic_node.h | 4 box/node_center.h | 4 src/sendq.h | 48 ++++++- 17 files changed, 184 insertions(+), 174 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index e77c38f..53c1f42 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -65,7 +65,7 @@ return [&](auto &&rep_body) { auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id())); MQInfo remote = {head.route(0).mq_id(), head.route(0).abs_addr()}; - MsgI msg; + MsgI msg(socket.shm()); if (msg.Make(reply_head, rep_body)) { DEFER1(msg.Release();); center->SendAllocMsg(socket, remote, msg); @@ -73,7 +73,7 @@ }; } -bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr) +bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm) { // command auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool { @@ -87,7 +87,7 @@ auto onInit = [&](const int64_t request) { return center->OnNodeInit(socket, request); }; - BHCenterHandleInit(onInit); + BHCenterHandleInit(socket.shm(), onInit); center->OnTimer(); }; @@ -106,7 +106,7 @@ default: return false; } }; - BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(), 1000); + BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(shm), 1000); auto OnBusIdle = [=](ShmSocket &socket) {}; auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; }; @@ -142,7 +142,7 @@ } }; - BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(), 1000); + BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(shm), 1000); return true; } @@ -167,7 +167,7 @@ { auto nsec = NodeTimeoutSec(); auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs. - AddCenter(center_ptr); + AddCenter(center_ptr, shm); for (auto &kv : Centers()) { auto &info = kv.second; @@ -176,6 +176,7 @@ topic_node_.reset(new CenterTopicNode(center_ptr, shm)); } + BHCenter::~BHCenter() { Stop(); } bool BHCenter::Start() diff --git a/box/node_center.cpp b/box/node_center.cpp index 4e228a7..cbaef0e 100644 --- a/box/node_center.cpp +++ b/box/node_center.cpp @@ -57,7 +57,7 @@ { auto pos = msgs_.find(id); if (pos != msgs_.end()) { - ShmMsg(pos->second).Free(); + pos->second.Free(); msgs_.erase(pos); } else { LOG_TRACE() << "ignore late free request."; @@ -101,9 +101,9 @@ int i = 0; int total_count = 0; for (auto &kv : msgs_) { - MsgI msg(kv.second); + auto &msg = kv.second; total_count += msg.Count(); - LOG_TRACE() << " " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second << ", count: " << msg.Count() << ", size: " << msg.Size(); + LOG_TRACE() << " " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second.Offset() << ", count: " << msg.Count() << ", size: " << msg.Size(); } LOG_TRACE() << "total count: " << total_count; } @@ -173,7 +173,7 @@ auto PrepareProcInit = [&](Node &node) { bool r = false; - ShmMsg init_msg; + ShmMsg init_msg(shm); DEFER1(init_msg.Release()); MsgProcInit body; auto head = InitMsgHead(GetType(body), id(), ssn); @@ -238,7 +238,7 @@ if (!FindMq()) { return; } auto size = GetAllocSize((val >> 52) & MaskBits(8)); - MsgI new_msg; + MsgI new_msg(socket.shm()); if (new_msg.Make(size)) { // 31bit proc index, 28bit id, ,4bit cmd+flag int64_t reply = (new_msg.Offset() << 32) | (msg_id << 4) | EncodeCmd(eCmdAllocReply0); @@ -612,18 +612,15 @@ pub.set_topic(topic); pub.set_data(content); BHMsgHead head(InitMsgHead(GetType(pub), id(), 0)); - MsgI msg; + MsgI msg(shm); if (msg.Make(head, pub)) { DEFER1(msg.Release()); RecordMsg(msg); - auto &mq = GetCenterInfo(shm)->mq_sender_; - ShmSocket sender(mq.offset_, shm, mq.id_); - for (auto &cli : clients) { auto node = cli.weak_node_.lock(); if (node && node->state_.flag_ == kStateNormal) { - sender.Send({cli.mq_id_, cli.mq_abs_addr_}, msg); + DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg); } } } diff --git a/box/node_center.h b/box/node_center.h index ca16cc5..caaf054 100644 --- a/box/node_center.h +++ b/box/node_center.h @@ -51,14 +51,14 @@ typedef int64_t Offset; public: - void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg.Offset()); } + void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg); } void FreeMsg(MsgId id); void AutoRemove(); size_t size() const { return msgs_.size(); } void DebugPrint() const; private: - std::unordered_map<MsgId, Offset> msgs_; + std::unordered_map<MsgId, MsgI> msgs_; int64_t time_to_clean_ = 0; }; diff --git a/src/defs.cpp b/src/defs.cpp index 2715911..a2f05cc 100644 --- a/src/defs.cpp +++ b/src/defs.cpp @@ -90,7 +90,7 @@ } // namespace -CenterInfo *GetCenterInfo(bhome_shm::SharedMemory &shm) +CenterInfo *GetCenterInfo(SharedMemory &shm) { auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address())); if (pmeta->tag_ == kCenterInfoTag) { @@ -98,11 +98,33 @@ } return nullptr; } +ShmSocket &DefaultSender(SharedMemory &shm) +{ + typedef std::pair<void *, std::shared_ptr<ShmSocket>> Pair; + static std::vector<Pair> store; + static std::mutex s_mtx; + thread_local Pair local_cache; + if (local_cache.first == &shm) { + return *local_cache.second; + } + + std::lock_guard<std::mutex> lk(s_mtx); + for (auto &kv : store) { + if (kv.first == &shm) { + local_cache = kv; + return *local_cache.second; + } + } + auto &mq = GetCenterInfo(shm)->mq_sender_; + store.emplace_back(&shm, new ShmSocket(mq.offset_, shm, mq.id_)); + local_cache = store.back(); + return *local_cache.second; +} // put center info at fixed memory position. // as boost shm find object (find socket/mq by id, etc...) also locks inside, // which node might crash inside and cause deadlock. -bool CenterInit(bhome_shm::SharedMemory &shm) +bool CenterInit(SharedMemory &shm) { Mutex *mutex = shm.FindOrCreate<Mutex>("shm_center_lock"); if (!mutex || !mutex->try_lock()) { @@ -140,16 +162,15 @@ return false; } -const MQInfo &BHGlobalSenderAddress() { return GetCenterInfo(BHomeShm())->mq_sender_; } -const MQInfo &BHTopicCenterAddress() { return GetCenterInfo(BHomeShm())->mq_center_; } -const MQInfo &BHTopicBusAddress() { return GetCenterInfo(BHomeShm())->mq_bus_; } -bool BHNodeInit(const int64_t request, int64_t &reply) +const MQInfo &BHTopicCenterAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_center_; } +const MQInfo &BHTopicBusAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_bus_; } +bool BHNodeInit(SharedMemory &shm, const int64_t request, int64_t &reply) { - return GetCenterInfo(BHomeShm())->init_rr_.ClientRequest(request, reply); + return GetCenterInfo(shm)->init_rr_.ClientRequest(request, reply); } -void BHCenterHandleInit(std::function<int64_t(const int64_t)> const &onReq) +void BHCenterHandleInit(SharedMemory &shm, std::function<int64_t(const int64_t)> const &onReq) { - GetCenterInfo(BHomeShm())->init_rr_.ServerProcess(onReq); + GetCenterInfo(shm)->init_rr_.ServerProcess(onReq); } int64_t CalcAllocIndex(int64_t size) @@ -164,18 +185,15 @@ { return "bhome_default_shm_v0"; } -bhome_shm::SharedMemory &BHomeShm() +SharedMemory &BHomeShm() { - static bhome_shm::SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512); + static SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512); return shm; } -bool GlobalInit(bhome_shm::SharedMemory &shm) -{ - MsgI::BindShm(shm); - CenterInfo *pinfo = GetCenterInfo(shm); - return pinfo && ShmMsgQueue::SetData(pinfo->mqid_); -} +bool GlobalInit(SharedMemory &shm) { return GetCenterInfo(shm); } + +MQId NewSession() { return 10 * (++GetCenterInfo(BHomeShm())->mqid_); } void SetLastError(const int ec, const std::string &msg) { diff --git a/src/defs.h b/src/defs.h index 51040e6..b117579 100644 --- a/src/defs.h +++ b/src/defs.h @@ -23,6 +23,7 @@ #include <atomic> #include <string> +class ShmSocket; typedef uint64_t MQId; int64_t CalcAllocIndex(int64_t size); @@ -50,21 +51,25 @@ class SharedMemory; } // namespace bhome_shm +using bhome_shm::SharedMemory; + std::string BHomeShmName(); -bhome_shm::SharedMemory &BHomeShm(); -CenterInfo *GetCenterInfo(bhome_shm::SharedMemory &shm); -bool CenterInit(bhome_shm::SharedMemory &shm); -bool GlobalInit(bhome_shm::SharedMemory &shm); +SharedMemory &BHomeShm(); +CenterInfo *GetCenterInfo(SharedMemory &shm); +ShmSocket &DefaultSender(SharedMemory &shm); + +MQId NewSession(); +bool CenterInit(SharedMemory &shm); +bool GlobalInit(SharedMemory &shm); typedef std::string Topic; void SetLastError(const int ec, const std::string &msg); void GetLastError(int &ec, std::string &msg); //TODO center can check shm for previous crash. -const MQInfo &BHGlobalSenderAddress(); -const MQInfo &BHTopicCenterAddress(); -const MQInfo &BHTopicBusAddress(); -bool BHNodeInit(const int64_t request, int64_t &reply); -void BHCenterHandleInit(std::function<int64_t(const int64_t)> const &onReq); +const MQInfo &BHTopicCenterAddress(SharedMemory &shm); +const MQInfo &BHTopicBusAddress(SharedMemory &shm); +bool BHNodeInit(SharedMemory &shm, const int64_t request, int64_t &reply); +void BHCenterHandleInit(SharedMemory &shm, std::function<int64_t(const int64_t)> const &onReq); // node mq is avail with in timeout; after that may get killed. int NodeTimeoutSec(); diff --git a/src/msg.cpp b/src/msg.cpp index dca2044..40a7b0d 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -23,13 +23,6 @@ namespace bhome_msg { -ShmSocket &ShmMsg::Sender() -{ - static auto &mq = GetCenterInfo(shm())->mq_sender_; - static ShmSocket sender(mq.offset_, shm(), mq.id_); - return sender; -} - int ShmMsg::Release() { if (!valid()) { @@ -39,7 +32,7 @@ if (n == 0) { if (meta()->managed_) { int64_t free_cmd = (id() << 4) | EncodeCmd(eCmdFree); - Sender().Send(BHTopicCenterAddress(), free_cmd); + DefaultSender(shm()).Send(BHTopicCenterAddress(shm()), free_cmd); } else { Free(); } diff --git a/src/msg.h b/src/msg.h index 1ac153a..12922b5 100644 --- a/src/msg.h +++ b/src/msg.h @@ -34,14 +34,9 @@ // ShmMsg is safe to be stored in shared memory, so POD data or offset_ptr is required. // message content layout: (meta) / header_size + header + data_size + data -class ShmMsg : private StaticDataRef<SharedMemory, ShmMsg> +class ShmMsg { -public: - static inline SharedMemory &shm() { return GetData(); } - private: - static ShmSocket &Sender(); - // store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object. class RefCount : private boost::noncopyable { @@ -58,11 +53,7 @@ typedef int64_t OffsetType; static OffsetType Addr(void *ptr) { return reinterpret_cast<OffsetType>(ptr); } static void *Ptr(const OffsetType offset) { return reinterpret_cast<void *>(offset); } - static inline OffsetType BaseAddr() - { - static const OffsetType base = Addr(shm().get_address()); // cache value. - return base; - } + OffsetType BaseAddr() const { return Addr(shm().get_address()); } static const uint32_t kMsgTag = 0xf1e2d3c4; struct Meta { @@ -83,7 +74,8 @@ capacity_(size), id_(NewId()), timestamp_(NowSec()) {} }; OffsetType offset_; - static void *Alloc(const size_t size) + SharedMemory *pshm_; + void *Alloc(const size_t size) { void *p = shm().Alloc(sizeof(Meta) + size); if (p) { @@ -132,24 +124,27 @@ if (!addr) { return false; } - ShmMsg(addr).swap(*this); + offset_ = Addr(addr) - BaseAddr(); return true; } - ShmMsg(void *p) : - offset_(p ? (Addr(p) - BaseAddr()) : 0) {} template <class T = void> T *get() const { return offset_ != 0 ? static_cast<T *>(Ptr(offset_ + BaseAddr())) : nullptr; } public: - static bool BindShm(SharedMemory &shm) { return SetData(shm); } - ShmMsg() : - offset_(0) {} - explicit ShmMsg(const OffsetType offset) : - offset_(offset) {} + explicit ShmMsg(SharedMemory &shm) : + offset_(0), pshm_(&shm) {} + ShmMsg(const OffsetType offset, SharedMemory &shm) : + offset_(offset), pshm_(&shm) {} OffsetType Offset() const { return offset_; } OffsetType &OffsetRef() { return offset_; } - void swap(ShmMsg &a) { std::swap(offset_, a.offset_); } + SharedMemory &shm() const { return *pshm_; } + + void swap(ShmMsg &a) + { + std::swap(offset_, a.offset_); + std::swap(pshm_, a.pshm_); + } bool valid() const { return offset_ != 0 && meta()->tag_ == kMsgTag; } int64_t id() const { return valid() ? meta()->id_ : 0; } int64_t timestamp() const { return valid() ? meta()->timestamp_.load() : 0; } diff --git a/src/sendq.cpp b/src/sendq.cpp index f1e5918..2a772b0 100644 --- a/src/sendq.cpp +++ b/src/sendq.cpp @@ -33,13 +33,14 @@ } else { al.insert(al.begin(), Array())->emplace_back(std::move(tmp)); } + count_in_.Count1(); } catch (std::exception &e) { LOG_ERROR() << "sendq error: " << e.what(); throw e; } } -int SendQ::DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr) +int SendQ::DoSend1Remote(const Remote remote, Array &arr) { auto FirstNotExpired = [](Array &l) { auto Less = [](const TimedMsg &msg, const TimePoint &tp) { return msg.expire() < tp; }; @@ -53,8 +54,8 @@ info.on_expire_(info.data_); } } - - while (pos != arr.end() && mq.TrySend(pos->data().mq_, pos->data().data_)) { + auto TrySend1 = [this](MsgInfo const &info) { return ShmMsgQueue::TrySend(shm_, info.mq_, info.data_); }; + while (pos != arr.end() && TrySend1(pos->data())) { ++pos; } @@ -63,27 +64,26 @@ return nprocessed; } -int SendQ::DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &al) +int SendQ::DoSend1Remote(const Remote remote, ArrayList &al) { int nsend = 0; auto AllSent = [&](Array &arr) { - nsend += DoSend1Remote(mq, remote, arr); + nsend += DoSend1Remote(remote, arr); return arr.empty(); }; for (auto it = al.begin(); it != al.end() && AllSent(*it); it = al.erase(it)) {} return nsend; } -bool SendQ::TrySend(ShmMsgQueue &mq) +bool SendQ::TrySend() { std::unique_lock<std::mutex> lock(mutex_out_); - // if (TooFast()) { return false; } size_t nsend = 0; if (!out_.empty()) { auto rec = out_.begin(); do { - nsend += DoSend1Remote(mq, rec->first, rec->second); + nsend += DoSend1Remote(rec->first, rec->second); if (rec->second.empty()) { rec = out_.erase(rec); } else { @@ -91,6 +91,7 @@ } } while (rec != out_.end()); } + count_out_.Count(nsend); auto Collect = [&]() { std::unique_lock<std::mutex> lock(mutex_in_); @@ -109,14 +110,3 @@ return !out_.empty(); } - -bool SendQ::TooFast() -{ - auto cur = NowSec(); - if (cur > last_time_) { - last_time_ = cur; - count_ = 0; - } - - return ++count_ > 1000 * 100; -} // not accurate in multi-thread. \ No newline at end of file diff --git a/src/sendq.h b/src/sendq.h index 759e12a..ec63b05 100644 --- a/src/sendq.h +++ b/src/sendq.h @@ -46,11 +46,13 @@ typedef TimedData<MsgInfo> TimedMsg; typedef TimedMsg::TimePoint TimePoint; typedef TimedMsg::Duration Duration; + SendQ(SharedMemory &shm) : + shm_(shm) {} bool Append(const MQInfo &mq, MsgI msg) { msg.AddRef(); - auto onMsgExpire = [](const Data &d) { MsgI(d).Release(); }; + auto onMsgExpire = [msg](const Data &d) mutable { msg.Release(); }; try { AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire); return true; @@ -63,9 +65,9 @@ bool Append(const MQInfo &mq, MsgI msg, OnMsgEvent onExpire) { msg.AddRef(); - auto onMsgExpire = [onExpire](const Data &d) { + auto onMsgExpire = [onExpire, msg](const Data &d) mutable { onExpire(d); - MsgI(d).Release(); + msg.Release(); }; try { AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire); @@ -85,7 +87,7 @@ return false; } } - bool TrySend(ShmMsgQueue &mq); + bool TrySend(); private: static TimePoint Now() { return TimedMsg::Clock::now(); } @@ -96,18 +98,48 @@ typedef std::list<Array> ArrayList; typedef std::unordered_map<Remote, ArrayList> Store; - int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr); - int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &arr); + int DoSend1Remote(const Remote remote, Array &arr); + int DoSend1Remote(const Remote remote, ArrayList &arr); bool TooFast(); + SharedMemory &shm_; std::mutex mutex_in_; std::mutex mutex_out_; Store in_; Store out_; - int64_t count_ = 0; - int64_t last_time_ = 0; + struct Counter { + std::atomic<int64_t> count_; + std::atomic<int64_t> count_1sec_; + std::atomic<int64_t> last_time_; + Counter() : + count_(0), count_1sec_(0), last_time_(0) {} + void Count1() + { + CheckTime(); + ++count_1sec_; + ++count_; + } + void Count(int n) + { + CheckTime(); + count_1sec_ += n; + count_ += n; + } + void CheckTime() + { + auto cur = NowSec(); + if (cur > last_time_) { + count_1sec_ = 0; + last_time_ = cur; + } + } + int64_t GetCount() const { return count_.load(); } + int64_t LastSec() const { return count_1sec_.load(); } + }; + Counter count_in_; + Counter count_out_; }; #endif // end of include guard: SENDQ_IWKMSK7M diff --git a/src/shm_msg_queue.cpp b/src/shm_msg_queue.cpp index 1d78e8c..be2d2a2 100644 --- a/src/shm_msg_queue.cpp +++ b/src/shm_msg_queue.cpp @@ -31,12 +31,6 @@ } // namespace -ShmMsgQueue::MQId ShmMsgQueue::NewId() -{ - static auto &id = GetData("Must init shared memory before use! Please make sure center is running."); - return (++id) * 10; -} - ShmMsgQueue::ShmMsgQueue(ShmType &segment, const MQId id, const int len) : id_(id), queue_(segment, MsgQIdToName(id_), len, segment.get_segment_manager()) @@ -84,7 +78,7 @@ if (IsCmd(val)) { LOG_DEBUG() << "clsing queue " << id << ", has a cmd" << DecodeCmd(val); } else { - MsgI(val).Release(); + MsgI(val, shm).Release(); } } } diff --git a/src/shm_msg_queue.h b/src/shm_msg_queue.h index de60fde..6d922aa 100644 --- a/src/shm_msg_queue.h +++ b/src/shm_msg_queue.h @@ -27,7 +27,7 @@ #define BH_USE_ATOMIC_Q -class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue> +class ShmMsgQueue { public: typedef int64_t RawData; @@ -45,8 +45,6 @@ typedef Shmq::Data Queue; typedef Shmq::ShmType ShmType; typedef uint64_t MQId; - - static MQId NewId(); ShmMsgQueue(ShmType &segment, const MQId id, const int len); ShmMsgQueue(ShmType &segment, const bool create_or_else_find, const MQId id, const int len); diff --git a/src/shm_socket.cpp b/src/shm_socket.cpp index f177b87..11824d7 100644 --- a/src/shm_socket.cpp +++ b/src/shm_socket.cpp @@ -30,18 +30,18 @@ using namespace bhome_shm; ShmSocket::ShmSocket(Shm &shm, const MQId id, const int len) : - run_(false), mq_(shm, id, len), alloc_id_(0) { Start(); } + run_(false), mq_(shm, id, len), alloc_id_(0), send_buffer_(shm) { Start(); } ShmSocket::ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len) : - run_(false), mq_(shm, create_or_else_find, id, len), alloc_id_(0) { Start(); } + run_(false), mq_(shm, create_or_else_find, id, len), alloc_id_(0), send_buffer_(shm) { Start(); } ShmSocket::ShmSocket(int64_t abs_addr, Shm &shm, const MQId id) : - run_(false), mq_(abs_addr, shm, id), alloc_id_(0) { Start(); } + run_(false), mq_(abs_addr, shm, id), alloc_id_(0), send_buffer_(shm) { Start(); } ShmSocket::~ShmSocket() { Stop(); } bool ShmSocket::Start(int nworker, const RecvCB &onData, const RawRecvCB &onRaw, const IdleCB &onIdle) { auto ioProc = [this, onData, onRaw, onIdle]() { - auto DoSend = [this]() { return send_buffer_.TrySend(mq()); }; + auto DoSend = [this]() { return send_buffer_.TrySend(); }; auto DoRecv = [=] { // do not recv if no cb is set. if (!onData && per_msg_cbs_->empty() && !onRaw && alloc_cbs_->empty()) { return false; } @@ -73,7 +73,7 @@ if (IsCmd(val)) { onCmdCB(*this, val); } else { - MsgI imsg(val); + MsgI imsg(val, shm()); DEFER1(imsg.Release()); BHMsgHead head; if (imsg.ParseHead(head)) { @@ -113,7 +113,7 @@ while (run_) { ioProc(); } // try send pending msgs. auto end_time = steady_clock::now() + 3s; - while (send_buffer_.TrySend(mq()) && steady_clock::now() < end_time) { + while (send_buffer_.TrySend() && steady_clock::now() < end_time) { // LOG_DEBUG() << "try send pending msgs."; } }; @@ -170,7 +170,7 @@ }; #if 0 // self alloc - MsgI msg; + MsgI msg(shm()); if (msg.Make(size)) { DEFER1(msg.Release()); return OnResult(msg); @@ -194,7 +194,7 @@ (id << 4) | EncodeCmd(eCmdAllocRequest0); auto rawCB = [onResult](ShmSocket &sock, int64_t &val) { - MsgI msg((val >> 32) & MaskBits(31)); + MsgI msg(((val >> 32) & MaskBits(31)), sock.shm()); DEFER1(msg.Release()); onResult(msg); return true; @@ -206,5 +206,5 @@ alloc_cbs_->Pick(id, cb_no_use); }; - return Send(BHTopicCenterAddress(), cmd, onExpireRemoveCB); + return Send(BHTopicCenterAddress(shm()), cmd, onExpireRemoveCB); } \ No newline at end of file diff --git a/src/shm_socket.h b/src/shm_socket.h index 02500b2..bf78e89 100644 --- a/src/shm_socket.h +++ b/src/shm_socket.h @@ -66,22 +66,6 @@ bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); } bool Stop(); - template <class Body> - bool CenterSend(const MQInfo &remote, BHMsgHead &head, Body &body) - { - try { - //TODO alloc outsiez and use send. - MsgI msg; - if (!msg.Make(head, body)) { return false; } - DEFER1(msg.Release()); - - return Send(remote, msg); - } catch (...) { - SetLastError(eError, "Send internal error."); - return false; - } - } - bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult); template <class Body> @@ -155,9 +139,9 @@ bool Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb = RecvCB()); template <class... Rest> - bool SendImpl(const MQInfo &remote, Rest &&...rest) + bool SendImpl(Rest &&...rest) { - return send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...); + return send_buffer_.Append(std::forward<decltype(rest)>(rest)...); } std::vector<std::thread> workers_; @@ -188,12 +172,13 @@ Synced<CallbackRecords<std::string, RecvCB>> per_msg_cbs_; Synced<CallbackRecords<int, RawRecvCB>> alloc_cbs_; - SendQ send_buffer_; // node request center alloc memory. int node_proc_index_ = -1; int socket_index_ = -1; std::atomic<int> alloc_id_; + + SendQ send_buffer_; }; #endif // end of include guard: SHM_SOCKET_GWTJHBPO diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 6ed7713..fce7ed6 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -66,13 +66,13 @@ } if (ssn_id_ == 0) { - ssn_id_ = ShmMsgQueue::NewId(); + ssn_id_ = NewSession(); } LOG_DEBUG() << "Node Init, id " << ssn_id_; auto NodeInit = [&]() { int64_t init_request = ssn_id_ << 4 | EncodeCmd(eCmdNodeInit); int64_t reply = 0; - if (BHNodeInit(init_request, reply) && DecodeCmd(reply) == eCmdNodeInitReply) { + if (BHNodeInit(shm(), init_request, reply) && DecodeCmd(reply) == eCmdNodeInitReply) { int64_t abs_addr = reply >> 4; sockets_.emplace_back(new ShmSocket(abs_addr, shm_, ssn_id_)); LOG_DEBUG() << "node init ok"; @@ -94,7 +94,7 @@ auto head = InitMsgHead(GetType(body), info_.proc_id(), ssn_id_); AddRoute(head, socket); if (imsg.Fill(head, body)) { - socket.Send(BHTopicCenterAddress(), imsg); + socket.Send(CenterAddr(), imsg); } } break; case kMsgTypeProcInitReply: { @@ -187,12 +187,12 @@ MsgCommonReply body; CheckResult(imsg, head, body); }; - return sock.Send(BHTopicCenterAddress(), head, body, onResult); + return sock.Send(CenterAddr(), head, body, onResult); } else { - MsgI reply; + MsgI reply(shm()); DEFER1(reply.Release();); BHMsgHead reply_head; - bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); + bool r = sock.SendAndRecv(CenterAddr(), head, body, reply, reply_head, timeout_ms); if (r) { CheckResult(reply, reply_head, reply_body); } @@ -228,12 +228,12 @@ MsgCommonReply body; CheckResult(imsg, head, body); }; - return sock.Send(BHTopicCenterAddress(), head, body, onResult); + return sock.Send(CenterAddr(), head, body, onResult); } else { - MsgI reply; + MsgI reply(shm()); DEFER1(reply.Release();); BHMsgHead reply_head; - bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); + bool r = sock.SendAndRecv(CenterAddr(), head, body, reply, reply_head, timeout_ms); return r && CheckResult(reply, reply_head, reply_body); } } @@ -253,12 +253,12 @@ AddRoute(head, sock); if (timeout_ms == 0) { - return sock.Send(BHTopicCenterAddress(), head, body); + return sock.Send(CenterAddr(), head, body); } else { - MsgI reply; + MsgI reply(shm()); DEFER1(reply.Release();); BHMsgHead reply_head; - bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); + bool r = sock.SendAndRecv(CenterAddr(), head, body, reply, reply_head, timeout_ms); r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); return (r && IsSuccess(reply_body.errmsg().errcode())); } @@ -282,10 +282,10 @@ BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn())); AddRoute(head, sock); - MsgI reply; + MsgI reply(shm()); DEFER1(reply.Release()); BHMsgHead reply_head; - return (sock.SendAndRecv(BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) && + return (sock.SendAndRecv(CenterAddr(), head, query, reply, reply_head, timeout_ms) && reply_head.type() == kMsgTypeQueryTopicReply && reply.ParseBody(reply_body)); } @@ -301,10 +301,10 @@ BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn())); AddRoute(head, sock); - MsgI reply; + MsgI reply(shm()); DEFER1(reply.Release()); BHMsgHead reply_head; - return (sock.SendAndRecv(BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) && + return (sock.SendAndRecv(CenterAddr(), head, query, reply, reply_head, timeout_ms) && reply_head.type() == kMsgTypeQueryProcReply && reply.ParseBody(reply_body)); } @@ -324,12 +324,12 @@ AddRoute(head, sock); if (timeout_ms == 0) { - return sock.Send(BHTopicCenterAddress(), head, body); + return sock.Send(CenterAddr(), head, body); } else { - MsgI reply; + MsgI reply(shm()); DEFER1(reply.Release();); BHMsgHead reply_head; - bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); + bool r = sock.SendAndRecv(CenterAddr(), head, body, reply, reply_head, timeout_ms); r = r && reply_head.type() == kMsgTypeCommonReply; r = r && reply.ParseBody(reply_body); return r; @@ -525,7 +525,7 @@ AddRoute(head, sock); head.set_topic(request.topic()); - MsgI reply_msg; + MsgI reply_msg(shm()); DEFER1(reply_msg.Release();); BHMsgHead reply_head; @@ -596,13 +596,13 @@ AddRoute(head, sock); if (timeout_ms == 0) { - return sock.Send(BHTopicBusAddress(), head, pub); + return sock.Send(BusAddr(), head, pub); } else { - MsgI reply; + MsgI reply(shm()); DEFER1(reply.Release();); BHMsgHead reply_head; MsgCommonReply reply_body; - return sock.SendAndRecv(BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) && + return sock.SendAndRecv(BusAddr(), head, pub, reply, reply_head, timeout_ms) && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body) && IsSuccess(reply_body.errmsg().errcode()); @@ -629,12 +629,12 @@ BHMsgHead head(InitMsgHead(GetType(sub), proc_id(), ssn())); AddRoute(head, sock); if (timeout_ms == 0) { - return sock.Send(BHTopicBusAddress(), head, sub); + return sock.Send(BusAddr(), head, sub); } else { - MsgI reply; + MsgI reply(shm()); DEFER1(reply.Release();); BHMsgHead reply_head; - return sock.SendAndRecv(BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) && + return sock.SendAndRecv(BusAddr(), head, sub, reply, reply_head, timeout_ms) && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body) && IsSuccess(reply_body.errmsg().errcode()); diff --git a/src/topic_node.h b/src/topic_node.h index dcc9518..c115010 100644 --- a/src/topic_node.h +++ b/src/topic_node.h @@ -34,7 +34,9 @@ SharedMemory &shm_; ProcInfo info_; - SharedMemory &shm() { return shm_; } + SharedMemory &shm() const { return shm_; } + const MQInfo &CenterAddr() const { return BHTopicCenterAddress(shm()); } + const MQInfo &BusAddr() const { return BHTopicBusAddress(shm()); } public: TopicNode(SharedMemory &shm); diff --git a/utest/simple_tests.cpp b/utest/simple_tests.cpp index e1f1d2f..e9131b0 100644 --- a/utest/simple_tests.cpp +++ b/utest/simple_tests.cpp @@ -108,12 +108,12 @@ { SharedMemory &shm = TestShm(); GlobalInit(shm); - ShmMsgQueue q(shm, ShmMsgQueue::NewId(), 64); + ShmMsgQueue q(shm, NewSession(), 64); for (int i = 0; i < 2; ++i) { int ms = i * 100; printf("Timeout Test %4d: ", ms); boost::timer::auto_cpu_timer timer; - MsgI msg; + MsgI msg(shm); bool r = q.Recv(msg, ms); BOOST_CHECK(!r); } @@ -125,7 +125,7 @@ typedef MsgI Msg; GlobalInit(shm); - Msg m0(1000); + Msg m0(1000, shm); BOOST_CHECK(m0.valid()); BOOST_CHECK_EQUAL(m0.Count(), 1); Msg m1 = m0; diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp index ef56678..f33f0db 100644 --- a/utest/speed_test.cpp +++ b/utest/speed_test.cpp @@ -24,7 +24,7 @@ { SharedMemory &shm = TestShm(); GlobalInit(shm); - MQId server_id = ShmMsgQueue::NewId(); + MQId server_id = NewSession(); ShmMsgQueue server(server_id, shm, 1000); const int timeout = 1000; @@ -35,10 +35,10 @@ std::string str(data_size, 'a'); auto Writer = [&](int writer_id, uint64_t n) { - MQId cli_id = ShmMsgQueue::NewId(); + MQId cli_id = NewSession(); ShmMsgQueue mq(cli_id, shm, 64); - MsgI msg; + MsgI msg(shm); MsgRequestTopic body; body.set_topic("topic"); body.set_data(str); @@ -58,7 +58,7 @@ auto now = []() { return steady_clock::now(); }; auto tm = now(); while (*run) { - MsgI msg; + MsgI msg(shm); BHMsgHead head; if (mq.TryRecv(msg)) { DEFER1(msg.Release()); @@ -149,8 +149,8 @@ auto Avail = [&]() { return shm.get_free_memory(); }; auto init_avail = Avail(); - ShmSocket srv(shm, ShmMsgQueue::NewId(), qlen); - ShmSocket cli(shm, ShmMsgQueue::NewId(), qlen); + ShmSocket srv(shm, NewSession(), qlen); + ShmSocket cli(shm, NewSession(), qlen); int ncli = 1; uint64_t nmsg = 1000 * 1000 * 1; -- Gitblit v1.8.0