lichao
2021-05-17 cab831748a2a9cc18b7f18f3b5e14a4374b7ab68
socket send using abs addr, avoid shm find by id.
18个文件已修改
449 ■■■■■ 已修改文件
.vscode/settings.json 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.cpp 95 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/source/bhome_msg_api.proto 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.cpp 15 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_util.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.cpp 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.h 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/sendq.cpp 33 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/sendq.h 48 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_msg_queue.cpp 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_msg_queue.h 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.cpp 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.h 44 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 82 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.h 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/speed_test.cpp 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.vscode/settings.json
@@ -61,7 +61,16 @@
        "strstream": "cpp",
        "unordered_set": "cpp",
        "cfenv": "cpp",
        "*.ipp": "cpp"
        "*.ipp": "cpp",
        "cassert": "cpp",
        "cerrno": "cpp",
        "cfloat": "cpp",
        "ciso646": "cpp",
        "climits": "cpp",
        "ios": "cpp",
        "locale": "cpp",
        "queue": "cpp",
        "random": "cpp"
    },
    "files.exclude": {
        "**/*.un~": true,
box/center.cpp
@@ -105,17 +105,18 @@
        auto it = msgs_.begin();
        while (it != msgs_.end() && --limit > 0) {
            ShmMsg msg(it->second);
            if (msg.Count() == 0) {
            auto Free = [&]() {
                msg.Free();
                it = msgs_.erase(it);
                ++n;
            } else if (msg.timestamp() + 60 < NowSec()) {
                msg.Free();
                it = msgs_.erase(it);
                ++n;
                // LOG_DEBUG() << "release timeout msg, someone crashed.";
            } else {
            };
            int n = now - msg.timestamp();
            if (n < 10) {
                ++it;
            } else if (msg.Count() == 0) {
                Free();
            } else if (n > 60) {
                Free();
            }
        }
        if (n > 0) {
@@ -182,7 +183,7 @@
    struct NodeInfo {
        ProcState state_;             // state
        std::set<Address> addrs_;     // registered mqs
        std::map<MQId, int64_t> addrs_; // registered mqs
        ProcInfo proc_;               //
        AddressTopics services_;      // address: topics
        AddressTopics subscriptions_; // address: topics
@@ -191,12 +192,14 @@
    typedef std::weak_ptr<NodeInfo> WeakNode;
    struct TopicDest {
        Address mq_;
        MQId mq_id_;
        int64_t mq_abs_addr_;
        WeakNode weak_node_;
        bool operator<(const TopicDest &a) const { return mq_ < a.mq_; }
        bool operator<(const TopicDest &a) const { return mq_id_ < a.mq_id_; }
    };
    inline MQId SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); }
    inline bool MatchAddr(std::set<Address> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); }
    inline int64_t SrcAbsAddr(const BHMsgHead &head) { return head.route(0).abs_addr(); }
    inline bool MatchAddr(std::map<Address, int64_t> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); }
    NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time, const int64_t kill_time) :
        id_(id), cleaner_(cleaner), offline_time_(offline_time), kill_time_(kill_time), last_check_time_(0) {}
