lichao
2021-04-02 c28cdf2fbf1565709b359c9cca6c5e29d9592dce
typedef Topic.
8个文件已修改
58 ■■■■ 已修改文件
src/defs.h 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub.cpp 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub.h 10 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/pubsub_center.h 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/reqrep.cpp 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/reqrep.h 20 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/reqrep_center.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/utest.cpp 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.h
@@ -21,6 +21,7 @@
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <string>
typedef boost::uuids::uuid MQId;
@@ -34,6 +35,7 @@
} // namespace bhome_shm
bhome_shm::SharedMemory &BHomeShm();
typedef std::string Topic;
//TODO center can check shm for previous crash.
src/pubsub.cpp
@@ -22,7 +22,7 @@
using namespace std::chrono_literals;
using namespace bhome_msg;
bool SocketPublish::Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms)
bool SocketPublish::Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms)
{
    try {
        MsgI imsg;
@@ -36,7 +36,7 @@
    }
}
bool SocketSubscribe::Subscribe(const std::vector<std::string> &topics, const int timeout_ms)
bool SocketSubscribe::Subscribe(const std::vector<Topic> &topics, const int timeout_ms)
{
    try {
        return mq().Send(kBHTopicBus, MakeSub(mq().Id(), topics), timeout_ms);
@@ -61,7 +61,7 @@
    return tdcb && Start(AsyncRecvProc, nworker);
}
bool SocketSubscribe::RecvSub(std::string &topic, std::string &data, const int timeout_ms)
bool SocketSubscribe::RecvSub(Topic &topic, std::string &data, const int timeout_ms)
{
    BHMsg msg;
    if (SyncRecv(msg, timeout_ms) && msg.type() == kMsgTypePublish) {
src/pubsub.h
@@ -33,8 +33,8 @@
        shm_(shm) {}
    SocketPublish() :
        SocketPublish(BHomeShm()) {}
    bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms);
    bool Publish(const std::string &topic, const std::string &data, const int timeout_ms)
    bool Publish(const Topic &topic, const void *data, const size_t size, const int timeout_ms);
    bool Publish(const Topic &topic, const std::string &data, const int timeout_ms)
    {
        return Publish(topic, data.data(), data.size(), timeout_ms);
    }
@@ -52,11 +52,11 @@
        SocketSubscribe(BHomeShm()) {}
    ~SocketSubscribe() { Stop(); }
    typedef std::function<void(const std::string &topic, const std::string &data)> TopicDataCB;
    typedef std::function<void(const Topic &topic, const std::string &data)> TopicDataCB;
    bool StartRecv(const TopicDataCB &tdcb, int nworker = 2);
    bool Stop() { return Socket::Stop(); }
    bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms);
    bool RecvSub(std::string &topic, std::string &data, const int timeout_ms);
    bool Subscribe(const std::vector<Topic> &topics, const int timeout_ms);
    bool RecvSub(Topic &topic, std::string &data, const int timeout_ms);
};
#endif // end of include guard: PUBSUB_4KGRA997
src/pubsub_center.h
@@ -36,9 +36,11 @@
    };
    SocketBus socket_;
    ShmSocket::Shm &shm() { return socket_.shm(); }
    std::mutex mutex_;
    typedef std::set<MQId> Clients;
    std::unordered_map<std::string, Clients> records_;
    std::unordered_map<Topic, Clients> records_;
    bool Find1(const Topic &topic);
public:
    PubSubCenter(ShmSocket::Shm &shm) :
