lichao
2021-03-31 3c2b6739208d961cf8b86460d7f05516d044960c
add async recv suport; sync by waiting for async.
2个文件已添加
8个文件已修改
299 ■■■■ 已修改文件
proto/source/bhome_msg.proto 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.h 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.cpp 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.h 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.cpp 36 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.h 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.cpp 140 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.h 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_rpc.cpp 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_rpc.h 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/source/bhome_msg.proto
@@ -25,6 +25,8 @@
    kMsgTypePublish = 3;
    kMsgTypeSubscribe = 4;
    kMsgTypeUnsubscribe = 5;
    kMsgTypeQueryTopic = 6;
    kMsgTypeQueryTopicReply = 7;
}
message DataPub {
@@ -35,3 +37,20 @@
message DataSub {
    repeated bytes topics = 1;
}
message DataRequest {
    bytes topic = 1;
    bytes data = 2;
}
message DataReply {
    bytes data = 1;
}
message DataQueryTopic {
    bytes topic = 1;
}
message DataQueryTopicReply {
    BHAddress address = 1;
}
src/defs.h
@@ -25,6 +25,7 @@
typedef boost::uuids::uuid MQId;
const MQId kBHBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
const MQId kBHTopicRPCId = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff");
const int kBHCenterPort = 24287;
const char kTopicSep = '.';
namespace bhome_shm
src/msg.cpp
@@ -41,12 +41,22 @@
    msg.add_route()->set_mq_id(&src_id, sizeof(src_id));
    return msg;
}
BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size)
{
    DataRequest req;
    req.set_topic(topic);
    req.set_data(data, size);
    const std::string &body(req.SerializeAsString());
    return MakeRequest(src_id, body.data(), body.size());
}
BHMsg MakeReply(const void *data, const size_t size)
{
    assert(data && size);
    BHMsg msg(InitMsg(kMsgTypeReply));
    msg.set_body(data, size);
    DataReply reply;
    reply.set_data(data, size);
    msg.set_body(reply.SerializeAsString());
    return msg;
}
@@ -77,6 +87,15 @@
    return msg;
}
BHMsg MakeQueryTopic(const std::string &topic)
{
    BHMsg msg(InitMsg(kMsgTypeQueryTopic));
    DataQueryTopic query;
    query.set_topic(topic);
    msg.set_body(query.SerializeAsString());
    return msg;
}
void *Pack(SharedMemory &shm, const BHMsg &msg)
{
    uint32_t msg_size = msg.ByteSizeLong();
src/msg.h
@@ -59,7 +59,9 @@
    int num_ = 1;
};
BHMsg MakeQueryTopic(const std::string &topic);
BHMsg MakeRequest(const MQId &src_id, const void *data, const size_t size);
BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size);
BHMsg MakeReply(const void *data, const size_t size);
BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics);
BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics);
src/shm_queue.cpp
@@ -64,23 +64,22 @@
    Remove();
}
bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms)
bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend)
{
    Queue *remote = Find(shm, MsgQIdToName(remote_id));
    return remote && remote->Write(msg, timeout_ms, [](const MsgI &msg) { msg.AddRef(); });
    return remote && remote->Write(msg, timeout_ms, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
}
// bool ShmMsgQueue::Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms)
// {
//     Queue *remote = Find(MsgQIdToName(remote_id));
//     return remote && remote->Write(msg, timeout_ms, [](const MsgI&msg){msg.AddRef();});
// }
// Test shows that in the 2 cases:
// 1) build msg first, then find remote queue;
// 2) find remote queue first, then build msg;
// 1 is about 50% faster than 2, maybe cache related.
bool ShmMsgQueue::Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms)
bool ShmMsgQueue::Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms, const std::function<void()> &onsend)
{
    MsgI msg;
    if (msg.Make(shm(), data)) {
        if (Send(remote_id, msg, timeout_ms)) {
        if (Send(remote_id, msg, timeout_ms, onsend)) {
            return true;
        } else {
            msg.Release(shm());
@@ -89,25 +88,6 @@
    return false;
}
/*
bool ShmMsgQueue::Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms)
{
    // Test shows that in the 2 cases:
    // 1) build msg first, then find remote queue;
    // 2) find remote queue first, then build msg;
    // 1 is about 50% faster than 2, maybe cache related.
    MsgI msg;
    if(msg.BuildRequest(shm(), Id(), data, size)) {
        if(Send(remote_id, msg, timeout_ms)) {
            return true;
        } else {
            msg.Release(shm());
        }
    }
    return false;
}
//*/
bool ShmMsgQueue::Recv(BHMsg &msg, const int timeout_ms)
{
    MsgI imsg;
src/shm_queue.h
@@ -118,6 +118,7 @@
{
    typedef ShmObject<SharedQueue<MsgI>> Super;
    typedef Super::Data Queue;
    typedef std::function<void()> OnSend;
    bool Write(const MsgI &buf, const int timeout_ms) { return data()->Write(buf, timeout_ms); }
    bool Read(MsgI &buf, const int timeout_ms) { return data()->Read(buf, timeout_ms); }
    MQId id_;
@@ -132,8 +133,20 @@
    bool Recv(BHMsg &msg, const int timeout_ms);
    bool Recv(MsgI &msg, const int timeout_ms) { return Read(msg, timeout_ms); }
    bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms);
    static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms);
    static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend);
    static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms)
    {
        return Send(shm, remote_id, msg, timeout_ms, []() {});
    }
    bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms, OnSend const &onsend);
    bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms)
    {
        return Send(remote_id, msg, timeout_ms, []() {});
    }
    bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend)
    {
        return Send(shm(), remote_id, msg, timeout_ms, onsend);
    }
    bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms)
    {
        return Send(shm(), remote_id, msg, timeout_ms);
src/socket.cpp
@@ -20,6 +20,8 @@
#include "bh_util.h"
#include "defs.h"
#include "msg.h"
#include <chrono>
#include <condition_variable>
using namespace bhome_msg;
using namespace bhome_shm;
@@ -28,6 +30,8 @@
{
} // namespace
//TODO maybe change to base class, each type is a sub class.
ShmSocket::ShmSocket(Type type, bhome_shm::SharedMemory &shm) :
    shm_(shm), type_(type), run_(false)
@@ -123,6 +127,31 @@
    return StartRaw([this, onData](MsgI &imsg) { BHMsg m; if (imsg.Unpack(m)) { onData(m); } }, nworker);
}
bool ShmSocket::StartAsync(int nworker)
{
    auto AsyncRecvProc = [this](BHMsg &msg) {
        auto Find = [&](RecvCB &cb) {
            std::lock_guard<std::mutex> lock(mutex_);
            const std::string &msgid = msg.msg_id();
            auto pos = async_cbs_.find(msgid);
            if (pos != async_cbs_.end()) {
                cb.swap(pos->second);
                async_cbs_.erase(pos);
                return true;
            } else {
                return false;
            }
        };
        RecvCB cb;
        if (Find(cb) && cb) {
            cb(msg);
        }
    };
    return Start(AsyncRecvProc, nworker);
}
bool ShmSocket::Stop()
{
    std::lock_guard<std::mutex> lock(mutex_);
@@ -141,3 +170,114 @@
    }
    return false;
}
bool ShmSocket::AsyncRequest(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb)
{
    if (type_ != eSockRequest) {
        return false;
    }
    assert(remote && pmsg && !mq_);
    try {
        const BHMsg &msg = *static_cast<const BHMsg *>(pmsg);
        auto RegisterCB = [&]() {
            std::lock_guard<std::mutex> lock(mutex_);
            async_cbs_.emplace(msg.msg_id(), cb);
        };
        return mq_->Send(*static_cast<const MQId *>(remote), msg, timeout_ms, RegisterCB);
    } catch (...) {
        return false;
    }
}
bool ShmSocket::SyncRequest(const void *remote, const void *msg, void *result, const int timeout_ms)
{
    struct State {
        std::mutex mutex;
        std::condition_variable cv;
        bool canceled = false;
    };
    try {
        std::shared_ptr<State> st(new State);
        auto OnRecv = [=](BHMsg &msg) {
            std::unique_lock<std::mutex> lk(st->mutex);
            if (!st->canceled) {
                static_cast<BHMsg *>(result)->Swap(&msg);
                st->cv.notify_one();
            }
        };
        std::unique_lock<std::mutex> lk(st->mutex);
        auto end = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
        if (AsyncRequest(remote, msg, timeout_ms, OnRecv) && st->cv.wait_until(lk, end) == std::cv_status::no_timeout) {
            return true;
        } else {
            st->canceled = true;
            return false;
        }
    } catch (...) {
        return false;
    }
}
bool ShmSocket::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
{
    BHMsg result;
    const BHMsg &msg = MakeQueryTopic(topic);
    if (SyncRequest(&kBHTopicRPCId, &msg, &result, timeout_ms)) {
        if (result.type() == kMsgTypeQueryTopicReply) {
            DataQueryTopicReply reply;
            if (reply.ParseFromString(result.body())) {
                addr = reply.address();
                return !addr.mq_id().empty();
            }
        }
    }
    return false;
}
bool ShmSocket::RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
{
    auto Call = [&](const void *remote) {
        const BHMsg &msg(MakeRequest(mq_->Id(), topic, data, size));
        auto onRecv = [cb](BHMsg &msg) {
            if (msg.type() == kMsgTypeReply) {
                DataReply reply;
                if (reply.ParseFromString(msg.body())) {
                    cb(reply.data().data(), reply.data().size());
                }
            }
        };
        return AsyncRequest(remote, &msg, timeout_ms, onRecv);
    };
    try {
        BHAddress addr;
        if (QueryRPCTopic(topic, addr, timeout_ms)) {
            return Call(addr.mq_id().data());
        }
    } catch (...) {
        return false;
    }
}
bool ShmSocket::RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out)
{
    try {
        BHAddress addr;
        if (QueryRPCTopic(topic, addr, timeout_ms)) {
            const BHMsg &msg(MakeRequest(mq_->Id(), topic, data, size));
            BHMsg reply;
            if (SyncRequest(addr.mq_id().data(), &msg, &reply, timeout_ms) && reply.type() == kMsgTypeReply) {
                DataReply dr;
                if (dr.ParseFromString(msg.body())) {
                    dr.mutable_data()->swap(out);
                    return true;
                }
            }
        }
    } catch (...) {
        return false;
    }
}
src/socket.h
@@ -26,6 +26,7 @@
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <vector>
class ShmSocket : private boost::noncopyable
@@ -42,12 +43,13 @@
    };
    typedef std::function<void(bhome_msg::BHMsg &msg)> RecvCB;
    typedef std::function<void(bhome_msg::MsgI &imsg)> RecvRawCB;
    typedef std::function<void(const void *data, const size_t size)> RequestResultCB;
    ShmSocket(Type type, bhome_shm::SharedMemory &shm);
    ShmSocket(Type type);
    ~ShmSocket();
    // bool Request(const std::string &topic, const void *data, const size_t size, onReply);
    bool RequestAndWait() { return false; } // call Request, and wait onReply notify cv
    bool RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb);
    bool RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out);
    // bool HandleRequest(onData);
    bool ReadRequest(); // exclude with HandleRequest
