From cab831748a2a9cc18b7f18f3b5e14a4374b7ab68 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期一, 17 五月 2021 18:34:26 +0800 Subject: [PATCH] socket send using abs addr, avoid shm find by id. --- src/socket.h | 44 +--- box/center.cpp | 103 ++++++----- .vscode/settings.json | 11 + src/shm_msg_queue.h | 5 src/socket.cpp | 40 +++ src/defs.h | 18 +- box/center.h | 4 src/sendq.cpp | 33 +++ src/topic_node.cpp | 100 ++++++---- src/shm_msg_queue.cpp | 6 src/defs.cpp | 8 utest/speed_test.cpp | 27 +- proto/source/bhome_msg_api.proto | 6 utest/api_test.cpp | 14 + src/bh_util.h | 4 src/topic_node.h | 5 src/sendq.h | 48 +++-- src/bh_api.cpp | 31 ++- 18 files changed, 307 insertions(+), 200 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index c0b9587..df60df7 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -61,7 +61,16 @@ "strstream": "cpp", "unordered_set": "cpp", "cfenv": "cpp", - "*.ipp": "cpp" + "*.ipp": "cpp", + "cassert": "cpp", + "cerrno": "cpp", + "cfloat": "cpp", + "ciso646": "cpp", + "climits": "cpp", + "ios": "cpp", + "locale": "cpp", + "queue": "cpp", + "random": "cpp" }, "files.exclude": { "**/*.un~": true, diff --git a/box/center.cpp b/box/center.cpp index 7d51f2f..2f244b4 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -105,17 +105,18 @@ auto it = msgs_.begin(); while (it != msgs_.end() && --limit > 0) { ShmMsg msg(it->second); - if (msg.Count() == 0) { + auto Free = [&]() { msg.Free(); it = msgs_.erase(it); ++n; - } else if (msg.timestamp() + 60 < NowSec()) { - msg.Free(); - it = msgs_.erase(it); - ++n; - // LOG_DEBUG() << "release timeout msg, someone crashed."; - } else { + }; + int n = now - msg.timestamp(); + if (n < 10) { ++it; + } else if (msg.Count() == 0) { + Free(); + } else if (n > 60) { + Free(); } } if (n > 0) { @@ -181,22 +182,24 @@ typedef std::unordered_map<Address, std::set<Topic>> AddressTopics; struct NodeInfo { - ProcState state_; // state - std::set<Address> addrs_; // registered mqs - ProcInfo proc_; // - AddressTopics services_; // address: topics - AddressTopics subscriptions_; // address: topics + ProcState state_; // state + std::map<MQId, int64_t> addrs_; // registered mqs + ProcInfo proc_; // + AddressTopics services_; // address: topics + AddressTopics subscriptions_; // address: topics }; typedef std::shared_ptr<NodeInfo> Node; typedef std::weak_ptr<NodeInfo> WeakNode; struct TopicDest { - Address mq_; + MQId mq_id_; + int64_t mq_abs_addr_; WeakNode weak_node_; - bool operator<(const TopicDest &a) const { return mq_ < a.mq_; } + bool operator<(const TopicDest &a) const { return mq_id_ < a.mq_id_; } }; inline MQId SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); } - inline bool MatchAddr(std::set<Address> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); } + inline int64_t SrcAbsAddr(const BHMsgHead &head) { return head.route(0).abs_addr(); } + inline bool MatchAddr(std::map<Address, int64_t> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); } NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time, const int64_t kill_time) : id_(id), cleaner_(cleaner), offline_time_(offline_time), kill_time_(kill_time), last_check_time_(0) {} @@ -218,39 +221,38 @@ return; // ignore in exists. } auto UpdateRegInfo = [&](Node &node) { - for (int i = 0; i < 10; ++i) { - node->addrs_.insert(ssn + i); - } node->state_.timestamp_ = NowSec() - offline_time_; node->state_.UpdateState(NowSec(), offline_time_, kill_time_); // create sockets. + const int nsocks = 4; try { - auto CreateSocket = [&](const MQId id) { ShmSocket tmp(shm, true, id, 16); }; - // alloc(-1), node, server, sub, request, - for (int i = 0; i < 4; ++i) { - CreateSocket(ssn + i); - node->addrs_.insert(ssn + i); + for (int i = 0; i < nsocks; ++i) { + ShmSocket tmp(shm, true, ssn + i, 16); + node->addrs_.emplace(ssn + i, tmp.AbsAddr()); } return true; } catch (...) { + for (int i = 0; i < nsocks; ++i) { + ShmSocket::Remove(shm, ssn + i); + } return false; } }; - auto PrepareProcInit = [&]() { + auto PrepareProcInit = [&](Node &node) { bool r = false; ShmMsg init_msg; if (init_msg.Make(GetAllocSize(CalcAllocIndex(900)))) { // 31bit pointer, 4bit cmd+flag int64_t reply = (init_msg.Offset() << 4) | EncodeCmd(eCmdNodeInitReply); - r = SendAllocReply(socket, ssn, reply, init_msg); + r = SendAllocReply(socket, {ssn, node->addrs_[ssn]}, reply, init_msg); } return r; }; Node node(new NodeInfo); - if (UpdateRegInfo(node) && PrepareProcInit()) { + if (UpdateRegInfo(node) && PrepareProcInit(node)) { nodes_[ssn] = node; LOG_INFO() << "new node ssn (" << ssn << ") init"; } else { @@ -261,13 +263,13 @@ } void RecordMsg(const MsgI &msg) { msgs_.RecordMsg(msg); } - bool SendAllocReply(ShmSocket &socket, const Address dest, const int64_t reply, const MsgI &msg) + bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg) { RecordMsg(msg); auto onExpireFree = [this, msg](const SendQ::Data &) { msgs_.FreeMsg(msg.id()); }; return socket.Send(dest, reply, onExpireFree); } - bool SendAllocMsg(ShmSocket &socket, const Address dest, const MsgI &msg) + bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg) { RecordMsg(msg); return socket.Send(dest, msg); @@ -284,7 +286,21 @@ if (proc_rec.proc_.empty()) { return; } - Address dest = proc_rec.ssn_ + socket_index; + + MQInfo dest = {proc_rec.ssn_ + socket_index, 0}; + auto FindMq = [&]() { + auto pos = nodes_.find(proc_rec.ssn_); + if (pos != nodes_.end()) { + for (auto &&mq : pos->second->addrs_) { + if (mq.first == dest.id_) { + dest.offset_ = mq.second; + return true; + } + } + } + return false; + }; + if (!FindMq()) { return; } auto size = GetAllocSize((val >> 52) & MaskBits(8)); MsgI new_msg; @@ -337,10 +353,6 @@ // when node restart, ssn will change, // and old node will be removed after timeout. auto UpdateRegInfo = [&](Node &node) { - node->addrs_.insert(SrcAddr(head)); - for (auto &addr : msg.addrs()) { - node->addrs_.insert(addr.mq_id()); - } node->proc_.Swap(msg.mutable_proc()); node->state_.timestamp_ = head.timestamp(); node->state_.UpdateState(NowSec(), offline_time_, kill_time_); @@ -420,11 +432,11 @@ auto src = SrcAddr(head); auto &topics = msg.topics().topic_list(); node->services_[src].insert(topics.begin(), topics.end()); - TopicDest dest = {src, node}; + TopicDest dest = {src, SrcAbsAddr(head), node}; for (auto &topic : topics) { service_map_[topic].insert(dest); } - LOG_DEBUG() << "node " << node->proc_.proc_id() << " ssn " << *node->addrs_.begin() << " serve " << topics.size() << " topics:\n"; + LOG_DEBUG() << "node " << node->proc_.proc_id() << " ssn " << node->addrs_.begin()->first << " serve " << topics.size() << " topics:\n"; for (auto &topic : topics) { LOG_DEBUG() << "\t" << topic; } @@ -464,7 +476,8 @@ if (dest_node && Valid(*dest_node)) { auto node_addr = reply.add_node_address(); node_addr->set_proc_id(dest_node->proc_.proc_id()); - node_addr->mutable_addr()->set_mq_id(dest.mq_); + node_addr->mutable_addr()->set_mq_id(dest.mq_id_); + node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_); } } return reply; @@ -482,7 +495,7 @@ auto src = SrcAddr(head); auto &topics = msg.topics().topic_list(); node->subscriptions_[src].insert(topics.begin(), topics.end()); - TopicDest dest = {src, node}; + TopicDest dest = {src, SrcAbsAddr(head), node}; for (auto &topic : topics) { subscribe_map_[topic].insert(dest); } @@ -505,7 +518,7 @@ }; if (pos != node->subscriptions_.end()) { - const TopicDest &dest = {src, node}; + const TopicDest &dest = {src, SrcAbsAddr(head), node}; auto &topics = msg.topics().topic_list(); // clear node sub records; for (auto &topic : topics) { @@ -602,7 +615,7 @@ { auto EraseMapRec = [&node](auto &rec_map, auto &node_rec) { for (auto &addr_topics : node_rec) { - TopicDest dest{addr_topics.first, node}; + TopicDest dest{addr_topics.first, 0, node}; // abs_addr is not used. for (auto &topic : addr_topics.second) { auto pos = rec_map.find(topic); if (pos != rec_map.end()) { @@ -626,7 +639,7 @@ } for (auto &addr : node->addrs_) { - cleaner_(addr); + cleaner_(addr.first); } node->addrs_.clear(); @@ -678,7 +691,7 @@ { return [&](auto &&rep_body) { auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id())); - auto remote = head.route(0).mq_id(); + MQInfo remote = {head.route(0).mq_id(), head.route(0).abs_addr()}; MsgI msg; if (msg.Make(reply_head, rep_body)) { DEFER1(msg.Release();); @@ -741,7 +754,7 @@ if (node) { // should also make sure that mq is not killed before msg expires. // it would be ok if (kill_time - offline_time) is longer than expire time. - socket.Send(cli.mq_, msg); + socket.Send({cli.mq_id_, cli.mq_abs_addr_}, msg); ++it; } else { it = clients.erase(it); @@ -772,9 +785,9 @@ return rec; } -bool BHCenter::Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQId mqid, const int mq_len) +bool BHCenter::Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQInfo &mq, const int mq_len) { - Centers()[name] = CenterInfo{name, handler, raw_handler, idle, mqid, mq_len}; + Centers()[name] = CenterInfo{name, handler, raw_handler, idle, mq, mq_len}; return true; } @@ -792,7 +805,7 @@ for (auto &kv : Centers()) { auto &info = kv.second; - sockets_[info.name_] = std::make_shared<ShmSocket>(shm, info.mqid_, info.mq_len_); + sockets_[info.name_] = std::make_shared<ShmSocket>(info.mq_.offset_, shm, info.mq_.id_); } } diff --git a/box/center.h b/box/center.h index d68573b..ebe48b4 100644 --- a/box/center.h +++ b/box/center.h @@ -31,7 +31,7 @@ typedef Socket::PartialRecvCB MsgHandler; typedef Socket::RawRecvCB RawHandler; typedef Socket::IdleCB IdleHandler; - static bool Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQId mqid, const int mq_len); + static bool Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQInfo &mq, const int mq_len); BHCenter(Socket::Shm &shm); ~BHCenter() { Stop(); } @@ -44,7 +44,7 @@ MsgHandler handler_; RawHandler raw_handler_; IdleHandler idle_; - MQId mqid_; + MQInfo mq_; int mq_len_ = 0; }; typedef std::map<std::string, CenterInfo> CenterRecords; diff --git a/proto/source/bhome_msg_api.proto b/proto/source/bhome_msg_api.proto index 94bc82e..8b422c7 100644 --- a/proto/source/bhome_msg_api.proto +++ b/proto/source/bhome_msg_api.proto @@ -9,8 +9,9 @@ message BHAddress { uint64 mq_id = 1; - bytes ip = 2; - int32 port = 3; + int64 abs_addr = 2; + bytes ip = 3; + int32 port = 4; } message ProcInfo @@ -48,7 +49,6 @@ message MsgRegister { ProcInfo proc = 1; - repeated BHAddress addrs = 2; } message MsgUnregister diff --git a/src/bh_api.cpp b/src/bh_api.cpp index c9ceb20..ca7249d 100644 --- a/src/bh_api.cpp +++ b/src/bh_api.cpp @@ -30,16 +30,21 @@ } std::unique_ptr<TopicNode> &ProcNodePtr() { - static bool init = GlobalInit(BHomeShm()); - auto InitLog = []() { - auto id = GetProcExe(); - char path[200] = {0}; - sprintf(path, "/tmp/bhshmq_node_%s.log", id.c_str()); - ns_log::AddLog(path); - return true; - }; - static bool init_log = InitLog(); - static std::unique_ptr<TopicNode> ptr(new TopicNode(BHomeShm())); + static std::mutex mtx; + std::lock_guard<std::mutex> lk(mtx); + + static std::unique_ptr<TopicNode> ptr; + if (!ptr && GlobalInit(BHomeShm())) { + auto InitLog = []() { + auto id = GetProcExe(); + char path[200] = {0}; + sprintf(path, "/tmp/bhshmq_node_%s.log", id.c_str()); + ns_log::AddLog(path); + return true; + }; + static bool init_log = InitLog(); + ptr.reset(new TopicNode(BHomeShm())); + } return ptr; } TopicNode &ProcNode() @@ -114,6 +119,12 @@ return false; } MsgOut msg_reply; + auto &ptr = ProcNodePtr(); + if (!ptr) { + SetLastError(eNotFound, "center not started."); + return 0; + } + return (ProcNode().*mfunc)(input, msg_reply, timeout_ms) && PackOutput(msg_reply, reply, reply_len); } diff --git a/src/bh_util.h b/src/bh_util.h index 223da2a..15ffeb0 100644 --- a/src/bh_util.h +++ b/src/bh_util.h @@ -157,9 +157,9 @@ } protected: - static inline T &GetData() + static inline T &GetData(const std::string &msg = "Must set data before use!") { - if (!ptr()) { throw std::string("Must set ShmMsg shm before use!"); } + if (!ptr()) { throw std::logic_error(msg); } return *ptr(); } diff --git a/src/defs.cpp b/src/defs.cpp index b812b65..6e7a5fd 100644 --- a/src/defs.cpp +++ b/src/defs.cpp @@ -141,10 +141,10 @@ return false; } -uint64_t BHGlobalSenderAddress() { return GetCenterInfo(BHomeShm())->mq_sender_.id_; } -uint64_t BHTopicCenterAddress() { return GetCenterInfo(BHomeShm())->mq_center_.id_; } -uint64_t BHTopicBusAddress() { return GetCenterInfo(BHomeShm())->mq_bus_.id_; } -uint64_t BHCenterReplyAddress() { return GetCenterInfo(BHomeShm())->mq_init_.id_; } +const MQInfo &BHGlobalSenderAddress() { return GetCenterInfo(BHomeShm())->mq_sender_; } +const MQInfo &BHTopicCenterAddress() { return GetCenterInfo(BHomeShm())->mq_center_; } +const MQInfo &BHTopicBusAddress() { return GetCenterInfo(BHomeShm())->mq_bus_; } +const MQInfo &BHCenterReplyAddress() { return GetCenterInfo(BHomeShm())->mq_init_; } int64_t CalcAllocIndex(int64_t size) { diff --git a/src/defs.h b/src/defs.h index 5c770a7..cc3dc02 100644 --- a/src/defs.h +++ b/src/defs.h @@ -27,12 +27,12 @@ int64_t CalcAllocIndex(int64_t size); int64_t GetAllocSize(int index); -struct CenterInfo { - struct MQInfo { - int64_t id_ = 0; - int64_t offset_ = 0; - }; +struct MQInfo { + MQId id_ = 0; + int64_t offset_ = 0; +}; +struct CenterInfo { MQInfo mq_center_; MQInfo mq_bus_; MQInfo mq_init_; @@ -59,9 +59,9 @@ void GetLastError(int &ec, std::string &msg); //TODO center can check shm for previous crash. -uint64_t BHGlobalSenderAddress(); -uint64_t BHTopicCenterAddress(); -uint64_t BHTopicBusAddress(); -uint64_t BHCenterReplyAddress(); +const MQInfo &BHGlobalSenderAddress(); +const MQInfo &BHTopicCenterAddress(); +const MQInfo &BHTopicBusAddress(); +const MQInfo &BHCenterReplyAddress(); #endif // end of include guard: DEFS_KP8LKGD0 diff --git a/src/sendq.cpp b/src/sendq.cpp index 1eaefe6..f1e5918 100644 --- a/src/sendq.cpp +++ b/src/sendq.cpp @@ -21,6 +21,24 @@ using namespace bhome_shm; +void SendQ::AppendData(const MQInfo &mq, const Data data, const TimePoint &expire, OnMsgEvent onExpire) +{ + TimedMsg tmp(expire, MsgInfo{mq, data, std::move(onExpire)}); + std::unique_lock<std::mutex> lock(mutex_in_); + + try { + auto &al = in_[mq.id_]; + if (!al.empty()) { + al.front().emplace_back(std::move(tmp)); + } else { + al.insert(al.begin(), Array())->emplace_back(std::move(tmp)); + } + } catch (std::exception &e) { + LOG_ERROR() << "sendq error: " << e.what(); + throw e; + } +} + int SendQ::DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr) { auto FirstNotExpired = [](Array &l) { @@ -36,7 +54,7 @@ } } - while (pos != arr.end() && mq.TrySend(remote, pos->data().data_)) { + while (pos != arr.end() && mq.TrySend(pos->data().mq_, pos->data().data_)) { ++pos; } @@ -59,6 +77,8 @@ bool SendQ::TrySend(ShmMsgQueue &mq) { std::unique_lock<std::mutex> lock(mutex_out_); + // if (TooFast()) { return false; } + size_t nsend = 0; if (!out_.empty()) { auto rec = out_.begin(); @@ -89,3 +109,14 @@ return !out_.empty(); } + +bool SendQ::TooFast() +{ + auto cur = NowSec(); + if (cur > last_time_) { + last_time_ = cur; + count_ = 0; + } + + return ++count_ > 1000 * 100; +} // not accurate in multi-thread. \ No newline at end of file diff --git a/src/sendq.h b/src/sendq.h index 9e2b5ca..d1ba30a 100644 --- a/src/sendq.h +++ b/src/sendq.h @@ -39,6 +39,7 @@ typedef int64_t Data; typedef std::function<void(const Data &)> OnMsgEvent; struct MsgInfo { + MQInfo mq_; Data data_; OnMsgEvent on_expire_; }; @@ -46,45 +47,51 @@ typedef TimedMsg::TimePoint TimePoint; typedef TimedMsg::Duration Duration; - void Append(const Remote addr, const MsgI msg) + bool Append(const MQInfo &mq, MsgI msg) { msg.AddRef(); auto onMsgExpire = [](const Data &d) { MsgI(d).Release(); }; - AppendData(addr, msg.Offset(), DefaultExpire(), onMsgExpire); + try { + AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire); + return true; + } catch (...) { + msg.Release(); + return false; + } } - void Append(const Remote addr, const MsgI msg, OnMsgEvent onExpire) + bool Append(const MQInfo &mq, MsgI msg, OnMsgEvent onExpire) { msg.AddRef(); auto onMsgExpire = [onExpire](const Data &d) { onExpire(d); MsgI(d).Release(); }; - AppendData(addr, msg.Offset(), DefaultExpire(), onMsgExpire); + try { + AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire); + return true; + } catch (...) { + msg.Release(); + return false; + } } - void Append(const Remote addr, const Data command, OnMsgEvent onExpire = OnMsgEvent()) + bool Append(const MQInfo &mq, const Data command, OnMsgEvent onExpire = OnMsgEvent()) { - AppendData(addr, command, DefaultExpire(), onExpire); + try { + AppendData(mq, command, DefaultExpire(), onExpire); + return true; + } catch (...) { + return false; + } } bool TrySend(ShmMsgQueue &mq); private: static TimePoint Now() { return TimedMsg::Clock::now(); } static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(60); } - void AppendData(const Remote addr, const Data data, const TimePoint &expire, OnMsgEvent onExpire) - { - //TODO simple queue, organize later ? + void AppendData(const MQInfo &mq, const Data data, const TimePoint &expire, OnMsgEvent onExpire); - TimedMsg tmp(expire, MsgInfo{data, std::move(onExpire)}); - std::unique_lock<std::mutex> lock(mutex_in_); - auto &al = in_[addr]; - if (!al.empty()) { - al.front().emplace_back(std::move(tmp)); - } else { - al.insert(al.begin(), Array())->emplace_back(std::move(tmp)); - } - } typedef std::deque<TimedMsg> Array; typedef std::list<Array> ArrayList; typedef std::unordered_map<Remote, ArrayList> Store; @@ -92,10 +99,15 @@ int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr); int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &arr); + bool TooFast(); + std::mutex mutex_in_; std::mutex mutex_out_; Store in_; Store out_; + + int64_t count_ = 0; + int64_t last_time_ = 0; }; #endif // end of include guard: SENDQ_IWKMSK7M diff --git a/src/shm_msg_queue.cpp b/src/shm_msg_queue.cpp index 663da1e..1d78e8c 100644 --- a/src/shm_msg_queue.cpp +++ b/src/shm_msg_queue.cpp @@ -33,7 +33,7 @@ ShmMsgQueue::MQId ShmMsgQueue::NewId() { - static auto &id = GetData(); + static auto &id = GetData("Must init shared memory before use! Please make sure center is running."); return (++id) * 10; } @@ -96,11 +96,11 @@ return Shmq::Find(shm, MsgQIdToName(remote_id)); } -bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote, int64_t val) +bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQInfo &remote, const RawData val) { try { //TODO find from center, or use offset. - ShmMsgQueue dest(shm, false, remote, 1); + ShmMsgQueue dest(remote.offset_, shm, remote.id_); #ifndef BH_USE_ATOMIC_Q Guard lock(GetMutex(remote_id)); #endif diff --git a/src/shm_msg_queue.h b/src/shm_msg_queue.h index eead739..de60fde 100644 --- a/src/shm_msg_queue.h +++ b/src/shm_msg_queue.h @@ -18,6 +18,7 @@ #ifndef SHM_MSG_QUEUE_D847TQXH #define SHM_MSG_QUEUE_D847TQXH +#include "defs.h" #include "msg.h" #include "shm_queue.h" @@ -75,8 +76,8 @@ bool Recv(MsgI &msg, const int timeout_ms) { return Recv(msg.OffsetRef(), timeout_ms); } bool TryRecv(MsgI &msg) { return TryRecv(msg.OffsetRef()); } static Queue *Find(ShmType &shm, const MQId remote); - static bool TrySend(ShmType &shm, const MQId remote, const RawData val); - bool TrySend(const MQId remote, const RawData val) { return TrySend(shm(), remote, val); } + static bool TrySend(ShmType &shm, const MQInfo &remote, const RawData val); + bool TrySend(const MQInfo &remote, const RawData val) { return TrySend(shm(), remote, val); } private: #ifndef BH_USE_ATOMIC_Q diff --git a/src/socket.cpp b/src/socket.cpp index 4f09517..55b43f9 100644 --- a/src/socket.cpp +++ b/src/socket.cpp @@ -80,15 +80,13 @@ } }; ShmMsgQueue::RawData val = 0; - auto TryRecvMore = [&]() { - for (int i = 0; i < 100; ++i) { - if (mq().TryRecv(val)) { - return true; - } + for (int i = 0; i < 100; ++i) { + if (mq().TryRecv(val)) { + onRecv(val); + return true; } - return false; - }; - return TryRecvMore() ? (onRecv(val), true) : false; + } + return false; }; try { @@ -160,6 +158,31 @@ return false; } +bool ShmSocket::Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb) +{ + size_t size = content.size(); + auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable { + if (!msg.Fill(content)) { return; } + + try { + if (!cb) { + Send(remote, msg); + } else { + per_msg_cbs_->Store(msg_id, std::move(cb)); + auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) { + RecvCB cb_no_use; + per_msg_cbs_->Pick(msg_id, cb_no_use); + }; + Send(remote, msg, onExpireRemoveCB); + } + } catch (...) { + SetLastError(eError, "Send internal error."); + } + }; + + return RequestAlloc(size, OnResult); +} + bool ShmSocket::RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult) { // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag // LOG_FUNCTION; @@ -184,5 +207,6 @@ RawRecvCB cb_no_use; alloc_cbs_->Pick(id, cb_no_use); }; + return Send(BHTopicCenterAddress(), cmd, onExpireRemoveCB); } \ No newline at end of file diff --git a/src/socket.h b/src/socket.h index 8e9db69..dea106c 100644 --- a/src/socket.h +++ b/src/socket.h @@ -59,7 +59,6 @@ { node_proc_index_ = proc_index; socket_index_ = socket_index; - LOG_DEBUG() << "Set Node Proc " << node_proc_index_ << ", " << socket_index_; } // start recv. bool Start(int nworker = 1, const RecvCB &onMsg = RecvCB(), const RawRecvCB &onRaw = RawRecvCB(), const IdleCB &onIdle = IdleCB()); @@ -68,7 +67,7 @@ bool Stop(); template <class Body> - bool CenterSend(const MQId remote, BHMsgHead &head, Body &body) + bool CenterSend(const MQInfo &remote, BHMsgHead &head, Body &body) { try { //TODO alloc outsiez and use send. @@ -86,39 +85,17 @@ bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult); template <class Body> - bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB()) + bool Send(const MQInfo &remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB()) { - std::string msg_id(head.msg_id()); - std::string content(MsgI::Serialize(head, body)); - size_t size = content.size(); - auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable { - if (!msg.Fill(content)) { return; } - - try { - if (!cb) { - Send(remote, msg); - } else { - per_msg_cbs_->Store(msg_id, std::move(cb)); - auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) { - RecvCB cb_no_use; - per_msg_cbs_->Pick(msg_id, cb_no_use); - }; - Send(remote, msg, onExpireRemoveCB); - } - } catch (...) { - SetLastError(eError, "Send internal error."); - } - }; - - return RequestAlloc(size, OnResult); + return Send(remote, MsgI::Serialize(head, body), head.msg_id(), std::move(cb)); } template <class... T> - bool Send(const MQId remote, const MsgI &imsg, T &&...t) + bool Send(const MQInfo &remote, const MsgI &imsg, T &&...t) { return SendImpl(remote, imsg, std::forward<decltype(t)>(t)...); } template <class... T> - bool Send(const MQId remote, const int64_t cmd, T &&...t) + bool Send(const MQInfo &remote, const int64_t cmd, T &&...t) { return SendImpl(remote, cmd, std::forward<decltype(t)>(t)...); } @@ -126,7 +103,7 @@ bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms); template <class Body> - bool SendAndRecv(const MQId remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms) + bool SendAndRecv(const MQInfo &remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms) { struct State { std::mutex mutex; @@ -136,6 +113,7 @@ try { std::shared_ptr<State> st(new State); + auto endtime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms); auto OnRecv = [st, &reply, &reply_head](ShmSocket &sock, MsgI &msg, BHMsgHead &head) { @@ -176,12 +154,12 @@ bool StopNoLock(); bool RunningNoLock() { return !workers_.empty(); } + bool Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb = RecvCB()); + template <class... Rest> - bool SendImpl(const MQId remote, Rest &&...rest) + bool SendImpl(const MQInfo &remote, Rest &&...rest) { - // TODO send alloc request, and pack later, higher bit means alloc? - send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...); - return true; + return send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...); } std::vector<std::thread> workers_; diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 35228b4..51a0ab7 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -28,7 +28,12 @@ namespace { -inline void AddRoute(BHMsgHead &head, const MQId id) { head.add_route()->set_mq_id(id); } +inline void AddRoute(BHMsgHead &head, const ShmSocket &sock) +{ + auto route = head.add_route(); + route->set_mq_id(sock.id()); + route->set_abs_addr(sock.AbsAddr()); +} struct SrcInfo { std::vector<BHAddress> route; @@ -40,7 +45,7 @@ } // namespace TopicNode::TopicNode(SharedMemory &shm) : - shm_(shm), state_(eStateUnregistered) + shm_(shm), state_(eStateUninited) { } @@ -79,6 +84,7 @@ auto end_time = steady_clock::now() + 3s; do { try { + //TODO recv offset, avoid query. for (int i = eSockStart; i < eSockEnd; ++i) { sockets_.emplace_back(new ShmSocket(shm_, false, ssn_id_ + i, kMqLen)); } @@ -94,7 +100,6 @@ NodeInit(); } if (!sockets_.empty()) { - LOG_DEBUG() << "node sockets ok"; auto onNodeCmd = [this](ShmSocket &socket, int64_t &val) { LOG_DEBUG() << "node recv cmd: " << DecodeCmd(val); switch (DecodeCmd(val)) { @@ -103,7 +108,7 @@ DEFER1(msg.Release()); MsgProcInit body; auto head = InitMsgHead(GetType(body), info_.proc_id(), ssn_id_); - head.add_route()->set_mq_id(ssn_id_); + AddRoute(head, socket); if (msg.Fill(head, body)) { socket.Send(BHTopicCenterAddress(), msg); } @@ -122,12 +127,12 @@ MsgProcInitReply reply; if (imsg.ParseBody(reply)) { SetProcIndex(reply.proc_index()); + this->state_ = eStateUnregistered; } } return true; }; SockNode().Start(1, onMsg, onNodeCmd); - LOG_DEBUG() << "sockets ok."; return true; } return false; @@ -167,19 +172,22 @@ SetLastError(eError, kErrMsgNotInit); return false; } + auto end_time = steady_clock::now() + milliseconds(timeout_ms); + + while (state_ != eStateUnregistered && steady_clock::now() < end_time) { + std::this_thread::yield(); + } + if (state_ != eStateUnregistered) { + SetLastError(eError, kErrMsgNotInit); + return false; + } auto &sock = SockNode(); MsgRegister body; body.mutable_proc()->Swap(&proc); - auto AddId = [&](const MQId id) { body.add_addrs()->set_mq_id(id); }; - AddId(SockNode().id()); - AddId(SockServer().id()); - AddId(SockClient().id()); - AddId(SockSub().id()); - AddId(SockPub().id()); auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn())); - AddRoute(head, sock.id()); + AddRoute(head, sock); auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) { bool ok = head.type() == kMsgTypeCommonReply && @@ -224,7 +232,7 @@ body.mutable_proc()->Swap(&proc); auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn())); - AddRoute(head, sock.id()); + AddRoute(head, sock); auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) { bool r = head.type() == kMsgTypeCommonReply && @@ -260,7 +268,7 @@ body.mutable_proc()->Swap(&proc); auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn())); - AddRoute(head, sock.id()); + AddRoute(head, sock); if (timeout_ms == 0) { return sock.Send(BHTopicCenterAddress(), head, body); @@ -290,7 +298,7 @@ auto &sock = SockNode(); BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn())); - AddRoute(head, sock.id()); + AddRoute(head, sock); MsgI reply; DEFER1(reply.Release()); @@ -312,7 +320,7 @@ body.mutable_topics()->Swap(&topics); auto head(InitMsgHead(GetType(body), proc_id(), ssn())); - AddRoute(head, sock.id()); + AddRoute(head, sock); if (timeout_ms == 0) { return sock.Send(BHTopicCenterAddress(), head, body); @@ -341,7 +349,7 @@ for (int i = 0; i < head.route_size() - 1; ++i) { reply_head.add_route()->Swap(head.mutable_route(i)); } - auto remote = head.route().rbegin()->mq_id(); + MQInfo remote = {head.route().rbegin()->mq_id(), head.route().rbegin()->abs_addr()}; sock.Send(remote, reply_head, reply_body); } }; @@ -357,10 +365,17 @@ MsgRequestTopic req; if (!imsg.ParseBody(req)) { return; } - SrcInfo *p = new SrcInfo; - p->route.assign(head.route().begin(), head.route().end()); - p->msg_id = head.msg_id(); - acb(p, *head.mutable_proc_id(), req); + try { + SrcInfo *p = new SrcInfo; + if (!p) { + throw std::runtime_error("no memory."); + } + p->route.assign(head.route().begin(), head.route().end()); + p->msg_id = head.msg_id(); + acb(p, *head.mutable_proc_id(), req); + } catch (std::exception &e) { + LOG_ERROR() << "error server handle msg:" << e.what(); + } }; auto &sock = SockServer(); @@ -381,11 +396,19 @@ if (sock.SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) { if (imsg.ParseBody(request)) { head.mutable_proc_id()->swap(proc_id); - SrcInfo *p = new SrcInfo; - p->route.assign(head.route().begin(), head.route().end()); - p->msg_id = head.msg_id(); - src_info = p; - return true; + try { + SrcInfo *p = new SrcInfo; + if (!p) { + throw std::runtime_error("no memory."); + } + p->route.assign(head.route().begin(), head.route().end()); + p->msg_id = head.msg_id(); + src_info = p; + return true; + } catch (std::exception &e) { + LOG_ERROR() << "error recv request: " << e.what(); + return false; + } } } return false; @@ -409,7 +432,8 @@ for (unsigned i = 0; i < p->route.size() - 1; ++i) { head.add_route()->Swap(&p->route[i]); } - return sock.Send(p->route.back().mq_id(), head, body); + MQInfo dest = {p->route.back().mq_id(), p->route.back().abs_addr()}; + return sock.Send(dest, head, body); } bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker) @@ -440,10 +464,10 @@ out_msg_id = msg_id; - auto SendTo = [this, msg_id](const BHAddress &addr, const MsgRequestTopic &req, const RequestResultCB &cb) { + auto SendTo = [this, msg_id](const MQInfo &remote, const MsgRequestTopic &req, const RequestResultCB &cb) { auto &sock = SockClient(); BHMsgHead head(InitMsgHead(GetType(req), proc_id(), ssn(), msg_id)); - AddRoute(head, sock.id()); + AddRoute(head, sock); head.set_topic(req.topic()); if (cb) { @@ -455,15 +479,15 @@ } } }; - return sock.Send(addr.mq_id(), head, req, onRecv); + return sock.Send(remote, head, req, onRecv); } else { - return sock.Send(addr.mq_id(), head, req); + return sock.Send(remote, head, req); } }; try { BHAddress addr; - return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(addr, req, cb); + return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(MQInfo{addr.mq_id(), addr.abs_addr()}, req, cb); } catch (...) { SetLastError(eError, "internal error."); return false; @@ -484,14 +508,14 @@ if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) { LOG_TRACE() << "node: " << SockNode().id() << ", topic dest: " << addr.mq_id(); BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn())); - AddRoute(head, sock.id()); + AddRoute(head, sock); head.set_topic(request.topic()); MsgI reply_msg; DEFER1(reply_msg.Release();); BHMsgHead reply_head; - if (sock.SendAndRecv(addr.mq_id(), head, request, reply_msg, reply_head, timeout_ms) && + if (sock.SendAndRecv({addr.mq_id(), addr.abs_addr()}, head, request, reply_msg, reply_head, timeout_ms) && reply_head.type() == kMsgTypeRequestTopicReply && reply_msg.ParseBody(out_reply)) { reply_head.mutable_proc_id()->swap(out_proc_id); @@ -504,7 +528,7 @@ return false; } -int TopicNode::QueryRPCTopics(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms) +int TopicNode::QueryTopicServers(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms) { int n = 0; MsgQueryTopic query; @@ -532,7 +556,7 @@ return true; } std::vector<NodeAddress> lst; - if (QueryRPCTopics(topic, lst, timeout_ms)) { + if (QueryTopicServers(topic, lst, timeout_ms)) { addr = lst.front().addr(); if (addr.mq_id() != 0) { topic_query_cache_.Store(topic, addr); @@ -555,7 +579,7 @@ try { auto &sock = SockPub(); BHMsgHead head(InitMsgHead(GetType(pub), proc_id(), ssn())); - AddRoute(head, sock.id()); + AddRoute(head, sock); if (timeout_ms == 0) { return sock.Send(BHTopicBusAddress(), head, pub); @@ -589,7 +613,7 @@ sub.mutable_topics()->Swap(&topics); BHMsgHead head(InitMsgHead(GetType(sub), proc_id(), ssn())); - AddRoute(head, sock.id()); + AddRoute(head, sock); if (timeout_ms == 0) { return sock.Send(BHTopicBusAddress(), head, sub); } else { diff --git a/src/topic_node.h b/src/topic_node.h index b018807..be82cf6 100644 --- a/src/topic_node.h +++ b/src/topic_node.h @@ -78,7 +78,7 @@ MQId ssn() { return SockNode().id(); } bool ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms); typedef MsgQueryTopicReply::BHNodeAddress NodeAddress; - int QueryRPCTopics(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms); + int QueryTopicServers(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms); const std::string &proc_id() { return info_.proc_id(); } typedef BHAddress Address; @@ -139,6 +139,7 @@ } enum State { + eStateUninited, eStateUnregistered, eStateOnline, eStateOffline // heartbeat fail. @@ -146,7 +147,7 @@ void state(const State st) { state_.store(st); } void state_cas(State expected, const State val) { state_.compare_exchange_strong(expected, val); } State state() const { return state_.load(); } - bool IsOnline() { return Init() && state() == eStateOnline; } + bool IsOnline() { return state() == eStateOnline; } bool Init(); bool Valid() const { return !sockets_.empty(); } std::mutex mutex_; diff --git a/utest/api_test.cpp b/utest/api_test.cpp index f48f307..44c809d 100644 --- a/utest/api_test.cpp +++ b/utest/api_test.cpp @@ -129,7 +129,14 @@ void *reply = 0; int reply_len = 0; reg = BHRegister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 2000); - printf("register %s\n", reg ? "ok" : "failed"); + if (reg) { + printf("register ok\n"); + } else { + int ec = 0; + std::string msg; + GetLastError(ec, msg); + printf("register failed, %d, %s\n", ec, msg.c_str()); + } BHFree(reply, reply_len); Sleep(1s); @@ -239,6 +246,7 @@ DEFER1(BHFree(msg_id, len);); // Sleep(10ms, false); std::string dest(BHAddress().SerializeAsString()); + bool r = BHAsyncRequest(dest.data(), dest.size(), s.data(), s.size(), 0, 0); if (r) { ++Status().nrequest_; @@ -294,11 +302,12 @@ int same = 0; uint64_t last = 0; - while (last < nreq * ncli && same < 2) { + while (last < nreq * ncli && same < 3) { Sleep(1s, false); auto cur = Status().nreply_.load(); if (last == cur) { ++same; + printf("same %d\n", same); } else { last = cur; same = 0; @@ -308,6 +317,7 @@ run = false; threads.WaitAll(); auto &st = Status(); + Sleep(1s); printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld\n", st.nrequest_.load(), st.nserved_.load(), st.nreply_.load()); BHCleanup(); printf("after cleanup\n"); diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp index 66e5179..4dea623 100644 --- a/utest/speed_test.cpp +++ b/utest/speed_test.cpp @@ -24,16 +24,8 @@ { SharedMemory &shm = TestShm(); GlobalInit(shm); - auto InitSem = [](auto id) { - auto sem_id = semget(id, 1, 0666 | IPC_CREAT); - union semun init_val; - init_val.val = 1; - semctl(sem_id, 0, SETVAL, init_val); - return; - }; - - MQId id = ShmMsgQueue::NewId(); - InitSem(id); + MQId server_id = ShmMsgQueue::NewId(); + ShmMsgQueue server(server_id, shm, 1000); const int timeout = 1000; const uint32_t data_size = 1001; @@ -44,7 +36,6 @@ std::string str(data_size, 'a'); auto Writer = [&](int writer_id, uint64_t n) { MQId cli_id = ShmMsgQueue::NewId(); - InitSem(cli_id); ShmMsgQueue mq(cli_id, shm, 64); MsgI msg; @@ -58,12 +49,12 @@ for (uint64_t i = 0; i < n; ++i) { msg.AddRef(); - while (!mq.TrySend(id, msg.Offset())) {} + while (!mq.TrySend({server.Id(), server.AbsAddr()}, msg.Offset())) {} ++nwrite; } }; auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork) { - ShmMsgQueue mq(id, shm, 1000); + ShmMsgQueue &mq = server; auto now = []() { return steady_clock::now(); }; auto tm = now(); while (*run) { @@ -189,8 +180,10 @@ req_body.set_topic("topic"); req_body.set_data(msg_content); auto req_head(InitMsgHead(GetType(req_body), client_proc_id, cli.id())); - req_head.add_route()->set_mq_id(cli.id()); - return cli.Send(srv.id(), req_head, req_body); + auto route = req_head.add_route(); + route->set_mq_id(cli.id()); + route->set_abs_addr(cli.AbsAddr()); + return cli.Send({srv.id(), srv.AbsAddr()}, req_head, req_body); }; Req(); @@ -207,13 +200,13 @@ DEFER1(req.Release()); if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) { - auto src_id = req_head.route()[0].mq_id(); + MQInfo src_mq = {req_head.route()[0].mq_id(), req_head.route()[0].abs_addr()}; auto Reply = [&]() { MsgRequestTopic reply_body; reply_body.set_topic("topic"); reply_body.set_data(msg_content); auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), req_head.msg_id())); - return srv.Send(src_id, reply_head, reply_body); + return srv.Send(src_mq, reply_head, reply_body); }; Reply(); } -- Gitblit v1.8.0