From 5657dca25451cfb63a90a3908db0c464fe3f343d Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期一, 29 三月 2021 14:16:49 +0800
Subject: [PATCH] add protobuf; refactor.

---
 src/msg.h                    |    7 +
 src/pubsub.h                 |   25 +++++
 src/shm_queue.h              |    1 
 proto/source/bhome_msg.proto |   34 ++++++++
 src/defs.h                   |   29 +++++++
 src/pubsub.cpp               |   58 ++++++++++++++
 utest/utest.cpp              |    5 +
 src/shm_queue.cpp            |    2 
 CMakeLists.txt               |   16 +--
 proto/cpp/CMakeLists.txt     |   12 +++
 src/msg.cpp                  |    4 
 cmake_options.cmake          |   22 +++++
 12 files changed, 198 insertions(+), 17 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index bff076f..23f3ce7 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1,24 +1,22 @@
 cmake_minimum_required(VERSION 3.5)
 
 # set the project name and version
-project(B_BUS VERSION 2.2)
+project(BHomeBus VERSION 1.0)
 
-# specify the C++ standard
-set(CMAKE_CXX_STANDARD 14)
-set(CMAKE_CXX_STANDARD_REQUIRED True)
+include(cmake_options.cmake)
 
-if(CMAKE_GENERATOR MATCHES "Ninja")
-	set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fdiagnostics-color=always")
-endif()
 # control where the static and shared libraries are built so that on windows
 # we don't need to tinker with the path to run the executable
 set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}/lib")
 set(CMAKE_LIBRARY_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}/lib")
 set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}/bin")
 
-option(BUILD_SHARED_LIBS "Build using shared libraries" ON)
+#option(BUILD_SHARED_LIBS "Build using shared libraries" ON)
+#option(BUILD_STATIC_LIBS "Build using static libraries" ON)
 
-include_directories(${PROJECT_SOURCE_DIR}/include)
+add_subdirectory(${PROJECT_SOURCE_DIR}/proto/cpp proto)
+include_directories(${CMAKE_CURRENT_BINARY_DIR}/proto)
 
 add_subdirectory(${PROJECT_SOURCE_DIR}/src)
 add_subdirectory(${PROJECT_SOURCE_DIR}/utest)
