lichao
2021-04-01 b55ffe89f4b237be5f79232cfddfe22bfdb87c64
make req/rep,sub/pub sockets sub class;
4个文件已添加
5个文件已修改
902 ■■■■■ 已修改文件
src/pubsub.cpp 153 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub.h 50 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub_center.cpp 134 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub_center.h 53 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/reqrep.cpp 164 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/reqrep.h 57 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.cpp 210 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.h 45 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/utest.cpp 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub.cpp
@@ -19,127 +19,58 @@
#include "bh_util.h"
#include "defs.h"
namespace bhome_shm
{
using namespace std::chrono_literals;
const int kMaxWorker = 16;
using namespace bhome_msg;
BusManager::BusManager(SharedMemory &shm) :
    shm_(shm), socket_(ShmSocket::eSockBus, shm) {}
BusManager::BusManager() :
    BusManager(BHomeShm()) {}
bool BusManager::Start(const int nworker)
bool SocketPublish::Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms)
{
    auto onRecv = [&](MsgI &imsg) {
#ifndef NDEBUG
        static std::atomic<time_t> last(0);
        time_t now = 0;
        time(&now);
        if (last.exchange(now) < now) {
            printf("bus queue size: %ld\n", socket_.Pending());
    try {
        MsgI imsg;
        if (!imsg.MakeRC(shm(), MakePub(topic, data, size))) {
            return false;
        }
#endif
        DEFER1(imsg.Release(shm()));
        return ShmMsgQueue::Send(shm(), kBHBusQueueId, imsg, timeout_ms);
    } catch (...) {
        return false;
    }
}
        BHMsg msg;
        if (!imsg.Unpack(msg)) {
            return;
        }
bool SocketSubscribe::Subscribe(const std::vector<std::string> &topics, const int timeout_ms)
{
    try {
        return mq().Send(kBHBusQueueId, MakeSub(mq().Id(), topics), timeout_ms);
    } catch (...) {
        return false;
    }
}
        auto OnSubChange = [&](auto &&update) {
            DataSub sub;
            if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) {
                assert(sizeof(MQId) == msg.route(0).mq_id().size());
                MQId client;
                memcpy(&client, msg.route(0).mq_id().data(), sizeof(client));
                std::lock_guard<std::mutex> guard(mutex_);
                auto &topics = sub.topics();
                for (auto &topic : topics) {
                    try {
                        update(topic, client);
                    } catch (...) {
                        //TODO log error
                    }
                }
bool SocketSubscribe::StartRecv(const TopicDataCB &tdcb, int nworker)
{
    auto AsyncRecvProc = [this, tdcb](BHMsg &msg) {
        if (msg.type() == kMsgTypePublish) {
            DataPub d;
            if (d.ParseFromString(msg.body())) {
                tdcb(d.topic(), d.data());
            }
        };
        auto Sub1 = [this](const std::string &topic, const MQId &id) {
            records_[topic].insert(id);
        };
        auto Unsub1 = [this](const std::string &topic, const MQId &id) {
            auto pos = records_.find(topic);
            if (pos != records_.end()) {
                if (pos->second.erase(id) && pos->second.empty()) {
                    records_.erase(pos);
                }
            }
        };
        auto OnPublish = [&]() {
            DataPub pub;
            if (!pub.ParseFromString(msg.body())) {
                return;
            }
            auto FindClients = [&](const std::string &topic) {
                Clients dests;
                std::lock_guard<std::mutex> guard(mutex_);
                auto Find1 = [&](const std::string &t) {
                    auto pos = records_.find(topic);
                    if (pos != records_.end() && !pos->second.empty()) {
                        auto &clients = pos->second;
                        for (auto &cli : clients) {
                            dests.insert(cli);
                        }
                    }
                };
                Find1(topic);
                //TODO check and adjust topic on client side sub/pub.
                size_t pos = 0;
                while (true) {
                    pos = topic.find(kTopicSep, pos);
                    if (pos == topic.npos || ++pos == topic.size()) {
                        // Find1(std::string()); // sub all.
                        break;
                    } else {
                        Find1(topic.substr(0, pos));
                    }
                }
                return dests;
            };
            auto Dispatch = [&](auto &&send1) {
                const Clients &clients(FindClients(pub.topic()));
                for (auto &cli : clients) {
                    send1(cli);
                }
            };
            if (imsg.IsCounted()) {
                Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm_, cli, imsg, 100); });
            } else {
                MsgI pubmsg;
                if (!pubmsg.MakeRC(shm_, msg)) { return; }
                DEFER1(pubmsg.Release(shm_));
                Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm_, cli, pubmsg, 100); });
            }
        };
        switch (msg.type()) {
        case kMsgTypeSubscribe: OnSubChange(Sub1); break;
        case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break;
        case kMsgTypePublish: OnPublish(); break;
        default: break;
        } else {
            // ignored, or dropped
        }
    };
    return socket_.StartRaw(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
    return tdcb && Start(AsyncRecvProc, nworker);
}
} // namespace bhome_shm
bool SocketSubscribe::RecvSub(std::string &topic, std::string &data, const int timeout_ms)
{
    BHMsg msg;
    if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypePublish) {
        DataPub d;
        if (d.ParseFromString(msg.body())) {
            d.mutable_topic()->swap(topic);
            d.mutable_data()->swap(data);
            return true;
        }
    }
    return false;
}
src/pubsub.h
@@ -18,31 +18,43 @@
#ifndef PUBSUB_4KGRA997
#define PUBSUB_4KGRA997
#include "defs.h"
#include "socket.h"
#include <mutex>
#include <set>
#include <unordered_map>
#include <string>
namespace bhome_shm
class SocketPublish
{
// publish/subcribe manager.
class BusManager
{
    SharedMemory &shm_;
    ShmSocket socket_;
    std::mutex mutex_;
    typedef std::set<MQId> Clients;
    std::unordered_map<std::string, Clients> records_;
    typedef ShmSocket Socket;
    Socket::Shm &shm_;
    Socket::Shm &shm() { return shm_; }
public:
    BusManager(SharedMemory &shm);
    BusManager();
    ~BusManager() { Stop(); }
    bool Start(const int nworker = 2);
    bool Stop() { return socket_.Stop(); }
    SocketPublish(Socket::Shm &shm) :
        shm_(shm) {}
    SocketPublish() :
        SocketPublish(BHomeShm()) {}
    bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms);
    bool Publish(const std::string &topic, const std::string &data, const int timeout_ms)
    {
        return Publish(topic, data.data(), data.size(), timeout_ms);
    }
};
} // namespace bhome_shm
// socket subscribe
class SocketSubscribe : private ShmSocket
{
    typedef ShmSocket Socket;
public:
    SocketSubscribe(Socket::Shm &shm) :
        Socket(shm, 64) {}
    SocketSubscribe() :
        SocketSubscribe(BHomeShm()) {}
    typedef std::function<void(const std::string &topic, const std::string &data)> TopicDataCB;
    bool StartRecv(const TopicDataCB &tdcb, int nworker = 2);
    bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms);
    bool RecvSub(std::string &topic, std::string &data, const int timeout_ms);
};
#endif // end of include guard: PUBSUB_4KGRA997
src/pubsub_center.cpp
New file
@@ -0,0 +1,134 @@
/*
 * =====================================================================================
 *
 *       Filename:  pubsub_center.cpp
 *
 *    Description:  pub/sub center/manager
 *
 *        Version:  1.0
 *        Created:  2021年04月01日 09时29分04秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#include "pubsub_center.h"
#include "bh_util.h"
PubSubCenter::PubSubCenter(SharedMemory &shm) :
    socket_(shm) {}
bool PubSubCenter::Start(const int nworker)
{
    auto onRecv = [&](MsgI &imsg) {
#ifndef NDEBUG
        static std::atomic<time_t> last(0);
        time_t now = 0;
        time(&now);
        if (last.exchange(now) < now) {
            printf("bus queue size: %ld\n", socket_.Pending());
        }
#endif
        BHMsg msg;
        if (!imsg.Unpack(msg)) {
            return;
        }
        auto OnSubChange = [&](auto &&update) {
            DataSub sub;
            if (!msg.route().empty() && sub.ParseFromString(msg.body()) && !sub.topics().empty()) {
                assert(sizeof(MQId) == msg.route(0).mq_id().size());
                MQId client;
                memcpy(&client, msg.route(0).mq_id().data(), sizeof(client));
                std::lock_guard<std::mutex> guard(mutex_);
                auto &topics = sub.topics();
                for (auto &topic : topics) {
                    try {
                        update(topic, client);
                    } catch (...) {
                        //TODO log error
                    }
                }
            }
        };
        auto Sub1 = [this](const std::string &topic, const MQId &id) {
            records_[topic].insert(id);
        };
        auto Unsub1 = [this](const std::string &topic, const MQId &id) {
            auto pos = records_.find(topic);
            if (pos != records_.end()) {
                if (pos->second.erase(id) && pos->second.empty()) {
                    records_.erase(pos);
                }
            }
        };
        auto OnPublish = [&]() {
            DataPub pub;
            if (!pub.ParseFromString(msg.body())) {
                return;
            }
            auto FindClients = [&](const std::string &topic) {
                Clients dests;
                std::lock_guard<std::mutex> guard(mutex_);
                auto Find1 = [&](const std::string &t) {
                    auto pos = records_.find(topic);
                    if (pos != records_.end() && !pos->second.empty()) {
                        auto &clients = pos->second;
                        for (auto &cli : clients) {
                            dests.insert(cli);
                        }
                    }
                };
                Find1(topic);
                //TODO check and adjust topic on client side sub/pub.
                size_t pos = 0;
                while (true) {
                    pos = topic.find(kTopicSep, pos);
                    if (pos == topic.npos || ++pos == topic.size()) {
                        // Find1(std::string()); // sub all.
                        break;
                    } else {
                        Find1(topic.substr(0, pos));
                    }
                }
                return dests;
            };
            auto Dispatch = [&](auto &&send1) {
                const Clients &clients(FindClients(pub.topic()));
                for (auto &cli : clients) {
                    send1(cli);
                }
            };
            if (imsg.IsCounted()) {
                Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, imsg, 100); });
            } else {
                MsgI pubmsg;
                if (!pubmsg.MakeRC(shm(), msg)) { return; }
                DEFER1(pubmsg.Release(shm()));
                Dispatch([&](const MQId &cli) { ShmMsgQueue::Send(shm(), cli, pubmsg, 100); });
            }
        };
        switch (msg.type()) {
        case kMsgTypeSubscribe: OnSubChange(Sub1); break;
        case kMsgTypeUnsubscribe: OnSubChange(Unsub1); break;
        case kMsgTypePublish: OnPublish(); break;
        default: break;
        }
    };
    const int kMaxWorker = 16;
    return socket_.StartRaw(onRecv, std::min((nworker > 0 ? nworker : 2), kMaxWorker));
}
src/pubsub_center.h
New file
@@ -0,0 +1,53 @@
/*
 * =====================================================================================
 *
 *       Filename:  pubsub_center.h
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年04月01日 09时29分39秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#ifndef PUBSUB_CENTER_MFSUZJU7
#define PUBSUB_CENTER_MFSUZJU7
#include "defs.h"
#include "socket.h"
#include <mutex>
#include <set>
#include <unordered_map>
using namespace bhome_shm;
// publish/subcribe manager.
class PubSubCenter
{
    class SocketBus : public ShmSocket
    {
    public:
        SocketBus(SharedMemory &shm) :
            ShmSocket(shm, &kBHBusQueueId, 1000) {}
        using ShmSocket::shm;
    };
    SocketBus socket_;
    std::mutex mutex_;
    typedef std::set<MQId> Clients;
    std::unordered_map<std::string, Clients> records_;
    ShmSocket::Shm &shm() { return socket_.shm(); }
public:
    PubSubCenter(SharedMemory &shm);
    PubSubCenter() :
        PubSubCenter(BHomeShm()) {}
    ~PubSubCenter() { Stop(); }
    bool Start(const int nworker = 2);
    bool Stop() { return socket_.Stop(); }
};
#endif // end of include guard: PUBSUB_CENTER_MFSUZJU7
src/reqrep.cpp
New file
@@ -0,0 +1,164 @@
/*
 * =====================================================================================
 *
 *       Filename:  reqrep.cpp
 *
 *    Description:  topic request/reply sockets
 *
 *        Version:  1.0
 *        Created:  2021年04月01日 09时35分35秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#include "reqrep.h"
#include "msg.h"
#include <chrono>
#include <condition_variable>
using namespace bhome_msg;
bool SocketRequest::StartWorker(const RequestResultCB &rrcb, int nworker)
{
    auto AsyncRecvProc = [this, rrcb](BHMsg &msg) {
        auto Find = [&](RecvCB &cb) {
            std::lock_guard<std::mutex> lock(mutex());
            const std::string &msgid = msg.msg_id();
            auto pos = async_cbs_.find(msgid);
            if (pos != async_cbs_.end()) {
                cb.swap(pos->second);
                async_cbs_.erase(pos);
                return true;
            } else {
                return false;
            }
        };
        RecvCB cb;
        if (Find(cb) && cb) {
            cb(msg);
        } else if (rrcb && msg.type() == kMsgTypeReply) {
            DataReply reply;
            if (reply.ParseFromString(msg.body())) {
                rrcb(reply.data());
            }
        } else {
            // ignored, or dropped
        }
    };
    return Start(AsyncRecvProc, nworker);
}
bool SocketRequest::AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
{
    auto Call = [&](const void *remote) {
        const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
        auto onRecv = [cb](BHMsg &msg) {
            if (msg.type() == kMsgTypeReply) {
                DataReply reply;
                if (reply.ParseFromString(msg.body())) {
                    cb(reply.data());
                }
            }
        };
        return AsyncSend(remote, &msg, timeout_ms, onRecv);
    };
    try {
        BHAddress addr;
        if (QueryRPCTopic(topic, addr, timeout_ms)) {
            return Call(addr.mq_id().data());
        }
    } catch (...) {
        return false;
    }
}
bool SocketRequest::SyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out)
{
    try {
        BHAddress addr;
        if (QueryRPCTopic(topic, addr, timeout_ms)) {
            const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
            BHMsg reply;
            if (SyncSendAndRecv(addr.mq_id().data(), &msg, &reply, timeout_ms) && reply.type() == kMsgTypeReply) {
                DataReply dr;
                if (dr.ParseFromString(msg.body())) {
                    dr.mutable_data()->swap(out);
                    return true;
                }
            }
        }
    } catch (...) {
        return false;
    }
}
bool SocketRequest::AsyncSend(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb)
{
    assert(remote && pmsg);
    try {
        const BHMsg &msg = *static_cast<const BHMsg *>(pmsg);
        auto RegisterCB = [&]() {
            std::lock_guard<std::mutex> lock(mutex());
            async_cbs_.emplace(msg.msg_id(), cb);
        };
        return mq().Send(*static_cast<const MQId *>(remote), msg, timeout_ms, RegisterCB);
    } catch (...) {
        return false;
    }
}
bool SocketRequest::SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms)
{
    struct State {
        std::mutex mutex;
        std::condition_variable cv;
        bool canceled = false;
    };
    try {
        std::shared_ptr<State> st(new State);
        auto endtime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
        auto OnRecv = [=](BHMsg &msg) {
            std::unique_lock<std::mutex> lk(st->mutex);
            if (!st->canceled) {
                static_cast<BHMsg *>(result)->Swap(&msg);
                st->cv.notify_one();
            } // else result is no longer valid.
        };
        std::unique_lock<std::mutex> lk(st->mutex);
        if (AsyncSend(remote, msg, timeout_ms, OnRecv) && st->cv.wait_until(lk, endtime) == std::cv_status::no_timeout) {
            return true;
        } else {
            st->canceled = true;
            return false;
        }
    } catch (...) {
        return false;
    }
}
bool SocketRequest::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
{
    BHMsg result;
    const BHMsg &msg = MakeQueryTopic(topic);
    if (SyncSendAndRecv(&kBHTopicRPCId, &msg, &result, timeout_ms)) {
        if (result.type() == kMsgTypeQueryTopicReply) {
            DataQueryTopicReply reply;
            if (reply.ParseFromString(result.body())) {
                addr = reply.address();
                return !addr.mq_id().empty();
            }
        }
    }
    return false;
}
src/reqrep.h
New file
@@ -0,0 +1,57 @@
/*
 * =====================================================================================
 *
 *       Filename:  reqrep.h
 *
 *    Description:  topic request/reply sockets
 *
 *        Version:  1.0
 *        Created:  2021年04月01日 09时36分06秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
#ifndef REQREP_ACEH09NK
#define REQREP_ACEH09NK
#include "defs.h"
#include "socket.h"
#include <functional>
#include <unordered_map>
class SocketRequest : private ShmSocket
{
    typedef ShmSocket Socket;
public:
    SocketRequest(Socket::Shm &shm) :
        Socket(shm, 64) { StartWorker(); }
    SocketRequest() :
        SocketRequest(BHomeShm()) {}
    typedef std::function<void(const std::string &data)> RequestResultCB;
    bool StartWorker(const RequestResultCB &rrcb, int nworker = 2);
    bool StartWorker(int nworker = 2) { return StartWorker(RequestResultCB(), nworker); }
    bool AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb);
    bool AsyncRequest(const std::string &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb)
    {
        return AsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb);
    }
    bool SyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out);
    bool SyncRequest(const std::string &topic, const std::string &data, const int timeout_ms, std::string &out)
    {
        return SyncRequest(topic, data.data(), data.size(), timeout_ms, out);
    }
private:
    bool AsyncSend(const void *remote, const void *msg, const int timeout_ms, const RecvCB &cb);
    bool SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms);
    bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
    std::unordered_map<std::string, RecvCB> async_cbs_;
};
#endif // end of include guard: REQREP_ACEH09NK
src/socket.cpp
@@ -20,8 +20,6 @@
#include "bh_util.h"
#include "defs.h"
#include "msg.h"
#include <chrono>
#include <condition_variable>
using namespace bhome_msg;
using namespace bhome_shm;
@@ -31,78 +29,33 @@
} // namespace
//TODO maybe change to base class, each type is a sub class.
ShmSocket::ShmSocket(Type type, bhome_shm::SharedMemory &shm) :
    shm_(shm), type_(type), run_(false)
ShmSocket::ShmSocket(Shm &shm, const void *id, const int len) :
    shm_(shm), run_(false)
{
    switch (type) {
    case eSockBus: mq_.reset(new Queue(kBHBusQueueId, shm_, 1000)); break;
    case eSockRequest: mq_.reset(new Queue(shm_, 12)); break;
    case eSockReply: mq_.reset(new Queue(shm_, 64)); break;
    case eSockSubscribe: mq_.reset(new Queue(shm_, 64)); break;
    case eSockPublish: break; // no recv mq needed
    default: break;
    if (id && len > 0) {
        mq_.reset(new Queue(*static_cast<const MQId *>(id), shm, len));
    }
}
ShmSocket::ShmSocket(Type type) :
    ShmSocket(type, BHomeShm()) {}
ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) :
    shm_(shm), run_(false)
{
    if (len > 0) {
        mq_.reset(new Queue(shm_, len));
    }
}
ShmSocket::~ShmSocket()
{
    Stop();
}
bool ShmSocket::Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms)
{
    if (type_ != eSockPublish) {
        return false;
    }
    assert(!mq_);
    try {
        MsgI imsg;
        if (!imsg.MakeRC(shm_, MakePub(topic, data, size))) {
            return false;
        }
        DEFER1(imsg.Release(shm_));
        return Queue::Send(shm_, kBHBusQueueId, imsg, timeout_ms);
    } catch (...) {
        return false;
    }
}
bool ShmSocket::Subscribe(const std::vector<std::string> &topics, const int timeout_ms)
{
    if (type_ != eSockSubscribe) {
        return false;
    }
    assert(mq_);
    try {
        return mq_->Send(kBHBusQueueId, MakeSub(mq_->Id(), topics), timeout_ms);
    } catch (...) {
        return false;
    }
}
bool ShmSocket::StartRaw(const RecvRawCB &onData, int nworker)
{
    auto CanRecv = [this]() {
        switch (type_) {
        case eSockRequest:
        case eSockReply:
        case eSockBus:
        case eSockSubscribe:
            return true;
        default:
            return false;
        }
    };
    if (!CanRecv()) {
    if (!mq_) {
        return false;
    }
    std::lock_guard<std::mutex> lock(mutex_);
    std::lock_guard<std::mutex> lock(mutex_);
    StopNoLock();
    auto RecvProc = [this, onData]() {
        while (run_) {
@@ -127,31 +80,6 @@
    return StartRaw([this, onData](MsgI &imsg) { BHMsg m; if (imsg.Unpack(m)) { onData(m); } }, nworker);
}
bool ShmSocket::StartAsync(int nworker)
{
    auto AsyncRecvProc = [this](BHMsg &msg) {
        auto Find = [&](RecvCB &cb) {
            std::lock_guard<std::mutex> lock(mutex_);
            const std::string &msgid = msg.msg_id();
            auto pos = async_cbs_.find(msgid);
            if (pos != async_cbs_.end()) {
                cb.swap(pos->second);
                async_cbs_.erase(pos);
                return true;
            } else {
                return false;
            }
        };
        RecvCB cb;
        if (Find(cb) && cb) {
            cb(msg);
        }
    };
    return Start(AsyncRecvProc, nworker);
}
bool ShmSocket::Stop()
{
    std::lock_guard<std::mutex> lock(mutex_);
@@ -166,118 +94,28 @@
                w.join();
            }
        }
        workers_.clear();
        return true;
    }
    return false;
}
bool ShmSocket::AsyncRequest(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb)
bool ShmSocket::SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms)
{
    if (type_ != eSockRequest) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (!mq_ || RunningNoLock()) {
        return false;
    }
    assert(remote && pmsg && !mq_);
    try {
        const BHMsg &msg = *static_cast<const BHMsg *>(pmsg);
        auto RegisterCB = [&]() {
            std::lock_guard<std::mutex> lock(mutex_);
            async_cbs_.emplace(msg.msg_id(), cb);
        };
        return mq_->Send(*static_cast<const MQId *>(remote), msg, timeout_ms, RegisterCB);
    } catch (...) {
        return false;
    } else {
        return mq_->Send(*static_cast<const MQId *>(id), msg, timeout_ms);
    }
}
bool ShmSocket::SyncRequest(const void *remote, const void *msg, void *result, const int timeout_ms)
bool ShmSocket::SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms)
{
    struct State {
        std::mutex mutex;
        std::condition_variable cv;
        bool canceled = false;
    };
    try {
        std::shared_ptr<State> st(new State);
        auto OnRecv = [=](BHMsg &msg) {
            std::unique_lock<std::mutex> lk(st->mutex);
            if (!st->canceled) {
                static_cast<BHMsg *>(result)->Swap(&msg);
                st->cv.notify_one();
            }
        };
        std::unique_lock<std::mutex> lk(st->mutex);
        auto end = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
        if (AsyncRequest(remote, msg, timeout_ms, OnRecv) && st->cv.wait_until(lk, end) == std::cv_status::no_timeout) {
            return true;
        } else {
            st->canceled = true;
            return false;
        }
    } catch (...) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (!mq_ || RunningNoLock()) {
        return false;
    }
}
bool ShmSocket::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
{
    BHMsg result;
    const BHMsg &msg = MakeQueryTopic(topic);
    if (SyncRequest(&kBHTopicRPCId, &msg, &result, timeout_ms)) {
        if (result.type() == kMsgTypeQueryTopicReply) {
            DataQueryTopicReply reply;
            if (reply.ParseFromString(result.body())) {
                addr = reply.address();
                return !addr.mq_id().empty();
            }
        }
    }
    return false;
}
bool ShmSocket::RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
{
    auto Call = [&](const void *remote) {
        const BHMsg &msg(MakeRequest(mq_->Id(), topic, data, size));
        auto onRecv = [cb](BHMsg &msg) {
            if (msg.type() == kMsgTypeReply) {
                DataReply reply;
                if (reply.ParseFromString(msg.body())) {
                    cb(reply.data().data(), reply.data().size());
                }
            }
        };
        return AsyncRequest(remote, &msg, timeout_ms, onRecv);
    };
    try {
        BHAddress addr;
        if (QueryRPCTopic(topic, addr, timeout_ms)) {
            return Call(addr.mq_id().data());
        }
    } catch (...) {
        return false;
    }
}
bool ShmSocket::RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out)
{
    try {
        BHAddress addr;
        if (QueryRPCTopic(topic, addr, timeout_ms)) {
            const BHMsg &msg(MakeRequest(mq_->Id(), topic, data, size));
            BHMsg reply;
            if (SyncRequest(addr.mq_id().data(), &msg, &reply, timeout_ms) && reply.type() == kMsgTypeReply) {
                DataReply dr;
                if (dr.ParseFromString(msg.body())) {
                    dr.mutable_data()->swap(out);
                    return true;
                }
            }
        }
    } catch (...) {
        return false;
    } else {
        return mq_->Recv(msg, timeout_ms);
    }
}
src/socket.h
@@ -26,59 +26,48 @@
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <vector>
class ShmSocket : private boost::noncopyable
{
protected:
    typedef bhome_shm::ShmMsgQueue Queue;
public:
    enum Type {
        eSockRequest,
        eSockReply,
        eSockSubscribe,
        eSockPublish,
        eSockBus,
    };
    typedef bhome_shm::SharedMemory Shm;
    typedef std::function<void(bhome_msg::BHMsg &msg)> RecvCB;
    typedef std::function<void(bhome_msg::MsgI &imsg)> RecvRawCB;
    typedef std::function<void(const void *data, const size_t size)> RequestResultCB;
    ShmSocket(Type type, bhome_shm::SharedMemory &shm);
    ShmSocket(Type type);
    ShmSocket(Shm &shm, const int len = 12);
    ~ShmSocket();
    bool RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb);
    bool RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out);
    // bool HandleRequest(onData);
    bool ReadRequest(); // exclude with HandleRequest
    bool SendReply();   // exclude with HandleRequest
    bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms);
    bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms);
    bool RecvSub(std::string &topic, std::string &data, const int timeout_ms);
    // start recv.
    bool Start(const RecvCB &onData, int nworker = 1);
    bool StartRaw(const RecvRawCB &onData, int nworker = 1);
    bool StartAsync(int nworker = 2);
    bool Stop();
    size_t Pending() const { return mq_ ? mq_->Pending() : 0; }
protected:
    ShmSocket(Shm &shm, const void *id, const int len);
    Shm &shm() { return shm_; }
    const Shm &shm() const { return shm_; }
    Queue &mq() { return *mq_; } // programmer should make sure that mq_ is valid.
    const Queue &mq() const { return *mq_; }
    std::mutex &mutex() { return mutex_; }
    bool SyncSend(const void *id, const bhome_msg::BHMsg &msg, const int timeout_ms);
    bool SyncRecv(bhome_msg::BHMsg &msg, const int timeout_ms);
private:
    bool AsyncRequest(const void *remote, const void *msg, const int timeout_ms, const RecvCB &cb);
    bool SyncRequest(const void *remote, const void *msg, void *result, const int timeout_ms);
    bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
    bool StopNoLock();
    bhome_shm::SharedMemory &shm_;
    const Type type_;
    bool RunningNoLock() { return !workers_.empty(); }
    Shm &shm_;
    std::vector<std::thread> workers_;
    std::mutex mutex_;
    std::atomic<bool> run_;
    std::unique_ptr<Queue> mq_;
    std::unordered_map<std::string, RecvCB> async_cbs_;
};
#endif // end of include guard: SOCKET_GWTJHBPO
utest/utest.cpp
@@ -1,5 +1,6 @@
#include "defs.h"
#include "pubsub.h"
#include "pubsub_center.h"
#include "socket.h"
#include "util.h"
#include <atomic>
@@ -66,7 +67,7 @@
BOOST_AUTO_TEST_CASE(PubSubTest)
{
    const std::string shm_name("ShmPubSub");
    // ShmRemover auto_remove(shm_name); //remove twice? in case of killed?
    ShmRemover auto_remove(shm_name); //remove twice? in case of killed?
    SharedMemory shm(shm_name, 1024 * 1024 * 50);
    DEFER1(shm.Remove());
    auto Avail = [&]() { return shm.get_free_memory(); };
@@ -75,57 +76,50 @@
    printf("flag = %d\n", *flag);
    ++*flag;
    BusManager bus(shm);
    PubSubCenter bus(shm);
    bus.Start();
    std::this_thread::sleep_for(100ms);
    std::atomic<uint64_t> count(0);
    std::atomic<uint64_t> total_count(0);
    std::atomic<ptime> last_time(Now() - seconds(1));
    std::atomic<uint64_t> last_count(0);
    const uint64_t nmsg = 100 * 2;
    const int timeout = 1000;
    auto Sub = [&](int id, const std::vector<std::string> &topics) {
        ShmSocket client(ShmSocket::eSockSubscribe, shm);
        SocketSubscribe client(shm);
        bool r = client.Subscribe(topics, timeout);
        std::mutex mutex;
        std::condition_variable cv;
        uint64_t i = 0;
        auto OnRecv = [&](BHMsg &msg) {
            if (msg.type() != kMsgTypePublish) {
                BOOST_CHECK(false);
            }
            DataPub pub;
            if (!pub.ParseFromString(msg.body())) {
                BOOST_CHECK(false);
            }
            ++count;
        std::atomic<uint64_t> n(0);
        auto OnTopicData = [&](const std::string &topic, const std::string &data) {
            ++total_count;
            auto cur = Now();
            if (last_time.exchange(cur) < cur) {
                std::cout << "time: " << cur;
                printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
                       count.load(), count - last_count.exchange(count), init_avail - Avail());
                       total_count.load(), total_count - last_count.exchange(total_count), init_avail - Avail());
            }
            if (++i >= nmsg * topics.size()) {
            if (++n >= nmsg * topics.size()) {
                cv.notify_one();
            }
            // printf("sub %2d recv: %s/%s\n", id, pub.topic().c_str(), pub.data().c_str());
        };
        client.Start(OnRecv);
        client.StartRecv(OnTopicData, 1);
        std::unique_lock<std::mutex> lk(mutex);
        cv.wait(lk);
    };
    auto Pub = [&](const std::string &topic) {
        ShmSocket provider(ShmSocket::eSockPublish, shm);
        SocketPublish provider(shm);
        for (unsigned i = 0; i < nmsg; ++i) {
            std::string data = topic + std::to_string(i) + std::string(1000, '-');
            bool r = provider.Publish(topic, data.data(), data.size(), timeout);
            // bool r = provider.Send(kBHBusQueueId, MakePub(topic, data.data(), data.size()), timeout);
            bool r = provider.Publish(topic, data, timeout);
            if (!r) {
                printf("pub ret: %s\n", r ? "ok" : "fail");
            }
@@ -151,7 +145,7 @@
    threads.WaitAll();
    std::cout << "end : " << Now();
    printf("sub recv, total msg:%10ld, speed:[%8ld/s], used mem:%8ld \n",
           count.load(), count - last_count.exchange(count), init_avail - Avail());
           total_count.load(), total_count - last_count.exchange(total_count), init_avail - Avail());
    bus.Stop();
}