@@ -218,39 +221,38 @@
            return; // ignore in exists.
        }
        auto UpdateRegInfo = [&](Node &node) {
            for (int i = 0; i < 10; ++i) {
                node->addrs_.insert(ssn + i);
            }
            node->state_.timestamp_ = NowSec() - offline_time_;
            node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
            // create sockets.
            const int nsocks = 4;
            try {
                auto CreateSocket = [&](const MQId id) { ShmSocket tmp(shm, true, id, 16); };
                // alloc(-1), node, server, sub, request,
                for (int i = 0; i < 4; ++i) {
                    CreateSocket(ssn + i);
                    node->addrs_.insert(ssn + i);
                for (int i = 0; i < nsocks; ++i) {
                    ShmSocket tmp(shm, true, ssn + i, 16);
                    node->addrs_.emplace(ssn + i, tmp.AbsAddr());
                }
                return true;
            } catch (...) {
                for (int i = 0; i < nsocks; ++i) {
                    ShmSocket::Remove(shm, ssn + i);
                }
                return false;
            }
        };
        auto PrepareProcInit = [&]() {
        auto PrepareProcInit = [&](Node &node) {
            bool r = false;
            ShmMsg init_msg;
            if (init_msg.Make(GetAllocSize(CalcAllocIndex(900)))) {
                // 31bit pointer, 4bit cmd+flag
                int64_t reply = (init_msg.Offset() << 4) | EncodeCmd(eCmdNodeInitReply);
                r = SendAllocReply(socket, ssn, reply, init_msg);
                r = SendAllocReply(socket, {ssn, node->addrs_[ssn]}, reply, init_msg);
            }
            return r;
        };
        Node node(new NodeInfo);
        if (UpdateRegInfo(node) && PrepareProcInit()) {
        if (UpdateRegInfo(node) && PrepareProcInit(node)) {
            nodes_[ssn] = node;
            LOG_INFO() << "new node ssn (" << ssn << ") init";
        } else {
@@ -261,13 +263,13 @@
    }
    void RecordMsg(const MsgI &msg) { msgs_.RecordMsg(msg); }
    bool SendAllocReply(ShmSocket &socket, const Address dest, const int64_t reply, const MsgI &msg)
    bool SendAllocReply(ShmSocket &socket, const MQInfo &dest, const int64_t reply, const MsgI &msg)
    {
        RecordMsg(msg);
        auto onExpireFree = [this, msg](const SendQ::Data &) { msgs_.FreeMsg(msg.id()); };
        return socket.Send(dest, reply, onExpireFree);
    }
    bool SendAllocMsg(ShmSocket &socket, const Address dest, const MsgI &msg)
    bool SendAllocMsg(ShmSocket &socket, const MQInfo &dest, const MsgI &msg)
    {
        RecordMsg(msg);
        return socket.Send(dest, msg);
@@ -284,7 +286,21 @@
        if (proc_rec.proc_.empty()) {
            return;
        }
        Address dest = proc_rec.ssn_ + socket_index;
        MQInfo dest = {proc_rec.ssn_ + socket_index, 0};
        auto FindMq = [&]() {
            auto pos = nodes_.find(proc_rec.ssn_);
            if (pos != nodes_.end()) {
                for (auto &&mq : pos->second->addrs_) {
                    if (mq.first == dest.id_) {
                        dest.offset_ = mq.second;
                        return true;
                    }
                }
            }
            return false;
        };
        if (!FindMq()) { return; }
        auto size = GetAllocSize((val >> 52) & MaskBits(8));
        MsgI new_msg;
@@ -337,10 +353,6 @@
            // when node restart, ssn will change,
            // and old node will be removed after timeout.
            auto UpdateRegInfo = [&](Node &node) {
                node->addrs_.insert(SrcAddr(head));
                for (auto &addr : msg.addrs()) {
                    node->addrs_.insert(addr.mq_id());
                }
                node->proc_.Swap(msg.mutable_proc());
                node->state_.timestamp_ = head.timestamp();
                node->state_.UpdateState(NowSec(), offline_time_, kill_time_);
@@ -420,11 +432,11 @@
                auto src = SrcAddr(head);
                auto &topics = msg.topics().topic_list();
                node->services_[src].insert(topics.begin(), topics.end());
                TopicDest dest = {src, node};
                TopicDest dest = {src, SrcAbsAddr(head), node};
                for (auto &topic : topics) {
                    service_map_[topic].insert(dest);
                }
                LOG_DEBUG() << "node " << node->proc_.proc_id() << " ssn " << *node->addrs_.begin() << " serve " << topics.size() << " topics:\n";
                LOG_DEBUG() << "node " << node->proc_.proc_id() << " ssn " << node->addrs_.begin()->first << " serve " << topics.size() << " topics:\n";
                for (auto &topic : topics) {
                    LOG_DEBUG() << "\t" << topic;
                }
@@ -464,7 +476,8 @@
                    if (dest_node && Valid(*dest_node)) {
                        auto node_addr = reply.add_node_address();
                        node_addr->set_proc_id(dest_node->proc_.proc_id());
                        node_addr->mutable_addr()->set_mq_id(dest.mq_);
                        node_addr->mutable_addr()->set_mq_id(dest.mq_id_);
                        node_addr->mutable_addr()->set_abs_addr(dest.mq_abs_addr_);
                    }
                }
                return reply;
@@ -482,7 +495,7 @@
            auto src = SrcAddr(head);
            auto &topics = msg.topics().topic_list();
            node->subscriptions_[src].insert(topics.begin(), topics.end());
            TopicDest dest = {src, node};
            TopicDest dest = {src, SrcAbsAddr(head), node};
            for (auto &topic : topics) {
                subscribe_map_[topic].insert(dest);
            }
@@ -505,7 +518,7 @@
            };
            if (pos != node->subscriptions_.end()) {
                const TopicDest &dest = {src, node};
                const TopicDest &dest = {src, SrcAbsAddr(head), node};
                auto &topics = msg.topics().topic_list();
                // clear node sub records;
                for (auto &topic : topics) {
@@ -602,7 +615,7 @@
    {
        auto EraseMapRec = [&node](auto &rec_map, auto &node_rec) {
            for (auto &addr_topics : node_rec) {
                TopicDest dest{addr_topics.first, node};
                TopicDest dest{addr_topics.first, 0, node}; // abs_addr is not used.
                for (auto &topic : addr_topics.second) {
                    auto pos = rec_map.find(topic);
                    if (pos != rec_map.end()) {
@@ -626,7 +639,7 @@
        }
        for (auto &addr : node->addrs_) {
            cleaner_(addr);
            cleaner_(addr.first);
        }
        node->addrs_.clear();
@@ -678,7 +691,7 @@
{
    return [&](auto &&rep_body) {
        auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id()));
        auto remote = head.route(0).mq_id();
        MQInfo remote = {head.route(0).mq_id(), head.route(0).abs_addr()};
        MsgI msg;
        if (msg.Make(reply_head, rep_body)) {
            DEFER1(msg.Release(););
@@ -741,7 +754,7 @@
                    if (node) {
                        // should also make sure that mq is not killed before msg expires.
                        // it would be ok if (kill_time - offline_time) is longer than expire time.
                        socket.Send(cli.mq_, msg);
                        socket.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
                        ++it;
                    } else {
                        it = clients.erase(it);
@@ -772,9 +785,9 @@
    return rec;
}
bool BHCenter::Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQId mqid, const int mq_len)
bool BHCenter::Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQInfo &mq, const int mq_len)
{
    Centers()[name] = CenterInfo{name, handler, raw_handler, idle, mqid, mq_len};
    Centers()[name] = CenterInfo{name, handler, raw_handler, idle, mq, mq_len};
    return true;
}
@@ -792,7 +805,7 @@
    for (auto &kv : Centers()) {
        auto &info = kv.second;
        sockets_[info.name_] = std::make_shared<ShmSocket>(shm, info.mqid_, info.mq_len_);
        sockets_[info.name_] = std::make_shared<ShmSocket>(info.mq_.offset_, shm, info.mq_.id_);
    }
}
box/center.h
@@ -31,7 +31,7 @@
    typedef Socket::PartialRecvCB MsgHandler;
    typedef Socket::RawRecvCB RawHandler;
    typedef Socket::IdleCB IdleHandler;
    static bool Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQId mqid, const int mq_len);
    static bool Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQInfo &mq, const int mq_len);
    BHCenter(Socket::Shm &shm);
    ~BHCenter() { Stop(); }
@@ -44,7 +44,7 @@
        MsgHandler handler_;
        RawHandler raw_handler_;
        IdleHandler idle_;
        MQId mqid_;
        MQInfo mq_;
        int mq_len_ = 0;
    };
    typedef std::map<std::string, CenterInfo> CenterRecords;
proto/source/bhome_msg_api.proto
@@ -9,8 +9,9 @@
message BHAddress {
    uint64 mq_id = 1;
    bytes ip = 2;
    int32 port = 3;
    int64 abs_addr = 2;
    bytes ip = 3;
    int32 port = 4;
}
message ProcInfo
@@ -48,7 +49,6 @@
message MsgRegister
{
    ProcInfo proc = 1;
    repeated BHAddress addrs = 2;
}
message MsgUnregister
src/bh_api.cpp
@@ -30,7 +30,11 @@
}
std::unique_ptr<TopicNode> &ProcNodePtr()
{
    static bool init = GlobalInit(BHomeShm());
    static std::mutex mtx;
    std::lock_guard<std::mutex> lk(mtx);
    static std::unique_ptr<TopicNode> ptr;
    if (!ptr && GlobalInit(BHomeShm())) {
    auto InitLog = []() {
        auto id = GetProcExe();
        char path[200] = {0};
@@ -39,7 +43,8 @@
        return true;
    };
    static bool init_log = InitLog();
    static std::unique_ptr<TopicNode> ptr(new TopicNode(BHomeShm()));
        ptr.reset(new TopicNode(BHomeShm()));
    }
    return ptr;
}
TopicNode &ProcNode()
@@ -114,6 +119,12 @@
        return false;
    }
    MsgOut msg_reply;
    auto &ptr = ProcNodePtr();
    if (!ptr) {
        SetLastError(eNotFound, "center not started.");
        return 0;
    }
    return (ProcNode().*mfunc)(input, msg_reply, timeout_ms) &&
           PackOutput(msg_reply, reply, reply_len);
}
src/bh_util.h
@@ -157,9 +157,9 @@
    }
