From 5657dca25451cfb63a90a3908db0c464fe3f343d Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期一, 29 三月 2021 14:16:49 +0800
Subject: [PATCH] add protobuf; refactor.
---
src/msg.h | 7 +
src/pubsub.h | 25 +++++
src/shm_queue.h | 1
proto/source/bhome_msg.proto | 34 ++++++++
src/defs.h | 29 +++++++
src/pubsub.cpp | 58 ++++++++++++++
utest/utest.cpp | 5 +
src/shm_queue.cpp | 2
CMakeLists.txt | 16 +--
proto/cpp/CMakeLists.txt | 12 +++
src/msg.cpp | 4
cmake_options.cmake | 22 +++++
12 files changed, 198 insertions(+), 17 deletions(-)
diff --git a/CMakeLists.txt b/CMakeLists.txt
index bff076f..23f3ce7 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1,24 +1,22 @@
cmake_minimum_required(VERSION 3.5)
# set the project name and version
-project(B_BUS VERSION 2.2)
+project(BHomeBus VERSION 1.0)
-# specify the C++ standard
-set(CMAKE_CXX_STANDARD 14)
-set(CMAKE_CXX_STANDARD_REQUIRED True)
+include(cmake_options.cmake)
-if(CMAKE_GENERATOR MATCHES "Ninja")
- set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fdiagnostics-color=always")
-endif()
# control where the static and shared libraries are built so that on windows
# we don't need to tinker with the path to run the executable
set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}/lib")
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}/lib")
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}/bin")
-option(BUILD_SHARED_LIBS "Build using shared libraries" ON)
+#option(BUILD_SHARED_LIBS "Build using shared libraries" ON)
+#option(BUILD_STATIC_LIBS "Build using static libraries" ON)
-include_directories(${PROJECT_SOURCE_DIR}/include)
+add_subdirectory(${PROJECT_SOURCE_DIR}/proto/cpp proto)
+include_directories(${CMAKE_CURRENT_BINARY_DIR}/proto)
add_subdirectory(${PROJECT_SOURCE_DIR}/src)
add_subdirectory(${PROJECT_SOURCE_DIR}/utest)
+
diff --git a/cmake_options.cmake b/cmake_options.cmake
new file mode 100644
index 0000000..1bb4b16
--- /dev/null
+++ b/cmake_options.cmake
@@ -0,0 +1,22 @@
+
+macro (Append var value)
+ set(${var} "${${var}} ${value}")
+endmacro()
+
+set(CMAKE_CXX_STANDARD 14)
+set(CMAKE_CXX_STANDARD_REQUIRED True)
+
+Append(CMAKE_CXX_FLAGS "-fPIC -Wall -Wno-unused-variable")
+Append(CMAKE_CXX_FLAGS_RELEASE "-ffunction-sections -fdata-sections")
+Append(CMAKE_CXX_FLAGS_DEBUG "-pg")
+
+set (MY_LINK_FLAGS "-fPIC -static-libstdc++ -static-libgcc")
+set (MY_LINK_FLAGS_RELEASE "-Wl,--gc-sections -s")
+
+Append(CMAKE_EXE_LINKER_FLAGS "${MY_LINK_FLAGS}")
+Append(CMAKE_EXE_LINKER_FLAGS_RELEASE "${MY_LINK_FLAGS_RELEASE}")
+
+if(CMAKE_GENERATOR MATCHES "Ninja")
+ set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fdiagnostics-color=always")
+endif()
+
diff --git a/proto/cpp/CMakeLists.txt b/proto/cpp/CMakeLists.txt
new file mode 100644
index 0000000..f8d07f7
--- /dev/null
+++ b/proto/cpp/CMakeLists.txt
@@ -0,0 +1,12 @@
+cmake_minimum_required(VERSION 3.0)
+
+set (Target "bhome_msg")
+project(${Target})
+
+find_package(Protobuf REQUIRED)
+file(GLOB proto_files ../source/*.proto)
+PROTOBUF_GENERATE_CPP(PROTO_SRCS PROTO_HDRS ${proto_files})
+
+add_library(${Target} STATIC ${PROTO_SRCS})
+target_link_libraries(${Target} libprotobuf-lite.a)
+
diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto
new file mode 100644
index 0000000..01498fa
--- /dev/null
+++ b/proto/source/bhome_msg.proto
@@ -0,0 +1,34 @@
+syntax = "proto3";
+
+package bhome.msg;
+
+message BHAddress {
+ bytes mq_id = 1; // mqid, uuid
+ bytes ip = 2; //
+ int32 port = 3;
+}
+
+message BHMsg {
+ bytes msg_id = 1;
+ int64 timestamp = 2;
+ int32 type = 3;
+ repeated BHAddress route = 4; // for reply and proxy.
+ bytes body = 5;
+}
+
+enum MsgType {
+ kMsgTypeInvalid = 0;
+ kMsgTypeRequest = 1;
+ kMsgTypeReply = 2;
+ kMsgTypePublish = 3;
+ kMsgTypeSubscribe = 4;
+}
+
+message DataPub {
+ bytes topic = 1;
+ bytes data = 2;
+}
+
+message DataSub {
+ repeated bytes topics = 1;
+}
diff --git a/src/defs.h b/src/defs.h
new file mode 100644
index 0000000..02978cd
--- /dev/null
+++ b/src/defs.h
@@ -0,0 +1,29 @@
+/*
+ * =====================================================================================
+ *
+ * Filename: defs.h
+ *
+ * Description:
+ *
+ * Version: 1.0
+ * Created: 2021骞�03鏈�26鏃� 19鏃�26鍒�17绉�
+ * Revision: none
+ * Compiler: gcc
+ *
+ * Author: Li Chao (),
+ * Organization:
+ *
+ * =====================================================================================
+ */
+
+#ifndef DEFS_KP8LKGD0
+#define DEFS_KP8LKGD0
+
+#include <boost/uuid/uuid.hpp>
+#include <boost/uuid/uuid_generators.hpp>
+
+typedef boost::uuids::uuid MQId;
+
+const MQId kBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
+
+#endif // end of include guard: DEFS_KP8LKGD0
diff --git a/src/msg.cpp b/src/msg.cpp
index 9883246..bb193ec 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -17,7 +17,7 @@
*/
#include "msg.h"
-namespace bhome_shm {
+namespace bhome_msg {
bool MsgMetaV1::Parse(const void *p)
@@ -75,4 +75,4 @@
return 0;
}
-} // namespace bhome_shm
+} // namespace bhome_msg
diff --git a/src/msg.h b/src/msg.h
index 2c2efac..99fb2e2 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -22,8 +22,11 @@
#include "shm.h"
#include <boost/interprocess/offset_ptr.hpp>
#include <boost/uuid/uuid_generators.hpp>
+#include "bhome_msg.pb.h"
-namespace bhome_shm {
+namespace bhome_msg {
+ using namespace bhome_shm;
+ using namespace bhome::msg; // for serialized data in Msg
// msg is safe to be stored in shared memory, so POD data or offset_ptr is required.
// message format: header(meta) + body(data).
@@ -83,7 +86,7 @@
inline void swap(Msg &m1, Msg &m2) { m1.swap(m2); }
-} // namespace bhome_shm
+} // namespace bhome_msg
diff --git a/src/pubsub.cpp b/src/pubsub.cpp
index b592113..e38c445 100644
--- a/src/pubsub.cpp
+++ b/src/pubsub.cpp
@@ -16,9 +16,67 @@
* =====================================================================================
*/
#include "pubsub.h"
+#include <chrono>
namespace bhome_shm {
+using namespace std::chrono_literals;
+const MQId kBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
+const int kMaxWorker = 16;
+
+BusManager::BusManager(SharedMemory &shm):
+busq_(kBusQueueId, shm, 1000),
+run_(false)
+{
+}
+BusManager::~BusManager()
+{
+ Stop();
+}
+
+bool BusManager::Start(const int nworker)
+{
+ std::lock_guard<std::mutex> guard(mutex_);
+ StopNoLock();
+ // start
+ auto Worker = [&](){
+ while (this->run_) {
+ std::this_thread::sleep_for(100ms);
+ BusManager &self = *this;
+ Msg msg;
+ const int timeout_ms = 100;
+ if (!self.busq_.Recv(msg, timeout_ms)) {
+ continue;
+ }
+ // handle msg;
+ // type: subscribe(topic), publish(topic, data)
+ }
+ };
+
+ run_.store(true);
+ const int n = std::min(nworker, kMaxWorker);
+ for (int i = 0; i < n; ++i) {
+ workers_.emplace_back(Worker);
+ }
+}
+
+bool BusManager::Stop()
+{
+ std::lock_guard<std::mutex> guard(mutex_);
+ StopNoLock();
+}
+
+bool BusManager::StopNoLock()
+{
+ if (run_.exchange(false)) {
+ for (auto &w: workers_) {
+ if (w.joinable()) {
+ w.join();
+ }
+ }
+ }
+}
+
} // namespace bhome_shm
diff --git a/src/pubsub.h b/src/pubsub.h
index 0d7f4f0..0628216 100644
--- a/src/pubsub.h
+++ b/src/pubsub.h
@@ -18,12 +18,31 @@
#ifndef PUBSUB_4KGRA997
#define PUBSUB_4KGRA997
-#include "shm.h"
+#include "shm_queue.h"
+#include <thread>
+#include <atomic>
+#include <mutex>
+#include <vector>
namespace bhome_shm {
-bool Subscribe(const std::string &topic);
-
+// publish/subcribe manager.
+class BusManager
+{
+ ShmMsgQueue busq_;
+ std::atomic<bool> run_;
+ std::vector<std::thread> workers_;
+ std::mutex mutex_;
+
+ bool StopNoLock();
+public:
+ BusManager(SharedMemory &shm);
+ ~BusManager();
+ bool Start(const int nworker = 2);
+ bool Stop();
+};
+
+
} // namespace bhome_shm
#endif // end of include guard: PUBSUB_4KGRA997
diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp
index 401f346..8d90083 100644
--- a/src/shm_queue.cpp
+++ b/src/shm_queue.cpp
@@ -21,7 +21,7 @@
#include "bh_util.h"
namespace bhome_shm {
-
+using namespace bhome_msg;
using namespace boost::interprocess;
using namespace boost::uuids;
diff --git a/src/shm_queue.h b/src/shm_queue.h
index 1a4a57d..5b30380 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -100,6 +100,7 @@
}
};
+using namespace bhome_msg;
class ShmMsgQueue : private ShmObject<SharedQueue<Msg> >
{
diff --git a/utest/utest.cpp b/utest/utest.cpp
index 89d1ea3..23af0b8 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -17,11 +17,16 @@
#include <sys/wait.h>
using namespace std::chrono_literals;
+using namespace bhome_msg;
using namespace bhome_shm;
using namespace boost::posix_time;
auto Now = []() { return second_clock::universal_time(); };
+auto F()
+{
+ return [](){};
+}
struct s1000 { char a[1000]; };
typedef std::function<void(void)> FuncVV;
--
Gitblit v1.8.0