CMakeLists.txt | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
cmake_options.cmake | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
proto/cpp/CMakeLists.txt | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
proto/source/bhome_msg.proto | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/defs.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/msg.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/msg.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/pubsub.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/pubsub.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/shm_queue.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
src/shm_queue.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
utest/utest.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
CMakeLists.txt
@@ -1,24 +1,22 @@ cmake_minimum_required(VERSION 3.5) # set the project name and version project(B_BUS VERSION 2.2) project(BHomeBus VERSION 1.0) # specify the C++ standard set(CMAKE_CXX_STANDARD 14) set(CMAKE_CXX_STANDARD_REQUIRED True) include(cmake_options.cmake) if(CMAKE_GENERATOR MATCHES "Ninja") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fdiagnostics-color=always") endif() # control where the static and shared libraries are built so that on windows # we don't need to tinker with the path to run the executable set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}/lib") set(CMAKE_LIBRARY_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}/lib") set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}/bin") option(BUILD_SHARED_LIBS "Build using shared libraries" ON) #option(BUILD_SHARED_LIBS "Build using shared libraries" ON) #option(BUILD_STATIC_LIBS "Build using static libraries" ON) include_directories(${PROJECT_SOURCE_DIR}/include) add_subdirectory(${PROJECT_SOURCE_DIR}/proto/cpp proto) include_directories(${CMAKE_CURRENT_BINARY_DIR}/proto) add_subdirectory(${PROJECT_SOURCE_DIR}/src) add_subdirectory(${PROJECT_SOURCE_DIR}/utest) cmake_options.cmake
New file @@ -0,0 +1,22 @@ macro (Append var value) set(${var} "${${var}} ${value}") endmacro() set(CMAKE_CXX_STANDARD 14) set(CMAKE_CXX_STANDARD_REQUIRED True) Append(CMAKE_CXX_FLAGS "-fPIC -Wall -Wno-unused-variable") Append(CMAKE_CXX_FLAGS_RELEASE "-ffunction-sections -fdata-sections") Append(CMAKE_CXX_FLAGS_DEBUG "-pg") set (MY_LINK_FLAGS "-fPIC -static-libstdc++ -static-libgcc") set (MY_LINK_FLAGS_RELEASE "-Wl,--gc-sections -s") Append(CMAKE_EXE_LINKER_FLAGS "${MY_LINK_FLAGS}") Append(CMAKE_EXE_LINKER_FLAGS_RELEASE "${MY_LINK_FLAGS_RELEASE}") if(CMAKE_GENERATOR MATCHES "Ninja") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fdiagnostics-color=always") endif() proto/cpp/CMakeLists.txt
New file @@ -0,0 +1,12 @@ cmake_minimum_required(VERSION 3.0) set (Target "bhome_msg") project(${Target}) find_package(Protobuf REQUIRED) file(GLOB proto_files ../source/*.proto) PROTOBUF_GENERATE_CPP(PROTO_SRCS PROTO_HDRS ${proto_files}) add_library(${Target} STATIC ${PROTO_SRCS}) target_link_libraries(${Target} libprotobuf-lite.a) proto/source/bhome_msg.proto
New file @@ -0,0 +1,34 @@ syntax = "proto3"; package bhome.msg; message BHAddress { bytes mq_id = 1; // mqid, uuid bytes ip = 2; // int32 port = 3; } message BHMsg { bytes msg_id = 1; int64 timestamp = 2; int32 type = 3; repeated BHAddress route = 4; // for reply and proxy. bytes body = 5; } enum MsgType { kMsgTypeInvalid = 0; kMsgTypeRequest = 1; kMsgTypeReply = 2; kMsgTypePublish = 3; kMsgTypeSubscribe = 4; } message DataPub { bytes topic = 1; bytes data = 2; } message DataSub { repeated bytes topics = 1; } src/defs.h
New file @@ -0,0 +1,29 @@ /* * ===================================================================================== * * Filename: defs.h * * Description: * * Version: 1.0 * Created: 2021年03月26日 19时26分17秒 * Revision: none * Compiler: gcc * * Author: Li Chao (), * Organization: * * ===================================================================================== */ #ifndef DEFS_KP8LKGD0 #define DEFS_KP8LKGD0 #include <boost/uuid/uuid.hpp> #include <boost/uuid/uuid_generators.hpp> typedef boost::uuids::uuid MQId; const MQId kBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff"); #endif // end of include guard: DEFS_KP8LKGD0 src/msg.cpp
@@ -17,7 +17,7 @@ */ #include "msg.h" namespace bhome_shm { namespace bhome_msg { bool MsgMetaV1::Parse(const void *p) @@ -75,4 +75,4 @@ return 0; } } // namespace bhome_shm } // namespace bhome_msg src/msg.h
@@ -22,8 +22,11 @@ #include "shm.h" #include <boost/interprocess/offset_ptr.hpp> #include <boost/uuid/uuid_generators.hpp> #include "bhome_msg.pb.h" namespace bhome_shm { namespace bhome_msg { using namespace bhome_shm; using namespace bhome::msg; // for serialized data in Msg // msg is safe to be stored in shared memory, so POD data or offset_ptr is required. // message format: header(meta) + body(data). @@ -83,7 +86,7 @@ inline void swap(Msg &m1, Msg &m2) { m1.swap(m2); } } // namespace bhome_shm } // namespace bhome_msg src/pubsub.cpp
@@ -16,9 +16,67 @@ * ===================================================================================== */ #include "pubsub.h" #include <chrono> namespace bhome_shm { using namespace std::chrono_literals; const MQId kBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff"); const int kMaxWorker = 16; BusManager::BusManager(SharedMemory &shm): busq_(kBusQueueId, shm, 1000), run_(false) { } BusManager::~BusManager() { Stop(); } bool BusManager::Start(const int nworker) { std::lock_guard<std::mutex> guard(mutex_); StopNoLock(); // start auto Worker = [&](){ while (this->run_) { std::this_thread::sleep_for(100ms); BusManager &self = *this; Msg msg; const int timeout_ms = 100; if (!self.busq_.Recv(msg, timeout_ms)) { continue; } // handle msg; // type: subscribe(topic), publish(topic, data) } }; run_.store(true); const int n = std::min(nworker, kMaxWorker); for (int i = 0; i < n; ++i) { workers_.emplace_back(Worker); } } bool BusManager::Stop() { std::lock_guard<std::mutex> guard(mutex_); StopNoLock(); } bool BusManager::StopNoLock() { if (run_.exchange(false)) { for (auto &w: workers_) { if (w.joinable()) { w.join(); } } } } } // namespace bhome_shm src/pubsub.h
@@ -18,12 +18,31 @@ #ifndef PUBSUB_4KGRA997 #define PUBSUB_4KGRA997 #include "shm.h" #include "shm_queue.h" #include <thread> #include <atomic> #include <mutex> #include <vector> namespace bhome_shm { bool Subscribe(const std::string &topic); // publish/subcribe manager. class BusManager { ShmMsgQueue busq_; std::atomic<bool> run_; std::vector<std::thread> workers_; std::mutex mutex_; bool StopNoLock(); public: BusManager(SharedMemory &shm); ~BusManager(); bool Start(const int nworker = 2); bool Stop(); }; } // namespace bhome_shm #endif // end of include guard: PUBSUB_4KGRA997 src/shm_queue.cpp
@@ -21,7 +21,7 @@ #include "bh_util.h" namespace bhome_shm { using namespace bhome_msg; using namespace boost::interprocess; using namespace boost::uuids; src/shm_queue.h
@@ -100,6 +100,7 @@ } }; using namespace bhome_msg; class ShmMsgQueue : private ShmObject<SharedQueue<Msg> > { utest/utest.cpp
@@ -17,11 +17,16 @@ #include <sys/wait.h> using namespace std::chrono_literals; using namespace bhome_msg; using namespace bhome_shm; using namespace boost::posix_time; auto Now = []() { return second_clock::universal_time(); }; auto F() { return [](){}; } struct s1000 { char a[1000]; }; typedef std::function<void(void)> FuncVV;