From c338820e4db43ad32c20ff429a038b06bcb980f8 Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 08 四月 2021 18:13:25 +0800
Subject: [PATCH] BIG change, join center,bus; now msg is head+body.
---
.vscode/tasks.json | 4
.gitignore | 4
src/socket.cpp | 73 +
utest/utest.cpp | 75 +
src/topic_node.cpp | 322 ++++++++++
src/center.cpp | 402 ++++++++++++
src/msg.cpp | 145 ---
utest/speed_test.cpp | 68 +
proto/source/bhome_msg_api.proto | 71 ++
src/center.h | 3
src/proto.cpp | 41 +
src/socket.h | 97 ++
utest/simple_tests.cpp | 10
src/proto.h | 78 ++
.vscode/settings.json | 7
src/msg.h | 69 +
proto/source/bhome_msg.proto | 84 --
proto/source/error_msg.proto | 5
src/shm_queue.cpp | 19
/dev/null | 108 ---
.vscode/launch.json | 2
src/pubsub.h | 12
src/shm_queue.h | 10
src/pubsub.cpp | 56 +
src/topic_node.h | 121 +++
25 files changed, 1,406 insertions(+), 480 deletions(-)
diff --git a/.gitignore b/.gitignore
index 5c7daa3..8e81403 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,8 @@
*.un~
build/
+debug/
+release/
Makefile
utest/utest
+*.bak
+gmon.out
diff --git a/.vscode/launch.json b/.vscode/launch.json
index 9eeb23e..ef42f7b 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -8,7 +8,7 @@
"name": "g++ - Build and debug active file",
"type": "cppdbg",
"request": "launch",
- "program": "${workspaceFolder}/utest/utest",
+ "program": "${workspaceFolder}/debug/bin/utest",
"args": [
"-t",
"ReqRepTest"
diff --git a/.vscode/settings.json b/.vscode/settings.json
index d5005e9..7928bc8 100644
--- a/.vscode/settings.json
+++ b/.vscode/settings.json
@@ -55,7 +55,12 @@
"cinttypes": "cpp",
"typeindex": "cpp",
"typeinfo": "cpp",
- "variant": "cpp"
+ "variant": "cpp",
+ "iomanip": "cpp",
+ "*.inc": "cpp",
+ "strstream": "cpp",
+ "unordered_set": "cpp",
+ "cfenv": "cpp"
},
"files.exclude": {
"**/*.un~": true
diff --git a/.vscode/tasks.json b/.vscode/tasks.json
index 84142bc..db457e5 100644
--- a/.vscode/tasks.json
+++ b/.vscode/tasks.json
@@ -6,10 +6,10 @@
"command": "ninja",
"args": [
"-C",
- "../build"
+ "debug"
],
"options": {
- "cwd": "${workspaceFolder}/utest"
+ "cwd": "${workspaceFolder}"
},
"problemMatcher": [
"$gcc"
diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto
index 9827f17..b06b692 100644
--- a/proto/source/bhome_msg.proto
+++ b/proto/source/bhome_msg.proto
@@ -1,39 +1,21 @@
syntax = "proto3";
-
option optimize_for = LITE_RUNTIME;
-import "google/protobuf/descriptor.proto";
-import "error_msg.proto";
+// import "google/protobuf/descriptor.proto";
+import "bhome_msg_api.proto";
package bhome.msg;
-// message format : header(BHMsgHead) + body(variable types)
-message BHAddress {
- bytes mq_id = 1; // mqid, uuid
- bytes ip = 2; //
- int32 port = 3;
-}
-
-message ProcInfo
-{
- bytes id = 1; // serial number, maybe managed
- bytes name = 2;
- bytes public_info = 3;
- bytes private_info = 4;
-}
+// message format : head_len(4) + head(BHMsgHead) + body_len(4) + body(variable types)
message BHMsgHead {
bytes msg_id = 1;
repeated BHAddress route = 2; // for reply and proxy.
int64 timestamp = 3;
int32 type = 4;
- ProcInfo proc = 5;
+ bytes proc_id = 5;
bytes topic = 6; // for request route
-}
-
-message BHMsgBody {
- bytes data = 1;
}
message BHMsg { // deprecated
@@ -46,6 +28,7 @@
enum MsgType {
kMsgTypeInvalid = 0;
+ kMsgTypeRawData = 1;
kMsgTypeCommonReply = 2;
@@ -57,57 +40,16 @@
kMsgTypeQueryTopicReply = 15;
kMsgTypeRequestTopic = 16;
kMsgTypeRequestTopicReply = 17;
+ kMsgTypeRegisterRPC = 18;
+ // reply
- kMsgTypePublish = 100;
- // kMsgTypePublishReply = 101;
- kMsgTypeSubscribe = 102;
- // kMsgTypeSubscribeReply = 103;
- kMsgTypeUnsubscribe = 104;
- // kMsgTypeUnsubscribeReply = 105;
+ kMsgTypePublish = 20;
+ // kMsgTypePublishReply = 21;
+ kMsgTypeSubscribe = 22;
+ // kMsgTypeSubscribeReply = 23;
+ kMsgTypeUnsubscribe = 24;
+ // kMsgTypeUnsubscribeReply = 25;
-}
-
-message MsgPub {
- bytes topic = 1;
- bytes data = 2;
-}
-
-message MsgSub {
- repeated bytes topics = 1;
-}
-
-message MsgCommonReply {
- ErrorMsg errmsg = 1;
-}
-
-message MsgRequestTopic {
- bytes topic = 1;
- bytes data = 2;
-}
-
-message MsgRequestTopicReply {
- ErrorMsg errmsg = 1;
- bytes data = 2;
-}
-
-message MsgRegister
-{
- ProcInfo proc = 1;
- repeated bytes topics = 2;
-}
-
-message MsgHeartbeat
-{
- ProcInfo proc = 1;
-}
-
-message MsgQueryTopic {
- bytes topic = 1;
-}
-
-message MsgQueryTopicReply {
- ErrorMsg errmsg = 1;
- BHAddress address = 2;
}
service TopicRPC {
diff --git a/proto/source/bhome_msg_api.proto b/proto/source/bhome_msg_api.proto
new file mode 100644
index 0000000..82b8115
--- /dev/null
+++ b/proto/source/bhome_msg_api.proto
@@ -0,0 +1,71 @@
+syntax = "proto3";
+option optimize_for = LITE_RUNTIME;
+
+// public messages
+import "error_msg.proto";
+
+package bhome.msg;
+
+message BHAddress {
+ bytes mq_id = 1; // mqid, uuid
+ // bytes ip = 2; //
+ // int32 port = 3;
+}
+
+message ProcInfo
+{
+ bytes proc_id = 1; // serial number, maybe managed
+ bytes name = 2;
+ bytes public_info = 3; // maybe json.
+ bytes private_info = 4;
+}
+
+message MsgPublish {
+ bytes topic = 1;
+ bytes data = 2;
+}
+
+message MsgSubscribe {
+ repeated bytes topics = 1;
+}
+message MsgUnsubscribe {
+ repeated bytes topics = 1;
+}
+
+message MsgCommonReply {
+ ErrorMsg errmsg = 1;
+}
+
+message MsgRequestTopic {
+ bytes topic = 1;
+ bytes data = 2;
+}
+
+message MsgRequestTopicReply {
+ ErrorMsg errmsg = 1;
+ bytes data = 2;
+}
+
+message MsgRegister
+{
+ ProcInfo proc = 1;
+}
+
+message MsgRegisterRPC
+{
+ repeated bytes topics = 1;
+}
+
+message MsgHeartbeat
+{
+ ProcInfo proc = 1;
+}
+
+message MsgQueryTopic {
+ bytes topic = 1;
+}
+
+message MsgQueryTopicReply {
+ ErrorMsg errmsg = 1;
+ BHAddress address = 2;
+}
diff --git a/proto/source/error_msg.proto b/proto/source/error_msg.proto
index f283108..b85ddb3 100644
--- a/proto/source/error_msg.proto
+++ b/proto/source/error_msg.proto
@@ -8,6 +8,11 @@
eSuccess = 0;
eError = 1;
eInvalidInput = 2;
+ eNotRegistered = 3;
+ eNotFound = 4;
+ eOffline = 5;
+ eNoRespond = 6;
+ eAddressNotMatch = 7;
}
message ErrorMsg {
diff --git a/src/center.cpp b/src/center.cpp
index a3897fb..fe549b7 100644
--- a/src/center.cpp
+++ b/src/center.cpp
@@ -16,20 +16,387 @@
* =====================================================================================
*/
#include "center.h"
+#include "bh_util.h"
#include "defs.h"
-#include "pubsub_center.h"
-#include "reqrep_center.h"
#include "shm.h"
+#include <set>
using namespace bhome_shm;
+using namespace bhome_msg;
+using namespace bhome::msg;
typedef BHCenter::MsgHandler Handler;
-Handler Join(Handler h1, Handler h2)
+namespace
{
- return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &imsg, bhome::msg::BHMsg &msg) {
- return h1(socket, imsg, msg) || h2(socket, imsg, msg);
+auto Now = []() { time_t t; return time(&t); };
+
+//TODO check proc_id
+class NodeCenter
+{
+public:
+ typedef std::string ProcId;
+ typedef std::string Address;
+ typedef bhome::msg::ProcInfo ProcInfo;
+
+private:
+ enum {
+ kStateInvalid = 0,
+ kStateNormal = 1,
+ kStateNoRespond = 2,
+ kStateOffline = 3,
+ };
+
+ struct ProcState {
+ time_t timestamp_ = 0;
+ uint32_t flag_ = 0; // reserved
+ };
+ typedef std::unordered_map<Address, std::set<Topic>> AddressTopics;
+
+ struct NodeInfo {
+ ProcState state_; // state
+ Address addr_; // registered_mqid.
+ ProcInfo proc_; //
+ AddressTopics services_; // address: topics
+ AddressTopics subscriptions_; // address: topics
+ };
+ typedef std::shared_ptr<NodeInfo> Node;
+ typedef std::weak_ptr<NodeInfo> WeakNode;
+
+ struct TopicDest {
+ Address mq_;
+ WeakNode weak_node_;
+ bool operator<(const TopicDest &a) const { return mq_ < a.mq_; }
+ };
+ const std::string &SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); }
+
+public:
+ typedef std::set<TopicDest> Clients;
+
+ NodeCenter(const std::string &id = "#Center") :
+ id_(id) {}
+ const std::string &id() const { return id_; } // no need to lock.
+
+ //TODO maybe just return serialized string.
+ MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg)
+ {
+ if (msg.proc().proc_id() != head.proc_id()) {
+ return MakeReply(eInvalidInput, "invalid proc id.");
+ }
+
+ try {
+ Node node(new NodeInfo);
+ node->addr_ = SrcAddr(head);
+ node->proc_.Swap(msg.mutable_proc());
+ node->state_.timestamp_ = Now();
+ node->state_.flag_ = kStateNormal;
+ nodes_[node->proc_.proc_id()] = node;
+ return MakeReply(eSuccess);
+ } catch (...) {
+ return MakeReply(eError, "register node error.");
+ }
+ }
+ template <class OnSuccess, class OnError>
+ auto HandleMsg(const BHMsgHead &head, OnSuccess onOk, OnError onErr)
+ {
+ auto pos = nodes_.find(head.proc_id());
+ if (pos == nodes_.end()) {
+ return onErr(eNotRegistered, "Node is not registered.");
+ } else {
+ auto node = pos->second;
+ if (head.type() == kMsgTypeHeartbeat && node->addr_ != SrcAddr(head)) {
+ return onErr(eAddressNotMatch, "Node address error.");
+ } else if (!Valid(*node)) {
+ return onErr(eNoRespond, "Node is not alive.");
+ } else {
+ return onOk(node);
+ }
+ }
+ }
+
+ template <class Reply, class Func>
+ Reply HandleMsg(const BHMsgHead &head, Func const &op)
+ {
+ try {
+ auto onErr = [](const ErrorCode ec, const std::string &str) { return MakeReply<Reply>(ec, str); };
+ return HandleMsg(head, op, onErr);
+
+ auto pos = nodes_.find(head.proc_id());
+ if (pos == nodes_.end()) {
+ return MakeReply<Reply>(eNotRegistered, "Node is not registered.");
+ } else {
+ auto node = pos->second;
+ if (node->addr_ != SrcAddr(head)) {
+ return MakeReply<Reply>(eAddressNotMatch, "Node address error.");
+ } else if (!Valid(*node)) {
+ return MakeReply<Reply>(eNoRespond, "Node is not alive.");
+ } else {
+ return op(node);
+ }
+ }
+ } catch (...) {
+ //TODO error log
+ return MakeReply<Reply>(eError, "internal error.");
+ }
+ }
+ template <class Func>
+ inline MsgCommonReply HandleMsg(const BHMsgHead &head, Func const &op)
+ {
+ return HandleMsg<MsgCommonReply, Func>(head, op);
+ }
+
+ MsgCommonReply RegisterRPC(const BHMsgHead &head, MsgRegisterRPC &msg)
+ {
+ return HandleMsg(
+ head, [&](Node node) -> MsgCommonReply {
+ auto &src = SrcAddr(head);
+ node->services_[src].insert(msg.topics().begin(), msg.topics().end());
+ TopicDest dest = {src, node};
+ for (auto &topic : msg.topics()) {
+ service_map_[topic].insert(dest);
+ }
+ return MakeReply(eSuccess);
+ });
+ }
+
+ MsgCommonReply Heartbeat(const BHMsgHead &head, const MsgHeartbeat &msg)
+ {
+ return HandleMsg(head, [&](Node node) {
+ NodeInfo &ni = *node;
+ ni.state_.timestamp_ = Now();
+ auto &info = msg.proc();
+ if (!info.public_info().empty()) {
+ ni.proc_.set_public_info(info.public_info());
+ }
+ if (!info.private_info().empty()) {
+ ni.proc_.set_private_info(info.private_info());
+ }
+ return MakeReply(eSuccess);
+ });
+ }
+
+ MsgQueryTopicReply QueryTopic(const BHMsgHead &head, const MsgQueryTopic &req)
+ {
+ typedef MsgQueryTopicReply Reply;
+
+ auto query = [&](Node self) -> MsgQueryTopicReply {
+ auto pos = service_map_.find(req.topic());
+ if (pos != service_map_.end() && !pos->second.empty()) {
+ // now just find first one.
+ const TopicDest &dest = *(pos->second.begin());
+ Node dest_node(dest.weak_node_.lock());
+ if (!dest_node) {
+ service_map_.erase(pos);
+ return MakeReply<Reply>(eOffline, "topic server offline.");
+ } else if (!Valid(*dest_node)) {
+ return MakeReply<Reply>(eNoRespond, "topic server not responding.");
+ } else {
+ MsgQueryTopicReply reply = MakeReply<Reply>(eSuccess);
+ reply.mutable_address()->set_mq_id(dest.mq_);
+ return reply;
+ }
+
+ } else {
+ return MakeReply<Reply>(eNotFound, "topic server not found.");
+ }
+ };
+
+ return HandleMsg<Reply>(head, query);
+ }
+
+ MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg)
+ {
+ return HandleMsg(head, [&](Node node) {
+ auto &src = SrcAddr(head);
+ node->subscriptions_[src].insert(msg.topics().begin(), msg.topics().end());
+ TopicDest dest = {src, node};
+ for (auto &topic : msg.topics()) {
+ subscribe_map_[topic].insert(dest);
+ }
+ return MakeReply(eSuccess);
+ });
+ }
+ MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg)
+ {
+ return HandleMsg(head, [&](Node node) {
+ auto &src = SrcAddr(head);
+ auto pos = node->subscriptions_.find(src);
+
+ auto RemoveSubTopicDestRecord = [this](const Topic &topic, const TopicDest &dest) {
+ auto pos = subscribe_map_.find(topic);
+ if (pos != subscribe_map_.end() &&
+ pos->second.erase(dest) != 0 &&
+ pos->second.empty()) {
+ subscribe_map_.erase(pos);
+ }
+ };
+
+ if (pos != node->subscriptions_.end()) {
+ const TopicDest &dest = {src, node};
+ // clear node sub records;
+ for (auto &topic : msg.topics()) {
+ pos->second.erase(topic);
+ RemoveSubTopicDestRecord(topic, dest);
+ }
+ if (pos->second.empty()) {
+ node->subscriptions_.erase(pos);
+ }
+ }
+ return MakeReply(eSuccess);
+ });
+ }
+
+ Clients DoFindClients(const std::string &topic)
+ {
+ Clients dests;
+ auto Find1 = [&](const std::string &t) {
+ auto pos = subscribe_map_.find(topic);
+ if (pos != subscribe_map_.end()) {
+ auto &clients = pos->second;
+ for (auto &cli : clients) {
+ if (Valid(cli.weak_node_)) {
+ dests.insert(cli);
+ }
+ }
+ }
+ };
+ Find1(topic);
+
+ size_t pos = 0;
+ while (true) {
+ pos = topic.find(kTopicSep, pos);
+ if (pos == topic.npos || ++pos == topic.size()) {
+ // Find1(std::string()); // sub all.
+ break;
+ } else {
+ Find1(topic.substr(0, pos));
+ }
+ }
+ return dests;
+ }
+ bool FindClients(const BHMsgHead &head, const MsgPublish &msg, Clients &out, MsgCommonReply &reply)
+ {
+ bool ret = false;
+ HandleMsg(head, [&](Node node) {
+ DoFindClients(msg.topic()).swap(out);
+ ret = true;
+ return MakeReply(eSuccess);
+ }).Swap(&reply);
+ return ret;
+ }
+
+private:
+ bool Valid(const NodeInfo &node)
+ {
+ return node.state_.flag_ == kStateNormal;
+ }
+ bool Valid(const WeakNode &weak)
+ {
+ auto node = weak.lock();
+ return node && Valid(*node);
+ }
+ void CheckAllNodes(); //TODO, call it in timer.
+ std::string id_; // center proc id;
+
+ std::unordered_map<Topic, Clients> service_map_;
+ std::unordered_map<Topic, Clients> subscribe_map_;
+ std::unordered_map<ProcId, Node> nodes_;
+};
+
+template <class Body, class OnMsg, class Replyer>
+inline void Dispatch(MsgI &msg, BHMsgHead &head, OnMsg const &onmsg, Replyer const &replyer)
+{
+ if (head.route_size() != 1) { return; }
+ Body body;
+ if (msg.ParseBody(body)) {
+ replyer(onmsg(body));
+ }
+}
+
+Handler Combine(const Handler &h1, const Handler &h2)
+{
+ return [h1, h2](ShmSocket &socket, bhome_msg::MsgI &msg, bhome::msg::BHMsgHead &head) {
+ return h1(socket, msg, head) || h2(socket, msg, head);
};
}
+template <class... H>
+Handler Combine(const Handler &h0, const Handler &h1, const Handler &h2, const H &...rest)
+{
+ return Combine(Combine(h0, h1), h2, rest...);
+}
+
+#define CASE_ON_MSG_TYPE(MsgTag) \
+ case kMsgType##MsgTag: \
+ Dispatch<Msg##MsgTag>( \
+ msg, head, [&](auto &body) { return center->MsgTag(head, body); }, replyer); \
+ return true;
+
+bool InstallCenter()
+{
+ auto center_ptr = std::make_shared<Synced<NodeCenter>>();
+ auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
+ return [&](auto &&rep_body) {
+ auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
+ bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 10);
+ if (!r) {
+ printf("send reply failed.\n");
+ }
+ //TODO resend failed.
+ };
+ };
+
+ auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
+ auto ¢er = *center_ptr;
+ auto replyer = MakeReplyer(socket, head, center->id());
+ switch (head.type()) {
+ CASE_ON_MSG_TYPE(Register);
+ CASE_ON_MSG_TYPE(Heartbeat);
+
+ CASE_ON_MSG_TYPE(RegisterRPC);
+ CASE_ON_MSG_TYPE(QueryTopic);
+ default: return false;
+ }
+ };
+
+ auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
+ auto ¢er = *center_ptr;
+ auto replyer = MakeReplyer(socket, head, center->id());
+ auto OnPublish = [&]() {
+ MsgPublish pub;
+ NodeCenter::Clients clients;
+ MsgCommonReply reply;
+ MsgI pubmsg;
+ if (head.route_size() != 1 || !msg.ParseBody(pub)) {
+ return;
+ } else if (!center->FindClients(head, pub, clients, reply)) {
+ // send error reply.
+ MakeReplyer(socket, head, center->id())(reply);
+ } else if (pubmsg.MakeRC(socket.shm(), msg)) {
+ DEFER1(pubmsg.Release(socket.shm()));
+ for (auto &cli : clients) {
+ auto node = cli.weak_node_.lock();
+ if (node) {
+ socket.Send(cli.mq_.data(), pubmsg, 10);
+ }
+ }
+ }
+ };
+ switch (head.type()) {
+ CASE_ON_MSG_TYPE(Subscribe);
+ CASE_ON_MSG_TYPE(Unsubscribe);
+ case kMsgTypePublish: OnPublish(); return true;
+ default: return false;
+ }
+ };
+
+ BHCenter::Install("#center.reg", OnCenter, BHTopicCenterAddress(), 1000);
+ BHCenter::Install("#center.bus", OnPubSub, BHTopicBusAddress(), 1000);
+
+ return true;
+}
+
+#undef CASE_ON_MSG_TYPE
+
+} // namespace
SharedMemory &BHomeShm()
{
@@ -42,17 +409,24 @@
static CenterRecords rec;
return rec;
}
+
bool BHCenter::Install(const std::string &name, MsgHandler handler, const std::string &mqid, const int mq_len)
{
- CenterRecords()[name] = CenterInfo{name, handler, mqid, mq_len};
+ Centers()[name] = CenterInfo{name, handler, mqid, mq_len};
+ return true;
+}
+bool BHCenter::Install(const std::string &name, MsgHandler handler, const MQId &mqid, const int mq_len)
+{
+ return Install(name, handler, std::string((const char *) &mqid, sizeof(mqid)), mq_len);
}
BHCenter::BHCenter(Socket::Shm &shm)
{
- sockets_["center"] = std::make_shared<ShmSocket>(shm, &BHTopicCenterAddress(), 1000);
- sockets_["bus"] = std::make_shared<ShmSocket>(shm, &BHTopicBusAddress(), 1000);
+ InstallCenter();
+
for (auto &kv : Centers()) {
- sockets_[kv.first] = std::make_shared<ShmSocket>(shm, kv.second.mqid_.data(), kv.second.mq_len_);
+ auto &info = kv.second;
+ sockets_[info.name_] = std::make_shared<ShmSocket>(shm, *(MQId *) info.mqid_.data(), info.mq_len_);
}
}
@@ -61,16 +435,12 @@
bool BHCenter::Start()
{
- auto onCenter = MakeReqRepCenter();
- auto onBus = MakeBusCenter();
- sockets_["center"]->Start(onCenter);
- sockets_["bus"]->Start(onBus);
-
for (auto &kv : Centers()) {
- sockets_[kv.first]->Start(kv.second.handler_);
+ auto &info = kv.second;
+ sockets_[info.name_]->Start(info.handler_);
}
+
return true;
- // socket_.Start(Join(onCenter, onBus));
}
bool BHCenter::Stop()
diff --git a/src/center.h b/src/center.h
index 02ec8f4..920addd 100644
--- a/src/center.h
+++ b/src/center.h
@@ -28,8 +28,9 @@
typedef ShmSocket Socket;
public:
- typedef std::function<bool(ShmSocket &socket, bhome_msg::MsgI &imsg, bhome::msg::BHMsg &msg)> MsgHandler;
+ typedef Socket::PartialRecvCB MsgHandler;
static bool Install(const std::string &name, MsgHandler handler, const std::string &mqid, const int mq_len);
+ static bool Install(const std::string &name, MsgHandler handler, const MQId &mqid, const int mq_len);
BHCenter(Socket::Shm &shm);
BHCenter();
diff --git a/src/msg.cpp b/src/msg.cpp
index 8752066..c353d84 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -25,140 +25,38 @@
center accept request and route.;
//*/
const uint32_t kMsgTag = 0xf1e2d3c4;
-const uint32_t kMsgPrefixLen = 4;
-inline void AddRoute(BHMsg &msg, const MQId &id) { msg.add_route()->set_mq_id(&id, sizeof(id)); }
-
-std::string RandId()
+void *MsgI::Pack(SharedMemory &shm,
+ const uint32_t head_len, const ToArray &headToArray,
+ const uint32_t body_len, const ToArray &bodyToArray)
{
- boost::uuids::uuid id = boost::uuids::random_generator()();
- return std::string((char *) &id, sizeof(id));
-}
-BHMsg InitMsg(MsgType type, const std::string &msgid = RandId())
-{
- BHMsg msg;
- msg.set_msg_id(msgid);
- msg.set_type(type);
- time_t tm = 0;
- msg.set_timestamp(time(&tm));
- return msg;
-}
-
-BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size)
-{
- BHMsg msg(InitMsg(kMsgTypeRequestTopic));
- AddRoute(msg, src_id);
- MsgRequestTopic req;
- req.set_topic(topic);
- req.set_data(data, size);
- msg.set_body(req.SerializeAsString());
- return msg;
-}
-
-BHMsg MakeRegister(const MQId &src_id, ProcInfo info, const std::vector<std::string> &topics)
-{
- BHMsg msg(InitMsg(kMsgTypeRegister));
- AddRoute(msg, src_id);
- MsgRegister reg;
- reg.mutable_proc()->Swap(&info);
- for (auto &t : topics) {
- reg.add_topics(t);
+ void *addr = shm.Alloc(sizeof(head_len) + head_len + sizeof(body_len) + body_len);
+ if (addr) {
+ auto p = static_cast<char *>(addr);
+ auto Pack1 = [&p](auto len, auto &writer) {
+ Put32(p, len);
+ p += sizeof(len);
+ writer(p, len);
+ p += len;
+ };
+ Pack1(head_len, headToArray);
+ Pack1(body_len, bodyToArray);
}
- msg.set_body(reg.SerializeAsString());
- return msg;
+ return addr;
}
-BHMsg MakeHeartbeat(const MQId &src_id, ProcInfo info)
+bool MsgI::ParseHead(BHMsgHead &head) const
{
- BHMsg msg(InitMsg(kMsgTypeHeartbeat));
- AddRoute(msg, src_id);
- MsgHeartbeat reg;
- reg.mutable_proc()->Swap(&info);
- msg.set_body(reg.SerializeAsString());
- return msg;
-}
-
-BHMsg MakeReply(const std::string &src_msgid, const void *data, const size_t size)
-{
- assert(data && size);
- BHMsg msg(InitMsg(kMsgTypeRequestTopicReply, src_msgid));
- MsgRequestTopicReply reply;
- reply.set_data(data, size);
- msg.set_body(reply.SerializeAsString());
- return msg;
-}
-
-BHMsg MakeSubUnsub(const MQId &client, const std::vector<std::string> &topics, const MsgType sub_unsub)
-{
- assert(sub_unsub == kMsgTypeSubscribe || sub_unsub == kMsgTypeUnsubscribe);
- BHMsg msg(InitMsg(sub_unsub));
- AddRoute(msg, client);
- MsgSub subs;
- for (auto &t : topics) {
- subs.add_topics(t);
- }
- msg.set_body(subs.SerializeAsString());
- return msg;
-}
-
-BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics) { return MakeSubUnsub(client, topics, kMsgTypeSubscribe); }
-BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics) { return MakeSubUnsub(client, topics, kMsgTypeUnsubscribe); }
-
-BHMsg MakePub(const std::string &topic, const void *data, const size_t size)
-{
- assert(data && size);
- BHMsg msg(InitMsg(kMsgTypePublish));
- MsgPub pub;
- pub.set_topic(topic);
- pub.set_data(data, size);
- msg.set_body(pub.SerializeAsString());
- return msg;
-}
-
-BHMsg MakeQueryTopic(const MQId &client, const std::string &topic)
-{
- BHMsg msg(InitMsg(kMsgTypeQueryTopic));
- AddRoute(msg, client);
- MsgQueryTopic query;
- query.set_topic(topic);
- msg.set_body(query.SerializeAsString());
- return msg;
-}
-BHMsg MakeQueryTopicReply(const std::string &mqid, const std::string &msgid)
-{
- BHMsg msg(InitMsg(kMsgTypeQueryTopicReply, msgid));
- MsgQueryTopicReply reply;
- reply.mutable_address()->set_mq_id(mqid);
- msg.set_body(reply.SerializeAsString());
- return msg;
-}
-
-void *Pack(SharedMemory &shm, const BHMsg &msg)
-{
- uint32_t msg_size = msg.ByteSizeLong();
- void *p = shm.Alloc(4 + msg_size);
- if (p) {
- Put32(p, msg_size);
- if (!msg.SerializeToArray(static_cast<char *>(p) + kMsgPrefixLen, msg_size)) {
- shm.Dealloc(p);
- p = 0;
- }
- }
- return p;
-}
-
-bool MsgI::Unpack(BHMsg &msg) const
-{
- void *p = ptr_.get();
+ auto p = static_cast<char *>(ptr_.get());
assert(p);
uint32_t msg_size = Get32(p);
- return msg.ParseFromArray(static_cast<char *>(p) + kMsgPrefixLen, msg_size);
+ p += 4;
+ return head.ParseFromArray(p, msg_size);
}
// with ref count;
-bool MsgI::MakeRC(SharedMemory &shm, const BHMsg &msg)
+bool MsgI::MakeRC(SharedMemory &shm, void *p)
{
- void *p = Pack(shm, msg);
if (!p) {
return false;
}
@@ -171,9 +69,8 @@
return true;
}
-bool MsgI::Make(SharedMemory &shm, const BHMsg &msg)
+bool MsgI::Make(SharedMemory &shm, void *p)
{
- void *p = Pack(shm, msg);
if (!p) {
return false;
}
diff --git a/src/msg.h b/src/msg.h
index 30b3208..661d989 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -18,10 +18,12 @@
#ifndef MSG_5BILLZET
#define MSG_5BILLZET
-#include "bhome_msg.pb.h"
+#include "bh_util.h"
+#include "proto.h"
#include "shm.h"
#include <boost/interprocess/offset_ptr.hpp>
#include <boost/uuid/uuid_generators.hpp>
+#include <functional>
#include <stdint.h>
namespace bhome_msg
@@ -59,16 +61,6 @@
int num_ = 1;
};
-BHMsg MakeQueryTopic(const MQId &client, const std::string &topic);
-BHMsg MakeQueryTopicReply(const std::string &mqid, const std::string &msgid);
-BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size);
-BHMsg MakeReply(const std::string &src_msgid, const void *data, const size_t size);
-BHMsg MakeRegister(const MQId &src_id, ProcInfo info, const std::vector<std::string> &topics);
-BHMsg MakeHeartbeat(const MQId &src_id, ProcInfo info);
-BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics);
-BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics);
-BHMsg MakePub(const std::string &topic, const void *data, const size_t size);
-
// message content layout: header_size + header + data_size + data
class MsgI
{
@@ -76,7 +68,22 @@
offset_ptr<void> ptr_;
offset_ptr<RefCount> count_;
- bool BuildSubOrUnsub(SharedMemory &shm, const std::vector<std::string> &topics, const MsgType sub_unsub);
+ typedef std::function<void(void *p, int len)> ToArray;
+ void *Pack(SharedMemory &shm,
+ const uint32_t head_len, const ToArray &headToArray,
+ const uint32_t body_len, const ToArray &bodyToArray);
+
+ template <class Body>
+ void *Pack(SharedMemory &shm, const BHMsgHead &head, const Body &body)
+ {
+ return Pack(
+ shm,
+ uint32_t(head.ByteSizeLong()), [&](void *p, int len) { head.SerializeToArray(p, len); },
+ uint32_t(body.ByteSizeLong()), [&](void *p, int len) { body.SerializeToArray(p, len); });
+ }
+
+ bool MakeRC(SharedMemory &shm, void *addr);
+ bool Make(SharedMemory &shm, void *addr);
public:
MsgI(void *p = 0, RefCount *c = 0) :
@@ -97,9 +104,41 @@
int Count() const { return IsCounted() ? count_->Get() : 1; }
bool IsCounted() const { return static_cast<bool>(count_); }
- bool Make(SharedMemory &shm, const BHMsg &msg);
- bool MakeRC(SharedMemory &shm, const BHMsg &msg);
- bool Unpack(BHMsg &msg) const;
+ template <class Body>
+ bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body)
+ {
+ return Make(shm, Pack(shm, head, body));
+ }
+ template <class Body>
+ bool MakeRC(SharedMemory &shm, const BHMsgHead &head, const Body &body)
+ {
+ return MakeRC(shm, Pack(shm, head, body));
+ }
+ bool MakeRC(SharedMemory &shm, MsgI &a)
+ {
+ if (a.IsCounted()) {
+ *this = a;
+ AddRef();
+ return true;
+ } else {
+ void *p = a.ptr_.get();
+ a.ptr_ = 0;
+ return MakeRC(shm, p);
+ }
+ }
+ bool ParseHead(BHMsgHead &head) const;
+ template <class Body>
+ bool ParseBody(Body &body) const
+ {
+ auto p = static_cast<char *>(ptr_.get());
+ assert(p);
+ uint32_t size = Get32(p);
+ p += 4;
+ p += size;
+ size = Get32(p);
+ p += 4;
+ return body.ParseFromArray(p, size);
+ }
};
inline void swap(MsgI &m1, MsgI &m2) { m1.swap(m2); }
diff --git a/src/proto.cpp b/src/proto.cpp
new file mode 100644
index 0000000..0ec894f
--- /dev/null
+++ b/src/proto.cpp
@@ -0,0 +1,41 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: proto.cpp
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�04鏈�07鏃� 17鏃�04鍒�36绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (), lichao@aiotlink.com
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#include "proto.h"
+#include <boost/uuid/uuid_generators.hpp>
+
+std::string RandId()
+{
+ boost::uuids::uuid id = boost::uuids::random_generator()();
+ return std::string((char *) &id, sizeof(id));
+}
+
+BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id)
+{
+ return InitMsgHead(type, proc_id, RandId());
+}
+
+BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid)
+{
+ BHMsgHead msg;
+ msg.set_msg_id(msgid);
+ msg.set_type(type);
+ msg.set_proc_id(proc_id);
+ time_t tm = 0;
+ msg.set_timestamp(time(&tm));
+ return msg;
+}
diff --git a/src/proto.h b/src/proto.h
new file mode 100644
index 0000000..2057711
--- /dev/null
+++ b/src/proto.h
@@ -0,0 +1,78 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: proto.h
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�04鏈�07鏃� 13鏃�48鍒�51绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (), lichao@aiotlink.com
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#ifndef PROTO_UA9UWKL1
+#define PROTO_UA9UWKL1
+
+#include "bhome_msg.pb.h"
+
+using namespace bhome::msg;
+
+template <class Msg>
+struct MsgToType {
+};
+
+#define BHOME_MAP_MSG_AND_TYPE(mSG, tYPE) \
+ template <> \
+ struct MsgToType<mSG> { \
+ static const bhome::msg::MsgType value = tYPE; \
+ };
+
+#define BHOME_SIMPLE_MAP_MSG(name) BHOME_MAP_MSG_AND_TYPE(Msg##name, kMsgType##name)
+
+BHOME_SIMPLE_MAP_MSG(CommonReply);
+BHOME_SIMPLE_MAP_MSG(Register);
+BHOME_SIMPLE_MAP_MSG(RegisterRPC);
+BHOME_SIMPLE_MAP_MSG(Heartbeat);
+BHOME_SIMPLE_MAP_MSG(QueryTopic);
+BHOME_SIMPLE_MAP_MSG(QueryTopicReply);
+BHOME_SIMPLE_MAP_MSG(RequestTopic);
+BHOME_SIMPLE_MAP_MSG(RequestTopicReply);
+BHOME_SIMPLE_MAP_MSG(Publish);
+BHOME_SIMPLE_MAP_MSG(Subscribe);
+BHOME_SIMPLE_MAP_MSG(Unsubscribe);
+
+#undef BHOME_SIMPLE_MAP_MSG
+#undef BHOME_MAP_MSG_AND_TYPE
+
+template <class Msg>
+constexpr inline bhome::msg::MsgType GetType(const Msg &)
+{
+ return MsgToType<Msg>::value;
+}
+
+inline void SetError(ErrorMsg &em, const ErrorCode err_code, const std::string &err_str = "")
+{
+ em.set_errcode(err_code);
+ if (!err_str.empty()) {
+ em.set_errstring(err_str);
+ }
+}
+
+template <class Reply = MsgCommonReply>
+inline Reply MakeReply(const ErrorCode err_code, const std::string &err_str = "")
+{
+ Reply msg;
+ SetError(*msg.mutable_errmsg(), err_code, err_str);
+ return msg;
+}
+
+BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid);
+BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id);
+// inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); }
+
+#endif // end of include guard: PROTO_UA9UWKL1
diff --git a/src/pubsub.cpp b/src/pubsub.cpp
index 0266c86..471c63c 100644
--- a/src/pubsub.cpp
+++ b/src/pubsub.cpp
@@ -22,24 +22,38 @@
using namespace std::chrono_literals;
using namespace bhome_msg;
-bool SocketPublish::Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms)
+bool SocketPublish::Publish(const std::string &proc_id, const Topic &topic, const void *data, const size_t size, const int timeout_ms)
{
try {
+ MsgPublish pub;
+ pub.set_topic(topic);
+ pub.set_data(data, size);
+ BHMsgHead head(InitMsgHead(GetType(pub), proc_id));
MsgI imsg;
- if (!imsg.MakeRC(shm(), MakePub(topic, data, size))) {
- return false;
+ if (imsg.MakeRC(shm(), head, pub)) {
+ DEFER1(imsg.Release(shm()));
+ return ShmMsgQueue::Send(shm(), BHTopicBusAddress(), imsg, timeout_ms);
}
- DEFER1(imsg.Release(shm()));
- return ShmMsgQueue::Send(shm(), BHTopicBusAddress(), imsg, timeout_ms);
} catch (...) {
- return false;
}
+ return false;
}
+namespace
+{
+inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); }
-bool SocketSubscribe::Subscribe(const std::vector<Topic> &topics, const int timeout_ms)
+} // namespace
+bool SocketSubscribe::Subscribe(const std::string &proc_id, const std::vector<Topic> &topics, const int timeout_ms)
{
try {
- return mq().Send(BHTopicBusAddress(), MakeSub(mq().Id(), topics), timeout_ms);
+ MsgSubscribe sub;
+ for (auto &topic : topics) {
+ sub.add_topics(topic);
+ }
+ BHMsgHead head(InitMsgHead(GetType(sub), proc_id));
+ AddRoute(head, mq().Id());
+
+ return Send(&BHTopicBusAddress(), head, sub, timeout_ms);
} catch (...) {
return false;
}
@@ -47,11 +61,11 @@
bool SocketSubscribe::StartRecv(const TopicDataCB &tdcb, int nworker)
{
- auto AsyncRecvProc = [this, tdcb](BHMsg &msg) {
- if (msg.type() == kMsgTypePublish) {
- MsgPub d;
- if (d.ParseFromString(msg.body())) {
- tdcb(d.topic(), d.data());
+ auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) {
+ if (head.type() == kMsgTypePublish) {
+ MsgPublish pub;
+ if (imsg.ParseBody(pub)) {
+ tdcb(head.proc_id(), pub.topic(), pub.data());
}
} else {
// ignored, or dropped
@@ -61,14 +75,16 @@
return tdcb && Start(AsyncRecvProc, nworker);
}
-bool SocketSubscribe::RecvSub(Topic &topic, std::string &data, const int timeout_ms)
+bool SocketSubscribe::RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms)
{
- BHMsg msg;
- if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypePublish) {
- MsgPub d;
- if (d.ParseFromString(msg.body())) {
- d.mutable_topic()->swap(topic);
- d.mutable_data()->swap(data);
+ MsgI msg;
+ BHMsgHead head;
+ if (SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) {
+ MsgPublish pub;
+ if (msg.ParseBody(pub)) {
+ head.mutable_proc_id()->swap(proc_id);
+ pub.mutable_topic()->swap(topic);
+ pub.mutable_data()->swap(data);
return true;
}
}
diff --git a/src/pubsub.h b/src/pubsub.h
index 3c3d4ad..bd60fcd 100644
--- a/src/pubsub.h
+++ b/src/pubsub.h
@@ -33,11 +33,7 @@
shm_(shm) {}
SocketPublish() :
SocketPublish(BHomeShm()) {}
- bool Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms);
- bool Publish(const Topic &topic, const std::string &data, const int timeout_ms)
- {
- return Publish(topic, data.data(), data.size(), timeout_ms);
- }
+ bool Publish(const std::string &proc_id, const Topic &topic, const void *data, const size_t size, const int timeout_ms);
};
// socket subscribe
@@ -52,11 +48,11 @@
SocketSubscribe(BHomeShm()) {}
~SocketSubscribe() { Stop(); }
- typedef std::function<void(const Topic &topic, const std::string &data)> TopicDataCB;
+ typedef std::function<void(const std::string &proc_id, const Topic &topic, const std::string &data)> TopicDataCB;
bool StartRecv(const TopicDataCB &tdcb, int nworker = 2);
bool Stop() { return Socket::Stop(); }
- bool Subscribe(const std::vector<Topic> &topics, const int timeout_ms);
- bool RecvSub(Topic &topic, std::string &data, const int timeout_ms);
+ bool Subscribe(const std::string &proc_id, const std::vector<Topic> &topics, const int timeout_ms);
+ bool RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms);
};
#endif // end of include guard: PUBSUB_4KGRA997
diff --git a/src/pubsub_center.cpp b/src/pubsub_center.cpp
deleted file mode 100644
index 698327e..0000000
--- a/src/pubsub_center.cpp
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * =====================================================================================
- *
- * Filename: pubsub_center.cpp
- *
- * Description: pub/sub center/manager
- *
- * Version: 1.0
- * Created: 2021骞�04鏈�01鏃� 09鏃�29鍒�04绉�
- * Revision: none
- * Compiler: gcc
- *
- * Author: Li Chao (),
- * Organization:
- *
- * =====================================================================================
- */
-#include "pubsub_center.h"
-#include "bh_util.h"
-using namespace bhome_shm;
-namespace
-{
-class BusCenter
-{
- typedef std::set<MQId> Clients;
- std::unordered_map<Topic, Clients> records_;
- // todo cache data if send fail.
-
-public:
- template <class Iter>
- void SubScribe(const MQId &client, Iter topic_begin, Iter topic_end)
- {
- for (auto it = topic_begin; it != topic_end; ++it) {
- records_[*it].insert(client);
- }
- }
- template <class Iter>
- void UnsubScribe(const MQId &client, Iter topic_begin, Iter topic_end)
- {
- for (auto it = topic_begin; it != topic_end; ++it) {
- auto pos = records_.find(*it);
- if (pos != records_.end()) {
- if (pos->second.erase(client) && pos->second.empty()) {
- records_.erase(pos);
- }
- }
- }
- };
- Clients FindClients(const std::string &topic)
- {
- Clients dests;
- auto Find1 = [&](const std::string &t) {
- auto pos = records_.find(topic);
- if (pos != records_.end() && !pos->second.empty()) {
- auto &clients = pos->second;
- for (auto &cli : clients) {
- dests.insert(cli);
- }
- }
- };
- Find1(topic);
-
- //TODO check and adjust topic on client side sub/pub.
- size_t pos = 0;
- while (true) {
- pos = topic.find(kTopicSep, pos);
- if (pos == topic.npos || ++pos == topic.size()) {
- // Find1(std::string()); // sub all.
- break;
- } else {
- Find1(topic.substr(0, pos));
- }
- }
- return dests;
- }
-};
-
-} // namespace
-
-BHCenter::MsgHandler MakeBusCenter()
-{
- auto bus_ptr = std::make_shared<Synced<BusCenter>>();
-
- return [bus_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) {
-#ifndef NDEBUG
- static std::atomic<time_t> last(0);
- time_t now = 0;
- time(&now);
- if (last.exchange(now) < now) {
- printf("bus queue size: %ld\n", socket.Pending());
- }
-#endif
- auto &bus = *bus_ptr;
- auto &shm = socket.shm();
-
- auto OnSubChange = [&](auto &&update) {
- MsgSub sub;
- if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) {
- assert(sizeof(MQId) == msg.route(0).mq_id().size());
- MQId client;
- memcpy(&client, msg.route(0).mq_id().data(), sizeof(client));
- update(client, sub.topics());
- }
- };
- auto Sub = [&](const MQId &id, auto &topics) { bus->SubScribe(id, topics.begin(), topics.end()); };
- auto Unsub = [&](const MQId &id, auto &topics) { bus->UnsubScribe(id, topics.begin(), topics.end()); };
-
- auto OnPublish = [&]() {
- MsgPub pub;
- if (!pub.ParseFromString(msg.body())) {
- return;
- }
- auto Dispatch = [&](auto &&send1) {
- const auto &clients(bus->FindClients(pub.topic()));
- for (auto &cli : clients) {
- send1(cli);
- }
- };
-
- if (imsg.IsCounted()) {
- Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm, cli, imsg, 10); });
- } else {
- MsgI pubmsg;
- if (!pubmsg.MakeRC(shm, msg)) { return; }
- DEFER1(pubmsg.Release(shm));
-
- Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm, cli, pubmsg, 10); });
- }
- };
-
- switch (msg.type()) {
- case kMsgTypeSubscribe: OnSubChange(Sub); return true;
- case kMsgTypeUnsubscribe: OnSubChange(Unsub); return true;
- case kMsgTypePublish: OnPublish(); return true;
- default: return false;
- }
- };
-}
-
-bool PubSubCenter::Start(const int nworker)
-{
- auto handler = MakeBusCenter();
- printf("sizeof(pub/sub handler) = %ld\n", sizeof(handler));
-
- const int kMaxWorker = 16;
- return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
-}
\ No newline at end of file
diff --git a/src/pubsub_center.h b/src/pubsub_center.h
deleted file mode 100644
index f81fa0e..0000000
--- a/src/pubsub_center.h
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * =====================================================================================
- *
- * Filename: pubsub_center.h
- *
- * Description:
- *
- * Version: 1.0
- * Created: 2021骞�04鏈�01鏃� 09鏃�29鍒�39绉�
- * Revision: none
- * Compiler: gcc
- *
- * Author: Li Chao (),
- * Organization:
- *
- * =====================================================================================
- */
-#ifndef PUBSUB_CENTER_MFSUZJU7
-#define PUBSUB_CENTER_MFSUZJU7
-
-#include "center.h"
-#include "defs.h"
-#include "socket.h"
-#include <mutex>
-#include <set>
-#include <unordered_map>
-
-BHCenter::MsgHandler MakeBusCenter();
-
-// publish/subcribe manager.
-class PubSubCenter
-{
- ShmSocket socket_;
-
-public:
- PubSubCenter(ShmSocket::Shm &shm) :
- socket_(shm, &BHTopicBusAddress(), 1000) {}
- PubSubCenter() :
- PubSubCenter(BHomeShm()) {}
- ~PubSubCenter() { Stop(); }
- bool Start(const int nworker = 2);
- bool Stop() { return socket_.Stop(); }
-};
-
-#endif // end of include guard: PUBSUB_CENTER_MFSUZJU7
diff --git a/src/reqrep_center.cpp b/src/reqrep_center.cpp
deleted file mode 100644
index ce35d1c..0000000
--- a/src/reqrep_center.cpp
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * =====================================================================================
- *
- * Filename: reqrep_center.cpp
- *
- * Description: topic request/reply center
- *
- * Version: 1.0
- * Created: 2021骞�04鏈�01鏃� 14鏃�08鍒�50绉�
- * Revision: none
- * Compiler: gcc
- *
- * Author: Li Chao (),
- * Organization:
- *
- * =====================================================================================
- */
-#include "reqrep_center.h"
-#include "bh_util.h"
-#include "msg.h"
-#include <chrono>
-#include <memory>
-#include <mutex>
-#include <unordered_map>
-
-using namespace bhome_shm;
-
-namespace
-{
-auto Now = []() { time_t t; return time(&t); };
-
-class NodeCenter
-{
-public:
- typedef std::string ProcAddr;
- typedef bhome::msg::ProcInfo ProcInfo;
-
- template <class Iter>
- bool Register(ProcInfo &info, const ProcAddr &src_mq, Iter topics_begin, Iter topics_end)
- {
- try {
- Node node(new NodeInfo);
- node->addr_ = src_mq;
- node->proc_.Swap(&info);
- node->state_.timestamp_ = Now();
- nodes_[node->proc_.id()] = node;
- for (auto it = topics_begin; it != topics_end; ++it) {
- topic_map_[*it] = node;
- }
- return true;
- } catch (...) {
- return false;
- }
- }
- void Heartbeat(ProcInfo &info, const ProcAddr &src_mq)
- {
- auto pos = nodes_.find(info.name());
- if (pos != nodes_.end() && pos->second->addr_ == src_mq) { // both name and mq should be the same.
- NodeInfo &ni = *pos->second;
- ni.state_.timestamp_ = Now();
- if (!info.public_info().empty()) {
- ni.proc_.set_public_info(info.public_info());
- }
- if (!info.private_info().empty()) {
- ni.proc_.set_private_info(info.private_info());
- }
- }
- }
- bool QueryTopic(const Topic &topic, ProcAddr &addr)
- {
- auto pos = topic_map_.find(topic);
- if (pos != topic_map_.end()) {
- Node node(pos->second.lock());
- if (node) {
- addr = node->addr_;
- return true;
- } else { // dead, remove record.
- topic_map_.erase(pos);
- return false;
- }
- } else {
- return false;
- }
- }
-
-private:
- struct ProcState {
- time_t timestamp_ = 0;
- uint32_t flag_ = 0; // reserved
- };
- typedef std::string ProcId;
- struct NodeInfo {
- ProcState state_; // state
- ProcAddr addr_; // registered_mqid.
- ProcInfo proc_; //
- };
- typedef std::shared_ptr<NodeInfo> Node;
- typedef std::weak_ptr<NodeInfo> WeakNode;
- std::unordered_map<Topic, WeakNode> topic_map_;
- std::unordered_map<ProcId, Node> nodes_;
-};
-
-} // namespace
-
-BHCenter::MsgHandler MakeReqRepCenter()
-{
- auto center_ptr = std::make_shared<Synced<NodeCenter>>();
- return [center_ptr](ShmSocket &socket, MsgI &imsg, BHMsg &msg) {
- auto ¢er = *center_ptr;
- auto &shm = socket.shm();
-
-#ifndef NDEBUG
- static std::atomic<time_t> last(0);
- time_t now = 0;
- time(&now);
- if (last.exchange(now) < now) {
- printf("center queue size: %ld\n", socket.Pending());
- }
-#endif
- auto SrcMQ = [&]() { return msg.route(0).mq_id(); };
-
- auto OnRegister = [&]() {
- if (msg.route_size() != 1) { return; }
-
- MsgRegister reg;
- if (reg.ParseFromString(msg.body()) && reg.has_proc()) {
- center->Register(*reg.mutable_proc(), SrcMQ(), reg.topics().begin(), reg.topics().end());
- }
- };
-
- auto OnHeartbeat = [&]() {
- if (msg.route_size() != 1) { return; }
- auto &src_mq = msg.route(0).mq_id();
-
- MsgHeartbeat hb;
- if (hb.ParseFromString(msg.body()) && hb.has_proc()) {
- center->Heartbeat(*hb.mutable_proc(), SrcMQ());
- }
- };
-
- auto OnQueryTopic = [&]() {
- if (msg.route_size() != 1) { return; }
-
- MsgQueryTopic query;
- NodeCenter::ProcAddr dest;
- if (query.ParseFromString(msg.body()) && center->QueryTopic(query.topic(), dest)) {
- MQId remote;
- memcpy(&remote, SrcMQ().data(), sizeof(MQId));
- MsgI imsg;
- if (!imsg.Make(shm, MakeQueryTopicReply(dest, msg.msg_id()))) { return; }
- if (!ShmMsgQueue::Send(shm, remote, imsg, 100)) {
- imsg.Release(shm);
- }
- }
- };
-
- switch (msg.type()) {
- case kMsgTypeRegister: OnRegister(); return true;
- case kMsgTypeHeartbeat: OnHeartbeat(); return true;
- case kMsgTypeQueryTopic: OnQueryTopic(); return true;
- default: return false;
- }
- };
-}
-
-bool ReqRepCenter::Start(const int nworker)
-{
- auto handler = MakeReqRepCenter();
- printf("sizeof(rep/req handler) = %ld\n", sizeof(handler));
-
- const int kMaxWorker = 16;
- return socket_.Start(handler, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
-}
diff --git a/src/reqrep_center.h b/src/reqrep_center.h
deleted file mode 100644
index bdcdcad..0000000
--- a/src/reqrep_center.h
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * =====================================================================================
- *
- * Filename: reqrep_center.h
- *
- * Description:
- *
- * Version: 1.0
- * Created: 2021骞�04鏈�01鏃� 14鏃�09鍒�13绉�
- * Revision: none
- * Compiler: gcc
- *
- * Author: Li Chao (),
- * Organization:
- *
- * =====================================================================================
- */
-#ifndef REQREP_CENTER_US3RBM60
-#define REQREP_CENTER_US3RBM60
-
-#include "center.h"
-#include "defs.h"
-#include "socket.h"
-
-BHCenter::MsgHandler MakeReqRepCenter();
-class ReqRepCenter
-{
- ShmSocket socket_;
-
-public:
- ReqRepCenter(ShmSocket::Shm &shm) :
- socket_(shm, &BHTopicCenterAddress(), 1000) {}
- ReqRepCenter() :
- ReqRepCenter(BHomeShm()) {}
- ~ReqRepCenter() { Stop(); }
- bool Start(const int nworker = 2);
- bool Stop() { return socket_.Stop(); }
-};
-
-#endif // end of include guard: REQREP_CENTER_US3RBM60
diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp
index dcb5a9e..521f773 100644
--- a/src/shm_queue.cpp
+++ b/src/shm_queue.cpp
@@ -87,15 +87,14 @@
// 2) find remote queue first, then build msg;
// 1 is about 50% faster than 2, maybe cache related.
-bool ShmMsgQueue::Recv(BHMsg &msg, const int timeout_ms)
-{
- MsgI imsg;
- if (Read(imsg, timeout_ms)) {
- DEFER1(imsg.Release(shm()););
- return imsg.Unpack(msg);
- } else {
- return false;
- }
-}
+// bool ShmMsgQueue::Recv(MsgI &imsg, BHMsgHead &head, const int timeout_ms)
+// {
+// if (Read(imsg, timeout_ms)) {
+// // DEFER1(imsg.Release(shm()););
+// return imsg.ParseHead(head);
+// } else {
+// return false;
+// }
+// }
} // namespace bhome_shm
diff --git a/src/shm_queue.h b/src/shm_queue.h
index ab8a88c..32ccfae 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -131,7 +131,7 @@
~ShmMsgQueue();
const MQId &Id() const { return id_; }
- bool Recv(BHMsg &msg, const int timeout_ms);
+ // bool Recv(MsgI &msg, BHMsgHead &head, const int timeout_ms);
bool Recv(MsgI &msg, const int timeout_ms) { return Read(msg, timeout_ms); }
static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend);
static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms);
@@ -141,12 +141,11 @@
{
return Send(shm(), remote_id, msg, timeout_ms, extra...);
}
-
- template <class... Extra>
- bool Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms, Extra const &...extra)
+ template <class Body, class... Extra>
+ bool Send(const MQId &remote_id, const BHMsgHead &head, const Body &body, const int timeout_ms, Extra const &...extra)
{
MsgI msg;
- if (msg.Make(shm(), data)) {
+ if (msg.Make(shm(), head, body)) {
if (Send(shm(), remote_id, msg, timeout_ms, extra...)) {
return true;
} else {
@@ -155,6 +154,7 @@
}
return false;
}
+
size_t Pending() const { return data()->size(); }
};
diff --git a/src/socket.cpp b/src/socket.cpp
index b9def0c..f2b29f4 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -29,43 +29,53 @@
} // namespace
-ShmSocket::ShmSocket(Shm &shm, const void *id, const int len) :
- shm_(shm), run_(false)
+ShmSocket::ShmSocket(Shm &shm, const MQId &id, const int len) :
+ shm_(shm), run_(false), mq_(id, shm, len)
{
- if (id && len > 0) {
- mq_.reset(new Queue(*static_cast<const MQId *>(id), shm, len));
- }
}
ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) :
- shm_(shm), run_(false)
-{
- if (len > 0) {
- mq_.reset(new Queue(shm_, len));
- }
-}
+ shm_(shm), run_(false), mq_(shm, len) {}
ShmSocket::~ShmSocket()
{
Stop(); //TODO should stop in sub class, incase thread access sub class data.
}
-bool ShmSocket::Start(const RecvCB &onData, const IdleCB &onIdle, int nworker)
+bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle)
{
- if (!mq_ || !onData) {
- return false; // TODO error code.
- }
+ auto onRecv = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
+ auto Find = [&](RecvCB &cb) {
+ std::lock_guard<std::mutex> lock(mutex());
+ const std::string &msgid = head.msg_id();
+ auto pos = async_cbs_.find(msgid);
+ if (pos != async_cbs_.end()) {
+ cb.swap(pos->second);
+ async_cbs_.erase(pos);
+ return true;
+ } else {
+ return false;
+ }
+ };
+
+ RecvCB cb;
+ if (Find(cb)) {
+ cb(socket, imsg, head);
+ } else if (onData) {
+ onData(socket, imsg, head);
+ } // else ignored, or dropped
+ };
std::lock_guard<std::mutex> lock(mutex_);
StopNoLock();
- auto RecvProc = [this, onData, onIdle]() {
+ auto RecvProc = [this, onRecv, onIdle]() {
while (run_) {
try {
MsgI imsg;
- DEFER1(imsg.Release(shm_));
- if (mq_->Recv(imsg, 100)) {
- BHMsg msg;
- if (imsg.Unpack(msg)) {
- onData(*this, imsg, msg);
+ if (mq().Recv(imsg, 10)) {
+ DEFER1(imsg.Release(shm()));
+ BHMsgHead head;
+ if (imsg.ParseHead(head)) {
+ onRecv(*this, imsg, head);
}
} else if (onIdle) {
onIdle(*this);
@@ -102,17 +112,18 @@
return false;
}
-bool ShmSocket::SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms)
-{
- return mq_->Send(*static_cast<const MQId *>(id), msg, timeout_ms);
-}
-
-bool ShmSocket::SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms)
+bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms)
{
std::lock_guard<std::mutex> lock(mutex_);
- if (!mq_ || RunningNoLock()) {
+ auto Recv = [&]() {
+ if (mq().Recv(msg, timeout_ms)) {
+ if (msg.ParseHead(head)) {
+ return true;
+ } else {
+ msg.Release(shm());
+ }
+ }
return false;
- } else {
- return mq_->Recv(msg, timeout_ms);
- }
+ };
+ return !RunningNoLock() && Recv();
}
diff --git a/src/socket.h b/src/socket.h
index 57d0ae4..7c4f83f 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -19,14 +19,18 @@
#ifndef SOCKET_GWTJHBPO
#define SOCKET_GWTJHBPO
+#include "defs.h"
#include "shm_queue.h"
#include <atomic>
#include <boost/noncopyable.hpp>
+#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
+
+using namespace bhome_msg;
class ShmSocket : private boost::noncopyable
{
@@ -35,36 +39,88 @@
public:
typedef bhome_shm::SharedMemory Shm;
- typedef std::function<void(ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg)> RecvCB;
- typedef std::function<void(bhome_msg::BHMsg &msg)> RecvBHMsgCB;
+ typedef std::function<void(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> RecvCB;
+ typedef std::function<bool(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> PartialRecvCB;
typedef std::function<void(ShmSocket &sock)> IdleCB;
- ShmSocket(Shm &shm, const void *id, const int len);
+ ShmSocket(Shm &shm, const MQId &id, const int len);
ShmSocket(Shm &shm, const int len = 12);
~ShmSocket();
-
+ const MQId &id() const { return mq().Id(); }
Shm &shm() { return shm_; }
// start recv.
- bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1);
- bool Start(const RecvCB &onData, int nworker = 1) { return Start(onData, IdleCB(), nworker); }
- bool Start(const RecvBHMsgCB &onData, const IdleCB &onIdle, int nworker = 1)
- {
- return Start([onData](ShmSocket &sock, bhome_msg::MsgI &imsg, bhome_msg::BHMsg &msg) { onData(msg); }, onIdle, nworker);
- }
- bool Start(const RecvBHMsgCB &onData, int nworker = 1)
- {
- return Start(onData, IdleCB(), nworker);
- }
+ bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB());
+ bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, onIdle); }
+ bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); }
bool Stop();
- size_t Pending() const { return mq_ ? mq_->Pending() : 0; }
+ size_t Pending() const { return mq().Pending(); }
- bool SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms);
- bool SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms);
+ bool Send(const void *id, const MsgI &imsg, const int timeout_ms)
+ {
+ return mq().Send(*static_cast<const MQId *>(id), imsg, timeout_ms);
+ }
+ //TODO reimplment, using async.
+ bool SyncRecv(MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms);
+
+ template <class Body>
+ bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb = RecvCB())
+ {
+ assert(valid_remote);
+ try {
+ if (cb) {
+ auto RegisterCB = [&]() {
+ std::lock_guard<std::mutex> lock(mutex());
+ async_cbs_.emplace(head.msg_id(), cb);
+ };
+ return mq().Send(*static_cast<const MQId *>(valid_remote), head, body, timeout_ms, RegisterCB);
+ } else {
+ return mq().Send(*static_cast<const MQId *>(valid_remote), head, body, timeout_ms);
+ }
+ } catch (...) {
+ return false;
+ }
+ }
+
+ template <class Body>
+ bool SendAndRecv(const void *remote, const BHMsgHead &head, const Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
+ {
+ struct State {
+ std::mutex mutex;
+ std::condition_variable cv;
+ bool canceled = false;
+ };
+
+ try {
+ std::shared_ptr<State> st(new State);
+ auto endtime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
+
+ auto OnRecv = [st, &reply, &reply_head](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
+ std::unique_lock<std::mutex> lk(st->mutex);
+ if (!st->canceled) {
+ reply.swap(msg);
+ reply_head.Swap(&head);
+ st->cv.notify_one();
+ } else {
+ }
+ };
+
+ std::unique_lock<std::mutex> lk(st->mutex);
+ bool sendok = Send(remote, head, body, timeout_ms, OnRecv);
+ if (sendok && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) {
+ return true;
+ } else {
+ st->canceled = true;
+ return false;
+ }
+ } catch (...) {
+ return false;
+ }
+ }
protected:
const Shm &shm() const { return shm_; }
- Queue &mq() { return *mq_; } // programmer should make sure that mq_ is valid.
- const Queue &mq() const { return *mq_; }
+ Queue &mq() { return mq_; } // programmer should make sure that mq_ is valid.
+ const Queue &mq() const { return mq_; }
std::mutex &mutex() { return mutex_; }
private:
@@ -76,7 +132,8 @@
std::mutex mutex_;
std::atomic<bool> run_;
- std::unique_ptr<Queue> mq_;
+ Queue mq_;
+ std::unordered_map<std::string, RecvCB> async_cbs_;
};
#endif // end of include guard: SOCKET_GWTJHBPO
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
new file mode 100644
index 0000000..c6c9771
--- /dev/null
+++ b/src/topic_node.cpp
@@ -0,0 +1,322 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: topic_node.cpp
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�04鏈�07鏃� 09鏃�01鍒�48绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (),
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#include "topic_node.h"
+#include "bh_util.h"
+#include <chrono>
+#include <list>
+
+using namespace std::chrono;
+using namespace std::chrono_literals;
+
+namespace
+{
+inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); }
+
+struct SrcInfo {
+ std::vector<BHAddress> route;
+ std::string msg_id;
+};
+
+class ServerFailedQ
+{
+ struct FailedMsg {
+ steady_clock::time_point xpr;
+ std::string remote_;
+ BHMsgHead head_;
+ MsgRequestTopicReply body_;
+ FailedMsg(const std::string &addr, BHMsgHead &&head, MsgRequestTopicReply &&body) :
+ xpr(steady_clock::now() + 10s), remote_(addr), head_(std::move(head)), body_(std::move(body)) {}
+ bool Expired() { return steady_clock::now() > xpr; }
+ };
+ typedef std::list<FailedMsg> Queue;
+ Synced<Queue> queue_;
+
+public:
+ void Push(const std::string &remote, BHMsgHead &&head, MsgRequestTopicReply &&body)
+ {
+ queue_->emplace_back(remote, std::move(head), std::move(body));
+ }
+ void TrySend(ShmSocket &socket, const int timeout_ms = 0)
+ {
+ queue_.Apply([&](Queue &q) {
+ if (!q.empty()) {
+ auto it = q.begin();
+ do {
+ if (it->Expired()) {
+ // it->msg_.Release(socket.shm());
+ it = q.erase(it);
+ } else if (socket.Send(it->remote_.data(), it->head_, it->body_, timeout_ms)) {
+ it = q.erase(it);
+ } else {
+ ++it;
+ }
+ } while (it != q.end());
+ }
+ });
+ }
+};
+
+} // namespace
+TopicNode::TopicNode(SharedMemory &shm) :
+ shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm)
+{
+ SockNode().Start();
+}
+TopicNode::~TopicNode()
+{
+ StopAll();
+ SockNode().Stop();
+}
+void TopicNode::StopAll()
+{
+ ServerStop();
+ ClientStopWorker();
+}
+
+bool TopicNode::Register(const MsgRegister &body, MsgCommonReply &reply_body, const int timeout_ms)
+{
+ auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
+ AddRoute(head, SockNode().id());
+
+ MsgI reply;
+ DEFER1(reply.Release(shm_););
+ BHMsgHead reply_head;
+ bool r = SockNode().SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
+ r = r && reply_head.type() == kMsgTypeCommonReply;
+ r = r && reply.ParseBody(reply_body);
+ if (r) {
+ info_ = body;
+ }
+ return r;
+}
+
+bool TopicNode::RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply_body, const int timeout_ms)
+{
+ //TODO check registered
+
+ auto head(InitMsgHead(GetType(body), proc_id()));
+ AddRoute(head, SockReply().id());
+
+ MsgI reply;
+ DEFER1(reply.Release(shm_););
+ BHMsgHead reply_head;
+ bool r = SockReply().SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
+ r = r && reply_head.type() == kMsgTypeCommonReply;
+ r = r && reply.ParseBody(reply_body);
+ return r;
+}
+
+bool TopicNode::ServerStart(const OnRequest &rcb, int nworker)
+{
+ //TODO check registered
+
+ auto failed_q = std::make_shared<ServerFailedQ>();
+
+ auto onIdle = [failed_q](ShmSocket &socket) { failed_q->TrySend(socket); };
+
+ auto onRecv = [this, rcb, failed_q, onIdle](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
+ if (head.type() == kMsgTypeRequestTopic && head.route_size() > 0) {
+ MsgRequestTopic req;
+ if (imsg.ParseBody(req)) {
+ std::string out;
+ if (rcb(req.topic(), req.data(), out)) {
+ MsgRequestTopicReply reply_body;
+ reply_body.set_data(std::move(out));
+ BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), head.msg_id()));
+
+ for (int i = 0; i < head.route_size() - 1; ++i) {
+ reply_head.add_route()->Swap(head.mutable_route(i));
+ }
+ if (!sock.Send(head.route().rbegin()->mq_id().data(), reply_head, reply_body, 10)) {
+ failed_q->Push(head.route().rbegin()->mq_id(), std::move(reply_head), std::move(reply_body));
+ }
+ }
+ }
+ } else {
+ // ignored, or dropped
+ }
+
+ onIdle(sock);
+ };
+
+ return rcb && SockReply().Start(onRecv, onIdle, nworker);
+}
+bool TopicNode::ServerStop() { return SockReply().Stop(); }
+
+bool TopicNode::ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms)
+{
+ MsgI imsg;
+ BHMsgHead head;
+ if (SockReply().SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) {
+ MsgRequestTopic request;
+ if (imsg.ParseBody(request)) {
+ request.mutable_topic()->swap(topic);
+ request.mutable_data()->swap(data);
+ SrcInfo *p = new SrcInfo;
+ p->route.assign(head.route().begin(), head.route().end());
+ p->msg_id = head.msg_id();
+ src_info = p;
+ return true;
+ }
+ }
+ return false;
+}
+
+bool TopicNode::ServerSendReply(void *src_info, const std::string &data, const int timeout_ms)
+{
+ SrcInfo *p = static_cast<SrcInfo *>(src_info);
+ DEFER1(delete p);
+ if (!p || p->route.empty()) {
+ return false;
+ }
+ MsgRequestTopicReply body;
+ body.set_data(data);
+ BHMsgHead head(InitMsgHead(GetType(body), proc_id(), p->msg_id));
+
+ for (unsigned i = 0; i < p->route.size() - 1; ++i) {
+ head.add_route()->Swap(&p->route[i]);
+ }
+
+ return SockReply().Send(p->route.back().mq_id().data(), head, body, timeout_ms);
+}
+
+bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker)
+{
+ if (!cb) {
+ return false;
+ }
+ auto onData = [this, cb](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
+ if (head.type() == kMsgTypeRequestTopicReply) {
+ MsgRequestTopicReply reply;
+ if (imsg.ParseBody(reply)) {
+ cb(reply.data());
+ }
+ }
+ };
+
+ return SockRequest().Start(onData, nworker);
+}
+bool TopicNode::ClientStopWorker() { return SockRequest().Stop(); }
+
+bool TopicNode::ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
+{
+ auto Call = [&](const void *remote) {
+ auto &sock = SockRequest();
+ MsgRequestTopic req;
+ req.set_topic(topic);
+ req.set_data(data, size);
+ BHMsgHead head(InitMsgHead(GetType(req), proc_id()));
+ AddRoute(head, sock.id());
+
+ if (cb) {
+ auto onRecv = [cb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
+ if (head.type() == kMsgTypeRequestTopicReply) {
+ MsgRequestTopicReply reply;
+ if (imsg.ParseBody(reply)) {
+ cb(reply.data());
+ }
+ }
+ };
+ return sock.Send(remote, head, req, timeout_ms, onRecv);
+ } else {
+ return sock.Send(remote, head, req, timeout_ms);
+ }
+ };
+
+ try {
+ BHAddress addr;
+ if (ClientQueryRPCTopic(topic, addr, timeout_ms)) {
+ return Call(addr.mq_id().data());
+ } else {
+ return false;
+ }
+ } catch (...) {
+ return false;
+ }
+}
+
+bool TopicNode::ClientSyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms)
+{
+ try {
+ auto &sock = SockRequest();
+ BHAddress addr;
+ if (ClientQueryRPCTopic(topic, addr, timeout_ms)) {
+
+ MsgRequestTopic req;
+ req.set_topic(topic);
+ req.set_data(data, size);
+ BHMsgHead head(InitMsgHead(GetType(req), proc_id()));
+ AddRoute(head, sock.id());
+
+ MsgI reply;
+ DEFER1(reply.Release(shm_););
+ BHMsgHead reply_head;
+
+ if (sock.SendAndRecv(addr.mq_id().data(), head, req, reply, reply_head, timeout_ms) && reply_head.type() == kMsgTypeRequestTopicReply) {
+ MsgRequestTopicReply dr;
+ if (reply.ParseBody(dr)) {
+ dr.mutable_data()->swap(out);
+ return true;
+ } else {
+ printf("error parse reply.\n");
+ }
+ } else {
+ printf("error recv data. line: %d\n", __LINE__);
+ }
+ } else {
+ printf("error recv data. line: %d\n", __LINE__);
+ }
+ } catch (...) {
+ printf("error recv data. line: %d\n", __LINE__);
+ }
+ return false;
+}
+
+bool TopicNode::ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
+{
+ auto &sock = SockRequest();
+ if (topic_query_cache_.Find(topic, addr)) {
+ return true;
+ }
+
+ MsgQueryTopic query;
+ query.set_topic(topic);
+ BHMsgHead head(InitMsgHead(GetType(query), proc_id()));
+ AddRoute(head, sock.id());
+
+ MsgI reply;
+ DEFER1(reply.Release(shm_));
+ BHMsgHead reply_head;
+
+ if (sock.SendAndRecv(&BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms)) {
+ if (reply_head.type() == kMsgTypeQueryTopicReply) {
+ MsgQueryTopicReply rep;
+ if (reply.ParseBody(rep)) {
+ addr = rep.address();
+ if (addr.mq_id().empty()) {
+ return false;
+ } else {
+ topic_query_cache_.Update(topic, addr);
+ return true;
+ }
+ }
+ }
+ } else {
+ }
+ return false;
+}
\ No newline at end of file
diff --git a/src/topic_node.h b/src/topic_node.h
new file mode 100644
index 0000000..8852af1
--- /dev/null
+++ b/src/topic_node.h
@@ -0,0 +1,121 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: topic_node.h
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�04鏈�07鏃� 09鏃�05鍒�26绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (), lichao@aiotlink.com
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#ifndef TOPIC_NODE_YVKWA6TF
+#define TOPIC_NODE_YVKWA6TF
+
+#include "msg.h"
+#include "pubsub.h"
+#include "socket.h"
+#include <memory>
+
+using namespace bhome_shm;
+using namespace bhome_msg;
+
+// a node is a client.
+class TopicNode
+{
+ SharedMemory &shm_;
+ MsgRegister info_;
+
+public:
+ TopicNode(SharedMemory &shm);
+ ~TopicNode();
+ bool Register(const MsgRegister &body, MsgCommonReply &reply, const int timeout_ms);
+ bool RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply, const int timeout_ms);
+
+ // topic rpc server
+ typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest;
+ bool ServerStart(OnRequest const &cb, const int nworker = 2);
+ bool ServerStop();
+ bool ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms);
+ bool ServerSendReply(void *src_info, const std::string &data, const int timeout_ms);
+
+ // topic client
+ typedef std::function<void(const std::string &data)> RequestResultCB;
+ bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2);
+ bool ClientStopWorker();
+ bool ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB());
+ bool ClientAsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB())
+ {
+ return ClientAsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb);
+ }
+ bool ClientSyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms);
+ bool ClientSyncRequest(const Topic &topic, const std::string &data, std::string &out, const int timeout_ms)
+ {
+ return ClientSyncRequest(topic, data.data(), data.size(), out, timeout_ms);
+ }
+
+ void StopAll();
+
+private:
+ bool ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
+ const std::string &proc_id() { return info_.proc().proc_id(); }
+
+ typedef bhome_msg::BHAddress Address;
+ class TopicQueryCache
+ {
+ class Impl
+ {
+ typedef std::unordered_map<Topic, Address> Store;
+ Store store_;
+
+ public:
+ bool Find(const Topic &topic, Address &addr)
+ {
+ auto pos = store_.find(topic);
+ if (pos != store_.end()) {
+ addr = pos->second;
+ return true;
+ } else {
+ return false;
+ }
+ }
+ bool Update(const Topic &topic, const Address &addr)
+ {
+ store_[topic] = addr;
+ return true;
+ }
+ };
+ Synced<Impl> impl_;
+ // Impl &impl()
+ // {
+ // thread_local Impl impl;
+ // return impl;
+ // }
+
+ public:
+ bool Find(const Topic &topic, Address &addr) { return impl_->Find(topic, addr); }
+ bool Update(const Topic &topic, const Address &addr) { return impl_->Update(topic, addr); }
+ };
+
+ // some sockets may be the same one, using functions make it easy to change.
+
+ auto &SockNode() { return sock_node_; }
+ auto &SockSub() { return sock_sub_; }
+ auto &SockRequest() { return sock_request_; }
+ auto &SockReply() { return sock_reply_; }
+
+ ShmSocket sock_node_;
+ ShmSocket sock_request_;
+ ShmSocket sock_reply_;
+ SocketSubscribe sock_sub_;
+
+ TopicQueryCache topic_query_cache_;
+};
+
+#endif // end of include guard: TOPIC_NODE_YVKWA6TF
diff --git a/src/topic_reply.cpp b/src/topic_reply.cpp
deleted file mode 100644
index 2ab75e6..0000000
--- a/src/topic_reply.cpp
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * =====================================================================================
- *
- * Filename: topic_reply.cpp
- *
- * Description:
- *
- * Version: 1.0
- * Created: 2021骞�04鏈�06鏃� 14鏃�40鍒�52绉�
- * Revision: none
- * Compiler: gcc
- *
- * Author: Li Chao (),
- * Organization:
- *
- * =====================================================================================
- */
-#include "topic_reply.h"
-#include <chrono>
-#include <list>
-
-using namespace bhome_msg;
-using namespace std::chrono;
-using namespace std::chrono_literals;
-
-namespace
-{
-struct SrcInfo {
- std::vector<BHAddress> route;
- std::string msg_id;
-};
-
-class FailedQ
-{
- struct FailedMsg {
- steady_clock::time_point xpr;
- std::string remote_;
- BHMsg msg_;
- FailedMsg(const std::string &addr, BHMsg &&msg) :
- xpr(steady_clock::now() + 10s), remote_(addr), msg_(std::move(msg)) {}
- bool Expired() { return steady_clock::now() > xpr; }
- };
- typedef std::list<FailedMsg> Queue;
- Synced<Queue> queue_;
-
-public:
- void Push(const std::string &remote, BHMsg &&msg)
- {
- queue_->emplace_back(remote, std::move(msg));
- }
- void TrySend(ShmSocket &socket, const int timeout_ms = 0)
- {
- queue_.Apply([&](Queue &q) {
- if (!q.empty()) {
- auto it = q.begin();
- do {
- if (it->Expired() || socket.SyncSend(it->remote_.data(), it->msg_, timeout_ms)) {
- it = q.erase(it);
- } else {
- ++it;
- }
- } while (it != q.end());
- }
- });
- }
-};
-
-} // namespace
-
-bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms)
-{
- //TODO check reply?
- return SyncSend(&BHTopicCenterAddress(), MakeRegister(mq().Id(), proc_info, topics), timeout_ms);
-}
-bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms)
-{
- return SyncSend(&BHTopicCenterAddress(), MakeHeartbeat(mq().Id(), proc_info), timeout_ms);
-}
-bool SocketReply::StartWorker(const OnRequest &rcb, int nworker)
-{
- auto failed_q = std::make_shared<FailedQ>();
-
- auto onIdle = [failed_q](ShmSocket &socket) { failed_q->TrySend(socket); };
-
- auto onRecv = [this, rcb, failed_q, onIdle](BHMsg &msg) {
- if (msg.type() == kMsgTypeRequestTopic && msg.route_size() > 0) {
- MsgRequestTopic req;
- if (req.ParseFromString(msg.body())) {
- std::string out;
- if (rcb(req.topic(), req.data(), out)) {
- BHMsg msg_reply(MakeReply(msg.msg_id(), out.data(), out.size()));
- for (int i = 0; i < msg.route_size() - 1; ++i) {
- msg.add_route()->Swap(msg.mutable_route(i));
- }
- if (!SyncSend(msg.route().rbegin()->mq_id().data(), msg_reply, 10)) {
- failed_q->Push(msg.route().rbegin()->mq_id(), std::move(msg_reply));
- }
- }
- }
- } else {
- // ignored, or dropped
- }
-
- onIdle(*this);
- };
-
- return rcb && Start(onRecv, onIdle, nworker);
-}
-
-bool SocketReply::RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms)
-{
- BHMsg msg;
- if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypeRequestTopic) {
- MsgRequestTopic request;
- if (request.ParseFromString(msg.body())) {
- request.mutable_topic()->swap(topic);
- request.mutable_data()->swap(data);
- SrcInfo *p = new SrcInfo;
- p->route.assign(msg.route().begin(), msg.route().end());
- p->msg_id = msg.msg_id();
- src_info = p;
- return true;
- }
- }
- return false;
-}
-
-bool SocketReply::SendReply(void *src_info, const std::string &data, const int timeout_ms)
-{
- SrcInfo *p = static_cast<SrcInfo *>(src_info);
- DEFER1(delete p);
- if (!p || p->route.empty()) {
- return false;
- }
-
- BHMsg msg(MakeReply(p->msg_id, data.data(), data.size()));
- for (unsigned i = 0; i < p->route.size() - 1; ++i) {
- msg.add_route()->Swap(&p->route[i]);
- }
-
- return SyncSend(p->route.back().mq_id().data(), msg, timeout_ms);
-}
\ No newline at end of file
diff --git a/src/topic_reply.h b/src/topic_reply.h
deleted file mode 100644
index 090ad88..0000000
--- a/src/topic_reply.h
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * =====================================================================================
- *
- * Filename: topic_reply.h
- *
- * Description:
- *
- * Version: 1.0
- * Created: 2021骞�04鏈�06鏃� 14鏃�41鍒�12绉�
- * Revision: none
- * Compiler: gcc
- *
- * Author: Li Chao (),
- * Organization:
- *
- * =====================================================================================
- */
-#ifndef TOPIC_REPLY_3RVYPPWI
-#define TOPIC_REPLY_3RVYPPWI
-
-#include "bh_util.h"
-#include "defs.h"
-#include "msg.h"
-#include "socket.h"
-#include <deque>
-#include <functional>
-
-using bhome::msg::ProcInfo;
-
-class SocketReply : private ShmSocket
-{
- typedef ShmSocket Socket;
-
-public:
- SocketReply(Socket::Shm &shm) :
- Socket(shm, 64) {}
- SocketReply() :
- SocketReply(BHomeShm()) {}
- ~SocketReply() { Stop(); }
-
- typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest;
- bool StartWorker(const OnRequest &rcb, int nworker = 2);
- bool Stop() { return Socket::Stop(); }
- bool RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms);
- bool SendReply(void *src_info, const std::string &data, const int timeout_ms);
- bool Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms);
- bool Heartbeat(const ProcInfo &proc_info, const int timeout_ms);
-
-private:
-};
-
-#endif // end of include guard: TOPIC_REPLY_3RVYPPWI
diff --git a/src/topic_request.cpp b/src/topic_request.cpp
deleted file mode 100644
index 382ce21..0000000
--- a/src/topic_request.cpp
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * =====================================================================================
- *
- * Filename: topic_request.cpp
- *
- * Description: topic request sockets
- *
- * Version: 1.0
- * Created: 2021骞�04鏈�01鏃� 09鏃�35鍒�35绉�
- * Revision: none
- * Compiler: gcc
- *
- * Author: Li Chao (),
- * Organization:
- *
- * =====================================================================================
- */
-#include "topic_request.h"
-#include "bh_util.h"
-#include "msg.h"
-#include <chrono>
-#include <condition_variable>
-
-using namespace bhome_msg;
-
-bool SocketRequest::StartWorker(const RequestResultCB &rrcb, int nworker)
-{
- auto AsyncRecvProc = [this, rrcb](BHMsg &msg) {
- auto Find = [&](RecvBHMsgCB &cb) {
- std::lock_guard<std::mutex> lock(mutex());
- const std::string &msgid = msg.msg_id();
- auto pos = async_cbs_.find(msgid);
- if (pos != async_cbs_.end()) {
- cb.swap(pos->second);
- async_cbs_.erase(pos);
- return true;
- } else {
- return false;
- }
- };
-
- RecvBHMsgCB cb;
- if (Find(cb) && cb) {
- cb(msg);
- } else if (msg.type() == kMsgTypeRequestTopicReply && rrcb) {
- MsgRequestTopicReply reply;
- if (reply.ParseFromString(msg.body())) {
- rrcb(reply.data());
- }
- } else {
- // ignored, or dropped
- }
- };
-
- return Start(AsyncRecvProc, nworker);
-}
-
-bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms)
-{
- try {
- BHAddress addr;
- if (QueryRPCTopic(topic, addr, timeout_ms)) {
- const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
- return AsyncSend(addr.mq_id().data(), &msg, timeout_ms);
- } else {
- return false;
- }
- } catch (...) {
- return false;
- }
-}
-bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
-{
- auto Call = [&](const void *remote) {
- const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
- auto onRecv = [cb](BHMsg &msg) {
- if (msg.type() == kMsgTypeRequestTopicReply) {
- MsgRequestTopicReply reply;
- if (reply.ParseFromString(msg.body())) {
- cb(reply.data());
- }
- }
- };
- return AsyncSend(remote, &msg, timeout_ms, onRecv);
- };
-
- try {
- BHAddress addr;
- if (QueryRPCTopic(topic, addr, timeout_ms)) {
- return Call(addr.mq_id().data());
- } else {
- return false;
- }
- } catch (...) {
- return false;
- }
-}
-
-bool SocketRequest::SyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms)
-{
- try {
- BHAddress addr;
- if (QueryRPCTopic(topic, addr, timeout_ms)) {
- const BHMsg &req(MakeRequest(mq().Id(), topic, data, size));
- BHMsg reply;
- if (SyncSendAndRecv(addr.mq_id().data(), &req, &reply, timeout_ms) && reply.type() == kMsgTypeRequestTopicReply) {
- MsgRequestTopicReply dr;
- if (dr.ParseFromString(reply.body())) {
- dr.mutable_data()->swap(out);
- return true;
- } else {
- printf("error parse reply.\n");
- }
- } else {
- printf("error recv data. line: %d\n", __LINE__);
- }
- } else {
- printf("error recv data. line: %d\n", __LINE__);
- }
- } catch (...) {
- printf("error recv data. line: %d\n", __LINE__);
- }
- return false;
-}
-
-bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms)
-{
- assert(remote && pmsg);
- try {
- const BHMsg &msg = *static_cast<const BHMsg *>(pmsg);
- return mq().Send(*static_cast<const MQId *>(remote), msg, timeout_ms);
- } catch (...) {
- return false;
- }
-}
-bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvBHMsgCB &cb)
-{
- assert(remote && pmsg);
- try {
- const BHMsg &msg = *static_cast<const BHMsg *>(pmsg);
- auto RegisterCB = [&]() {
- std::lock_guard<std::mutex> lock(mutex());
- async_cbs_.emplace(msg.msg_id(), cb);
- };
-
- return mq().Send(*static_cast<const MQId *>(remote), msg, timeout_ms, RegisterCB);
- } catch (...) {
- return false;
- }
-}
-
-bool SocketRequest::SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms)
-{
- struct State {
- std::mutex mutex;
- std::condition_variable cv;
- bool canceled = false;
- };
-
- try {
- std::shared_ptr<State> st(new State);
- auto endtime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
-
- auto OnRecv = [=](BHMsg &msg) {
- std::unique_lock<std::mutex> lk(st->mutex);
- if (!st->canceled) {
- static_cast<BHMsg *>(result)->Swap(&msg);
- st->cv.notify_one();
- } else {
- }
- };
-
- std::unique_lock<std::mutex> lk(st->mutex);
- bool sendok = AsyncSend(remote, msg, timeout_ms, OnRecv);
- if (sendok && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) {
- return true;
- } else {
- st->canceled = true;
- return false;
- }
- } catch (...) {
- return false;
- }
-}
-
-bool SocketRequest::QueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
-{
- if (topic_cache_.Find(topic, addr)) {
- return true;
- }
-
- BHMsg result;
- const BHMsg &msg = MakeQueryTopic(mq().Id(), topic);
- if (SyncSendAndRecv(&BHTopicCenterAddress(), &msg, &result, timeout_ms)) {
- if (result.type() == kMsgTypeQueryTopicReply) {
- MsgQueryTopicReply reply;
- if (reply.ParseFromString(result.body())) {
- addr = reply.address();
- if (addr.mq_id().empty()) {
- return false;
- } else {
- topic_cache_.Update(topic, addr);
- return true;
- }
- }
- }
- } else {
- }
- return false;
-}
diff --git a/src/topic_request.h b/src/topic_request.h
deleted file mode 100644
index 6765dc2..0000000
--- a/src/topic_request.h
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * =====================================================================================
- *
- * Filename: topic_request.h
- *
- * Description: topic request socket
- *
- * Version: 1.0
- * Created: 2021骞�04鏈�01鏃� 09鏃�36鍒�06绉�
- * Revision: none
- * Compiler: gcc
- *
- * Author: Li Chao (),
- * Organization:
- *
- * =====================================================================================
- */
-#ifndef TOPIC_REQUEST_ACEH09NK
-#define TOPIC_REQUEST_ACEH09NK
-
-#include "bh_util.h"
-#include "defs.h"
-#include "msg.h"
-#include "socket.h"
-#include <functional>
-#include <unordered_map>
-
-using bhome::msg::ProcInfo;
-
-class SocketRequest : private ShmSocket
-{
- typedef ShmSocket Socket;
-
-public:
- SocketRequest(Socket::Shm &shm) :
- Socket(shm, 64) { StartWorker(); }
- SocketRequest() :
- SocketRequest(BHomeShm()) {}
- ~SocketRequest() { Stop(); }
-
- typedef std::function<void(const std::string &data)> RequestResultCB;
- bool StartWorker(const RequestResultCB &rrcb, int nworker = 2);
- bool StartWorker(int nworker = 2) { return StartWorker(RequestResultCB(), nworker); }
- bool Stop() { return Socket::Stop(); }
- bool AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb);
- bool AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms);
-
- bool AsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb)
- {
- return AsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb);
- }
- bool AsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms)
- {
- return AsyncRequest(topic, data.data(), data.size(), timeout_ms);
- }
- bool SyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms);
- bool SyncRequest(const Topic &topic, const std::string &data, std::string &out, const int timeout_ms)
- {
- return SyncRequest(topic, data.data(), data.size(), out, timeout_ms);
- }
-
-private:
- bool AsyncSend(const void *remote, const void *msg, const int timeout_ms, const RecvBHMsgCB &cb);
- bool AsyncSend(const void *remote, const void *msg, const int timeout_ms);
- bool SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms);
- bool QueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
- std::unordered_map<std::string, RecvBHMsgCB> async_cbs_;
-
- typedef bhome_msg::BHAddress Address;
- class TopicCache
- {
- class Impl
- {
- typedef std::unordered_map<Topic, Address> Store;
- Store store_;
-
- public:
- bool Find(const Topic &topic, Address &addr)
- {
- auto pos = store_.find(topic);
- if (pos != store_.end()) {
- addr = pos->second;
- return true;
- } else {
- return false;
- }
- }
- bool Update(const Topic &topic, const Address &addr)
- {
- store_[topic] = addr;
- return true;
- }
- };
- Synced<Impl> impl_;
- // Impl &impl()
- // {
- // thread_local Impl impl;
- // return impl;
- // }
-
- public:
- bool Find(const Topic &topic, Address &addr) { return impl_->Find(topic, addr); }
- bool Update(const Topic &topic, const Address &addr) { return impl_->Update(topic, addr); }
- };
- TopicCache topic_cache_;
-};
-
-#endif // end of include guard: TOPIC_REQUEST_ACEH09NK
diff --git a/utest/simple_tests.cpp b/utest/simple_tests.cpp
index 06093fd..cbbcc2a 100644
--- a/utest/simple_tests.cpp
+++ b/utest/simple_tests.cpp
@@ -36,7 +36,7 @@
BOOST_CHECK(!p);
BOOST_CHECK(p.get() == 0);
const char *str = "basic";
- p = str;
+ p = str;
BOOST_CHECK(p);
BOOST_CHECK(p.get() == str);
p = 0;
@@ -49,7 +49,7 @@
auto Code = [&](int id) {
typedef ShmObject<s1000> Int;
std::string name = std::to_string(id);
- auto a0 = Avail();
+ auto a0 = Avail();
Int i1(shm, name);
auto a1 = Avail();
BOOST_CHECK_LT(a1, a0);
@@ -64,7 +64,7 @@
{
auto old = Avail();
- void *p = shm.Alloc(1024);
+ void *p = shm.Alloc(1024);
shm.Dealloc(p);
BOOST_CHECK_EQUAL(old, Avail());
}
@@ -80,7 +80,7 @@
// boost::timer::auto_cpu_timer timer;
ThreadManager threads;
int nthread = 1;
- int nloop = 1;
+ int nloop = 1;
for (int i = 0; i < nthread; ++i) {
threads.Launch(BasicTest, i, nloop);
}
@@ -114,7 +114,7 @@
int ms = i * 100;
printf("Timeout Test %4d: ", ms);
boost::timer::auto_cpu_timer timer;
- BHMsg msg;
+ MsgI msg;
bool r = q.Recv(msg, ms);
BOOST_CHECK(!r);
}
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index 34f80d8..d777f91 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -28,14 +28,20 @@
MQId id = boost::uuids::random_generator()();
const int timeout = 100;
const uint32_t data_size = 4000;
+ const std::string proc_id = "demo_proc";
auto Writer = [&](int writer_id, uint64_t n) {
SharedMemory shm(shm_name, mem_size);
ShmMsgQueue mq(shm, 64);
std::string str(data_size, 'a');
MsgI msg;
+ MsgRequestTopic body;
+ body.set_topic("topic");
+ body.set_data(str);
+ auto head(InitMsgHead(GetType(body), proc_id));
+ msg.MakeRC(shm, head, body);
DEFER1(msg.Release(shm););
- msg.MakeRC(shm, MakeRequest(mq.Id(), "topic", str.data(), str.size()));
+
for (uint64_t i = 0; i < n; ++i) {
// mq.Send(id, str.data(), str.size(), timeout);
mq.Send(id, msg, timeout);
@@ -45,8 +51,10 @@
SharedMemory shm(shm_name, mem_size);
ShmMsgQueue mq(id, shm, 1000);
while (*run) {
- BHMsg msg;
+ MsgI msg;
+ BHMsgHead head;
if (mq.Recv(msg, timeout)) {
+ DEFER1(msg.Release(shm));
// ok
} else if (isfork) {
exit(0); // for forked quit after 1s.
@@ -113,6 +121,8 @@
const size_t msg_length = 1000;
std::string msg_content(msg_length, 'a');
msg_content[20] = '\0';
+ const std::string client_proc_id = "client_proc";
+ const std::string server_proc_id = "server_proc";
SharedMemory shm(shm_name, 1024 * 1024 * 50);
auto Avail = [&]() { return shm.get_free_memory(); };
@@ -121,9 +131,18 @@
ShmMsgQueue cli(shm, qlen);
MsgI request_rc;
- request_rc.MakeRC(shm, MakeRequest(cli.Id(), "topic", msg_content.data(), msg_content.size()));
+ MsgRequestTopic req_body;
+ req_body.set_topic("topic");
+ req_body.set_data(msg_content);
+ auto req_head(InitMsgHead(GetType(req_body), client_proc_id));
+ request_rc.MakeRC(shm, req_head, req_body);
+
+ MsgRequestTopic reply_body;
+ reply_body.set_topic("topic");
+ reply_body.set_data(msg_content);
+ auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id));
MsgI reply_rc;
- reply_rc.MakeRC(shm, MakeReply("fakemsgid", msg_content.data(), msg_content.size()));
+ reply_rc.MakeRC(shm, reply_head, reply_body);
std::atomic<uint64_t> count(0);
@@ -133,7 +152,11 @@
auto Client = [&](int cli_id, int nmsg) {
for (int i = 0; i < nmsg; ++i) {
auto Req = [&]() {
- return cli.Send(srv.Id(), MakeRequest(cli.Id(), "topic", msg_content.data(), msg_content.size()), 100);
+ MsgRequestTopic req_body;
+ req_body.set_topic("topic");
+ req_body.set_data(msg_content);
+ auto req_head(InitMsgHead(GetType(req_body), client_proc_id));
+ return cli.Send(srv.Id(), req_head, req_body, 100);
};
auto ReqRC = [&]() { return cli.Send(srv.Id(), request_rc, 1000); };
@@ -141,10 +164,12 @@
printf("********** client send error.\n");
continue;
}
- BHMsg msg;
+ MsgI msg;
+ BHMsgHead head;
if (!cli.Recv(msg, 1000)) {
printf("********** client recv error.\n");
} else {
+ DEFER1(msg.Release(shm));
++count;
auto cur = Now();
if (last_time.exchange(cur) < cur) {
@@ -158,18 +183,27 @@
std::atomic<bool> stop(false);
auto Server = [&]() {
- BHMsg req;
- while (!stop) {
- if (srv.Recv(req, 100) && req.type() == kMsgTypeRequestTopic) {
- auto &mqid = req.route()[0].mq_id();
- MQId src_id;
- memcpy(&src_id, mqid.data(), sizeof(src_id));
- auto Reply = [&]() {
- return srv.Send(src_id, MakeReply(req.msg_id(), msg_content.data(), msg_content.size()), 100);
- };
- auto ReplyRC = [&]() { return srv.Send(src_id, reply_rc, 100); };
+ MsgI req;
+ BHMsgHead req_head;
- if (ReplyRC()) {
+ while (!stop) {
+ if (srv.Recv(req, 100)) {
+ DEFER1(req.Release(shm));
+ if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) {
+ auto &mqid = req_head.route()[0].mq_id();
+ MQId src_id;
+ memcpy(&src_id, mqid.data(), sizeof(src_id));
+ auto Reply = [&]() {
+ MsgRequestTopic reply_body;
+ reply_body.set_topic("topic");
+ reply_body.set_data(msg_content);
+ auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, req_head.msg_id()));
+ return srv.Send(src_id, reply_head, reply_body, 100);
+ };
+ auto ReplyRC = [&]() { return srv.Send(src_id, reply_rc, 100); };
+
+ if (ReplyRC()) {
+ }
}
}
}
diff --git a/utest/utest.cpp b/utest/utest.cpp
index 092455f..c925e22 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -1,11 +1,8 @@
#include "center.h"
#include "defs.h"
#include "pubsub.h"
-#include "pubsub_center.h"
-#include "reqrep_center.h"
#include "socket.h"
-#include "topic_reply.h"
-#include "topic_request.h"
+#include "topic_node.h"
#include "util.h"
#include <atomic>
#include <boost/uuid/uuid_generators.hpp>
@@ -15,6 +12,7 @@
#include <string>
#include <thread>
#include <vector>
+using namespace bhome_msg;
template <class A, class B>
struct IsSameType {
@@ -79,9 +77,11 @@
int *flag = shm.find_or_construct<int>("flag")(123);
printf("flag = %d\n", *flag);
++*flag;
+ const std::string sub_proc_id = "subscriber";
+ const std::string pub_proc_id = "publisher";
- PubSubCenter bus(shm);
- bus.Start();
+ BHCenter center(shm);
+ center.Start();
std::this_thread::sleep_for(100ms);
@@ -93,12 +93,12 @@
const int timeout = 1000;
auto Sub = [&](int id, const std::vector<std::string> &topics) {
SocketSubscribe client(shm);
- bool r = client.Subscribe(topics, timeout);
+ bool r = client.Subscribe(sub_proc_id, topics, timeout);
std::mutex mutex;
std::condition_variable cv;
std::atomic<uint64_t> n(0);
- auto OnTopicData = [&](const std::string &topic, const std::string &data) {
+ auto OnTopicData = [&](const std::string &proc_id, const std::string &topic, const std::string &data) {
++total_count;
auto cur = Now();
@@ -123,7 +123,7 @@
for (unsigned i = 0; i < nmsg; ++i) {
std::string data = topic + std::to_string(i) + std::string(1000, '-');
- bool r = provider.Publish(topic, data, timeout);
+ bool r = provider.Publish(pub_proc_id, topic, data.data(), data.size(), timeout);
if (!r) {
printf("pub ret: %s\n", r ? "ok" : "fail");
}
@@ -150,9 +150,8 @@
std::cout << "end : " << Now();
printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
total_count.load(), total_count - last_count.exchange(total_count), init_avail - Avail());
-
- bus.Stop();
}
+
namespace
{
struct C {
@@ -177,12 +176,24 @@
printf("flag = %d\n", *flag);
++*flag;
+ const std::string client_proc_id = "client_proc_";
+ const std::string server_proc_id = "server_proc_";
+
BHCenter center(shm);
center.Start();
std::atomic<bool> run(true);
auto Client = [&](const std::string &topic, const int nreq) {
- SocketRequest client(shm);
+ TopicNode client(shm);
+ MsgRegister reg;
+ reg.mutable_proc()->set_proc_id(client_proc_id + topic);
+ MsgCommonReply reply_body;
+
+ if (!client.Register(reg, reply_body, 1000)) {
+ printf("client register failed\n");
+ return;
+ }
+
std::atomic<int> count(0);
std::string reply;
auto onRecv = [&](const std::string &rep) {
@@ -191,40 +202,54 @@
printf("count: %d\n", count.load());
}
};
- client.StartWorker(onRecv, 2);
+ client.ClientStartWorker(onRecv, 2);
boost::timer::auto_cpu_timer timer;
for (int i = 0; i < nreq; ++i) {
- if (!client.AsyncRequest(topic, "data " + std::to_string(i), 1000)) {
+ if (!client.ClientAsyncRequest(topic, "data " + std::to_string(i), 1000)) {
printf("client request failed\n");
+ ++count;
}
// if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) {
// printf("client request failed\n");
- // } else {
- // ++count;
// }
+ // ++count;
}
do {
std::this_thread::yield();
} while (count.load() < nreq);
- client.Stop();
+ client.ClientStopWorker();
printf("request %s %d done ", topic.c_str(), count.load());
};
std::atomic_uint64_t server_msg_count(0);
auto Server = [&](const std::string &name, const std::vector<std::string> &topics) {
- SocketReply server(shm);
- ProcInfo info;
- info.set_id(name);
- info.set_name(name);
- if (!server.Register(info, topics, 100)) {
- printf("register failed\n");
+ TopicNode server(shm);
+ MsgRegister reg;
+ reg.mutable_proc()->set_proc_id(server_proc_id);
+ reg.mutable_proc()->set_name(name);
+ MsgCommonReply reply_body;
+
+ if (!server.Register(reg, reply_body, 100)) {
+ printf("server register failed\n");
+ return;
}
+
auto onData = [&](const std::string &topic, const std::string &data, std::string &reply) {
++server_msg_count;
reply = topic + ':' + data;
return true;
};
- server.StartWorker(onData);
+ server.ServerStart(onData);
+
+ MsgRegisterRPC rpc;
+ for (auto &topic : topics) {
+ rpc.add_topics(topic);
+ }
+ if (!server.RegisterRPC(rpc, reply_body, 100)) {
+ printf("server register topic failed\n");
+ return;
+ }
+
while (run) {
std::this_thread::yield();
}
@@ -234,7 +259,7 @@
servers.Launch(Server, "server", topics);
std::this_thread::sleep_for(100ms);
for (auto &t : topics) {
- clients.Launch(Client, t, 1000 * 100);
+ clients.Launch(Client, t, 1000 * 1);
}
clients.WaitAll();
printf("clients done, server replyed: %d\n", server_msg_count.load());
--
Gitblit v1.8.0