lichao
2021-04-14 aa1542b6d6a4680088ac715c4ce40f97ada554fb
add SendQ TrySend() TryRecv(); handle re-register.
2个文件已添加
13个文件已修改
681 ■■■■■ 已修改文件
box/center.cpp 65 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.cpp 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.h 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/failed_msg.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/sendq.cpp 62 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/sendq.h 74 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.cpp 25 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.h 138 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.cpp 69 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.h 66 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/timed_queue.h 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 150 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/speed_test.cpp 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/utest.cpp 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.cpp
@@ -18,7 +18,6 @@
#include "center.h"
#include "bh_util.h"
#include "defs.h"
#include "failed_msg.h"
#include "shm.h"
#include <chrono>
#include <set>
@@ -52,7 +51,7 @@
    };
    struct ProcState {
        int64_t timestamp_;
        int64_t timestamp_ = 0;
        uint32_t flag_ = 0; // reserved
        void UpdateState(const int64_t now, const int64_t offline_time, const int64_t kill_time)
        {
@@ -111,15 +110,32 @@
        }
        try {
            Node node(new NodeInfo);
            node->addrs_.insert(SrcAddr(head));
            for (auto &addr : msg.addrs()) {
                node->addrs_.insert(addr.mq_id());
            auto UpdateRegInfo = [&](Node &node) {
                node->addrs_.insert(SrcAddr(head));
                for (auto &addr : msg.addrs()) {
                    node->addrs_.insert(addr.mq_id());
                }
                node->proc_.Swap(msg.mutable_proc());
                node->state_.timestamp_ = head.timestamp();
                node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
            };
            auto pos = nodes_.find(head.proc_id());
            if (pos == nodes_.end()) { // new client
                Node node(new NodeInfo);
                UpdateRegInfo(node);
                nodes_[node->proc_.proc_id()] = node;
            } else {
                Node &node = pos->second;
                if (node->addrs_.find(SrcAddr(head)) == node->addrs_.end()) {
                    // node restarted, release old mq.
                    for (auto &addr : node->addrs_) {
                        cleaner_(addr);
                    }
                    node->addrs_.clear();
                }
                UpdateRegInfo(node);
            }
            node->proc_.Swap(msg.mutable_proc());
            node->state_.timestamp_ = head.timestamp();
            node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
            nodes_[node->proc_.proc_id()] = node;
            return MakeReply(eSuccess);
        } catch (...) {
            return MakeReply(eError, "register node error.");
@@ -134,7 +150,7 @@
            if (pos == nodes_.end()) {
                return MakeReply<Reply>(eNotRegistered, "Node is not registered.");
            } else {
                auto node = pos->second;
                auto &node = pos->second;
                if (!MatchAddr(node->addrs_, SrcAddr(head))) {
                    return MakeReply<Reply>(eAddressNotMatch, "Node address error.");
                } else if (head.type() == kMsgTypeHeartbeat && CanHeartbeat(*node)) {
@@ -342,8 +358,7 @@
        auto node = weak.lock();
        return node && Valid(*node);
    }
    void CheckAllNodes(); //TODO, call it in timer.
    std::string id_;      // center proc id;
    std::string id_; // center proc id;
    std::unordered_map<Topic, Clients> service_map_;
    std::unordered_map<Topic, Clients> subscribe_map_;
@@ -385,30 +400,25 @@
bool AddCenter(const std::string &id, const NodeCenter::Cleaner &cleaner)
{
    auto center_ptr = std::make_shared<Synced<NodeCenter>>(id, cleaner, 5s, 10s);
    auto center_failed_q = std::make_shared<FailedMsgQ>();
    auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id, FailedMsgQ &failq, const int timeout_ms = 0) {
    auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
        return [&](auto &&rep_body) {
            auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
            MsgI msg;
            if (msg.Make(socket.shm(), reply_head, rep_body)) {
                auto &remote = head.route(0).mq_id();
                bool r = socket.Send(remote.data(), msg, timeout_ms);
                if (!r) {
                    failq.Push(remote, msg, 60s); // for later retry.
                }
                bool r = socket.Send(remote.data(), msg);
            }
        };
    };
    auto OnCenterIdle = [center_ptr, center_failed_q](ShmSocket &socket) {
    auto OnCenterIdle = [center_ptr](ShmSocket &socket) {
        auto &center = *center_ptr;
        center_failed_q->TrySend(socket);
        center->OnTimer();
    };
    auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
        auto &center = *center_ptr;
        auto replyer = MakeReplyer(socket, head, center->id(), *center_failed_q);
        auto replyer = MakeReplyer(socket, head, center->id());
        switch (head.type()) {
            CASE_ON_MSG_TYPE(Register);
            CASE_ON_MSG_TYPE(Heartbeat);
@@ -419,11 +429,10 @@
        }
    };
    auto bus_failed_q = std::make_shared<FailedMsgQ>();
    auto OnBusIdle = [=](ShmSocket &socket) { bus_failed_q->TrySend(socket); };
    auto OnBusIdle = [=](ShmSocket &socket) {};
    auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
        auto &center = *center_ptr;
        auto replyer = MakeReplyer(socket, head, center->id(), *bus_failed_q);
        auto replyer = MakeReplyer(socket, head, center->id());
        auto OnPublish = [&]() {
            MsgPublish pub;
            NodeCenter::Clients clients;
@@ -442,9 +451,9 @@
                    auto &cli = *it;
                    auto node = cli.weak_node_.lock();
                    if (node) {
                        if (!socket.Send(cli.mq_.data(), msg, 0)) {
                            bus_failed_q->Push(cli.mq_, msg, 60s);
                        }
                        // should also make sure that mq is not killed before msg expires.
                        // it would be ok if (kill_time - offline_time) is longer than expire time.
                        socket.Send(cli.mq_.data(), msg);
                        ++it;
                    } else {
                        it = clients.erase(it);
src/bh_api.cpp
@@ -186,15 +186,14 @@
bool BHSendReply(void *src,
                 const void *reply,
                 const int reply_len,
                 const int timeout_ms)
                 const int reply_len)
{
    MsgRequestTopicReply rep;
    if (!rep.ParseFromArray(reply, reply_len)) {
        SetLastError(eInvalidInput, "invalid input.");
        return false;
    }
    return ProcNode().ServerSendReply(src, rep, timeout_ms);
    return ProcNode().ServerSendReply(src, rep);
}
int BHCleanUp()
src/bh_api.h
@@ -64,8 +64,7 @@
bool BHSendReply(BHSrcInfo *src,
                 const void *reply,
                 const int reply_len,
                 const int timeout_ms);
                 const int reply_len);
// int BHCleanUp();
src/failed_msg.cpp
@@ -23,7 +23,7 @@
    return [remote, msg](void *valid_sock) {
        assert(valid_sock);
        ShmSocket &sock = *static_cast<ShmSocket *>(valid_sock);
        bool r = sock.Send(remote.data(), msg, 0);
        bool r = sock.Send(remote.data(), msg);
        //TODO check remote removed.
        if (r && msg.IsCounted()) {
            auto tmp = msg; // Release() is not const, but it's safe to release.
src/sendq.cpp
New file
@@ -0,0 +1,62 @@
/*
 * =====================================================================================
 *
 *       Filename:  sendq.cpp
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年04月14日 09时22分50秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#include "sendq.h"
#include "shm_queue.h"
#include <chrono>
bool SendQ::TrySend(bhome_shm::ShmMsgQueue &mq)
{
    auto FirstNotExpired = [](MsgList &l) {
        auto Less = [](const TimedMsg &msg, const TimePoint &tp) { return msg.expire() < tp; };
        return std::lower_bound(l.begin(), l.end(), Now(), Less);
    };
    auto SendOneRemote = [&](const Remote &remote, MsgList &msg_list) {
        auto pos = FirstNotExpired(msg_list);
        for (auto it = msg_list.begin(); it != pos; ++it) {
            auto &info = it->data();
            if (info.on_expire_) {
                info.on_expire_(info.msg_);
            }
            info.msg_.Release(mq.shm());
        }
        //TODO maybe use TrySendAll ?
        while (pos != msg_list.end() && mq.TrySend(*(MQId *) remote.data(), pos->data().msg_)) {
            auto &msg = pos->data().msg_;
            if (msg.IsCounted()) {
                msg.Release(mq.shm());
            }
            ++pos;
        }
        msg_list.erase(msg_list.begin(), pos);
    };
    if (!store_.empty()) {
        auto rec = store_.begin();
        do {
            SendOneRemote(rec->first, rec->second);
            if (rec->second.empty()) {
                rec = store_.erase(rec);
            } else {
                ++rec;
            }
        } while (rec != store_.end());
    }
    return !store_.empty();
}
src/sendq.h
New file
@@ -0,0 +1,74 @@
/*
 * =====================================================================================
 *
 *       Filename:  sendq.h
 *
 *    Description:
 *
 *        Version:  1.0
 *        Created:  2021年04月14日 09时22分59秒
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
#ifndef SENDQ_IWKMSK7M
#define SENDQ_IWKMSK7M
#include "defs.h"
#include "msg.h"
#include "timed_queue.h"
#include <deque>
#include <functional>
#include <string>
#include <unordered_map>
namespace bhome_shm
{
class ShmMsgQueue;
} // namespace bhome_shm
class SendQ
{
public:
    typedef std::string Remote;
    typedef bhome_msg::MsgI MsgI;
    typedef std::function<void(const MsgI &msg)> OnMsgEvent;
    struct MsgInfo {
        MsgI msg_;
        OnMsgEvent on_expire_;
        // OnMsgEvent on_send_;
    };
    typedef TimedData<MsgInfo> TimedMsg;
    typedef TimedMsg::TimePoint TimePoint;
    typedef TimedMsg::Duration Duration;
    void Append(const MQId &id, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent())
    {
        Append(std::string((const char *) &id, sizeof(id)), msg, onExpire);
    }
    void Append(const Remote &addr, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent())
    {
        using namespace std::chrono_literals;
        Append(addr, msg, Now() + 60s, onExpire);
    }
    bool TrySend(bhome_shm::ShmMsgQueue &mq);
    // bool empty() const { return store_.empty(); }
private:
    static TimePoint Now() { return TimedMsg::Clock::now(); }
    void Append(const Remote &addr, const MsgI &msg, const TimePoint &expire, OnMsgEvent onExpire)
    {
        msg.AddRef();
        store_[addr].emplace_back(TimedMsg(expire, MsgInfo{msg, onExpire}));
    }
    typedef std::deque<TimedMsg> MsgList;
    typedef std::unordered_map<Remote, MsgList> Store;
    Store store_;
};
#endif // end of include guard: SENDQ_IWKMSK7M
src/shm_queue.cpp
@@ -73,17 +73,26 @@
{
    Queue *remote = Find(shm, MsgQIdToName(remote_id));
    if (remote) {
        return remote->Write(msg, timeout_ms, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
        if (onsend) {
            return remote->Write(msg, timeout_ms, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
        } else {
            return remote->Write(msg, timeout_ms, [](const MsgI &msg) { msg.AddRef(); });
        }
    } else {
        // SetLestError(eNotFound);
        return false;
    }
}
bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms)
bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, OnSend const &onsend)
{
    Queue *remote = Find(shm, MsgQIdToName(remote_id));
    if (remote) {
        return remote->Write(msg, timeout_ms, [](const MsgI &msg) { msg.AddRef(); });
        if (onsend) {
            return remote->TryWrite(msg, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); });
        } else {
            return remote->TryWrite(msg, [](const MsgI &msg) { msg.AddRef(); });
        }
    } else {
        // SetLestError(eNotFound);
        return false;
@@ -94,15 +103,5 @@
// 1) build msg first, then find remote queue;
// 2) find remote queue first, then build msg;
// 1 is about 50% faster than 2, maybe cache related.
// bool ShmMsgQueue::Recv(MsgI &imsg, BHMsgHead &head, const int timeout_ms)
// {
//     if (Read(imsg, timeout_ms)) {
//         // DEFER1(imsg.Release(shm()););
//         return imsg.ParseHead(head);
//     } else {
//         return false;
//     }
// }
} // namespace bhome_shm
src/shm_queue.h
@@ -48,6 +48,63 @@
        return cur + millisec(ms);
    }
    auto TimedReadPred(const int timeout_ms)
    {
        auto endtime = MSFromNow(timeout_ms);
        return [this, endtime](Guard &lock) {
            return (cond_read_.timed_wait(lock, endtime, [&]() { return !this->empty(); }));
        };
    }
    auto TryReadPred()
    {
        return [this](Guard &lock) { return !this->empty(); };
    }
    template <class Pred, class OnData>
    int ReadAllOnCond(Pred const &pred, OnData const &onData)
    {
        Guard lock(this->mutex());
        int n = 0;
        while (pred(lock)) {
            ++n;
            onData(this->front());
            this->pop_front();
            this->cond_write_.notify_one();
        }
        return n;
    }
    template <class Pred>
    bool ReadOnCond(D &buf, Pred const &pred)
    {
        int flag = 0;
        auto only_once = [&](Guard &lock) { return flag++ == 0 && pred(lock); };
        auto onData = [&buf](D &d) {
            using std::swap;
            swap(buf, d);
        };
        return ReadAllOnCond(only_once, onData);
    }
    template <class Iter, class Pred, class OnWrite>
    int WriteAllOnCond(Iter begin, Iter end, Pred const &pred, OnWrite const &onWrite)
    {
        if (begin == end) { return 0; }
        int n = 0;
        Guard lock(mutex());
        while (pred(lock)) {
            onWrite(*begin);
            this->push_back(*begin);
            ++n;
            cond_read_.notify_one();
            if (++begin == end) {
                break;
            }
        }
        return n;
    }
public:
    SharedQueue(const uint32_t len, Allocator<D> const &alloc) :
        Super(len, alloc) {}
@@ -56,60 +113,42 @@
    template <class Iter, class OnWrite>
    int Write(Iter begin, Iter end, const int timeout_ms, const OnWrite &onWrite)
    {
        int n = 0;
        if (begin != end) {
            auto endtime = MSFromNow(timeout_ms);
            Guard lock(mutex());
            while (cond_write_.timed_wait(lock, endtime, [&]() { return !this->full(); })) {
                onWrite(*begin);
                this->push_back(*begin);
                ++n;
                cond_read_.notify_one();
                if (++begin == end) {
                    break;
                }
            }
        }
        return n;
        auto endtime = MSFromNow(timeout_ms);
        auto timedWritePred = [this, endtime](Guard &lock) {
            return (cond_write_.timed_wait(lock, endtime, [&]() { return !this->full(); }));
        };
        return WriteAllOnCond(begin, end, timedWritePred, onWrite);
    }
    template <class OnWrite>
    bool Write(const D &buf, const int timeout_ms, const OnWrite &onWrite)
    {
        return Write(&buf, (&buf) + 1, timeout_ms, onWrite);
    }
    bool Write(const D &buf, const int timeout_ms, const OnWrite &onWrite) { return Write(&buf, (&buf) + 1, timeout_ms, onWrite); }
    bool Write(const D &buf, const int timeout_ms)
    {
        return Write(buf, timeout_ms, [](const D &buf) {});
    }
    template <class OnData>
    bool Read(const int timeout_ms, OnData onData)
    template <class Iter, class OnWrite>
    int TryWrite(Iter begin, Iter end, const OnWrite &onWrite)
    {
        int n = 0;
        auto endtime = MSFromNow(timeout_ms);
        Guard lock(mutex());
        while (cond_read_.timed_wait(lock, endtime, [&]() { return !this->empty(); })) {
            const bool more = onData(this->front());
            this->pop_front();
            cond_write_.notify_one();
            ++n;
            if (!more) {
                break;
            }
        }
        return n;
        auto tryWritePred = [this](Guard &lock) { return !this->full(); };
        return WriteAllOnCond(begin, end, tryWritePred, onWrite);
    }
    bool Read(D &buf, const int timeout_ms)
    template <class OnWrite>
    bool TryWrite(const D &buf, const OnWrite &onWrite) { return TryWrite(&buf, (&buf) + 1, onWrite); }
    bool TryWrite(const D &buf)
    {
        auto read1 = [&](D &d) {
            using std::swap;
            swap(buf, d);
            return false;
        };
        return Read(timeout_ms, read1) == 1;
        return TryWrite(buf, [](const D &buf) {});
    }
    template <class OnData>
    int ReadAll(const int timeout_ms, OnData const &onData) { return ReadAllOnCond(TimedReadPred(timeout_ms), onData); }
    template <class OnData>
    int TryReadAll(OnData const &onData) { return ReadAllOnCond(TryReadPred(), onData); }
    bool Read(D &buf, const int timeout_ms) { return ReadOnCond(buf, TimedReadPred(timeout_ms)); }
    bool TryRead(D &buf) { return ReadOnCond(buf, TryReadPred()); }
};
using namespace bhome_msg;
@@ -119,8 +158,6 @@
    typedef ShmObject<SharedQueue<MsgI>> Super;
    typedef Super::Data Queue;
    typedef std::function<void()> OnSend;
    bool Write(const MsgI &buf, const int timeout_ms) { return data()->Write(buf, timeout_ms); }
    bool Read(MsgI &buf, const int timeout_ms) { return data()->Read(buf, timeout_ms); }
    MQId id_;
protected:
@@ -131,14 +168,21 @@
    ~ShmMsgQueue();
    static bool Remove(SharedMemory &shm, const MQId &id);
    const MQId &Id() const { return id_; }
    using Super::shm;
    // bool Recv(MsgI &msg, BHMsgHead &head, const int timeout_ms);
    bool Recv(MsgI &msg, const int timeout_ms) { return Read(msg, timeout_ms); }
    static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend);
    static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms);
    bool Recv(MsgI &msg, const int timeout_ms) { return data()->Read(msg, timeout_ms); }
    bool TryRecv(MsgI &msg) { return data()->TryRead(msg); }
    template <class OnData>
    int TryRecvAll(OnData const &onData) { return data()->TryReadAll(onData); }
    static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend = OnSend());
    static bool TrySend(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, OnSend const &onsend = OnSend());
    template <class... Rest>
    bool Send(const MQId &remote_id, Rest const &...rest) { return Send(shm(), remote_id, rest...); }
    template <class... Rest>
    bool TrySend(const MQId &remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); }
    size_t Pending() const { return data()->size(); }
};
src/socket.cpp
@@ -30,11 +30,11 @@
} // namespace
ShmSocket::ShmSocket(Shm &shm, const MQId &id, const int len) :
    shm_(shm), run_(false), mq_(id, shm, len)
    run_(false), mq_(id, shm, len)
{
}
ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) :
    shm_(shm), run_(false), mq_(shm, len) {}
    run_(false), mq_(shm, len) {}
ShmSocket::~ShmSocket()
{
@@ -43,28 +43,38 @@
bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle)
{
    auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
        RecvCB cb;
        if (per_msg_cbs_->Find(head.msg_id(), cb)) {
            cb(socket, imsg, head);
        } else if (onData) {
            onData(socket, imsg, head);
        } else { // else ignored, or dropped
        }
    };
    auto ioProc = [this, onData, onIdle]() {
        auto DoSend = [this]() { return send_buffer_->TrySend(mq()); };
        auto DoRecv = [=] {
            auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
                RecvCB cb;
                if (per_msg_cbs_->Find(head.msg_id(), cb)) {
                    cb(socket, imsg, head);
                } else if (onData) {
                    onData(socket, imsg, head);
                }
            };
    auto recvLoopBody = [this, onRecvWithPerMsgCB, onIdle]() {
        try {
            MsgI imsg;
            if (mq().Recv(imsg, 10)) {
            // do not recv if no cb is set.
            if (!onData && per_msg_cbs_->empty()) {
                return false;
            }
            auto onMsg = [&](MsgI &imsg) {
                DEFER1(imsg.Release(shm()));
                BHMsgHead head;
                if (imsg.ParseHead(head)) {
                    onRecvWithPerMsgCB(*this, imsg, head);
                }
            }
            if (onIdle) {
                onIdle(*this);
            };
            return mq().TryRecvAll(onMsg) > 0; // this will recv all msgs.
        };
        try {
            bool more_to_send = DoSend();
            bool more_to_recv = DoRecv();
            if (onIdle) { onIdle(*this); }
            if (!more_to_send && !more_to_recv) {
                std::this_thread::yield();
            }
        } catch (...) {
        }
@@ -75,7 +85,7 @@
    run_.store(true);
    for (int i = 0; i < nworker; ++i) {
        workers_.emplace_back([this, recvLoopBody]() { while (run_) { recvLoopBody(); } });
        workers_.emplace_back([this, ioProc]() { while (run_) { ioProc(); } });
    }
    return true;
}
@@ -100,18 +110,17 @@
    return false;
}
//maybe reimplment, using async cbs?
bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms)
{
    std::lock_guard<std::mutex> lock(mutex_);
    auto Recv = [&]() {
        if (mq().Recv(msg, timeout_ms)) {
            if (msg.ParseHead(head)) {
                return true;
            } else {
                msg.Release(shm());
            }
    // std::lock_guard<std::mutex> lock(mutex_); // seems no need to lock mutex_.
    bool got = (timeout_ms == 0) ? mq().TryRecv(msg) : mq().Recv(msg, timeout_ms);
    if (got) {
        if (msg.ParseHead(head)) {
            return true;
        } else {
            msg.Release(shm());
        }
        return false;
    };
    return !RunningNoLock() && Recv();
    }
    return false;
}
src/socket.h
@@ -21,6 +21,7 @@
#include "bh_util.h"
#include "defs.h"
#include "sendq.h"
#include "shm_queue.h"
#include <atomic>
#include <boost/noncopyable.hpp>
@@ -35,13 +36,10 @@
class ShmSocket : private boost::noncopyable
{
    template <class DoSend>
    inline bool SendImpl(MsgI &msg, const int timeout_ms, const DoSend &doSend)
    bool SendImpl(const void *valid_remote, const MsgI &imsg, SendQ::OnMsgEvent onExpire = SendQ::OnMsgEvent())
    {
        bool r = false;
        DEFER1(if (msg.IsCounted() || !r) { msg.Release(shm()); });
        r = doSend(msg);
        return r;
        send_buffer_->Append(*static_cast<const MQId *>(valid_remote), imsg, onExpire);
        return true;
    }
protected:
@@ -58,7 +56,6 @@
    ~ShmSocket();
    static bool Remove(SharedMemory &shm, const MQId &id) { return Queue::Remove(shm, id); }
    const MQId &id() const { return mq().Id(); }
    Shm &shm() { return shm_; }
    // start recv.
    bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB());
    bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, onIdle); }
@@ -66,29 +63,36 @@
    bool Stop();
    size_t Pending() const { return mq().Pending(); }
    bool Send(const void *valid_remote, const MsgI &imsg, const int timeout_ms)
    template <class Body>
    bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body)
    {
        assert(valid_remote);
        return mq().Send(*static_cast<const MQId *>(valid_remote), imsg, timeout_ms);
        MsgI msg;
        return msg.Make(shm(), head, body) && SendImpl(valid_remote, msg);
    }
    //TODO reimplment, using async.
    template <class Body>
    bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const RecvCB &cb)
    {
        //TODO send_buffer_ need flag, and remove callback on expire.
        MsgI msg;
        if (msg.Make(shm(), head, body)) {
            std::string msg_id(head.msg_id());
            per_msg_cbs_->Add(msg_id, cb);
            auto onExpireRemoveCB = [this, msg_id](MsgI const &msg) {
                RecvCB cb_no_use;
                per_msg_cbs_->Find(msg_id, cb_no_use);
            };
            return SendImpl(valid_remote, msg, onExpireRemoveCB);
        }
        return false;
    }
    bool Send(const void *valid_remote, const MsgI &imsg)
    {
        return SendImpl(valid_remote, imsg);
    }
    bool SyncRecv(MsgI &msg, bhome::msg::BHMsgHead &head, const int timeout_ms);
    template <class Body>
    bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms, const RecvCB &cb)
    {
        auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms, [&]() { per_msg_cbs_->Add(head.msg_id(), cb); }); };
        MsgI msg;
        return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend);
    }
    template <class Body>
    bool Send(const void *valid_remote, const BHMsgHead &head, const Body &body, const int timeout_ms)
    {
        auto DoSend = [&](MsgI &msg) { return mq().Send(*static_cast<const MQId *>(valid_remote), msg, timeout_ms); };
        MsgI msg;
        return msg.Make(shm(), head, body) && SendImpl(msg, timeout_ms, DoSend);
    }
    template <class Body>
    bool SendAndRecv(const void *remote, const BHMsgHead &head, const Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
@@ -114,7 +118,7 @@
            };
            std::unique_lock<std::mutex> lk(st->mutex);
            bool sendok = Send(remote, head, body, timeout_ms, OnRecv);
            bool sendok = Send(remote, head, body, OnRecv);
            if (!sendok) {
                printf("send timeout\n");
            }
@@ -129,8 +133,9 @@
        }
    }
    Shm &shm() const { return mq().shm(); }
protected:
    const Shm &shm() const { return shm_; }
    Queue &mq() { return mq_; } // programmer should make sure that mq_ is valid.
    const Queue &mq() const { return mq_; }
    std::mutex &mutex() { return mutex_; }
@@ -139,7 +144,6 @@
    bool StopNoLock();
    bool RunningNoLock() { return !workers_.empty(); }
    Shm &shm_;
    std::vector<std::thread> workers_;
    std::mutex mutex_;
    std::atomic<bool> run_;
@@ -150,6 +154,7 @@
        std::unordered_map<std::string, RecvCB> store_;
    public:
        bool empty() const { return store_.empty(); }
        bool Add(const std::string &id, const RecvCB &cb) { return store_.emplace(id, cb).second; }
        bool Find(const std::string &id, RecvCB &cb)
        {
@@ -165,6 +170,7 @@
    };
    Synced<AsyncCBs> per_msg_cbs_;
    Synced<SendQ> send_buffer_;
};
#endif // end of include guard: SOCKET_GWTJHBPO
src/timed_queue.h
@@ -1,7 +1,7 @@
/*
 * =====================================================================================
 *
 *       Filename:  failed_msg.h
 *       Filename:  timed_queue.h
 *
 *    Description:  
 *
@@ -35,7 +35,8 @@
        expire_(expire), data_(data) {}
    TimedData(const TimePoint &expire, Data &&data) :
        expire_(expire), data_(std::move(data)) {}
    bool Expired() { return Clock::now() > expire_; }
    bool Expired() const { return Clock::now() > expire_; }
    const TimePoint &expire() const { return expire_; }
    Data &data() { return data_; }
    Data const &data() const { return data_; }
src/topic_node.cpp
@@ -17,7 +17,6 @@
 */
#include "topic_node.h"
#include "bh_util.h"
#include "failed_msg.h"
#include <chrono>
#include <list>
@@ -33,9 +32,8 @@
    std::string msg_id;
};
typedef FailedMsgQ ServerFailedQ;
} // namespace
TopicNode::TopicNode(SharedMemory &shm) :
    shm_(shm), sock_node_(shm), sock_request_(shm), sock_reply_(shm), sock_sub_(shm)
{
@@ -76,15 +74,20 @@
    auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
    AddRoute(head, sock.id());
    MsgI reply;
    DEFER1(reply.Release(shm_););
    BHMsgHead reply_head;
    bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
    r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
    if (r && IsSuccess(reply_body.errmsg().errcode())) {
        info_ = body;
    if (timeout_ms == 0) {
        return sock.Send(&BHTopicCenterAddress(), head, body);
    } else {
        MsgI reply;
        DEFER1(reply.Release(shm_););
        BHMsgHead reply_head;
        bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
        r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
        if (r && IsSuccess(reply_body.errmsg().errcode())) {
            info_ = body;
            return true;
        }
        return false;
    }
    return r;
}
bool TopicNode::Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
@@ -96,22 +99,23 @@
    auto head(InitMsgHead(GetType(body), body.proc().proc_id()));
    AddRoute(head, sock.id());
    MsgI reply;
    DEFER1(reply.Release(shm_););
    BHMsgHead reply_head;
    bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
    r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
    if (r && IsSuccess(reply_body.errmsg().errcode())) {
        // TODO update proc info
    if (timeout_ms == 0) {
        return sock.Send(&BHTopicCenterAddress(), head, body);
    } else {
        MsgI reply;
        DEFER1(reply.Release(shm_););
        BHMsgHead reply_head;
        bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
        r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
        return (r && IsSuccess(reply_body.errmsg().errcode()));
    }
    return r;
}
bool TopicNode::Heartbeat(const int timeout_ms)
{
    ProcInfo proc;
    proc.set_proc_id(proc_id());
    MsgCommonReply reply_body;
    return Heartbeat(proc, reply_body, timeout_ms) && IsSuccess(reply_body.errmsg().errcode());
    return Heartbeat(proc, reply_body, timeout_ms);
}
bool TopicNode::ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms)
@@ -124,50 +128,43 @@
    auto head(InitMsgHead(GetType(body), proc_id()));
    AddRoute(head, sock.id());
    MsgI reply;
    DEFER1(reply.Release(shm_););
    BHMsgHead reply_head;
    bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
    r = r && reply_head.type() == kMsgTypeCommonReply;
    r = r && reply.ParseBody(reply_body);
    return r;
    if (timeout_ms == 0) {
        return sock.Send(&BHTopicCenterAddress(), head, body);
    } else {
        MsgI reply;
        DEFER1(reply.Release(shm_););
        BHMsgHead reply_head;
        bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
        r = r && reply_head.type() == kMsgTypeCommonReply;
        r = r && reply.ParseBody(reply_body);
        return r;
    }
}
bool TopicNode::ServerStart(const ServerCB &rcb, int nworker)
{
    auto failed_q = std::make_shared<ServerFailedQ>();
    auto onRecv = [this, rcb](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
        if (head.type() != kMsgTypeRequestTopic || head.route_size() == 0) { return; }
        MsgRequestTopic req;
        if (!imsg.ParseBody(req)) { return; }
    auto onIdle = [failed_q](ShmSocket &socket) { failed_q->TrySend(socket); };
        MsgRequestTopicReply reply_body;
        if (rcb(head.proc_id(), req, reply_body)) {
            BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), head.msg_id()));
    auto onRecv = [this, rcb, failed_q, onIdle](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
        if (head.type() == kMsgTypeRequestTopic && head.route_size() > 0) {
            MsgRequestTopic req;
            if (imsg.ParseBody(req)) {
                MsgRequestTopicReply reply_body;
                if (rcb(head.proc_id(), req, reply_body)) {
                    BHMsgHead reply_head(InitMsgHead(GetType(reply_body), proc_id(), head.msg_id()));
                    for (int i = 0; i < head.route_size() - 1; ++i) {
                        reply_head.add_route()->Swap(head.mutable_route(i));
                    }
                    MsgI msg;
                    if (msg.Make(sock.shm(), reply_head, reply_body)) {
                        auto &remote = head.route().rbegin()->mq_id();
                        if (!sock.Send(remote.data(), msg, 10)) {
                            failed_q->Push(remote, msg, 10s);
                        }
                    }
                }
            for (int i = 0; i < head.route_size() - 1; ++i) {
                reply_head.add_route()->Swap(head.mutable_route(i));
            }
        } else {
            // ignored, or dropped
            MsgI msg;
            if (msg.Make(sock.shm(), reply_head, reply_body)) {
                auto &remote = head.route().rbegin()->mq_id();
                sock.Send(remote.data(), msg);
            }
        }
        onIdle(sock);
    };
    auto &sock = SockServer();
    return rcb && sock.Start(onRecv, onIdle, nworker);
    return rcb && sock.Start(onRecv, nworker);
}
bool TopicNode::ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms)
@@ -189,7 +186,7 @@
    return false;
}
bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body, const int timeout_ms)
bool TopicNode::ServerSendReply(void *src_info, const MsgRequestTopicReply &body)
{
    auto &sock = SockServer();
@@ -202,7 +199,7 @@
    for (unsigned i = 0; i < p->route.size() - 1; ++i) {
        head.add_route()->Swap(&p->route[i]);
    }
    return sock.Send(p->route.back().mq_id().data(), head, body, timeout_ms);
    return sock.Send(p->route.back().mq_id().data(), head, body);
}
bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker)
@@ -222,7 +219,7 @@
    return SockRequest().Start(onData, nworker);
}
bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, const int timeout_ms, const RequestResultCB &cb)
bool TopicNode::ClientAsyncRequest(const MsgRequestTopic &req, const RequestResultCB &cb)
{
    auto Call = [&](const void *remote) {
        auto &sock = SockRequest();
@@ -239,15 +236,15 @@
                    }
                }
            };
            return sock.Send(remote, head, req, timeout_ms, onRecv);
            return sock.Send(remote, head, req, onRecv);
        } else {
            return sock.Send(remote, head, req, timeout_ms);
            return sock.Send(remote, head, req);
        }
    };
    try {
        BHAddress addr;
        if (ClientQueryRPCTopic(req.topic(), addr, timeout_ms)) {
        if (ClientQueryRPCTopic(req.topic(), addr, 1000)) {
            return Call(addr.mq_id().data());
        } else {
            SetLastError(eNotFound, "remote not found.");
@@ -333,14 +330,18 @@
        BHMsgHead head(InitMsgHead(GetType(pub), proc_id()));
        AddRoute(head, sock.id());
        MsgI reply;
        DEFER1(reply.Release(shm()););
        BHMsgHead reply_head;
        MsgCommonReply reply_body;
        return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) &&
               reply_head.type() == kMsgTypeCommonReply &&
               reply.ParseBody(reply_body) &&
               IsSuccess(reply_body.errmsg().errcode());
        if (timeout_ms == 0) {
            return sock.Send(&BHTopicBusAddress(), head, pub);
        } else {
            MsgI reply;
            DEFER1(reply.Release(shm()););
            BHMsgHead reply_head;
            MsgCommonReply reply_body;
            return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) &&
                   reply_head.type() == kMsgTypeCommonReply &&
                   reply.ParseBody(reply_body) &&
                   IsSuccess(reply_body.errmsg().errcode());
        }
    } catch (...) {
    }
    return false;
