From 2197cf91e7a3bd5941327ba630a42946b88f069e Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期五, 09 四月 2021 14:15:41 +0800
Subject: [PATCH] join pub/sub to node; refactor.
---
src/socket.h | 63 +++++--
src/proto.h | 2
src/msg.h | 26 +--
src/socket.cpp | 48 ++---
proto/source/bhome_msg.proto | 22 ++
utest/utest.cpp | 45 ++---
src/topic_node.cpp | 121 +++++++++++++-
src/center.cpp | 14 +
src/msg.cpp | 8 +
utest/speed_test.cpp | 18 +-
/dev/null | 31 ---
utest/util.h | 23 ++
src/shm_queue.h | 21 --
src/topic_node.h | 22 ++
14 files changed, 277 insertions(+), 187 deletions(-)
diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto
index b06b692..5056a26 100644
--- a/proto/source/bhome_msg.proto
+++ b/proto/source/bhome_msg.proto
@@ -3,6 +3,7 @@
// import "google/protobuf/descriptor.proto";
import "bhome_msg_api.proto";
+import "error_msg.proto";
package bhome.msg;
@@ -18,12 +19,21 @@
bytes topic = 6; // for request route
}
-message BHMsg { // deprecated
- bytes msg_id = 1;
- int64 timestamp = 2;
- int32 type = 3;
- repeated BHAddress route = 4; // for reply and proxy.
- bytes body = 5;
+message MsgRequest {
+ MsgType type = 1;
+ // oneof body;
+}
+
+message MsgReply {
+ ErrorMsg err_msg = 1;
+ // oneof reply
+}
+
+message BHMsgBody {
+ oneof reqrep {
+ MsgRequest request = 1;
+ MsgReply reply = 2;
+ }
}
enum MsgType {
diff --git a/src/center.cpp b/src/center.cpp
index fe549b7..71c85c3 100644
--- a/src/center.cpp
+++ b/src/center.cpp
@@ -336,7 +336,7 @@
auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
return [&](auto &&rep_body) {
auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
- bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 10);
+ bool r = socket.Send(head.route(0).mq_id().data(), reply_head, rep_body, 100);
if (!r) {
printf("send reply failed.\n");
}
@@ -364,18 +364,20 @@
MsgPublish pub;
NodeCenter::Clients clients;
MsgCommonReply reply;
- MsgI pubmsg;
if (head.route_size() != 1 || !msg.ParseBody(pub)) {
return;
} else if (!center->FindClients(head, pub, clients, reply)) {
- // send error reply.
MakeReplyer(socket, head, center->id())(reply);
- } else if (pubmsg.MakeRC(socket.shm(), msg)) {
- DEFER1(pubmsg.Release(socket.shm()));
+ } else {
+ MakeReplyer(socket, head, center->id())(MakeReply(eSuccess));
+ if (!msg.EnableRefCount(socket.shm())) { return; } // no memory?
+
for (auto &cli : clients) {
auto node = cli.weak_node_.lock();
if (node) {
- socket.Send(cli.mq_.data(), pubmsg, 10);
+ if (!socket.Send(cli.mq_.data(), msg, 100)) {
+ printf("center route publish failed. need resend.\n");
+ }
}
}
}
diff --git a/src/msg.cpp b/src/msg.cpp
index c353d84..06b817e 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -78,6 +78,14 @@
return true;
}
+bool MsgI::EnableRefCount(SharedMemory &shm)
+{
+ if (!IsCounted()) {
+ count_ = shm.New<RefCount>();
+ }
+ return IsCounted();
+}
+
int MsgI::Release(SharedMemory &shm)
{
if (IsCounted()) {
diff --git a/src/msg.h b/src/msg.h
index 661d989..10ad0d2 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -105,27 +105,21 @@
bool IsCounted() const { return static_cast<bool>(count_); }
template <class Body>
- bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body)
- {
- return Make(shm, Pack(shm, head, body));
- }
- template <class Body>
- bool MakeRC(SharedMemory &shm, const BHMsgHead &head, const Body &body)
+ inline bool MakeRC(SharedMemory &shm, const BHMsgHead &head, const Body &body)
{
return MakeRC(shm, Pack(shm, head, body));
}
- bool MakeRC(SharedMemory &shm, MsgI &a)
+
+ bool EnableRefCount(SharedMemory &shm);
+
+ template <class Body>
+ inline bool Make(SharedMemory &shm, const BHMsgHead &head, const Body &body)
{
- if (a.IsCounted()) {
- *this = a;
- AddRef();
- return true;
- } else {
- void *p = a.ptr_.get();
- a.ptr_ = 0;
- return MakeRC(shm, p);
- }
+ void *p = Pack(shm, head, body);
+ auto NeedRefCount = [&]() { return head.type() == kMsgTypePublish; };
+ return NeedRefCount() ? MakeRC(shm, p) : Make(shm, p);
}
+
bool ParseHead(BHMsgHead &head) const;
template <class Body>
bool ParseBody(Body &body) const
diff --git a/src/proto.h b/src/proto.h
index 2057711..da3bde6 100644
--- a/src/proto.h
+++ b/src/proto.h
@@ -74,5 +74,5 @@
BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id, const std::string &msgid);
BHMsgHead InitMsgHead(const MsgType type, const std::string &proc_id);
// inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); }
-
+inline bool IsSuccess(const ErrorCode ec) { return ec == eSuccess; }
#endif // end of include guard: PROTO_UA9UWKL1
diff --git a/src/pubsub.cpp b/src/pubsub.cpp
deleted file mode 100644
index 471c63c..0000000
--- a/src/pubsub.cpp
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * =====================================================================================
- *
- * Filename: pubsub.cpp
- *
- * Description:
- *
- * Version: 1.0
- * Created: 2021骞�03鏈�24鏃� 18鏃�44鍒�13绉�
- * Revision: none
- * Compiler: gcc
- *
- * Author: Li Chao (),
- * Organization:
- *
- * =====================================================================================
- */
-#include "pubsub.h"
-#include "bh_util.h"
-#include "defs.h"
-
-using namespace std::chrono_literals;
-using namespace bhome_msg;
-
-bool SocketPublish::Publish(const std::string &proc_id, const Topic &topic, const void *data, const size_t size, const int timeout_ms)
-{
- try {
- MsgPublish pub;
- pub.set_topic(topic);
- pub.set_data(data, size);
- BHMsgHead head(InitMsgHead(GetType(pub), proc_id));
- MsgI imsg;
- if (imsg.MakeRC(shm(), head, pub)) {
- DEFER1(imsg.Release(shm()));
- return ShmMsgQueue::Send(shm(), BHTopicBusAddress(), imsg, timeout_ms);
- }
- } catch (...) {
- }
- return false;
-}
-namespace
-{
-inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); }
-
-} // namespace
-bool SocketSubscribe::Subscribe(const std::string &proc_id, const std::vector<Topic> &topics, const int timeout_ms)
-{
- try {
- MsgSubscribe sub;
- for (auto &topic : topics) {
- sub.add_topics(topic);
- }
- BHMsgHead head(InitMsgHead(GetType(sub), proc_id));
- AddRoute(head, mq().Id());
-
- return Send(&BHTopicBusAddress(), head, sub, timeout_ms);
- } catch (...) {
- return false;
- }
-}
-
-bool SocketSubscribe::StartRecv(const TopicDataCB &tdcb, int nworker)
-{
- auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) {
- if (head.type() == kMsgTypePublish) {
- MsgPublish pub;
- if (imsg.ParseBody(pub)) {
- tdcb(head.proc_id(), pub.topic(), pub.data());
- }
- } else {
- // ignored, or dropped
- }
- };
-
- return tdcb && Start(AsyncRecvProc, nworker);
-}
-
-bool SocketSubscribe::RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms)
-{
- MsgI msg;
- BHMsgHead head;
- if (SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) {
- MsgPublish pub;
- if (msg.ParseBody(pub)) {
- head.mutable_proc_id()->swap(proc_id);
- pub.mutable_topic()->swap(topic);
- pub.mutable_data()->swap(data);
- return true;
- }
- }
- return false;
-}
\ No newline at end of file
diff --git a/src/pubsub.h b/src/pubsub.h
deleted file mode 100644
index bd60fcd..0000000
--- a/src/pubsub.h
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * =====================================================================================
- *
- * Filename: pubsub.h
- *
- * Description:
- *
- * Version: 1.0
- * Created: 2021骞�03鏈�24鏃� 18鏃�44鍒�36绉�
- * Revision: none
- * Compiler: gcc
- *
- * Author: Li Chao (),
- * Organization:
- *
- * =====================================================================================
- */
-#ifndef PUBSUB_4KGRA997
-#define PUBSUB_4KGRA997
-
-#include "defs.h"
-#include "socket.h"
-#include <string>
-
-class SocketPublish
-{
- typedef ShmSocket Socket;
- Socket::Shm &shm_;
- Socket::Shm &shm() { return shm_; }
-
-public:
- SocketPublish(Socket::Shm &shm) :
- shm_(shm) {}
- SocketPublish() :
- SocketPublish(BHomeShm()) {}
- bool Publish(const std::string &proc_id, const Topic &topic, const void *data, const size_t size, const int timeout_ms);
-};
-
-// socket subscribe
-class SocketSubscribe : private ShmSocket
-{
- typedef ShmSocket Socket;
-
-public:
- SocketSubscribe(Socket::Shm &shm) :
- Socket(shm, 64) {}
- SocketSubscribe() :
- SocketSubscribe(BHomeShm()) {}
- ~SocketSubscribe() { Stop(); }
-
- typedef std::function<void(const std::string &proc_id, const Topic &topic, const std::string &data)> TopicDataCB;
- bool StartRecv(const TopicDataCB &tdcb, int nworker = 2);
- bool Stop() { return Socket::Stop(); }
- bool Subscribe(const std::string &proc_id, const std::vector<Topic> &topics, const int timeout_ms);
- bool RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms);
-};
-
-#endif // end of include guard: PUBSUB_4KGRA997
diff --git a/src/shm_queue.h b/src/shm_queue.h
index 32ccfae..88c13ec 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -136,25 +136,8 @@
static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend);
static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms);
- template <class... Extra>
- bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms, Extra const &...extra)
- {
- return Send(shm(), remote_id, msg, timeout_ms, extra...);
- }
- template <class Body, class... Extra>
- bool Send(const MQId &remote_id, const BHMsgHead &head, const Body &body, const int timeout_ms, Extra const &...extra)
- {
- MsgI msg;
- if (msg.Make(shm(), head, body)) {
- if (Send(shm(), remote_id, msg, timeout_ms, extra...)) {
- return true;
- } else {
- msg.Release(shm());
- }
- }
- return false;
- }
-
+ template <class... Rest>
+ bool Send(const MQId &remote_id, Rest const &...rest) { return Send(shm(), remote_id, rest...); }
size_t Pending() const { return data()->size(); }
};
diff --git a/src/socket.cpp b/src/socket.cpp
index f2b29f4..116175d 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -43,51 +43,37 @@
bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle)
{
- auto onRecv = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
- auto Find = [&](RecvCB &cb) {
- std::lock_guard<std::mutex> lock(mutex());
- const std::string &msgid = head.msg_id();
- auto pos = async_cbs_.find(msgid);
- if (pos != async_cbs_.end()) {
- cb.swap(pos->second);
- async_cbs_.erase(pos);
- return true;
- } else {
- return false;
- }
- };
-
+ auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
RecvCB cb;
- if (Find(cb)) {
+ if (async_cbs_->Find(head.msg_id(), cb)) {
cb(socket, imsg, head);
} else if (onData) {
onData(socket, imsg, head);
} // else ignored, or dropped
};
- std::lock_guard<std::mutex> lock(mutex_);
- StopNoLock();
- auto RecvProc = [this, onRecv, onIdle]() {
- while (run_) {
- try {
- MsgI imsg;
- if (mq().Recv(imsg, 10)) {
- DEFER1(imsg.Release(shm()));
- BHMsgHead head;
- if (imsg.ParseHead(head)) {
- onRecv(*this, imsg, head);
- }
- } else if (onIdle) {
- onIdle(*this);
+ auto recvLoopBody = [this, onRecvWithPerMsgCB, onIdle]() {
+ try {
+ MsgI imsg;
+ if (mq().Recv(imsg, 10)) {
+ DEFER1(imsg.Release(shm()));
+ BHMsgHead head;
+ if (imsg.ParseHead(head)) {
+ onRecvWithPerMsgCB(*this, imsg, head);
}
- } catch (...) {
+ } else if (onIdle) {
+ onIdle(*this);
}
+ } catch (...) {
}
};
+ std::lock_guard<std::mutex> lock(mutex_);
+ StopNoLock();
+
run_.store(true);
for (int i = 0; i < nworker; ++i) {
- workers_.emplace_back(RecvProc);
+ workers_.emplace_back([this, recvLoopBody]() { while (run_) { recvLoopBody(); } });
}
return true;
}
diff --git a/src/socket.h b/src/socket.h
index 7c4f83f..f73bee5 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -19,6 +19,7 @@
#ifndef SOCKET_GWTJHBPO
#define SOCKET_GWTJHBPO
+#include "bh_util.h"
#include "defs.h"
#include "shm_queue.h"
#include <atomic>
@@ -34,6 +35,15 @@
class ShmSocket : private boost::noncopyable
{
+ template <class DoSend>
+ inline bool SendImpl(MsgI &msg, const int timeout_ms, const DoSend &doSend)
+ {
+ bool r = false;
+ DEFER1(if (msg.IsCounted() || !r) { msg.Release(shm()); });
+ r = doSend(msg);
+ return r;
+ }
+
protected:
typedef bhome_shm::ShmMsgQueue Queue;
@@ -55,30 +65,28 @@
bool Stop();
size_t Pending() const { return mq().Pending(); }
- bool Send(const void *id, const MsgI &imsg, const int timeout_ms)
+ bool Send(const void *valid_remote, const MsgI &imsg, const int timeout_ms)
{
- return mq().Send(*static_cast<const MQId *>(id), imsg, timeout_ms);
+ assert(valid_remote);
+ return mq().Send(*static_cast<const MQId *>(valid_remote), imsg, timeout_ms);
}
//TODO reimplment, using async.
bool SyncRecv(MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms);
template <class Body>
- bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb = RecvCB())
+ bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb)
{
- assert(valid_remote);
- try {
- if (cb) {
- auto RegisterCB = [&]() {
- std::lock_guard<std::mutex> lock(mutex());
- async_cbs_.emplace(head.msg_id(), cb);
- };
- return mq().Send(*static_cast<const MQId *>(valid_remote), head, body, timeout_ms, RegisterCB);
- } else {
- return mq().Send(*static_cast<const MQId *>(valid_remote), head, body, timeout_ms);
- }
- } catch (...) {
- return false;
- }
+ auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms, [&]() { async_cbs_->Add(head.msg_id(), cb); }); };
+ MsgI msg;
+ return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend);
+ }
+
+ template <class Body>
+ bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms)
+ {
+ auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms); };
+ MsgI msg;
+ return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend);
}
template <class Body>
@@ -133,7 +141,26 @@
std::atomic<bool> run_;
Queue mq_;
- std::unordered_map<std::string, RecvCB> async_cbs_;
+ class AsyncCBs
+ {
+ std::unordered_map<std::string, RecvCB> store_;
+
+ public:
+ bool Add(const std::string &id, const RecvCB &cb) { return store_.emplace(id, cb).second; }
+ bool Find(const std::string &id, RecvCB &cb)
+ {
+ auto pos = store_.find(id);
+ if (pos != store_.end()) {
+ cb.swap(pos->second);
+ store_.erase(pos);
+ return true;
+ } else {
+ return false;
+ }
+ }
+ };
+
+ Synced<AsyncCBs> async_cbs_;
};
#endif // end of include guard: SOCKET_GWTJHBPO
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index c6c9771..d76c03a 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -76,29 +76,34 @@
shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm)
{
SockNode().Start();
+ SockClient().Start();
+ SockServer().Start();
}
+
TopicNode::~TopicNode()
{
StopAll();
- SockNode().Stop();
}
+
void TopicNode::StopAll()
{
- ServerStop();
- ClientStopWorker();
+ SockServer().Stop();
+ SockClient().Stop();
+ SockNode().Stop();
}
bool TopicNode::Register(const MsgRegister &body, MsgCommonReply &reply_body, const int timeout_ms)
{
+ auto &sock = SockNode();
+
auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
- AddRoute(head, SockNode().id());
+ AddRoute(head, sock.id());
MsgI reply;
DEFER1(reply.Release(shm_););
BHMsgHead reply_head;
- bool r = SockNode().SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
- r = r && reply_head.type() == kMsgTypeCommonReply;
- r = r && reply.ParseBody(reply_body);
+ bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
+ r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
if (r) {
info_ = body;
}
@@ -108,14 +113,15 @@
bool TopicNode::RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply_body, const int timeout_ms)
{
//TODO check registered
+ auto &sock = SockServer();
auto head(InitMsgHead(GetType(body), proc_id()));
- AddRoute(head, SockReply().id());
+ AddRoute(head, sock.id());
MsgI reply;
DEFER1(reply.Release(shm_););
BHMsgHead reply_head;
- bool r = SockReply().SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
+ bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
r = r && reply_head.type() == kMsgTypeCommonReply;
r = r && reply.ParseBody(reply_body);
return r;
@@ -154,15 +160,17 @@
onIdle(sock);
};
- return rcb && SockReply().Start(onRecv, onIdle, nworker);
+ auto &sock = SockServer();
+ return rcb && sock.Start(onRecv, onIdle, nworker);
}
-bool TopicNode::ServerStop() { return SockReply().Stop(); }
bool TopicNode::ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms)
{
+ auto &sock = SockServer();
+
MsgI imsg;
BHMsgHead head;
- if (SockReply().SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) {
+ if (sock.SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) {
MsgRequestTopic request;
if (imsg.ParseBody(request)) {
request.mutable_topic()->swap(topic);
@@ -179,6 +187,8 @@
bool TopicNode::ServerSendReply(void *src_info, const std::string &data, const int timeout_ms)
{
+ auto &sock = SockServer();
+
SrcInfo *p = static_cast<SrcInfo *>(src_info);
DEFER1(delete p);
if (!p || p->route.empty()) {
@@ -192,7 +202,7 @@
head.add_route()->Swap(&p->route[i]);
}
- return SockReply().Send(p->route.back().mq_id().data(), head, body, timeout_ms);
+ return sock.Send(p->route.back().mq_id().data(), head, body, timeout_ms);
}
bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker)
@@ -211,12 +221,12 @@
return SockRequest().Start(onData, nworker);
}
-bool TopicNode::ClientStopWorker() { return SockRequest().Stop(); }
bool TopicNode::ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
{
auto Call = [&](const void *remote) {
auto &sock = SockRequest();
+
MsgRequestTopic req;
req.set_topic(topic);
req.set_data(data, size);
@@ -254,6 +264,7 @@
{
try {
auto &sock = SockRequest();
+
BHAddress addr;
if (ClientQueryRPCTopic(topic, addr, timeout_ms)) {
@@ -290,6 +301,7 @@
bool TopicNode::ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
{
auto &sock = SockRequest();
+
if (topic_query_cache_.Find(topic, addr)) {
return true;
}
@@ -319,4 +331,85 @@
} else {
}
return false;
+}
+
+// publish
+
+bool TopicNode::Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms)
+{
+ try {
+ auto &sock = SockPub();
+
+ MsgPublish pub;
+ pub.set_topic(topic);
+ pub.set_data(data, size);
+ BHMsgHead head(InitMsgHead(GetType(pub), proc_id()));
+ AddRoute(head, sock.id());
+
+ MsgI reply;
+ DEFER1(reply.Release(shm()););
+ BHMsgHead reply_head;
+ MsgCommonReply reply_body;
+ return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) &&
+ reply_head.type() == kMsgTypeCommonReply &&
+ reply.ParseBody(reply_body) &&
+ IsSuccess(reply_body.errmsg().errcode());
+ } catch (...) {
+ }
+ return false;
+}
+
+// subscribe
+
+bool TopicNode::Subscribe(const std::vector<Topic> &topics, const int timeout_ms)
+{
+ try {
+ auto &sock = SockSub();
+ MsgSubscribe sub;
+ for (auto &topic : topics) {
+ sub.add_topics(topic);
+ }
+ BHMsgHead head(InitMsgHead(GetType(sub), proc_id()));
+ AddRoute(head, sock.id());
+
+ return sock.Send(&BHTopicBusAddress(), head, sub, timeout_ms);
+ } catch (...) {
+ return false;
+ }
+}
+
+bool TopicNode::SubscribeStartWorker(const TopicDataCB &tdcb, int nworker)
+{
+ auto &sock = SockSub();
+
+ auto AsyncRecvProc = [this, tdcb](ShmSocket &, MsgI &imsg, BHMsgHead &head) {
+ if (head.type() == kMsgTypePublish) {
+ MsgPublish pub;
+ if (imsg.ParseBody(pub)) {
+ tdcb(head.proc_id(), pub.topic(), pub.data());
+ }
+ } else {
+ // ignored, or dropped
+ }
+ };
+
+ return tdcb && sock.Start(AsyncRecvProc, nworker);
+}
+
+bool TopicNode::RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms)
+{
+ auto &sock = SockSub();
+ MsgI msg;
+ DEFER1(msg.Release(shm()););
+ BHMsgHead head;
+ if (sock.SyncRecv(msg, head, timeout_ms) && head.type() == kMsgTypePublish) {
+ MsgPublish pub;
+ if (msg.ParseBody(pub)) {
+ head.mutable_proc_id()->swap(proc_id);
+ pub.mutable_topic()->swap(topic);
+ pub.mutable_data()->swap(data);
+ return true;
+ }
+ }
+ return false;
}
\ No newline at end of file
diff --git a/src/topic_node.h b/src/topic_node.h
index 8852af1..34fe2ee 100644
--- a/src/topic_node.h
+++ b/src/topic_node.h
@@ -19,7 +19,6 @@
#define TOPIC_NODE_YVKWA6TF
#include "msg.h"
-#include "pubsub.h"
#include "socket.h"
#include <memory>
@@ -32,23 +31,26 @@
SharedMemory &shm_;
MsgRegister info_;
+ SharedMemory &shm() { return shm_; }
+
public:
TopicNode(SharedMemory &shm);
~TopicNode();
+
+ void StopAll();
+ // topic node
bool Register(const MsgRegister &body, MsgCommonReply &reply, const int timeout_ms);
bool RegisterRPC(const MsgRegisterRPC &body, MsgCommonReply &reply, const int timeout_ms);
// topic rpc server
typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest;
bool ServerStart(OnRequest const &cb, const int nworker = 2);
- bool ServerStop();
bool ServerRecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms);
bool ServerSendReply(void *src_info, const std::string &data, const int timeout_ms);
// topic client
typedef std::function<void(const std::string &data)> RequestResultCB;
bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2);
- bool ClientStopWorker();
bool ClientAsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB());
bool ClientAsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB())
{
@@ -60,7 +62,14 @@
return ClientSyncRequest(topic, data.data(), data.size(), out, timeout_ms);
}
- void StopAll();
+ // publish
+ bool Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms);
+
+ // subscribe
+ typedef std::function<void(const std::string &proc_id, const Topic &topic, const std::string &data)> TopicDataCB;
+ bool SubscribeStartWorker(const TopicDataCB &tdcb, int nworker = 2);
+ bool Subscribe(const std::vector<Topic> &topics, const int timeout_ms);
+ bool RecvSub(std::string &proc_id, Topic &topic, std::string &data, const int timeout_ms);
private:
bool ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
@@ -106,14 +115,17 @@
// some sockets may be the same one, using functions make it easy to change.
auto &SockNode() { return sock_node_; }
+ auto &SockPub() { return SockNode(); }
auto &SockSub() { return sock_sub_; }
auto &SockRequest() { return sock_request_; }
+ auto &SockClient() { return SockRequest(); }
auto &SockReply() { return sock_reply_; }
+ auto &SockServer() { return SockReply(); }
ShmSocket sock_node_;
ShmSocket sock_request_;
ShmSocket sock_reply_;
- SocketSubscribe sock_sub_;
+ ShmSocket sock_sub_;
TopicQueryCache topic_query_cache_;
};
diff --git a/src/topic_rpc.cpp b/src/topic_rpc.cpp
deleted file mode 100644
index 065a861..0000000
--- a/src/topic_rpc.cpp
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * =====================================================================================
- *
- * Filename: topic_rpc.cpp
- *
- * Description: topic request/reply manager
- *
- * Version: 1.0
- * Created: 2021骞�03鏈�31鏃� 16鏃�29鍒�31绉�
- * Revision: none
- * Compiler: gcc
- *
- * Author: YOUR NAME (),
- * Organization:
- *
- * =====================================================================================
- */
-#include "topic_rpc.h"
-
-
-
diff --git a/src/topic_rpc.h b/src/topic_rpc.h
deleted file mode 100644
index 40ff985..0000000
--- a/src/topic_rpc.h
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * =====================================================================================
- *
- * Filename: topic_rpc.h
- *
- * Description:
- *
- * Version: 1.0
- * Created: 2021骞�03鏈�31鏃� 16鏃�30鍒�10绉�
- * Revision: none
- * Compiler: gcc
- *
- * Author: YOUR NAME (),
- * Organization:
- *
- * =====================================================================================
- */
-#ifndef TOPIC_RPC_JU1AYN5L
-#define TOPIC_RPC_JU1AYN5L
-
-#include "socket.h"
-
-// request/reply topic manager
-class RPCManager
-{
- ShmSocket socket_;
-
-public:
-};
-
-#endif // end of include guard: TOPIC_RPC_JU1AYN5L
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index d777f91..b1f11ac 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -40,6 +40,7 @@
body.set_data(str);
auto head(InitMsgHead(GetType(body), proc_id));
msg.MakeRC(shm, head, body);
+ assert(msg.IsCounted());
DEFER1(msg.Release(shm););
for (uint64_t i = 0; i < n; ++i) {
@@ -127,8 +128,8 @@
SharedMemory shm(shm_name, 1024 * 1024 * 50);
auto Avail = [&]() { return shm.get_free_memory(); };
auto init_avail = Avail();
- ShmMsgQueue srv(shm, qlen);
- ShmMsgQueue cli(shm, qlen);
+ ShmSocket srv(shm, qlen);
+ ShmSocket cli(shm, qlen);
MsgI request_rc;
MsgRequestTopic req_body;
@@ -156,9 +157,9 @@
req_body.set_topic("topic");
req_body.set_data(msg_content);
auto req_head(InitMsgHead(GetType(req_body), client_proc_id));
- return cli.Send(srv.Id(), req_head, req_body, 100);
+ return cli.Send(&srv.id(), req_head, req_body, 100);
};
- auto ReqRC = [&]() { return cli.Send(srv.Id(), request_rc, 1000); };
+ auto ReqRC = [&]() { return cli.Send(&srv.id(), request_rc, 1000); };
if (!ReqRC()) {
printf("********** client send error.\n");
@@ -166,7 +167,7 @@
}
MsgI msg;
BHMsgHead head;
- if (!cli.Recv(msg, 1000)) {
+ if (!cli.SyncRecv(msg, head, 1000)) {
printf("********** client recv error.\n");
} else {
DEFER1(msg.Release(shm));
@@ -187,8 +188,9 @@
BHMsgHead req_head;
while (!stop) {
- if (srv.Recv(req, 100)) {
+ if (srv.SyncRecv(req, req_head, 100)) {
DEFER1(req.Release(shm));
+
if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) {
auto &mqid = req_head.route()[0].mq_id();
MQId src_id;
@@ -198,9 +200,9 @@
reply_body.set_topic("topic");
reply_body.set_data(msg_content);
auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, req_head.msg_id()));
- return srv.Send(src_id, reply_head, reply_body, 100);
+ return srv.Send(&src_id, reply_head, reply_body, 100);
};
- auto ReplyRC = [&]() { return srv.Send(src_id, reply_rc, 100); };
+ auto ReplyRC = [&]() { return srv.Send(&src_id, reply_rc, 100); };
if (ReplyRC()) {
}
diff --git a/utest/utest.cpp b/utest/utest.cpp
index c925e22..f88eab9 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -1,8 +1,5 @@
#include "center.h"
#include "defs.h"
-#include "pubsub.h"
-#include "socket.h"
-#include "topic_node.h"
#include "util.h"
#include <atomic>
#include <boost/uuid/uuid_generators.hpp>
@@ -92,8 +89,12 @@
const uint64_t nmsg = 100 * 2;
const int timeout = 1000;
auto Sub = [&](int id, const std::vector<std::string> &topics) {
- SocketSubscribe client(shm);
- bool r = client.Subscribe(sub_proc_id, topics, timeout);
+ DemoNode client("client_" + std::to_string(id), shm);
+
+ bool r = client.Subscribe(topics, timeout);
+ if (!r) {
+ printf("client subscribe failed.\n");
+ }
std::mutex mutex;
std::condition_variable cv;
@@ -112,18 +113,19 @@
}
// printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str());
};
- client.StartRecv(OnTopicData, 1);
+ client.SubscribeStartWorker(OnTopicData, 1);
std::unique_lock<std::mutex> lk(mutex);
cv.wait(lk);
};
auto Pub = [&](const std::string &topic) {
- SocketPublish provider(shm);
+ DemoNode provider("server_" + topic, shm);
+
for (unsigned i = 0; i < nmsg; ++i) {
std::string data = topic + std::to_string(i) + std::string(1000, '-');
- bool r = provider.Publish(pub_proc_id, topic, data.data(), data.size(), timeout);
+ bool r = provider.Publish(topic, data.data(), data.size(), timeout);
if (!r) {
printf("pub ret: %s\n", r ? "ok" : "fail");
}
@@ -184,15 +186,7 @@
std::atomic<bool> run(true);
auto Client = [&](const std::string &topic, const int nreq) {
- TopicNode client(shm);
- MsgRegister reg;
- reg.mutable_proc()->set_proc_id(client_proc_id + topic);
- MsgCommonReply reply_body;
-
- if (!client.Register(reg, reply_body, 1000)) {
- printf("client register failed\n");
- return;
- }
+ DemoNode client(client_proc_id + topic, shm);
std::atomic<int> count(0);
std::string reply;
@@ -218,21 +212,13 @@
do {
std::this_thread::yield();
} while (count.load() < nreq);
- client.ClientStopWorker();
+ client.StopAll();
printf("request %s %d done ", topic.c_str(), count.load());
};
+
std::atomic_uint64_t server_msg_count(0);
auto Server = [&](const std::string &name, const std::vector<std::string> &topics) {
- TopicNode server(shm);
- MsgRegister reg;
- reg.mutable_proc()->set_proc_id(server_proc_id);
- reg.mutable_proc()->set_name(name);
- MsgCommonReply reply_body;
-
- if (!server.Register(reg, reply_body, 100)) {
- printf("server register failed\n");
- return;
- }
+ DemoNode server(name, shm);
auto onData = [&](const std::string &topic, const std::string &data, std::string &reply) {
++server_msg_count;
@@ -245,6 +231,7 @@
for (auto &topic : topics) {
rpc.add_topics(topic);
}
+ MsgCommonReply reply_body;
if (!server.RegisterRPC(rpc, reply_body, 100)) {
printf("server register topic failed\n");
return;
@@ -262,7 +249,7 @@
clients.Launch(Client, t, 1000 * 1);
}
clients.WaitAll();
- printf("clients done, server replyed: %d\n", server_msg_count.load());
+ printf("clients done, server replyed: %ld\n", server_msg_count.load());
run = false;
servers.WaitAll();
}
diff --git a/utest/util.h b/utest/util.h
index ca58cd7..28b636e 100644
--- a/utest/util.h
+++ b/utest/util.h
@@ -20,9 +20,7 @@
#define UTIL_W8A0OA5U
#include "bh_util.h"
-#include "msg.h"
-#include "shm.h"
-#include "shm_queue.h"
+#include "topic_node.h"
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/noncopyable.hpp>
#include <boost/test/unit_test.hpp>
@@ -107,4 +105,23 @@
~ShmRemover() { SharedMemory::Remove(name_); }
};
+class DemoNode : public TopicNode
+{
+ std::string id_;
+
+public:
+ DemoNode(const std::string &id, SharedMemory &shm) :
+ TopicNode(shm), id_(id) { Init(); }
+ void Init()
+ {
+ MsgRegister reg;
+ reg.mutable_proc()->set_proc_id(id_);
+ MsgCommonReply reply_body;
+
+ if (!Register(reg, reply_body, 1000)) {
+ printf("node %s register failed\n", id_.c_str());
+ }
+ }
+};
+
#endif // end of include guard: UTIL_W8A0OA5U
--
Gitblit v1.8.0