Socket send now uses the queue's absolute address, avoiding a shared-memory lookup by id.
| | |
| | | "strstream": "cpp", |
| | | "unordered_set": "cpp", |
| | | "cfenv": "cpp", |
| | | "*.ipp": "cpp" |
| | | "*.ipp": "cpp", |
| | | "cassert": "cpp", |
| | | "cerrno": "cpp", |
| | | "cfloat": "cpp", |
| | | "ciso646": "cpp", |
| | | "climits": "cpp", |
| | | "ios": "cpp", |
| | | "locale": "cpp", |
| | | "queue": "cpp", |
| | | "random": "cpp" |
| | | }, |
| | | "files.exclude": { |
| | | "**/*.un~": true, |
| | |
| | | auto it = msgs_.begin(); |
| | | while (it != msgs_.end() && --limit > 0) { |
| | | ShmMsg msg(it->second); |
| | | if (msg.Count() == 0) { |
| | | auto Free = [&]() { |
| | | msg.Free(); |
| | | it = msgs_.erase(it); |
| | | ++n; |
| | | } else if (msg.timestamp() + 60 < NowSec()) { |
| | | msg.Free(); |
| | | it = msgs_.erase(it); |
| | | ++n; |
| | | // LOG_DEBUG() << "release timeout msg, someone crashed."; |
| | | } else { |
| | | }; |
| | | int n = now - msg.timestamp(); |
| | | if (n < 10) { |
| | | ++it; |
| | | } else if (msg.Count() == 0) { |
| | | Free(); |
| | | } else if (n > 60) { |
| | | Free(); |
| | | } |
| | | } |
| | | if (n > 0) { |
| | |
| | | typedef std::unordered_map<Address, std::set<Topic>> AddressTopics; |
| | | |
| | | struct NodeInfo { |
| | | ProcState state_; // state |
| | | std::set<Address> addrs_; // registered mqs |
| | | ProcInfo proc_; // |
| | | AddressTopics services_; // address: topics |
| | | AddressTopics subscriptions_; // address: topics |
| | | ProcState state_; // state |
| | | std::map<MQId, int64_t> addrs_; // registered mqs |
| | | ProcInfo proc_; // |
| | | AddressTopics services_; // address: topics |
| | | AddressTopics subscriptions_; // address: topics |
| | | }; |
| | | typedef std::shared_ptr<NodeInfo> Node; |
| | | typedef std::weak_ptr<NodeInfo> WeakNode; |
| | | |
| | | struct TopicDest { |
| | | Address mq_; |
| | | MQId mq_id_; |
| | | int64_t mq_abs_addr_; |
| | | WeakNode weak_node_; |
| | | bool operator<(const TopicDest &a) const { return mq_ < a.mq_; } |
| | | bool operator<(const TopicDest &a) const { return mq_id_ < a.mq_id_; } |
| | | }; |
| | | inline MQId SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); } |
| | | inline bool MatchAddr(std::set<Address> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); } |
| | | inline int64_t SrcAbsAddr(const BHMsgHead &head) { return head.route(0).abs_addr(); } |
| | | inline bool MatchAddr(std::map<Address, int64_t> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); } |
| | | |
| | | // Stores the center id, the cleaner callback and the offline/kill time thresholds; |
| | | // last_check_time_ starts at 0. |
| | | NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time, const int64_t kill_time) : |
| | | id_(id), cleaner_(cleaner), offline_time_(offline_time), kill_time_(kill_time), last_check_time_(0) {} |
| | |
| | | return; // ignore in exists. |
| | | } |
| | | auto UpdateRegInfo = [&](Node &node) { |
| | | for (int i = 0; i < 10; ++i) { |
| | | node->addrs_.insert(ssn + i); |
| | | } |
| | | node->state_.timestamp_ = NowSec() - offline_time_; |
| | | node->state_.UpdateState(NowSec(), offline_time_, kill_time_); |
| | | |
| | | // create sockets. |
| | | const int nsocks = 4; |
| | | try { |
| | | auto CreateSocket = [&](const MQId id) { ShmSocket tmp(shm, true, id, 16); }; |
| | | // alloc(-1), node, server, sub, request, |
| | | for (int i = 0; i < 4; ++i) { |
| | | CreateSocket(ssn + i); |
| | | node->addrs_.insert(ssn + i); |
| | | for (int i = 0; i < nsocks; ++i) { |
| | | ShmSocket tmp(shm, true, ssn + i, 16); |
| | | node->addrs_.emplace(ssn + i, tmp.AbsAddr()); |
| | | } |
| | | return true; |
| | | } catch (...) { |
| | | for (int i = 0; i < nsocks; ++i) { |
| | | ShmSocket::Remove(shm, ssn + i); |
| | | } |
| | | return false; |
| | | } |
| | | }; |
| | | |
| | | auto PrepareProcInit = [&]() { |
| | | auto PrepareProcInit = [&](Node &node) { |
| | | bool r = false; |
| | | ShmMsg init_msg; |
| | | if (init_msg.Make(GetAllocSize(CalcAllocIndex(900)))) { |
| | | // 31bit pointer, 4bit cmd+flag |
| | | int64_t reply = (init_msg.Offset() << 4) | EncodeCmd(eCmdNodeInitReply); |
| | | r = SendAllocReply(socket, ssn, reply, init_msg); |
| | | r = SendAllocReply(socket, {ssn, node->addrs_[ssn]}, reply, init_msg); |
| | | } |
| | | return r; |
| | | }; |
| | | |
| | | Node node(new NodeInfo); |
| | | if (UpdateRegInfo(node) && PrepareProcInit()) { |
| | | if (UpdateRegInfo(node) && PrepareProcInit(node)) { |
| | | nodes_[ssn] = node; |
| | | LOG_INFO() << "new node ssn (" << ssn << ") init"; |
| | | } else { |
| | |
| | | } |
| | | void RecordMsg(const MsgI &msg) { msgs_.RecordMsg(msg); } |
| | | |
| | | bool SendAllocReply(ShmSocket &socket, const Address dest, const int64_t reply, const MsgI &msg) |
| | | bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg) |
| | | { |
| | | RecordMsg(msg); |
| | | auto onExpireFree = [this, msg](const SendQ::Data &) { msgs_.FreeMsg(msg.id()); }; |
| | | return socket.Send(dest, reply, onExpireFree); |
| | | } |
| | | bool SendAllocMsg(ShmSocket &socket, const Address dest, const MsgI &msg) |
| | | bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg) |
| | | { |
| | | RecordMsg(msg); |
| | | return socket.Send(dest, msg); |
| | |
| | | if (proc_rec.proc_.empty()) { |
| | | return; |
| | | } |
| | | Address dest = proc_rec.ssn_ + socket_index; |
| | | |
| | | MQInfo dest = {proc_rec.ssn_ + socket_index, 0}; |
| | | auto FindMq = [&]() { |
| | | auto pos = nodes_.find(proc_rec.ssn_); |
| | | if (pos != nodes_.end()) { |
| | | for (auto &&mq : pos->second->addrs_) { |
| | | if (mq.first == dest.id_) { |
| | | dest.offset_ = mq.second; |
| | | return true; |
| | | } |
| | | } |
| | | } |
| | | return false; |
| | | }; |
| | | if (!FindMq()) { return; } |
| | | |
| | | auto size = GetAllocSize((val >> 52) & MaskBits(8)); |
| | | MsgI new_msg; |
| | |
| | | // when node restart, ssn will change, |
| | | // and old node will be removed after timeout. |
| | | auto UpdateRegInfo = [&](Node &node) { |
| | | node->addrs_.insert(SrcAddr(head)); |
| | | for (auto &addr : msg.addrs()) { |
| | | node->addrs_.insert(addr.mq_id()); |
| | | } |
| | | node->proc_.Swap(msg.mutable_proc()); |
| | | node->state_.timestamp_ = head.timestamp(); |
| | | node->state_.UpdateState(NowSec(), offline_time_, kill_time_); |
| | |
| | | auto src = SrcAddr(head); |
| | | auto &topics = msg.topics().topic_list(); |
| | | node->services_[src].insert(topics.begin(), topics.end()); |
| | | TopicDest dest = {src, node}; |
| | | TopicDest dest = {src, SrcAbsAddr(head), node}; |
| | | for (auto &topic : topics) { |
| | | service_map_[topic].insert(dest); |
| | | } |
| | | LOG_DEBUG() << "node " << node->proc_.proc_id() << " ssn " << *node->addrs_.begin() << " serve " << topics.size() << " topics:\n"; |
| | | LOG_DEBUG() << "node " << node->proc_.proc_id() << " ssn " << node->addrs_.begin()->first << " serve " << topics.size() << " topics:\n"; |
| | | for (auto &topic : topics) { |
| | | LOG_DEBUG() << "\t" << topic; |
| | | } |
| | |
| | | if (dest_node && Valid(*dest_node)) { |
| | | auto node_addr = reply.add_node_address(); |
| | | node_addr->set_proc_id(dest_node->proc_.proc_id()); |
| | | node_addr->mutable_addr()->set_mq_id(dest.mq_); |
| | | node_addr->mutable_addr()->set_mq_id(dest.mq_id_); |
| | | node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_); |
| | | } |
| | | } |
| | | return reply; |
| | |
| | | auto src = SrcAddr(head); |
| | | auto &topics = msg.topics().topic_list(); |
| | | node->subscriptions_[src].insert(topics.begin(), topics.end()); |
| | | TopicDest dest = {src, node}; |
| | | TopicDest dest = {src, SrcAbsAddr(head), node}; |
| | | for (auto &topic : topics) { |
| | | subscribe_map_[topic].insert(dest); |
| | | } |
| | |
| | | }; |
| | | |
| | | if (pos != node->subscriptions_.end()) { |
| | | const TopicDest &dest = {src, node}; |
| | | const TopicDest &dest = {src, SrcAbsAddr(head), node}; |
| | | auto &topics = msg.topics().topic_list(); |
| | | // clear node sub records; |
| | | for (auto &topic : topics) { |
| | |
| | | { |
| | | auto EraseMapRec = [&node](auto &rec_map, auto &node_rec) { |
| | | for (auto &addr_topics : node_rec) { |
| | | TopicDest dest{addr_topics.first, node}; |
| | | TopicDest dest{addr_topics.first, 0, node}; // abs_addr is not used. |
| | | for (auto &topic : addr_topics.second) { |
| | | auto pos = rec_map.find(topic); |
| | | if (pos != rec_map.end()) { |
| | |
| | | } |
| | | |
| | | for (auto &addr : node->addrs_) { |
| | | cleaner_(addr); |
| | | cleaner_(addr.first); |
| | | } |
| | | |
| | | node->addrs_.clear(); |
| | |
| | | { |
| | | return [&](auto &&rep_body) { |
| | | auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id())); |
| | | auto remote = head.route(0).mq_id(); |
| | | MQInfo remote = {head.route(0).mq_id(), head.route(0).abs_addr()}; |
| | | MsgI msg; |
| | | if (msg.Make(reply_head, rep_body)) { |
| | | DEFER1(msg.Release();); |
| | |
| | | if (node) { |
| | | // should also make sure that mq is not killed before msg expires. |
| | | // it would be ok if (kill_time - offline_time) is longer than expire time. |
| | | socket.Send(cli.mq_, msg); |
| | | socket.Send({cli.mq_id_, cli.mq_abs_addr_}, msg); |
| | | ++it; |
| | | } else { |
| | | it = clients.erase(it); |
| | |
| | | return rec; |
| | | } |
| | | |
| | | bool BHCenter::Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQId mqid, const int mq_len) |
| | | bool BHCenter::Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQInfo &mq, const int mq_len) |
| | | { |
| | | Centers()[name] = CenterInfo{name, handler, raw_handler, idle, mqid, mq_len}; |
| | | Centers()[name] = CenterInfo{name, handler, raw_handler, idle, mq, mq_len}; |
| | | return true; |
| | | } |
| | | |
| | |
| | | |
| | | for (auto &kv : Centers()) { |
| | | auto &info = kv.second; |
| | | sockets_[info.name_] = std::make_shared<ShmSocket>(shm, info.mqid_, info.mq_len_); |
| | | sockets_[info.name_] = std::make_shared<ShmSocket>(info.mq_.offset_, shm, info.mq_.id_); |
| | | } |
| | | } |
| | | |
| | |
| | | typedef Socket::PartialRecvCB MsgHandler; |
| | | typedef Socket::RawRecvCB RawHandler; |
| | | typedef Socket::IdleCB IdleHandler; |
| | | static bool Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQId mqid, const int mq_len); |
| | | static bool Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQInfo &mq, const int mq_len); |
| | | |
| | | BHCenter(Socket::Shm &shm); |
| | | ~BHCenter() { Stop(); } |
| | |
| | | MsgHandler handler_; |
| | | RawHandler raw_handler_; |
| | | IdleHandler idle_; |
| | | MQId mqid_; |
| | | MQInfo mq_; |
| | | int mq_len_ = 0; |
| | | }; |
| | | typedef std::map<std::string, CenterInfo> CenterRecords; |
| | |
| | | |
| | | message BHAddress { |
| | | uint64 mq_id = 1; |
| | | bytes ip = 2; |
| | | int32 port = 3; |
| | | int64 abs_addr = 2; |
| | | bytes ip = 3; |
| | | int32 port = 4; |
| | | } |
| | | |
| | | message ProcInfo |
| | |
| | | // Register message: the process description plus a list of BHAddress entries for it. |
| | | message MsgRegister |
| | | { |
| | | ProcInfo proc = 1; |
| | | repeated BHAddress addrs = 2; |
| | | } |
| | | |
| | | message MsgUnregister |
| | |
| | | } |
| | | std::unique_ptr<TopicNode> &ProcNodePtr() |
| | | { |
| | | static bool init = GlobalInit(BHomeShm()); |
| | | auto InitLog = []() { |
| | | auto id = GetProcExe(); |
| | | char path[200] = {0}; |
| | | sprintf(path, "/tmp/bhshmq_node_%s.log", id.c_str()); |
| | | ns_log::AddLog(path); |
| | | return true; |
| | | }; |
| | | static bool init_log = InitLog(); |
| | | static std::unique_ptr<TopicNode> ptr(new TopicNode(BHomeShm())); |
| | | static std::mutex mtx; |
| | | std::lock_guard<std::mutex> lk(mtx); |
| | | |
| | | static std::unique_ptr<TopicNode> ptr; |
| | | if (!ptr && GlobalInit(BHomeShm())) { |
| | | auto InitLog = []() { |
| | | auto id = GetProcExe(); |
| | | char path[200] = {0}; |
| | | sprintf(path, "/tmp/bhshmq_node_%s.log", id.c_str()); |
| | | ns_log::AddLog(path); |
| | | return true; |
| | | }; |
| | | static bool init_log = InitLog(); |
| | | ptr.reset(new TopicNode(BHomeShm())); |
| | | } |
| | | return ptr; |
| | | } |
| | | TopicNode &ProcNode() |
| | |
| | | return false; |
| | | } |
| | | MsgOut msg_reply; |
| | | auto &ptr = ProcNodePtr(); |
| | | if (!ptr) { |
| | | SetLastError(eNotFound, "center not started."); |
| | | return 0; |
| | | } |
| | | |
| | | return (ProcNode().*mfunc)(input, msg_reply, timeout_ms) && |
| | | PackOutput(msg_reply, reply, reply_len); |
| | | } |
| | |
| | | } |
| | | |
| | | protected: |
| | | static inline T &GetData() |
| | | static inline T &GetData(const std::string &msg = "Must set data before use!") |
| | | { |
| | | if (!ptr()) { throw std::string("Must set ShmMsg shm before use!"); } |
| | | if (!ptr()) { throw std::logic_error(msg); } |
| | | return *ptr(); |
| | | } |
| | | |
| | |
| | | return false; |
| | | } |
| | | |
| | | uint64_t BHGlobalSenderAddress() { return GetCenterInfo(BHomeShm())->mq_sender_.id_; } |
| | | uint64_t BHTopicCenterAddress() { return GetCenterInfo(BHomeShm())->mq_center_.id_; } |
| | | uint64_t BHTopicBusAddress() { return GetCenterInfo(BHomeShm())->mq_bus_.id_; } |
| | | uint64_t BHCenterReplyAddress() { return GetCenterInfo(BHomeShm())->mq_init_.id_; } |
| | | const MQInfo &BHGlobalSenderAddress() { return GetCenterInfo(BHomeShm())->mq_sender_; } |
| | | const MQInfo &BHTopicCenterAddress() { return GetCenterInfo(BHomeShm())->mq_center_; } |
| | | const MQInfo &BHTopicBusAddress() { return GetCenterInfo(BHomeShm())->mq_bus_; } |
| | | const MQInfo &BHCenterReplyAddress() { return GetCenterInfo(BHomeShm())->mq_init_; } |
| | | |
| | | int64_t CalcAllocIndex(int64_t size) |
| | | { |
| | |
| | | int64_t CalcAllocIndex(int64_t size); |
| | | int64_t GetAllocSize(int index); |
| | | |
| | | struct CenterInfo { |
| | | struct MQInfo { |
| | | int64_t id_ = 0; |
| | | int64_t offset_ = 0; |
| | | }; |
| | | struct MQInfo { |
| | | MQId id_ = 0; |
| | | int64_t offset_ = 0; |
| | | }; |
| | | |
| | | struct CenterInfo { |
| | | MQInfo mq_center_; |
| | | MQInfo mq_bus_; |
| | | MQInfo mq_init_; |
| | |
| | | void GetLastError(int &ec, std::string &msg); |
| | | //TODO center can check shm for previous crash. |
| | | |
| | | uint64_t BHGlobalSenderAddress(); |
| | | uint64_t BHTopicCenterAddress(); |
| | | uint64_t BHTopicBusAddress(); |
| | | uint64_t BHCenterReplyAddress(); |
| | | const MQInfo &BHGlobalSenderAddress(); |
| | | const MQInfo &BHTopicCenterAddress(); |
| | | const MQInfo &BHTopicBusAddress(); |
| | | const MQInfo &BHCenterReplyAddress(); |
| | | |
| | | #endif // end of include guard: DEFS_KP8LKGD0 |
| | |
| | | |
| | | using namespace bhome_shm; |
| | | |
| | | // Queues `data` for the destination `mq`, together with its `expire` time point and the |
| | | // `onExpire` callback. While holding mutex_in_, the entry is appended to the first Array |
| | | // of in_[mq.id_]; an Array is created first when that list is still empty. |
| | | // Any std::exception raised while storing is logged and then propagated to the caller. |
| | | void SendQ::AppendData(const MQInfo &mq, const Data data, const TimePoint &expire, OnMsgEvent onExpire) |
| | | { |
| | | TimedMsg tmp(expire, MsgInfo{mq, data, std::move(onExpire)}); |
| | | std::unique_lock<std::mutex> lock(mutex_in_); |
| | | |
| | | try { |
| | | auto &al = in_[mq.id_]; |
| | | if (al.empty()) { |
| | | al.emplace_front(); |
| | | } |
| | | al.front().emplace_back(std::move(tmp)); |
| | | } catch (const std::exception &e) { |
| | | LOG_ERROR() << "sendq error: " << e.what(); |
| | | // Bare rethrow keeps the original exception object; `throw e;` would throw a copy |
| | | // sliced down to std::exception and lose the derived type. |
| | | throw; |
| | | } |
| | | } |
| | | |
| | | int SendQ::DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr) |
| | | { |
| | | auto FirstNotExpired = [](Array &l) { |
| | |
| | | } |
| | | } |
| | | |
| | | while (pos != arr.end() && mq.TrySend(remote, pos->data().data_)) { |
| | | while (pos != arr.end() && mq.TrySend(pos->data().mq_, pos->data().data_)) { |
| | | ++pos; |
| | | } |
| | | |
| | |
| | | bool SendQ::TrySend(ShmMsgQueue &mq) |
| | | { |
| | | std::unique_lock<std::mutex> lock(mutex_out_); |
| | | // if (TooFast()) { return false; } |
| | | |
| | | size_t nsend = 0; |
| | | if (!out_.empty()) { |
| | | auto rec = out_.begin(); |
| | |
| | | |
| | | return !out_.empty(); |
| | | } |
| | | |
| | | // Call-rate check: the counter restarts whenever NowSec() exceeds the last recorded value, |
| | | // and the result is true once the counter has passed 100000 since that restart. |
| | | bool SendQ::TooFast() |
| | | { |
| | | const auto now_sec = NowSec(); |
| | | if (last_time_ < now_sec) { |
| | | count_ = 0; |
| | | last_time_ = now_sec; |
| | | } |
| | | |
| | | count_ += 1; |
| | | return count_ > 100 * 1000; |
| | | } // not accurate in multi-thread. |
| | |
| | | typedef int64_t Data; |
| | | typedef std::function<void(const Data &)> OnMsgEvent; |
| | | // One queued item: where to send it, the raw value to send, and the callback kept for expiry. |
| | | struct MsgInfo { |
| | | MQInfo mq_; // destination handed to ShmMsgQueue::TrySend |
| | | Data data_; // value handed to ShmMsgQueue::TrySend |
| | | OnMsgEvent on_expire_; // callback stored alongside the item |
| | | }; |
| | |
| | | typedef TimedMsg::TimePoint TimePoint; |
| | | typedef TimedMsg::Duration Duration; |
| | | |
| | | void Append(const Remote addr, const MsgI msg) |
| | | bool Append(const MQInfo &mq, MsgI msg) |
| | | { |
| | | msg.AddRef(); |
| | | auto onMsgExpire = [](const Data &d) { MsgI(d).Release(); }; |
| | | AppendData(addr, msg.Offset(), DefaultExpire(), onMsgExpire); |
| | | try { |
| | | AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire); |
| | | return true; |
| | | } catch (...) { |
| | | msg.Release(); |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | void Append(const Remote addr, const MsgI msg, OnMsgEvent onExpire) |
| | | bool Append(const MQInfo &mq, MsgI msg, OnMsgEvent onExpire) |
| | | { |
| | | msg.AddRef(); |
| | | auto onMsgExpire = [onExpire](const Data &d) { |
| | | onExpire(d); |
| | | MsgI(d).Release(); |
| | | }; |
| | | AppendData(addr, msg.Offset(), DefaultExpire(), onMsgExpire); |
| | | try { |
| | | AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire); |
| | | return true; |
| | | } catch (...) { |
| | | msg.Release(); |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | void Append(const Remote addr, const Data command, OnMsgEvent onExpire = OnMsgEvent()) |
| | | bool Append(const MQInfo &mq, const Data command, OnMsgEvent onExpire = OnMsgEvent()) |
| | | { |
| | | AppendData(addr, command, DefaultExpire(), onExpire); |
| | | try { |
| | | AppendData(mq, command, DefaultExpire(), onExpire); |
| | | return true; |
| | | } catch (...) { |
| | | return false; |
| | | } |
| | | } |
| | | bool TrySend(ShmMsgQueue &mq); |
| | | |
| | | private: |
| | | static TimePoint Now() { return TimedMsg::Clock::now(); } |
| | | static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(60); } |
| | | void AppendData(const Remote addr, const Data data, const TimePoint &expire, OnMsgEvent onExpire) |
| | | { |
| | | //TODO simple queue, organize later ? |
| | | void AppendData(const MQInfo &mq, const Data data, const TimePoint &expire, OnMsgEvent onExpire); |
| | | |
| | | TimedMsg tmp(expire, MsgInfo{data, std::move(onExpire)}); |
| | | std::unique_lock<std::mutex> lock(mutex_in_); |
| | | auto &al = in_[addr]; |
| | | if (!al.empty()) { |
| | | al.front().emplace_back(std::move(tmp)); |
| | | } else { |
| | | al.insert(al.begin(), Array())->emplace_back(std::move(tmp)); |
| | | } |
| | | } |
| | | typedef std::deque<TimedMsg> Array; |
| | | typedef std::list<Array> ArrayList; |
| | | typedef std::unordered_map<Remote, ArrayList> Store; |
| | |
| | | int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr); |
| | | int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &arr); |
| | | |
| | | bool TooFast(); |
| | | |
| | | std::mutex mutex_in_; |
| | | std::mutex mutex_out_; |
| | | Store in_; |
| | | Store out_; |
| | | |
| | | int64_t count_ = 0; |
| | | int64_t last_time_ = 0; |
| | | }; |
| | | |
| | | #endif // end of include guard: SENDQ_IWKMSK7M |
| | |
| | | |
| | | ShmMsgQueue::MQId ShmMsgQueue::NewId() |
| | | { |
| | | static auto &id = GetData(); |
| | | static auto &id = GetData("Must init shared memory before use! Please make sure center is running."); |
| | | return (++id) * 10; |
| | | } |
| | | |
| | |
| | | return Shmq::Find(shm, MsgQIdToName(remote_id)); |
| | | } |
| | | |
| | | bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote, int64_t val) |
| | | bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQInfo &remote, const RawData val) |
| | | { |
| | | try { |
| | | //TODO find from center, or use offset. |
| | | ShmMsgQueue dest(shm, false, remote, 1); |
| | | ShmMsgQueue dest(remote.offset_, shm, remote.id_); |
| | | #ifndef BH_USE_ATOMIC_Q |
| | | Guard lock(GetMutex(remote_id)); |
| | | #endif |
| | |
| | | #ifndef SHM_MSG_QUEUE_D847TQXH |
| | | #define SHM_MSG_QUEUE_D847TQXH |
| | | |
| | | #include "defs.h" |
| | | #include "msg.h" |
| | | #include "shm_queue.h" |
| | | |
| | |
| | | bool Recv(MsgI &msg, const int timeout_ms) { return Recv(msg.OffsetRef(), timeout_ms); } |
| | | bool TryRecv(MsgI &msg) { return TryRecv(msg.OffsetRef()); } |
| | | static Queue *Find(ShmType &shm, const MQId remote); |
| | | static bool TrySend(ShmType &shm, const MQId remote, const RawData val); |
| | | bool TrySend(const MQId remote, const RawData val) { return TrySend(shm(), remote, val); } |
| | | static bool TrySend(ShmType &shm, const MQInfo &remote, const RawData val); |
| | | bool TrySend(const MQInfo &remote, const RawData val) { return TrySend(shm(), remote, val); } |
| | | |
| | | private: |
| | | #ifndef BH_USE_ATOMIC_Q |
| | |
| | | } |
| | | }; |
| | | ShmMsgQueue::RawData val = 0; |
| | | auto TryRecvMore = [&]() { |
| | | for (int i = 0; i < 100; ++i) { |
| | | if (mq().TryRecv(val)) { |
| | | return true; |
| | | } |
| | | for (int i = 0; i < 100; ++i) { |
| | | if (mq().TryRecv(val)) { |
| | | onRecv(val); |
| | | return true; |
| | | } |
| | | return false; |
| | | }; |
| | | return TryRecvMore() ? (onRecv(val), true) : false; |
| | | } |
| | | return false; |
| | | }; |
| | | |
| | | try { |
| | |
| | | return false; |
| | | } |
| | | |
| | | // Sends serialized `content` to `remote`. A message sized by content.size() is requested via |
| | | // RequestAlloc; once it is handed back, it is filled with the content and sent. When `cb` is |
| | | // set, it is stored under `msg_id` first and picked out again if the queued send expires. |
| | | // Returns the result of RequestAlloc. |
| | | bool ShmSocket::Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb) |
| | | { |
| | | // Read the size before `content` is moved into the closure below. |
| | | const size_t nbytes = content.size(); |
| | | auto FillAndSend = [payload = std::move(content), msg_id, remote, reply_cb = std::move(cb), this](MsgI &msg) mutable { |
| | | if (!msg.Fill(payload)) { return; } |
| | | |
| | | try { |
| | | if (reply_cb) { |
| | | per_msg_cbs_->Store(msg_id, std::move(reply_cb)); |
| | | auto dropCbOnExpire = [this, msg_id](SendQ::Data const &) { |
| | | RecvCB discarded; |
| | | per_msg_cbs_->Pick(msg_id, discarded); |
| | | }; |
| | | Send(remote, msg, dropCbOnExpire); |
| | | } else { |
| | | Send(remote, msg); |
| | | } |
| | | } catch (...) { |
| | | SetLastError(eError, "Send internal error."); |
| | | } |
| | | }; |
| | | |
| | | return RequestAlloc(nbytes, FillAndSend); |
| | | } |
| | | |
| | | bool ShmSocket::RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult) |
| | | { // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag |
| | | // LOG_FUNCTION; |
| | |
| | | RawRecvCB cb_no_use; |
| | | alloc_cbs_->Pick(id, cb_no_use); |
| | | }; |
| | | |
| | | return Send(BHTopicCenterAddress(), cmd, onExpireRemoveCB); |
| | | } |
| | |
| | | { |
| | | node_proc_index_ = proc_index; |
| | | socket_index_ = socket_index; |
| | | LOG_DEBUG() << "Set Node Proc " << node_proc_index_ << ", " << socket_index_; |
| | | } |
| | | // start recv. |
| | | bool Start(int nworker = 1, const RecvCB &onMsg = RecvCB(), const RawRecvCB &onRaw = RawRecvCB(), const IdleCB &onIdle = IdleCB()); |
| | |
| | | bool Stop(); |
| | | |
| | | template <class Body> |
| | | bool CenterSend(const MQId remote, BHMsgHead &head, Body &body) |
| | | bool CenterSend(const MQInfo &remote, BHMsgHead &head, Body &body) |
| | | { |
| | | try { |
| | | //TODO alloc outsiez and use send. |
| | |
| | | bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult); |
| | | |
| | | template <class Body> |
| | | bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB()) |
| | | bool Send(const MQInfo &remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB()) |
| | | { |
| | | std::string msg_id(head.msg_id()); |
| | | std::string content(MsgI::Serialize(head, body)); |
| | | size_t size = content.size(); |
| | | auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable { |
| | | if (!msg.Fill(content)) { return; } |
| | | |
| | | try { |
| | | if (!cb) { |
| | | Send(remote, msg); |
| | | } else { |
| | | per_msg_cbs_->Store(msg_id, std::move(cb)); |
| | | auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) { |
| | | RecvCB cb_no_use; |
| | | per_msg_cbs_->Pick(msg_id, cb_no_use); |
| | | }; |
| | | Send(remote, msg, onExpireRemoveCB); |
| | | } |
| | | } catch (...) { |
| | | SetLastError(eError, "Send internal error."); |
| | | } |
| | | }; |
| | | |
| | | return RequestAlloc(size, OnResult); |
| | | return Send(remote, MsgI::Serialize(head, body), head.msg_id(), std::move(cb)); |
| | | } |
| | | template <class... T> |
| | | bool Send(const MQId remote, const MsgI &imsg, T &&...t) |
| | | bool Send(const MQInfo &remote, const MsgI &imsg, T &&...t) |
| | | { |
| | | return SendImpl(remote, imsg, std::forward<decltype(t)>(t)...); |
| | | } |
| | | template <class... T> |
| | | bool Send(const MQId remote, const int64_t cmd, T &&...t) |
| | | bool Send(const MQInfo &remote, const int64_t cmd, T &&...t) |
| | | { |
| | | return SendImpl(remote, cmd, std::forward<decltype(t)>(t)...); |
| | | } |
| | |
| | | bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms); |
| | | |
| | | template <class Body> |
| | | bool SendAndRecv(const MQId remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms) |
| | | bool SendAndRecv(const MQInfo &remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms) |
| | | { |
| | | struct State { |
| | | std::mutex mutex; |
| | |
| | | |
| | | try { |
| | | std::shared_ptr<State> st(new State); |
| | | |
| | | auto endtime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms); |
| | | |
| | | auto OnRecv = [st, &reply, &reply_head](ShmSocket &sock, MsgI &msg, BHMsgHead &head) { |
| | |
| | | bool StopNoLock(); |
| | | bool RunningNoLock() { return !workers_.empty(); } |
| | | |
| | | bool Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb = RecvCB()); |
| | | |
| | | template <class... Rest> |
| | | bool SendImpl(const MQId remote, Rest &&...rest) |
| | | bool SendImpl(const MQInfo &remote, Rest &&...rest) |
| | | { |
| | | // TODO send alloc request, and pack later, higher bit means alloc? |
| | | send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...); |
| | | return true; |
| | | return send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...); |
| | | } |
| | | |
| | | std::vector<std::thread> workers_; |
| | |
| | | |
| | | namespace |
| | | { |
| | | inline void AddRoute(BHMsgHead &head, const MQId id) { head.add_route()->set_mq_id(id); } |
| | | // Appends one route entry to `head`, filled with the socket's id() and AbsAddr(). |
| | | inline void AddRoute(BHMsgHead &head, const ShmSocket &sock) |
| | | { |
| | | auto &hop = *head.add_route(); |
| | | hop.set_mq_id(sock.id()); |
| | | hop.set_abs_addr(sock.AbsAddr()); |
| | | } |
| | | |
| | | struct SrcInfo { |
| | | std::vector<BHAddress> route; |
| | |
| | | } // namespace |
| | | |
| | | TopicNode::TopicNode(SharedMemory &shm) : |
| | | shm_(shm), state_(eStateUnregistered) |
| | | shm_(shm), state_(eStateUninited) |
| | | { |
| | | } |
| | | |
| | |
| | | auto end_time = steady_clock::now() + 3s; |
| | | do { |
| | | try { |
| | | //TODO recv offset, avoid query. |
| | | for (int i = eSockStart; i < eSockEnd; ++i) { |
| | | sockets_.emplace_back(new ShmSocket(shm_, false, ssn_id_ + i, kMqLen)); |
| | | } |
| | |
| | | NodeInit(); |
| | | } |
| | | if (!sockets_.empty()) { |
| | | LOG_DEBUG() << "node sockets ok"; |
| | | auto onNodeCmd = [this](ShmSocket &socket, int64_t &val) { |
| | | LOG_DEBUG() << "node recv cmd: " << DecodeCmd(val); |
| | | switch (DecodeCmd(val)) { |
| | |
| | | DEFER1(msg.Release()); |
| | | MsgProcInit body; |
| | | auto head = InitMsgHead(GetType(body), info_.proc_id(), ssn_id_); |
| | | head.add_route()->set_mq_id(ssn_id_); |
| | | AddRoute(head, socket); |
| | | if (msg.Fill(head, body)) { |
| | | socket.Send(BHTopicCenterAddress(), msg); |
| | | } |
| | |
| | | MsgProcInitReply reply; |
| | | if (imsg.ParseBody(reply)) { |
| | | SetProcIndex(reply.proc_index()); |
| | | this->state_ = eStateUnregistered; |
| | | } |
| | | } |
| | | return true; |
| | | }; |
| | | SockNode().Start(1, onMsg, onNodeCmd); |
| | | LOG_DEBUG() << "sockets ok."; |
| | | return true; |
| | | } |
| | | return false; |
| | |
| | | SetLastError(eError, kErrMsgNotInit); |
| | | return false; |
| | | } |
| | | auto end_time = steady_clock::now() + milliseconds(timeout_ms); |
| | | |
| | | while (state_ != eStateUnregistered && steady_clock::now() < end_time) { |
| | | std::this_thread::yield(); |
| | | } |
| | | if (state_ != eStateUnregistered) { |
| | | SetLastError(eError, kErrMsgNotInit); |
| | | return false; |
| | | } |
| | | |
| | | auto &sock = SockNode(); |
| | | MsgRegister body; |
| | | body.mutable_proc()->Swap(&proc); |
| | | auto AddId = [&](const MQId id) { body.add_addrs()->set_mq_id(id); }; |
| | | AddId(SockNode().id()); |
| | | AddId(SockServer().id()); |
| | | AddId(SockClient().id()); |
| | | AddId(SockSub().id()); |
| | | AddId(SockPub().id()); |
| | | |
| | | auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn())); |
| | | AddRoute(head, sock.id()); |
| | | AddRoute(head, sock); |
| | | |
| | | auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) { |
| | | bool ok = head.type() == kMsgTypeCommonReply && |
| | |
| | | body.mutable_proc()->Swap(&proc); |
| | | |
| | | auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn())); |
| | | AddRoute(head, sock.id()); |
| | | AddRoute(head, sock); |
| | | |
| | | auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) { |
| | | bool r = head.type() == kMsgTypeCommonReply && |
| | |
| | | body.mutable_proc()->Swap(&proc); |
| | | |
| | | auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn())); |
| | | AddRoute(head, sock.id()); |
| | | AddRoute(head, sock); |
| | | |
| | | if (timeout_ms == 0) { |
| | | return sock.Send(BHTopicCenterAddress(), head, body); |
| | |
| | | auto &sock = SockNode(); |
| | | |
| | | BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn())); |
| | | AddRoute(head, sock.id()); |
| | | AddRoute(head, sock); |
| | | |
| | | MsgI reply; |
| | | DEFER1(reply.Release()); |
| | |
| | | body.mutable_topics()->Swap(&topics); |
| | | |
| | | auto head(InitMsgHead(GetType(body), proc_id(), ssn())); |
| | | AddRoute(head, sock.id()); |
| | | AddRoute(head, sock); |
| | | |
| | | if (timeout_ms == 0) { |
| | | return sock.Send(BHTopicCenterAddress(), head, body); |
| | |
| | | for (int i = 0; i < head.route_size() - 1; ++i) { |
| | | reply_head.add_route()->Swap(head.mutable_route(i)); |
| | | } |
| | | auto remote = head.route().rbegin()->mq_id(); |
| | | MQInfo remote = {head.route().rbegin()->mq_id(), head.route().rbegin()->abs_addr()}; |
| | | sock.Send(remote, reply_head, reply_body); |
| | | } |
| | | }; |
| | |
| | | MsgRequestTopic req; |
| | | if (!imsg.ParseBody(req)) { return; } |
| | | |
| | | SrcInfo *p = new SrcInfo; |
| | | p->route.assign(head.route().begin(), head.route().end()); |
| | | p->msg_id = head.msg_id(); |
| | | acb(p, *head.mutable_proc_id(), req); |
| | | try { |
| | | SrcInfo *p = new SrcInfo; |
| | | if (!p) { |
| | | throw std::runtime_error("no memory."); |
| | | } |
| | | p->route.assign(head.route().begin(), head.route().end()); |
| | | p->msg_id = head.msg_id(); |
| | | acb(p, *head.mutable_proc_id(), req); |
| | | } catch (std::exception &e) { |
| | | LOG_ERROR() << "error server handle msg:" << e.what(); |
| | | } |
| | | }; |
| | | |
| | | auto &sock = SockServer(); |
| | |
| | | if (sock.SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) { |
| | | if (imsg.ParseBody(request)) { |
| | | head.mutable_proc_id()->swap(proc_id); |
| | | SrcInfo *p = new SrcInfo; |
| | | p->route.assign(head.route().begin(), head.route().end()); |
| | | p->msg_id = head.msg_id(); |
| | | src_info = p; |
| | | return true; |
| | | try { |
| | | SrcInfo *p = new SrcInfo; |
| | | if (!p) { |
| | | throw std::runtime_error("no memory."); |
| | | } |
| | | p->route.assign(head.route().begin(), head.route().end()); |
| | | p->msg_id = head.msg_id(); |
| | | src_info = p; |
| | | return true; |
| | | } catch (std::exception &e) { |
| | | LOG_ERROR() << "error recv request: " << e.what(); |
| | | return false; |
| | | } |
| | | } |
| | | } |
| | | return false; |
| | |
| | | for (unsigned i = 0; i < p->route.size() - 1; ++i) { |
| | | head.add_route()->Swap(&p->route[i]); |
| | | } |
| | | return sock.Send(p->route.back().mq_id(), head, body); |
| | | MQInfo dest = {p->route.back().mq_id(), p->route.back().abs_addr()}; |
| | | return sock.Send(dest, head, body); |
| | | } |
| | | |
| | | bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker) |
| | |
| | | |
| | | out_msg_id = msg_id; |
| | | |
| | | auto SendTo = [this, msg_id](const BHAddress &addr, const MsgRequestTopic &req, const RequestResultCB &cb) { |
| | | auto SendTo = [this, msg_id](const MQInfo &remote, const MsgRequestTopic &req, const RequestResultCB &cb) { |
| | | auto &sock = SockClient(); |
| | | BHMsgHead head(InitMsgHead(GetType(req), proc_id(), ssn(), msg_id)); |
| | | AddRoute(head, sock.id()); |
| | | AddRoute(head, sock); |
| | | head.set_topic(req.topic()); |
| | | |
| | | if (cb) { |
| | |
| | | } |
| | | } |
| | | }; |
| | | return sock.Send(addr.mq_id(), head, req, onRecv); |
| | | return sock.Send(remote, head, req, onRecv); |
| | | } else { |
| | | return sock.Send(addr.mq_id(), head, req); |
| | | return sock.Send(remote, head, req); |
| | | } |
| | | }; |
| | | |
| | | try { |
| | | BHAddress addr; |
| | | return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(addr, req, cb); |
| | | return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(MQInfo{addr.mq_id(), addr.abs_addr()}, req, cb); |
| | | } catch (...) { |
| | | SetLastError(eError, "internal error."); |
| | | return false; |
| | |
| | | if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) { |
| | | LOG_TRACE() << "node: " << SockNode().id() << ", topic dest: " << addr.mq_id(); |
| | | BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn())); |
| | | AddRoute(head, sock.id()); |
| | | AddRoute(head, sock); |
| | | head.set_topic(request.topic()); |
| | | |
| | | MsgI reply_msg; |
| | | DEFER1(reply_msg.Release();); |
| | | BHMsgHead reply_head; |
| | | |
| | | if (sock.SendAndRecv(addr.mq_id(), head, request, reply_msg, reply_head, timeout_ms) && |
| | | if (sock.SendAndRecv({addr.mq_id(), addr.abs_addr()}, head, request, reply_msg, reply_head, timeout_ms) && |
| | | reply_head.type() == kMsgTypeRequestTopicReply && |
| | | reply_msg.ParseBody(out_reply)) { |
| | | reply_head.mutable_proc_id()->swap(out_proc_id); |
| | |
| | | return false; |
| | | } |
| | | |
| | | int TopicNode::QueryRPCTopics(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms) |
| | | int TopicNode::QueryTopicServers(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms) |
| | | { |
| | | int n = 0; |
| | | MsgQueryTopic query; |
| | |
| | | return true; |
| | | } |
| | | std::vector<NodeAddress> lst; |
| | | if (QueryRPCTopics(topic, lst, timeout_ms)) { |
| | | if (QueryTopicServers(topic, lst, timeout_ms)) { |
| | | addr = lst.front().addr(); |
| | | if (addr.mq_id() != 0) { |
| | | topic_query_cache_.Store(topic, addr); |
| | |
| | | try { |
| | | auto &sock = SockPub(); |
| | | BHMsgHead head(InitMsgHead(GetType(pub), proc_id(), ssn())); |
| | | AddRoute(head, sock.id()); |
| | | AddRoute(head, sock); |
| | | |
| | | if (timeout_ms == 0) { |
| | | return sock.Send(BHTopicBusAddress(), head, pub); |
| | |
| | | sub.mutable_topics()->Swap(&topics); |
| | | |
| | | BHMsgHead head(InitMsgHead(GetType(sub), proc_id(), ssn())); |
| | | AddRoute(head, sock.id()); |
| | | AddRoute(head, sock); |
| | | if (timeout_ms == 0) { |
| | | return sock.Send(BHTopicBusAddress(), head, sub); |
| | | } else { |
| | |
| | | MQId ssn() { return SockNode().id(); } |
| | | bool ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms); |
| | | typedef MsgQueryTopicReply::BHNodeAddress NodeAddress; |
| | | int QueryRPCTopics(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms); |
| | | int QueryTopicServers(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms); |
| | | const std::string &proc_id() { return info_.proc_id(); } |
| | | |
| | | typedef BHAddress Address; |
| | |
| | | } |
| | | |
| | | enum State { |
| | | eStateUninited, |
| | | eStateUnregistered, |
| | | eStateOnline, |
| | | eStateOffline // heartbeat fail. |
| | |
| | | void state(const State st) { state_.store(st); } |
| | | void state_cas(State expected, const State val) { state_.compare_exchange_strong(expected, val); } |
| | | State state() const { return state_.load(); } |
| | | bool IsOnline() { return Init() && state() == eStateOnline; } |
| | | bool IsOnline() { return state() == eStateOnline; } |
| | | bool Init(); |
| | | bool Valid() const { return !sockets_.empty(); } |
| | | std::mutex mutex_; |
| | |
| | | void *reply = 0; |
| | | int reply_len = 0; |
| | | reg = BHRegister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 2000); |
| | | printf("register %s\n", reg ? "ok" : "failed"); |
| | | if (reg) { |
| | | printf("register ok\n"); |
| | | } else { |
| | | int ec = 0; |
| | | std::string msg; |
| | | GetLastError(ec, msg); |
| | | printf("register failed, %d, %s\n", ec, msg.c_str()); |
| | | } |
| | | |
| | | BHFree(reply, reply_len); |
| | | Sleep(1s); |
| | |
| | | DEFER1(BHFree(msg_id, len);); |
| | | // Sleep(10ms, false); |
| | | std::string dest(BHAddress().SerializeAsString()); |
| | | |
| | | bool r = BHAsyncRequest(dest.data(), dest.size(), s.data(), s.size(), 0, 0); |
| | | if (r) { |
| | | ++Status().nrequest_; |
| | |
| | | |
| | | int same = 0; |
| | | uint64_t last = 0; |
| | | while (last < nreq * ncli && same < 2) { |
| | | while (last < nreq * ncli && same < 3) { |
| | | Sleep(1s, false); |
| | | auto cur = Status().nreply_.load(); |
| | | if (last == cur) { |
| | | ++same; |
| | | printf("same %d\n", same); |
| | | } else { |
| | | last = cur; |
| | | same = 0; |
| | |
| | | run = false; |
| | | threads.WaitAll(); |
| | | auto &st = Status(); |
| | | Sleep(1s); |
| | | printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld\n", st.nrequest_.load(), st.nserved_.load(), st.nreply_.load()); |
| | | BHCleanup(); |
| | | printf("after cleanup\n"); |
| | |
| | | { |
| | | SharedMemory &shm = TestShm(); |
| | | GlobalInit(shm); |
| | | auto InitSem = [](auto id) { |
| | | auto sem_id = semget(id, 1, 0666 | IPC_CREAT); |
| | | union semun init_val; |
| | | init_val.val = 1; |
| | | semctl(sem_id, 0, SETVAL, init_val); |
| | | return; |
| | | }; |
| | | |
| | | MQId id = ShmMsgQueue::NewId(); |
| | | InitSem(id); |
| | | MQId server_id = ShmMsgQueue::NewId(); |
| | | ShmMsgQueue server(server_id, shm, 1000); |
| | | |
| | | const int timeout = 1000; |
| | | const uint32_t data_size = 1001; |
| | |
| | | std::string str(data_size, 'a'); |
| | | auto Writer = [&](int writer_id, uint64_t n) { |
| | | MQId cli_id = ShmMsgQueue::NewId(); |
| | | InitSem(cli_id); |
| | | |
| | | ShmMsgQueue mq(cli_id, shm, 64); |
| | | MsgI msg; |
| | |
| | | |
| | | for (uint64_t i = 0; i < n; ++i) { |
| | | msg.AddRef(); |
| | | while (!mq.TrySend(id, msg.Offset())) {} |
| | | while (!mq.TrySend({server.Id(), server.AbsAddr()}, msg.Offset())) {} |
| | | ++nwrite; |
| | | } |
| | | }; |
| | | auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork) { |
| | | ShmMsgQueue mq(id, shm, 1000); |
| | | ShmMsgQueue &mq = server; |
| | | auto now = []() { return steady_clock::now(); }; |
| | | auto tm = now(); |
| | | while (*run) { |
| | |
| | | req_body.set_topic("topic"); |
| | | req_body.set_data(msg_content); |
| | | auto req_head(InitMsgHead(GetType(req_body), client_proc_id, cli.id())); |
| | | req_head.add_route()->set_mq_id(cli.id()); |
| | | return cli.Send(srv.id(), req_head, req_body); |
| | | auto route = req_head.add_route(); |
| | | route->set_mq_id(cli.id()); |
| | | route->set_abs_addr(cli.AbsAddr()); |
| | | return cli.Send({srv.id(), srv.AbsAddr()}, req_head, req_body); |
| | | }; |
| | | |
| | | Req(); |
| | |
| | | DEFER1(req.Release()); |
| | | |
| | | if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) { |
| | | auto src_id = req_head.route()[0].mq_id(); |
| | | MQInfo src_mq = {req_head.route()[0].mq_id(), req_head.route()[0].abs_addr()}; |
| | | auto Reply = [&]() { |
| | | MsgRequestTopic reply_body; |
| | | reply_body.set_topic("topic"); |
| | | reply_body.set_data(msg_content); |
| | | auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), req_head.msg_id())); |
| | | return srv.Send(src_id, reply_head, reply_body); |
| | | return srv.Send(src_mq, reply_head, reply_body); |
| | | }; |
| | | Reply(); |
| | | } |