| | |
| | | #include "log.h" |
| | | #include "shm.h" |
| | | #include <chrono> |
| | | #include <set> |
| | | #include <unordered_map> |
| | | |
| | | using namespace std::chrono; |
| | | using namespace std::chrono_literals; |
| | |
| | | namespace |
| | | { |
| | | |
| | | typedef std::string ProcId; |
| | | typedef size_t ProcIndex; // max local procs. |
| | | const int kMaxProcs = 65536; |
| | | |
| | | // record all procs ever registered, always grow, never remove. |
| | | // mainly for node to request msg allocation. |
| | | // use index instead of MQId to save some bits. |
| | | class ProcRecords |
| | | { |
| | | public: |
| | | struct ProcRec { |
| | | ProcId proc_; |
| | | MQId ssn_ = 0; |
| | | }; |
| | | |
| | | ProcRecords() { procs_.reserve(kMaxProcs); } |
| | | |
| | | ProcIndex Put(const ProcId &proc_id, const MQId ssn) |
| | | { |
| | | if (procs_.size() >= kMaxProcs) { |
| | | return -1; |
| | | } |
| | | auto pos_isnew = proc_index_.emplace(proc_id, procs_.size()); |
| | | int index = pos_isnew.first->second; |
| | | if (pos_isnew.second) { |
| | | procs_.emplace_back(ProcRec{proc_id, ssn}); |
| | | } else { // update ssn |
| | | procs_[index].ssn_ = ssn; |
| | | } |
| | | return index; |
| | | } |
| | | const ProcRec &Get(const ProcIndex index) const |
| | | { |
| | | static ProcRec empty_rec; |
| | | return (index < procs_.size()) ? procs_[index] : empty_rec; |
| | | } |
| | | |
| | | private: |
| | | std::unordered_map<ProcId, size_t> proc_index_; |
| | | std::vector<ProcRec> procs_; |
| | | }; |
| | | |
| | | class MsgRecords |
| | | { |
| | | typedef int64_t MsgId; |
| | | typedef int64_t Offset; |
| | | |
| | | public: |
| | | void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg.Offset()); } |
| | | void FreeMsg(MsgId id) |
| | | { |
| | | auto pos = msgs_.find(id); |
| | | if (pos != msgs_.end()) { |
| | | ShmMsg(pos->second).Free(); |
| | | msgs_.erase(pos); |
| | | } else { |
| | | LOG_TRACE() << "ignore late free request."; |
| | | } |
| | | } |
| | | void AutoRemove() |
| | | { |
| | | auto now = NowSec(); |
| | | if (now < time_to_clean_) { |
| | | return; |
| | | } |
| | | LOG_FUNCTION; |
| | | time_to_clean_ = now + 1; |
| | | int64_t limit = std::max(10000ul, msgs_.size() / 10); |
| | | int64_t n = 0; |
| | | auto it = msgs_.begin(); |
| | | while (it != msgs_.end() && --limit > 0) { |
| | | ShmMsg msg(it->second); |
| | | if (msg.Count() == 0) { |
| | | msg.Free(); |
| | | it = msgs_.erase(it); |
| | | ++n; |
| | | } else if (msg.timestamp() + 10 < NowSec()) { |
| | | msg.Free(); |
| | | it = msgs_.erase(it); |
| | | ++n; |
| | | // LOG_DEBUG() << "release timeout msg, someone crashed."; |
| | | } else { |
| | | ++it; |
| | | } |
| | | } |
| | | if (n > 0) { |
| | | LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n; |
| | | } |
| | | } |
| | | size_t size() const { return msgs_.size(); } |
| | | void DebugPrint() const |
| | | { |
| | | LOG_DEBUG() << "msgs : " << size(); |
| | | int i = 0; |
| | | int total_count = 0; |
| | | for (auto &kv : msgs_) { |
| | | MsgI msg(kv.second); |
| | | total_count += msg.Count(); |
| | | LOG_TRACE() << " " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second << ", count: " << msg.Count() << ", size: " << msg.Size(); |
| | | } |
| | | LOG_DEBUG() << "total count: " << total_count; |
| | | } |
| | | |
| | | private: |
| | | std::unordered_map<MsgId, Offset> msgs_; |
| | | int64_t time_to_clean_ = 0; |
| | | }; |
| | | |
| | | //TODO check proc_id |
| | | class NodeCenter |
| | | { |
| | | public: |
| | | typedef std::string ProcId; |
| | | typedef MQId Address; |
| | | typedef bhome_msg::ProcInfo ProcInfo; |
| | | typedef std::function<void(Address const)> Cleaner; |
| | |
| | | |
| | | // center name, no relative to shm. |
| | | const std::string &id() const { return id_; } |
| | | void OnNodeInit(SharedMemory &shm, const int64_t msg) |
| | | void OnNodeInit(ShmSocket &socket, const int64_t val) |
| | | { |
| | | MQId ssn = msg; |
| | | LOG_FUNCTION; |
| | | SharedMemory &shm = socket.shm(); |
| | | MQId ssn = (val >> 4) & MaskBits(60); |
| | | if (nodes_.find(ssn) != nodes_.end()) { |
| | | return; // ignore in exists. |
| | | } |
| | | |
| | | auto UpdateRegInfo = [&](Node &node) { |
| | | for (int i = 0; i < 10; ++i) { |
| | | node->addrs_.insert(ssn + i); |
| | |
| | | |
| | | // create sockets. |
| | | try { |
| | | auto CreateSocket = [](SharedMemory &shm, const MQId id) { |
| | | ShmSocket tmp(shm, true, id, 16); |
| | | }; |
| | | auto CreateSocket = [&](const MQId id) { ShmSocket tmp(shm, true, id, 16); }; |
| | | // alloc(-1), node, server, sub, request, |
| | | for (int i = -1; i < 4; ++i) { |
| | | CreateSocket(shm, ssn + i); |
| | | for (int i = 0; i < 4; ++i) { |
| | | CreateSocket(ssn + i); |
| | | node->addrs_.insert(ssn + i); |
| | | } |
| | | return true; |
| | |
| | | } |
| | | }; |
| | | |
| | | auto PrepareProcInit = [&]() { |
| | | bool r = false; |
| | | ShmMsg init_msg; |
| | | if (init_msg.Make(GetAllocSize(CalcAllocIndex(900)))) { |
| | | // 31bit pointer, 4bit cmd+flag |
| | | int64_t reply = (init_msg.Offset() << 4) | EncodeCmd(eCmdNodeInitReply); |
| | | r = SendAllocReply(socket, ssn, reply, init_msg); |
| | | } |
| | | return r; |
| | | }; |
| | | |
| | | Node node(new NodeInfo); |
| | | if (UpdateRegInfo(node)) { |
| | | if (UpdateRegInfo(node) && PrepareProcInit()) { |
| | | nodes_[ssn] = node; |
| | | LOG_INFO() << "new node ssn (" << ssn << ") init"; |
| | | } else { |
| | | for (int i = 0; i < 10; ++i) { |
| | | ShmSocket::Remove(shm, ssn + i); |
| | | } |
| | | } |
| | | } |
| | | void RecordMsg(const MsgI &msg) { msgs_.RecordMsg(msg); } |
| | | |
| | | bool SendAllocReply(ShmSocket &socket, const Address dest, const int64_t reply, const MsgI &msg) |
| | | { |
| | | RecordMsg(msg); |
| | | auto onExpireFree = [this, msg](const SendQ::Data &) { msgs_.FreeMsg(msg.id()); }; |
| | | return socket.Send(dest, reply, onExpireFree); |
| | | } |
| | | bool SendAllocMsg(ShmSocket &socket, const Address dest, const MsgI &msg) |
| | | { |
| | | RecordMsg(msg); |
| | | auto onExpireFree = [this, msg](const SendQ::Data &) { msgs_.FreeMsg(msg.id()); }; |
| | | return socket.Send(dest, msg, onExpireFree); |
| | | } |
| | | |
| | | void OnAlloc(ShmSocket &socket, const int64_t val) |
| | | { |
| | | // LOG_FUNCTION; |
| | | // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag |
| | | int64_t msg_id = (val >> 4) & MaskBits(28); |
| | | int proc_index = (val >> 32) & MaskBits(16); |
| | | int socket_index = ((val) >> 48) & MaskBits(4); |
| | | auto proc_rec(procs_.Get(proc_index)); |
| | | if (proc_rec.proc_.empty()) { |
| | | return; |
| | | } |
| | | Address dest = proc_rec.ssn_ + socket_index; |
| | | |
| | | auto size = GetAllocSize((val >> 52) & MaskBits(8)); |
| | | MsgI new_msg; |
| | | if (new_msg.Make(size)) { |
| | | // 31bit proc index, 28bit id, ,4bit cmd+flag |
| | | int64_t reply = (new_msg.Offset() << 32) | (msg_id << 4) | EncodeCmd(eCmdAllocReply0); |
| | | SendAllocReply(socket, dest, reply, new_msg); |
| | | } else { |
| | | int64_t reply = (msg_id << 4) | EncodeCmd(eCmdAllocReply0); // send empty, ack failure. |
| | | socket.Send(dest, reply); |
| | | } |
| | | } |
| | | |
| | | void OnFree(ShmSocket &socket, const int64_t val) |
| | | { |
| | | int64_t msg_id = (val >> 4) & MaskBits(31); |
| | | msgs_.FreeMsg(msg_id); |
| | | } |
| | | |
| | | bool OnCommand(ShmSocket &socket, const int64_t val) |
| | | { |
| | | assert(IsCmd(val)); |
| | | int cmd = DecodeCmd(val); |
| | | switch (cmd) { |
| | | case eCmdNodeInit: OnNodeInit(socket, val); break; |
| | | case eCmdAllocRequest0: OnAlloc(socket, val); break; |
| | | case eCmdFree: OnFree(socket, val); break; |
| | | default: return false; |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | MsgProcInitReply ProcInit(const BHMsgHead &head, MsgProcInit &msg) |
| | | { |
| | | LOG_DEBUG() << "center got proc init."; |
| | | auto index = procs_.Put(head.proc_id(), head.ssn_id()); |
| | | auto reply(MakeReply<MsgProcInitReply>(eSuccess)); |
| | | reply.set_proc_index(index); |
| | | return reply; |
| | | } |
| | | |
| | | MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg) |
| | |
| | | }; |
| | | |
| | | auto pos = nodes_.find(ssn); |
| | | if (pos != nodes_.end()) { // update |
| | | Node &node = pos->second; |
| | | UpdateRegInfo(node); |
| | | } else { |
| | | Node node(new NodeInfo); |
| | | UpdateRegInfo(node); |
| | | nodes_[ssn] = node; |
| | | if (pos == nodes_.end()) { |
| | | return MakeReply(eInvalidInput, "invalid session."); |
| | | } |
| | | |
| | | // update proc info |
| | | Node &node = pos->second; |
| | | UpdateRegInfo(node); |
| | | LOG_DEBUG() << "node (" << head.proc_id() << ") ssn (" << ssn << ")"; |
| | | |
| | | auto old = online_node_addr_map_.find(head.proc_id()); |
| | |
| | | void OnTimer() |
| | | { |
| | | CheckNodes(); |
| | | msgs_.AutoRemove(); |
| | | } |
| | | |
| | | private: |
| | | void CheckNodes() |
| | | { |
| | | auto now = NowSec(); |
| | | if (now - last_check_time_ < 1) { return; } |
| | | if (now <= last_check_time_) { return; } |
| | | last_check_time_ = now; |
| | | |
| | | auto it = nodes_.begin(); |
| | |
| | | ++it; |
| | | } |
| | | } |
| | | msgs_.DebugPrint(); |
| | | } |
| | | bool CanHeartbeat(const NodeInfo &node) |
| | | { |
| | |
| | | std::unordered_map<Topic, Clients> service_map_; |
| | | std::unordered_map<Topic, Clients> subscribe_map_; |
| | | std::unordered_map<Address, Node> nodes_; |
| | | std::unordered_map<std::string, Address> online_node_addr_map_; |
| | | std::unordered_map<ProcId, Address> online_node_addr_map_; |
| | | ProcRecords procs_; // To get a short index for msg alloc. |
| | | MsgRecords msgs_; // record all msgs alloced. |
| | | |
| | | Cleaner cleaner_; // remove mqs. |
| | | int64_t offline_time_; |
| | | int64_t kill_time_; |
| | |
| | | msg, head, [&](auto &body) { return center->MsgTag(head, body); }, replyer); \ |
| | | return true; |
| | | |
| | | auto MakeReplyer(ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) |
| | | auto MakeReplyer(ShmSocket &socket, BHMsgHead &head, Synced<NodeCenter> ¢er) |
| | | { |
| | | return [&](auto &&rep_body) { |
| | | auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.ssn_id(), head.msg_id())); |
| | | auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id())); |
| | | auto remote = head.route(0).mq_id(); |
| | | socket.Send(remote, reply_head, rep_body); |
| | | MsgI msg; |
| | | if (msg.Make(reply_head, rep_body)) { |
| | | DEFER1(msg.Release();); |
| | | center->SendAllocMsg(socket, remote, msg); |
| | | } |
| | | }; |
| | | } |
| | | |
| | | bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr) |
| | | { |
| | | auto OnNodeInit = [center_ptr](ShmSocket &socket, MsgI &msg) { |
| | | // command |
| | | auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool { |
| | | auto ¢er = *center_ptr; |
| | | center->OnNodeInit(socket.shm(), msg.Offset()); |
| | | return IsCmd(cmd) && center->OnCommand(socket, cmd); |
| | | }; |
| | | auto Nothing = [](ShmSocket &socket) {}; |
| | | |
| | | BHCenter::Install("#centetr.Init", OnNodeInit, Nothing, BHInitAddress(), 16); |
| | | |
| | | // now we can talk. |
| | | auto OnCenterIdle = [center_ptr](ShmSocket &socket) { |
| | | auto ¢er = *center_ptr; |
| | | center->OnTimer(); |
| | |
| | | |
| | | auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { |
| | | auto ¢er = *center_ptr; |
| | | auto replyer = MakeReplyer(socket, head, center->id()); |
| | | auto replyer = MakeReplyer(socket, head, center); |
| | | switch (head.type()) { |
| | | CASE_ON_MSG_TYPE(ProcInit); |
| | | CASE_ON_MSG_TYPE(Register); |
| | | CASE_ON_MSG_TYPE(Heartbeat); |
| | | CASE_ON_MSG_TYPE(Unregister); |
| | |
| | | default: return false; |
| | | } |
| | | }; |
| | | BHCenter::Install("#center.main", OnCenter, OnCenterIdle, BHTopicCenterAddress(), 1000); |
| | | BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(), 1000); |
| | | |
| | | auto OnBusIdle = [=](ShmSocket &socket) {}; |
| | | auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; }; |
| | | auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { |
| | | auto ¢er = *center_ptr; |
| | | auto replyer = MakeReplyer(socket, head, center->id()); |
| | | auto replyer = MakeReplyer(socket, head, center); |
| | | auto OnPublish = [&]() { |
| | | MsgPublish pub; |
| | | NodeCenter::Clients clients; |
| | |
| | | } |
| | | }; |
| | | |
| | | BHCenter::Install("#center.bus", OnPubSub, OnBusIdle, BHTopicBusAddress(), 1000); |
| | | BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(), 1000); |
| | | |
| | | return true; |
| | | } |
| | |
| | | return rec; |
| | | } |
| | | |
| | | bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len) |
| | | bool BHCenter::Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQId mqid, const int mq_len) |
| | | { |
| | | Centers()[name] = CenterInfo{name, handler, MsgIHandler(), idle, mqid, mq_len}; |
| | | return true; |
| | | } |
| | | bool BHCenter::Install(const std::string &name, MsgIHandler handler, IdleHandler idle, const MQId mqid, const int mq_len) |
| | | { |
| | | Centers()[name] = CenterInfo{name, MsgHandler(), handler, idle, mqid, mq_len}; |
| | | Centers()[name] = CenterInfo{name, handler, raw_handler, idle, mqid, mq_len}; |
| | | return true; |
| | | } |
| | | |
| | |
| | | { |
| | | for (auto &kv : Centers()) { |
| | | auto &info = kv.second; |
| | | if (info.handler_) { |
| | | sockets_[info.name_]->Start(info.handler_, info.idle_); |
| | | } else { |
| | | sockets_[info.name_]->Start(info.raw_handler_, info.idle_); |
| | | } |
| | | sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_); |
| | | } |
| | | |
| | | return true; |
| | |
| | | |
| | | public: |
| | | typedef Socket::PartialRecvCB MsgHandler; |
| | | typedef Socket::RawRecvCB MsgIHandler; |
| | | typedef Socket::RawRecvCB RawHandler; |
| | | typedef Socket::IdleCB IdleHandler; |
| | | static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len); |
| | | static bool Install(const std::string &name, MsgIHandler handler, IdleHandler idle, const MQId mqid, const int mq_len); |
| | | static bool Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQId mqid, const int mq_len); |
| | | |
| | | BHCenter(Socket::Shm &shm); |
| | | ~BHCenter() { Stop(); } |
| | |
| | | struct CenterInfo { |
| | | std::string name_; |
| | | MsgHandler handler_; |
| | | MsgIHandler raw_handler_; |
| | | RawHandler raw_handler_; |
| | | IdleHandler idle_; |
| | | MQId mqid_; |
| | | int mq_len_ = 0; |
| | |
| | | std::atomic<bool> run_; |
| | | }; |
| | | |
| | | bool CenterInit(bhome_shm::SharedMemory &shm) |
| | | { |
| | | ShmSocket create(shm, BHGlobalSenderAddress(), 16); |
| | | return true; |
| | | } |
| | | |
| | | } // namespace |
| | | int center_main(int argc, const char *argv[]) |
| | | { |
| | |
| | | if (strcasecmp(lvl.c_str(), "fatal") == 0) { ns_log::ResetLogLevel(ns_log::LogLevel::fatal); } |
| | | |
| | | auto &shm = BHomeShm(); |
| | | CenterInit(shm); |
| | | GlobalInit(shm); |
| | | |
| | | InstanceFlag inst(shm, kCenterRunningFlag); |
| | |
| | | |
| | | kMsgTypeCommonReply = 2; |
| | | |
| | | kMsgTypeProcInit = 8; |
| | | kMsgTypeProcInitReply = 9; |
| | | kMsgTypeRegister= 10; |
| | | // kMsgTypeRegisterReply= 11; |
| | | kMsgTypeHeartbeat = 12; |
| | |
| | | MsgTopicList topics = 1; |
| | | } |
| | | |
| | | message MsgProcInit{ } // proc_id is in header. |
| | | |
| | | message MsgProcInitReply { |
| | | ErrorMsg errmsg = 1; |
| | | int32 proc_index = 2; |
| | | } |
| | | |
| | | service TopicRPC { |
| | | rpc Query (MsgQueryTopic) returns (MsgQueryTopicReply); |
| | | rpc Request (MsgRequestTopic) returns (MsgQueryTopicReply); |
| | |
| | | inline void PutInt(void *p, uint32_t u) { Put32(p, u); } |
| | | inline void PutInt(void *p, uint64_t u) { Put64(p, u); } |
| | | |
| | | constexpr uint64_t MaskBits(int nbits) { return (uint64_t(1) << nbits) - 1; } |
| | | |
| | | class ExitCall |
| | | { |
| | | typedef std::function<void(void)> func_t; |
| | |
| | | return le; |
| | | } |
| | | |
| | | constexpr int64_t AllocSizeIndex[] = { |
| | | 16, 24, 32, 40, 48, 56, 64, 72, |
| | | 80, 88, 96, 104, 120, 136, 152, 168, |
| | | 184, 200, 224, 248, 272, 296, 328, 360, |
| | | 392, 432, 472, 520, 568, 624, 680, 744, |
| | | 816, 896, 984, 1080, 1184, 1296, 1416, 1544, |
| | | 1688, 1848, 2016, 2200, 2400, 2624, 2864, 3128, |
| | | 3416, 3728, 4072, 4448, 4856, 5304, 5792, 6320, |
| | | 6896, 7528, 8216, 8968, 9784, 10680, 11656, 12720, |
| | | 13880, 15144, 16520, 18024, 19664, 21456, 23408, 25536, |
| | | 27864, 30400, 33168, 36184, 39480, 43072, 46992, 51264, |
| | | 55928, 61016, 66568, 72624, 79232, 86440, 94304, 102880, |
| | | 112232, 122440, 133576, 145720, 158968, 173424, 189192, 206392, |
| | | 225160, 245632, 267968, 292328, 318904, 347896, 379528, 414032, |
| | | 451672, 492736, 537536, 586408, 639720, 697880, 761328, 830544, |
| | | 906048, 988416, 1078272, 1176296, 1283232, 1399896, 1527160, 1665992, |
| | | 1817448, 1982672, 2162920, 2359552, 2574056, 2808064, 3063344, 3341832, |
| | | 3645640, 3977064, 4338616, 4733040, 5163320, 5632712, 6144776, 6703392, |
| | | 7312792, 7977592, 8702832, 9494000, 10357096, 11298656, 12325808, 13446336, |
| | | 14668736, 16002264, 17457016, 19044024, 20775304, 22663968, 24724328, 26972000, |
| | | 29424000, 32098912, 35017000, 38200368, 41673128, 45461600, 49594472, 54103064, |
| | | 59021528, 64387128, 70240504, 76626008, 83592008, 91191288, 99481408, 108525176, |
| | | 118391104, 129153936, 140895208, 153703864, 167676944, 182920304, 199549424, 217690280, |
| | | 237480312, 259069432, 282621200, 308314040, 336342592, 366919192, 400275488, 436664168, |
| | | 476360912, 519666456, 566908864, 618446040, 674668408, 736001904, 802911168, 875903096, |
| | | 955530656, 1042397080, 1137160456, 1240538680, 1353314928, 1476343560, 1610556616, 1756970856, |
| | | 1916695480, 2090940528, 2281026032, 2488392040, 2714609504, 2961392192, 3230609664, 3524301456, |
| | | 3844692504, 4194210008, 4575501832, 4991456544, 5445225320, 5940245808, 6480268160, 7069383448, |
| | | 7712054672, 8413150552, 9177982424, 10012344464, 10922557600, 11915517384, 12998746240, 14180450448, |
| | | 15469582312, 16875907976, 18410081432, 20083725200, 21909518400, 23901292800, 26074137600, 28444513752, |
| | | 31030378640, 33851322152, 36928715080, 40285871000, 43948222912, 47943515904, 52302017352, 57056746208, |
| | | 62243723136, 67902243424, 74075174648, 80809281440, 88155579752, 96169723368, 104912425496, 114449918728, |
| | | 124854456800, 136204861968, 148587122152, 162095042352, 176830955296, 192906496688, 210443450936, 229574673752}; |
| | | |
| | | const int kAllocIndexLen = sizeof(AllocSizeIndex) / sizeof(AllocSizeIndex[0]); |
| | | static_assert(kAllocIndexLen == 256, "Make sure alloc 8 bit is enough."); |
| | | static_assert(AllocSizeIndex[255] > uint32_t(-1), "Make sure alloc size correct."); |
| | | } // namespace |
| | | |
| | | int64_t CalcAllocIndex(int64_t size) |
| | | { |
| | | auto pos = std::lower_bound(AllocSizeIndex, AllocSizeIndex + kAllocIndexLen, size); |
| | | return (pos == AllocSizeIndex + kAllocIndexLen) ? -1 : pos - AllocSizeIndex; |
| | | } |
| | | |
| | | int64_t GetAllocSize(int index) { return index < kAllocIndexLen ? AllocSizeIndex[index] : 0; } |
| | | |
| | | std::string BHomeShmName() |
| | | { |
| | | return "bhome_default_shm_v0"; |
| | |
| | | |
| | | typedef uint64_t MQId; |
| | | |
| | | const MQId kBHNodeInit = 10; |
| | | const MQId kBHDefaultSender = 99; |
| | | const MQId kBHTopicCenter = 100; |
| | | const MQId kBHTopicBus = 101; |
| | | const MQId kBHUniCenter = 102; |
| | | inline const MQId BHInitAddress() { return kBHNodeInit; } |
| | | inline const MQId BHGlobalSenderAddress() { return kBHDefaultSender; } |
| | | inline const MQId BHTopicCenterAddress() { return kBHTopicCenter; } |
| | | inline const MQId BHTopicBusAddress() { return kBHTopicBus; } |
| | | inline const MQId BHUniCenterAddress() { return kBHUniCenter; } |
| | | |
| | | int64_t CalcAllocIndex(int64_t size); |
| | | int64_t GetAllocSize(int index); |
| | | |
| | | const int kBHCenterPort = 24287; |
| | | const char kTopicSep = '.'; |
| | |
| | | */ |
| | | #include "msg.h" |
| | | #include "bh_util.h" |
| | | #include "socket.h" |
| | | |
| | | namespace bhome_msg |
| | | { |
| | | |
| | | ShmSocket &ShmMsg::Sender() |
| | | { |
| | | static ShmSocket sender(shm(), false, BHGlobalSenderAddress(), 16); |
| | | return sender; |
| | | } |
| | | |
| | | int ShmMsg::Release() |
| | | { |
| | | if (!valid()) { |
| | | return 0; |
| | | } |
| | | auto n = meta()->count_.Dec(); |
| | | if (n == 0) { |
| | | int64_t free_cmd = (id() << 4) | EncodeCmd(eCmdFree); |
| | | Sender().Send(BHTopicCenterAddress(), free_cmd); |
| | | } else if (n < 0) { |
| | | throw -123; |
| | | } |
| | | return n; |
| | | } |
| | | |
| | | void ShmMsg::Free() |
| | | { |
| | | assert(valid()); |
| | | shm().Dealloc(meta()); |
| | | offset_ = 0; |
| | | assert(!valid()); |
| | | } |
| | | } // namespace bhome_msg |
| | |
| | | #include <functional> |
| | | #include <stdint.h> |
| | | |
| | | class ShmSocket; |
| | | namespace bhome_msg |
| | | { |
| | | using namespace bhome_shm; |
| | |
| | | |
| | | class ShmMsg : private StaticDataRef<SharedMemory, ShmMsg> |
| | | { |
| | | private: |
| | | static inline SharedMemory &shm() { return GetData(); } |
| | | static ShmSocket &Sender(); |
| | | |
| | | // store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object. |
| | | class RefCount : private boost::noncopyable |
| | | { |
| | |
| | | int Dec() { return --num_; } |
| | | int Get() { return num_.load(); } |
| | | }; |
| | | |
| | | typedef int64_t OffsetType; |
| | | static OffsetType Addr(void *ptr) { return reinterpret_cast<OffsetType>(ptr); } |
| | | static void *Ptr(const OffsetType offset) { return reinterpret_cast<void *>(offset); } |
| | |
| | | |
| | | static const uint32_t kMsgTag = 0xf1e2d3c4; |
| | | struct Meta { |
| | | static int64_t NewId() |
| | | { |
| | | static std::atomic<int64_t> id(0); |
| | | return ++id; |
| | | } |
| | | |
| | | RefCount count_; |
| | | const uint32_t tag_ = kMsgTag; |
| | | const uint32_t size_ = 0; |
| | | const int64_t id_ = 0; |
| | | std::atomic<int64_t> timestamp_; |
| | | Meta(uint32_t size) : |
| | | size_(size) {} |
| | | size_(size), id_(NewId()), timestamp_(NowSec()) {} |
| | | }; |
| | | OffsetType offset_; |
| | | void *Alloc(const size_t size) |
| | | static void *Alloc(const size_t size) |
| | | { |
| | | void *p = shm().Alloc(sizeof(Meta) + size); |
| | | if (p) { |
| | |
| | | } |
| | | return p; |
| | | } |
| | | void Free() |
| | | { |
| | | assert(valid()); |
| | | shm().Dealloc(meta()); |
| | | offset_ = 0; |
| | | assert(!valid()); |
| | | } |
| | | |
| | | private: |
| | | Meta *meta() const { return get<Meta>() - 1; } |
| | | |
| | | typedef std::function<void(void *p, int len)> ToArray; |
| | | void *Pack(const uint32_t head_len, const ToArray &headToArray, |
| | | const uint32_t body_len, const ToArray &bodyToArray) |
| | | |
| | | template <class Body> |
| | | void *Pack(const BHMsgHead &head, const uint32_t head_len, const Body &body, const uint32_t body_len) |
| | | { |
| | | void *addr = Alloc(sizeof(head_len) + head_len + sizeof(body_len) + body_len); |
| | | void *addr = get(); |
| | | if (addr) { |
| | | auto p = static_cast<char *>(addr); |
| | | auto Pack1 = [&p](auto len, auto &writer) { |
| | | auto Pack1 = [&p](auto len, auto &&writer) { |
| | | Put32(p, len); |
| | | p += sizeof(len); |
| | | writer(p, len); |
| | | p += len; |
| | | }; |
| | | Pack1(head_len, headToArray); |
| | | Pack1(body_len, bodyToArray); |
| | | Pack1(head_len, [&](void *p, int len) { head.SerializeToArray(p, len); }); |
| | | Pack1(body_len, [&](void *p, int len) { body.SerializeToArray(p, len); }); |
| | | } |
| | | return addr; |
| | | } |
| | | |
| | | template <class Body> |
| | | void *Pack(const BHMsgHead &head, const Body &body) |
| | | { |
| | | return Pack( |
| | | uint32_t(head.ByteSizeLong()), [&](void *p, int len) { head.SerializeToArray(p, len); }, |
| | | uint32_t(body.ByteSizeLong()), [&](void *p, int len) { body.SerializeToArray(p, len); }); |
| | | } |
| | | |
| | | void *Pack(const std::string &content) |
| | | { |
| | | void *addr = Alloc(content.size()); |
| | | void *addr = get(); |
| | | if (addr) { |
| | | memcpy(addr, content.data(), content.size()); |
| | | } |
| | |
| | | offset_(p ? (Addr(p) - BaseAddr()) : 0) {} |
| | | |
| | | template <class T = void> |
| | | T *get() const { return static_cast<T *>(Ptr(offset_ + BaseAddr())); } |
| | | T *get() const { return offset_ != 0 ? static_cast<T *>(Ptr(offset_ + BaseAddr())) : nullptr; } |
| | | |
| | | public: |
| | | static bool BindShm(SharedMemory &shm) { return SetData(shm); } |
| | | ShmMsg() : |
| | | ShmMsg(nullptr) {} |
| | | offset_(0) {} |
| | | explicit ShmMsg(const OffsetType offset) : |
| | | offset_(offset) {} |
| | | OffsetType Offset() const { return offset_; } |
| | | OffsetType &OffsetRef() { return offset_; } |
| | | void swap(ShmMsg &a) { std::swap(offset_, a.offset_); } |
| | | bool valid() const { return static_cast<bool>(offset_) && meta()->tag_ == kMsgTag; } |
| | | |
| | | int AddRef() const { return valid() ? meta()->count_.Inc() : 1; } |
| | | int Release() |
| | | { |
| | | if (!valid()) { |
| | | return 0; |
| | | } |
| | | auto n = meta()->count_.Dec(); |
| | | if (n == 0) { |
| | | Free(); |
| | | } |
| | | return n; |
| | | } |
| | | bool valid() const { return offset_ != 0 && meta()->tag_ == kMsgTag; } |
| | | int64_t id() const { return valid() ? meta()->id_ : 0; } |
| | | int64_t timestamp() const { return valid() ? meta()->timestamp_.load() : 0; } |
| | | size_t Size() const { return valid() ? meta()->size_ : 0; } |
| | | int Count() const { return valid() ? meta()->count_.Get() : 1; } |
| | | int AddRef() const { return valid() ? meta()->count_.Inc() : 1; } |
| | | int Release(); |
| | | void Free(); |
| | | |
| | | template <class Body> |
| | | inline bool Make(const BHMsgHead &head, const Body &body) { return Make(Pack(head, body)); } |
| | | inline bool Make(const std::string &content) { return Make(Pack(content)); } |
| | | inline bool Make(const BHMsgHead &head, const Body &body) |
| | | { |
| | | uint32_t head_len = head.ByteSizeLong(); |
| | | uint32_t body_len = body.ByteSizeLong(); |
| | | uint32_t size = sizeof(head_len) + head_len + sizeof(body_len) + body_len; |
| | | return Make(size) && Pack(head, head_len, body, body_len); |
| | | } |
| | | template <class Body> |
| | | inline bool Fill(const BHMsgHead &head, const Body &body) |
| | | { |
| | | uint32_t head_len = head.ByteSizeLong(); |
| | | uint32_t body_len = body.ByteSizeLong(); |
| | | uint32_t size = sizeof(head_len) + head_len + sizeof(body_len) + body_len; |
| | | return valid() && (meta()->size_ >= size) && Pack(head, head_len, body, body_len); |
| | | } |
| | | |
| | | inline bool Make(const std::string &content) { return Make(content.size()) && Pack(content); } |
| | | inline bool Fill(const std::string &content) { return valid() && (meta()->size_ >= content.size()) && Pack(content); } |
| | | |
| | | inline bool Make(const size_t size) { return Make(Alloc(size)); } |
| | | |
| | | template <class Body> |
| | | static inline std::string Serialize(const BHMsgHead &head, const Body &body) |
| | | { |
| | |
| | | |
| | | typedef ShmMsg MsgI; |
| | | |
| | | constexpr inline int EncodeCmd(int cmd) { return ((cmd & MaskBits(3)) << 1) | 1; } |
| | | constexpr inline int DecodeCmd(int64_t msg) { return (msg >> 1) & MaskBits(3); } |
| | | constexpr inline bool IsCmd(int64_t msg) { return (msg & 1) != 0; } |
| | | // int64_t pack format: cmd data ,3bit cmd, 1bit flag. |
| | | enum MsgCmd { |
| | | eCmdNodeInit = 0, // upto 59bit ssn id |
| | | eCmdNodeInitReply = 1, // 31bit proc index, |
| | | eCmdAllocRequest0 = 2, // 8bit size, 4bit socket index, 16bit proc index, 28bit id |
| | | eCmdAllocReply0 = 3, // 31bit ptr, 28bit id, |
| | | eCmdFree = 4, // upto 59bit msg id, |
| | | }; |
| | | |
| | | } // namespace bhome_msg |
| | | |
| | | #endif // end of include guard: MSG_5BILLZET |
| | |
| | | BHOME_SIMPLE_MAP_MSG(Publish); |
| | | BHOME_SIMPLE_MAP_MSG(Subscribe); |
| | | BHOME_SIMPLE_MAP_MSG(Unsubscribe); |
| | | BHOME_SIMPLE_MAP_MSG(ProcInit); |
| | | BHOME_SIMPLE_MAP_MSG(ProcInitReply); |
| | | |
| | | #undef BHOME_SIMPLE_MAP_MSG |
| | | #undef BHOME_MAP_MSG_AND_TYPE |
| | |
| | | |
| | | bool FMutex::try_lock() |
| | | { |
| | | if (mtx_.try_lock()) { |
| | | if (flock(fd_, LOCK_EX | LOCK_NB) == 0) { |
| | | if (flock(fd_, LOCK_EX | LOCK_NB) == 0) { |
| | | ++count_; |
| | | if (mtx_.try_lock()) { |
| | | return true; |
| | | } else { |
| | | mtx_.unlock(); |
| | | if (--count_ == 0) { |
| | | flock(fd_, LOCK_UN); |
| | | } |
| | | } |
| | | } |
| | | return false; |
| | | } |
| | | void FMutex::lock() |
| | | { |
| | | mtx_.lock(); |
| | | flock(fd_, LOCK_EX); |
| | | ++count_; |
| | | mtx_.lock(); |
| | | } |
| | | void FMutex::unlock() |
| | | { |
| | | flock(fd_, LOCK_UN); |
| | | mtx_.unlock(); |
| | | if (--count_ == 0) { |
| | | flock(fd_, LOCK_UN); |
| | | } |
| | | } |
| | | |
| | | } // namespace robust |
| | |
| | | #ifndef ROBUST_Q31RCWYU |
| | | #define ROBUST_Q31RCWYU |
| | | |
| | | #include "bh_util.h" |
| | | #include "log.h" |
| | | #include <atomic> |
| | | #include <chrono> |
| | |
| | | |
| | | using namespace std::chrono; |
| | | using namespace std::chrono_literals; |
| | | constexpr uint64_t MaskBits(int nbits) { return (uint64_t(1) << nbits) - 1; } |
| | | |
| | | void QuickSleep(); |
| | | |
| | | class CasMutex |
| | |
| | | public: |
| | | typedef uint64_t id_t; |
| | | FMutex(id_t id) : |
| | | id_(id), fd_(Open(id_)) |
| | | id_(id), fd_(Open(id_)), count_(0) |
| | | { |
| | | if (fd_ == -1) { throw "error create mutex!"; } |
| | | } |
| | |
| | | } |
| | | static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDONLY, 0666); } |
| | | static int Close(int fd) { return close(fd); } |
| | | void FLock(); |
| | | void FUnlock(); |
| | | id_t id_; |
| | | int fd_; |
| | | std::mutex mtx_; |
| | | std::atomic<int32_t> count_; |
| | | }; |
| | | |
| | | union semun { |
| | |
| | | AData buf[capacity]; |
| | | }; |
| | | |
| | | template <class Int> |
| | | class AtomicQueue<0, Int> |
| | | { |
| | | typedef Int Data; |
| | | typedef std::atomic<Data> AData; |
| | | static_assert(sizeof(Data) == sizeof(AData)); |
| | | |
| | | public: |
| | | AtomicQueue() { memset(this, 0, sizeof(*this)); } |
| | | bool push(const Data d, bool try_more = false) |
| | | { |
| | | auto cur = buf.load(); |
| | | return Empty(cur) && buf.compare_exchange_strong(cur, Enc(d)); |
| | | } |
| | | bool pop(Data &d, bool try_more = false) |
| | | { |
| | | Data cur = buf.load(); |
| | | bool r = !Empty(cur) && buf.compare_exchange_strong(cur, 0); |
| | | if (r) { d = Dec(cur); } |
| | | return r; |
| | | } |
| | | uint32_t head() const { return 0; } |
| | | uint32_t tail() const { return 0; } |
| | | |
| | | private: |
| | | static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok. |
| | | static inline Data Enc(const Data d) { return (d << 1) | 1; } // lowest bit 1 means data ok. |
| | | static inline Data Dec(const Data d) { return d >> 1; } // lowest bit 1 means data ok. |
| | | AData buf; |
| | | }; |
| | | |
| | | } // namespace robust |
| | | #endif // end of include guard: ROBUST_Q31RCWYU |
| | |
| | | } |
| | | |
| | | auto SendData = [&](Data &d) { |
| | | auto TryLoop = [&](auto &&data) { |
| | | for (int i = 0; i < 1; ++i) { |
| | | if (mq.TrySend(remote, data)) { |
| | | return true; |
| | | } |
| | | } |
| | | return false; |
| | | }; |
| | | bool r = false; |
| | | if (d.index() == 0) { |
| | | auto &msg = boost::variant2::get<0>(pos->data().data_); |
| | | r = mq.TrySend(remote, msg); |
| | | r = TryLoop(msg); |
| | | if (r) { |
| | | msg.Release(); |
| | | } |
| | | } else { |
| | | auto &content = boost::variant2::get<1>(pos->data().data_); |
| | | MsgI msg; |
| | | if (msg.Make(content)) { |
| | | DEFER1(msg.Release();); |
| | | r = mq.TrySend(remote, msg); |
| | | } |
| | | auto command = boost::variant2::get<1>(pos->data().data_); |
| | | r = TryLoop(command); |
| | | } |
| | | return r; |
| | | }; |
| | |
| | | Collect(); |
| | | |
| | | return !out_.empty(); |
| | | } |
| | | } |
| | |
| | | typedef MQId Remote; |
| | | typedef bhome_msg::MsgI MsgI; |
| | | typedef std::string Content; |
| | | typedef boost::variant2::variant<MsgI, Content> Data; |
| | | typedef int64_t Command; |
| | | typedef boost::variant2::variant<MsgI, Command> Data; |
| | | typedef std::function<void(const Data &)> OnMsgEvent; |
| | | struct MsgInfo { |
| | | Data data_; |
| | |
| | | typedef TimedMsg::TimePoint TimePoint; |
| | | typedef TimedMsg::Duration Duration; |
| | | |
| | | // template <class... Rest> |
| | | // void Append(const MQId &id, Rest &&...rest) |
| | | // { |
| | | // Append(std::string((const char *) &id, sizeof(id)), std::forward<decltype(rest)>(rest)...); |
| | | // } |
| | | |
| | | void Append(const Remote addr, const MsgI msg, OnMsgEvent onExpire = OnMsgEvent()) |
| | | { |
| | | msg.AddRef(); |
| | | AppendData(addr, Data(msg), DefaultExpire(), onExpire); |
| | | } |
| | | void Append(const Remote addr, Content &&content, OnMsgEvent onExpire = OnMsgEvent()) |
| | | void Append(const Remote addr, const Command command, OnMsgEvent onExpire = OnMsgEvent()) |
| | | { |
| | | AppendData(addr, Data(std::move(content)), DefaultExpire(), onExpire); |
| | | AppendData(addr, Data(command), DefaultExpire(), onExpire); |
| | | } |
| | | bool TrySend(ShmMsgQueue &mq); |
| | | // bool empty() const { return store_.empty(); } |
| | | |
| | | private: |
| | | static TimePoint Now() { return TimedMsg::Clock::now(); } |
| | |
| | | |
| | | ShmMsgQueue::~ShmMsgQueue() {} |
| | | |
| | | #ifndef BH_USE_ATOMIC_Q |
| | | ShmMsgQueue::Mutex &ShmMsgQueue::GetMutex(const MQId id) |
| | | { |
| | | static std::unordered_map<MQId, std::shared_ptr<Mutex>> imm; |
| | |
| | | } |
| | | return *pos->second; |
| | | } |
| | | #endif |
| | | |
| | | bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id) |
| | | { |
| | | Queue *q = Find(shm, id); |
| | | if (q) { |
| | | MsgI msg; |
| | | while (q->TryRead(msg.OffsetRef())) { |
| | | msg.Release(); |
| | | RawData val = 0; |
| | | while (q->TryRead(val)) { |
| | | if (IsCmd(val)) { |
| | | LOG_DEBUG() << "clsing queue " << id << ", has a cmd" << DecodeCmd(val); |
| | | } else { |
| | | MsgI(val).Release(); |
| | | } |
| | | } |
| | | } |
| | | return Shmq::Remove(shm, MsgQIdToName(id)); |
| | |
| | | return Shmq::Find(shm, MsgQIdToName(remote_id)); |
| | | } |
| | | |
| | | bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg) |
| | | bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, int64_t val) |
| | | { |
| | | bool r = false; |
| | | try { |
| | | ShmMsgQueue dest(remote_id, false, shm, 1); |
| | | msg.AddRef(); |
| | | DEFER1(if (!r) { msg.Release(); }); |
| | | |
| | | #ifndef BH_USE_ATOMIC_Q |
| | | Guard lock(GetMutex(remote_id)); |
| | | r = dest.queue().TryWrite(msg.Offset()); |
| | | #endif |
| | | return dest.queue().TryWrite(val); |
| | | } catch (...) { |
| | | // SetLastError(eNotFound, "remote not found"); |
| | | return false; |
| | | } |
| | | return r; |
| | | } |
| | | |
| | | // Test shows that in the 2 cases: |
| | |
| | | using namespace bhome_shm; |
| | | using namespace bhome_msg; |
| | | |
| | | #define BH_USE_ATOMIC_Q |
| | | |
| | | class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue> |
| | | { |
| | | // typedef ShmObject<SharedQ63<4>> Shmq; |
| | | typedef ShmObject<SharedQueue<int64_t>> Shmq; |
| | | typedef Shmq::ShmType ShmType; |
| | | typedef Shmq::Data Queue; |
| | | typedef std::function<void()> OnSend; |
| | | typedef robust::FMutex Mutex; |
| | | // typedef robust::SemMutex Mutex; |
| | | // typedef robust::NullMutex Mutex; |
| | | typedef robust::Guard<Mutex> Guard; |
| | | |
| | | public: |
| | | typedef int64_t RawData; |
| | | |
| | | #ifdef BH_USE_ATOMIC_Q |
| | | typedef ShmObject<SharedQ63<0>> Shmq; |
| | | #else |
| | | typedef ShmObject<SharedQueue<RawData>> Shmq; |
| | | // typedef robust::FMutex Mutex; |
| | | // typedef robust::SemMutex Mutex; |
| | | typedef robust::NullMutex Mutex; |
| | | typedef robust::Guard<Mutex> Guard; |
| | | #endif |
| | | |
| | | typedef Shmq::Data Queue; |
| | | typedef Shmq::ShmType ShmType; |
| | | typedef uint64_t MQId; |
| | | |
| | | static MQId NewId(); |
| | |
| | | ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len); |
| | | ShmMsgQueue(ShmType &segment, const int len); |
| | | ~ShmMsgQueue(); |
| | | static bool Remove(SharedMemory &shm, const MQId id); |
| | | static bool Remove(ShmType &shm, const MQId id); |
| | | MQId Id() const { return id_; } |
| | | ShmType &shm() const { return queue_.shm(); } |
| | | |
| | | bool Recv(MsgI &msg, const int timeout_ms) |
| | | bool Recv(RawData &val, const int timeout_ms) |
| | | { |
| | | #ifndef BH_USE_ATOMIC_Q |
| | | Guard lock(GetMutex(Id())); |
| | | return queue().Read(msg.OffsetRef(), timeout_ms); |
| | | #endif |
| | | return queue().Read(val, timeout_ms); |
| | | } |
| | | bool TryRecv(MsgI &msg) |
| | | |
| | | bool TryRecv(RawData &val) |
| | | { |
| | | #ifndef BH_USE_ATOMIC_Q |
| | | Guard lock(GetMutex(Id())); |
| | | return queue().TryRead(msg.OffsetRef()); |
| | | #endif |
| | | return queue().TryRead(val); |
| | | } |
| | | static Queue *Find(SharedMemory &shm, const MQId remote_id); |
| | | static bool TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg); |
| | | |
| | | bool Recv(MsgI &msg, const int timeout_ms) { return Recv(msg.OffsetRef(), timeout_ms); } |
| | | bool TryRecv(MsgI &msg) { return TryRecv(msg.OffsetRef()); } |
| | | static Queue *Find(ShmType &shm, const MQId remote_id); |
| | | static bool TrySend(ShmType &shm, const MQId remote_id, const RawData val); |
| | | static bool TrySend(ShmType &shm, const MQId remote_id, MsgI msg) |
| | | { |
| | | bool r = false; |
| | | msg.AddRef(); // TODO check if we could avoid addref here. |
| | | DEFER1(if (!r) { msg.Release(); }); |
| | | r = TrySend(shm, remote_id, msg.Offset()); |
| | | return r; |
| | | } |
| | | bool TrySend(const MQId remote_id, const MsgI &msg) { return TrySend(shm(), remote_id, msg); } |
| | | bool TrySend(const MQId remote_id, const RawData val) { return TrySend(shm(), remote_id, val); } |
| | | |
| | | private: |
| | | #ifndef BH_USE_ATOMIC_Q |
| | | static Mutex &GetMutex(const MQId id); |
| | | #endif |
| | | MQId id_; |
| | | Queue &queue() { return *queue_.data(); } |
| | | Shmq queue_; |
| | |
| | | |
| | | private: |
| | | Circular<D> queue_; |
| | | bhome_shm::Mutex mutex_; |
| | | // bhome_shm::Mutex mutex_; |
| | | }; |
| | | |
| | | template <int Power = 4> |
| | |
| | | using namespace std::chrono; |
| | | auto end_time = steady_clock::now() + milliseconds(timeout_ms); |
| | | do { |
| | | if (TryRead(d)) { |
| | | return true; |
| | | } else { |
| | | robust::QuickSleep(); |
| | | for (int i = 0; i < 100; ++i) { |
| | | if (TryRead(d)) { |
| | | return true; |
| | | } |
| | | } |
| | | robust::QuickSleep(); |
| | | } while (steady_clock::now() < end_time); |
| | | return false; |
| | | } |
| | |
| | | #include "bh_util.h" |
| | | #include "defs.h" |
| | | #include "msg.h" |
| | | #include <chrono> |
| | | using namespace std::chrono; |
| | | using namespace std::chrono_literals; |
| | | |
| | | using namespace bhome_msg; |
| | | using namespace bhome_shm; |
| | | |
| | | ShmSocket::ShmSocket(Shm &shm, const MQId id, const int len) : |
| | | run_(false), mq_(id, shm, len) |
| | | run_(false), mq_(id, shm, len), alloc_id_(0) |
| | | { |
| | | Start(); |
| | | } |
| | | ShmSocket::ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len) : |
| | | run_(false), mq_(id, create_or_else_find, shm, len) |
| | | run_(false), mq_(id, create_or_else_find, shm, len), alloc_id_(0) |
| | | { |
| | | Start(); |
| | | } |
| | | ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) : |
| | | run_(false), mq_(shm, len) |
| | | run_(false), mq_(shm, len), alloc_id_(0) |
| | | { |
| | | Start(); |
| | | } |
| | |
| | | Stop(); |
| | | } |
| | | |
| | | bool ShmSocket::Start(const RawRecvCB &onData, const IdleCB &onIdle, int nworker) |
| | | bool ShmSocket::Start(int nworker, const RecvCB &onData, const RawRecvCB &onRaw, const IdleCB &onIdle) |
| | | { |
| | | auto ioProc = [this, onData, onIdle]() { |
| | | auto ioProc = [this, onData, onRaw, onIdle]() { |
| | | auto DoSend = [this]() { return send_buffer_.TrySend(mq()); }; |
| | | auto DoRecv = [=] { |
| | | // do not recv if no cb is set. |
| | | if (!onData) { |
| | | return false; |
| | | } |
| | | auto onMsg = [&](MsgI &imsg) { |
| | | DEFER1(imsg.Release()); |
| | | onData(*this, imsg); |
| | | }; |
| | | MsgI imsg; |
| | | return mq().TryRecv(imsg) ? (onMsg(imsg), true) : false; |
| | | }; |
| | | if (!onData && per_msg_cbs_->empty() && !onRaw && alloc_cbs_->empty()) { return false; } |
| | | |
| | | try { |
| | | bool more_to_send = DoSend(); |
| | | bool more_to_recv = DoRecv(); |
| | | if (onIdle) { onIdle(*this); } |
| | | if (!more_to_send && !more_to_recv) { |
| | | robust::QuickSleep(); |
| | | } |
| | | } catch (...) { |
| | | } |
| | | }; |
| | | |
| | | std::lock_guard<std::mutex> lock(mutex_); |
| | | StopNoLock(); |
| | | |
| | | run_.store(true); |
| | | for (int i = 0; i < nworker; ++i) { |
| | | workers_.emplace_back([this, ioProc]() { while (run_) { ioProc(); } }); |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle) |
| | | { |
| | | auto ioProc = [this, onData, onIdle]() { |
| | | auto DoSend = [this]() { return send_buffer_.TrySend(mq()); }; |
| | | auto DoRecv = [=] { |
| | | auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { |
| | | auto onMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { |
| | | RecvCB cb; |
| | | if (per_msg_cbs_->Pick(head.msg_id(), cb)) { |
| | | cb(socket, imsg, head); |
| | |
| | | onData(socket, imsg, head); |
| | | } |
| | | }; |
| | | |
| | | // do not recv if no cb is set. |
| | | if (!onData && per_msg_cbs_->empty()) { |
| | | return false; |
| | | } |
| | | auto onMsg = [&](MsgI &imsg) { |
| | | DEFER1(imsg.Release()); |
| | | BHMsgHead head; |
| | | if (imsg.ParseHead(head)) { |
| | | onRecvWithPerMsgCB(*this, imsg, head); |
| | | auto onCmdCB = [this, onRaw](ShmSocket &socket, int64_t val) { |
| | | int cmd = DecodeCmd(val); |
| | | if (cmd == eCmdAllocReply0) { |
| | | int id = (val >> 4) & MaskBits(28); |
| | | RawRecvCB cb; |
| | | if (alloc_cbs_->Pick(id, cb)) { |
| | | cb(socket, val); |
| | | return; |
| | | } |
| | | } |
| | | if (onRaw) { |
| | | onRaw(socket, val); |
| | | } |
| | | }; |
| | | MsgI imsg; |
| | | return mq().TryRecv(imsg) ? (onMsg(imsg), true) : false; |
| | | |
| | | auto onRecv = [&](auto &val) { |
| | | if (IsCmd(val)) { |
| | | onCmdCB(*this, val); |
| | | } else { |
| | | MsgI imsg(val); |
| | | DEFER1(imsg.Release()); |
| | | BHMsgHead head; |
| | | if (imsg.ParseHead(head)) { |
| | | onMsgCB(*this, imsg, head); |
| | | } |
| | | } |
| | | }; |
| | | ShmMsgQueue::RawData val = 0; |
| | | auto TryRecvMore = [&]() { |
| | | for (int i = 0; i < 100; ++i) { |
| | | if (mq().TryRecv(val)) { |
| | | return true; |
| | | } |
| | | } |
| | | return false; |
| | | }; |
| | | return TryRecvMore() ? (onRecv(val), true) : false; |
| | | }; |
| | | |
| | | try { |
| | |
| | | std::lock_guard<std::mutex> lock(mutex_); |
| | | StopNoLock(); |
| | | |
| | | auto worker_proc = [this, ioProc]() { |
| | | while (run_) { ioProc(); } |
| | | // try send pending msgs. |
| | | auto end_time = steady_clock::now() + 3s; |
| | | while (send_buffer_.TrySend(mq()) && steady_clock::now() < end_time) { |
| | | // LOG_DEBUG() << "try send pending msgs."; |
| | | } |
| | | }; |
| | | |
| | | run_.store(true); |
| | | for (int i = 0; i < nworker; ++i) { |
| | | workers_.emplace_back([this, ioProc]() { while (run_) { ioProc(); } }); |
| | | workers_.emplace_back(worker_proc); |
| | | } |
| | | return true; |
| | | } |
| | |
| | | return false; |
| | | } |
| | | |
| | | bool ShmSocket::SyncRecv(int64_t &cmd, const int timeout_ms) |
| | | { |
| | | return (timeout_ms == 0) ? mq().TryRecv(cmd) : mq().Recv(cmd, timeout_ms); |
| | | } |
| | | //maybe reimplment, using async cbs? |
| | | bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms) |
| | | { |
| | |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | bool ShmSocket::RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult) |
| | | { // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag |
| | | // LOG_FUNCTION; |
| | | if (node_proc_index_ == -1 || socket_index_ == -1) { |
| | | return false; |
| | | } |
| | | int id = (++alloc_id_) & MaskBits(28); |
| | | int64_t cmd = (CalcAllocIndex(size) << 52) | |
| | | ((socket_index_ & MaskBits(4)) << 48) | |
| | | ((node_proc_index_ & MaskBits(16)) << 32) | |
| | | (id << 4) | |
| | | EncodeCmd(eCmdAllocRequest0); |
| | | auto rawCB = [onResult](ShmSocket &sock, int64_t &val) { |
| | | MsgI msg((val >> 32) & MaskBits(31)); |
| | | DEFER1(msg.Release()); |
| | | onResult(msg); |
| | | return true; |
| | | }; |
| | | |
| | | alloc_cbs_->Store(id, std::move(rawCB)); |
| | | auto onExpireRemoveCB = [this, id](SendQ::Data const &msg) { |
| | | RawRecvCB cb_no_use; |
| | | alloc_cbs_->Pick(id, cb_no_use); |
| | | }; |
| | | return Send(BHTopicCenterAddress(), cmd, onExpireRemoveCB); |
| | | } |
| | |
| | | public: |
| | | typedef ShmMsgQueue::MQId MQId; |
| | | typedef bhome_shm::SharedMemory Shm; |
| | | typedef std::function<void(ShmSocket &sock, MsgI &imsg)> RawRecvCB; |
| | | typedef std::function<void(ShmSocket &sock, Queue::RawData &val)> RawRecvCB; |
| | | typedef std::function<void(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> RecvCB; |
| | | typedef std::function<bool(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> PartialRecvCB; |
| | | typedef std::function<void(ShmSocket &sock)> IdleCB; |
| | |
| | | static bool Remove(SharedMemory &shm, const MQId id) { return Queue::Remove(shm, id); } |
| | | bool Remove() { return Remove(shm(), id()); } |
| | | MQId id() const { return mq().Id(); } |
| | | void SetNodeProc(const int proc_index, const int socket_index) |
| | | { |
| | | node_proc_index_ = proc_index; |
| | | socket_index_ = socket_index; |
| | | LOG_DEBUG() << "Set Node Proc " << node_proc_index_ << ", " << socket_index_; |
| | | } |
| | | // start recv. |
| | | bool Start(const RawRecvCB &onData, const IdleCB &onIdle, int nworker = 1); |
| | | bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB()); |
| | | bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, onIdle); } |
| | | bool Start(int nworker = 1, const RecvCB &onMsg = RecvCB(), const RawRecvCB &onRaw = RawRecvCB(), const IdleCB &onIdle = IdleCB()); |
| | | bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, RawRecvCB(), onIdle); } |
| | | bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); } |
| | | bool Stop(); |
| | | |
| | | template <class Body> |
| | | bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB()) |
| | | bool CenterSend(const MQId remote, BHMsgHead &head, Body &body) |
| | | { |
| | | try { |
| | | if (!cb) { |
| | | return SendImpl(remote, MsgI::Serialize(head, body)); |
| | | } else { |
| | | std::string msg_id(head.msg_id()); |
| | | per_msg_cbs_->Store(msg_id, std::move(cb)); |
| | | auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) { |
| | | RecvCB cb_no_use; |
| | | per_msg_cbs_->Pick(msg_id, cb_no_use); |
| | | }; |
| | | return SendImpl(remote, MsgI::Serialize(head, body), onExpireRemoveCB); |
| | | } |
| | | //TODO alloc outsiez and use send. |
| | | MsgI msg; |
| | | if (!msg.Make(head, body)) { return false; } |
| | | DEFER1(msg.Release()); |
| | | |
| | | return Send(remote, msg); |
| | | } catch (...) { |
| | | SetLastError(eError, "Send internal error."); |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | bool Send(const MQId remote, const MsgI &imsg) |
| | | { |
| | | return SendImpl(remote, imsg); |
| | | } |
| | | bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult); |
| | | |
| | | template <class Body> |
| | | bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB()) |
| | | { |
| | | std::string msg_id(head.msg_id()); |
| | | std::string content(MsgI::Serialize(head, body)); |
| | | size_t size = content.size(); |
| | | auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable { |
| | | if (!msg.Fill(content)) { return; } |
| | | |
| | | try { |
| | | if (!cb) { |
| | | Send(remote, msg); |
| | | } else { |
| | | per_msg_cbs_->Store(msg_id, std::move(cb)); |
| | | auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) { |
| | | RecvCB cb_no_use; |
| | | per_msg_cbs_->Pick(msg_id, cb_no_use); |
| | | }; |
| | | Send(remote, msg, onExpireRemoveCB); |
| | | } |
| | | } catch (...) { |
| | | SetLastError(eError, "Send internal error."); |
| | | } |
| | | }; |
| | | |
| | | return RequestAlloc(size, OnResult); |
| | | } |
| | | template <class... T> |
| | | bool Send(const MQId remote, const MsgI &imsg, T &&...t) |
| | | { |
| | | return SendImpl(remote, imsg, std::forward<decltype(t)>(t)...); |
| | | } |
| | | template <class... T> |
| | | bool Send(const MQId remote, const int64_t cmd, T &&...t) |
| | | { |
| | | return SendImpl(remote, cmd, std::forward<decltype(t)>(t)...); |
| | | } |
| | | bool SyncRecv(int64_t &cmd, const int timeout_ms); |
| | | bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms); |
| | | |
| | | template <class Body> |
| | |
| | | std::atomic<bool> run_; |
| | | |
| | | Queue mq_; |
| | | template <class Key> |
| | | template <class Key, class CB> |
| | | class CallbackRecords |
| | | { |
| | | std::unordered_map<Key, RecvCB> store_; |
| | | std::unordered_map<Key, CB> store_; |
| | | |
| | | public: |
| | | bool empty() const { return store_.empty(); } |
| | | bool Store(const Key &id, RecvCB &&cb) { return store_.emplace(id, std::move(cb)).second; } |
| | | bool Pick(const Key &id, RecvCB &cb) |
| | | bool Store(const Key &id, CB &&cb) { return store_.emplace(id, std::move(cb)).second; } |
| | | bool Pick(const Key &id, CB &cb) |
| | | { |
| | | auto pos = store_.find(id); |
| | | if (pos != store_.end()) { |
| | |
| | | } |
| | | }; |
| | | |
| | | Synced<CallbackRecords<std::string>> per_msg_cbs_; |
| | | Synced<CallbackRecords<std::string, RecvCB>> per_msg_cbs_; |
| | | Synced<CallbackRecords<int, RawRecvCB>> alloc_cbs_; |
| | | |
| | | SendQ send_buffer_; |
| | | // node request center alloc memory. |
| | | int node_proc_index_ = -1; |
| | | int socket_index_ = -1; |
| | | std::atomic<int> alloc_id_; |
| | | }; |
| | | |
| | | #endif // end of include guard: SOCKET_GWTJHBPO |
| | |
| | | TopicNode::TopicNode(SharedMemory &shm) : |
| | | shm_(shm), state_(eStateUnregistered) |
| | | { |
| | | Init(); |
| | | } |
| | | |
| | | TopicNode::~TopicNode() |
| | |
| | | |
| | | if (Valid()) { |
| | | return true; |
| | | } else if (info_.proc_id().empty()) { |
| | | return false; |
| | | } |
| | | |
| | | if (ssn_id_ == 0) { |
| | | ssn_id_ = ShmMsgQueue::NewId(); |
| | | } |
| | | LOG_DEBUG() << "Node Init, id " << ssn_id_; |
| | | MsgI msg; |
| | | msg.OffsetRef() = ssn_id_; |
| | | if (ShmMsgQueue::TrySend(shm(), BHInitAddress(), msg)) { |
| | | |
| | | auto end_time = steady_clock::now() + 3s; |
| | | do { |
| | | try { |
| | | for (int i = eSockStart; i < eSockEnd; ++i) { |
| | | sockets_.emplace_back(new ShmSocket(shm_, false, ssn_id_ + i, kMqLen)); |
| | | auto NodeInit = [&]() { |
| | | auto SendInitCmd = [&]() { |
| | | int64_t init_cmd = ssn_id_ << 4 | EncodeCmd(eCmdNodeInit); |
| | | auto end_time = steady_clock::now() + 3s; |
| | | bool r = false; |
| | | do { |
| | | r = ShmMsgQueue::TrySend(shm(), BHTopicCenterAddress(), init_cmd); |
| | | } while (!r && steady_clock::now() < end_time); |
| | | return r; |
| | | }; |
| | | if (SendInitCmd()) { |
| | | LOG_DEBUG() << "node send init ok"; |
| | | auto end_time = steady_clock::now() + 3s; |
| | | do { |
| | | try { |
| | | for (int i = eSockStart; i < eSockEnd; ++i) { |
| | | sockets_.emplace_back(new ShmSocket(shm_, false, ssn_id_ + i, kMqLen)); |
| | | } |
| | | break; |
| | | } catch (...) { |
| | | sockets_.clear(); |
| | | std::this_thread::sleep_for(100ms); |
| | | } |
| | | break; |
| | | } catch (...) { |
| | | sockets_.clear(); |
| | | std::this_thread::sleep_for(100ms); |
| | | } |
| | | } while (steady_clock::now() < end_time); |
| | | if (!sockets_.empty()) { |
| | | // recv msgs to avoid memory leak. |
| | | auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; }; |
| | | SockNode().Start(default_ignore_msg); |
| | | return true; |
| | | } while (steady_clock::now() < end_time); |
| | | } |
| | | }; |
| | | if (sockets_.empty()) { |
| | | NodeInit(); |
| | | } |
| | | if (!sockets_.empty()) { |
| | | LOG_DEBUG() << "node sockets ok"; |
| | | auto onNodeCmd = [this](ShmSocket &socket, int64_t &val) { |
| | | LOG_DEBUG() << "node recv cmd: " << DecodeCmd(val); |
| | | switch (DecodeCmd(val)) { |
| | | case eCmdNodeInitReply: { |
| | | MsgI msg(val >> 4); |
| | | DEFER1(msg.Release()); |
| | | MsgProcInit body; |
| | | auto head = InitMsgHead(GetType(body), info_.proc_id(), ssn_id_); |
| | | head.add_route()->set_mq_id(ssn_id_); |
| | | if (msg.Fill(head, body)) { |
| | | socket.Send(BHTopicCenterAddress(), msg); |
| | | } |
| | | } break; |
| | | default: |
| | | break; |
| | | } |
| | | return true; |
| | | }; |
| | | |
| | | // recv msgs to avoid memory leak. |
| | | auto onMsg = [this](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { |
| | | LOG_DEBUG() << "node recv type: " << head.type(); |
| | | if (head.type() == kMsgTypeProcInitReply) { |
| | | LOG_DEBUG() << "got proc init reply"; |
| | | MsgProcInitReply reply; |
| | | if (imsg.ParseBody(reply)) { |
| | | SetProcIndex(reply.proc_index()); |
| | | } |
| | | } |
| | | return true; |
| | | }; |
| | | SockNode().Start(1, onMsg, onNodeCmd); |
| | | LOG_DEBUG() << "sockets ok."; |
| | | return true; |
| | | } |
| | | return false; |
| | | } |
| | |
| | | } else if (nworker > 16) { |
| | | nworker = 16; |
| | | } |
| | | SockNode().Start(); |
| | | // SockNode().Start(); |
| | | ServerStart(server_cb, nworker); |
| | | SubscribeStartWorker(sub_cb, nworker); |
| | | ClientStartWorker(client_cb, nworker); |
| | |
| | | |
| | | bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) |
| | | { |
| | | { |
| | | std::lock_guard<std::mutex> lk(mutex_); |
| | | info_ = proc; |
| | | } |
| | | |
| | | if (!Init()) { |
| | | SetLastError(eError, kErrMsgNotInit); |
| | | return false; |
| | | } |
| | | |
| | | info_ = proc; |
| | | |
| | | auto &sock = SockNode(); |
| | | MsgRegister body; |
| | |
| | | ShmSocket &SockClient() { return *sockets_[eSockClient]; } |
| | | ShmSocket &SockServer() { return *sockets_[eSockServer]; } |
| | | |
| | | void SetProcIndex(int index) |
| | | { |
| | | proc_index_ = index; |
| | | for (int i = eSockStart; i < eSockEnd; ++i) { |
| | | sockets_[i]->SetNodeProc(index, i); |
| | | } |
| | | } |
| | | |
| | | enum State { |
| | | eStateUnregistered, |
| | | eStateOnline, |
| | |
| | | std::mutex mutex_; |
| | | MQId ssn_id_ = 0; |
| | | std::atomic<State> state_; |
| | | int proc_index_ = -1; |
| | | |
| | | TopicQueryCache topic_query_cache_; |
| | | }; |
| | |
| | | // } |
| | | |
| | | int same = 0; |
| | | int64_t last = 0; |
| | | uint64_t last = 0; |
| | | while (last < nreq * ncli && same < 2) { |
| | | Sleep(1s, false); |
| | | auto cur = Status().nreply_.load(); |
| | |
| | | std::atomic<uint64_t> nwrite(0); |
| | | std::atomic<uint64_t> writedone(0); |
| | | |
| | | #if 0 |
| | | typedef AtomicQueue<4> Rcb; |
| | | #if 1 |
| | | const int kPower = 0; |
| | | typedef AtomicQueue<kPower> Rcb; |
| | | |
| | | Rcb tmp; |
| | | BOOST_CHECK(tmp.like_empty()); |
| | | // BOOST_CHECK(tmp.like_empty()); |
| | | BOOST_CHECK(tmp.push(1)); |
| | | BOOST_CHECK(tmp.tail() == 1); |
| | | if (kPower != 0) { |
| | | BOOST_CHECK(tmp.tail() == 1); |
| | | } |
| | | BOOST_CHECK(tmp.head() == 0); |
| | | int64_t d; |
| | | BOOST_CHECK(tmp.pop(d)); |
| | | BOOST_CHECK(tmp.like_empty()); |
| | | BOOST_CHECK(tmp.head() == 1); |
| | | BOOST_CHECK(tmp.tail() == 1); |
| | | if (kPower != 0) { |
| | | // BOOST_CHECK(tmp.like_empty()); |
| | | BOOST_CHECK(tmp.head() == 1); |
| | | BOOST_CHECK(tmp.tail() == 1); |
| | | } |
| | | |
| | | ShmObject<Rcb> rcb(shm, "test_rcb"); |
| | | bool try_more = true; |
| | |
| | | BOOST_AUTO_TEST_CASE(MutexTest) |
| | | { |
| | | { |
| | | int fd = open("/tmp/test_fmutex", O_CREAT | O_RDONLY, 0666); |
| | | flock(fd, LOCK_EX); |
| | | printf("lock 1"); |
| | | int sem_id = semget(100, 1, 0666 | IPC_CREAT); |
| | | auto P = [&]() { |
| | | sembuf op = {0, -1, SEM_UNDO}; |
| | | semop(sem_id, &op, 1); |
| | | }; |
| | | auto V = [&]() { |
| | | sembuf op = {0, 1, SEM_UNDO}; |
| | | semop(sem_id, &op, 1); |
| | | }; |
| | | for (int i = 0; i < 10; ++i) { |
| | | V(); |
| | | } |
| | | Sleep(10s); |
| | | flock(fd, LOCK_EX); |
| | | printf("lock 2"); |
| | | Sleep(10s); |
| | | flock(fd, LOCK_UN); |
| | | printf("un lock 2"); |
| | | Sleep(10s); |
| | | flock(fd, LOCK_UN); |
| | | printf("un lock 1"); |
| | | |
| | | return; |
| | | } |
| | | |
| | |
| | | |
| | | std::mutex m; |
| | | typedef std::chrono::steady_clock Clock; |
| | | auto Now = []() { return Clock::now().time_since_epoch(); }; |
| | | |
| | | if (pi) { |
| | | auto old = *pi; |
| | | printf("int : %d, add1: %d\n", old, ++*pi); |
| | |
| | | } |
| | | }; |
| | | |
| | | int nwriters[] = {1, 10, 100}; |
| | | int nwriters[] = {1, 10, 100, 1000}; |
| | | int nreaders[] = {2}; |
| | | const int64_t total_msg = 1000 * 100; |
| | | const int64_t total_msg = 1000 * 1000; |
| | | |
| | | auto Test = [&](auto &www, auto &rrr, bool isfork) { |
| | | for (auto nreader : nreaders) { |
| | |
| | | // typedef ThreadManager Manager; |
| | | // const bool isfork = IsSameType<Manager, ProcessManager>::value; |
| | | |
| | | { |
| | | if (0) { |
| | | ThreadManager tw, tr; |
| | | printf("---------------- Testing thread io: -------------------------------------------------------\n"); |
| | | Test(tw, tr, false); |
| | | } |
| | | { |
| | | |
| | | if (1) { |
| | | ProcessManager pw, pr; |
| | | printf("================ Testing process io: =======================================================\n"); |
| | | Test(pw, pr, true); |