From cab831748a2a9cc18b7f18f3b5e14a4374b7ab68 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期一, 17 五月 2021 18:34:26 +0800
Subject: [PATCH] socket send using abs addr, avoid shm find by id.
---
src/socket.h | 44 +---
box/center.cpp | 103 ++++++-----
.vscode/settings.json | 11 +
src/shm_msg_queue.h | 5
src/socket.cpp | 40 +++
src/defs.h | 18 +-
box/center.h | 4
src/sendq.cpp | 33 +++
src/topic_node.cpp | 100 ++++++----
src/shm_msg_queue.cpp | 6
src/defs.cpp | 8
utest/speed_test.cpp | 27 +-
proto/source/bhome_msg_api.proto | 6
utest/api_test.cpp | 14 +
src/bh_util.h | 4
src/topic_node.h | 5
src/sendq.h | 48 +++--
src/bh_api.cpp | 31 ++-
18 files changed, 307 insertions(+), 200 deletions(-)
diff --git a/.vscode/settings.json b/.vscode/settings.json
index c0b9587..df60df7 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -61,7 +61,16 @@
"strstream": "cpp",
"unordered_set": "cpp",
"cfenv": "cpp",
- "*.ipp": "cpp"
+ "*.ipp": "cpp",
+ "cassert": "cpp",
+ "cerrno": "cpp",
+ "cfloat": "cpp",
+ "ciso646": "cpp",
+ "climits": "cpp",
+ "ios": "cpp",
+ "locale": "cpp",
+ "queue": "cpp",
+ "random": "cpp"
},
"files.exclude": {
"**/*.un~": true,
diff --git a/box/center.cpp b/box/center.cpp
index 7d51f2f..2f244b4 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -105,17 +105,18 @@
auto it = msgs_.begin();
while (it != msgs_.end() && --limit > 0) {
ShmMsg msg(it->second);
- if (msg.Count() == 0) {
+ auto Free = [&]() {
msg.Free();
it = msgs_.erase(it);
++n;
- } else if (msg.timestamp() + 60 < NowSec()) {
- msg.Free();
- it = msgs_.erase(it);
- ++n;
- // LOG_DEBUG() << "release timeout msg, someone crashed.";
- } else {
+ };
+ int n = now - msg.timestamp();
+ if (n < 10) {
++it;
+ } else if (msg.Count() == 0) {
+ Free();
+ } else if (n > 60) {
+ Free();
}
}
if (n > 0) {
@@ -181,22 +182,24 @@
typedef std::unordered_map<Address, std::set<Topic>> AddressTopics;
struct NodeInfo {
- ProcState state_; // state
- std::set<Address> addrs_; // registered mqs
- ProcInfo proc_; //
- AddressTopics services_; // address: topics
- AddressTopics subscriptions_; // address: topics
+ ProcState state_; // state
+ std::map<MQId, int64_t> addrs_; // registered mqs
+ ProcInfo proc_; //
+ AddressTopics services_; // address: topics
+ AddressTopics subscriptions_; // address: topics
};
typedef std::shared_ptr<NodeInfo> Node;
typedef std::weak_ptr<NodeInfo> WeakNode;
struct TopicDest {
- Address mq_;
+ MQId mq_id_;
+ int64_t mq_abs_addr_;
WeakNode weak_node_;
- bool operator<(const TopicDest &a) const { return mq_ < a.mq_; }
+ bool operator<(const TopicDest &a) const { return mq_id_ < a.mq_id_; }
};
inline MQId SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); }
- inline bool MatchAddr(std::set<Address> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); }
+ inline int64_t SrcAbsAddr(const BHMsgHead &head) { return head.route(0).abs_addr(); }
+ inline bool MatchAddr(std::map<Address, int64_t> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); }
NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time, const int64_t kill_time) :
id_(id), cleaner_(cleaner), offline_time_(offline_time), kill_time_(kill_time), last_check_time_(0) {}
@@ -218,39 +221,38 @@
return; // ignore in exists.
}
auto UpdateRegInfo = [&](Node &node) {
- for (int i = 0; i < 10; ++i) {
- node->addrs_.insert(ssn + i);
- }
node->state_.timestamp_ = NowSec() - offline_time_;
node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
// create sockets.
+ const int nsocks = 4;
try {
- auto CreateSocket = [&](const MQId id) { ShmSocket tmp(shm, true, id, 16); };
- // alloc(-1), node, server, sub, request,
- for (int i = 0; i < 4; ++i) {
- CreateSocket(ssn + i);
- node->addrs_.insert(ssn + i);
+ for (int i = 0; i < nsocks; ++i) {
+ ShmSocket tmp(shm, true, ssn + i, 16);
+ node->addrs_.emplace(ssn + i, tmp.AbsAddr());
}
return true;
} catch (...) {
+ for (int i = 0; i < nsocks; ++i) {
+ ShmSocket::Remove(shm, ssn + i);
+ }
return false;
}
};
- auto PrepareProcInit = [&]() {
+ auto PrepareProcInit = [&](Node &node) {
bool r = false;
ShmMsg init_msg;
if (init_msg.Make(GetAllocSize(CalcAllocIndex(900)))) {
// 31bit pointer, 4bit cmd+flag
int64_t reply = (init_msg.Offset() << 4) | EncodeCmd(eCmdNodeInitReply);
- r = SendAllocReply(socket, ssn, reply, init_msg);
+ r = SendAllocReply(socket, {ssn, node->addrs_[ssn]}, reply, init_msg);
}
return r;
};
Node node(new NodeInfo);
- if (UpdateRegInfo(node) && PrepareProcInit()) {
+ if (UpdateRegInfo(node) && PrepareProcInit(node)) {
nodes_[ssn] = node;
LOG_INFO() << "new node ssn (" << ssn << ") init";
} else {
@@ -261,13 +263,13 @@
}
void RecordMsg(const MsgI &msg) { msgs_.RecordMsg(msg); }
- bool SendAllocReply(ShmSocket &socket, const Address dest, const int64_t reply, const MsgI &msg)
+ bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg)
{
RecordMsg(msg);
auto onExpireFree = [this, msg](const SendQ::Data &) { msgs_.FreeMsg(msg.id()); };
return socket.Send(dest, reply, onExpireFree);
}
- bool SendAllocMsg(ShmSocket &socket, const Address dest, const MsgI &msg)
+ bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg)
{
RecordMsg(msg);
return socket.Send(dest, msg);
@@ -284,7 +286,21 @@
if (proc_rec.proc_.empty()) {
return;
}
- Address dest = proc_rec.ssn_ + socket_index;
+
+ MQInfo dest = {proc_rec.ssn_ + socket_index, 0};
+ auto FindMq = [&]() {
+ auto pos = nodes_.find(proc_rec.ssn_);
+ if (pos != nodes_.end()) {
+ for (auto &&mq : pos->second->addrs_) {
+ if (mq.first == dest.id_) {
+ dest.offset_ = mq.second;
+ return true;
+ }
+ }
+ }
+ return false;
+ };
+ if (!FindMq()) { return; }
auto size = GetAllocSize((val >> 52) & MaskBits(8));
MsgI new_msg;
@@ -337,10 +353,6 @@
// when node restart, ssn will change,
// and old node will be removed after timeout.
auto UpdateRegInfo = [&](Node &node) {
- node->addrs_.insert(SrcAddr(head));
- for (auto &addr : msg.addrs()) {
- node->addrs_.insert(addr.mq_id());
- }
node->proc_.Swap(msg.mutable_proc());
node->state_.timestamp_ = head.timestamp();
node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
@@ -420,11 +432,11 @@
auto src = SrcAddr(head);
auto &topics = msg.topics().topic_list();
node->services_[src].insert(topics.begin(), topics.end());
- TopicDest dest = {src, node};
+ TopicDest dest = {src, SrcAbsAddr(head), node};
for (auto &topic : topics) {
service_map_[topic].insert(dest);
}
- LOG_DEBUG() << "node " << node->proc_.proc_id() << " ssn " << *node->addrs_.begin() << " serve " << topics.size() << " topics:\n";
+ LOG_DEBUG() << "node " << node->proc_.proc_id() << " ssn " << node->addrs_.begin()->first << " serve " << topics.size() << " topics:\n";
for (auto &topic : topics) {
LOG_DEBUG() << "\t" << topic;
}
@@ -464,7 +476,8 @@
if (dest_node && Valid(*dest_node)) {
auto node_addr = reply.add_node_address();
node_addr->set_proc_id(dest_node->proc_.proc_id());
- node_addr->mutable_addr()->set_mq_id(dest.mq_);
+ node_addr->mutable_addr()->set_mq_id(dest.mq_id_);
+ node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_);
}
}
return reply;
@@ -482,7 +495,7 @@
auto src = SrcAddr(head);
auto &topics = msg.topics().topic_list();
node->subscriptions_[src].insert(topics.begin(), topics.end());
- TopicDest dest = {src, node};
+ TopicDest dest = {src, SrcAbsAddr(head), node};
for (auto &topic : topics) {
subscribe_map_[topic].insert(dest);
}
@@ -505,7 +518,7 @@
};
if (pos != node->subscriptions_.end()) {
- const TopicDest &dest = {src, node};
+ const TopicDest &dest = {src, SrcAbsAddr(head), node};
auto &topics = msg.topics().topic_list();
// clear node sub records;
for (auto &topic : topics) {
@@ -602,7 +615,7 @@
{
auto EraseMapRec = [&node](auto &rec_map, auto &node_rec) {
for (auto &addr_topics : node_rec) {
- TopicDest dest{addr_topics.first, node};
+ TopicDest dest{addr_topics.first, 0, node}; // abs_addr is not used.
for (auto &topic : addr_topics.second) {
auto pos = rec_map.find(topic);
if (pos != rec_map.end()) {
@@ -626,7 +639,7 @@
}
for (auto &addr : node->addrs_) {
- cleaner_(addr);
+ cleaner_(addr.first);
}
node->addrs_.clear();
@@ -678,7 +691,7 @@
{
return [&](auto &&rep_body) {
auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id()));
- auto remote = head.route(0).mq_id();
+ MQInfo remote = {head.route(0).mq_id(), head.route(0).abs_addr()};
MsgI msg;
if (msg.Make(reply_head, rep_body)) {
DEFER1(msg.Release(););
@@ -741,7 +754,7 @@
if (node) {
// should also make sure that mq is not killed before msg expires.
// it would be ok if (kill_time - offline_time) is longer than expire time.
- socket.Send(cli.mq_, msg);
+ socket.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
++it;
} else {
it = clients.erase(it);
@@ -772,9 +785,9 @@
return rec;
}
-bool BHCenter::Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQId mqid, const int mq_len)
+bool BHCenter::Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQInfo &mq, const int mq_len)
{
- Centers()[name] = CenterInfo{name, handler, raw_handler, idle, mqid, mq_len};
+ Centers()[name] = CenterInfo{name, handler, raw_handler, idle, mq, mq_len};
return true;
}
@@ -792,7 +805,7 @@
for (auto &kv : Centers()) {
auto &info = kv.second;
- sockets_[info.name_] = std::make_shared<ShmSocket>(shm, info.mqid_, info.mq_len_);
+ sockets_[info.name_] = std::make_shared<ShmSocket>(info.mq_.offset_, shm, info.mq_.id_);
}
}
diff --git a/box/center.h b/box/center.h
index d68573b..ebe48b4 100644
--- a/box/center.h
+++ b/box/center.h
@@ -31,7 +31,7 @@
typedef Socket::PartialRecvCB MsgHandler;
typedef Socket::RawRecvCB RawHandler;
typedef Socket::IdleCB IdleHandler;
- static bool Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQId mqid, const int mq_len);
+ static bool Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQInfo &mq, const int mq_len);
BHCenter(Socket::Shm &shm);
~BHCenter() { Stop(); }
@@ -44,7 +44,7 @@
MsgHandler handler_;
RawHandler raw_handler_;
IdleHandler idle_;
- MQId mqid_;
+ MQInfo mq_;
int mq_len_ = 0;
};
typedef std::map<std::string, CenterInfo> CenterRecords;
diff --git a/proto/source/bhome_msg_api.proto b/proto/source/bhome_msg_api.proto
index 94bc82e..8b422c7 100644
--- a/proto/source/bhome_msg_api.proto
+++ b/proto/source/bhome_msg_api.proto
@@ -9,8 +9,9 @@
message BHAddress {
uint64 mq_id = 1;
- bytes ip = 2;
- int32 port = 3;
+ int64 abs_addr = 2;
+ bytes ip = 3;
+ int32 port = 4;
}
message ProcInfo
@@ -48,7 +49,6 @@
message MsgRegister
{
ProcInfo proc = 1;
- repeated BHAddress addrs = 2;
}
message MsgUnregister
diff --git a/src/bh_api.cpp b/src/bh_api.cpp
index c9ceb20..ca7249d 100644
--- a/src/bh_api.cpp
+++ b/src/bh_api.cpp
@@ -30,16 +30,21 @@
}
std::unique_ptr<TopicNode> &ProcNodePtr()
{
- static bool init = GlobalInit(BHomeShm());
- auto InitLog = []() {
- auto id = GetProcExe();
- char path[200] = {0};
- sprintf(path, "/tmp/bhshmq_node_%s.log", id.c_str());
- ns_log::AddLog(path);
- return true;
- };
- static bool init_log = InitLog();
- static std::unique_ptr<TopicNode> ptr(new TopicNode(BHomeShm()));
+ static std::mutex mtx;
+ std::lock_guard<std::mutex> lk(mtx);
+
+ static std::unique_ptr<TopicNode> ptr;
+ if (!ptr && GlobalInit(BHomeShm())) {
+ auto InitLog = []() {
+ auto id = GetProcExe();
+ char path[200] = {0};
+ sprintf(path, "/tmp/bhshmq_node_%s.log", id.c_str());
+ ns_log::AddLog(path);
+ return true;
+ };
+ static bool init_log = InitLog();
+ ptr.reset(new TopicNode(BHomeShm()));
+ }
return ptr;
}
TopicNode &ProcNode()
@@ -114,6 +119,12 @@
return false;
}
MsgOut msg_reply;
+ auto &ptr = ProcNodePtr();
+ if (!ptr) {
+ SetLastError(eNotFound, "center not started.");
+ return 0;
+ }
+
return (ProcNode().*mfunc)(input, msg_reply, timeout_ms) &&
PackOutput(msg_reply, reply, reply_len);
}
diff --git a/src/bh_util.h b/src/bh_util.h
index 223da2a..15ffeb0 100644
--- a/src/bh_util.h
+++ b/src/bh_util.h
@@ -157,9 +157,9 @@
}
protected:
- static inline T &GetData()
+ static inline T &GetData(const std::string &msg = "Must set data before use!")
{
- if (!ptr()) { throw std::string("Must set ShmMsg shm before use!"); }
+ if (!ptr()) { throw std::logic_error(msg); }
return *ptr();
}
diff --git a/src/defs.cpp b/src/defs.cpp
index b812b65..6e7a5fd 100644
--- a/src/defs.cpp
+++ b/src/defs.cpp
@@ -141,10 +141,10 @@
return false;
}
-uint64_t BHGlobalSenderAddress() { return GetCenterInfo(BHomeShm())->mq_sender_.id_; }
-uint64_t BHTopicCenterAddress() { return GetCenterInfo(BHomeShm())->mq_center_.id_; }
-uint64_t BHTopicBusAddress() { return GetCenterInfo(BHomeShm())->mq_bus_.id_; }
-uint64_t BHCenterReplyAddress() { return GetCenterInfo(BHomeShm())->mq_init_.id_; }
+const MQInfo &BHGlobalSenderAddress() { return GetCenterInfo(BHomeShm())->mq_sender_; }
+const MQInfo &BHTopicCenterAddress() { return GetCenterInfo(BHomeShm())->mq_center_; }
+const MQInfo &BHTopicBusAddress() { return GetCenterInfo(BHomeShm())->mq_bus_; }
+const MQInfo &BHCenterReplyAddress() { return GetCenterInfo(BHomeShm())->mq_init_; }
int64_t CalcAllocIndex(int64_t size)
{
diff --git a/src/defs.h b/src/defs.h
index 5c770a7..cc3dc02 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -27,12 +27,12 @@
int64_t CalcAllocIndex(int64_t size);
int64_t GetAllocSize(int index);
-struct CenterInfo {
- struct MQInfo {
- int64_t id_ = 0;
- int64_t offset_ = 0;
- };
+struct MQInfo {
+ MQId id_ = 0;
+ int64_t offset_ = 0;
+};
+struct CenterInfo {
MQInfo mq_center_;
MQInfo mq_bus_;
MQInfo mq_init_;
@@ -59,9 +59,9 @@
void GetLastError(int &ec, std::string &msg);
//TODO center can check shm for previous crash.
-uint64_t BHGlobalSenderAddress();
-uint64_t BHTopicCenterAddress();
-uint64_t BHTopicBusAddress();
-uint64_t BHCenterReplyAddress();
+const MQInfo &BHGlobalSenderAddress();
+const MQInfo &BHTopicCenterAddress();
+const MQInfo &BHTopicBusAddress();
+const MQInfo &BHCenterReplyAddress();
#endif // end of include guard: DEFS_KP8LKGD0
diff --git a/src/sendq.cpp b/src/sendq.cpp
index 1eaefe6..f1e5918 100644
--- a/src/sendq.cpp
+++ b/src/sendq.cpp
@@ -21,6 +21,24 @@
using namespace bhome_shm;
+void SendQ::AppendData(const MQInfo &mq, const Data data, const TimePoint &expire, OnMsgEvent onExpire)
+{
+ TimedMsg tmp(expire, MsgInfo{mq, data, std::move(onExpire)});
+ std::unique_lock<std::mutex> lock(mutex_in_);
+
+ try {
+ auto &al = in_[mq.id_];
+ if (!al.empty()) {
+ al.front().emplace_back(std::move(tmp));
+ } else {
+ al.insert(al.begin(), Array())->emplace_back(std::move(tmp));
+ }
+ } catch (std::exception &e) {
+ LOG_ERROR() << "sendq error: " << e.what();
+ throw e;
+ }
+}
+
int SendQ::DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr)
{
auto FirstNotExpired = [](Array &l) {
@@ -36,7 +54,7 @@
}
}
- while (pos != arr.end() && mq.TrySend(remote, pos->data().data_)) {
+ while (pos != arr.end() && mq.TrySend(pos->data().mq_, pos->data().data_)) {
++pos;
}
@@ -59,6 +77,8 @@
bool SendQ::TrySend(ShmMsgQueue &mq)
{
std::unique_lock<std::mutex> lock(mutex_out_);
+ // if (TooFast()) { return false; }
+
size_t nsend = 0;
if (!out_.empty()) {
auto rec = out_.begin();
@@ -89,3 +109,14 @@
return !out_.empty();
}
+
+bool SendQ::TooFast()
+{
+ auto cur = NowSec();
+ if (cur > last_time_) {
+ last_time_ = cur;
+ count_ = 0;
+ }
+
+ return ++count_ > 1000 * 100;
+} // not accurate in multi-thread.
\ No newline at end of file
diff --git a/src/sendq.h b/src/sendq.h
index 9e2b5ca..d1ba30a 100644
--- a/src/sendq.h
+++ b/src/sendq.h
@@ -39,6 +39,7 @@
typedef int64_t Data;
typedef std::function<void(const Data &)> OnMsgEvent;
struct MsgInfo {
+ MQInfo mq_;
Data data_;
OnMsgEvent on_expire_;
};
@@ -46,45 +47,51 @@
typedef TimedMsg::TimePoint TimePoint;
typedef TimedMsg::Duration Duration;
- void Append(const Remote addr, const MsgI msg)
+ bool Append(const MQInfo &mq, MsgI msg)
{
msg.AddRef();
auto onMsgExpire = [](const Data &d) { MsgI(d).Release(); };
- AppendData(addr, msg.Offset(), DefaultExpire(), onMsgExpire);
+ try {
+ AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire);
+ return true;
+ } catch (...) {
+ msg.Release();
+ return false;
+ }
}
- void Append(const Remote addr, const MsgI msg, OnMsgEvent onExpire)
+ bool Append(const MQInfo &mq, MsgI msg, OnMsgEvent onExpire)
{
msg.AddRef();
auto onMsgExpire = [onExpire](const Data &d) {
onExpire(d);
MsgI(d).Release();
};
- AppendData(addr, msg.Offset(), DefaultExpire(), onMsgExpire);
+ try {
+ AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire);
+ return true;
+ } catch (...) {
+ msg.Release();
+ return false;
+ }
}
- void Append(const Remote addr, const Data command, OnMsgEvent onExpire = OnMsgEvent())
+ bool Append(const MQInfo &mq, const Data command, OnMsgEvent onExpire = OnMsgEvent())
{
- AppendData(addr, command, DefaultExpire(), onExpire);
+ try {
+ AppendData(mq, command, DefaultExpire(), onExpire);
+ return true;
+ } catch (...) {
+ return false;
+ }
}
bool TrySend(ShmMsgQueue &mq);
private:
static TimePoint Now() { return TimedMsg::Clock::now(); }
static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(60); }
- void AppendData(const Remote addr, const Data data, const TimePoint &expire, OnMsgEvent onExpire)
- {
- //TODO simple queue, organize later ?
+ void AppendData(const MQInfo &mq, const Data data, const TimePoint &expire, OnMsgEvent onExpire);
- TimedMsg tmp(expire, MsgInfo{data, std::move(onExpire)});
- std::unique_lock<std::mutex> lock(mutex_in_);
- auto &al = in_[addr];
- if (!al.empty()) {
- al.front().emplace_back(std::move(tmp));
- } else {
- al.insert(al.begin(), Array())->emplace_back(std::move(tmp));
- }
- }
typedef std::deque<TimedMsg> Array;
typedef std::list<Array> ArrayList;
typedef std::unordered_map<Remote, ArrayList> Store;
@@ -92,10 +99,15 @@
int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr);
int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &arr);
+ bool TooFast();
+
std::mutex mutex_in_;
std::mutex mutex_out_;
Store in_;
Store out_;
+
+ int64_t count_ = 0;
+ int64_t last_time_ = 0;
};
#endif // end of include guard: SENDQ_IWKMSK7M
diff --git a/src/shm_msg_queue.cpp b/src/shm_msg_queue.cpp
index 663da1e..1d78e8c 100644
--- a/src/shm_msg_queue.cpp
+++ b/src/shm_msg_queue.cpp
@@ -33,7 +33,7 @@
ShmMsgQueue::MQId ShmMsgQueue::NewId()
{
- static auto &id = GetData();
+ static auto &id = GetData("Must init shared memory before use! Please make sure center is running.");
return (++id) * 10;
}
@@ -96,11 +96,11 @@
return Shmq::Find(shm, MsgQIdToName(remote_id));
}
-bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote, int64_t val)
+bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQInfo &remote, const RawData val)
{
try {
//TODO find from center, or use offset.
- ShmMsgQueue dest(shm, false, remote, 1);
+ ShmMsgQueue dest(remote.offset_, shm, remote.id_);
#ifndef BH_USE_ATOMIC_Q
Guard lock(GetMutex(remote_id));
#endif
diff --git a/src/shm_msg_queue.h b/src/shm_msg_queue.h
index eead739..de60fde 100644
--- a/src/shm_msg_queue.h
+++ b/src/shm_msg_queue.h
@@ -18,6 +18,7 @@
#ifndef SHM_MSG_QUEUE_D847TQXH
#define SHM_MSG_QUEUE_D847TQXH
+#include "defs.h"
#include "msg.h"
#include "shm_queue.h"
@@ -75,8 +76,8 @@
bool Recv(MsgI &msg, const int timeout_ms) { return Recv(msg.OffsetRef(), timeout_ms); }
bool TryRecv(MsgI &msg) { return TryRecv(msg.OffsetRef()); }
static Queue *Find(ShmType &shm, const MQId remote);
- static bool TrySend(ShmType &shm, const MQId remote, const RawData val);
- bool TrySend(const MQId remote, const RawData val) { return TrySend(shm(), remote, val); }
+ static bool TrySend(ShmType &shm, const MQInfo &remote, const RawData val);
+ bool TrySend(const MQInfo &remote, const RawData val) { return TrySend(shm(), remote, val); }
private:
#ifndef BH_USE_ATOMIC_Q
diff --git a/src/socket.cpp b/src/socket.cpp
index 4f09517..55b43f9 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -80,15 +80,13 @@
}
};
ShmMsgQueue::RawData val = 0;
- auto TryRecvMore = [&]() {
- for (int i = 0; i < 100; ++i) {
- if (mq().TryRecv(val)) {
- return true;
- }
+ for (int i = 0; i < 100; ++i) {
+ if (mq().TryRecv(val)) {
+ onRecv(val);
+ return true;
}
- return false;
- };
- return TryRecvMore() ? (onRecv(val), true) : false;
+ }
+ return false;
};
try {
@@ -160,6 +158,31 @@
return false;
}
+bool ShmSocket::Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb)
+{
+ size_t size = content.size();
+ auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable {
+ if (!msg.Fill(content)) { return; }
+
+ try {
+ if (!cb) {
+ Send(remote, msg);
+ } else {
+ per_msg_cbs_->Store(msg_id, std::move(cb));
+ auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
+ RecvCB cb_no_use;
+ per_msg_cbs_->Pick(msg_id, cb_no_use);
+ };
+ Send(remote, msg, onExpireRemoveCB);
+ }
+ } catch (...) {
+ SetLastError(eError, "Send internal error.");
+ }
+ };
+
+ return RequestAlloc(size, OnResult);
+}
+
bool ShmSocket::RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult)
{ // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag
// LOG_FUNCTION;
@@ -184,5 +207,6 @@
RawRecvCB cb_no_use;
alloc_cbs_->Pick(id, cb_no_use);
};
+
return Send(BHTopicCenterAddress(), cmd, onExpireRemoveCB);
}
\ No newline at end of file
diff --git a/src/socket.h b/src/socket.h
index 8e9db69..dea106c 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -59,7 +59,6 @@
{
node_proc_index_ = proc_index;
socket_index_ = socket_index;
- LOG_DEBUG() << "Set Node Proc " << node_proc_index_ << ", " << socket_index_;
}
// start recv.
bool Start(int nworker = 1, const RecvCB &onMsg = RecvCB(), const RawRecvCB &onRaw = RawRecvCB(), const IdleCB &onIdle = IdleCB());
@@ -68,7 +67,7 @@
bool Stop();
template <class Body>
- bool CenterSend(const MQId remote, BHMsgHead &head, Body &body)
+ bool CenterSend(const MQInfo &remote, BHMsgHead &head, Body &body)
{
try {
//TODO alloc outsiez and use send.
@@ -86,39 +85,17 @@
bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult);
template <class Body>
- bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
+ bool Send(const MQInfo &remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
{
- std::string msg_id(head.msg_id());
- std::string content(MsgI::Serialize(head, body));
- size_t size = content.size();
- auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable {
- if (!msg.Fill(content)) { return; }
-
- try {
- if (!cb) {
- Send(remote, msg);
- } else {
- per_msg_cbs_->Store(msg_id, std::move(cb));
- auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
- RecvCB cb_no_use;
- per_msg_cbs_->Pick(msg_id, cb_no_use);
- };
- Send(remote, msg, onExpireRemoveCB);
- }
- } catch (...) {
- SetLastError(eError, "Send internal error.");
- }
- };
-
- return RequestAlloc(size, OnResult);
+ return Send(remote, MsgI::Serialize(head, body), head.msg_id(), std::move(cb));
}
template <class... T>
- bool Send(const MQId remote, const MsgI &imsg, T &&...t)
+ bool Send(const MQInfo &remote, const MsgI &imsg, T &&...t)
{
return SendImpl(remote, imsg, std::forward<decltype(t)>(t)...);
}
template <class... T>
- bool Send(const MQId remote, const int64_t cmd, T &&...t)
+ bool Send(const MQInfo &remote, const int64_t cmd, T &&...t)
{
return SendImpl(remote, cmd, std::forward<decltype(t)>(t)...);
}
@@ -126,7 +103,7 @@
bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms);
template <class Body>
- bool SendAndRecv(const MQId remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
+ bool SendAndRecv(const MQInfo &remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
{
struct State {
std::mutex mutex;
@@ -136,6 +113,7 @@
try {
std::shared_ptr<State> st(new State);
+
auto endtime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
auto OnRecv = [st, &reply, &reply_head](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
@@ -176,12 +154,12 @@
bool StopNoLock();
bool RunningNoLock() { return !workers_.empty(); }
+ bool Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb = RecvCB());
+
template <class... Rest>
- bool SendImpl(const MQId remote, Rest &&...rest)
+ bool SendImpl(const MQInfo &remote, Rest &&...rest)
{
- // TODO send alloc request, and pack later, higher bit means alloc?
- send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...);
- return true;
+ return send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...);
}
std::vector<std::thread> workers_;
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 35228b4..51a0ab7 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -28,7 +28,12 @@
namespace
{
-inline void AddRoute(BHMsgHead &head, const MQId id) { head.add_route()->set_mq_id(id); }
+inline void AddRoute(BHMsgHead &head, const ShmSocket &sock)
+{
+ auto route = head.add_route();
+ route->set_mq_id(sock.id());
+ route->set_abs_addr(sock.AbsAddr());
+}
struct SrcInfo {
std::vector<BHAddress> route;
@@ -40,7 +45,7 @@
} // namespace
TopicNode::TopicNode(SharedMemory &shm) :
- shm_(shm), state_(eStateUnregistered)
+ shm_(shm), state_(eStateUninited)
{
}
@@ -79,6 +84,7 @@
auto end_time = steady_clock::now() + 3s;
do {
try {
+ //TODO recv offset, avoid query.
for (int i = eSockStart; i < eSockEnd; ++i) {
sockets_.emplace_back(new ShmSocket(shm_, false, ssn_id_ + i, kMqLen));
}
@@ -94,7 +100,6 @@
NodeInit();
}
if (!sockets_.empty()) {
- LOG_DEBUG() << "node sockets ok";
auto onNodeCmd = [this](ShmSocket &socket, int64_t &val) {
LOG_DEBUG() << "node recv cmd: " << DecodeCmd(val);
switch (DecodeCmd(val)) {
@@ -103,7 +108,7 @@
DEFER1(msg.Release());
MsgProcInit body;
auto head = InitMsgHead(GetType(body), info_.proc_id(), ssn_id_);
- head.add_route()->set_mq_id(ssn_id_);
+ AddRoute(head, socket);
if (msg.Fill(head, body)) {
socket.Send(BHTopicCenterAddress(), msg);
}
@@ -122,12 +127,12 @@
MsgProcInitReply reply;
if (imsg.ParseBody(reply)) {
SetProcIndex(reply.proc_index());
+ this->state_ = eStateUnregistered;
}
}
return true;
};
SockNode().Start(1, onMsg, onNodeCmd);
- LOG_DEBUG() << "sockets ok.";
return true;
}
return false;
@@ -167,19 +172,22 @@
SetLastError(eError, kErrMsgNotInit);
return false;
}
+ auto end_time = steady_clock::now() + milliseconds(timeout_ms);
+
+ while (state_ != eStateUnregistered && steady_clock::now() < end_time) {
+ std::this_thread::yield();
+ }
+ if (state_ != eStateUnregistered) {
+ SetLastError(eError, kErrMsgNotInit);
+ return false;
+ }
auto &sock = SockNode();
MsgRegister body;
body.mutable_proc()->Swap(&proc);
- auto AddId = [&](const MQId id) { body.add_addrs()->set_mq_id(id); };
- AddId(SockNode().id());
- AddId(SockServer().id());
- AddId(SockClient().id());
- AddId(SockSub().id());
- AddId(SockPub().id());
auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn()));
- AddRoute(head, sock.id());
+ AddRoute(head, sock);
auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) {
bool ok = head.type() == kMsgTypeCommonReply &&
@@ -224,7 +232,7 @@
body.mutable_proc()->Swap(&proc);
auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn()));
- AddRoute(head, sock.id());
+ AddRoute(head, sock);
auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) {
bool r = head.type() == kMsgTypeCommonReply &&
@@ -260,7 +268,7 @@
body.mutable_proc()->Swap(&proc);
auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn()));
- AddRoute(head, sock.id());
+ AddRoute(head, sock);
if (timeout_ms == 0) {
return sock.Send(BHTopicCenterAddress(), head, body);
@@ -290,7 +298,7 @@
auto &sock = SockNode();
BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn()));
- AddRoute(head, sock.id());
+ AddRoute(head, sock);
MsgI reply;
DEFER1(reply.Release());
@@ -312,7 +320,7 @@
body.mutable_topics()->Swap(&topics);
auto head(InitMsgHead(GetType(body), proc_id(), ssn()));
- AddRoute(head, sock.id());
+ AddRoute(head, sock);
if (timeout_ms == 0) {
return sock.Send(BHTopicCenterAddress(), head, body);
@@ -341,7 +349,7 @@
for (int i = 0; i < head.route_size() - 1; ++i) {
reply_head.add_route()->Swap(head.mutable_route(i));
}
- auto remote = head.route().rbegin()->mq_id();
+ MQInfo remote = {head.route().rbegin()->mq_id(), head.route().rbegin()->abs_addr()};
sock.Send(remote, reply_head, reply_body);
}
};
@@ -357,10 +365,17 @@
MsgRequestTopic req;
if (!imsg.ParseBody(req)) { return; }
- SrcInfo *p = new SrcInfo;
- p->route.assign(head.route().begin(), head.route().end());
- p->msg_id = head.msg_id();
- acb(p, *head.mutable_proc_id(), req);
+ try {
+ SrcInfo *p = new SrcInfo;
+ if (!p) {
+ throw std::runtime_error("no memory.");
+ }
+ p->route.assign(head.route().begin(), head.route().end());
+ p->msg_id = head.msg_id();
+ acb(p, *head.mutable_proc_id(), req);
+ } catch (std::exception &e) {
+ LOG_ERROR() << "error server handle msg:" << e.what();
+ }
};
auto &sock = SockServer();
@@ -381,11 +396,19 @@
if (sock.SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) {
if (imsg.ParseBody(request)) {
head.mutable_proc_id()->swap(proc_id);
- SrcInfo *p = new SrcInfo;
- p->route.assign(head.route().begin(), head.route().end());
- p->msg_id = head.msg_id();
- src_info = p;
- return true;
+ try {
+ SrcInfo *p = new SrcInfo;
+ if (!p) {
+ throw std::runtime_error("no memory.");
+ }
+ p->route.assign(head.route().begin(), head.route().end());
+ p->msg_id = head.msg_id();
+ src_info = p;
+ return true;
+ } catch (std::exception &e) {
+ LOG_ERROR() << "error recv request: " << e.what();
+ return false;
+ }
}
}
return false;
@@ -409,7 +432,8 @@
for (unsigned i = 0; i < p->route.size() - 1; ++i) {
head.add_route()->Swap(&p->route[i]);
}
- return sock.Send(p->route.back().mq_id(), head, body);
+ MQInfo dest = {p->route.back().mq_id(), p->route.back().abs_addr()};
+ return sock.Send(dest, head, body);
}
bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker)
@@ -440,10 +464,10 @@
out_msg_id = msg_id;
- auto SendTo = [this, msg_id](const BHAddress &addr, const MsgRequestTopic &req, const RequestResultCB &cb) {
+ auto SendTo = [this, msg_id](const MQInfo &remote, const MsgRequestTopic &req, const RequestResultCB &cb) {
auto &sock = SockClient();
BHMsgHead head(InitMsgHead(GetType(req), proc_id(), ssn(), msg_id));
- AddRoute(head, sock.id());
+ AddRoute(head, sock);
head.set_topic(req.topic());
if (cb) {
@@ -455,15 +479,15 @@
}
}
};
- return sock.Send(addr.mq_id(), head, req, onRecv);
+ return sock.Send(remote, head, req, onRecv);
} else {
- return sock.Send(addr.mq_id(), head, req);
+ return sock.Send(remote, head, req);
}
};
try {
BHAddress addr;
- return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(addr, req, cb);
+ return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(MQInfo{addr.mq_id(), addr.abs_addr()}, req, cb);
} catch (...) {
SetLastError(eError, "internal error.");
return false;
@@ -484,14 +508,14 @@
if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) {
LOG_TRACE() << "node: " << SockNode().id() << ", topic dest: " << addr.mq_id();
BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn()));
- AddRoute(head, sock.id());
+ AddRoute(head, sock);
head.set_topic(request.topic());
MsgI reply_msg;
DEFER1(reply_msg.Release(););
BHMsgHead reply_head;
- if (sock.SendAndRecv(addr.mq_id(), head, request, reply_msg, reply_head, timeout_ms) &&
+ if (sock.SendAndRecv({addr.mq_id(), addr.abs_addr()}, head, request, reply_msg, reply_head, timeout_ms) &&
reply_head.type() == kMsgTypeRequestTopicReply &&
reply_msg.ParseBody(out_reply)) {
reply_head.mutable_proc_id()->swap(out_proc_id);
@@ -504,7 +528,7 @@
return false;
}
-int TopicNode::QueryRPCTopics(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms)
+int TopicNode::QueryTopicServers(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms)
{
int n = 0;
MsgQueryTopic query;
@@ -532,7 +556,7 @@
return true;
}
std::vector<NodeAddress> lst;
- if (QueryRPCTopics(topic, lst, timeout_ms)) {
+ if (QueryTopicServers(topic, lst, timeout_ms)) {
addr = lst.front().addr();
if (addr.mq_id() != 0) {
topic_query_cache_.Store(topic, addr);
@@ -555,7 +579,7 @@
try {
auto &sock = SockPub();
BHMsgHead head(InitMsgHead(GetType(pub), proc_id(), ssn()));
- AddRoute(head, sock.id());
+ AddRoute(head, sock);
if (timeout_ms == 0) {
return sock.Send(BHTopicBusAddress(), head, pub);
@@ -589,7 +613,7 @@
sub.mutable_topics()->Swap(&topics);
BHMsgHead head(InitMsgHead(GetType(sub), proc_id(), ssn()));
- AddRoute(head, sock.id());
+ AddRoute(head, sock);
if (timeout_ms == 0) {
return sock.Send(BHTopicBusAddress(), head, sub);
} else {
diff --git a/src/topic_node.h b/src/topic_node.h
index b018807..be82cf6 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -78,7 +78,7 @@
MQId ssn() { return SockNode().id(); }
bool ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms);
typedef MsgQueryTopicReply::BHNodeAddress NodeAddress;
- int QueryRPCTopics(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms);
+ int QueryTopicServers(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms);
const std::string &proc_id() { return info_.proc_id(); }
typedef BHAddress Address;
@@ -139,6 +139,7 @@
}
enum State {
+ eStateUninited,
eStateUnregistered,
eStateOnline,
eStateOffline // heartbeat fail.
@@ -146,7 +147,7 @@
void state(const State st) { state_.store(st); }
void state_cas(State expected, const State val) { state_.compare_exchange_strong(expected, val); }
State state() const { return state_.load(); }
- bool IsOnline() { return Init() && state() == eStateOnline; }
+ bool IsOnline() { return state() == eStateOnline; }
bool Init();
bool Valid() const { return !sockets_.empty(); }
std::mutex mutex_;
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index f48f307..44c809d 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -129,7 +129,14 @@
void *reply = 0;
int reply_len = 0;
reg = BHRegister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 2000);
- printf("register %s\n", reg ? "ok" : "failed");
+ if (reg) {
+ printf("register ok\n");
+ } else {
+ int ec = 0;
+ std::string msg;
+ GetLastError(ec, msg);
+ printf("register failed, %d, %s\n", ec, msg.c_str());
+ }
BHFree(reply, reply_len);
Sleep(1s);
@@ -239,6 +246,7 @@
DEFER1(BHFree(msg_id, len););
// Sleep(10ms, false);
std::string dest(BHAddress().SerializeAsString());
+
bool r = BHAsyncRequest(dest.data(), dest.size(), s.data(), s.size(), 0, 0);
if (r) {
++Status().nrequest_;
@@ -294,11 +302,12 @@
int same = 0;
uint64_t last = 0;
- while (last < nreq * ncli && same < 2) {
+ while (last < nreq * ncli && same < 3) {
Sleep(1s, false);
auto cur = Status().nreply_.load();
if (last == cur) {
++same;
+ printf("same %d\n", same);
} else {
last = cur;
same = 0;
@@ -308,6 +317,7 @@
run = false;
threads.WaitAll();
auto &st = Status();
+ Sleep(1s);
printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld\n", st.nrequest_.load(), st.nserved_.load(), st.nreply_.load());
BHCleanup();
printf("after cleanup\n");
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index 66e5179..4dea623 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -24,16 +24,8 @@
{
SharedMemory &shm = TestShm();
GlobalInit(shm);
- auto InitSem = [](auto id) {
- auto sem_id = semget(id, 1, 0666 | IPC_CREAT);
- union semun init_val;
- init_val.val = 1;
- semctl(sem_id, 0, SETVAL, init_val);
- return;
- };
-
- MQId id = ShmMsgQueue::NewId();
- InitSem(id);
+ MQId server_id = ShmMsgQueue::NewId();
+ ShmMsgQueue server(server_id, shm, 1000);
const int timeout = 1000;
const uint32_t data_size = 1001;
@@ -44,7 +36,6 @@
std::string str(data_size, 'a');
auto Writer = [&](int writer_id, uint64_t n) {
MQId cli_id = ShmMsgQueue::NewId();
- InitSem(cli_id);
ShmMsgQueue mq(cli_id, shm, 64);
MsgI msg;
@@ -58,12 +49,12 @@
for (uint64_t i = 0; i < n; ++i) {
msg.AddRef();
- while (!mq.TrySend(id, msg.Offset())) {}
+ while (!mq.TrySend({server.Id(), server.AbsAddr()}, msg.Offset())) {}
++nwrite;
}
};
auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork) {
- ShmMsgQueue mq(id, shm, 1000);
+ ShmMsgQueue &mq = server;
auto now = []() { return steady_clock::now(); };
auto tm = now();
while (*run) {
@@ -189,8 +180,10 @@
req_body.set_topic("topic");
req_body.set_data(msg_content);
auto req_head(InitMsgHead(GetType(req_body), client_proc_id, cli.id()));
- req_head.add_route()->set_mq_id(cli.id());
- return cli.Send(srv.id(), req_head, req_body);
+ auto route = req_head.add_route();
+ route->set_mq_id(cli.id());
+ route->set_abs_addr(cli.AbsAddr());
+ return cli.Send({srv.id(), srv.AbsAddr()}, req_head, req_body);
};
Req();
@@ -207,13 +200,13 @@
DEFER1(req.Release());
if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) {
- auto src_id = req_head.route()[0].mq_id();
+ MQInfo src_mq = {req_head.route()[0].mq_id(), req_head.route()[0].abs_addr()};
auto Reply = [&]() {
MsgRequestTopic reply_body;
reply_body.set_topic("topic");
reply_body.set_data(msg_content);
auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), req_head.msg_id()));
- return srv.Send(src_id, reply_head, reply_body);
+ return srv.Send(src_mq, reply_head, reply_body);
};
Reply();
}
--
Gitblit v1.8.0