src/reqrep.cpp
@@ -55,7 +55,7 @@
    return Start(AsyncRecvProc, nworker);
}
bool SocketRequest::AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
bool SocketRequest::AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb)
{
    auto Call = [&](const void *remote) {
        const BHMsg &msg(MakeRequest(mq().Id(), topic, data, size));
@@ -82,7 +82,7 @@
    }
}
bool SocketRequest::SyncRequest(const std::string &topic, const void *data, const size_t size, std::string &out, const int timeout_ms)
bool SocketRequest::SyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms)
{
    try {
        BHAddress addr;
@@ -153,7 +153,7 @@
    }
}
bool SocketRequest::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
bool SocketRequest::QueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms)
{
    if (topic_cache_.Find(topic, addr)) {
        return true;
src/reqrep.h
@@ -42,13 +42,13 @@
    bool StartWorker(const RequestResultCB &rrcb, int nworker = 2);
    bool StartWorker(int nworker = 2) { return StartWorker(RequestResultCB(), nworker); }
    bool Stop() { return Socket::Stop(); }
    bool AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb);
    bool AsyncRequest(const std::string &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb)
    bool AsyncRequest(const Topic &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb);
    bool AsyncRequest(const Topic &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb)
    {
        return AsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb);
    }
    bool SyncRequest(const std::string &topic, const void *data, const size_t size, std::string &out, const int timeout_ms);
    bool SyncRequest(const std::string &topic, const std::string &data, std::string &out, const int timeout_ms)
    bool SyncRequest(const Topic &topic, const void *data, const size_t size, std::string &out, const int timeout_ms);
    bool SyncRequest(const Topic &topic, const std::string &data, std::string &out, const int timeout_ms)
    {
        return SyncRequest(topic, data.data(), data.size(), out, timeout_ms);
    }
@@ -56,7 +56,7 @@
private:
    bool AsyncSend(const void *remote, const void *msg, const int timeout_ms, const RecvCB &cb);
    bool SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms);
    bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
    bool QueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
    std::unordered_map<std::string, RecvCB> async_cbs_;
    typedef bhome_msg::BHAddress Address;
@@ -64,11 +64,11 @@
    {
        class Impl
        {
            typedef std::unordered_map<std::string, Address> Store;
            typedef std::unordered_map<Topic, Address> Store;
            Store store_;
        public:
            bool Find(const std::string &topic, Address &addr)
            bool Find(const Topic &topic, Address &addr)
            {
                auto pos = store_.find(topic);
                if (pos != store_.end()) {
@@ -78,7 +78,7 @@
                    return false;
                }
            }
            bool Update(const std::string &topic, const Address &addr)
            bool Update(const Topic &topic, const Address &addr)
            {
                store_[topic] = addr;
                return true;
@@ -92,8 +92,8 @@
        // }
    public:
        bool Find(const std::string &topic, Address &addr) { return impl_->Find(topic, addr); }
        bool Update(const std::string &topic, const Address &addr) { return impl_->Update(topic, addr); }
        bool Find(const Topic &topic, Address &addr) { return impl_->Find(topic, addr); }
        bool Update(const Topic &topic, const Address &addr) { return impl_->Update(topic, addr); }
    };
    TopicCache topic_cache_;
};
src/reqrep_center.cpp
@@ -66,7 +66,7 @@
            }
        }
    }
    bool QueryTopic(const std::string &topic, ProcAddr &addr)
    bool QueryTopic(const Topic &topic, ProcAddr &addr)
    {
        auto pos = topic_map_.find(topic);
        if (pos != topic_map_.end()) {
@@ -96,7 +96,7 @@
    };
    typedef std::shared_ptr<NodeInfo> Node;
    typedef std::weak_ptr<NodeInfo> WeakNode;
    std::unordered_map<std::string, WeakNode> topic_map_;
    std::unordered_map<Topic, WeakNode> topic_map_;
    std::unordered_map<ProcId, Node> nodes_;
};
} // namespace
utest/utest.cpp
@@ -25,7 +25,7 @@
BOOST_AUTO_TEST_CASE(Temp)
{
    std::string topics[] = {
    Topic topics[] = {
        "",
        ".",
        "a",
@@ -128,7 +128,7 @@
        }
    };
    ThreadManager threads;
    typedef std::vector<std::string> Topics;
    typedef std::vector<Topic> Topics;
    Topics topics;
    for (int i = 0; i < 100; ++i) {
        topics.push_back("t" + std::to_string(i));
@@ -208,7 +208,7 @@
        }
    };
    ThreadManager clients, servers;
    std::vector<std::string> topics = {"topic1", "topic2"};
    std::vector<Topic> topics = {"topic1", "topic2"};
    servers.Launch(Server, "server", topics);
    std::this_thread::sleep_for(100ms);
    for (auto &t : topics) {