From db322f33ba13592f2492317e3f1a070454c97059 Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期四, 13 五月 2021 19:34:46 +0800 Subject: [PATCH] center alloc all msgs. --- src/robust.h | 39 + src/socket.h | 90 +++- box/center.cpp | 276 ++++++++++++-- src/proto.h | 2 src/msg.h | 109 +++-- src/shm_msg_queue.h | 59 ++ src/socket.cpp | 141 ++++--- proto/source/bhome_msg.proto | 9 src/defs.h | 9 box/center.h | 7 src/sendq.cpp | 20 src/topic_node.cpp | 95 +++- src/shm_msg_queue.cpp | 26 src/defs.cpp | 45 ++ src/msg.cpp | 29 + box/center_main.cc | 7 utest/speed_test.cpp | 9 utest/api_test.cpp | 2 src/bh_util.h | 2 src/shm_queue.h | 11 src/topic_node.h | 9 utest/robust_test.cpp | 45 +- src/sendq.h | 14 src/robust.cpp | 16 24 files changed, 788 insertions(+), 283 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index d6ac804..b440a03 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -21,7 +21,7 @@ #include "log.h" #include "shm.h" #include <chrono> -#include <set> +#include <unordered_map> using namespace std::chrono; using namespace std::chrono_literals; @@ -33,11 +33,118 @@ namespace { +typedef std::string ProcId; +typedef size_t ProcIndex; // max local procs. +const int kMaxProcs = 65536; + +// record all procs ever registered, always grow, never remove. +// mainly for node to request msg allocation. +// use index instead of MQId to save some bits. +class ProcRecords +{ +public: + struct ProcRec { + ProcId proc_; + MQId ssn_ = 0; + }; + + ProcRecords() { procs_.reserve(kMaxProcs); } + + ProcIndex Put(const ProcId &proc_id, const MQId ssn) + { + if (procs_.size() >= kMaxProcs) { + return -1; + } + auto pos_isnew = proc_index_.emplace(proc_id, procs_.size()); + int index = pos_isnew.first->second; + if (pos_isnew.second) { + procs_.emplace_back(ProcRec{proc_id, ssn}); + } else { // update ssn + procs_[index].ssn_ = ssn; + } + return index; + } + const ProcRec &Get(const ProcIndex index) const + { + static ProcRec empty_rec; + return (index < procs_.size()) ? procs_[index] : empty_rec; + } + +private: + std::unordered_map<ProcId, size_t> proc_index_; + std::vector<ProcRec> procs_; +}; + +class MsgRecords +{ + typedef int64_t MsgId; + typedef int64_t Offset; + +public: + void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg.Offset()); } + void FreeMsg(MsgId id) + { + auto pos = msgs_.find(id); + if (pos != msgs_.end()) { + ShmMsg(pos->second).Free(); + msgs_.erase(pos); + } else { + LOG_TRACE() << "ignore late free request."; + } + } + void AutoRemove() + { + auto now = NowSec(); + if (now < time_to_clean_) { + return; + } + LOG_FUNCTION; + time_to_clean_ = now + 1; + int64_t limit = std::max(10000ul, msgs_.size() / 10); + int64_t n = 0; + auto it = msgs_.begin(); + while (it != msgs_.end() && --limit > 0) { + ShmMsg msg(it->second); + if (msg.Count() == 0) { + msg.Free(); + it = msgs_.erase(it); + ++n; + } else if (msg.timestamp() + 10 < NowSec()) { + msg.Free(); + it = msgs_.erase(it); + ++n; + // LOG_DEBUG() << "release timeout msg, someone crashed."; + } else { + ++it; + } + } + if (n > 0) { + LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n; + } + } + size_t size() const { return msgs_.size(); } + void DebugPrint() const + { + LOG_DEBUG() << "msgs : " << size(); + int i = 0; + int total_count = 0; + for (auto &kv : msgs_) { + MsgI msg(kv.second); + total_count += msg.Count(); + LOG_TRACE() << " " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second << ", count: " << msg.Count() << ", size: " << msg.Size(); + } + LOG_DEBUG() << "total count: " << total_count; + } + +private: + std::unordered_map<MsgId, Offset> msgs_; + int64_t time_to_clean_ = 0; +}; + //TODO check proc_id class NodeCenter { public: - typedef std::string ProcId; typedef MQId Address; typedef bhome_msg::ProcInfo ProcInfo; typedef std::function<void(Address const)> Cleaner; @@ -102,13 +209,14 @@ // center name, no relative to shm. const std::string &id() const { return id_; } - void OnNodeInit(SharedMemory &shm, const int64_t msg) + void OnNodeInit(ShmSocket &socket, const int64_t val) { - MQId ssn = msg; + LOG_FUNCTION; + SharedMemory &shm = socket.shm(); + MQId ssn = (val >> 4) & MaskBits(60); if (nodes_.find(ssn) != nodes_.end()) { return; // ignore in exists. } - auto UpdateRegInfo = [&](Node &node) { for (int i = 0; i < 10; ++i) { node->addrs_.insert(ssn + i); @@ -118,12 +226,10 @@ // create sockets. try { - auto CreateSocket = [](SharedMemory &shm, const MQId id) { - ShmSocket tmp(shm, true, id, 16); - }; + auto CreateSocket = [&](const MQId id) { ShmSocket tmp(shm, true, id, 16); }; // alloc(-1), node, server, sub, request, - for (int i = -1; i < 4; ++i) { - CreateSocket(shm, ssn + i); + for (int i = 0; i < 4; ++i) { + CreateSocket(ssn + i); node->addrs_.insert(ssn + i); } return true; @@ -132,11 +238,93 @@ } }; + auto PrepareProcInit = [&]() { + bool r = false; + ShmMsg init_msg; + if (init_msg.Make(GetAllocSize(CalcAllocIndex(900)))) { + // 31bit pointer, 4bit cmd+flag + int64_t reply = (init_msg.Offset() << 4) | EncodeCmd(eCmdNodeInitReply); + r = SendAllocReply(socket, ssn, reply, init_msg); + } + return r; + }; + Node node(new NodeInfo); - if (UpdateRegInfo(node)) { + if (UpdateRegInfo(node) && PrepareProcInit()) { nodes_[ssn] = node; LOG_INFO() << "new node ssn (" << ssn << ") init"; + } else { + for (int i = 0; i < 10; ++i) { + ShmSocket::Remove(shm, ssn + i); + } } + } + void RecordMsg(const MsgI &msg) { msgs_.RecordMsg(msg); } + + bool SendAllocReply(ShmSocket &socket, const Address dest, const int64_t reply, const MsgI &msg) + { + RecordMsg(msg); + auto onExpireFree = [this, msg](const SendQ::Data &) { msgs_.FreeMsg(msg.id()); }; + return socket.Send(dest, reply, onExpireFree); + } + bool SendAllocMsg(ShmSocket &socket, const Address dest, const MsgI &msg) + { + RecordMsg(msg); + auto onExpireFree = [this, msg](const SendQ::Data &) { msgs_.FreeMsg(msg.id()); }; + return socket.Send(dest, msg, onExpireFree); + } + + void OnAlloc(ShmSocket &socket, const int64_t val) + { + // LOG_FUNCTION; + // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag + int64_t msg_id = (val >> 4) & MaskBits(28); + int proc_index = (val >> 32) & MaskBits(16); + int socket_index = ((val) >> 48) & MaskBits(4); + auto proc_rec(procs_.Get(proc_index)); + if (proc_rec.proc_.empty()) { + return; + } + Address dest = proc_rec.ssn_ + socket_index; + + auto size = GetAllocSize((val >> 52) & MaskBits(8)); + MsgI new_msg; + if (new_msg.Make(size)) { + // 31bit proc index, 28bit id, ,4bit cmd+flag + int64_t reply = (new_msg.Offset() << 32) | (msg_id << 4) | EncodeCmd(eCmdAllocReply0); + SendAllocReply(socket, dest, reply, new_msg); + } else { + int64_t reply = (msg_id << 4) | EncodeCmd(eCmdAllocReply0); // send empty, ack failure. + socket.Send(dest, reply); + } + } + + void OnFree(ShmSocket &socket, const int64_t val) + { + int64_t msg_id = (val >> 4) & MaskBits(31); + msgs_.FreeMsg(msg_id); + } + + bool OnCommand(ShmSocket &socket, const int64_t val) + { + assert(IsCmd(val)); + int cmd = DecodeCmd(val); + switch (cmd) { + case eCmdNodeInit: OnNodeInit(socket, val); break; + case eCmdAllocRequest0: OnAlloc(socket, val); break; + case eCmdFree: OnFree(socket, val); break; + default: return false; + } + return true; + } + + MsgProcInitReply ProcInit(const BHMsgHead &head, MsgProcInit &msg) + { + LOG_DEBUG() << "center got proc init."; + auto index = procs_.Put(head.proc_id(), head.ssn_id()); + auto reply(MakeReply<MsgProcInitReply>(eSuccess)); + reply.set_proc_index(index); + return reply; } MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg) @@ -160,14 +348,13 @@ }; auto pos = nodes_.find(ssn); - if (pos != nodes_.end()) { // update - Node &node = pos->second; - UpdateRegInfo(node); - } else { - Node node(new NodeInfo); - UpdateRegInfo(node); - nodes_[ssn] = node; + if (pos == nodes_.end()) { + return MakeReply(eInvalidInput, "invalid session."); } + + // update proc info + Node &node = pos->second; + UpdateRegInfo(node); LOG_DEBUG() << "node (" << head.proc_id() << ") ssn (" << ssn << ")"; auto old = online_node_addr_map_.find(head.proc_id()); @@ -376,13 +563,14 @@ void OnTimer() { CheckNodes(); + msgs_.AutoRemove(); } private: void CheckNodes() { auto now = NowSec(); - if (now - last_check_time_ < 1) { return; } + if (now <= last_check_time_) { return; } last_check_time_ = now; auto it = nodes_.begin(); @@ -396,6 +584,7 @@ ++it; } } + msgs_.DebugPrint(); } bool CanHeartbeat(const NodeInfo &node) { @@ -448,7 +637,10 @@ std::unordered_map<Topic, Clients> service_map_; std::unordered_map<Topic, Clients> subscribe_map_; std::unordered_map<Address, Node> nodes_; - std::unordered_map<std::string, Address> online_node_addr_map_; + std::unordered_map<ProcId, Address> online_node_addr_map_; + ProcRecords procs_; // To get a short index for msg alloc. + MsgRecords msgs_; // record all msgs alloced. + Cleaner cleaner_; // remove mqs. int64_t offline_time_; int64_t kill_time_; @@ -483,25 +675,28 @@ msg, head, [&](auto &body) { return center->MsgTag(head, body); }, replyer); \ return true; -auto MakeReplyer(ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) +auto MakeReplyer(ShmSocket &socket, BHMsgHead &head, Synced<NodeCenter> ¢er) { return [&](auto &&rep_body) { - auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.ssn_id(), head.msg_id())); + auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id())); auto remote = head.route(0).mq_id(); - socket.Send(remote, reply_head, rep_body); + MsgI msg; + if (msg.Make(reply_head, rep_body)) { + DEFER1(msg.Release();); + center->SendAllocMsg(socket, remote, msg); + } }; } bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr) { - auto OnNodeInit = [center_ptr](ShmSocket &socket, MsgI &msg) { + // command + auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool { auto ¢er = *center_ptr; - center->OnNodeInit(socket.shm(), msg.Offset()); + return IsCmd(cmd) && center->OnCommand(socket, cmd); }; - auto Nothing = [](ShmSocket &socket) {}; - BHCenter::Install("#centetr.Init", OnNodeInit, Nothing, BHInitAddress(), 16); - + // now we can talk. auto OnCenterIdle = [center_ptr](ShmSocket &socket) { auto ¢er = *center_ptr; center->OnTimer(); @@ -509,8 +704,9 @@ auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { auto ¢er = *center_ptr; - auto replyer = MakeReplyer(socket, head, center->id()); + auto replyer = MakeReplyer(socket, head, center); switch (head.type()) { + CASE_ON_MSG_TYPE(ProcInit); CASE_ON_MSG_TYPE(Register); CASE_ON_MSG_TYPE(Heartbeat); CASE_ON_MSG_TYPE(Unregister); @@ -520,12 +716,13 @@ default: return false; } }; - BHCenter::Install("#center.main", OnCenter, OnCenterIdle, BHTopicCenterAddress(), 1000); + BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(), 1000); auto OnBusIdle = [=](ShmSocket &socket) {}; + auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; }; auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool { auto ¢er = *center_ptr; - auto replyer = MakeReplyer(socket, head, center->id()); + auto replyer = MakeReplyer(socket, head, center); auto OnPublish = [&]() { MsgPublish pub; NodeCenter::Clients clients; @@ -561,7 +758,7 @@ } }; - BHCenter::Install("#center.bus", OnPubSub, OnBusIdle, BHTopicBusAddress(), 1000); + BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(), 1000); return true; } @@ -576,14 +773,9 @@ return rec; } -bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len) +bool BHCenter::Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQId mqid, const int mq_len) { - Centers()[name] = CenterInfo{name, handler, MsgIHandler(), idle, mqid, mq_len}; - return true; -} -bool BHCenter::Install(const std::string &name, MsgIHandler handler, IdleHandler idle, const MQId mqid, const int mq_len) -{ - Centers()[name] = CenterInfo{name, MsgHandler(), handler, idle, mqid, mq_len}; + Centers()[name] = CenterInfo{name, handler, raw_handler, idle, mqid, mq_len}; return true; } @@ -609,11 +801,7 @@ { for (auto &kv : Centers()) { auto &info = kv.second; - if (info.handler_) { - sockets_[info.name_]->Start(info.handler_, info.idle_); - } else { - sockets_[info.name_]->Start(info.raw_handler_, info.idle_); - } + sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_); } return true; diff --git a/box/center.h b/box/center.h index 4d71bc9..d68573b 100644 --- a/box/center.h +++ b/box/center.h @@ -29,10 +29,9 @@ public: typedef Socket::PartialRecvCB MsgHandler; - typedef Socket::RawRecvCB MsgIHandler; + typedef Socket::RawRecvCB RawHandler; typedef Socket::IdleCB IdleHandler; - static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len); - static bool Install(const std::string &name, MsgIHandler handler, IdleHandler idle, const MQId mqid, const int mq_len); + static bool Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQId mqid, const int mq_len); BHCenter(Socket::Shm &shm); ~BHCenter() { Stop(); } @@ -43,7 +42,7 @@ struct CenterInfo { std::string name_; MsgHandler handler_; - MsgIHandler raw_handler_; + RawHandler raw_handler_; IdleHandler idle_; MQId mqid_; int mq_len_ = 0; diff --git a/box/center_main.cc b/box/center_main.cc index 232b943..6795e41 100644 --- a/box/center_main.cc +++ b/box/center_main.cc @@ -83,6 +83,12 @@ std::atomic<bool> run_; }; +bool CenterInit(bhome_shm::SharedMemory &shm) +{ + ShmSocket create(shm, BHGlobalSenderAddress(), 16); + return true; +} + } // namespace int center_main(int argc, const char *argv[]) { @@ -102,6 +108,7 @@ if (strcasecmp(lvl.c_str(), "fatal") == 0) { ns_log::ResetLogLevel(ns_log::LogLevel::fatal); } auto &shm = BHomeShm(); + CenterInit(shm); GlobalInit(shm); InstanceFlag inst(shm, kCenterRunningFlag); diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto index 51e9b6e..dcb5c56 100644 --- a/proto/source/bhome_msg.proto +++ b/proto/source/bhome_msg.proto @@ -28,6 +28,8 @@ kMsgTypeCommonReply = 2; + kMsgTypeProcInit = 8; + kMsgTypeProcInitReply = 9; kMsgTypeRegister= 10; // kMsgTypeRegisterReply= 11; kMsgTypeHeartbeat = 12; @@ -60,6 +62,13 @@ MsgTopicList topics = 1; } +message MsgProcInit{ } // proc_id is in header. + +message MsgProcInitReply { + ErrorMsg errmsg = 1; + int32 proc_index = 2; +} + service TopicRPC { rpc Query (MsgQueryTopic) returns (MsgQueryTopicReply); rpc Request (MsgRequestTopic) returns (MsgQueryTopicReply); diff --git a/src/bh_util.h b/src/bh_util.h index a1c0d84..223da2a 100644 --- a/src/bh_util.h +++ b/src/bh_util.h @@ -92,6 +92,8 @@ inline void PutInt(void *p, uint32_t u) { Put32(p, u); } inline void PutInt(void *p, uint64_t u) { Put64(p, u); } +constexpr uint64_t MaskBits(int nbits) { return (uint64_t(1) << nbits) - 1; } + class ExitCall { typedef std::function<void(void)> func_t; diff --git a/src/defs.cpp b/src/defs.cpp index 6d688b2..450349e 100644 --- a/src/defs.cpp +++ b/src/defs.cpp @@ -33,8 +33,53 @@ return le; } +constexpr int64_t AllocSizeIndex[] = { + 16, 24, 32, 40, 48, 56, 64, 72, + 80, 88, 96, 104, 120, 136, 152, 168, + 184, 200, 224, 248, 272, 296, 328, 360, + 392, 432, 472, 520, 568, 624, 680, 744, + 816, 896, 984, 1080, 1184, 1296, 1416, 1544, + 1688, 1848, 2016, 2200, 2400, 2624, 2864, 3128, + 3416, 3728, 4072, 4448, 4856, 5304, 5792, 6320, + 6896, 7528, 8216, 8968, 9784, 10680, 11656, 12720, + 13880, 15144, 16520, 18024, 19664, 21456, 23408, 25536, + 27864, 30400, 33168, 36184, 39480, 43072, 46992, 51264, + 55928, 61016, 66568, 72624, 79232, 86440, 94304, 102880, + 112232, 122440, 133576, 145720, 158968, 173424, 189192, 206392, + 225160, 245632, 267968, 292328, 318904, 347896, 379528, 414032, + 451672, 492736, 537536, 586408, 639720, 697880, 761328, 830544, + 906048, 988416, 1078272, 1176296, 1283232, 1399896, 1527160, 1665992, + 1817448, 1982672, 2162920, 2359552, 2574056, 2808064, 3063344, 3341832, + 3645640, 3977064, 4338616, 4733040, 5163320, 5632712, 6144776, 6703392, + 7312792, 7977592, 8702832, 9494000, 10357096, 11298656, 12325808, 13446336, + 14668736, 16002264, 17457016, 19044024, 20775304, 22663968, 24724328, 26972000, + 29424000, 32098912, 35017000, 38200368, 41673128, 45461600, 49594472, 54103064, + 59021528, 64387128, 70240504, 76626008, 83592008, 91191288, 99481408, 108525176, + 118391104, 129153936, 140895208, 153703864, 167676944, 182920304, 199549424, 217690280, + 237480312, 259069432, 282621200, 308314040, 336342592, 366919192, 400275488, 436664168, + 476360912, 519666456, 566908864, 618446040, 674668408, 736001904, 802911168, 875903096, + 955530656, 1042397080, 1137160456, 1240538680, 1353314928, 1476343560, 1610556616, 1756970856, + 1916695480, 2090940528, 2281026032, 2488392040, 2714609504, 2961392192, 3230609664, 3524301456, + 3844692504, 4194210008, 4575501832, 4991456544, 5445225320, 5940245808, 6480268160, 7069383448, + 7712054672, 8413150552, 9177982424, 10012344464, 10922557600, 11915517384, 12998746240, 14180450448, + 15469582312, 16875907976, 18410081432, 20083725200, 21909518400, 23901292800, 26074137600, 28444513752, + 31030378640, 33851322152, 36928715080, 40285871000, 43948222912, 47943515904, 52302017352, 57056746208, + 62243723136, 67902243424, 74075174648, 80809281440, 88155579752, 96169723368, 104912425496, 114449918728, + 124854456800, 136204861968, 148587122152, 162095042352, 176830955296, 192906496688, 210443450936, 229574673752}; + +const int kAllocIndexLen = sizeof(AllocSizeIndex) / sizeof(AllocSizeIndex[0]); +static_assert(kAllocIndexLen == 256, "Make sure alloc 8 bit is enough."); +static_assert(AllocSizeIndex[255] > uint32_t(-1), "Make sure alloc size correct."); } // namespace +int64_t CalcAllocIndex(int64_t size) +{ + auto pos = std::lower_bound(AllocSizeIndex, AllocSizeIndex + kAllocIndexLen, size); + return (pos == AllocSizeIndex + kAllocIndexLen) ? -1 : pos - AllocSizeIndex; +} + +int64_t GetAllocSize(int index) { return index < kAllocIndexLen ? AllocSizeIndex[index] : 0; } + std::string BHomeShmName() { return "bhome_default_shm_v0"; diff --git a/src/defs.h b/src/defs.h index a95c81f..f0a0d49 100644 --- a/src/defs.h +++ b/src/defs.h @@ -23,14 +23,15 @@ typedef uint64_t MQId; -const MQId kBHNodeInit = 10; +const MQId kBHDefaultSender = 99; const MQId kBHTopicCenter = 100; const MQId kBHTopicBus = 101; -const MQId kBHUniCenter = 102; -inline const MQId BHInitAddress() { return kBHNodeInit; } +inline const MQId BHGlobalSenderAddress() { return kBHDefaultSender; } inline const MQId BHTopicCenterAddress() { return kBHTopicCenter; } inline const MQId BHTopicBusAddress() { return kBHTopicBus; } -inline const MQId BHUniCenterAddress() { return kBHUniCenter; } + +int64_t CalcAllocIndex(int64_t size); +int64_t GetAllocSize(int index); const int kBHCenterPort = 24287; const char kTopicSep = '.'; diff --git a/src/msg.cpp b/src/msg.cpp index f180d67..a4777d2 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -17,8 +17,37 @@ */ #include "msg.h" #include "bh_util.h" +#include "socket.h" namespace bhome_msg { +ShmSocket &ShmMsg::Sender() +{ + static ShmSocket sender(shm(), false, BHGlobalSenderAddress(), 16); + return sender; +} + +int ShmMsg::Release() +{ + if (!valid()) { + return 0; + } + auto n = meta()->count_.Dec(); + if (n == 0) { + int64_t free_cmd = (id() << 4) | EncodeCmd(eCmdFree); + Sender().Send(BHTopicCenterAddress(), free_cmd); + } else if (n < 0) { + throw -123; + } + return n; +} + +void ShmMsg::Free() +{ + assert(valid()); + shm().Dealloc(meta()); + offset_ = 0; + assert(!valid()); +} } // namespace bhome_msg diff --git a/src/msg.h b/src/msg.h index 1f5b0f1..9589389 100644 --- a/src/msg.h +++ b/src/msg.h @@ -26,6 +26,7 @@ #include <functional> #include <stdint.h> +class ShmSocket; namespace bhome_msg { using namespace bhome_shm; @@ -35,8 +36,9 @@ class ShmMsg : private StaticDataRef<SharedMemory, ShmMsg> { -private: static inline SharedMemory &shm() { return GetData(); } + static ShmSocket &Sender(); + // store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object. class RefCount : private boost::noncopyable { @@ -49,6 +51,7 @@ int Dec() { return --num_; } int Get() { return num_.load(); } }; + typedef int64_t OffsetType; static OffsetType Addr(void *ptr) { return reinterpret_cast<OffsetType>(ptr); } static void *Ptr(const OffsetType offset) { return reinterpret_cast<void *>(offset); } @@ -60,14 +63,22 @@ static const uint32_t kMsgTag = 0xf1e2d3c4; struct Meta { + static int64_t NewId() + { + static std::atomic<int64_t> id(0); + return ++id; + } + RefCount count_; const uint32_t tag_ = kMsgTag; const uint32_t size_ = 0; + const int64_t id_ = 0; + std::atomic<int64_t> timestamp_; Meta(uint32_t size) : - size_(size) {} + size_(size), id_(NewId()), timestamp_(NowSec()) {} }; OffsetType offset_; - void *Alloc(const size_t size) + static void *Alloc(const size_t size) { void *p = shm().Alloc(sizeof(Meta) + size); if (p) { @@ -76,45 +87,33 @@ } return p; } - void Free() - { - assert(valid()); - shm().Dealloc(meta()); - offset_ = 0; - assert(!valid()); - } + +private: Meta *meta() const { return get<Meta>() - 1; } typedef std::function<void(void *p, int len)> ToArray; - void *Pack(const uint32_t head_len, const ToArray &headToArray, - const uint32_t body_len, const ToArray &bodyToArray) + + template <class Body> + void *Pack(const BHMsgHead &head, const uint32_t head_len, const Body &body, const uint32_t body_len) { - void *addr = Alloc(sizeof(head_len) + head_len + sizeof(body_len) + body_len); + void *addr = get(); if (addr) { auto p = static_cast<char *>(addr); - auto Pack1 = [&p](auto len, auto &writer) { + auto Pack1 = [&p](auto len, auto &&writer) { Put32(p, len); p += sizeof(len); writer(p, len); p += len; }; - Pack1(head_len, headToArray); - Pack1(body_len, bodyToArray); + Pack1(head_len, [&](void *p, int len) { head.SerializeToArray(p, len); }); + Pack1(body_len, [&](void *p, int len) { body.SerializeToArray(p, len); }); } return addr; } - template <class Body> - void *Pack(const BHMsgHead &head, const Body &body) - { - return Pack( - uint32_t(head.ByteSizeLong()), [&](void *p, int len) { head.SerializeToArray(p, len); }, - uint32_t(body.ByteSizeLong()), [&](void *p, int len) { body.SerializeToArray(p, len); }); - } - void *Pack(const std::string &content) { - void *addr = Alloc(content.size()); + void *addr = get(); if (addr) { memcpy(addr, content.data(), content.size()); } @@ -133,36 +132,48 @@ offset_(p ? (Addr(p) - BaseAddr()) : 0) {} template <class T = void> - T *get() const { return static_cast<T *>(Ptr(offset_ + BaseAddr())); } + T *get() const { return offset_ != 0 ? static_cast<T *>(Ptr(offset_ + BaseAddr())) : nullptr; } public: static bool BindShm(SharedMemory &shm) { return SetData(shm); } ShmMsg() : - ShmMsg(nullptr) {} + offset_(0) {} explicit ShmMsg(const OffsetType offset) : offset_(offset) {} OffsetType Offset() const { return offset_; } OffsetType &OffsetRef() { return offset_; } void swap(ShmMsg &a) { std::swap(offset_, a.offset_); } - bool valid() const { return static_cast<bool>(offset_) && meta()->tag_ == kMsgTag; } - - int AddRef() const { return valid() ? meta()->count_.Inc() : 1; } - int Release() - { - if (!valid()) { - return 0; - } - auto n = meta()->count_.Dec(); - if (n == 0) { - Free(); - } - return n; - } + bool valid() const { return offset_ != 0 && meta()->tag_ == kMsgTag; } + int64_t id() const { return valid() ? meta()->id_ : 0; } + int64_t timestamp() const { return valid() ? meta()->timestamp_.load() : 0; } + size_t Size() const { return valid() ? meta()->size_ : 0; } int Count() const { return valid() ? meta()->count_.Get() : 1; } + int AddRef() const { return valid() ? meta()->count_.Inc() : 1; } + int Release(); + void Free(); template <class Body> - inline bool Make(const BHMsgHead &head, const Body &body) { return Make(Pack(head, body)); } - inline bool Make(const std::string &content) { return Make(Pack(content)); } + inline bool Make(const BHMsgHead &head, const Body &body) + { + uint32_t head_len = head.ByteSizeLong(); + uint32_t body_len = body.ByteSizeLong(); + uint32_t size = sizeof(head_len) + head_len + sizeof(body_len) + body_len; + return Make(size) && Pack(head, head_len, body, body_len); + } + template <class Body> + inline bool Fill(const BHMsgHead &head, const Body &body) + { + uint32_t head_len = head.ByteSizeLong(); + uint32_t body_len = body.ByteSizeLong(); + uint32_t size = sizeof(head_len) + head_len + sizeof(body_len) + body_len; + return valid() && (meta()->size_ >= size) && Pack(head, head_len, body, body_len); + } + + inline bool Make(const std::string &content) { return Make(content.size()) && Pack(content); } + inline bool Fill(const std::string &content) { return valid() && (meta()->size_ >= content.size()) && Pack(content); } + + inline bool Make(const size_t size) { return Make(Alloc(size)); } + template <class Body> static inline std::string Serialize(const BHMsgHead &head, const Body &body) { @@ -208,6 +219,18 @@ typedef ShmMsg MsgI; +constexpr inline int EncodeCmd(int cmd) { return ((cmd & MaskBits(3)) << 1) | 1; } +constexpr inline int DecodeCmd(int64_t msg) { return (msg >> 1) & MaskBits(3); } +constexpr inline bool IsCmd(int64_t msg) { return (msg & 1) != 0; } +// int64_t pack format: cmd data ,3bit cmd, 1bit flag. +enum MsgCmd { + eCmdNodeInit = 0, // upto 59bit ssn id + eCmdNodeInitReply = 1, // 31bit proc index, + eCmdAllocRequest0 = 2, // 8bit size, 4bit socket index, 16bit proc index, 28bit id + eCmdAllocReply0 = 3, // 31bit ptr, 28bit id, + eCmdFree = 4, // upto 59bit msg id, +}; + } // namespace bhome_msg #endif // end of include guard: MSG_5BILLZET diff --git a/src/proto.h b/src/proto.h index 94a438c..c05407b 100644 --- a/src/proto.h +++ b/src/proto.h @@ -48,6 +48,8 @@ BHOME_SIMPLE_MAP_MSG(Publish); BHOME_SIMPLE_MAP_MSG(Subscribe); BHOME_SIMPLE_MAP_MSG(Unsubscribe); +BHOME_SIMPLE_MAP_MSG(ProcInit); +BHOME_SIMPLE_MAP_MSG(ProcInitReply); #undef BHOME_SIMPLE_MAP_MSG #undef BHOME_MAP_MSG_AND_TYPE diff --git a/src/robust.cpp b/src/robust.cpp index 26d41b9..4654652 100644 --- a/src/robust.cpp +++ b/src/robust.cpp @@ -35,24 +35,30 @@ bool FMutex::try_lock() { - if (mtx_.try_lock()) { - if (flock(fd_, LOCK_EX | LOCK_NB) == 0) { + if (flock(fd_, LOCK_EX | LOCK_NB) == 0) { + ++count_; + if (mtx_.try_lock()) { return true; } else { - mtx_.unlock(); + if (--count_ == 0) { + flock(fd_, LOCK_UN); + } } } return false; } void FMutex::lock() { - mtx_.lock(); flock(fd_, LOCK_EX); + ++count_; + mtx_.lock(); } void FMutex::unlock() { - flock(fd_, LOCK_UN); mtx_.unlock(); + if (--count_ == 0) { + flock(fd_, LOCK_UN); + } } } // namespace robust \ No newline at end of file diff --git a/src/robust.h b/src/robust.h index 8657122..c70e2fe 100644 --- a/src/robust.h +++ b/src/robust.h @@ -19,6 +19,7 @@ #ifndef ROBUST_Q31RCWYU #define ROBUST_Q31RCWYU +#include "bh_util.h" #include "log.h" #include <atomic> #include <chrono> @@ -37,8 +38,6 @@ using namespace std::chrono; using namespace std::chrono_literals; -constexpr uint64_t MaskBits(int nbits) { return (uint64_t(1) << nbits) - 1; } - void QuickSleep(); class CasMutex @@ -99,7 +98,7 @@ public: typedef uint64_t id_t; FMutex(id_t id) : - id_(id), fd_(Open(id_)) + id_(id), fd_(Open(id_)), count_(0) { if (fd_ == -1) { throw "error create mutex!"; } } @@ -117,11 +116,10 @@ } static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDONLY, 0666); } static int Close(int fd) { return close(fd); } - void FLock(); - void FUnlock(); id_t id_; int fd_; std::mutex mtx_; + std::atomic<int32_t> count_; }; union semun { @@ -310,5 +308,36 @@ AData buf[capacity]; }; +template <class Int> +class AtomicQueue<0, Int> +{ + typedef Int Data; + typedef std::atomic<Data> AData; + static_assert(sizeof(Data) == sizeof(AData)); + +public: + AtomicQueue() { memset(this, 0, sizeof(*this)); } + bool push(const Data d, bool try_more = false) + { + auto cur = buf.load(); + return Empty(cur) && buf.compare_exchange_strong(cur, Enc(d)); + } + bool pop(Data &d, bool try_more = false) + { + Data cur = buf.load(); + bool r = !Empty(cur) && buf.compare_exchange_strong(cur, 0); + if (r) { d = Dec(cur); } + return r; + } + uint32_t head() const { return 0; } + uint32_t tail() const { return 0; } + +private: + static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok. + static inline Data Enc(const Data d) { return (d << 1) | 1; } // lowest bit 1 means data ok. + static inline Data Dec(const Data d) { return d >> 1; } // lowest bit 1 means data ok. + AData buf; +}; + } // namespace robust #endif // end of include guard: ROBUST_Q31RCWYU diff --git a/src/sendq.cpp b/src/sendq.cpp index c0d5afd..36af264 100644 --- a/src/sendq.cpp +++ b/src/sendq.cpp @@ -40,20 +40,24 @@ } auto SendData = [&](Data &d) { + auto TryLoop = [&](auto &&data) { + for (int i = 0; i < 1; ++i) { + if (mq.TrySend(remote, data)) { + return true; + } + } + return false; + }; bool r = false; if (d.index() == 0) { auto &msg = boost::variant2::get<0>(pos->data().data_); - r = mq.TrySend(remote, msg); + r = TryLoop(msg); if (r) { msg.Release(); } } else { - auto &content = boost::variant2::get<1>(pos->data().data_); - MsgI msg; - if (msg.Make(content)) { - DEFER1(msg.Release();); - r = mq.TrySend(remote, msg); - } + auto command = boost::variant2::get<1>(pos->data().data_); + r = TryLoop(command); } return r; }; @@ -110,4 +114,4 @@ Collect(); return !out_.empty(); -} \ No newline at end of file +} diff --git a/src/sendq.h b/src/sendq.h index 0e565d5..862a1cc 100644 --- a/src/sendq.h +++ b/src/sendq.h @@ -37,7 +37,8 @@ typedef MQId Remote; typedef bhome_msg::MsgI MsgI; typedef std::string Content; - typedef boost::variant2::variant<MsgI, Content> Data; + typedef int64_t Command; + typedef boost::variant2::variant<MsgI, Command> Data; typedef std::function<void(const Data &)> OnMsgEvent; struct MsgInfo { Data data_; @@ -47,23 +48,16 @@ typedef TimedMsg::TimePoint TimePoint; typedef TimedMsg::Duration Duration; - // template <class... Rest> - // void Append(const MQId &id, Rest &&...rest) - // { - // Append(std::string((const char *) &id, sizeof(id)), std::forward<decltype(rest)>(rest)...); - // } - void Append(const Remote addr, const MsgI msg, OnMsgEvent onExpire = OnMsgEvent()) { msg.AddRef(); AppendData(addr, Data(msg), DefaultExpire(), onExpire); } - void Append(const Remote addr, Content &&content, OnMsgEvent onExpire = OnMsgEvent()) + void Append(const Remote addr, const Command command, OnMsgEvent onExpire = OnMsgEvent()) { - AppendData(addr, Data(std::move(content)), DefaultExpire(), onExpire); + AppendData(addr, Data(command), DefaultExpire(), onExpire); } bool TrySend(ShmMsgQueue &mq); - // bool empty() const { return store_.empty(); } private: static TimePoint Now() { return TimedMsg::Clock::now(); } diff --git a/src/shm_msg_queue.cpp b/src/shm_msg_queue.cpp index b78c1a0..d96c511 100644 --- a/src/shm_msg_queue.cpp +++ b/src/shm_msg_queue.cpp @@ -56,6 +56,7 @@ ShmMsgQueue::~ShmMsgQueue() {} +#ifndef BH_USE_ATOMIC_Q ShmMsgQueue::Mutex &ShmMsgQueue::GetMutex(const MQId id) { static std::unordered_map<MQId, std::shared_ptr<Mutex>> imm; @@ -69,13 +70,19 @@ } return *pos->second; } +#endif + bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id) { Queue *q = Find(shm, id); if (q) { - MsgI msg; - while (q->TryRead(msg.OffsetRef())) { - msg.Release(); + RawData val = 0; + while (q->TryRead(val)) { + if (IsCmd(val)) { + LOG_DEBUG() << "clsing queue " << id << ", has a cmd" << DecodeCmd(val); + } else { + MsgI(val).Release(); + } } } return Shmq::Remove(shm, MsgQIdToName(id)); @@ -86,19 +93,18 @@ return Shmq::Find(shm, MsgQIdToName(remote_id)); } -bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg) +bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, int64_t val) { - bool r = false; try { ShmMsgQueue dest(remote_id, false, shm, 1); - msg.AddRef(); - DEFER1(if (!r) { msg.Release(); }); - +#ifndef BH_USE_ATOMIC_Q Guard lock(GetMutex(remote_id)); - r = dest.queue().TryWrite(msg.Offset()); +#endif + return dest.queue().TryWrite(val); } catch (...) { + // SetLastError(eNotFound, "remote not found"); + return false; } - return r; } // Test shows that in the 2 cases: diff --git a/src/shm_msg_queue.h b/src/shm_msg_queue.h index 1970803..4b7aed8 100644 --- a/src/shm_msg_queue.h +++ b/src/shm_msg_queue.h @@ -24,19 +24,25 @@ using namespace bhome_shm; using namespace bhome_msg; +#define BH_USE_ATOMIC_Q + class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue> { - // typedef ShmObject<SharedQ63<4>> Shmq; - typedef ShmObject<SharedQueue<int64_t>> Shmq; - typedef Shmq::ShmType ShmType; - typedef Shmq::Data Queue; - typedef std::function<void()> OnSend; - typedef robust::FMutex Mutex; - // typedef robust::SemMutex Mutex; - // typedef robust::NullMutex Mutex; - typedef robust::Guard<Mutex> Guard; - public: + typedef int64_t RawData; + +#ifdef BH_USE_ATOMIC_Q + typedef ShmObject<SharedQ63<0>> Shmq; +#else + typedef ShmObject<SharedQueue<RawData>> Shmq; + // typedef robust::FMutex Mutex; + // typedef robust::SemMutex Mutex; + typedef robust::NullMutex Mutex; + typedef robust::Guard<Mutex> Guard; +#endif + + typedef Shmq::Data Queue; + typedef Shmq::ShmType ShmType; typedef uint64_t MQId; static MQId NewId(); @@ -45,26 +51,45 @@ ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len); ShmMsgQueue(ShmType &segment, const int len); ~ShmMsgQueue(); - static bool Remove(SharedMemory &shm, const MQId id); + static bool Remove(ShmType &shm, const MQId id); MQId Id() const { return id_; } ShmType &shm() const { return queue_.shm(); } - bool Recv(MsgI &msg, const int timeout_ms) + bool Recv(RawData &val, const int timeout_ms) { +#ifndef BH_USE_ATOMIC_Q Guard lock(GetMutex(Id())); - return queue().Read(msg.OffsetRef(), timeout_ms); +#endif + return queue().Read(val, timeout_ms); } - bool TryRecv(MsgI &msg) + + bool TryRecv(RawData &val) { +#ifndef BH_USE_ATOMIC_Q Guard lock(GetMutex(Id())); - return queue().TryRead(msg.OffsetRef()); +#endif + return queue().TryRead(val); } - static Queue *Find(SharedMemory &shm, const MQId remote_id); - static bool TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg); + + bool Recv(MsgI &msg, const int timeout_ms) { return Recv(msg.OffsetRef(), timeout_ms); } + bool TryRecv(MsgI &msg) { return TryRecv(msg.OffsetRef()); } + static Queue *Find(ShmType &shm, const MQId remote_id); + static bool TrySend(ShmType &shm, const MQId remote_id, const RawData val); + static bool TrySend(ShmType &shm, const MQId remote_id, MsgI msg) + { + bool r = false; + msg.AddRef(); // TODO check if we could avoid addref here. + DEFER1(if (!r) { msg.Release(); }); + r = TrySend(shm, remote_id, msg.Offset()); + return r; + } bool TrySend(const MQId remote_id, const MsgI &msg) { return TrySend(shm(), remote_id, msg); } + bool TrySend(const MQId remote_id, const RawData val) { return TrySend(shm(), remote_id, val); } private: +#ifndef BH_USE_ATOMIC_Q static Mutex &GetMutex(const MQId id); +#endif MQId id_; Queue &queue() { return *queue_.data(); } Shmq queue_; diff --git a/src/shm_queue.h b/src/shm_queue.h index 5fd14e3..5c9e077 100644 --- a/src/shm_queue.h +++ b/src/shm_queue.h @@ -76,7 +76,7 @@ private: Circular<D> queue_; - bhome_shm::Mutex mutex_; + // bhome_shm::Mutex mutex_; }; template <int Power = 4> @@ -92,11 +92,12 @@ using namespace std::chrono; auto end_time = steady_clock::now() + milliseconds(timeout_ms); do { - if (TryRead(d)) { - return true; - } else { - robust::QuickSleep(); + for (int i = 0; i < 100; ++i) { + if (TryRead(d)) { + return true; + } } + robust::QuickSleep(); } while (steady_clock::now() < end_time); return false; } diff --git a/src/socket.cpp b/src/socket.cpp index 6231579..0704174 100644 --- a/src/socket.cpp +++ b/src/socket.cpp @@ -20,22 +20,25 @@ #include "bh_util.h" #include "defs.h" #include "msg.h" +#include <chrono> +using namespace std::chrono; +using namespace std::chrono_literals; using namespace bhome_msg; using namespace bhome_shm; ShmSocket::ShmSocket(Shm &shm, const MQId id, const int len) : - run_(false), mq_(id, shm, len) + run_(false), mq_(id, shm, len), alloc_id_(0) { Start(); } ShmSocket::ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len) : - run_(false), mq_(id, create_or_else_find, shm, len) + run_(false), mq_(id, create_or_else_find, shm, len), alloc_id_(0) { Start(); } ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) : - run_(false), mq_(shm, len) + run_(false), mq_(shm, len), alloc_id_(0) { Start(); } @@ -45,50 +48,15 @@ Stop(); } -bool ShmSocket::Start(const RawRecvCB &onData, const IdleCB &onIdle, int nworker) +bool ShmSocket::Start(int nworker, const RecvCB &onData, const RawRecvCB &onRaw, const IdleCB &onIdle) { - auto ioProc = [this, onData, onIdle]() { + auto ioProc = [this, onData, onRaw, onIdle]() { auto DoSend = [this]() { return send_buffer_.TrySend(mq()); }; auto DoRecv = [=] { // do not recv if no cb is set. - if (!onData) { - return false; - } - auto onMsg = [&](MsgI &imsg) { - DEFER1(imsg.Release()); - onData(*this, imsg); - }; - MsgI imsg; - return mq().TryRecv(imsg) ? (onMsg(imsg), true) : false; - }; + if (!onData && per_msg_cbs_->empty() && !onRaw && alloc_cbs_->empty()) { return false; } - try { - bool more_to_send = DoSend(); - bool more_to_recv = DoRecv(); - if (onIdle) { onIdle(*this); } - if (!more_to_send && !more_to_recv) { - robust::QuickSleep(); - } - } catch (...) { - } - }; - - std::lock_guard<std::mutex> lock(mutex_); - StopNoLock(); - - run_.store(true); - for (int i = 0; i < nworker; ++i) { - workers_.emplace_back([this, ioProc]() { while (run_) { ioProc(); } }); - } - return true; -} - -bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle) -{ - auto ioProc = [this, onData, onIdle]() { - auto DoSend = [this]() { return send_buffer_.TrySend(mq()); }; - auto DoRecv = [=] { - auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { + auto onMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) { RecvCB cb; if (per_msg_cbs_->Pick(head.msg_id(), cb)) { cb(socket, imsg, head); @@ -96,20 +64,43 @@ onData(socket, imsg, head); } }; - - // do not recv if no cb is set. - if (!onData && per_msg_cbs_->empty()) { - return false; - } - auto onMsg = [&](MsgI &imsg) { - DEFER1(imsg.Release()); - BHMsgHead head; - if (imsg.ParseHead(head)) { - onRecvWithPerMsgCB(*this, imsg, head); + auto onCmdCB = [this, onRaw](ShmSocket &socket, int64_t val) { + int cmd = DecodeCmd(val); + if (cmd == eCmdAllocReply0) { + int id = (val >> 4) & MaskBits(28); + RawRecvCB cb; + if (alloc_cbs_->Pick(id, cb)) { + cb(socket, val); + return; + } + } + if (onRaw) { + onRaw(socket, val); } }; - MsgI imsg; - return mq().TryRecv(imsg) ? (onMsg(imsg), true) : false; + + auto onRecv = [&](auto &val) { + if (IsCmd(val)) { + onCmdCB(*this, val); + } else { + MsgI imsg(val); + DEFER1(imsg.Release()); + BHMsgHead head; + if (imsg.ParseHead(head)) { + onMsgCB(*this, imsg, head); + } + } + }; + ShmMsgQueue::RawData val = 0; + auto TryRecvMore = [&]() { + for (int i = 0; i < 100; ++i) { + if (mq().TryRecv(val)) { + return true; + } + } + return false; + }; + return TryRecvMore() ? (onRecv(val), true) : false; }; try { @@ -126,9 +117,18 @@ std::lock_guard<std::mutex> lock(mutex_); StopNoLock(); + auto worker_proc = [this, ioProc]() { + while (run_) { ioProc(); } + // try send pending msgs. + auto end_time = steady_clock::now() + 3s; + while (send_buffer_.TrySend(mq()) && steady_clock::now() < end_time) { + // LOG_DEBUG() << "try send pending msgs."; + } + }; + run_.store(true); for (int i = 0; i < nworker; ++i) { - workers_.emplace_back([this, ioProc]() { while (run_) { ioProc(); } }); + workers_.emplace_back(worker_proc); } return true; } @@ -153,6 +153,10 @@ return false; } +bool ShmSocket::SyncRecv(int64_t &cmd, const int timeout_ms) +{ + return (timeout_ms == 0) ? mq().TryRecv(cmd) : mq().Recv(cmd, timeout_ms); +} //maybe reimplment, using async cbs? bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms) { @@ -167,3 +171,30 @@ } return false; } + +bool ShmSocket::RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult) +{ // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag + // LOG_FUNCTION; + if (node_proc_index_ == -1 || socket_index_ == -1) { + return false; + } + int id = (++alloc_id_) & MaskBits(28); + int64_t cmd = (CalcAllocIndex(size) << 52) | + ((socket_index_ & MaskBits(4)) << 48) | + ((node_proc_index_ & MaskBits(16)) << 32) | + (id << 4) | + EncodeCmd(eCmdAllocRequest0); + auto rawCB = [onResult](ShmSocket &sock, int64_t &val) { + MsgI msg((val >> 32) & MaskBits(31)); + DEFER1(msg.Release()); + onResult(msg); + return true; + }; + + alloc_cbs_->Store(id, std::move(rawCB)); + auto onExpireRemoveCB = [this, id](SendQ::Data const &msg) { + RawRecvCB cb_no_use; + alloc_cbs_->Pick(id, cb_no_use); + }; + return Send(BHTopicCenterAddress(), cmd, onExpireRemoveCB); +} \ No newline at end of file diff --git a/src/socket.h b/src/socket.h index 981677f..d69b8d4 100644 --- a/src/socket.h +++ b/src/socket.h @@ -42,7 +42,7 @@ public: typedef ShmMsgQueue::MQId MQId; typedef bhome_shm::SharedMemory Shm; - typedef std::function<void(ShmSocket &sock, MsgI &imsg)> RawRecvCB; + typedef std::function<void(ShmSocket &sock, Queue::RawData &val)> RawRecvCB; typedef std::function<void(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> RecvCB; typedef std::function<bool(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> PartialRecvCB; typedef std::function<void(ShmSocket &sock)> IdleCB; @@ -54,39 +54,74 @@ static bool Remove(SharedMemory &shm, const MQId id) { return Queue::Remove(shm, id); } bool Remove() { return Remove(shm(), id()); } MQId id() const { return mq().Id(); } + void SetNodeProc(const int proc_index, const int socket_index) + { + node_proc_index_ = proc_index; + socket_index_ = socket_index; + LOG_DEBUG() << "Set Node Proc " << node_proc_index_ << ", " << socket_index_; + } // start recv. - bool Start(const RawRecvCB &onData, const IdleCB &onIdle, int nworker = 1); - bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB()); - bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, onIdle); } + bool Start(int nworker = 1, const RecvCB &onMsg = RecvCB(), const RawRecvCB &onRaw = RawRecvCB(), const IdleCB &onIdle = IdleCB()); + bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, RawRecvCB(), onIdle); } bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); } bool Stop(); template <class Body> - bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB()) + bool CenterSend(const MQId remote, BHMsgHead &head, Body &body) { try { - if (!cb) { - return SendImpl(remote, MsgI::Serialize(head, body)); - } else { - std::string msg_id(head.msg_id()); - per_msg_cbs_->Store(msg_id, std::move(cb)); - auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) { - RecvCB cb_no_use; - per_msg_cbs_->Pick(msg_id, cb_no_use); - }; - return SendImpl(remote, MsgI::Serialize(head, body), onExpireRemoveCB); - } + //TODO alloc outsiez and use send. + MsgI msg; + if (!msg.Make(head, body)) { return false; } + DEFER1(msg.Release()); + + return Send(remote, msg); } catch (...) { SetLastError(eError, "Send internal error."); return false; } } - bool Send(const MQId remote, const MsgI &imsg) - { - return SendImpl(remote, imsg); - } + bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult); + template <class Body> + bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB()) + { + std::string msg_id(head.msg_id()); + std::string content(MsgI::Serialize(head, body)); + size_t size = content.size(); + auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable { + if (!msg.Fill(content)) { return; } + + try { + if (!cb) { + Send(remote, msg); + } else { + per_msg_cbs_->Store(msg_id, std::move(cb)); + auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) { + RecvCB cb_no_use; + per_msg_cbs_->Pick(msg_id, cb_no_use); + }; + Send(remote, msg, onExpireRemoveCB); + } + } catch (...) { + SetLastError(eError, "Send internal error."); + } + }; + + return RequestAlloc(size, OnResult); + } + template <class... T> + bool Send(const MQId remote, const MsgI &imsg, T &&...t) + { + return SendImpl(remote, imsg, std::forward<decltype(t)>(t)...); + } + template <class... T> + bool Send(const MQId remote, const int64_t cmd, T &&...t) + { + return SendImpl(remote, cmd, std::forward<decltype(t)>(t)...); + } + bool SyncRecv(int64_t &cmd, const int timeout_ms); bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms); template <class Body> @@ -153,15 +188,15 @@ std::atomic<bool> run_; Queue mq_; - template <class Key> + template <class Key, class CB> class CallbackRecords { - std::unordered_map<Key, RecvCB> store_; + std::unordered_map<Key, CB> store_; public: bool empty() const { return store_.empty(); } - bool Store(const Key &id, RecvCB &&cb) { return store_.emplace(id, std::move(cb)).second; } - bool Pick(const Key &id, RecvCB &cb) + bool Store(const Key &id, CB &&cb) { return store_.emplace(id, std::move(cb)).second; } + bool Pick(const Key &id, CB &cb) { auto pos = store_.find(id); if (pos != store_.end()) { @@ -174,9 +209,14 @@ } }; - Synced<CallbackRecords<std::string>> per_msg_cbs_; + Synced<CallbackRecords<std::string, RecvCB>> per_msg_cbs_; + Synced<CallbackRecords<int, RawRecvCB>> alloc_cbs_; SendQ send_buffer_; + // node request center alloc memory. + int node_proc_index_ = -1; + int socket_index_ = -1; + std::atomic<int> alloc_id_; }; #endif // end of include guard: SOCKET_GWTJHBPO diff --git a/src/topic_node.cpp b/src/topic_node.cpp index d8d6a42..35228b4 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -42,7 +42,6 @@ TopicNode::TopicNode(SharedMemory &shm) : shm_(shm), state_(eStateUnregistered) { - Init(); } TopicNode::~TopicNode() @@ -57,34 +56,79 @@ if (Valid()) { return true; + } else if (info_.proc_id().empty()) { + return false; } if (ssn_id_ == 0) { ssn_id_ = ShmMsgQueue::NewId(); } LOG_DEBUG() << "Node Init, id " << ssn_id_; - MsgI msg; - msg.OffsetRef() = ssn_id_; - if (ShmMsgQueue::TrySend(shm(), BHInitAddress(), msg)) { - - auto end_time = steady_clock::now() + 3s; - do { - try { - for (int i = eSockStart; i < eSockEnd; ++i) { - sockets_.emplace_back(new ShmSocket(shm_, false, ssn_id_ + i, kMqLen)); + auto NodeInit = [&]() { + auto SendInitCmd = [&]() { + int64_t init_cmd = ssn_id_ << 4 | EncodeCmd(eCmdNodeInit); + auto end_time = steady_clock::now() + 3s; + bool r = false; + do { + r = ShmMsgQueue::TrySend(shm(), BHTopicCenterAddress(), init_cmd); + } while (!r && steady_clock::now() < end_time); + return r; + }; + if (SendInitCmd()) { + LOG_DEBUG() << "node send init ok"; + auto end_time = steady_clock::now() + 3s; + do { + try { + for (int i = eSockStart; i < eSockEnd; ++i) { + sockets_.emplace_back(new ShmSocket(shm_, false, ssn_id_ + i, kMqLen)); + } + break; + } catch (...) { + sockets_.clear(); + std::this_thread::sleep_for(100ms); } - break; - } catch (...) { - sockets_.clear(); - std::this_thread::sleep_for(100ms); - } - } while (steady_clock::now() < end_time); - if (!sockets_.empty()) { - // recv msgs to avoid memory leak. - auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; }; - SockNode().Start(default_ignore_msg); - return true; + } while (steady_clock::now() < end_time); } + }; + if (sockets_.empty()) { + NodeInit(); + } + if (!sockets_.empty()) { + LOG_DEBUG() << "node sockets ok"; + auto onNodeCmd = [this](ShmSocket &socket, int64_t &val) { + LOG_DEBUG() << "node recv cmd: " << DecodeCmd(val); + switch (DecodeCmd(val)) { + case eCmdNodeInitReply: { + MsgI msg(val >> 4); + DEFER1(msg.Release()); + MsgProcInit body; + auto head = InitMsgHead(GetType(body), info_.proc_id(), ssn_id_); + head.add_route()->set_mq_id(ssn_id_); + if (msg.Fill(head, body)) { + socket.Send(BHTopicCenterAddress(), msg); + } + } break; + default: + break; + } + return true; + }; + + // recv msgs to avoid memory leak. + auto onMsg = [this](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { + LOG_DEBUG() << "node recv type: " << head.type(); + if (head.type() == kMsgTypeProcInitReply) { + LOG_DEBUG() << "got proc init reply"; + MsgProcInitReply reply; + if (imsg.ParseBody(reply)) { + SetProcIndex(reply.proc_index()); + } + } + return true; + }; + SockNode().Start(1, onMsg, onNodeCmd); + LOG_DEBUG() << "sockets ok."; + return true; } return false; } @@ -100,7 +144,7 @@ } else if (nworker > 16) { nworker = 16; } - SockNode().Start(); + // SockNode().Start(); ServerStart(server_cb, nworker); SubscribeStartWorker(sub_cb, nworker); ClientStartWorker(client_cb, nworker); @@ -114,12 +158,15 @@ bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) { + { + std::lock_guard<std::mutex> lk(mutex_); + info_ = proc; + } + if (!Init()) { SetLastError(eError, kErrMsgNotInit); return false; } - - info_ = proc; auto &sock = SockNode(); MsgRegister body; diff --git a/src/topic_node.h b/src/topic_node.h index 338a6e3..b018807 100644 --- a/src/topic_node.h +++ b/src/topic_node.h @@ -130,6 +130,14 @@ ShmSocket &SockClient() { return *sockets_[eSockClient]; } ShmSocket &SockServer() { return *sockets_[eSockServer]; } + void SetProcIndex(int index) + { + proc_index_ = index; + for (int i = eSockStart; i < eSockEnd; ++i) { + sockets_[i]->SetNodeProc(index, i); + } + } + enum State { eStateUnregistered, eStateOnline, @@ -144,6 +152,7 @@ std::mutex mutex_; MQId ssn_id_ = 0; std::atomic<State> state_; + int proc_index_ = -1; TopicQueryCache topic_query_cache_; }; diff --git a/utest/api_test.cpp b/utest/api_test.cpp index bd59c7f..f48f307 100644 --- a/utest/api_test.cpp +++ b/utest/api_test.cpp @@ -293,7 +293,7 @@ // } int same = 0; - int64_t last = 0; + uint64_t last = 0; while (last < nreq * ncli && same < 2) { Sleep(1s, false); auto cur = Status().nreply_.load(); diff --git a/utest/robust_test.cpp b/utest/robust_test.cpp index 0645918..e7b8894 100644 --- a/utest/robust_test.cpp +++ b/utest/robust_test.cpp @@ -39,19 +39,24 @@ std::atomic<uint64_t> nwrite(0); std::atomic<uint64_t> writedone(0); -#if 0 - typedef AtomicQueue<4> Rcb; +#if 1 + const int kPower = 0; + typedef AtomicQueue<kPower> Rcb; Rcb tmp; - BOOST_CHECK(tmp.like_empty()); + // BOOST_CHECK(tmp.like_empty()); BOOST_CHECK(tmp.push(1)); - BOOST_CHECK(tmp.tail() == 1); + if (kPower != 0) { + BOOST_CHECK(tmp.tail() == 1); + } BOOST_CHECK(tmp.head() == 0); int64_t d; BOOST_CHECK(tmp.pop(d)); - BOOST_CHECK(tmp.like_empty()); - BOOST_CHECK(tmp.head() == 1); - BOOST_CHECK(tmp.tail() == 1); + if (kPower != 0) { + // BOOST_CHECK(tmp.like_empty()); + BOOST_CHECK(tmp.head() == 1); + BOOST_CHECK(tmp.tail() == 1); + } ShmObject<Rcb> rcb(shm, "test_rcb"); bool try_more = true; @@ -166,18 +171,20 @@ BOOST_AUTO_TEST_CASE(MutexTest) { { - int fd = open("/tmp/test_fmutex", O_CREAT | O_RDONLY, 0666); - flock(fd, LOCK_EX); - printf("lock 1"); + int sem_id = semget(100, 1, 0666 | IPC_CREAT); + auto P = [&]() { + sembuf op = {0, -1, SEM_UNDO}; + semop(sem_id, &op, 1); + }; + auto V = [&]() { + sembuf op = {0, 1, SEM_UNDO}; + semop(sem_id, &op, 1); + }; + for (int i = 0; i < 10; ++i) { + V(); + } Sleep(10s); - flock(fd, LOCK_EX); - printf("lock 2"); - Sleep(10s); - flock(fd, LOCK_UN); - printf("un lock 2"); - Sleep(10s); - flock(fd, LOCK_UN); - printf("un lock 1"); + return; } @@ -204,7 +211,7 @@ std::mutex m; typedef std::chrono::steady_clock Clock; - auto Now = []() { return Clock::now().time_since_epoch(); }; + if (pi) { auto old = *pi; printf("int : %d, add1: %d\n", old, ++*pi); diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp index f8f54f5..334c081 100644 --- a/utest/speed_test.cpp +++ b/utest/speed_test.cpp @@ -92,9 +92,9 @@ } }; - int nwriters[] = {1, 10, 100}; + int nwriters[] = {1, 10, 100, 1000}; int nreaders[] = {2}; - const int64_t total_msg = 1000 * 100; + const int64_t total_msg = 1000 * 1000; auto Test = [&](auto &www, auto &rrr, bool isfork) { for (auto nreader : nreaders) { @@ -127,12 +127,13 @@ // typedef ThreadManager Manager; // const bool isfork = IsSameType<Manager, ProcessManager>::value; - { + if (0) { ThreadManager tw, tr; printf("---------------- Testing thread io: -------------------------------------------------------\n"); Test(tw, tr, false); } - { + + if (1) { ProcessManager pw, pr; printf("================ Testing process io: =======================================================\n"); Test(pw, pr, true); -- Gitblit v1.8.0