From d26327b3cde043a9470dcd7fea8e704ea517fdae Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期四, 01 四月 2021 19:26:57 +0800
Subject: [PATCH] add req/rep center;
---
src/pubsub_center.h | 10
src/reqrep_center.cpp | 121 ++++++++++++
src/reqrep.h | 35 +++
src/msg.h | 8
src/socket.cpp | 9
proto/source/bhome_msg.proto | 28 ++
src/defs.h | 4
utest/utest.cpp | 58 +++++
src/msg.cpp | 68 +++++-
utest/speed_test.cpp | 30 +-
src/pubsub.h | 2
src/reqrep_center.h | 61 ++++++
src/pubsub.cpp | 4
src/pubsub_center.cpp | 4
src/reqrep.cpp | 112 ++++++++++-
15 files changed, 482 insertions(+), 72 deletions(-)
diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto
index a88780b..149d8ee 100644
--- a/proto/source/bhome_msg.proto
+++ b/proto/source/bhome_msg.proto
@@ -25,8 +25,11 @@
kMsgTypePublish = 3;
kMsgTypeSubscribe = 4;
kMsgTypeUnsubscribe = 5;
- kMsgTypeQueryTopic = 6;
- kMsgTypeQueryTopicReply = 7;
+
+ kMsgTypeProcQueryTopic = 6;
+ kMsgTypeProcQueryTopicReply = 7;
+ kMsgTypeProcRegisterTopics = 8;
+ kMsgTypeProcHeartbeat = 9;
}
message DataPub {
@@ -47,10 +50,27 @@
bytes data = 1;
}
-message DataQueryTopic {
+message ProcInfo
+{
+ bytes name = 1;
+ bytes info = 2;
+}
+
+message DataProcRegister
+{
+ ProcInfo proc = 1;
+ repeated bytes topics = 2;
+}
+
+message DataProcHeartbeat
+{
+ ProcInfo proc = 1;
+}
+
+message DataProcQueryTopic {
bytes topic = 1;
}
-message DataQueryTopicReply {
+message DataProcQueryTopicReply {
BHAddress address = 1;
}
diff --git a/src/defs.h b/src/defs.h
index fcdcc70..db73634 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -24,8 +24,8 @@
typedef boost::uuids::uuid MQId;
-const MQId kBHBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
-const MQId kBHTopicRPCId = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff");
+const MQId kBHTopicBus = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
+const MQId kBHTopicReqRepCenter = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff");
const int kBHCenterPort = 24287;
const char kTopicSep = '.';
namespace bhome_shm
diff --git a/src/msg.cpp b/src/msg.cpp
index 41dd459..c1dfff9 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -24,36 +24,61 @@
const uint32_t kMsgTag = 0xf1e2d3c4;
const uint32_t kMsgPrefixLen = 4;
-BHMsg InitMsg(MsgType type)
+inline void AddRoute(BHMsg &msg, const MQId &id) { msg.add_route()->set_mq_id(&id, sizeof(id)); }
+
+std::string RandId()
+{
+ boost::uuids::uuid id = boost::uuids::random_generator()();
+ return std::string((char *) &id, sizeof(id));
+}
+BHMsg InitMsg(MsgType type, const std::string &msgid = RandId())
{
BHMsg msg;
+ msg.set_msg_id(msgid);
msg.set_type(type);
time_t tm = 0;
msg.set_timestamp(time(&tm));
return msg;
}
-BHMsg MakeRequest(const MQId &src_id, const void *data, const size_t size)
-{
- assert(data && size);
- BHMsg msg(InitMsg(kMsgTypeRequest));
- msg.set_body(data, size);
- msg.add_route()->set_mq_id(&src_id, sizeof(src_id));
- return msg;
-}
BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size)
{
+ BHMsg msg(InitMsg(kMsgTypeRequest));
+ AddRoute(msg, src_id);
DataRequest req;
req.set_topic(topic);
req.set_data(data, size);
- const std::string &body(req.SerializeAsString());
- return MakeRequest(src_id, body.data(), body.size());
+ msg.set_body(req.SerializeAsString());
+ return msg;
}
-BHMsg MakeReply(const void *data, const size_t size)
+BHMsg MakeRegister(const MQId &src_id, ProcInfo info, const std::vector<std::string> &topics)
+{
+ BHMsg msg(InitMsg(kMsgTypeProcRegisterTopics));
+ AddRoute(msg, src_id);
+ DataProcRegister reg;
+ reg.mutable_proc()->Swap(&info);
+ for (auto &t : topics) {
+ reg.add_topics(t);
+ }
+ msg.set_body(reg.SerializeAsString());
+ return msg;
+}
+
+BHMsg MakeHeartbeat(const MQId &src_id, ProcInfo info)
+{
+ BHMsg msg(InitMsg(kMsgTypeProcHeartbeat));
+ AddRoute(msg, src_id);
+ DataProcRegister reg;
+ reg.mutable_proc()->Swap(&info);
+ msg.set_body(reg.SerializeAsString());
+ return msg;
+}
+
+BHMsg MakeReply(const std::string &src_msgid, const void *data, const size_t size)
{
assert(data && size);
- BHMsg msg(InitMsg(kMsgTypeReply));
+ BHMsg msg(InitMsg(kMsgTypeReply, src_msgid));
DataReply reply;
reply.set_data(data, size);
msg.set_body(reply.SerializeAsString());
@@ -64,7 +89,7 @@
{
assert(sub_unsub == kMsgTypeSubscribe || sub_unsub == kMsgTypeUnsubscribe);
BHMsg msg(InitMsg(sub_unsub));
- msg.add_route()->set_mq_id(&client, sizeof(client));
+ AddRoute(msg, client);
DataSub subs;
for (auto &t : topics) {
subs.add_topics(t);
@@ -87,14 +112,23 @@
return msg;
}
-BHMsg MakeQueryTopic(const std::string &topic)
+BHMsg MakeQueryTopic(const MQId &client, const std::string &topic)
{
- BHMsg msg(InitMsg(kMsgTypeQueryTopic));
- DataQueryTopic query;
+ BHMsg msg(InitMsg(kMsgTypeProcQueryTopic));
+ AddRoute(msg, client);
+ DataProcQueryTopic query;
query.set_topic(topic);
msg.set_body(query.SerializeAsString());
return msg;
}
+BHMsg MakeQueryTopicReply(const std::string &mqid, const std::string &msgid)
+{
+ BHMsg msg(InitMsg(kMsgTypeProcQueryTopicReply, msgid));
+ DataProcQueryTopicReply reply;
+ reply.mutable_address()->set_mq_id(mqid);
+ msg.set_body(reply.SerializeAsString());
+ return msg;
+}
void *Pack(SharedMemory &shm, const BHMsg &msg)
{
diff --git a/src/msg.h b/src/msg.h
index ea1e636..8c345fd 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -59,10 +59,12 @@
int num_ = 1;
};
-BHMsg MakeQueryTopic(const std::string &topic);
-BHMsg MakeRequest(const MQId &src_id, const void *data, const size_t size);
+BHMsg MakeQueryTopic(const MQId &client, const std::string &topic);
+BHMsg MakeQueryTopicReply(const std::string &mqid, const std::string &msgid);
BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size);
-BHMsg MakeReply(const void *data, const size_t size);
+BHMsg MakeReply(const std::string &src_msgid, const void *data, const size_t size);
+BHMsg MakeRegister(const MQId &src_id, ProcInfo info, const std::vector<std::string> &topics);
+BHMsg MakeHeartbeat(const MQId &src_id, ProcInfo info);
BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics);
BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics);
BHMsg MakePub(const std::string &topic, const void *data, const size_t size);
diff --git a/src/pubsub.cpp b/src/pubsub.cpp
index cfc77ab..4449c31 100644
--- a/src/pubsub.cpp
+++ b/src/pubsub.cpp
@@ -30,7 +30,7 @@
return false;
}
DEFER1(imsg.Release(shm()));
- return ShmMsgQueue::Send(shm(), kBHBusQueueId, imsg, timeout_ms);
+ return ShmMsgQueue::Send(shm(), kBHTopicBus, imsg, timeout_ms);
} catch (...) {
return false;
}
@@ -39,7 +39,7 @@
bool SocketSubscribe::Subscribe(const std::vector<std::string> &topics, const int timeout_ms)
{
try {
- return mq().Send(kBHBusQueueId, MakeSub(mq().Id(), topics), timeout_ms);
+ return mq().Send(kBHTopicBus, MakeSub(mq().Id(), topics), timeout_ms);
} catch (...) {
return false;
}
diff --git a/src/pubsub.h b/src/pubsub.h
index cad9f61..ac5a9d2 100644
--- a/src/pubsub.h
+++ b/src/pubsub.h
@@ -50,9 +50,11 @@
Socket(shm, 64) {}
SocketSubscribe() :
SocketSubscribe(BHomeShm()) {}
+ ~SocketSubscribe() { Stop(); }
typedef std::function<void(const std::string &topic, const std::string &data)> TopicDataCB;
bool StartRecv(const TopicDataCB &tdcb, int nworker = 2);
+ bool Stop() { return Socket::Stop(); }
bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms);
bool RecvSub(std::string &topic, std::string &data, const int timeout_ms);
};
diff --git a/src/pubsub_center.cpp b/src/pubsub_center.cpp
index 33c16be..afd07bf 100644
--- a/src/pubsub_center.cpp
+++ b/src/pubsub_center.cpp
@@ -17,9 +17,7 @@
*/
#include "pubsub_center.h"
#include "bh_util.h"
-
-PubSubCenter::PubSubCenter(SharedMemory &shm) :
- socket_(shm) {}
+using namespace bhome_shm;
bool PubSubCenter::Start(const int nworker)
{
diff --git a/src/pubsub_center.h b/src/pubsub_center.h
index 866216e..b752216 100644
--- a/src/pubsub_center.h
+++ b/src/pubsub_center.h
@@ -23,7 +23,6 @@
#include <mutex>
#include <set>
#include <unordered_map>
-using namespace bhome_shm;
// publish/subcribe manager.
class PubSubCenter
@@ -31,18 +30,19 @@
class SocketBus : public ShmSocket
{
public:
- SocketBus(SharedMemory &shm) :
- ShmSocket(shm, &kBHBusQueueId, 1000) {}
+ SocketBus(ShmSocket::Shm &shm) :
+ ShmSocket(shm, &kBHTopicBus, 1000) {}
using ShmSocket::shm;
};
SocketBus socket_;
+ ShmSocket::Shm &shm() { return socket_.shm(); }
std::mutex mutex_;
typedef std::set<MQId> Clients;
std::unordered_map<std::string, Clients> records_;
- ShmSocket::Shm &shm() { return socket_.shm(); }
public:
- PubSubCenter(SharedMemory &shm);
+ PubSubCenter(ShmSocket::Shm &shm) :
+ socket_(shm) {}
PubSubCenter() :
PubSubCenter(BHomeShm()) {}
~PubSubCenter() { Stop(); }
diff --git a/src/reqrep.cpp b/src/reqrep.cpp
index e1636fd..bed6496 100644
--- a/src/reqrep.cpp
+++ b/src/reqrep.cpp
@@ -16,6 +16,7 @@
* =====================================================================================
*/
#include "reqrep.h"
+#include "bh_util.h"
#include "msg.h"
#include <chrono>
#include <condition_variable>
@@ -73,30 +74,33 @@
BHAddress addr;
if (QueryRPCTopic(topic, addr, timeout_ms)) {
return Call(addr.mq_id().data());
+ } else {
+ return false;
}
} catch (...) {
return false;
}
}
-bool SocketRequest::SyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out)
+bool SocketRequest::SyncRequest(const std::string &topic, const void *data, const size_t size, std::string &out, const int timeout_ms)
{
try {
BHAddress addr;
if (QueryRPCTopic(topic, addr, timeout_ms)) {
- const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
+ const BHMsg &req(MakeRequest(mq().Id(), topic, data, size));
BHMsg reply;
- if (SyncSendAndRecv(addr.mq_id().data(), &msg, &reply, timeout_ms) && reply.type() == kMsgTypeReply) {
+ if (SyncSendAndRecv(addr.mq_id().data(), &req, &reply, timeout_ms) && reply.type() == kMsgTypeReply) {
DataReply dr;
- if (dr.ParseFromString(msg.body())) {
+ if (dr.ParseFromString(reply.body())) {
dr.mutable_data()->swap(out);
return true;
}
}
+ } else {
}
} catch (...) {
- return false;
}
+ return false;
}
bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb)
@@ -132,11 +136,13 @@
if (!st->canceled) {
static_cast<BHMsg *>(result)->Swap(&msg);
st->cv.notify_one();
- } // else result is no longer valid.
+ } else {
+ }
};
std::unique_lock<std::mutex> lk(st->mutex);
- if (AsyncSend(remote, msg, timeout_ms, OnRecv) && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) {
+ bool sendok = AsyncSend(remote, msg, timeout_ms, OnRecv);
+ if (sendok && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) {
return true;
} else {
st->canceled = true;
@@ -149,16 +155,100 @@
bool SocketRequest::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
{
+ if (tmp_cache_.first == topic) {
+ addr = tmp_cache_.second;
+ return true;
+ }
+
BHMsg result;
- const BHMsg &msg = MakeQueryTopic(topic);
- if (SyncSendAndRecv(&kBHTopicRPCId, &msg, &result, timeout_ms)) {
- if (result.type() == kMsgTypeQueryTopicReply) {
- DataQueryTopicReply reply;
+ const BHMsg &msg = MakeQueryTopic(mq().Id(), topic);
+ if (SyncSendAndRecv(&kBHTopicReqRepCenter, &msg, &result, timeout_ms)) {
+ if (result.type() == kMsgTypeProcQueryTopicReply) {
+ DataProcQueryTopicReply reply;
if (reply.ParseFromString(result.body())) {
addr = reply.address();
+ tmp_cache_.first = topic;
+ tmp_cache_.second = addr;
return !addr.mq_id().empty();
}
+ }
+ } else {
+ }
+ return false;
+}
+
+// reply socket
+namespace
+{
+struct SrcInfo {
+ std::vector<BHAddress> route;
+ std::string msg_id;
+};
+
+} // namespace
+
+bool SocketReply::Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms)
+{
+ //TODO check reply?
+ return SyncSend(&kBHTopicReqRepCenter, MakeRegister(mq().Id(), proc_info, topics), timeout_ms);
+}
+bool SocketReply::Heartbeat(const ProcInfo &proc_info, const int timeout_ms)
+{
+ return SyncSend(&kBHTopicReqRepCenter, MakeHeartbeat(mq().Id(), proc_info), timeout_ms);
+}
+bool SocketReply::StartWorker(const OnRequest &rcb, int nworker)
+{
+ auto onRecv = [this, rcb](BHMsg &msg) {
+ if (msg.type() == kMsgTypeRequest && msg.route_size() > 0) {
+ DataRequest req;
+ if (req.ParseFromString(msg.body())) {
+ std::string out;
+ if (rcb(req.topic(), req.data(), out)) {
+ BHMsg msg_reply(MakeReply(msg.msg_id(), out.data(), out.size()));
+ for (int i = 0; i < msg.route_size() - 1; ++i) {
+ msg.add_route()->Swap(msg.mutable_route(i));
+ }
+ SyncSend(msg.route().rbegin()->mq_id().data(), msg_reply, 100);
+ }
+ }
+ } else {
+ // ignored, or dropped
+ }
+ };
+
+ return rcb && Start(onRecv, nworker);
+}
+
+bool SocketReply::RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms)
+{
+ BHMsg msg;
+ if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypeRequest) {
+ DataRequest request;
+ if (request.ParseFromString(msg.body())) {
+ request.mutable_topic()->swap(topic);
+ request.mutable_data()->swap(data);
+ SrcInfo *p = new SrcInfo;
+ p->route.assign(msg.route().begin(), msg.route().end());
+ p->msg_id = msg.msg_id();
+ src_info = p;
+ return true;
}
}
return false;
}
+
+bool SocketReply::SendReply(void *src_info, const std::string &data, const int timeout_ms)
+{
+ SrcInfo *p = static_cast<SrcInfo *>(src_info);
+ DEFER1(delete p);
+ if (!p || p->route.empty()) {
+ return false;
+ }
+
+ BHMsg msg(MakeReply(p->msg_id, data.data(), data.size()));
+ for (unsigned i = 0; i < p->route.size() - 1; ++i) {
+ msg.add_route()->Swap(&p->route[i]);
+ }
+
+ return SyncSend(p->route.back().mq_id().data(), msg, timeout_ms);
+}
\ No newline at end of file
diff --git a/src/reqrep.h b/src/reqrep.h
index 02cc86f..2971403 100644
--- a/src/reqrep.h
+++ b/src/reqrep.h
@@ -19,9 +19,12 @@
#define REQREP_ACEH09NK
#include "defs.h"
+#include "msg.h"
#include "socket.h"
#include <functional>
#include <unordered_map>
+
+using bhome::msg::ProcInfo;
class SocketRequest : private ShmSocket
{
@@ -32,19 +35,21 @@
Socket(shm, 64) { StartWorker(); }
SocketRequest() :
SocketRequest(BHomeShm()) {}
+ ~SocketRequest() { Stop(); }
typedef std::function<void(const std::string &data)> RequestResultCB;
bool StartWorker(const RequestResultCB &rrcb, int nworker = 2);
bool StartWorker(int nworker = 2) { return StartWorker(RequestResultCB(), nworker); }
+ bool Stop() { return Socket::Stop(); }
bool AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb);
bool AsyncRequest(const std::string &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb)
{
return AsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb);
}
- bool SyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out);
- bool SyncRequest(const std::string &topic, const std::string &data, const int timeout_ms, std::string &out)
+ bool SyncRequest(const std::string &topic, const void *data, const size_t size, std::string &out, const int timeout_ms);
+ bool SyncRequest(const std::string &topic, const std::string &data, std::string &out, const int timeout_ms)
{
- return SyncRequest(topic, data.data(), data.size(), timeout_ms, out);
+ return SyncRequest(topic, data.data(), data.size(), out, timeout_ms);
}
private:
@@ -52,6 +57,30 @@
bool SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms);
bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
std::unordered_map<std::string, RecvCB> async_cbs_;
+
+ std::pair<std::string, bhome::msg::BHAddress> tmp_cache_;
+};
+
+class SocketReply : private ShmSocket
+{
+ typedef ShmSocket Socket;
+
+public:
+ SocketReply(Socket::Shm &shm) :
+ Socket(shm, 64) {}
+ SocketReply() :
+ SocketReply(BHomeShm()) {}
+ ~SocketReply() { Stop(); }
+
+ typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest;
+ bool StartWorker(const OnRequest &rcb, int nworker = 2);
+ bool Stop() { return Socket::Stop(); }
+ bool RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms);
+ bool SendReply(void *src_info, const std::string &data, const int timeout_ms);
+ bool Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms);
+ bool Heartbeat(const ProcInfo &proc_info, const int timeout_ms);
+
+private:
};
#endif // end of include guard: REQREP_ACEH09NK
diff --git a/src/reqrep_center.cpp b/src/reqrep_center.cpp
new file mode 100644
index 0000000..5f1873e
--- /dev/null
+++ b/src/reqrep_center.cpp
@@ -0,0 +1,121 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: reqrep_center.cpp
+ *
+ * Description: topic request/reply center
+ *
+ * Version: 1.0
+ * Created: 2021骞�04鏈�01鏃� 14鏃�08鍒�50绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (),
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#include "reqrep_center.h"
+#include "bh_util.h"
+using namespace bhome_shm;
+
+struct A {
+ void F(int){};
+};
+
+namespace
+{
+inline uint64_t Now()
+{
+ time_t t;
+ return time(&t);
+}
+
+} // namespace
+bool ReqRepCenter::Start(const int nworker)
+{
+ auto onRecv = [&](BHMsg &msg) {
+#ifndef NDEBUG
+ static std::atomic<time_t> last(0);
+ time_t now = 0;
+ time(&now);
+ if (last.exchange(now) < now) {
+ printf("bus queue size: %ld\n", socket_.Pending());
+ }
+#endif
+ if (msg.route_size() == 0) {
+ return;
+ }
+ auto &src_mq = msg.route(0).mq_id();
+
+ auto OnRegister = [&]() {
+ DataProcRegister reg;
+ if (!reg.ParseFromString(msg.body())) {
+ return;
+ }
+ ProcInfo pi;
+ pi.server_mqid_ = src_mq;
+ pi.proc_id_ = reg.proc().name();
+ pi.ext_info_ = reg.proc().info();
+ pi.timestamp_ = Now();
+
+ std::lock_guard<std::mutex> lock(mutex_);
+ for (auto &t : reg.topics()) {
+ topic_mq_[t] = pi.server_mqid_;
+ }
+ procs_[pi.proc_id_] = pi;
+ };
+
+ auto OnHeartbeat = [&]() {
+ DataProcHeartbeat hb;
+ if (!hb.ParseFromString(msg.body())) {
+ return;
+ }
+
+ std::lock_guard<std::mutex> lock(mutex_);
+ auto pos = procs_.find(hb.proc().name());
+ if (pos != procs_.end() && pos->second.server_mqid_ == src_mq) { // both name and mq should be the same.
+ pos->second.timestamp_ = Now();
+ pos->second.ext_info_ = hb.proc().info();
+ }
+ };
+
+ auto OnQueryTopic = [&]() {
+ DataProcQueryTopic query;
+ if (!query.ParseFromString(msg.body())) {
+ return;
+ }
+
+ std::string dest;
+ auto FindDest = [&]() {
+ std::lock_guard<std::mutex> lock(mutex_);
+ auto pos = topic_mq_.find(query.topic());
+ if (pos != topic_mq_.end()) {
+ dest = pos->second;
+ return true;
+ } else {
+ return false;
+ }
+ };
+ if (FindDest()) {
+ MQId remote;
+ memcpy(&remote, msg.route().rbegin()->mq_id().data(), sizeof(remote));
+ MsgI imsg;
+ if (!imsg.Make(shm(), MakeQueryTopicReply(dest, msg.msg_id()))) { return; }
+ if (!ShmMsgQueue::Send(shm(), remote, imsg, 100)) {
+ imsg.Release(shm());
+ }
+ }
+ };
+
+ switch (msg.type()) {
+ case kMsgTypeProcRegisterTopics: OnRegister(); break;
+ case kMsgTypeProcHeartbeat: OnHeartbeat(); break;
+ case kMsgTypeProcQueryTopic: OnQueryTopic(); break;
+ default: break;
+ }
+ };
+
+ const int kMaxWorker = 16;
+ return socket_.Start(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
+}
\ No newline at end of file
diff --git a/src/reqrep_center.h b/src/reqrep_center.h
new file mode 100644
index 0000000..2ca7295
--- /dev/null
+++ b/src/reqrep_center.h
@@ -0,0 +1,61 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: reqrep_center.h
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�04鏈�01鏃� 14鏃�09鍒�13绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (),
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#ifndef REQREP_CENTER_US3RBM60
+#define REQREP_CENTER_US3RBM60
+
+#include "defs.h"
+#include "socket.h"
+#include <chrono>
+#include <mutex>
+#include <set>
+
+class ReqRepCenter
+{
+ class Socket : public ShmSocket
+ {
+ public:
+ Socket(ShmSocket::Shm &shm) :
+ ShmSocket(shm, &kBHTopicReqRepCenter, 1000) {}
+ using ShmSocket::shm;
+ };
+ Socket socket_;
+ ShmSocket::Shm &shm() { return socket_.shm(); }
+ struct ProcInfo {
+ std::string proc_id_; // unique name
+ std::string server_mqid_;
+ std::string ext_info_; // maybe json.
+ uint64_t timestamp_ = 0;
+ };
+
+ typedef std::string Dests;
+
+ std::mutex mutex_;
+ std::unordered_map<std::string, Dests> topic_mq_;
+ std::unordered_map<std::string, ProcInfo> procs_;
+
+public:
+ ReqRepCenter(ShmSocket::Shm &shm) :
+ socket_(shm) {}
+ ReqRepCenter() :
+ ReqRepCenter(BHomeShm()) {}
+ ~ReqRepCenter() { Stop(); }
+ bool Start(const int nworker = 2);
+ bool Stop() { return socket_.Stop(); }
+};
+
+#endif // end of include guard: REQREP_CENTER_US3RBM60
diff --git a/src/socket.cpp b/src/socket.cpp
index 4c2fc6b..b9519be 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -46,7 +46,7 @@
ShmSocket::~ShmSocket()
{
- Stop();
+ Stop(); //TODO should stop in sub class, incase thread access sub class data.
}
bool ShmSocket::StartRaw(const RecvRawCB &onData, int nworker)
@@ -102,12 +102,7 @@
bool ShmSocket::SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms)
{
- std::lock_guard<std::mutex> lock(mutex_);
- if (!mq_ || RunningNoLock()) {
- return false;
- } else {
- return mq_->Send(*static_cast<const MQId *>(id), msg, timeout_ms);
- }
+ return mq_->Send(*static_cast<const MQId *>(id), msg, timeout_ms);
}
bool ShmSocket::SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms)
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index 35465bb..dc64cc0 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -24,9 +24,9 @@
{
const std::string shm_name("ShmSpeed");
ShmRemover auto_remove(shm_name);
- const int mem_size = 1024 * 1024 * 50;
- MQId id = boost::uuids::random_generator()();
- const int timeout = 100;
+ const int mem_size = 1024 * 1024 * 50;
+ MQId id = boost::uuids::random_generator()();
+ const int timeout = 100;
const uint32_t data_size = 4000;
auto Writer = [&](int writer_id, uint64_t n) {
@@ -35,7 +35,7 @@
std::string str(data_size, 'a');
MsgI msg;
DEFER1(msg.Release(shm););
- msg.MakeRC(shm, MakeRequest(mq.Id(), str.data(), str.size()));
+ msg.MakeRC(shm, MakeRequest(mq.Id(), "topic", str.data(), str.size()));
for (uint64_t i = 0; i < n; ++i) {
// mq.Send(id, str.data(), str.size(), timeout);
mq.Send(id, msg, timeout);
@@ -70,7 +70,7 @@
auto Test = [&](auto &www, auto &rrr, bool isfork) {
for (auto nreader : nreaders) {
for (auto nwriter : nwriters) {
- const uint64_t nmsg = 1000 * 1000 * 10 / nwriter;
+ const uint64_t nmsg = 1000 * 1000 * 10 / nwriter;
const uint64_t total_msg = nmsg * nwriter;
std::atomic<bool> run(true);
std::this_thread::sleep_for(10ms);
@@ -104,26 +104,26 @@
run.store(false);
}
-// Request Reply Test
-BOOST_AUTO_TEST_CASE(RRTest)
+// Send Recv Test
+BOOST_AUTO_TEST_CASE(SRTest)
{
- const std::string shm_name("ShmReqRep");
+ const std::string shm_name("ShmSendRecv");
ShmRemover auto_remove(shm_name);
- const int qlen = 64;
+ const int qlen = 64;
const size_t msg_length = 1000;
std::string msg_content(msg_length, 'a');
msg_content[20] = '\0';
SharedMemory shm(shm_name, 1024 * 1024 * 50);
- auto Avail = [&]() { return shm.get_free_memory(); };
+ auto Avail = [&]() { return shm.get_free_memory(); };
auto init_avail = Avail();
ShmMsgQueue srv(shm, qlen);
ShmMsgQueue cli(shm, qlen);
MsgI request_rc;
- request_rc.MakeRC(shm, MakeRequest(cli.Id(), msg_content.data(), msg_content.size()));
+ request_rc.MakeRC(shm, MakeRequest(cli.Id(), "topic", msg_content.data(), msg_content.size()));
MsgI reply_rc;
- reply_rc.MakeRC(shm, MakeReply(msg_content.data(), msg_content.size()));
+ reply_rc.MakeRC(shm, MakeReply("fakemsgid", msg_content.data(), msg_content.size()));
std::atomic<uint64_t> count(0);
@@ -133,7 +133,7 @@
auto Client = [&](int cli_id, int nmsg) {
for (int i = 0; i < nmsg; ++i) {
auto Req = [&]() {
- return cli.Send(srv.Id(), MakeRequest(cli.Id(), msg_content.data(), msg_content.size()), 100);
+ return cli.Send(srv.Id(), MakeRequest(cli.Id(), "topic", msg_content.data(), msg_content.size()), 100);
};
auto ReqRC = [&]() { return cli.Send(srv.Id(), request_rc, 1000); };
@@ -165,7 +165,7 @@
MQId src_id;
memcpy(&src_id, mqid.data(), sizeof(src_id));
auto Reply = [&]() {
- return srv.Send(src_id, MakeReply(msg_content.data(), msg_content.size()), 100);
+ return srv.Send(src_id, MakeReply(req.msg_id(), msg_content.data(), msg_content.size()), 100);
};
auto ReplyRC = [&]() { return srv.Send(src_id, reply_rc, 100); };
@@ -180,7 +180,7 @@
ThreadManager clients, servers;
for (int i = 0; i < qlen; ++i) { servers.Launch(Server); }
- int ncli = 100 * 1;
+ int ncli = 100 * 1;
uint64_t nmsg = 100 * 100 * 2;
printf("client threads: %d, msgs : %ld, total msg: %ld\n", ncli, nmsg, ncli * nmsg);
for (int i = 0; i < ncli; ++i) { clients.Launch(Client, i, nmsg); }
diff --git a/utest/utest.cpp b/utest/utest.cpp
index b95e646..54c6d6f 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -1,6 +1,8 @@
#include "defs.h"
#include "pubsub.h"
#include "pubsub_center.h"
+#include "reqrep.h"
+#include "reqrep_center.h"
#include "socket.h"
#include "util.h"
#include <atomic>
@@ -150,6 +152,62 @@
bus.Stop();
}
+BOOST_AUTO_TEST_CASE(ReqRepTest)
+{
+ const std::string shm_name("ShmReqRep");
+ ShmRemover auto_remove(shm_name);
+ SharedMemory shm(shm_name, 1024 * 1024 * 50);
+
+ auto Avail = [&]() { return shm.get_free_memory(); };
+ auto init_avail = Avail();
+ int *flag = shm.find_or_construct<int>("flag")(123);
+ printf("flag = %d\n", *flag);
+ ++*flag;
+
+ ReqRepCenter center(shm);
+ center.Start(2);
+ std::atomic<bool> run(true);
+
+ auto Client = [&](const std::string &topic, const int nreq) {
+ SocketRequest client(shm);
+ std::string reply;
+ boost::timer::auto_cpu_timer timer;
+ for (int i = 0; i < nreq; ++i) {
+ if (!client.SyncRequest(topic, "data " + std::to_string(i), reply, 1000)) {
+ printf("client request failed\n");
+ }
+ }
+ printf("request %s %d done ", topic.c_str(), nreq);
+ };
+ auto Server = [&](const std::string &name, const std::vector<std::string> &topics) {
+ SocketReply server(shm);
+ ProcInfo info;
+ info.set_name(name);
+ info.set_info(name);
+ if (!server.Register(info, topics, 100)) {
+ printf("register failed\n");
+ }
+ auto onData = [](const std::string &topic, const std::string &data, std::string &reply) {
+ reply = topic + ':' + data;
+ return true;
+ };
+ server.StartWorker(onData);
+ while (run) {
+ std::this_thread::yield();
+ }
+ };
+ ThreadManager clients, servers;
+ std::vector<std::string> topics = {"topic1", "topic2"};
+ servers.Launch(Server, "server", topics);
+ std::this_thread::sleep_for(100ms);
+ for (auto &t : topics) {
+ clients.Launch(Client, t, 1000 * 100);
+ }
+ clients.WaitAll();
+ run = false;
+ servers.WaitAll();
+}
+
inline int MyMin(int a, int b)
{
printf("MyMin\n");
--
Gitblit v1.8.0