+
diff --git a/cmake_options.cmake b/cmake_options.cmake
new file mode 100644
index 0000000..1bb4b16
--- /dev/null
+++ b/cmake_options.cmake
@@ -0,0 +1,22 @@
+
+macro (Append var value)
+	set(${var} "${${var}} ${value}")
+endmacro()
+
+set(CMAKE_CXX_STANDARD 14)
+set(CMAKE_CXX_STANDARD_REQUIRED True)
+
+Append(CMAKE_CXX_FLAGS "-fPIC -Wall -Wno-unused-variable")
+Append(CMAKE_CXX_FLAGS_RELEASE "-ffunction-sections -fdata-sections")
+Append(CMAKE_CXX_FLAGS_DEBUG "-pg")
+
+set (MY_LINK_FLAGS "-fPIC -static-libstdc++ -static-libgcc")
+set (MY_LINK_FLAGS_RELEASE "-Wl,--gc-sections -s")
+
+Append(CMAKE_EXE_LINKER_FLAGS "${MY_LINK_FLAGS}")
+Append(CMAKE_EXE_LINKER_FLAGS_RELEASE "${MY_LINK_FLAGS_RELEASE}")
+
+if(CMAKE_GENERATOR MATCHES "Ninja")
+	set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fdiagnostics-color=always")
+endif()
+
diff --git a/proto/cpp/CMakeLists.txt b/proto/cpp/CMakeLists.txt
new file mode 100644
index 0000000..f8d07f7
--- /dev/null
+++ b/proto/cpp/CMakeLists.txt
@@ -0,0 +1,12 @@
+cmake_minimum_required(VERSION 3.0)
+
+set (Target "bhome_msg")
+project(${Target})
+
+find_package(Protobuf REQUIRED)
+file(GLOB proto_files ../source/*.proto)
+PROTOBUF_GENERATE_CPP(PROTO_SRCS PROTO_HDRS ${proto_files})
+
+add_library(${Target} STATIC ${PROTO_SRCS})
+target_link_libraries(${Target} libprotobuf-lite.a)
+
diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto
new file mode 100644
index 0000000..01498fa
--- /dev/null
+++ b/proto/source/bhome_msg.proto
@@ -0,0 +1,34 @@
+syntax = "proto3";
+
+package bhome.msg;
+
+message BHAddress {
+	bytes mq_id = 1; // mqid, uuid
+	bytes ip = 2;   //
+	int32 port = 3;
+}
+
+message BHMsg {
+	bytes msg_id = 1;
+	int64 timestamp = 2;
+	int32 type = 3;
+	repeated BHAddress route = 4; // for reply and proxy.
+	bytes body = 5;
+}
+
+enum MsgType {
+	kMsgTypeInvalid = 0;
+	kMsgTypeRequest = 1;
+	kMsgTypeReply = 2;
+	kMsgTypePublish = 3;
+	kMsgTypeSubscribe = 4;
+}
+
+message DataPub {
+	bytes topic = 1;
+	bytes data = 2; 
+}
+
+message DataSub {
+	repeated bytes topics = 1;
+}
diff --git a/src/defs.h b/src/defs.h
new file mode 100644
index 0000000..02978cd
--- /dev/null
+++ b/src/defs.h
@@ -0,0 +1,29 @@
+/*
+ * =====================================================================================
+ *
+ *       Filename:  defs.h
+ *
+ *    Description:  
+ *
+ *        Version:  1.0
+ *        Created:  2021骞�03鏈�26鏃� 19鏃�26鍒�17绉�
+ *       Revision:  none
+ *       Compiler:  gcc
+ *
+ *         Author:  Li Chao (), 
+ *   Organization:  
+ *
+ * =====================================================================================
+ */
+
+#ifndef DEFS_KP8LKGD0
+#define DEFS_KP8LKGD0
+
+#include <boost/uuid/uuid.hpp>
+#include <boost/uuid/uuid_generators.hpp>
+
+typedef boost::uuids::uuid MQId;
+
+const MQId kBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
+
+#endif // end of include guard: DEFS_KP8LKGD0
diff --git a/src/msg.cpp b/src/msg.cpp
index 9883246..bb193ec 100644
--- a/src/msg.cpp
+++ b/src/msg.cpp
@@ -17,7 +17,7 @@
  */
 #include "msg.h"
 
-namespace bhome_shm {
+namespace bhome_msg {
 	
 
 bool MsgMetaV1::Parse(const void *p)
@@ -75,4 +75,4 @@
     return 0;
 }
 
-} // namespace bhome_shm
+} // namespace bhome_msg
diff --git a/src/msg.h b/src/msg.h
index 2c2efac..99fb2e2 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -22,8 +22,11 @@
 #include "shm.h"
 #include <boost/interprocess/offset_ptr.hpp>
 #include <boost/uuid/uuid_generators.hpp>
+#include "bhome_msg.pb.h"
 
-namespace bhome_shm {
+namespace bhome_msg {
+    using namespace bhome_shm;
+    using namespace bhome::msg; // for serialized data in Msg
 
 // msg is safe to be stored in shared memory, so POD data or offset_ptr is required.
 // message format: header(meta) + body(data).
@@ -83,7 +86,7 @@
 inline void swap(Msg &m1, Msg &m2) { m1.swap(m2); }
 
 
-} // namespace bhome_shm
+} // namespace bhome_msg
 
 
 
diff --git a/src/pubsub.cpp b/src/pubsub.cpp
index b592113..e38c445 100644
--- a/src/pubsub.cpp
+++ b/src/pubsub.cpp
@@ -16,9 +16,67 @@
  * =====================================================================================
  */
 #include "pubsub.h"