protected:
    static inline T &GetData()
    static inline T &GetData(const std::string &msg = "Must set data before use!")
    {
        if (!ptr()) { throw std::string("Must set ShmMsg shm before use!"); }
        if (!ptr()) { throw std::logic_error(msg); }
        return *ptr();
    }
src/defs.cpp
@@ -141,10 +141,10 @@
    return false;
}
uint64_t BHGlobalSenderAddress() { return GetCenterInfo(BHomeShm())->mq_sender_.id_; }
uint64_t BHTopicCenterAddress() { return GetCenterInfo(BHomeShm())->mq_center_.id_; }
uint64_t BHTopicBusAddress() { return GetCenterInfo(BHomeShm())->mq_bus_.id_; }
uint64_t BHCenterReplyAddress() { return GetCenterInfo(BHomeShm())->mq_init_.id_; }
const MQInfo &BHGlobalSenderAddress() { return GetCenterInfo(BHomeShm())->mq_sender_; }
const MQInfo &BHTopicCenterAddress() { return GetCenterInfo(BHomeShm())->mq_center_; }
const MQInfo &BHTopicBusAddress() { return GetCenterInfo(BHomeShm())->mq_bus_; }
const MQInfo &BHCenterReplyAddress() { return GetCenterInfo(BHomeShm())->mq_init_; }
int64_t CalcAllocIndex(int64_t size)
{
src/defs.h
@@ -27,12 +27,12 @@
int64_t CalcAllocIndex(int64_t size);
int64_t GetAllocSize(int index);
struct CenterInfo {
    struct MQInfo {
        int64_t id_ = 0;
    MQId id_ = 0;
        int64_t offset_ = 0;
    };
struct CenterInfo {
    MQInfo mq_center_;
    MQInfo mq_bus_;
    MQInfo mq_init_;
@@ -59,9 +59,9 @@
void GetLastError(int &ec, std::string &msg);
//TODO center can check shm for previous crash.
uint64_t BHGlobalSenderAddress();
uint64_t BHTopicCenterAddress();
uint64_t BHTopicBusAddress();
uint64_t BHCenterReplyAddress();
const MQInfo &BHGlobalSenderAddress();
const MQInfo &BHTopicCenterAddress();
const MQInfo &BHTopicBusAddress();
const MQInfo &BHCenterReplyAddress();
#endif // end of include guard: DEFS_KP8LKGD0
src/sendq.cpp
@@ -21,6 +21,24 @@
using namespace bhome_shm;
void SendQ::AppendData(const MQInfo &mq, const Data data, const TimePoint &expire, OnMsgEvent onExpire)
{
    TimedMsg tmp(expire, MsgInfo{mq, data, std::move(onExpire)});
    std::unique_lock<std::mutex> lock(mutex_in_);
    try {
        auto &al = in_[mq.id_];
        if (!al.empty()) {
            al.front().emplace_back(std::move(tmp));
        } else {
            al.insert(al.begin(), Array())->emplace_back(std::move(tmp));
        }
    } catch (std::exception &e) {
        LOG_ERROR() << "sendq error: " << e.what();
        throw e;
    }
}
int SendQ::DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr)
{
    auto FirstNotExpired = [](Array &l) {
@@ -36,7 +54,7 @@
        }
    }
    while (pos != arr.end() && mq.TrySend(remote, pos->data().data_)) {
    while (pos != arr.end() && mq.TrySend(pos->data().mq_, pos->data().data_)) {
        ++pos;
    }
@@ -59,6 +77,8 @@
bool SendQ::TrySend(ShmMsgQueue &mq)
{
    std::unique_lock<std::mutex> lock(mutex_out_);
    // if (TooFast()) { return false; }
    size_t nsend = 0;
    if (!out_.empty()) {
        auto rec = out_.begin();
@@ -89,3 +109,14 @@
    return !out_.empty();
}
bool SendQ::TooFast()
{
    auto cur = NowSec();
    if (cur > last_time_) {
        last_time_ = cur;
        count_ = 0;
    }
    return ++count_ > 1000 * 100;
} // not accurate in multi-thread.
src/sendq.h
@@ -39,6 +39,7 @@
    typedef int64_t Data;
    typedef std::function<void(const Data &)> OnMsgEvent;
    struct MsgInfo {
        MQInfo mq_;
        Data data_;
        OnMsgEvent on_expire_;
    };
@@ -46,45 +47,51 @@
    typedef TimedMsg::TimePoint TimePoint;
    typedef TimedMsg::Duration Duration;
    void Append(const Remote addr, const MsgI msg)
    bool Append(const MQInfo &mq, MsgI msg)
    {
        msg.AddRef();
        auto onMsgExpire = [](const Data &d) { MsgI(d).Release(); };
        AppendData(addr, msg.Offset(), DefaultExpire(), onMsgExpire);
        try {
            AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire);
            return true;
        } catch (...) {
            msg.Release();
            return false;
        }
    }
    void Append(const Remote addr, const MsgI msg, OnMsgEvent onExpire)
    bool Append(const MQInfo &mq, MsgI msg, OnMsgEvent onExpire)
    {
        msg.AddRef();
        auto onMsgExpire = [onExpire](const Data &d) {
            onExpire(d);
            MsgI(d).Release();
        };
        AppendData(addr, msg.Offset(), DefaultExpire(), onMsgExpire);
        try {
            AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire);
            return true;
        } catch (...) {
            msg.Release();
            return false;
        }
    }
    void Append(const Remote addr, const Data command, OnMsgEvent onExpire = OnMsgEvent())
    bool Append(const MQInfo &mq, const Data command, OnMsgEvent onExpire = OnMsgEvent())
    {
        AppendData(addr, command, DefaultExpire(), onExpire);
        try {
            AppendData(mq, command, DefaultExpire(), onExpire);
            return true;
        } catch (...) {
            return false;
        }
    }
    bool TrySend(ShmMsgQueue &mq);
private:
    static TimePoint Now() { return TimedMsg::Clock::now(); }
    static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(60); }
    void AppendData(const Remote addr, const Data data, const TimePoint &expire, OnMsgEvent onExpire)
    {
        //TODO simple queue, organize later ?
    void AppendData(const MQInfo &mq, const Data data, const TimePoint &expire, OnMsgEvent onExpire);
        TimedMsg tmp(expire, MsgInfo{data, std::move(onExpire)});
        std::unique_lock<std::mutex> lock(mutex_in_);
        auto &al = in_[addr];
        if (!al.empty()) {
            al.front().emplace_back(std::move(tmp));
        } else {
            al.insert(al.begin(), Array())->emplace_back(std::move(tmp));
        }
    }
    typedef std::deque<TimedMsg> Array;
    typedef std::list<Array> ArrayList;
    typedef std::unordered_map<Remote, ArrayList> Store;
@@ -92,10 +99,15 @@
    int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr);
    int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &arr);
    bool TooFast();
    std::mutex mutex_in_;
    std::mutex mutex_out_;
    Store in_;
    Store out_;
    int64_t count_ = 0;
    int64_t last_time_ = 0;
};
#endif // end of include guard: SENDQ_IWKMSK7M
src/shm_msg_queue.cpp
@@ -33,7 +33,7 @@
ShmMsgQueue::MQId ShmMsgQueue::NewId()
{
    static auto &id = GetData();
    static auto &id = GetData("Must init shared memory before use! Please make sure center is running.");
    return (++id) * 10;
}
@@ -96,11 +96,11 @@
    return Shmq::Find(shm, MsgQIdToName(remote_id));
}
bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote, int64_t val)
bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQInfo &remote, const RawData val)
{
    try {
        //TODO find from center, or use offset.
        ShmMsgQueue dest(shm, false, remote, 1);
        ShmMsgQueue dest(remote.offset_, shm, remote.id_);
#ifndef BH_USE_ATOMIC_Q
        Guard lock(GetMutex(remote_id));
#endif
src/shm_msg_queue.h
@@ -18,6 +18,7 @@
#ifndef SHM_MSG_QUEUE_D847TQXH
#define SHM_MSG_QUEUE_D847TQXH
#include "defs.h"
#include "msg.h"
#include "shm_queue.h"
@@ -75,8 +76,8 @@
    bool Recv(MsgI &msg, const int timeout_ms) { return Recv(msg.OffsetRef(), timeout_ms); }
    bool TryRecv(MsgI &msg) { return TryRecv(msg.OffsetRef()); }
    static Queue *Find(ShmType &shm, const MQId remote);
    static bool TrySend(ShmType &shm, const MQId remote, const RawData val);
    bool TrySend(const MQId remote, const RawData val) { return TrySend(shm(), remote, val); }
    static bool TrySend(ShmType &shm, const MQInfo &remote, const RawData val);
    bool TrySend(const MQInfo &remote, const RawData val) { return TrySend(shm(), remote, val); }
private:
#ifndef BH_USE_ATOMIC_Q
src/socket.cpp
@@ -80,15 +80,13 @@
                }
            };
            ShmMsgQueue::RawData val = 0;
            auto TryRecvMore = [&]() {
                for (int i = 0; i < 100; ++i) {
                    if (mq().TryRecv(val)) {
                    onRecv(val);
                        return true;
                    }
                }
                return false;
            };
            return TryRecvMore() ? (onRecv(val), true) : false;
        };
        try {
@@ -160,6 +158,31 @@
    return false;
}
bool ShmSocket::Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb)
{
    size_t size = content.size();
    auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable {
        if (!msg.Fill(content)) { return; }
        try {
            if (!cb) {
                Send(remote, msg);
            } else {
                per_msg_cbs_->Store(msg_id, std::move(cb));
                auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
                    RecvCB cb_no_use;
                    per_msg_cbs_->Pick(msg_id, cb_no_use);
                };
                Send(remote, msg, onExpireRemoveCB);
            }
        } catch (...) {
            SetLastError(eError, "Send internal error.");
        }
    };
    return RequestAlloc(size, OnResult);
}
bool ShmSocket::RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult)
{ // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag
    // LOG_FUNCTION;
@@ -184,5 +207,6 @@
        RawRecvCB cb_no_use;
        alloc_cbs_->Pick(id, cb_no_use);
    };
    return Send(BHTopicCenterAddress(), cmd, onExpireRemoveCB);
}
src/socket.h
@@ -59,7 +59,6 @@
    {
        node_proc_index_ = proc_index;
        socket_index_ = socket_index;
        LOG_DEBUG() << "Set Node Proc " << node_proc_index_ << ", " << socket_index_;
    }
    // start recv.
    bool Start(int nworker = 1, const RecvCB &onMsg = RecvCB(), const RawRecvCB &onRaw = RawRecvCB(), const IdleCB &onIdle = IdleCB());
