From 5657dca25451cfb63a90a3908db0c464fe3f343d Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期一, 29 三月 2021 14:16:49 +0800 Subject: [PATCH] add protobuf; refactor. --- src/msg.h | 7 + src/pubsub.h | 25 +++++ src/shm_queue.h | 1 proto/source/bhome_msg.proto | 34 ++++++++ src/defs.h | 29 +++++++ src/pubsub.cpp | 58 ++++++++++++++ utest/utest.cpp | 5 + src/shm_queue.cpp | 2 CMakeLists.txt | 16 +-- proto/cpp/CMakeLists.txt | 12 +++ src/msg.cpp | 4 cmake_options.cmake | 22 +++++ 12 files changed, 198 insertions(+), 17 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index bff076f..23f3ce7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,24 +1,22 @@ cmake_minimum_required(VERSION 3.5) # set the project name and version -project(B_BUS VERSION 2.2) +project(BHomeBus VERSION 1.0) -# specify the C++ standard -set(CMAKE_CXX_STANDARD 14) -set(CMAKE_CXX_STANDARD_REQUIRED True) +include(cmake_options.cmake) -if(CMAKE_GENERATOR MATCHES "Ninja") - set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fdiagnostics-color=always") -endif() # control where the static and shared libraries are built so that on windows # we don't need to tinker with the path to run the executable set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}/lib") set(CMAKE_LIBRARY_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}/lib") set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}/bin") -option(BUILD_SHARED_LIBS "Build using shared libraries" ON) +#option(BUILD_SHARED_LIBS "Build using shared libraries" ON) +#option(BUILD_STATIC_LIBS "Build using static libraries" ON) -include_directories(${PROJECT_SOURCE_DIR}/include) +add_subdirectory(${PROJECT_SOURCE_DIR}/proto/cpp proto) +include_directories(${CMAKE_CURRENT_BINARY_DIR}/proto) add_subdirectory(${PROJECT_SOURCE_DIR}/src) add_subdirectory(${PROJECT_SOURCE_DIR}/utest) + diff --git a/cmake_options.cmake b/cmake_options.cmake new file mode 100644 index 0000000..1bb4b16 --- /dev/null +++ b/cmake_options.cmake @@ -0,0 +1,22 @@ + +macro (Append var value) + set(${var} "${${var}} ${value}") +endmacro() + +set(CMAKE_CXX_STANDARD 14) +set(CMAKE_CXX_STANDARD_REQUIRED True) + +Append(CMAKE_CXX_FLAGS "-fPIC -Wall -Wno-unused-variable") +Append(CMAKE_CXX_FLAGS_RELEASE "-ffunction-sections -fdata-sections") +Append(CMAKE_CXX_FLAGS_DEBUG "-pg") + +set (MY_LINK_FLAGS "-fPIC -static-libstdc++ -static-libgcc") +set (MY_LINK_FLAGS_RELEASE "-Wl,--gc-sections -s") + +Append(CMAKE_EXE_LINKER_FLAGS "${MY_LINK_FLAGS}") +Append(CMAKE_EXE_LINKER_FLAGS_RELEASE "${MY_LINK_FLAGS_RELEASE}") + +if(CMAKE_GENERATOR MATCHES "Ninja") + set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fdiagnostics-color=always") +endif() + diff --git a/proto/cpp/CMakeLists.txt b/proto/cpp/CMakeLists.txt new file mode 100644 index 0000000..f8d07f7 --- /dev/null +++ b/proto/cpp/CMakeLists.txt @@ -0,0 +1,12 @@ +cmake_minimum_required(VERSION 3.0) + +set (Target "bhome_msg") +project(${Target}) + +find_package(Protobuf REQUIRED) +file(GLOB proto_files ../source/*.proto) +PROTOBUF_GENERATE_CPP(PROTO_SRCS PROTO_HDRS ${proto_files}) + +add_library(${Target} STATIC ${PROTO_SRCS}) +target_link_libraries(${Target} libprotobuf-lite.a) + diff --git a/proto/source/bhome_msg.proto b/proto/source/bhome_msg.proto new file mode 100644 index 0000000..01498fa --- /dev/null +++ b/proto/source/bhome_msg.proto @@ -0,0 +1,34 @@ +syntax = "proto3"; + +package bhome.msg; + +message BHAddress { + bytes mq_id = 1; // mqid, uuid + bytes ip = 2; // + int32 port = 3; +} + +message BHMsg { + bytes msg_id = 1; + int64 timestamp = 2; + int32 type = 3; + repeated BHAddress route = 4; // for reply and proxy. + bytes body = 5; +} + +enum MsgType { + kMsgTypeInvalid = 0; + kMsgTypeRequest = 1; + kMsgTypeReply = 2; + kMsgTypePublish = 3; + kMsgTypeSubscribe = 4; +} + +message DataPub { + bytes topic = 1; + bytes data = 2; +} + +message DataSub { + repeated bytes topics = 1; +} diff --git a/src/defs.h b/src/defs.h new file mode 100644 index 0000000..02978cd --- /dev/null +++ b/src/defs.h @@ -0,0 +1,29 @@ +/* + * ===================================================================================== + * + * Filename: defs.h + * + * Description: + * + * Version: 1.0 + * Created: 2021骞�03鏈�26鏃� 19鏃�26鍒�17绉� + * Revision: none + * Compiler: gcc + * + * Author: Li Chao (), + * Organization: + * + * ===================================================================================== + */ + +#ifndef DEFS_KP8LKGD0 +#define DEFS_KP8LKGD0 + +#include <boost/uuid/uuid.hpp> +#include <boost/uuid/uuid_generators.hpp> + +typedef boost::uuids::uuid MQId; + +const MQId kBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff"); + +#endif // end of include guard: DEFS_KP8LKGD0 diff --git a/src/msg.cpp b/src/msg.cpp index 9883246..bb193ec 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -17,7 +17,7 @@ */ #include "msg.h" -namespace bhome_shm { +namespace bhome_msg { bool MsgMetaV1::Parse(const void *p) @@ -75,4 +75,4 @@ return 0; } -} // namespace bhome_shm +} // namespace bhome_msg diff --git a/src/msg.h b/src/msg.h index 2c2efac..99fb2e2 100644 --- a/src/msg.h +++ b/src/msg.h @@ -22,8 +22,11 @@ #include "shm.h" #include <boost/interprocess/offset_ptr.hpp> #include <boost/uuid/uuid_generators.hpp> +#include "bhome_msg.pb.h" -namespace bhome_shm { +namespace bhome_msg { + using namespace bhome_shm; + using namespace bhome::msg; // for serialized data in Msg // msg is safe to be stored in shared memory, so POD data or offset_ptr is required. // message format: header(meta) + body(data). @@ -83,7 +86,7 @@ inline void swap(Msg &m1, Msg &m2) { m1.swap(m2); } -} // namespace bhome_shm +} // namespace bhome_msg diff --git a/src/pubsub.cpp b/src/pubsub.cpp index b592113..e38c445 100644 --- a/src/pubsub.cpp +++ b/src/pubsub.cpp @@ -16,9 +16,67 @@ * ===================================================================================== */ #include "pubsub.h" +#include <chrono> namespace bhome_shm { +using namespace std::chrono_literals; +const MQId kBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff"); +const int kMaxWorker = 16; + +BusManager::BusManager(SharedMemory &shm): +busq_(kBusQueueId, shm, 1000), +run_(false) +{ +} +BusManager::~BusManager() +{ + Stop(); +} + +bool BusManager::Start(const int nworker) +{ + std::lock_guard<std::mutex> guard(mutex_); + StopNoLock(); + // start + auto Worker = [&](){ + while (this->run_) { + std::this_thread::sleep_for(100ms); + BusManager &self = *this; + Msg msg; + const int timeout_ms = 100; + if (!self.busq_.Recv(msg, timeout_ms)) { + continue; + } + // handle msg; + // type: subscribe(topic), publish(topic, data) + } + }; + + run_.store(true); + const int n = std::min(nworker, kMaxWorker); + for (int i = 0; i < n; ++i) { + workers_.emplace_back(Worker); + } +} + +bool BusManager::Stop() +{ + std::lock_guard<std::mutex> guard(mutex_); + StopNoLock(); +} + +bool BusManager::StopNoLock() +{ + if (run_.exchange(false)) { + for (auto &w: workers_) { + if (w.joinable()) { + w.join(); + } + } + } +} + } // namespace bhome_shm diff --git a/src/pubsub.h b/src/pubsub.h index 0d7f4f0..0628216 100644 --- a/src/pubsub.h +++ b/src/pubsub.h @@ -18,12 +18,31 @@ #ifndef PUBSUB_4KGRA997 #define PUBSUB_4KGRA997 -#include "shm.h" +#include "shm_queue.h" +#include <thread> +#include <atomic> +#include <mutex> +#include <vector> namespace bhome_shm { -bool Subscribe(const std::string &topic); - +// publish/subcribe manager. +class BusManager +{ + ShmMsgQueue busq_; + std::atomic<bool> run_; + std::vector<std::thread> workers_; + std::mutex mutex_; + + bool StopNoLock(); +public: + BusManager(SharedMemory &shm); + ~BusManager(); + bool Start(const int nworker = 2); + bool Stop(); +}; + + } // namespace bhome_shm #endif // end of include guard: PUBSUB_4KGRA997 diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp index 401f346..8d90083 100644 --- a/src/shm_queue.cpp +++ b/src/shm_queue.cpp @@ -21,7 +21,7 @@ #include "bh_util.h" namespace bhome_shm { - +using namespace bhome_msg; using namespace boost::interprocess; using namespace boost::uuids; diff --git a/src/shm_queue.h b/src/shm_queue.h index 1a4a57d..5b30380 100644 --- a/src/shm_queue.h +++ b/src/shm_queue.h @@ -100,6 +100,7 @@ } }; +using namespace bhome_msg; class ShmMsgQueue : private ShmObject<SharedQueue<Msg> > { diff --git a/utest/utest.cpp b/utest/utest.cpp index 89d1ea3..23af0b8 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -17,11 +17,16 @@ #include <sys/wait.h> using namespace std::chrono_literals; +using namespace bhome_msg; using namespace bhome_shm; using namespace boost::posix_time; auto Now = []() { return second_clock::universal_time(); }; +auto F() +{ + return [](){}; +} struct s1000 { char a[1000]; }; typedef std::function<void(void)> FuncVV; -- Gitblit v1.8.0