From 491d98b3ba32cafed5682552bd870ca0ef93275c Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期二, 30 三月 2021 18:29:09 +0800
Subject: [PATCH] add ShmSocket as shm interface, add sub/pub.
---
src/shm.h | 4
src/socket.h | 74 ++++++++++++
src/shm_queue.h | 5
src/socket.cpp | 134 ++++++++++++++++++++++
src/center.h | 26 ++++
src/defs.h | 6 +
utest/utest.cpp | 66 +++++-----
src/shm_queue.cpp | 12 +
src/center.cpp | 29 ++++
9 files changed, 320 insertions(+), 36 deletions(-)
diff --git a/src/center.cpp b/src/center.cpp
new file mode 100644
index 0000000..809b6d1
--- /dev/null
+++ b/src/center.cpp
@@ -0,0 +1,29 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: center.cpp
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�03鏈�30鏃� 16鏃�19鍒�37绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (),
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#include "center.h"
+#include "defs.h"
+#include "shm.h"
+
+using namespace bhome_shm;
+
+SharedMemory &BHomeShm()
+{
+ static SharedMemory shm("bhome_default_shm_v0", 1024*1024*64);
+ return shm;
+}
+
diff --git a/src/center.h b/src/center.h
new file mode 100644
index 0000000..fcb8005
--- /dev/null
+++ b/src/center.h
@@ -0,0 +1,26 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: center.h
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�03鏈�30鏃� 16鏃�22鍒�24绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (),
+ * Organization:
+ *
+ * =====================================================================================
+ */
+#ifndef CENTER_TM9OUQTG
+#define CENTER_TM9OUQTG
+
+class BHCenter
+{
+
+};
+
+#endif // end of include guard: CENTER_TM9OUQTG
diff --git a/src/defs.h b/src/defs.h
index 56c6c9c..acfe09e 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -27,6 +27,12 @@
const MQId kBHBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
const int kBHCenterPort = 24287;
const char kTopicSep = '.';
+namespace bhome_shm {
+class SharedMemory;
+}
+
+bhome_shm::SharedMemory &BHomeShm();
+
//TODO center can check shm for previous crash.
#endif // end of include guard: DEFS_KP8LKGD0
diff --git a/src/shm.h b/src/shm.h
index 5e2c8b9..0f68754 100644
--- a/src/shm.h
+++ b/src/shm.h
@@ -61,6 +61,7 @@
template <class T, class ...Params> T * New(Params const&...params) { return construct<T>(anonymous_instance, std::nothrow)(params...); }
template <class T> void Delete(T *p) { if (p) { destroy_ptr<T>(p); }; }
template <class T> void Delete(offset_ptr<T> p) { Delete(p.get()); }
+ template <class T> T *Find(const std::string &name) { return find<T>(name.c_str()).first; }
};
@@ -91,7 +92,8 @@
throw("Error: Not enough memory, can not allocate \"" + name_ + "\"");
}
}
- Data *find(const std::string &name) { return shm_.find<Data>(ObjName(name).c_str()).first; }
+ static Data *Find(SharedMemory &shm, const std::string &name) { return shm.Find<Data>(ObjName(name)); }
+ Data *Find(const std::string &name) { return Find(shm_, ObjName(name)); }
virtual ~ShmObject() {}
std::string name() const { return name_; }
Data* data() { return pdata_; }
diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp
index ffc7c21..421cebf 100644
--- a/src/shm_queue.cpp
+++ b/src/shm_queue.cpp
@@ -28,7 +28,7 @@
namespace {
std::string MsgQIdToName(const MQId& id) { return "shmq" + to_string(id); }
-MQId EmptyId() { return nil_uuid(); }
+// MQId EmptyId() { return nil_uuid(); }
MQId NewId() { return random_generator()(); }
const int AdjustMQLength(const int len) {
const int kMaxLength = 10000;
@@ -59,12 +59,18 @@
Remove();
}
-bool ShmMsgQueue::Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms)
+bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms)
{
- Queue *remote = find(MsgQIdToName(remote_id));
+ Queue *remote = Find(shm, MsgQIdToName(remote_id));
return remote && remote->Write(msg, timeout_ms, [](const MsgI&msg){msg.AddRef();});
}
+// bool ShmMsgQueue::Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms)
+// {
+// Queue *remote = Find(MsgQIdToName(remote_id));
+// return remote && remote->Write(msg, timeout_ms, [](const MsgI&msg){msg.AddRef();});
+// }
+
bool ShmMsgQueue::Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms)
{
MsgI msg;
diff --git a/src/shm_queue.h b/src/shm_queue.h
index a536553..60b1862 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -120,7 +120,10 @@
bool Recv(BHMsg &msg, const int timeout_ms);
bool Recv(MsgI &msg, const int timeout_ms) { return Read(msg, timeout_ms); }
bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms);
- bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms);
+ static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms);
+ bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms) {
+ return Send(shm(), remote_id, msg, timeout_ms);
+ }
};
} // namespace bhome_shm
diff --git a/src/socket.cpp b/src/socket.cpp
new file mode 100644
index 0000000..1c28bfa
--- /dev/null
+++ b/src/socket.cpp
@@ -0,0 +1,134 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: socket.cpp
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�03鏈�30鏃� 15鏃�48鍒�58绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (),
+ * Organization:
+ *
+ * =====================================================================================
+ */
+
+#include "socket.h"
+#include <chrono>
+#include "msg.h"
+#include "defs.h"
+#include "bh_util.h"
+
+using namespace bhome_msg;
+using namespace bhome_shm;
+using namespace std::chrono_literals;
+
+namespace {
+
+int GetSocketDefaultLen(ShmSocket::Type type)
+{
+ switch (type) {
+ case ShmSocket::eSockRequest : return 12;
+ case ShmSocket::eSockReply : return 64;
+ case ShmSocket::eSockPublish : return 0;
+ case ShmSocket::eSockSubscribe : return 64;
+ default: return 0;
+ }
+}
+
+
+}
+
+ShmSocket::ShmSocket(Type type, bhome_shm::SharedMemory &shm)
+ : shm_(shm),
+ type_(type),
+ run_(false)
+{
+ int len = GetSocketDefaultLen(type);
+ if (len != 0) {
+ mq_.reset(new Queue(shm_, len));
+
+ auto RecvProc = [this](){
+ while (run_) {
+ try {
+ std::unique_lock<std::mutex> lk(mutex_);
+ if (cv_recv_cb_.wait_for(lk, 100ms, [this](){ return HasRecvCB(); })) {
+ BHMsg msg;
+ if (mq_->Recv(msg, 100)) {
+ this->onRecv_(msg);
+ }
+ }
+ } catch (...) {}
+ }
+ };
+ run_.store(true);
+ workers_.emplace_back(RecvProc);
+ }
+}
+ShmSocket::ShmSocket(Type type)
+ : ShmSocket(type, BHomeShm())
+{
+}
+
+ShmSocket::~ShmSocket()
+{
+ Stop();
+}
+
+bool ShmSocket::Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms)
+{
+ if (type_ != eSockPublish) {
+ return false;
+ }
+ assert(!mq_);
+ try {
+ MsgI imsg;
+ if (!imsg.MakeRC(shm_, MakePub(topic, data, size))) {
+ return false;
+ }
+ DEFER1(imsg.Release(shm_));
+ return Queue::Send(shm_, kBHBusQueueId, imsg, timeout_ms);
+
+ } catch (...) {
+ return false;
+ }
+}
+
+bool ShmSocket::Subscribe(const std::vector<std::string> &topics, const int timeout_ms)
+{
+ if (type_ != eSockSubscribe) {
+ return false;
+ }
+ assert(mq_);
+ try {
+ return mq_->Send(kBHBusQueueId, MakeSub(mq_->Id(), topics), timeout_ms);
+ } catch (...) {
+ return false;
+ }
+}
+
+bool ShmSocket::SetRecvCallback(const RecvCB &onRecv)
+{
+ std::lock_guard<std::mutex> lock(mutex_);
+ onRecv_ = onRecv;
+ cv_recv_cb_.notify_one();
+ return true;
+}
+
+bool ShmSocket::HasRecvCB()
+{
+ return static_cast<bool>(onRecv_);
+}
+
+void ShmSocket::Stop()
+{
+ run_ = false;
+ for (auto &t : workers_) {
+ if (t.joinable()) {
+ t.join();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/socket.h b/src/socket.h
new file mode 100644
index 0000000..e65ac83
--- /dev/null
+++ b/src/socket.h
@@ -0,0 +1,74 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: socket.h
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�03鏈�30鏃� 15鏃�49鍒�19绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (),
+ * Organization:
+ *
+ * =====================================================================================
+ */
+
+#ifndef SOCKET_GWTJHBPO
+#define SOCKET_GWTJHBPO
+
+#include "shm_queue.h"
+#include <vector>
+#include <thread>
+#include <memory>
+#include <functional>
+#include <mutex>
+#include <condition_variable>
+#include <atomic>
+
+class ShmSocket
+{
+ typedef bhome_shm::ShmMsgQueue Queue;
+public:
+ enum Type {
+ eSockRequest,
+ eSockReply,
+ eSockSubscribe,
+ eSockPublish,
+ };
+ typedef std::function<void (bhome_msg::BHMsg &msg)> RecvCB;
+
+ ShmSocket(Type type);
+ ShmSocket(Type type, bhome_shm::SharedMemory &shm);
+ ~ShmSocket();
+
+ // bool Request(const std::string &topic, const void *data, const size_t size, onReply);
+ bool RequestAndWait() { return false; } // call Request, and wait onReply notify cv
+
+ // bool HandleRequest(onData);
+ bool ReadRequest(); // exclude with HandleRequest
+ bool SendReply(); // exclude with HandleRequest
+
+ bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms);
+ bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms);
+ bool RecvSub(std::string &topic, std::string &data, const int timeout_ms);
+ bool SetRecvCallback(const RecvCB &onRecv);
+private:
+ bool HasRecvCB();
+ void Stop();
+
+ bhome_shm::SharedMemory &shm_;
+ Type type_;
+ std::vector<std::thread> workers_;
+ std::mutex mutex_;
+ std::condition_variable cv_recv_cb_;
+ std::atomic<bool> run_;
+ RecvCB onRecv_;
+
+ std::unique_ptr<Queue> mq_;
+};
+
+
+#endif // end of include guard: SOCKET_GWTJHBPO
diff --git a/utest/utest.cpp b/utest/utest.cpp
index f7571c8..bb5c14d 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -8,6 +8,7 @@
#include "pubsub.h"
#include "defs.h"
#include "util.h"
+#include "socket.h"
template <class A, class B> struct IsSameType { static const bool value = false; };
template <class A> struct IsSameType<A,A> { static const bool value = true; };
@@ -73,45 +74,48 @@
std::atomic<uint64_t> last_count(0);
const uint64_t nmsg = 100;
-
const int timeout = 1000;
auto Sub = [&](int id, const std::vector<std::string> &topics) {
- ShmMsgQueue client(shm, 8);
- client.Send(kBHBusQueueId, MakeSub(client.Id(), topics), timeout);
- for (int i = 0; i < nmsg * topics.size(); ++i) {
- BHMsg msg;
- if (client.Recv(msg, 1000)) {
- if (msg.type() != kMsgTypePublish) {
- BOOST_CHECK(false);
- }
- DataPub pub;
- if (!pub.ParseFromString(msg.body())) {
- BOOST_CHECK(false);
- }
- ++count;
- auto cur = Now();
- if (last_time.exchange(cur) < cur) {
- std::cout << "time: " << cur;
- printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
- count.load(), count - last_count.exchange(count), init_avail - Avail());
- }
- // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str());
- } else {
- printf("sub %2d recv timeout\n", id);
- }
+ ShmSocket client(ShmSocket::eSockSubscribe, shm);
+ bool r = client.Subscribe(topics, timeout);
+ std::mutex mutex;
+ std::condition_variable cv;
- }
+ int i = 0;
+ auto OnRecv = [&](BHMsg &msg) {
+ if (msg.type() != kMsgTypePublish) {
+ BOOST_CHECK(false);
+ }
+ DataPub pub;
+ if (!pub.ParseFromString(msg.body())) {
+ BOOST_CHECK(false);
+ }
+ ++count;
+
+ auto cur = Now();
+ if (last_time.exchange(cur) < cur) {
+ std::cout << "time: " << cur;
+ printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
+ count.load(), count - last_count.exchange(count), init_avail - Avail());
+ }
+ if (++i >= nmsg*topics.size()) {
+ cv.notify_one();
+ }
+ // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str());
+ };
+ client.SetRecvCallback(OnRecv);
+
+ std::unique_lock<std::mutex> lk(mutex);
+ cv.wait(lk);
+
};
+
auto Pub = [&](const std::string &topic) {
- ShmMsgQueue provider(shm, 0);
+ ShmSocket provider(ShmSocket::eSockPublish, shm);
for (int i = 0; i < nmsg; ++i) {
std::string data = topic + std::to_string(i) + std::string(1000, '-');
- MsgI msg;
- msg.MakeRC(shm, MakePub(topic, data.data(), data.size()));
- DEFER1(msg.Release(shm));
- bool r = provider.Send(kBHBusQueueId, msg, timeout);
-
+ bool r = provider.Publish(topic, data.data(), data.size(), timeout);
// bool r = provider.Send(kBHBusQueueId, MakePub(topic, data.data(), data.size()), timeout);
if (!r) {
printf("pub ret: %s\n", r ? "ok" : "fail");
--
Gitblit v1.8.0