@@ -68,7 +67,7 @@
    bool Stop();
    template <class Body>
    bool CenterSend(const MQId remote, BHMsgHead &head, Body &body)
    bool CenterSend(const MQInfo &remote, BHMsgHead &head, Body &body)
    {
        try {
            //TODO alloc outsiez and use send.
@@ -86,39 +85,17 @@
    bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult);
    template <class Body>
    bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
    bool Send(const MQInfo &remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
    {
        std::string msg_id(head.msg_id());
        std::string content(MsgI::Serialize(head, body));
        size_t size = content.size();
        auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable {
            if (!msg.Fill(content)) { return; }
            try {
                if (!cb) {
                    Send(remote, msg);
                } else {
                    per_msg_cbs_->Store(msg_id, std::move(cb));
                    auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
                        RecvCB cb_no_use;
                        per_msg_cbs_->Pick(msg_id, cb_no_use);
                    };
                    Send(remote, msg, onExpireRemoveCB);
                }
            } catch (...) {
                SetLastError(eError, "Send internal error.");
            }
        };
        return RequestAlloc(size, OnResult);
        return Send(remote, MsgI::Serialize(head, body), head.msg_id(), std::move(cb));
    }
    template <class... T>
    bool Send(const MQId remote, const MsgI &imsg, T &&...t)
    bool Send(const MQInfo &remote, const MsgI &imsg, T &&...t)
    {
        return SendImpl(remote, imsg, std::forward<decltype(t)>(t)...);
    }
    template <class... T>
    bool Send(const MQId remote, const int64_t cmd, T &&...t)
    bool Send(const MQInfo &remote, const int64_t cmd, T &&...t)
    {
        return SendImpl(remote, cmd, std::forward<decltype(t)>(t)...);
    }
@@ -126,7 +103,7 @@
    bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms);
    template <class Body>
    bool SendAndRecv(const MQId remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
    bool SendAndRecv(const MQInfo &remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
    {
        struct State {
            std::mutex mutex;
@@ -136,6 +113,7 @@
        try {
            std::shared_ptr<State> st(new State);
            auto endtime = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
            auto OnRecv = [st, &reply, &reply_head](ShmSocket &sock, MsgI &msg, BHMsgHead &head) {
@@ -176,12 +154,12 @@
    bool StopNoLock();
    bool RunningNoLock() { return !workers_.empty(); }
    bool Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb = RecvCB());
    template <class... Rest>
    bool SendImpl(const MQId remote, Rest &&...rest)
    bool SendImpl(const MQInfo &remote, Rest &&...rest)
    {
        // TODO send alloc request, and pack later, higher bit means alloc?
        send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...);
        return true;
        return send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...);
    }
    std::vector<std::thread> workers_;
src/topic_node.cpp
@@ -28,7 +28,12 @@
namespace
{
inline void AddRoute(BHMsgHead &head, const MQId id) { head.add_route()->set_mq_id(id); }
inline void AddRoute(BHMsgHead &head, const ShmSocket &sock)
{
    auto route = head.add_route();
    route->set_mq_id(sock.id());
    route->set_abs_addr(sock.AbsAddr());
}
struct SrcInfo {
    std::vector<BHAddress> route;
@@ -40,7 +45,7 @@
} // namespace
TopicNode::TopicNode(SharedMemory &shm) :
    shm_(shm), state_(eStateUnregistered)
    shm_(shm), state_(eStateUninited)
{
}
@@ -79,6 +84,7 @@
            auto end_time = steady_clock::now() + 3s;
            do {
                try {
                    //TODO recv offset, avoid query.
                    for (int i = eSockStart; i < eSockEnd; ++i) {
                        sockets_.emplace_back(new ShmSocket(shm_, false, ssn_id_ + i, kMqLen));
                    }
@@ -94,7 +100,6 @@
        NodeInit();
    }
    if (!sockets_.empty()) {
        LOG_DEBUG() << "node sockets ok";
        auto onNodeCmd = [this](ShmSocket &socket, int64_t &val) {
            LOG_DEBUG() << "node recv cmd: " << DecodeCmd(val);
            switch (DecodeCmd(val)) {
@@ -103,7 +108,7 @@
                DEFER1(msg.Release());
                MsgProcInit body;
                auto head = InitMsgHead(GetType(body), info_.proc_id(), ssn_id_);
                head.add_route()->set_mq_id(ssn_id_);
                AddRoute(head, socket);
                if (msg.Fill(head, body)) {
                    socket.Send(BHTopicCenterAddress(), msg);
                }
@@ -122,12 +127,12 @@
                MsgProcInitReply reply;
                if (imsg.ParseBody(reply)) {
                    SetProcIndex(reply.proc_index());
                    this->state_ = eStateUnregistered;
                }
            }
            return true;
        };
        SockNode().Start(1, onMsg, onNodeCmd);
        LOG_DEBUG() << "sockets ok.";
        return true;
    }
    return false;
@@ -167,19 +172,22 @@
        SetLastError(eError, kErrMsgNotInit);
        return false;
    }
    auto end_time = steady_clock::now() + milliseconds(timeout_ms);
    while (state_ != eStateUnregistered && steady_clock::now() < end_time) {
        std::this_thread::yield();
    }
    if (state_ != eStateUnregistered) {
        SetLastError(eError, kErrMsgNotInit);
        return false;
    }
    auto &sock = SockNode();
    MsgRegister body;
    body.mutable_proc()->Swap(&proc);
    auto AddId = [&](const MQId id) { body.add_addrs()->set_mq_id(id); };
    AddId(SockNode().id());
    AddId(SockServer().id());
    AddId(SockClient().id());
    AddId(SockSub().id());
    AddId(SockPub().id());
    auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn()));
    AddRoute(head, sock.id());
    AddRoute(head, sock);
    auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) {
        bool ok = head.type() == kMsgTypeCommonReply &&
@@ -224,7 +232,7 @@
    body.mutable_proc()->Swap(&proc);
    auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn()));
    AddRoute(head, sock.id());
    AddRoute(head, sock);
    auto CheckResult = [this](MsgI &msg, BHMsgHead &head, MsgCommonReply &rbody) {
        bool r = head.type() == kMsgTypeCommonReply &&
@@ -260,7 +268,7 @@
    body.mutable_proc()->Swap(&proc);
    auto head(InitMsgHead(GetType(body), body.proc().proc_id(), ssn()));
    AddRoute(head, sock.id());
    AddRoute(head, sock);
    if (timeout_ms == 0) {
        return sock.Send(BHTopicCenterAddress(), head, body);
@@ -290,7 +298,7 @@
    auto &sock = SockNode();
    BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn()));
    AddRoute(head, sock.id());
    AddRoute(head, sock);
    MsgI reply;
    DEFER1(reply.Release());
