lichao
2021-03-29 5657dca25451cfb63a90a3908db0c464fe3f343d
add protobuf; refactor.
4个文件已添加
8个文件已修改
215 ■■■■■ 已修改文件
CMakeLists.txt 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
cmake_options.cmake 22 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/cpp/CMakeLists.txt 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/source/bhome_msg.proto 34 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.h 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.h 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub.cpp 58 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub.h 25 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/utest.cpp 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
CMakeLists.txt
@@ -1,24 +1,22 @@
cmake_minimum_required(VERSION 3.5)
# set the project name and version
project(B_BUS VERSION 2.2)
project(BHomeBus VERSION 1.0)
# specify the C++ standard
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED True)
include(cmake_options.cmake)
if(CMAKE_GENERATOR MATCHES "Ninja")
    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fdiagnostics-color=always")
endif()
# control where the static and shared libraries are built so that on windows
# we don't need to tinker with the path to run the executable
set(CMAKE_ARCHIVE_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}/lib")
set(CMAKE_LIBRARY_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}/lib")
set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${PROJECT_BINARY_DIR}/bin")
option(BUILD_SHARED_LIBS "Build using shared libraries" ON)
#option(BUILD_SHARED_LIBS "Build using shared libraries" ON)
#option(BUILD_STATIC_LIBS "Build using static libraries" ON)
include_directories(${PROJECT_SOURCE_DIR}/include)
add_subdirectory(${PROJECT_SOURCE_DIR}/proto/cpp proto)
include_directories(${CMAKE_CURRENT_BINARY_DIR}/proto)
add_subdirectory(${PROJECT_SOURCE_DIR}/src)
add_subdirectory(${PROJECT_SOURCE_DIR}/utest)
cmake_options.cmake
New file
@@ -0,0 +1,22 @@
macro (Append var value)
    set(${var} "${${var}} ${value}")
endmacro()
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED True)
Append(CMAKE_CXX_FLAGS "-fPIC -Wall -Wno-unused-variable")
Append(CMAKE_CXX_FLAGS_RELEASE "-ffunction-sections -fdata-sections")
Append(CMAKE_CXX_FLAGS_DEBUG "-pg")
set (MY_LINK_FLAGS "-fPIC -static-libstdc++ -static-libgcc")
set (MY_LINK_FLAGS_RELEASE "-Wl,--gc-sections -s")
Append(CMAKE_EXE_LINKER_FLAGS "${MY_LINK_FLAGS}")
Append(CMAKE_EXE_LINKER_FLAGS_RELEASE "${MY_LINK_FLAGS_RELEASE}")
if(CMAKE_GENERATOR MATCHES "Ninja")
    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fdiagnostics-color=always")
endif()
proto/cpp/CMakeLists.txt
New file
@@ -0,0 +1,12 @@
cmake_minimum_required(VERSION 3.0)
set (Target "bhome_msg")
project(${Target})
find_package(Protobuf REQUIRED)
file(GLOB proto_files ../source/*.proto)
PROTOBUF_GENERATE_CPP(PROTO_SRCS PROTO_HDRS ${proto_files})
add_library(${Target} STATIC ${PROTO_SRCS})
target_link_libraries(${Target} libprotobuf-lite.a)
proto/source/bhome_msg.proto
New file
@@ -0,0 +1,34 @@
syntax = "proto3";
package bhome.msg;
message BHAddress {
    bytes mq_id = 1; // mqid, uuid
    bytes ip = 2;   //
    int32 port = 3;
}
message BHMsg {
    bytes msg_id = 1;
    int64 timestamp = 2;
    int32 type = 3;
    repeated BHAddress route = 4; // for reply and proxy.
    bytes body = 5;
}
enum MsgType {
    kMsgTypeInvalid = 0;
    kMsgTypeRequest = 1;
    kMsgTypeReply = 2;
    kMsgTypePublish = 3;
    kMsgTypeSubscribe = 4;
}
message DataPub {
    bytes topic = 1;
    bytes data = 2;
}
message DataSub {
    repeated bytes topics = 1;
}
src/defs.h
New file
@@ -0,0 +1,29 @@
/*
 * =====================================================================================
 *
 *       Filename:  defs.h
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年03月26日 19时26分17秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#ifndef DEFS_KP8LKGD0
#define DEFS_KP8LKGD0
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
typedef boost::uuids::uuid MQId;
const MQId kBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
#endif // end of include guard: DEFS_KP8LKGD0
src/msg.cpp
@@ -17,7 +17,7 @@
 */
#include "msg.h"
namespace bhome_shm {
namespace bhome_msg {
    
bool MsgMetaV1::Parse(const void *p)
@@ -75,4 +75,4 @@
    return 0;
}
} // namespace bhome_shm
} // namespace bhome_msg
src/msg.h
@@ -22,8 +22,11 @@
#include "shm.h"
#include <boost/interprocess/offset_ptr.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include "bhome_msg.pb.h"
namespace bhome_shm {
namespace bhome_msg {
    using namespace bhome_shm;
    using namespace bhome::msg; // for serialized data in Msg
// msg is safe to be stored in shared memory, so POD data or offset_ptr is required.
// message format: header(meta) + body(data).
@@ -83,7 +86,7 @@
inline void swap(Msg &m1, Msg &m2) { m1.swap(m2); }
} // namespace bhome_shm
} // namespace bhome_msg
src/pubsub.cpp
@@ -16,9 +16,67 @@
 * =====================================================================================
 */