@@ -357,8 +358,19 @@
        BHMsgHead head(InitMsgHead(GetType(sub), proc_id()));
        AddRoute(head, sock.id());
        return sock.Send(&BHTopicBusAddress(), head, sub, timeout_ms);
        if (timeout_ms == 0) {
            return sock.Send(&BHTopicBusAddress(), head, sub);
        } else {
            MsgI reply;
            DEFER1(reply.Release(shm()););
            BHMsgHead reply_head;
            MsgCommonReply reply_body;
            return sock.SendAndRecv(&BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) &&
                   reply_head.type() == kMsgTypeCommonReply &&
                   reply.ParseBody(reply_body) &&
                   IsSuccess(reply_body.errmsg().errcode());
        }
        // TODO wait for result?
    } catch (...) {
        return false;
    }
src/topic_node.h
@@ -48,12 +48,12 @@
    bool ServerStart(ServerCB const &cb, const int nworker = 2);
    bool ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms);
    bool ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms);
    bool ServerSendReply(void *src_info, const MsgRequestTopicReply &reply, const int timeout_ms);
    bool ServerSendReply(void *src_info, const MsgRequestTopicReply &reply);
    // topic client
    typedef std::function<void(const std::string &proc_id, const MsgRequestTopicReply &reply)> RequestResultCB;
    bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2);
    bool ClientAsyncRequest(const MsgRequestTopic &request, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB());
    bool ClientAsyncRequest(const MsgRequestTopic &request, const RequestResultCB &rrcb = RequestResultCB());
    bool ClientSyncRequest(const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms);
    // publish
