lichao
2021-03-30 491d98b3ba32cafed5682552bd870ca0ef93275c
add ShmSocket as shm interface, add sub/pub.
4个文件已添加
5个文件已修改
356 ■■■■ 已修改文件
src/center.cpp 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/center.h 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.h 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm.h 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.cpp 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.h 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.cpp 134 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.h 74 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/utest.cpp 66 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/center.cpp
New file
@@ -0,0 +1,29 @@
/*
 * =====================================================================================
 *
 *       Filename:  center.cpp
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年03月30日 16时19分37秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#include "center.h"
#include "defs.h"
#include "shm.h"
using namespace bhome_shm;
SharedMemory &BHomeShm()
{
    static SharedMemory shm("bhome_default_shm_v0", 1024*1024*64);
    return shm;
}
src/center.h
New file
@@ -0,0 +1,26 @@
/*
 * =====================================================================================
 *
 *       Filename:  center.h
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年03月30日 16时22分24秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#ifndef CENTER_TM9OUQTG
#define CENTER_TM9OUQTG
class BHCenter
{
};
#endif // end of include guard: CENTER_TM9OUQTG
src/defs.h
@@ -27,6 +27,12 @@
const MQId kBHBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
const int kBHCenterPort = 24287;
const char kTopicSep = '.';
namespace bhome_shm {
class SharedMemory;
}
bhome_shm::SharedMemory &BHomeShm();
//TODO center can check shm for previous crash.
#endif // end of include guard: DEFS_KP8LKGD0
src/shm.h
@@ -61,6 +61,7 @@
    template <class T, class ...Params> T * New(Params const&...params) { return construct<T>(anonymous_instance, std::nothrow)(params...); }
    template <class T> void Delete(T *p) { if (p) { destroy_ptr<T>(p); }; }
    template <class T> void Delete(offset_ptr<T> p) { Delete(p.get()); }
    template <class T> T *Find(const std::string &name) { return find<T>(name.c_str()).first; }
};
@@ -91,7 +92,8 @@
            throw("Error: Not enough memory, can not allocate \"" + name_ + "\"");
        }
    }
    Data *find(const std::string &name) { return shm_.find<Data>(ObjName(name).c_str()).first; }
    static Data *Find(SharedMemory &shm, const std::string &name) { return shm.Find<Data>(ObjName(name)); }
    Data *Find(const std::string &name) { return Find(shm_, ObjName(name)); }
    virtual ~ShmObject() {}
    std::string name() const { return name_; }
    Data* data() { return pdata_; }
src/shm_queue.cpp
@@ -28,7 +28,7 @@
namespace {
std::string MsgQIdToName(const MQId& id) { return "shmq" + to_string(id); }
MQId EmptyId() { return nil_uuid(); }
// MQId EmptyId() { return nil_uuid(); }
MQId NewId() { return random_generator()(); }
const int AdjustMQLength(const int len) {
    const int kMaxLength = 10000; 
@@ -59,12 +59,18 @@
    Remove();
}
bool ShmMsgQueue::Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms)
bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms)
{
    Queue *remote = find(MsgQIdToName(remote_id));
    Queue *remote = Find(shm, MsgQIdToName(remote_id));
    return remote && remote->Write(msg, timeout_ms, [](const MsgI&msg){msg.AddRef();});
}
// bool ShmMsgQueue::Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms)
// {
//     Queue *remote = Find(MsgQIdToName(remote_id));
//     return remote && remote->Write(msg, timeout_ms, [](const MsgI&msg){msg.AddRef();});
// }
bool ShmMsgQueue::Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms)
{
    MsgI msg;
src/shm_queue.h
@@ -120,7 +120,10 @@
    bool Recv(BHMsg &msg, const int timeout_ms);
    bool Recv(MsgI &msg, const int timeout_ms) { return Read(msg, timeout_ms); }
    bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms);
    bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms);
    static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms);
    bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms) {
        return Send(shm(), remote_id, msg, timeout_ms);
    }
};
} // namespace bhome_shm
src/socket.cpp
New file
@@ -0,0 +1,134 @@
/*
 * =====================================================================================
 *
 *       Filename:  socket.cpp
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年03月30日 15时48分58秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#include "socket.h"
#include <chrono>
#include "msg.h"
#include "defs.h"
#include "bh_util.h"
using namespace bhome_msg;
using namespace bhome_shm;
using namespace std::chrono_literals;
namespace {
int GetSocketDefaultLen(ShmSocket::Type type)
{
    switch (type) {
        case ShmSocket::eSockRequest : return 12;
        case ShmSocket::eSockReply : return 64;
        case ShmSocket::eSockPublish : return 0;
        case ShmSocket::eSockSubscribe : return 64;
        default: return 0;
    }
}
}
ShmSocket::ShmSocket(Type type, bhome_shm::SharedMemory &shm)
    : shm_(shm),
      type_(type),
      run_(false)
{
    int len = GetSocketDefaultLen(type);
    if (len != 0) {
        mq_.reset(new Queue(shm_, len));
        auto RecvProc = [this](){
            while (run_) {
                try {
                    std::unique_lock<std::mutex> lk(mutex_);
                    if (cv_recv_cb_.wait_for(lk, 100ms, [this](){ return HasRecvCB(); })) {
                        BHMsg msg;
                        if (mq_->Recv(msg, 100)) {
                            this->onRecv_(msg);
                        }
                    }
                } catch (...) {}
            }
        };
        run_.store(true);
        workers_.emplace_back(RecvProc);
    }
}
ShmSocket::ShmSocket(Type type)
    : ShmSocket(type, BHomeShm())
{
}
ShmSocket::~ShmSocket()
{
    Stop();
}
bool ShmSocket::Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms)
{
    if (type_ != eSockPublish) {
        return false;
    }
    assert(!mq_);
    try {
        MsgI imsg;
        if (!imsg.MakeRC(shm_, MakePub(topic, data, size))) {
            return false;
        }
        DEFER1(imsg.Release(shm_));
        return Queue::Send(shm_, kBHBusQueueId, imsg, timeout_ms);
    } catch (...) {
        return false;
    }
}
bool ShmSocket::Subscribe(const std::vector<std::string> &topics, const int timeout_ms)
{
    if (type_ != eSockSubscribe) {
        return false;
    }
    assert(mq_);
    try {
        return mq_->Send(kBHBusQueueId, MakeSub(mq_->Id(), topics), timeout_ms);
    } catch (...) {
        return false;
    }
}
bool ShmSocket::SetRecvCallback(const RecvCB &onRecv)
{
    std::lock_guard<std::mutex> lock(mutex_);
    onRecv_ = onRecv;
    cv_recv_cb_.notify_one();
    return true;
}
bool ShmSocket::HasRecvCB()
{
    return static_cast<bool>(onRecv_);
}
void ShmSocket::Stop()
{
    run_ = false;
    for (auto &t : workers_) {
        if (t.joinable()) {
            t.join();
        }
    }
}
src/socket.h
New file
@@ -0,0 +1,74 @@
/*
 * =====================================================================================
 *
 *       Filename:  socket.h
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年03月30日 15时49分19秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#ifndef SOCKET_GWTJHBPO
#define SOCKET_GWTJHBPO
#include "shm_queue.h"
#include <vector>
#include <thread>
#include <memory>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <atomic>
class ShmSocket
{
    typedef bhome_shm::ShmMsgQueue Queue;
public:
    enum Type {
        eSockRequest,
        eSockReply,
        eSockSubscribe,
        eSockPublish,
    };
    typedef std::function<void (bhome_msg::BHMsg &msg)> RecvCB;
    ShmSocket(Type type);
    ShmSocket(Type type, bhome_shm::SharedMemory &shm);
    ~ShmSocket();
    // bool Request(const std::string &topic, const void *data, const size_t size, onReply);
    bool RequestAndWait() { return false; } // call Request, and wait onReply notify cv
    // bool HandleRequest(onData);
    bool ReadRequest(); // exclude with HandleRequest
    bool SendReply();   // exclude with HandleRequest
    bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms);
    bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms);
    bool RecvSub(std::string &topic, std::string &data, const int timeout_ms);
    bool SetRecvCallback(const RecvCB &onRecv);
private:
    bool HasRecvCB();
    void Stop();
    bhome_shm::SharedMemory &shm_;
    Type type_;
    std::vector<std::thread> workers_;
    std::mutex mutex_;
    std::condition_variable cv_recv_cb_;
    std::atomic<bool> run_;
    RecvCB onRecv_;
    std::unique_ptr<Queue> mq_;
};
#endif // end of include guard: SOCKET_GWTJHBPO
utest/utest.cpp
@@ -8,6 +8,7 @@
#include "pubsub.h"
#include "defs.h"
#include "util.h"
#include "socket.h"
template <class A, class B> struct IsSameType { static const bool value = false; };
template <class A> struct IsSameType<A,A> { static const bool value = true; };
@@ -73,45 +74,48 @@
    std::atomic<uint64_t> last_count(0);
    const uint64_t nmsg = 100;
    const int timeout = 1000;
    auto Sub = [&](int id, const std::vector<std::string> &topics) {
        ShmMsgQueue client(shm, 8);
        client.Send(kBHBusQueueId, MakeSub(client.Id(), topics), timeout);
        for (int i = 0; i < nmsg * topics.size(); ++i) {
            BHMsg msg;
            if (client.Recv(msg, 1000)) {
                if (msg.type() != kMsgTypePublish) {
                    BOOST_CHECK(false);
                }
                DataPub pub;
                if (!pub.ParseFromString(msg.body())) {
                    BOOST_CHECK(false);
                }
                ++count;
                auto cur = Now();
                if (last_time.exchange(cur) < cur) {
                    std::cout << "time: " << cur;
                    printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
                           count.load(), count - last_count.exchange(count), init_avail - Avail());
                }
                // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str());
            } else {
                printf("sub %2d recv timeout\n", id);
            }
        ShmSocket client(ShmSocket::eSockSubscribe, shm);
        bool r = client.Subscribe(topics, timeout);
        std::mutex mutex;
        std::condition_variable cv;
        }
        int i = 0;
        auto OnRecv = [&](BHMsg &msg) {
            if (msg.type() != kMsgTypePublish) {
                BOOST_CHECK(false);
            }
            DataPub pub;
            if (!pub.ParseFromString(msg.body())) {
                BOOST_CHECK(false);
            }
            ++count;
            auto cur = Now();
            if (last_time.exchange(cur) < cur) {
                std::cout << "time: " << cur;
                printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
                       count.load(), count - last_count.exchange(count), init_avail - Avail());
            }
            if (++i >= nmsg*topics.size()) {
                cv.notify_one();
            }
            // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str());
        };
        client.SetRecvCallback(OnRecv);
        std::unique_lock<std::mutex> lk(mutex);
        cv.wait(lk);
    };
    auto Pub = [&](const std::string &topic) {
        ShmMsgQueue provider(shm, 0);
        ShmSocket provider(ShmSocket::eSockPublish, shm);
        for (int i = 0; i < nmsg; ++i) {
            std::string data = topic + std::to_string(i) + std::string(1000, '-');
            MsgI msg;
            msg.MakeRC(shm, MakePub(topic, data.data(), data.size()));
            DEFER1(msg.Release(shm));
            bool r = provider.Send(kBHBusQueueId, msg, timeout);
            bool r = provider.Publish(topic, data.data(), data.size(), timeout);
            // bool r = provider.Send(kBHBusQueueId, MakePub(topic, data.data(), data.size()), timeout);
            if (!r) {
                printf("pub ret: %s\n", r ? "ok" : "fail");