#include "pubsub.h"
#include <chrono>
namespace bhome_shm {
using namespace std::chrono_literals;
const MQId kBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
const int kMaxWorker = 16;
BusManager::BusManager(SharedMemory &shm):
busq_(kBusQueueId, shm, 1000),
run_(false)
{
}
    
BusManager::~BusManager()
{
    Stop();
}
bool BusManager::Start(const int nworker)
{
    std::lock_guard<std::mutex> guard(mutex_);
    StopNoLock();
    // start
    auto Worker = [&](){
        while (this->run_) {
            std::this_thread::sleep_for(100ms);
            BusManager &self = *this;
            Msg msg;
            const int timeout_ms = 100;
            if (!self.busq_.Recv(msg, timeout_ms)) {
                continue;
            }
            // handle msg;
            // type: subscribe(topic), publish(topic, data)
        }
    };
    run_.store(true);
    const int n = std::min(nworker, kMaxWorker);
    for (int i = 0; i < n; ++i) {
        workers_.emplace_back(Worker);
    }
}
bool BusManager::Stop()
{
    std::lock_guard<std::mutex> guard(mutex_);
    StopNoLock();
}
bool BusManager::StopNoLock()
{
    if (run_.exchange(false)) {
        for (auto &w: workers_) {
            if (w.joinable()) {
                w.join();
            }
        }
    }
}
} // namespace bhome_shm
src/pubsub.h
@@ -18,12 +18,31 @@
#ifndef PUBSUB_4KGRA997
#define PUBSUB_4KGRA997
#include "shm.h"
#include "shm_queue.h"
#include <thread>
#include <atomic>
#include <mutex>
#include <vector>
namespace bhome_shm {
bool Subscribe(const std::string &topic);
// publish/subcribe manager.
class BusManager
{
    ShmMsgQueue busq_;
    std::atomic<bool> run_;
    std::vector<std::thread> workers_;
    std::mutex mutex_;
    bool StopNoLock();
public:
    BusManager(SharedMemory &shm);
    ~BusManager();
    bool Start(const int nworker = 2);
    bool Stop();
};
} // namespace bhome_shm
#endif // end of include guard: PUBSUB_4KGRA997
src/shm_queue.cpp
@@ -21,7 +21,7 @@
#include "bh_util.h"
namespace bhome_shm {
using namespace bhome_msg;
using namespace boost::interprocess;
using namespace boost::uuids;
src/shm_queue.h
@@ -100,6 +100,7 @@
    }
};
using namespace bhome_msg;
class ShmMsgQueue : private ShmObject<SharedQueue<Msg> >
{
utest/utest.cpp
@@ -17,11 +17,16 @@
#include <sys/wait.h>
using namespace std::chrono_literals;
using namespace bhome_msg;
using namespace bhome_shm;
using namespace boost::posix_time;
auto Now = []() { return second_clock::universal_time(); };
auto F()
{
    return [](){};
}
struct s1000 { char a[1000]; };
typedef std::function<void(void)> FuncVV;