utest/speed_test.cpp
@@ -161,9 +161,9 @@
                req_body.set_topic("topic");
                req_body.set_data(msg_content);
                auto req_head(InitMsgHead(GetType(req_body), client_proc_id));
                return cli.Send(&srv.id(), req_head, req_body, 100);
                return cli.Send(&srv.id(), req_head, req_body);
            };
            auto ReqRC = [&]() { return cli.Send(&srv.id(), request_rc, 1000); };
            auto ReqRC = [&]() { return cli.Send(&srv.id(), request_rc); };
            if (!ReqRC()) {
                printf("********** client send error.\n");
@@ -204,9 +204,9 @@
                        reply_body.set_topic("topic");
                        reply_body.set_data(msg_content);
                        auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, req_head.msg_id()));
                        return srv.Send(&src_id, reply_head, reply_body, 100);
                        return srv.Send(&src_id, reply_head, reply_body);
                    };
                    auto ReplyRC = [&]() { return srv.Send(&src_id, reply_rc, 100); };
                    auto ReplyRC = [&]() { return srv.Send(&src_id, reply_rc); };
                    if (ReplyRC()) {
                    }
utest/utest.cpp
@@ -1,6 +1,5 @@
#include "center.h"
#include "defs.h"
#include "failed_msg.h"
#include "util.h"
#include <atomic>
#include <boost/uuid/uuid_generators.hpp>
@@ -21,8 +20,6 @@
struct IsSameType<A, A> {
    static const bool value = true;
};
typedef FailedMsgQ ServerFailedQ;
BOOST_AUTO_TEST_CASE(Temp)
{
@@ -232,7 +229,7 @@
            MsgRequestTopic req;
            req.set_topic(topic);
            req.set_data("data " + std::to_string(i));
            if (!client.ClientAsyncRequest(req, 1000)) {
            if (!client.ClientAsyncRequest(req)) {
                printf("client request failed\n");
                ++count;
            }