@@ -312,7 +320,7 @@
    body.mutable_topics()->Swap(&topics);
    auto head(InitMsgHead(GetType(body), proc_id(), ssn()));
    AddRoute(head, sock.id());
    AddRoute(head, sock);
    if (timeout_ms == 0) {
        return sock.Send(BHTopicCenterAddress(), head, body);
@@ -341,7 +349,7 @@
            for (int i = 0; i < head.route_size() - 1; ++i) {
                reply_head.add_route()->Swap(head.mutable_route(i));
            }
            auto remote = head.route().rbegin()->mq_id();
            MQInfo remote = {head.route().rbegin()->mq_id(), head.route().rbegin()->abs_addr()};
            sock.Send(remote, reply_head, reply_body);
        }
    };
@@ -357,10 +365,17 @@
        MsgRequestTopic req;
        if (!imsg.ParseBody(req)) { return; }
        try {
        SrcInfo *p = new SrcInfo;
            if (!p) {
                throw std::runtime_error("no memory.");
            }
        p->route.assign(head.route().begin(), head.route().end());
        p->msg_id = head.msg_id();
        acb(p, *head.mutable_proc_id(), req);
        } catch (std::exception &e) {
            LOG_ERROR() << "error server handle msg:" << e.what();
        }
    };
    auto &sock = SockServer();