@@ -60,10 +62,14 @@
    // start recv.
    bool Start(const RecvCB &onData, int nworker = 1);
    bool StartRaw(const RecvRawCB &onData, int nworker = 1);
    bool StartAsync(int nworker = 2);
    bool Stop();
    size_t Pending() const { return mq_ ? mq_->Pending() : 0; }
private:
    bool AsyncRequest(const void *remote, const void *msg, const int timeout_ms, const RecvCB &cb);
    bool SyncRequest(const void *remote, const void *msg, void *result, const int timeout_ms);
    bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
    bool StopNoLock();
    bhome_shm::SharedMemory &shm_;
    const Type type_;
@@ -72,6 +78,7 @@
    std::atomic<bool> run_;
    std::unique_ptr<Queue> mq_;
    std::unordered_map<std::string, RecvCB> async_cbs_;
};
#endif // end of include guard: SOCKET_GWTJHBPO
src/topic_rpc.cpp
New file
@@ -0,0 +1,21 @@
/*
 * =====================================================================================
 *
 *       Filename:  topic_rpc.cpp
 *
 *    Description:  topic request/reply manager
 *
 *        Version:  1.0
 *        Created:  2021年03月31日 16时29分31秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  YOUR NAME (),
 *   Organization:
 *
 * =====================================================================================
 */
#include "topic_rpc.h"
src/topic_rpc.h
New file
@@ -0,0 +1,31 @@
/*
 * =====================================================================================
 *
 *       Filename:  topic_rpc.h
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年03月31日 16时30分10秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  YOUR NAME (),
 *   Organization:
 *
 * =====================================================================================
 */
#ifndef TOPIC_RPC_JU1AYN5L
#define TOPIC_RPC_JU1AYN5L
#include "socket.h"
// request/reply topic manager
class RPCManager
{
    ShmSocket socket_;
public:
};
#endif // end of include guard: TOPIC_RPC_JU1AYN5L