From db322f33ba13592f2492317e3f1a070454c97059 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 13 五月 2021 19:34:46 +0800
Subject: [PATCH] center alloc all msgs.
---
src/robust.h | 39 +
src/socket.h | 90 +++-
box/center.cpp | 276 ++++++++++++--
src/proto.h | 2
src/msg.h | 109 +++--
src/shm_msg_queue.h | 59 ++
src/socket.cpp | 141 ++++---
proto/source/bhome_msg.proto | 9
src/defs.h | 9
box/center.h | 7
src/sendq.cpp | 20
src/topic_node.cpp | 95 +++-
src/shm_msg_queue.cpp | 26
src/defs.cpp | 45 ++
src/msg.cpp | 29 +
box/center_main.cc | 7
utest/speed_test.cpp | 9
utest/api_test.cpp | 2
src/bh_util.h | 2
src/shm_queue.h | 11
src/topic_node.h | 9
utest/robust_test.cpp | 45 +-
src/sendq.h | 14
src/robust.cpp | 16
24 files changed, 788 insertions(+), 283 deletions(-)
diff --git a/box/center.cpp b/box/center.cpp
index d6ac804..b440a03 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -21,7 +21,7 @@
#include "log.h"
#include "shm.h"
#include <chrono>
-#include <set>
+#include <unordered_map>
using namespace std::chrono;
using namespace std::chrono_literals;
@@ -33,11 +33,118 @@
namespace
{
+typedef std::string ProcId;
+typedef size_t ProcIndex; // max local procs.
+const int kMaxProcs = 65536;
+
+// record all procs ever registered, always grow, never remove.
+// mainly for node to request msg allocation.
+// use index instead of MQId to save some bits.
+class ProcRecords
+{
+public:
+ struct ProcRec {
+ ProcId proc_;
+ MQId ssn_ = 0;
+ };
+
+ ProcRecords() { procs_.reserve(kMaxProcs); }
+
+ ProcIndex Put(const ProcId &proc_id, const MQId ssn)
+ {
+ if (procs_.size() >= kMaxProcs) {
+ return -1;
+ }
+ auto pos_isnew = proc_index_.emplace(proc_id, procs_.size());
+ int index = pos_isnew.first->second;
+ if (pos_isnew.second) {
+ procs_.emplace_back(ProcRec{proc_id, ssn});
+ } else { // update ssn
+ procs_[index].ssn_ = ssn;
+ }
+ return index;
+ }
+ const ProcRec &Get(const ProcIndex index) const
+ {
+ static ProcRec empty_rec;
+ return (index < procs_.size()) ? procs_[index] : empty_rec;
+ }
+
+private:
+ std::unordered_map<ProcId, size_t> proc_index_;
+ std::vector<ProcRec> procs_;
+};
+
+class MsgRecords
+{
+ typedef int64_t MsgId;
+ typedef int64_t Offset;
+
+public:
+ void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg.Offset()); }
+ void FreeMsg(MsgId id)
+ {
+ auto pos = msgs_.find(id);
+ if (pos != msgs_.end()) {
+ ShmMsg(pos->second).Free();
+ msgs_.erase(pos);
+ } else {
+ LOG_TRACE() << "ignore late free request.";
+ }
+ }
+ void AutoRemove()
+ {
+ auto now = NowSec();
+ if (now < time_to_clean_) {
+ return;
+ }
+ LOG_FUNCTION;
+ time_to_clean_ = now + 1;
+ int64_t limit = std::max(10000ul, msgs_.size() / 10);
+ int64_t n = 0;
+ auto it = msgs_.begin();
+ while (it != msgs_.end() && --limit > 0) {
+ ShmMsg msg(it->second);
+ if (msg.Count() == 0) {
+ msg.Free();
+ it = msgs_.erase(it);
+ ++n;
+ } else if (msg.timestamp() + 10 < NowSec()) {
+ msg.Free();
+ it = msgs_.erase(it);
+ ++n;
+ // LOG_DEBUG() << "release timeout msg, someone crashed.";
+ } else {
+ ++it;
+ }
+ }
+ if (n > 0) {
+ LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n;
+ }
+ }
+ size_t size() const { return msgs_.size(); }
+ void DebugPrint() const
+ {
+ LOG_DEBUG() << "msgs : " << size();
+ int i = 0;
+ int total_count = 0;
+ for (auto &kv : msgs_) {
+ MsgI msg(kv.second);
+ total_count += msg.Count();
+ LOG_TRACE() << " " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second << ", count: " << msg.Count() << ", size: " << msg.Size();
+ }
+ LOG_DEBUG() << "total count: " << total_count;
+ }
+
+private:
+ std::unordered_map<MsgId, Offset> msgs_;
+ int64_t time_to_clean_ = 0;
+};
+
//TODO check proc_id
class NodeCenter
{
public:
- typedef std::string ProcId;
typedef MQId Address;
typedef bhome_msg::ProcInfo ProcInfo;
typedef std::function<void(Address const)> Cleaner;
@@ -102,13 +209,14 @@
// center name, no relative to shm.
const std::string &id() const { return id_; }
- void OnNodeInit(SharedMemory &shm, const int64_t msg)
+ void OnNodeInit(ShmSocket &socket, const int64_t val)
{
- MQId ssn = msg;
+ LOG_FUNCTION;
+ SharedMemory &shm = socket.shm();
+ MQId ssn = (val >> 4) & MaskBits(60);
if (nodes_.find(ssn) != nodes_.end()) {
return; // ignore in exists.
}
-
auto UpdateRegInfo = [&](Node &node) {
for (int i = 0; i < 10; ++i) {
node->addrs_.insert(ssn + i);
@@ -118,12 +226,10 @@
// create sockets.
try {
- auto CreateSocket = [](SharedMemory &shm, const MQId id) {
- ShmSocket tmp(shm, true, id, 16);
- };
+ auto CreateSocket = [&](const MQId id) { ShmSocket tmp(shm, true, id, 16); };
// alloc(-1), node, server, sub, request,
- for (int i = -1; i < 4; ++i) {
- CreateSocket(shm, ssn + i);
+ for (int i = 0; i < 4; ++i) {
+ CreateSocket(ssn + i);
node->addrs_.insert(ssn + i);
}
return true;
@@ -132,11 +238,93 @@
}
};
+ auto PrepareProcInit = [&]() {
+ bool r = false;
+ ShmMsg init_msg;
+ if (init_msg.Make(GetAllocSize(CalcAllocIndex(900)))) {
+ // 31bit pointer, 4bit cmd+flag
+ int64_t reply = (init_msg.Offset() << 4) | EncodeCmd(eCmdNodeInitReply);
+ r = SendAllocReply(socket, ssn, reply, init_msg);
+ }
+ return r;
+ };
+
Node node(new NodeInfo);
- if (UpdateRegInfo(node)) {
+ if (UpdateRegInfo(node) && PrepareProcInit()) {
nodes_[ssn] = node;
LOG_INFO() << "new node ssn (" << ssn << ") init";
+ } else {
+ for (int i = 0; i < 10; ++i) {
+ ShmSocket::Remove(shm, ssn + i);
+ }
}
+ }
+ void RecordMsg(const MsgI &msg) { msgs_.RecordMsg(msg); }
+
+ bool SendAllocReply(ShmSocket &socket, const Address dest, const int64_t reply, const MsgI &msg)
+ {
+ RecordMsg(msg);
+ auto onExpireFree = [this, msg](const SendQ::Data &) { msgs_.FreeMsg(msg.id()); };
+ return socket.Send(dest, reply, onExpireFree);
+ }
+ bool SendAllocMsg(ShmSocket &socket, const Address dest, const MsgI &msg)
+ {
+ RecordMsg(msg);
+ auto onExpireFree = [this, msg](const SendQ::Data &) { msgs_.FreeMsg(msg.id()); };
+ return socket.Send(dest, msg, onExpireFree);
+ }
+
+ void OnAlloc(ShmSocket &socket, const int64_t val)
+ {
+ // LOG_FUNCTION;
+ // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag
+ int64_t msg_id = (val >> 4) & MaskBits(28);
+ int proc_index = (val >> 32) & MaskBits(16);
+ int socket_index = ((val) >> 48) & MaskBits(4);
+ auto proc_rec(procs_.Get(proc_index));
+ if (proc_rec.proc_.empty()) {
+ return;
+ }
+ Address dest = proc_rec.ssn_ + socket_index;
+
+ auto size = GetAllocSize((val >> 52) & MaskBits(8));
+ MsgI new_msg;
+ if (new_msg.Make(size)) {
+ // 31bit proc index, 28bit id, ,4bit cmd+flag
+ int64_t reply = (new_msg.Offset() << 32) | (msg_id << 4) | EncodeCmd(eCmdAllocReply0);
+ SendAllocReply(socket, dest, reply, new_msg);
+ } else {
+ int64_t reply = (msg_id << 4) | EncodeCmd(eCmdAllocReply0); // send empty, ack failure.
+ socket.Send(dest, reply);
+ }
+ }
+
+ void OnFree(ShmSocket &socket, const int64_t val)
+ {
+ int64_t msg_id = (val >> 4) & MaskBits(31);
+ msgs_.FreeMsg(msg_id);
+ }
+
+ bool OnCommand(ShmSocket &socket, const int64_t val)
+ {
+ assert(IsCmd(val));
+ int cmd = DecodeCmd(val);
+ switch (cmd) {
+ case eCmdNodeInit: OnNodeInit(socket, val); break;
+ case eCmdAllocRequest0: OnAlloc(socket, val); break;
+ case eCmdFree: OnFree(socket, val); break;
+ default: return false;
+ }
+ return true;
+ }
+
+ MsgProcInitReply ProcInit(const BHMsgHead &head, MsgProcInit &msg)
+ {
+ LOG_DEBUG() << "center got proc init.";
+ auto index = procs_.Put(head.proc_id(), head.ssn_id());
+ auto reply(MakeReply<MsgProcInitReply>(eSuccess));
+ reply.set_proc_index(index);
+ return reply;
}
MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg)
@@ -160,14 +348,13 @@
};
auto pos = nodes_.find(ssn);
- if (pos != nodes_.end()) { // update
- Node &node = pos->second;
- UpdateRegInfo(node);
- } else {
- Node node(new NodeInfo);
- UpdateRegInfo(node);
- nodes_[ssn] = node;
+ if (pos == nodes_.end()) {
+ return MakeReply(eInvalidInput, "invalid session.");
}
+
+ // update proc info
+ Node &node = pos->second;
+ UpdateRegInfo(node);
LOG_DEBUG() << "node (" << head.proc_id() << ") ssn (" << ssn << ")";
auto old = online_node_addr_map_.find(head.proc_id());
@@ -376,13 +563,14 @@
void OnTimer()
{
CheckNodes();
+ msgs_.AutoRemove();
}
private:
void CheckNodes()
{
auto now = NowSec();
- if (now - last_check_time_ < 1) { return; }
+ if (now <= last_check_time_) { return; }
last_check_time_ = now;
auto it = nodes_.begin();
@@ -396,6 +584,7 @@
++it;
}
}
+ msgs_.DebugPrint();
}
bool CanHeartbeat(const NodeInfo &node)
{
@@ -448,7 +637,10 @@
std::unordered_map<Topic, Clients> service_map_;
std::unordered_map<Topic, Clients> subscribe_map_;
std::unordered_map<Address, Node> nodes_;
- std::unordered_map<std::string, Address> online_node_addr_map_;
+ std::unordered_map<ProcId, Address> online_node_addr_map_;
+ ProcRecords procs_; // To get a short index for msg alloc.
+ MsgRecords msgs_; // record all msgs alloced.
+
Cleaner cleaner_; // remove mqs.
int64_t offline_time_;
int64_t kill_time_;
@@ -483,25 +675,28 @@
msg, head, [&](auto &body) { return center->MsgTag(head, body); }, replyer); \
return true;
-auto MakeReplyer(ShmSocket &socket, BHMsgHead &head, const std::string &proc_id)
+auto MakeReplyer(ShmSocket &socket, BHMsgHead &head, Synced<NodeCenter> ¢er)
{
return [&](auto &&rep_body) {
- auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.ssn_id(), head.msg_id()));
+ auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id()));
auto remote = head.route(0).mq_id();
- socket.Send(remote, reply_head, rep_body);
+ MsgI msg;
+ if (msg.Make(reply_head, rep_body)) {
+ DEFER1(msg.Release(););
+ center->SendAllocMsg(socket, remote, msg);
+ }
};
}
bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr)
{
- auto OnNodeInit = [center_ptr](ShmSocket &socket, MsgI &msg) {
+ // command
+ auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool {
auto ¢er = *center_ptr;
- center->OnNodeInit(socket.shm(), msg.Offset());
+ return IsCmd(cmd) && center->OnCommand(socket, cmd);
};
- auto Nothing = [](ShmSocket &socket) {};
- BHCenter::Install("#centetr.Init", OnNodeInit, Nothing, BHInitAddress(), 16);
-
+ // now we can talk.
auto OnCenterIdle = [center_ptr](ShmSocket &socket) {
auto ¢er = *center_ptr;
center->OnTimer();
@@ -509,8 +704,9 @@
auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
auto ¢er = *center_ptr;
- auto replyer = MakeReplyer(socket, head, center->id());
+ auto replyer = MakeReplyer(socket, head, center);
switch (head.type()) {
+ CASE_ON_MSG_TYPE(ProcInit);
CASE_ON_MSG_TYPE(Register);
CASE_ON_MSG_TYPE(Heartbeat);
CASE_ON_MSG_TYPE(Unregister);
@@ -520,12 +716,13 @@
default: return false;
}
};
- BHCenter::Install("#center.main", OnCenter, OnCenterIdle, BHTopicCenterAddress(), 1000);
+ BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(), 1000);
auto OnBusIdle = [=](ShmSocket &socket) {};
+ auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; };
auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
auto ¢er = *center_ptr;
- auto replyer = MakeReplyer(socket, head, center->id());
+ auto replyer = MakeReplyer(socket, head, center);
auto OnPublish = [&]() {
MsgPublish pub;
NodeCenter::Clients clients;
@@ -561,7 +758,7 @@
}
};
- BHCenter::Install("#center.bus", OnPubSub, OnBusIdle, BHTopicBusAddress(), 1000);
+ BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(), 1000);
return true;
}
@@ -576,14 +773,9 @@
return rec;
}
-bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len)
+bool BHCenter::Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQId mqid, const int mq_len)
{
- Centers()[name] = CenterInfo{name, handler, MsgIHandler(), idle, mqid, mq_len};
- return true;
-}
-bool BHCenter::Install(const std::string &name, MsgIHandler handler, IdleHandler idle, const MQId mqid, const int mq_len)
-{
- Centers()[name] = CenterInfo{name, MsgHandler(), handler, idle, mqid, mq_len};
+ Centers()[name] = CenterInfo{name, handler, raw_handler, idle, mqid, mq_len};
return true;
}
@@ -609,11 +801,7 @@
{
for (auto &kv : Centers()) {
auto &info = kv.second;
- if (info.handler_) {
- sockets_[info.name_]->Start(info.handler_, info.idle_);
- } else {
- sockets_[info.name_]->Start(info.raw_handler_, info.idle_);
- }
+ sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_);
}
return true;
diff --git a/box/center.h b/box/center.h
index 4d71bc9..d68573b 100644
--- a/box/center.h
+++ b/box/center.h
@@ -29,10 +29,9 @@
public:
typedef Socket::PartialRecvCB MsgHandler;
- typedef Socket::RawRecvCB MsgIHandler;
+ typedef Socket::RawRecvCB RawHandler;
typedef Socket::IdleCB IdleHandler;
- static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len);
- static bool Install(const std::string &name, MsgIHandler handler, IdleHandler idle, const MQId mqid, const int mq_len);
+ static bool Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQId mqid, const int mq_len);
BHCenter(Socket::Shm &shm);
~BHCenter() { Stop(); }
@@ -43,7 +42,7 @@
struct CenterInfo {
std::string name_;
MsgHandler handler_;
- MsgIHandler raw_handler_;
+ RawHandler raw_handler_;
IdleHandler idle_;
MQId mqid_;
int mq_len_ = 0;
diff --git a/box/center_main.cc b/box/center_main.cc
index 232b943..6795e41 100644
--- a/box/center_main.cc
+++ b/box/center_main.cc
@@ -83,6 +83,12 @@
std::atomic<bool> run_;
};
+bool CenterInit(bhome_shm::SharedMemory &shm)
+{
+ ShmSocket create(shm, BHGlobalSenderAddress(), 16);
+ return true;
+}
+
} // namespace
int center_main(int argc, const char *argv[])
{
@@ -102,6 +108,7 @@
if (strcasecmp(lvl.c_str(), "fatal") == 0) { ns_log::ResetLogLevel(ns_log::LogLevel::fatal); }
auto &shm = BHomeShm();
+ CenterInit(shm);
GlobalInit(shm);
InstanceFlag inst(shm, kCenterRunningFlag);
diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto
index 51e9b6e..dcb5c56 100644
--- a/proto/source/bhome_msg.proto
+++ b/proto/source/bhome_msg.proto
@@ -28,6 +28,8 @@
kMsgTypeCommonReply = 2;
+ kMsgTypeProcInit = 8;
+ kMsgTypeProcInitReply = 9;
kMsgTypeRegister= 10;
// kMsgTypeRegisterReply= 11;
kMsgTypeHeartbeat = 12;
@@ -60,6 +62,13 @@
MsgTopicList topics = 1;
}
+message MsgProcInit{ } // proc_id is in header.
+
+message MsgProcInitReply {
+ ErrorMsg errmsg = 1;
+ int32 proc_index = 2;
+}
+
service TopicRPC {
rpc Query (MsgQueryTopic) returns (MsgQueryTopicReply);
rpc Request (MsgRequestTopic) returns (MsgQueryTopicReply);
diff --git a/src/bh_util.h b/src/bh_util.h
index a1c0d84..223da2a 100644
--- a/src/bh_util.h
+++ b/src/bh_util.h
@@ -92,6 +92,8 @@
inline void PutInt(void *p, uint32_t u) { Put32(p, u); }
inline void PutInt(void *p, uint64_t u) { Put64(p, u); }
+constexpr uint64_t MaskBits(int nbits) { return (uint64_t(1) << nbits) - 1; }
+
class ExitCall
{
typedef std::function<void(void)> func_t;
diff --git a/src/defs.cpp b/src/defs.cpp
index 6d688b2..450349e 100644
--- a/src/defs.cpp
+++ b/src/defs.cpp
@@ -33,8 +33,53 @@
return le;
}
+constexpr int64_t AllocSizeIndex[] = {
+ 16, 24, 32, 40, 48, 56, 64, 72,
+ 80, 88, 96, 104, 120, 136, 152, 168,
+ 184, 200, 224, 248, 272, 296, 328, 360,
+ 392, 432, 472, 520, 568, 624, 680, 744,
+ 816, 896, 984, 1080, 1184, 1296, 1416, 1544,
+ 1688, 1848, 2016, 2200, 2400, 2624, 2864, 3128,
+ 3416, 3728, 4072, 4448, 4856, 5304, 5792, 6320,
+ 6896, 7528, 8216, 8968, 9784, 10680, 11656, 12720,
+ 13880, 15144, 16520, 18024, 19664, 21456, 23408, 25536,
+ 27864, 30400, 33168, 36184, 39480, 43072, 46992, 51264,
+ 55928, 61016, 66568, 72624, 79232, 86440, 94304, 102880,
+ 112232, 122440, 133576, 145720, 158968, 173424, 189192, 206392,
+ 225160, 245632, 267968, 292328, 318904, 347896, 379528, 414032,
+ 451672, 492736, 537536, 586408, 639720, 697880, 761328, 830544,
+ 906048, 988416, 1078272, 1176296, 1283232, 1399896, 1527160, 1665992,
+ 1817448, 1982672, 2162920, 2359552, 2574056, 2808064, 3063344, 3341832,
+ 3645640, 3977064, 4338616, 4733040, 5163320, 5632712, 6144776, 6703392,
+ 7312792, 7977592, 8702832, 9494000, 10357096, 11298656, 12325808, 13446336,
+ 14668736, 16002264, 17457016, 19044024, 20775304, 22663968, 24724328, 26972000,
+ 29424000, 32098912, 35017000, 38200368, 41673128, 45461600, 49594472, 54103064,
+ 59021528, 64387128, 70240504, 76626008, 83592008, 91191288, 99481408, 108525176,
+ 118391104, 129153936, 140895208, 153703864, 167676944, 182920304, 199549424, 217690280,
+ 237480312, 259069432, 282621200, 308314040, 336342592, 366919192, 400275488, 436664168,
+ 476360912, 519666456, 566908864, 618446040, 674668408, 736001904, 802911168, 875903096,
+ 955530656, 1042397080, 1137160456, 1240538680, 1353314928, 1476343560, 1610556616, 1756970856,
+ 1916695480, 2090940528, 2281026032, 2488392040, 2714609504, 2961392192, 3230609664, 3524301456,
+ 3844692504, 4194210008, 4575501832, 4991456544, 5445225320, 5940245808, 6480268160, 7069383448,
+ 7712054672, 8413150552, 9177982424, 10012344464, 10922557600, 11915517384, 12998746240, 14180450448,
+ 15469582312, 16875907976, 18410081432, 20083725200, 21909518400, 23901292800, 26074137600, 28444513752,
+ 31030378640, 33851322152, 36928715080, 40285871000, 43948222912, 47943515904, 52302017352, 57056746208,
+ 62243723136, 67902243424, 74075174648, 80809281440, 88155579752, 96169723368, 104912425496, 114449918728,
+ 124854456800, 136204861968, 148587122152, 162095042352, 176830955296, 192906496688, 210443450936, 229574673752};
+
+const int kAllocIndexLen = sizeof(AllocSizeIndex) / sizeof(AllocSizeIndex[0]);
+static_assert(kAllocIndexLen == 256, "Make sure alloc 8 bit is enough.");
+static_assert(AllocSizeIndex[255] > uint32_t(-1), "Make sure alloc size correct.");
} // namespace
+int64_t CalcAllocIndex(int64_t size)
+{
+ auto pos = std::lower_bound(AllocSizeIndex, AllocSizeIndex + kAllocIndexLen, size);
+ return (pos == AllocSizeIndex + kAllocIndexLen) ? -1 : pos - AllocSizeIndex;
+}
+
+int64_t GetAllocSize(int index) { return index < kAllocIndexLen ? AllocSizeIndex[index] : 0; }
+
std::string BHomeShmName()
{
return "bhome_default_shm_v0";
diff --git a/src/defs.h b/src/defs.h
index a95c81f..f0a0d49 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -23,14 +23,15 @@
typedef uint64_t MQId;
-const MQId kBHNodeInit = 10;
+const MQId kBHDefaultSender = 99;
const MQId kBHTopicCenter = 100;
const MQId kBHTopicBus = 101;
-const MQId kBHUniCenter = 102;
-inline const MQId BHInitAddress() { return kBHNodeInit; }
+inline const MQId BHGlobalSenderAddress() { return kBHDefaultSender; }
inline const MQId BHTopicCenterAddress() { return kBHTopicCenter; }
inline const MQId BHTopicBusAddress() { return kBHTopicBus; }
-inline const MQId BHUniCenterAddress() { return kBHUniCenter; }
+
+int64_t CalcAllocIndex(int64_t size);
+int64_t GetAllocSize(int index);
const int kBHCenterPort = 24287;
const char kTopicSep = '.';
diff --git a/src/msg.cpp b/src/msg.cpp
index f180d67..a4777d2 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -17,8 +17,37 @@
*/
#include "msg.h"
#include "bh_util.h"
+#include "socket.h"
namespace bhome_msg
{
+ShmSocket &ShmMsg::Sender()
+{
+ static ShmSocket sender(shm(), false, BHGlobalSenderAddress(), 16);
+ return sender;
+}
+
+int ShmMsg::Release()
+{
+ if (!valid()) {
+ return 0;
+ }
+ auto n = meta()->count_.Dec();
+ if (n == 0) {
+ int64_t free_cmd = (id() << 4) | EncodeCmd(eCmdFree);
+ Sender().Send(BHTopicCenterAddress(), free_cmd);
+ } else if (n < 0) {
+ throw -123;
+ }
+ return n;
+}
+
+void ShmMsg::Free()
+{
+ assert(valid());
+ shm().Dealloc(meta());
+ offset_ = 0;
+ assert(!valid());
+}
} // namespace bhome_msg
diff --git a/src/msg.h b/src/msg.h
index 1f5b0f1..9589389 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -26,6 +26,7 @@
#include <functional>
#include <stdint.h>
+class ShmSocket;
namespace bhome_msg
{
using namespace bhome_shm;
@@ -35,8 +36,9 @@
class ShmMsg : private StaticDataRef<SharedMemory, ShmMsg>
{
-private:
static inline SharedMemory &shm() { return GetData(); }
+ static ShmSocket &Sender();
+
// store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object.
class RefCount : private boost::noncopyable
{
@@ -49,6 +51,7 @@
int Dec() { return --num_; }
int Get() { return num_.load(); }
};
+
typedef int64_t OffsetType;
static OffsetType Addr(void *ptr) { return reinterpret_cast<OffsetType>(ptr); }
static void *Ptr(const OffsetType offset) { return reinterpret_cast<void *>(offset); }
@@ -60,14 +63,22 @@
static const uint32_t kMsgTag = 0xf1e2d3c4;
struct Meta {
+ static int64_t NewId()
+ {
+ static std::atomic<int64_t> id(0);
+ return ++id;
+ }
+
RefCount count_;
const uint32_t tag_ = kMsgTag;
const uint32_t size_ = 0;
+ const int64_t id_ = 0;
+ std::atomic<int64_t> timestamp_;
Meta(uint32_t size) :
- size_(size) {}
+ size_(size), id_(NewId()), timestamp_(NowSec()) {}
};
OffsetType offset_;
- void *Alloc(const size_t size)
+ static void *Alloc(const size_t size)
{
void *p = shm().Alloc(sizeof(Meta) + size);
if (p) {
@@ -76,45 +87,33 @@
}
return p;
}
- void Free()
- {
- assert(valid());
- shm().Dealloc(meta());
- offset_ = 0;
- assert(!valid());
- }
+
+private:
Meta *meta() const { return get<Meta>() - 1; }
typedef std::function<void(void *p, int len)> ToArray;
- void *Pack(const uint32_t head_len, const ToArray &headToArray,
- const uint32_t body_len, const ToArray &bodyToArray)
+
+ template <class Body>
+ void *Pack(const BHMsgHead &head, const uint32_t head_len, const Body &body, const uint32_t body_len)
{
- void *addr = Alloc(sizeof(head_len) + head_len + sizeof(body_len) + body_len);
+ void *addr = get();
if (addr) {
auto p = static_cast<char *>(addr);
- auto Pack1 = [&p](auto len, auto &writer) {
+ auto Pack1 = [&p](auto len, auto &&writer) {
Put32(p, len);
p += sizeof(len);
writer(p, len);
p += len;
};
- Pack1(head_len, headToArray);
- Pack1(body_len, bodyToArray);
+ Pack1(head_len, [&](void *p, int len) { head.SerializeToArray(p, len); });
+ Pack1(body_len, [&](void *p, int len) { body.SerializeToArray(p, len); });
}
return addr;
}
- template <class Body>
- void *Pack(const BHMsgHead &head, const Body &body)
- {
- return Pack(
- uint32_t(head.ByteSizeLong()), [&](void *p, int len) { head.SerializeToArray(p, len); },
- uint32_t(body.ByteSizeLong()), [&](void *p, int len) { body.SerializeToArray(p, len); });
- }
-
void *Pack(const std::string &content)
{
- void *addr = Alloc(content.size());
+ void *addr = get();
if (addr) {
memcpy(addr, content.data(), content.size());
}
@@ -133,36 +132,48 @@
offset_(p ? (Addr(p) - BaseAddr()) : 0) {}
template <class T = void>
- T *get() const { return static_cast<T *>(Ptr(offset_ + BaseAddr())); }
+ T *get() const { return offset_ != 0 ? static_cast<T *>(Ptr(offset_ + BaseAddr())) : nullptr; }
public:
static bool BindShm(SharedMemory &shm) { return SetData(shm); }
ShmMsg() :
- ShmMsg(nullptr) {}
+ offset_(0) {}
explicit ShmMsg(const OffsetType offset) :
offset_(offset) {}
OffsetType Offset() const { return offset_; }
OffsetType &OffsetRef() { return offset_; }
void swap(ShmMsg &a) { std::swap(offset_, a.offset_); }
- bool valid() const { return static_cast<bool>(offset_) && meta()->tag_ == kMsgTag; }
-
- int AddRef() const { return valid() ? meta()->count_.Inc() : 1; }
- int Release()
- {
- if (!valid()) {
- return 0;
- }
- auto n = meta()->count_.Dec();
- if (n == 0) {
- Free();
- }
- return n;
- }
+ bool valid() const { return offset_ != 0 && meta()->tag_ == kMsgTag; }
+ int64_t id() const { return valid() ? meta()->id_ : 0; }
+ int64_t timestamp() const { return valid() ? meta()->timestamp_.load() : 0; }
+ size_t Size() const { return valid() ? meta()->size_ : 0; }
int Count() const { return valid() ? meta()->count_.Get() : 1; }
+ int AddRef() const { return valid() ? meta()->count_.Inc() : 1; }
+ int Release();
+ void Free();
template <class Body>
- inline bool Make(const BHMsgHead &head, const Body &body) { return Make(Pack(head, body)); }
- inline bool Make(const std::string &content) { return Make(Pack(content)); }
+ inline bool Make(const BHMsgHead &head, const Body &body)
+ {
+ uint32_t head_len = head.ByteSizeLong();
+ uint32_t body_len = body.ByteSizeLong();
+ uint32_t size = sizeof(head_len) + head_len + sizeof(body_len) + body_len;
+ return Make(size) && Pack(head, head_len, body, body_len);
+ }
+ template <class Body>
+ inline bool Fill(const BHMsgHead &head, const Body &body)
+ {
+ uint32_t head_len = head.ByteSizeLong();
+ uint32_t body_len = body.ByteSizeLong();
+ uint32_t size = sizeof(head_len) + head_len + sizeof(body_len) + body_len;
+ return valid() && (meta()->size_ >= size) && Pack(head, head_len, body, body_len);
+ }
+
+ inline bool Make(const std::string &content) { return Make(content.size()) && Pack(content); }
+ inline bool Fill(const std::string &content) { return valid() && (meta()->size_ >= content.size()) && Pack(content); }
+
+ inline bool Make(const size_t size) { return Make(Alloc(size)); }
+
template <class Body>
static inline std::string Serialize(const BHMsgHead &head, const Body &body)
{
@@ -208,6 +219,18 @@
typedef ShmMsg MsgI;
+constexpr inline int EncodeCmd(int cmd) { return ((cmd & MaskBits(3)) << 1) | 1; }
+constexpr inline int DecodeCmd(int64_t msg) { return (msg >> 1) & MaskBits(3); }
+constexpr inline bool IsCmd(int64_t msg) { return (msg & 1) != 0; }
+// int64_t pack format: cmd data ,3bit cmd, 1bit flag.
+enum MsgCmd {
+ eCmdNodeInit = 0, // upto 59bit ssn id
+ eCmdNodeInitReply = 1, // 31bit proc index,
+ eCmdAllocRequest0 = 2, // 8bit size, 4bit socket index, 16bit proc index, 28bit id
+ eCmdAllocReply0 = 3, // 31bit ptr, 28bit id,
+ eCmdFree = 4, // upto 59bit msg id,
+};
+
} // namespace bhome_msg
#endif // end of include guard: MSG_5BILLZET
diff --git a/src/proto.h b/src/proto.h
index 94a438c..c05407b 100644
--- a/src/proto.h
+++ b/src/proto.h
@@ -48,6 +48,8 @@
BHOME_SIMPLE_MAP_MSG(Publish);
BHOME_SIMPLE_MAP_MSG(Subscribe);
BHOME_SIMPLE_MAP_MSG(Unsubscribe);
+BHOME_SIMPLE_MAP_MSG(ProcInit);
+BHOME_SIMPLE_MAP_MSG(ProcInitReply);
#undef BHOME_SIMPLE_MAP_MSG
#undef BHOME_MAP_MSG_AND_TYPE
diff --git a/src/robust.cpp b/src/robust.cpp
index 26d41b9..4654652 100644
--- a/src/robust.cpp
+++ b/src/robust.cpp
@@ -35,24 +35,30 @@
bool FMutex::try_lock()
{
- if (mtx_.try_lock()) {
- if (flock(fd_, LOCK_EX | LOCK_NB) == 0) {
+ if (flock(fd_, LOCK_EX | LOCK_NB) == 0) {
+ ++count_;
+ if (mtx_.try_lock()) {
return true;
} else {
- mtx_.unlock();
+ if (--count_ == 0) {
+ flock(fd_, LOCK_UN);
+ }
}
}
return false;
}
void FMutex::lock()
{
- mtx_.lock();
flock(fd_, LOCK_EX);
+ ++count_;
+ mtx_.lock();
}
void FMutex::unlock()
{
- flock(fd_, LOCK_UN);
mtx_.unlock();
+ if (--count_ == 0) {
+ flock(fd_, LOCK_UN);
+ }
}
} // namespace robust
\ No newline at end of file
diff --git a/src/robust.h b/src/robust.h
index 8657122..c70e2fe 100644
--- a/src/robust.h
+++ b/src/robust.h
@@ -19,6 +19,7 @@
#ifndef ROBUST_Q31RCWYU
#define ROBUST_Q31RCWYU
+#include "bh_util.h"
#include "log.h"
#include <atomic>
#include <chrono>
@@ -37,8 +38,6 @@
using namespace std::chrono;
using namespace std::chrono_literals;
-constexpr uint64_t MaskBits(int nbits) { return (uint64_t(1) << nbits) - 1; }
-
void QuickSleep();
class CasMutex
@@ -99,7 +98,7 @@
public:
typedef uint64_t id_t;
FMutex(id_t id) :
- id_(id), fd_(Open(id_))
+ id_(id), fd_(Open(id_)), count_(0)
{
if (fd_ == -1) { throw "error create mutex!"; }
}
@@ -117,11 +116,10 @@
}
static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDONLY, 0666); }
static int Close(int fd) { return close(fd); }
- void FLock();
- void FUnlock();
id_t id_;
int fd_;
std::mutex mtx_;
+ std::atomic<int32_t> count_;
};
union semun {
@@ -310,5 +308,36 @@
AData buf[capacity];
};
+template <class Int>
+class AtomicQueue<0, Int>
+{
+ typedef Int Data;
+ typedef std::atomic<Data> AData;
+ static_assert(sizeof(Data) == sizeof(AData));
+
+public:
+ AtomicQueue() { memset(this, 0, sizeof(*this)); }
+ bool push(const Data d, bool try_more = false)
+ {
+ auto cur = buf.load();
+ return Empty(cur) && buf.compare_exchange_strong(cur, Enc(d));
+ }
+ bool pop(Data &d, bool try_more = false)
+ {
+ Data cur = buf.load();
+ bool r = !Empty(cur) && buf.compare_exchange_strong(cur, 0);
+ if (r) { d = Dec(cur); }
+ return r;
+ }
+ uint32_t head() const { return 0; }
+ uint32_t tail() const { return 0; }
+
+private:
+ static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok.
+ static inline Data Enc(const Data d) { return (d << 1) | 1; } // lowest bit 1 means data ok.
+ static inline Data Dec(const Data d) { return d >> 1; } // lowest bit 1 means data ok.
+ AData buf;
+};
+
} // namespace robust
#endif // end of include guard: ROBUST_Q31RCWYU
diff --git a/src/sendq.cpp b/src/sendq.cpp
index c0d5afd..36af264 100644
--- a/src/sendq.cpp
+++ b/src/sendq.cpp
@@ -40,20 +40,24 @@
}
auto SendData = [&](Data &d) {
+ auto TryLoop = [&](auto &&data) {
+ for (int i = 0; i < 1; ++i) {
+ if (mq.TrySend(remote, data)) {
+ return true;
+ }
+ }
+ return false;
+ };
bool r = false;
if (d.index() == 0) {
auto &msg = boost::variant2::get<0>(pos->data().data_);
- r = mq.TrySend(remote, msg);
+ r = TryLoop(msg);
if (r) {
msg.Release();
}
} else {
- auto &content = boost::variant2::get<1>(pos->data().data_);
- MsgI msg;
- if (msg.Make(content)) {
- DEFER1(msg.Release(););
- r = mq.TrySend(remote, msg);
- }
+ auto command = boost::variant2::get<1>(pos->data().data_);
+ r = TryLoop(command);
}
return r;
};
@@ -110,4 +114,4 @@
Collect();
return !out_.empty();
-}
\ No newline at end of file
+}
diff --git a/src/sendq.h b/src/sendq.h
index 0e565d5..862a1cc 100644
--- a/src/sendq.h
+++ b/src/sendq.h
@@ -37,7 +37,8 @@
typedef MQId Remote;
typedef bhome_msg::MsgI MsgI;
typedef std::string Content;
- typedef boost::variant2::variant<MsgI, Content> Data;
+ typedef int64_t Command;
+ typedef boost::variant2::variant<MsgI, Command> Data;
typedef std::function<void(const Data &)> OnMsgEvent;
struct MsgInfo {
Data data_;
@@ -47,23 +48,16 @@
typedef TimedMsg::TimePoint TimePoint;
typedef TimedMsg::Duration Duration;
- // template <class... Rest>
- // void Append(const MQId &id, Rest &&...rest)
- // {
- // Append(std::string((const char *) &id, sizeof(id)), std::forward<decltype(rest)>(rest)...);
- // }
-
void Append(const Remote addr, const MsgI msg, OnMsgEvent onExpire = OnMsgEvent())
{
msg.AddRef();
AppendData(addr, Data(msg), DefaultExpire(), onExpire);
}
- void Append(const Remote addr, Content &&content, OnMsgEvent onExpire = OnMsgEvent())
+ void Append(const Remote addr, const Command command, OnMsgEvent onExpire = OnMsgEvent())
{
- AppendData(addr, Data(std::move(content)), DefaultExpire(), onExpire);
+ AppendData(addr, Data(command), DefaultExpire(), onExpire);
}
bool TrySend(ShmMsgQueue &mq);
- // bool empty() const { return store_.empty(); }
private:
static TimePoint Now() { return TimedMsg::Clock::now(); }
diff --git a/src/shm_msg_queue.cpp b/src/shm_msg_queue.cpp
index b78c1a0..d96c511 100644
--- a/src/shm_msg_queue.cpp
+++ b/src/shm_msg_queue.cpp
@@ -56,6 +56,7 @@
ShmMsgQueue::~ShmMsgQueue() {}
+#ifndef BH_USE_ATOMIC_Q
ShmMsgQueue::Mutex &ShmMsgQueue::GetMutex(const MQId id)
{
static std::unordered_map<MQId, std::shared_ptr<Mutex>> imm;
@@ -69,13 +70,19 @@
}
return *pos->second;
}
+#endif
+
bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id)
{
Queue *q = Find(shm, id);
if (q) {
- MsgI msg;
- while (q->TryRead(msg.OffsetRef())) {
- msg.Release();
+ RawData val = 0;
+ while (q->TryRead(val)) {
+ if (IsCmd(val)) {
+ LOG_DEBUG() << "clsing queue " << id << ", has a cmd" << DecodeCmd(val);
+ } else {
+ MsgI(val).Release();
+ }
}
}
return Shmq::Remove(shm, MsgQIdToName(id));
@@ -86,19 +93,18 @@
return Shmq::Find(shm, MsgQIdToName(remote_id));
}
-bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg)
+bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, int64_t val)
{
- bool r = false;
try {
ShmMsgQueue dest(remote_id, false, shm, 1);
- msg.AddRef();
- DEFER1(if (!r) { msg.Release(); });
-
+#ifndef BH_USE_ATOMIC_Q
Guard lock(GetMutex(remote_id));
- r = dest.queue().TryWrite(msg.Offset());
+#endif
+ return dest.queue().TryWrite(val);
} catch (...) {
+ // SetLastError(eNotFound, "remote not found");
+ return false;
}
- return r;
}
// Test shows that in the 2 cases:
diff --git a/src/shm_msg_queue.h b/src/shm_msg_queue.h
index 1970803..4b7aed8 100644
--- a/src/shm_msg_queue.h
+++ b/src/shm_msg_queue.h
@@ -24,19 +24,25 @@
using namespace bhome_shm;
using namespace bhome_msg;
+#define BH_USE_ATOMIC_Q
+
class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue>
{
- // typedef ShmObject<SharedQ63<4>> Shmq;
- typedef ShmObject<SharedQueue<int64_t>> Shmq;
- typedef Shmq::ShmType ShmType;
- typedef Shmq::Data Queue;
- typedef std::function<void()> OnSend;
- typedef robust::FMutex Mutex;
- // typedef robust::SemMutex Mutex;
- // typedef robust::NullMutex Mutex;
- typedef robust::Guard<Mutex> Guard;
-
public:
+ typedef int64_t RawData;
+
+#ifdef BH_USE_ATOMIC_Q
+ typedef ShmObject<SharedQ63<0>> Shmq;
+#else
+ typedef ShmObject<SharedQueue<RawData>> Shmq;
+ // typedef robust::FMutex Mutex;
+ // typedef robust::SemMutex Mutex;
+ typedef robust::NullMutex Mutex;
+ typedef robust::Guard<Mutex> Guard;
+#endif
+
+ typedef Shmq::Data Queue;
+ typedef Shmq::ShmType ShmType;
typedef uint64_t MQId;
static MQId NewId();
@@ -45,26 +51,45 @@
ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len);
ShmMsgQueue(ShmType &segment, const int len);
~ShmMsgQueue();
- static bool Remove(SharedMemory &shm, const MQId id);
+ static bool Remove(ShmType &shm, const MQId id);
MQId Id() const { return id_; }
ShmType &shm() const { return queue_.shm(); }
- bool Recv(MsgI &msg, const int timeout_ms)
+ bool Recv(RawData &val, const int timeout_ms)
{
+#ifndef BH_USE_ATOMIC_Q
Guard lock(GetMutex(Id()));
- return queue().Read(msg.OffsetRef(), timeout_ms);
+#endif
+ return queue().Read(val, timeout_ms);
}
- bool TryRecv(MsgI &msg)
+
+ bool TryRecv(RawData &val)
{
+#ifndef BH_USE_ATOMIC_Q
Guard lock(GetMutex(Id()));
- return queue().TryRead(msg.OffsetRef());
+#endif
+ return queue().TryRead(val);
}
- static Queue *Find(SharedMemory &shm, const MQId remote_id);
- static bool TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg);
+
+ bool Recv(MsgI &msg, const int timeout_ms) { return Recv(msg.OffsetRef(), timeout_ms); }
+ bool TryRecv(MsgI &msg) { return TryRecv(msg.OffsetRef()); }
+ static Queue *Find(ShmType &shm, const MQId remote_id);
+ static bool TrySend(ShmType &shm, const MQId remote_id, const RawData val);
+ static bool TrySend(ShmType &shm, const MQId remote_id, MsgI msg)
+ {
+ bool r = false;
+ msg.AddRef(); // TODO check if we could avoid addref here.
+ DEFER1(if (!r) { msg.Release(); });
+ r = TrySend(shm, remote_id, msg.Offset());
+ return r;
+ }
bool TrySend(const MQId remote_id, const MsgI &msg) { return TrySend(shm(), remote_id, msg); }
+ bool TrySend(const MQId remote_id, const RawData val) { return TrySend(shm(), remote_id, val); }
private:
+#ifndef BH_USE_ATOMIC_Q
static Mutex &GetMutex(const MQId id);
+#endif
MQId id_;
Queue &queue() { return *queue_.data(); }
Shmq queue_;
diff --git a/src/shm_queue.h b/src/shm_queue.h
index 5fd14e3..5c9e077 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -76,7 +76,7 @@
private:
Circular<D> queue_;
- bhome_shm::Mutex mutex_;
+ // bhome_shm::Mutex mutex_;
};
template <int Power = 4>
@@ -92,11 +92,12 @@
using namespace std::chrono;
auto end_time = steady_clock::now() + milliseconds(timeout_ms);
do {
- if (TryRead(d)) {
- return true;
- } else {
- robust::QuickSleep();
+ for (int i = 0; i < 100; ++i) {
+ if (TryRead(d)) {
+ return true;
+ }
}
+ robust::QuickSleep();
} while (steady_clock::now() < end_time);
return false;
}
diff --git a/src/socket.cpp b/src/socket.cpp
index 6231579..0704174 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -20,22 +20,25 @@
#include "bh_util.h"
#include "defs.h"
#include "msg.h"
+#include <chrono>
+using namespace std::chrono;
+using namespace std::chrono_literals;
using namespace bhome_msg;
using namespace bhome_shm;
ShmSocket::ShmSocket(Shm &shm, const MQId id, const int len) :
- run_(false), mq_(id, shm, len)
+ run_(false), mq_(id, shm, len), alloc_id_(0)
{
Start();
}
ShmSocket::ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len) :
- run_(false), mq_(id, create_or_else_find, shm, len)
+ run_(false), mq_(id, create_or_else_find, shm, len), alloc_id_(0)
{
Start();
}
ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) :
- run_(false), mq_(shm, len)
+ run_(false), mq_(shm, len), alloc_id_(0)
{
Start();
}
@@ -45,50 +48,15 @@
Stop();
}
-bool ShmSocket::Start(const RawRecvCB &onData, const IdleCB &onIdle, int nworker)
+bool ShmSocket::Start(int nworker, const RecvCB &onData, const RawRecvCB &onRaw, const IdleCB &onIdle)
{
- auto ioProc = [this, onData, onIdle]() {
+ auto ioProc = [this, onData, onRaw, onIdle]() {
auto DoSend = [this]() { return send_buffer_.TrySend(mq()); };
auto DoRecv = [=] {
// do not recv if no cb is set.
- if (!onData) {
- return false;
- }
- auto onMsg = [&](MsgI &imsg) {
- DEFER1(imsg.Release());
- onData(*this, imsg);
- };
- MsgI imsg;
- return mq().TryRecv(imsg) ? (onMsg(imsg), true) : false;
- };
+ if (!onData && per_msg_cbs_->empty() && !onRaw && alloc_cbs_->empty()) { return false; }
- try {
- bool more_to_send = DoSend();
- bool more_to_recv = DoRecv();
- if (onIdle) { onIdle(*this); }
- if (!more_to_send && !more_to_recv) {
- robust::QuickSleep();
- }
- } catch (...) {
- }
- };
-
- std::lock_guard<std::mutex> lock(mutex_);
- StopNoLock();
-
- run_.store(true);
- for (int i = 0; i < nworker; ++i) {
- workers_.emplace_back([this, ioProc]() { while (run_) { ioProc(); } });
- }
- return true;
-}
-
-bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle)
-{
- auto ioProc = [this, onData, onIdle]() {
- auto DoSend = [this]() { return send_buffer_.TrySend(mq()); };
- auto DoRecv = [=] {
- auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
+ auto onMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
RecvCB cb;
if (per_msg_cbs_->Pick(head.msg_id(), cb)) {
cb(socket, imsg, head);
@@ -96,20 +64,43 @@
onData(socket, imsg, head);
}
};
-
- // do not recv if no cb is set.
- if (!onData && per_msg_cbs_->empty()) {
- return false;
- }
- auto onMsg = [&](MsgI &imsg) {
- DEFER1(imsg.Release());
- BHMsgHead head;
- if (imsg.ParseHead(head)) {
- onRecvWithPerMsgCB(*this, imsg, head);
+ auto onCmdCB = [this, onRaw](ShmSocket &socket, int64_t val) {
+ int cmd = DecodeCmd(val);
+ if (cmd == eCmdAllocReply0) {
+ int id = (val >> 4) & MaskBits(28);
+ RawRecvCB cb;
+ if (alloc_cbs_->Pick(id, cb)) {
+ cb(socket, val);
+ return;
+ }
+ }
+ if (onRaw) {
+ onRaw(socket, val);
}
};
- MsgI imsg;
- return mq().TryRecv(imsg) ? (onMsg(imsg), true) : false;
+
+ auto onRecv = [&](auto &val) {
+ if (IsCmd(val)) {
+ onCmdCB(*this, val);
+ } else {
+ MsgI imsg(val);
+ DEFER1(imsg.Release());
+ BHMsgHead head;
+ if (imsg.ParseHead(head)) {
+ onMsgCB(*this, imsg, head);
+ }
+ }
+ };
+ ShmMsgQueue::RawData val = 0;
+ auto TryRecvMore = [&]() {
+ for (int i = 0; i < 100; ++i) {
+ if (mq().TryRecv(val)) {
+ return true;
+ }
+ }
+ return false;
+ };
+ return TryRecvMore() ? (onRecv(val), true) : false;
};
try {
@@ -126,9 +117,18 @@
std::lock_guard<std::mutex> lock(mutex_);
StopNoLock();
+ auto worker_proc = [this, ioProc]() {
+ while (run_) { ioProc(); }
+ // try send pending msgs.
+ auto end_time = steady_clock::now() + 3s;
+ while (send_buffer_.TrySend(mq()) && steady_clock::now() < end_time) {
+ // LOG_DEBUG() << "try send pending msgs.";
+ }
+ };
+
run_.store(true);
for (int i = 0; i < nworker; ++i) {
- workers_.emplace_back([this, ioProc]() { while (run_) { ioProc(); } });
+ workers_.emplace_back(worker_proc);
}
return true;
}
@@ -153,6 +153,10 @@
return false;
}
+bool ShmSocket::SyncRecv(int64_t &cmd, const int timeout_ms)
+{
+ return (timeout_ms == 0) ? mq().TryRecv(cmd) : mq().Recv(cmd, timeout_ms);
+}
//maybe reimplment, using async cbs?
bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms)
{
@@ -167,3 +171,30 @@
}
return false;
}
+
+bool ShmSocket::RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult)
+{ // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag
+ // LOG_FUNCTION;
+ if (node_proc_index_ == -1 || socket_index_ == -1) {
+ return false;
+ }
+ int id = (++alloc_id_) & MaskBits(28);
+ int64_t cmd = (CalcAllocIndex(size) << 52) |
+ ((socket_index_ & MaskBits(4)) << 48) |
+ ((node_proc_index_ & MaskBits(16)) << 32) |
+ (id << 4) |
+ EncodeCmd(eCmdAllocRequest0);
+ auto rawCB = [onResult](ShmSocket &sock, int64_t &val) {
+ MsgI msg((val >> 32) & MaskBits(31));
+ DEFER1(msg.Release());
+ onResult(msg);
+ return true;
+ };
+
+ alloc_cbs_->Store(id, std::move(rawCB));
+ auto onExpireRemoveCB = [this, id](SendQ::Data const &msg) {
+ RawRecvCB cb_no_use;
+ alloc_cbs_->Pick(id, cb_no_use);
+ };
+ return Send(BHTopicCenterAddress(), cmd, onExpireRemoveCB);
+}
\ No newline at end of file
diff --git a/src/socket.h b/src/socket.h
index 981677f..d69b8d4 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -42,7 +42,7 @@
public:
typedef ShmMsgQueue::MQId MQId;
typedef bhome_shm::SharedMemory Shm;
- typedef std::function<void(ShmSocket &sock, MsgI &imsg)> RawRecvCB;
+ typedef std::function<void(ShmSocket &sock, Queue::RawData &val)> RawRecvCB;
typedef std::function<void(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> RecvCB;
typedef std::function<bool(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> PartialRecvCB;
typedef std::function<void(ShmSocket &sock)> IdleCB;
@@ -54,39 +54,74 @@
static bool Remove(SharedMemory &shm, const MQId id) { return Queue::Remove(shm, id); }
bool Remove() { return Remove(shm(), id()); }
MQId id() const { return mq().Id(); }
+ void SetNodeProc(const int proc_index, const int socket_index)
+ {
+ node_proc_index_ = proc_index;
+ socket_index_ = socket_index;
+ LOG_DEBUG() << "Set Node Proc " << node_proc_index_ << ", " << socket_index_;
+ }
// start recv.
- bool Start(const RawRecvCB &onData, const IdleCB &onIdle, int nworker = 1);
- bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB());
- bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, onIdle); }
+ bool Start(int nworker = 1, const RecvCB &onMsg = RecvCB(), const RawRecvCB &onRaw = RawRecvCB(), const IdleCB &onIdle = IdleCB());
+ bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, RawRecvCB(), onIdle); }
bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); }
bool Stop();
template <class Body>
- bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
+ bool CenterSend(const MQId remote, BHMsgHead &head, Body &body)
{
try {
- if (!cb) {
- return SendImpl(remote, MsgI::Serialize(head, body));
- } else {
- std::string msg_id(head.msg_id());
- per_msg_cbs_->Store(msg_id, std::move(cb));
- auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
- RecvCB cb_no_use;
- per_msg_cbs_->Pick(msg_id, cb_no_use);
- };
- return SendImpl(remote, MsgI::Serialize(head, body), onExpireRemoveCB);
- }
+ //TODO alloc outsiez and use send.
+ MsgI msg;
+ if (!msg.Make(head, body)) { return false; }
+ DEFER1(msg.Release());
+
+ return Send(remote, msg);
} catch (...) {
SetLastError(eError, "Send internal error.");
return false;
}
}
- bool Send(const MQId remote, const MsgI &imsg)
- {
- return SendImpl(remote, imsg);
- }
+ bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult);
+ template <class Body>
+ bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
+ {
+ std::string msg_id(head.msg_id());
+ std::string content(MsgI::Serialize(head, body));
+ size_t size = content.size();
+ auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable {
+ if (!msg.Fill(content)) { return; }
+
+ try {
+ if (!cb) {
+ Send(remote, msg);
+ } else {
+ per_msg_cbs_->Store(msg_id, std::move(cb));
+ auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
+ RecvCB cb_no_use;
+ per_msg_cbs_->Pick(msg_id, cb_no_use);
+ };
+ Send(remote, msg, onExpireRemoveCB);
+ }
+ } catch (...) {
+ SetLastError(eError, "Send internal error.");
+ }
+ };
+
+ return RequestAlloc(size, OnResult);
+ }
+ template <class... T>
+ bool Send(const MQId remote, const MsgI &imsg, T &&...t)
+ {
+ return SendImpl(remote, imsg, std::forward<decltype(t)>(t)...);
+ }
+ template <class... T>
+ bool Send(const MQId remote, const int64_t cmd, T &&...t)
+ {
+ return SendImpl(remote, cmd, std::forward<decltype(t)>(t)...);
+ }
+ bool SyncRecv(int64_t &cmd, const int timeout_ms);
bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms);
template <class Body>
@@ -153,15 +188,15 @@
std::atomic<bool> run_;
Queue mq_;
- template <class Key>
+ template <class Key, class CB>
class CallbackRecords
{
- std::unordered_map<Key, RecvCB> store_;
+ std::unordered_map<Key, CB> store_;
public:
bool empty() const { return store_.empty(); }
- bool Store(const Key &id, RecvCB &&cb) { return store_.emplace(id, std::move(cb)).second; }
- bool Pick(const Key &id, RecvCB &cb)
+ bool Store(const Key &id, CB &&cb) { return store_.emplace(id, std::move(cb)).second; }
+ bool Pick(const Key &id, CB &cb)
{
auto pos = store_.find(id);
if (pos != store_.end()) {
@@ -174,9 +209,14 @@
}
};
- Synced<CallbackRecords<std::string>> per_msg_cbs_;
+ Synced<CallbackRecords<std::string, RecvCB>> per_msg_cbs_;
+ Synced<CallbackRecords<int, RawRecvCB>> alloc_cbs_;
SendQ send_buffer_;
+ // node request center alloc memory.
+ int node_proc_index_ = -1;
+ int socket_index_ = -1;
+ std::atomic<int> alloc_id_;
};
#endif // end of include guard: SOCKET_GWTJHBPO
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index d8d6a42..35228b4 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -42,7 +42,6 @@
TopicNode::TopicNode(SharedMemory &shm) :
shm_(shm), state_(eStateUnregistered)
{
- Init();
}
TopicNode::~TopicNode()
@@ -57,34 +56,79 @@
if (Valid()) {
return true;
+ } else if (info_.proc_id().empty()) {
+ return false;
}
if (ssn_id_ == 0) {
ssn_id_ = ShmMsgQueue::NewId();
}
LOG_DEBUG() << "Node Init, id " << ssn_id_;
- MsgI msg;
- msg.OffsetRef() = ssn_id_;
- if (ShmMsgQueue::TrySend(shm(), BHInitAddress(), msg)) {
-
- auto end_time = steady_clock::now() + 3s;
- do {
- try {
- for (int i = eSockStart; i < eSockEnd; ++i) {
- sockets_.emplace_back(new ShmSocket(shm_, false, ssn_id_ + i, kMqLen));
+ auto NodeInit = [&]() {
+ auto SendInitCmd = [&]() {
+ int64_t init_cmd = ssn_id_ << 4 | EncodeCmd(eCmdNodeInit);
+ auto end_time = steady_clock::now() + 3s;
+ bool r = false;
+ do {
+ r = ShmMsgQueue::TrySend(shm(), BHTopicCenterAddress(), init_cmd);
+ } while (!r && steady_clock::now() < end_time);
+ return r;
+ };
+ if (SendInitCmd()) {
+ LOG_DEBUG() << "node send init ok";
+ auto end_time = steady_clock::now() + 3s;
+ do {
+ try {
+ for (int i = eSockStart; i < eSockEnd; ++i) {
+ sockets_.emplace_back(new ShmSocket(shm_, false, ssn_id_ + i, kMqLen));
+ }
+ break;
+ } catch (...) {
+ sockets_.clear();
+ std::this_thread::sleep_for(100ms);
}
- break;
- } catch (...) {
- sockets_.clear();
- std::this_thread::sleep_for(100ms);
- }
- } while (steady_clock::now() < end_time);
- if (!sockets_.empty()) {
- // recv msgs to avoid memory leak.
- auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; };
- SockNode().Start(default_ignore_msg);
- return true;
+ } while (steady_clock::now() < end_time);
}
+ };
+ if (sockets_.empty()) {
+ NodeInit();
+ }
+ if (!sockets_.empty()) {
+ LOG_DEBUG() << "node sockets ok";
+ auto onNodeCmd = [this](ShmSocket &socket, int64_t &val) {
+ LOG_DEBUG() << "node recv cmd: " << DecodeCmd(val);
+ switch (DecodeCmd(val)) {
+ case eCmdNodeInitReply: {
+ MsgI msg(val >> 4);
+ DEFER1(msg.Release());
+ MsgProcInit body;
+ auto head = InitMsgHead(GetType(body), info_.proc_id(), ssn_id_);
+ head.add_route()->set_mq_id(ssn_id_);
+ if (msg.Fill(head, body)) {
+ socket.Send(BHTopicCenterAddress(), msg);
+ }
+ } break;
+ default:
+ break;
+ }
+ return true;
+ };
+
+ // recv msgs to avoid memory leak.
+ auto onMsg = [this](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
+ LOG_DEBUG() << "node recv type: " << head.type();
+ if (head.type() == kMsgTypeProcInitReply) {
+ LOG_DEBUG() << "got proc init reply";
+ MsgProcInitReply reply;
+ if (imsg.ParseBody(reply)) {
+ SetProcIndex(reply.proc_index());
+ }
+ }
+ return true;
+ };
+ SockNode().Start(1, onMsg, onNodeCmd);
+ LOG_DEBUG() << "sockets ok.";
+ return true;
}
return false;
}
@@ -100,7 +144,7 @@
} else if (nworker > 16) {
nworker = 16;
}
- SockNode().Start();
+ // SockNode().Start();
ServerStart(server_cb, nworker);
SubscribeStartWorker(sub_cb, nworker);
ClientStartWorker(client_cb, nworker);
@@ -114,12 +158,15 @@
bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
+ {
+ std::lock_guard<std::mutex> lk(mutex_);
+ info_ = proc;
+ }
+
if (!Init()) {
SetLastError(eError, kErrMsgNotInit);
return false;
}
-
- info_ = proc;
auto &sock = SockNode();
MsgRegister body;
diff --git a/src/topic_node.h b/src/topic_node.h
index 338a6e3..b018807 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -130,6 +130,14 @@
ShmSocket &SockClient() { return *sockets_[eSockClient]; }
ShmSocket &SockServer() { return *sockets_[eSockServer]; }
+ void SetProcIndex(int index)
+ {
+ proc_index_ = index;
+ for (int i = eSockStart; i < eSockEnd; ++i) {
+ sockets_[i]->SetNodeProc(index, i);
+ }
+ }
+
enum State {
eStateUnregistered,
eStateOnline,
@@ -144,6 +152,7 @@
std::mutex mutex_;
MQId ssn_id_ = 0;
std::atomic<State> state_;
+ int proc_index_ = -1;
TopicQueryCache topic_query_cache_;
};
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index bd59c7f..f48f307 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -293,7 +293,7 @@
// }
int same = 0;
- int64_t last = 0;
+ uint64_t last = 0;
while (last < nreq * ncli && same < 2) {
Sleep(1s, false);
auto cur = Status().nreply_.load();
diff --git a/utest/robust_test.cpp b/utest/robust_test.cpp
index 0645918..e7b8894 100644
--- a/utest/robust_test.cpp
+++ b/utest/robust_test.cpp
@@ -39,19 +39,24 @@
std::atomic<uint64_t> nwrite(0);
std::atomic<uint64_t> writedone(0);
-#if 0
- typedef AtomicQueue<4> Rcb;
+#if 1
+ const int kPower = 0;
+ typedef AtomicQueue<kPower> Rcb;
Rcb tmp;
- BOOST_CHECK(tmp.like_empty());
+ // BOOST_CHECK(tmp.like_empty());
BOOST_CHECK(tmp.push(1));
- BOOST_CHECK(tmp.tail() == 1);
+ if (kPower != 0) {
+ BOOST_CHECK(tmp.tail() == 1);
+ }
BOOST_CHECK(tmp.head() == 0);
int64_t d;
BOOST_CHECK(tmp.pop(d));
- BOOST_CHECK(tmp.like_empty());
- BOOST_CHECK(tmp.head() == 1);
- BOOST_CHECK(tmp.tail() == 1);
+ if (kPower != 0) {
+ // BOOST_CHECK(tmp.like_empty());
+ BOOST_CHECK(tmp.head() == 1);
+ BOOST_CHECK(tmp.tail() == 1);
+ }
ShmObject<Rcb> rcb(shm, "test_rcb");
bool try_more = true;
@@ -166,18 +171,20 @@
BOOST_AUTO_TEST_CASE(MutexTest)
{
{
- int fd = open("/tmp/test_fmutex", O_CREAT | O_RDONLY, 0666);
- flock(fd, LOCK_EX);
- printf("lock 1");
+ int sem_id = semget(100, 1, 0666 | IPC_CREAT);
+ auto P = [&]() {
+ sembuf op = {0, -1, SEM_UNDO};
+ semop(sem_id, &op, 1);
+ };
+ auto V = [&]() {
+ sembuf op = {0, 1, SEM_UNDO};
+ semop(sem_id, &op, 1);
+ };
+ for (int i = 0; i < 10; ++i) {
+ V();
+ }
Sleep(10s);
- flock(fd, LOCK_EX);
- printf("lock 2");
- Sleep(10s);
- flock(fd, LOCK_UN);
- printf("un lock 2");
- Sleep(10s);
- flock(fd, LOCK_UN);
- printf("un lock 1");
+
return;
}
@@ -204,7 +211,7 @@
std::mutex m;
typedef std::chrono::steady_clock Clock;
- auto Now = []() { return Clock::now().time_since_epoch(); };
+
if (pi) {
auto old = *pi;
printf("int : %d, add1: %d\n", old, ++*pi);
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index f8f54f5..334c081 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -92,9 +92,9 @@
}
};
- int nwriters[] = {1, 10, 100};
+ int nwriters[] = {1, 10, 100, 1000};
int nreaders[] = {2};
- const int64_t total_msg = 1000 * 100;
+ const int64_t total_msg = 1000 * 1000;
auto Test = [&](auto &www, auto &rrr, bool isfork) {
for (auto nreader : nreaders) {
@@ -127,12 +127,13 @@
// typedef ThreadManager Manager;
// const bool isfork = IsSameType<Manager, ProcessManager>::value;
- {
+ if (0) {
ThreadManager tw, tr;
printf("---------------- Testing thread io: -------------------------------------------------------\n");
Test(tw, tr, false);
}
- {
+
+ if (1) {
ProcessManager pw, pr;
printf("================ Testing process io: =======================================================\n");
Test(pw, pr, true);
--
Gitblit v1.8.0