@@ -381,11 +396,19 @@
    if (sock.SyncRecv(imsg, head, timeout_ms) && head.type() == kMsgTypeRequestTopic) {
        if (imsg.ParseBody(request)) {
            head.mutable_proc_id()->swap(proc_id);
            try {
            SrcInfo *p = new SrcInfo;
                if (!p) {
                    throw std::runtime_error("no memory.");
                }
            p->route.assign(head.route().begin(), head.route().end());
            p->msg_id = head.msg_id();
            src_info = p;
            return true;
            } catch (std::exception &e) {
                LOG_ERROR() << "error recv request: " << e.what();
                return false;
            }
        }
    }
    return false;
@@ -409,7 +432,8 @@
    for (unsigned i = 0; i < p->route.size() - 1; ++i) {
        head.add_route()->Swap(&p->route[i]);
    }
    return sock.Send(p->route.back().mq_id(), head, body);
    MQInfo dest = {p->route.back().mq_id(), p->route.back().abs_addr()};
    return sock.Send(dest, head, body);
}
bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker)
@@ -440,10 +464,10 @@
    out_msg_id = msg_id;
    auto SendTo = [this, msg_id](const BHAddress &addr, const MsgRequestTopic &req, const RequestResultCB &cb) {
    auto SendTo = [this, msg_id](const MQInfo &remote, const MsgRequestTopic &req, const RequestResultCB &cb) {
        auto &sock = SockClient();
        BHMsgHead head(InitMsgHead(GetType(req), proc_id(), ssn(), msg_id));
        AddRoute(head, sock.id());
        AddRoute(head, sock);
        head.set_topic(req.topic());
        if (cb) {
@@ -455,15 +479,15 @@
                    }
                }
            };
            return sock.Send(addr.mq_id(), head, req, onRecv);
            return sock.Send(remote, head, req, onRecv);
        } else {
            return sock.Send(addr.mq_id(), head, req);
            return sock.Send(remote, head, req);
        }
    };
    try {
        BHAddress addr;
        return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(addr, req, cb);
        return (ClientQueryRPCTopic(req.topic(), addr, 3000)) && SendTo(MQInfo{addr.mq_id(), addr.abs_addr()}, req, cb);
    } catch (...) {
        SetLastError(eError, "internal error.");
        return false;
@@ -484,14 +508,14 @@
        if (ClientQueryRPCTopic(request.topic(), addr, timeout_ms)) {
            LOG_TRACE() << "node: " << SockNode().id() << ", topic dest: " << addr.mq_id();
            BHMsgHead head(InitMsgHead(GetType(request), proc_id(), ssn()));
            AddRoute(head, sock.id());
            AddRoute(head, sock);
            head.set_topic(request.topic());
            MsgI reply_msg;
            DEFER1(reply_msg.Release(););
            BHMsgHead reply_head;
            if (sock.SendAndRecv(addr.mq_id(), head, request, reply_msg, reply_head, timeout_ms) &&
            if (sock.SendAndRecv({addr.mq_id(), addr.abs_addr()}, head, request, reply_msg, reply_head, timeout_ms) &&
                reply_head.type() == kMsgTypeRequestTopicReply &&
                reply_msg.ParseBody(out_reply)) {
                reply_head.mutable_proc_id()->swap(out_proc_id);
@@ -504,7 +528,7 @@
    return false;
}
int TopicNode::QueryRPCTopics(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms)
int TopicNode::QueryTopicServers(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms)
{
    int n = 0;
    MsgQueryTopic query;
@@ -532,7 +556,7 @@
        return true;
    }
    std::vector<NodeAddress> lst;
    if (QueryRPCTopics(topic, lst, timeout_ms)) {
    if (QueryTopicServers(topic, lst, timeout_ms)) {
        addr = lst.front().addr();
        if (addr.mq_id() != 0) {
            topic_query_cache_.Store(topic, addr);
@@ -555,7 +579,7 @@
    try {
        auto &sock = SockPub();
        BHMsgHead head(InitMsgHead(GetType(pub), proc_id(), ssn()));
        AddRoute(head, sock.id());
        AddRoute(head, sock);
        if (timeout_ms == 0) {
            return sock.Send(BHTopicBusAddress(), head, pub);
@@ -589,7 +613,7 @@
        sub.mutable_topics()->Swap(&topics);
        BHMsgHead head(InitMsgHead(GetType(sub), proc_id(), ssn()));
        AddRoute(head, sock.id());
        AddRoute(head, sock);
        if (timeout_ms == 0) {
            return sock.Send(BHTopicBusAddress(), head, sub);
        } else {
src/topic_node.h
@@ -78,7 +78,7 @@
    MQId ssn() { return SockNode().id(); }
    bool ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms);
    typedef MsgQueryTopicReply::BHNodeAddress NodeAddress;
    int QueryRPCTopics(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms);
    int QueryTopicServers(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms);
    const std::string &proc_id() { return info_.proc_id(); }
    typedef BHAddress Address;
@@ -139,6 +139,7 @@
    }
    enum State {
        eStateUninited,
        eStateUnregistered,
        eStateOnline,
        eStateOffline // heartbeat fail.
@@ -146,7 +147,7 @@
    void state(const State st) { state_.store(st); }
    void state_cas(State expected, const State val) { state_.compare_exchange_strong(expected, val); }
    State state() const { return state_.load(); }
    bool IsOnline() { return Init() && state() == eStateOnline; }
    bool IsOnline() { return state() == eStateOnline; }
    bool Init();
    bool Valid() const { return !sockets_.empty(); }
    std::mutex mutex_;
utest/api_test.cpp
@@ -129,7 +129,14 @@
        void *reply = 0;
        int reply_len = 0;
        reg = BHRegister(proc_buf.data(), proc_buf.size(), &reply, &reply_len, 2000);
        printf("register %s\n", reg ? "ok" : "failed");
        if (reg) {
            printf("register ok\n");
        } else {
            int ec = 0;
            std::string msg;
            GetLastError(ec, msg);
            printf("register failed, %d, %s\n", ec, msg.c_str());
        }
        BHFree(reply, reply_len);
        Sleep(1s);
@@ -239,6 +246,7 @@
            DEFER1(BHFree(msg_id, len););
            // Sleep(10ms, false);
            std::string dest(BHAddress().SerializeAsString());
            bool r = BHAsyncRequest(dest.data(), dest.size(), s.data(), s.size(), 0, 0);
            if (r) {
                ++Status().nrequest_;
@@ -294,11 +302,12 @@
    int same = 0;
    uint64_t last = 0;
    while (last < nreq * ncli && same < 2) {
    while (last < nreq * ncli && same < 3) {
        Sleep(1s, false);
        auto cur = Status().nreply_.load();
        if (last == cur) {
            ++same;
            printf("same %d\n", same);
        } else {
            last = cur;
            same = 0;
@@ -308,6 +317,7 @@
    run = false;
    threads.WaitAll();
    auto &st = Status();
    Sleep(1s);
    printf("nreq: %8ld, nsrv: %8ld, nreply: %8ld\n", st.nrequest_.load(), st.nserved_.load(), st.nreply_.load());
    BHCleanup();
    printf("after cleanup\n");
utest/speed_test.cpp
@@ -24,16 +24,8 @@
{
    SharedMemory &shm = TestShm();
    GlobalInit(shm);
    auto InitSem = [](auto id) {
        auto sem_id = semget(id, 1, 0666 | IPC_CREAT);
        union semun init_val;
        init_val.val = 1;
        semctl(sem_id, 0, SETVAL, init_val);
        return;
    };
    MQId id = ShmMsgQueue::NewId();
    InitSem(id);
    MQId server_id = ShmMsgQueue::NewId();
    ShmMsgQueue server(server_id, shm, 1000);
    const int timeout = 1000;
    const uint32_t data_size = 1001;
@@ -44,7 +36,6 @@
    std::string str(data_size, 'a');
    auto Writer = [&](int writer_id, uint64_t n) {
        MQId cli_id = ShmMsgQueue::NewId();
        InitSem(cli_id);
        ShmMsgQueue mq(cli_id, shm, 64);
        MsgI msg;
@@ -58,12 +49,12 @@
        for (uint64_t i = 0; i < n; ++i) {
            msg.AddRef();
            while (!mq.TrySend(id, msg.Offset())) {}
            while (!mq.TrySend({server.Id(), server.AbsAddr()}, msg.Offset())) {}
            ++nwrite;
        }
    };
    auto Reader = [&](int reader_id, std::atomic<bool> *run, bool isfork) {
        ShmMsgQueue mq(id, shm, 1000);
        ShmMsgQueue &mq = server;
        auto now = []() { return steady_clock::now(); };
        auto tm = now();
        while (*run) {
@@ -189,8 +180,10 @@
                req_body.set_topic("topic");
                req_body.set_data(msg_content);
                auto req_head(InitMsgHead(GetType(req_body), client_proc_id, cli.id()));
                req_head.add_route()->set_mq_id(cli.id());
                return cli.Send(srv.id(), req_head, req_body);
                auto route = req_head.add_route();
                route->set_mq_id(cli.id());
                route->set_abs_addr(cli.AbsAddr());
                return cli.Send({srv.id(), srv.AbsAddr()}, req_head, req_body);
            };
            Req();
@@ -207,13 +200,13 @@
                DEFER1(req.Release());
                if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) {
                    auto src_id = req_head.route()[0].mq_id();
                    MQInfo src_mq = {req_head.route()[0].mq_id(), req_head.route()[0].abs_addr()};
                    auto Reply = [&]() {
                        MsgRequestTopic reply_body;
                        reply_body.set_topic("topic");
                        reply_body.set_data(msg_content);
                        auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, srv.id(), req_head.msg_id()));
                        return srv.Send(src_id, reply_head, reply_body);
                        return srv.Send(src_mq, reply_head, reply_body);
                    };
                    Reply();
                }