+#include <chrono>
 
 namespace bhome_shm {
 
+using namespace std::chrono_literals;
+const MQId kBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
+const int kMaxWorker = 16;
+
+BusManager::BusManager(SharedMemory &shm):
+busq_(kBusQueueId, shm, 1000),
+run_(false)
+{
+}
 	
+BusManager::~BusManager()
+{
+    Stop();
+}
+
+bool BusManager::Start(const int nworker)
+{
+    std::lock_guard<std::mutex> guard(mutex_);
+    StopNoLock();
+    // start
+    auto Worker = [&](){
+        while (this->run_) {
+            std::this_thread::sleep_for(100ms);
+            BusManager &self = *this;
+            Msg msg;
+            const int timeout_ms = 100;
+            if (!self.busq_.Recv(msg, timeout_ms)) {
+                continue;
+            }
+            // handle msg;
+            // type: subscribe(topic), publish(topic, data)
+        }
+    };
+
+    run_.store(true);
+    const int n = std::min(nworker, kMaxWorker);
+    for (int i = 0; i < n; ++i) {
+        workers_.emplace_back(Worker);
+    }
+}
+
+bool BusManager::Stop()
+{
+    std::lock_guard<std::mutex> guard(mutex_);
+    StopNoLock();
+}
+
+bool BusManager::StopNoLock()
+{
+    if (run_.exchange(false)) {
+        for (auto &w: workers_) {
+            if (w.joinable()) {
+                w.join();
+            }
+        }
+    }    
+}
+
 } // namespace bhome_shm
 
diff --git a/src/pubsub.h b/src/pubsub.h
index 0d7f4f0..0628216 100644
--- a/src/pubsub.h
+++ b/src/pubsub.h
@@ -18,12 +18,31 @@
 #ifndef PUBSUB_4KGRA997
 #define PUBSUB_4KGRA997
 
-#include "shm.h"
+#include "shm_queue.h"
+#include <thread>
+#include <atomic>
+#include <mutex>
+#include <vector>
 
 namespace bhome_shm {
 
-bool Subscribe(const std::string &topic);
-	
+// publish/subcribe manager.
+class BusManager
+{
+    ShmMsgQueue busq_;
+    std::atomic<bool> run_;
+    std::vector<std::thread> workers_;
+    std::mutex mutex_;
+
+    bool StopNoLock();
+public:
+    BusManager(SharedMemory &shm);
+    ~BusManager();
+    bool Start(const int nworker = 2);
+    bool Stop();
+};
+
+
 } // namespace bhome_shm
 
 #endif // end of include guard: PUBSUB_4KGRA997
diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp
index 401f346..8d90083 100644
--- a/src/shm_queue.cpp
+++ b/src/shm_queue.cpp
@@ -21,7 +21,7 @@
 #include "bh_util.h"
 
 namespace bhome_shm {
-	
+using namespace bhome_msg;	
 using namespace boost::interprocess;
 using namespace boost::uuids;
 
diff --git a/src/shm_queue.h b/src/shm_queue.h
index 1a4a57d..5b30380 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -100,6 +100,7 @@
     }
 };
 
+using namespace bhome_msg;
 
 class ShmMsgQueue : private ShmObject<SharedQueue<Msg> >
 {
diff --git a/utest/utest.cpp b/utest/utest.cpp
index 89d1ea3..23af0b8 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -17,11 +17,16 @@
 #include <sys/wait.h>
 
 using namespace std::chrono_literals;
+using namespace bhome_msg;
 using namespace bhome_shm;
 
 using namespace boost::posix_time;
 auto Now = []() { return second_clock::universal_time(); };
 
+auto F()
+{
+	return [](){};
+}
 struct s1000 { char a[1000]; };
 
 typedef std::function<void(void)> FuncVV;

--
Gitblit v1.8.0