lichao
2021-05-13 db322f33ba13592f2492317e3f1a070454c97059
center alloc all msgs.
24个文件已修改
1071 ■■■■ 已修改文件
box/center.cpp 276 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.h 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center_main.cc 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/source/bhome_msg.proto 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_util.h 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.cpp 45 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.h 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.cpp 29 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.h 109 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/proto.h 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/robust.cpp 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/robust.h 39 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/sendq.cpp 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/sendq.h 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_msg_queue.cpp 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_msg_queue.h 59 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.h 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.cpp 141 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.h 90 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 95 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.h 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/robust_test.cpp 45 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/speed_test.cpp 9 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.cpp
@@ -21,7 +21,7 @@
#include "log.h"
#include "shm.h"
#include <chrono>
#include <set>
#include <unordered_map>
using namespace std::chrono;
using namespace std::chrono_literals;
@@ -33,11 +33,118 @@
namespace
{
typedef std::string ProcId;
typedef size_t ProcIndex; // max local procs.
const int kMaxProcs = 65536;
// record all procs ever registered, always grow, never remove.
// mainly for node to request msg allocation.
// use index instead of MQId to save some bits.
class ProcRecords
{
public:
    struct ProcRec {
        ProcId proc_;
        MQId ssn_ = 0;
    };
    ProcRecords() { procs_.reserve(kMaxProcs); }
    ProcIndex Put(const ProcId &proc_id, const MQId ssn)
    {
        if (procs_.size() >= kMaxProcs) {
            return -1;
        }
        auto pos_isnew = proc_index_.emplace(proc_id, procs_.size());
        int index = pos_isnew.first->second;
        if (pos_isnew.second) {
            procs_.emplace_back(ProcRec{proc_id, ssn});
        } else { // update ssn
            procs_[index].ssn_ = ssn;
        }
        return index;
    }
    const ProcRec &Get(const ProcIndex index) const
    {
        static ProcRec empty_rec;
        return (index < procs_.size()) ? procs_[index] : empty_rec;
    }
private:
    std::unordered_map<ProcId, size_t> proc_index_;
    std::vector<ProcRec> procs_;
};
class MsgRecords
{
    typedef int64_t MsgId;
    typedef int64_t Offset;
public:
    void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg.Offset()); }
    void FreeMsg(MsgId id)
    {
        auto pos = msgs_.find(id);
        if (pos != msgs_.end()) {
            ShmMsg(pos->second).Free();
            msgs_.erase(pos);
        } else {
            LOG_TRACE() << "ignore late free request.";
        }
    }
    void AutoRemove()
    {
        auto now = NowSec();
        if (now < time_to_clean_) {
            return;
        }
        LOG_FUNCTION;
        time_to_clean_ = now + 1;
        int64_t limit = std::max(10000ul, msgs_.size() / 10);
        int64_t n = 0;
        auto it = msgs_.begin();
        while (it != msgs_.end() && --limit > 0) {
            ShmMsg msg(it->second);
            if (msg.Count() == 0) {
                msg.Free();
                it = msgs_.erase(it);
                ++n;
            } else if (msg.timestamp() + 10 < NowSec()) {
                msg.Free();
                it = msgs_.erase(it);
                ++n;
                // LOG_DEBUG() << "release timeout msg, someone crashed.";
            } else {
                ++it;
            }
        }
        if (n > 0) {
            LOG_DEBUG() << "~~~~~~~~~~~~~~~~ auto release msgs: " << n;
        }
    }
    size_t size() const { return msgs_.size(); }
    void DebugPrint() const
    {
        LOG_DEBUG() << "msgs : " << size();
        int i = 0;
        int total_count = 0;
        for (auto &kv : msgs_) {
            MsgI msg(kv.second);
            total_count += msg.Count();
            LOG_TRACE() << "  " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second << ", count: " << msg.Count() << ", size: " << msg.Size();
        }
        LOG_DEBUG() << "total count: " << total_count;
    }
private:
    std::unordered_map<MsgId, Offset> msgs_;
    int64_t time_to_clean_ = 0;
};
//TODO check proc_id
class NodeCenter
{
public:
    typedef std::string ProcId;
    typedef MQId Address;
    typedef bhome_msg::ProcInfo ProcInfo;
    typedef std::function<void(Address const)> Cleaner;
@@ -102,13 +209,14 @@
    // center name, no relative to shm.
    const std::string &id() const { return id_; }
    void OnNodeInit(SharedMemory &shm, const int64_t msg)
    void OnNodeInit(ShmSocket &socket, const int64_t val)
    {
        MQId ssn = msg;
        LOG_FUNCTION;
        SharedMemory &shm = socket.shm();
        MQId ssn = (val >> 4) & MaskBits(60);
        if (nodes_.find(ssn) != nodes_.end()) {
            return; // ignore in exists.
        }
        auto UpdateRegInfo = [&](Node &node) {
            for (int i = 0; i < 10; ++i) {
                node->addrs_.insert(ssn + i);
@@ -118,12 +226,10 @@
            // create sockets.
            try {
                auto CreateSocket = [](SharedMemory &shm, const MQId id) {
                    ShmSocket tmp(shm, true, id, 16);
                };
                auto CreateSocket = [&](const MQId id) { ShmSocket tmp(shm, true, id, 16); };
                // alloc(-1), node, server, sub, request,
                for (int i = -1; i < 4; ++i) {
                    CreateSocket(shm, ssn + i);
                for (int i = 0; i < 4; ++i) {
                    CreateSocket(ssn + i);
                    node->addrs_.insert(ssn + i);
                }
                return true;
@@ -132,11 +238,93 @@
            }
        };
        auto PrepareProcInit = [&]() {
            bool r = false;
            ShmMsg init_msg;
            if (init_msg.Make(GetAllocSize(CalcAllocIndex(900)))) {
                // 31bit pointer, 4bit cmd+flag
                int64_t reply = (init_msg.Offset() << 4) | EncodeCmd(eCmdNodeInitReply);
                r = SendAllocReply(socket, ssn, reply, init_msg);
            }
            return r;
        };
        Node node(new NodeInfo);
        if (UpdateRegInfo(node)) {
        if (UpdateRegInfo(node) && PrepareProcInit()) {
            nodes_[ssn] = node;
            LOG_INFO() << "new node ssn (" << ssn << ") init";
        } else {
            for (int i = 0; i < 10; ++i) {
                ShmSocket::Remove(shm, ssn + i);
            }
        }
    }
    void RecordMsg(const MsgI &msg) { msgs_.RecordMsg(msg); }
    bool SendAllocReply(ShmSocket &socket, const Address dest, const int64_t reply, const MsgI &msg)
    {
        RecordMsg(msg);
        auto onExpireFree = [this, msg](const SendQ::Data &) { msgs_.FreeMsg(msg.id()); };
        return socket.Send(dest, reply, onExpireFree);
    }
    bool SendAllocMsg(ShmSocket &socket, const Address dest, const MsgI &msg)
    {
        RecordMsg(msg);
        auto onExpireFree = [this, msg](const SendQ::Data &) { msgs_.FreeMsg(msg.id()); };
        return socket.Send(dest, msg, onExpireFree);
    }
    void OnAlloc(ShmSocket &socket, const int64_t val)
    {
        // LOG_FUNCTION;
        // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag
        int64_t msg_id = (val >> 4) & MaskBits(28);
        int proc_index = (val >> 32) & MaskBits(16);
        int socket_index = ((val) >> 48) & MaskBits(4);
        auto proc_rec(procs_.Get(proc_index));
        if (proc_rec.proc_.empty()) {
            return;
        }
        Address dest = proc_rec.ssn_ + socket_index;
        auto size = GetAllocSize((val >> 52) & MaskBits(8));
        MsgI new_msg;
        if (new_msg.Make(size)) {
            // 31bit proc index, 28bit id, ,4bit cmd+flag
            int64_t reply = (new_msg.Offset() << 32) | (msg_id << 4) | EncodeCmd(eCmdAllocReply0);
            SendAllocReply(socket, dest, reply, new_msg);
        } else {
            int64_t reply = (msg_id << 4) | EncodeCmd(eCmdAllocReply0); // send empty, ack failure.
            socket.Send(dest, reply);
        }
    }
    void OnFree(ShmSocket &socket, const int64_t val)
    {
        int64_t msg_id = (val >> 4) & MaskBits(31);
        msgs_.FreeMsg(msg_id);
    }
    bool OnCommand(ShmSocket &socket, const int64_t val)
    {
        assert(IsCmd(val));
        int cmd = DecodeCmd(val);
        switch (cmd) {
        case eCmdNodeInit: OnNodeInit(socket, val); break;
        case eCmdAllocRequest0: OnAlloc(socket, val); break;
        case eCmdFree: OnFree(socket, val); break;
        default: return false;
        }
        return true;
    }
    MsgProcInitReply ProcInit(const BHMsgHead &head, MsgProcInit &msg)
    {
        LOG_DEBUG() << "center got proc init.";
        auto index = procs_.Put(head.proc_id(), head.ssn_id());
        auto reply(MakeReply<MsgProcInitReply>(eSuccess));
        reply.set_proc_index(index);
        return reply;
    }
    MsgCommonReply Register(const BHMsgHead &head, MsgRegister &msg)
@@ -160,14 +348,13 @@
            };
            auto pos = nodes_.find(ssn);
            if (pos != nodes_.end()) { // update
                Node &node = pos->second;
                UpdateRegInfo(node);
            } else {
                Node node(new NodeInfo);
                UpdateRegInfo(node);
                nodes_[ssn] = node;
            if (pos == nodes_.end()) {
                return MakeReply(eInvalidInput, "invalid session.");
            }
            // update proc info
            Node &node = pos->second;
            UpdateRegInfo(node);
            LOG_DEBUG() << "node (" << head.proc_id() << ") ssn (" << ssn << ")";
            auto old = online_node_addr_map_.find(head.proc_id());
@@ -376,13 +563,14 @@
    void OnTimer()
    {
        CheckNodes();
        msgs_.AutoRemove();
    }
private:
    void CheckNodes()
    {
        auto now = NowSec();
        if (now - last_check_time_ < 1) { return; }
        if (now <= last_check_time_) { return; }
        last_check_time_ = now;
        auto it = nodes_.begin();
@@ -396,6 +584,7 @@
                ++it;
            }
        }
        msgs_.DebugPrint();
    }
    bool CanHeartbeat(const NodeInfo &node)
    {
@@ -448,7 +637,10 @@
    std::unordered_map<Topic, Clients> service_map_;
    std::unordered_map<Topic, Clients> subscribe_map_;
    std::unordered_map<Address, Node> nodes_;
    std::unordered_map<std::string, Address> online_node_addr_map_;
    std::unordered_map<ProcId, Address> online_node_addr_map_;
    ProcRecords procs_; // To get a short index for msg alloc.
    MsgRecords msgs_;   // record all msgs alloced.
    Cleaner cleaner_; // remove mqs.
    int64_t offline_time_;
    int64_t kill_time_;
@@ -483,25 +675,28 @@
            msg, head, [&](auto &body) { return center->MsgTag(head, body); }, replyer); \
        return true;
auto MakeReplyer(ShmSocket &socket, BHMsgHead &head, const std::string &proc_id)
auto MakeReplyer(ShmSocket &socket, BHMsgHead &head, Synced<NodeCenter> &center)
{
    return [&](auto &&rep_body) {
        auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.ssn_id(), head.msg_id()));
        auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id()));
        auto remote = head.route(0).mq_id();
        socket.Send(remote, reply_head, rep_body);
        MsgI msg;
        if (msg.Make(reply_head, rep_body)) {
            DEFER1(msg.Release(););
            center->SendAllocMsg(socket, remote, msg);
        }
    };
}
bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr)
{
    auto OnNodeInit = [center_ptr](ShmSocket &socket, MsgI &msg) {
    // command
    auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool {
        auto &center = *center_ptr;
        center->OnNodeInit(socket.shm(), msg.Offset());
        return IsCmd(cmd) && center->OnCommand(socket, cmd);
    };
    auto Nothing = [](ShmSocket &socket) {};
    BHCenter::Install("#centetr.Init", OnNodeInit, Nothing, BHInitAddress(), 16);
    // now we can talk.
    auto OnCenterIdle = [center_ptr](ShmSocket &socket) {
        auto &center = *center_ptr;
        center->OnTimer();
@@ -509,8 +704,9 @@
    auto OnCenter = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
        auto &center = *center_ptr;
        auto replyer = MakeReplyer(socket, head, center->id());
        auto replyer = MakeReplyer(socket, head, center);
        switch (head.type()) {
            CASE_ON_MSG_TYPE(ProcInit);
            CASE_ON_MSG_TYPE(Register);
            CASE_ON_MSG_TYPE(Heartbeat);
            CASE_ON_MSG_TYPE(Unregister);
@@ -520,12 +716,13 @@
        default: return false;
        }
    };
    BHCenter::Install("#center.main", OnCenter, OnCenterIdle, BHTopicCenterAddress(), 1000);
    BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(), 1000);
    auto OnBusIdle = [=](ShmSocket &socket) {};
    auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; };
    auto OnPubSub = [=](ShmSocket &socket, MsgI &msg, BHMsgHead &head) -> bool {
        auto &center = *center_ptr;
        auto replyer = MakeReplyer(socket, head, center->id());
        auto replyer = MakeReplyer(socket, head, center);
        auto OnPublish = [&]() {
            MsgPublish pub;
            NodeCenter::Clients clients;
@@ -561,7 +758,7 @@
        }
    };
    BHCenter::Install("#center.bus", OnPubSub, OnBusIdle, BHTopicBusAddress(), 1000);
    BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(), 1000);
    return true;
}
@@ -576,14 +773,9 @@
    return rec;
}
bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len)
bool BHCenter::Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQId mqid, const int mq_len)
{
    Centers()[name] = CenterInfo{name, handler, MsgIHandler(), idle, mqid, mq_len};
    return true;
}
bool BHCenter::Install(const std::string &name, MsgIHandler handler, IdleHandler idle, const MQId mqid, const int mq_len)
{
    Centers()[name] = CenterInfo{name, MsgHandler(), handler, idle, mqid, mq_len};
    Centers()[name] = CenterInfo{name, handler, raw_handler, idle, mqid, mq_len};
    return true;
}
@@ -609,11 +801,7 @@
{
    for (auto &kv : Centers()) {
        auto &info = kv.second;
        if (info.handler_) {
            sockets_[info.name_]->Start(info.handler_, info.idle_);
        } else {
            sockets_[info.name_]->Start(info.raw_handler_, info.idle_);
        }
        sockets_[info.name_]->Start(1, info.handler_, info.raw_handler_, info.idle_);
    }
    return true;
box/center.h
@@ -29,10 +29,9 @@
public:
    typedef Socket::PartialRecvCB MsgHandler;
    typedef Socket::RawRecvCB MsgIHandler;
    typedef Socket::RawRecvCB RawHandler;
    typedef Socket::IdleCB IdleHandler;
    static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len);
    static bool Install(const std::string &name, MsgIHandler handler, IdleHandler idle, const MQId mqid, const int mq_len);
    static bool Install(const std::string &name, MsgHandler handler, RawHandler raw_handler, IdleHandler idle, const MQId mqid, const int mq_len);
    BHCenter(Socket::Shm &shm);
    ~BHCenter() { Stop(); }
@@ -43,7 +42,7 @@
    struct CenterInfo {
        std::string name_;
        MsgHandler handler_;
        MsgIHandler raw_handler_;
        RawHandler raw_handler_;
        IdleHandler idle_;
        MQId mqid_;
        int mq_len_ = 0;
box/center_main.cc
@@ -83,6 +83,12 @@
    std::atomic<bool> run_;
};
bool CenterInit(bhome_shm::SharedMemory &shm)
{
    ShmSocket create(shm, BHGlobalSenderAddress(), 16);
    return true;
}
} // namespace
int center_main(int argc, const char *argv[])
{
@@ -102,6 +108,7 @@
    if (strcasecmp(lvl.c_str(), "fatal") == 0) { ns_log::ResetLogLevel(ns_log::LogLevel::fatal); }
    auto &shm = BHomeShm();
    CenterInit(shm);
    GlobalInit(shm);
    InstanceFlag inst(shm, kCenterRunningFlag);
proto/source/bhome_msg.proto
@@ -28,6 +28,8 @@
    kMsgTypeCommonReply = 2;
    kMsgTypeProcInit = 8;
    kMsgTypeProcInitReply = 9;
    kMsgTypeRegister= 10;
    // kMsgTypeRegisterReply= 11;
    kMsgTypeHeartbeat = 12;
@@ -60,6 +62,13 @@
    MsgTopicList topics = 1;
}
message MsgProcInit{ } // proc_id is in header.
message MsgProcInitReply {
    ErrorMsg errmsg = 1;
    int32 proc_index = 2;
}
service TopicRPC {
    rpc Query (MsgQueryTopic) returns (MsgQueryTopicReply);
    rpc Request (MsgRequestTopic) returns (MsgQueryTopicReply);
src/bh_util.h
@@ -92,6 +92,8 @@
inline void PutInt(void *p, uint32_t u) { Put32(p, u); }
inline void PutInt(void *p, uint64_t u) { Put64(p, u); }
constexpr uint64_t MaskBits(int nbits) { return (uint64_t(1) << nbits) - 1; }
class ExitCall
{
    typedef std::function<void(void)> func_t;
src/defs.cpp
@@ -33,8 +33,53 @@
    return le;
}
constexpr int64_t AllocSizeIndex[] = {
    16, 24, 32, 40, 48, 56, 64, 72,
    80, 88, 96, 104, 120, 136, 152, 168,
    184, 200, 224, 248, 272, 296, 328, 360,
    392, 432, 472, 520, 568, 624, 680, 744,
    816, 896, 984, 1080, 1184, 1296, 1416, 1544,
    1688, 1848, 2016, 2200, 2400, 2624, 2864, 3128,
    3416, 3728, 4072, 4448, 4856, 5304, 5792, 6320,
    6896, 7528, 8216, 8968, 9784, 10680, 11656, 12720,
    13880, 15144, 16520, 18024, 19664, 21456, 23408, 25536,
    27864, 30400, 33168, 36184, 39480, 43072, 46992, 51264,
    55928, 61016, 66568, 72624, 79232, 86440, 94304, 102880,
    112232, 122440, 133576, 145720, 158968, 173424, 189192, 206392,
    225160, 245632, 267968, 292328, 318904, 347896, 379528, 414032,
    451672, 492736, 537536, 586408, 639720, 697880, 761328, 830544,
    906048, 988416, 1078272, 1176296, 1283232, 1399896, 1527160, 1665992,
    1817448, 1982672, 2162920, 2359552, 2574056, 2808064, 3063344, 3341832,
    3645640, 3977064, 4338616, 4733040, 5163320, 5632712, 6144776, 6703392,
    7312792, 7977592, 8702832, 9494000, 10357096, 11298656, 12325808, 13446336,
    14668736, 16002264, 17457016, 19044024, 20775304, 22663968, 24724328, 26972000,
    29424000, 32098912, 35017000, 38200368, 41673128, 45461600, 49594472, 54103064,
    59021528, 64387128, 70240504, 76626008, 83592008, 91191288, 99481408, 108525176,
    118391104, 129153936, 140895208, 153703864, 167676944, 182920304, 199549424, 217690280,
    237480312, 259069432, 282621200, 308314040, 336342592, 366919192, 400275488, 436664168,
    476360912, 519666456, 566908864, 618446040, 674668408, 736001904, 802911168, 875903096,
    955530656, 1042397080, 1137160456, 1240538680, 1353314928, 1476343560, 1610556616, 1756970856,
    1916695480, 2090940528, 2281026032, 2488392040, 2714609504, 2961392192, 3230609664, 3524301456,
    3844692504, 4194210008, 4575501832, 4991456544, 5445225320, 5940245808, 6480268160, 7069383448,
    7712054672, 8413150552, 9177982424, 10012344464, 10922557600, 11915517384, 12998746240, 14180450448,
    15469582312, 16875907976, 18410081432, 20083725200, 21909518400, 23901292800, 26074137600, 28444513752,
    31030378640, 33851322152, 36928715080, 40285871000, 43948222912, 47943515904, 52302017352, 57056746208,
    62243723136, 67902243424, 74075174648, 80809281440, 88155579752, 96169723368, 104912425496, 114449918728,
    124854456800, 136204861968, 148587122152, 162095042352, 176830955296, 192906496688, 210443450936, 229574673752};
const int kAllocIndexLen = sizeof(AllocSizeIndex) / sizeof(AllocSizeIndex[0]);
static_assert(kAllocIndexLen == 256, "Make sure alloc 8 bit is enough.");
static_assert(AllocSizeIndex[255] > uint32_t(-1), "Make sure alloc size correct.");
} // namespace
int64_t CalcAllocIndex(int64_t size)
{
    auto pos = std::lower_bound(AllocSizeIndex, AllocSizeIndex + kAllocIndexLen, size);
    return (pos == AllocSizeIndex + kAllocIndexLen) ? -1 : pos - AllocSizeIndex;
}
int64_t GetAllocSize(int index) { return index < kAllocIndexLen ? AllocSizeIndex[index] : 0; }
std::string BHomeShmName()
{
    return "bhome_default_shm_v0";
src/defs.h
@@ -23,14 +23,15 @@
typedef uint64_t MQId;
const MQId kBHNodeInit = 10;
const MQId kBHDefaultSender = 99;
const MQId kBHTopicCenter = 100;
const MQId kBHTopicBus = 101;
const MQId kBHUniCenter = 102;
inline const MQId BHInitAddress() { return kBHNodeInit; }
inline const MQId BHGlobalSenderAddress() { return kBHDefaultSender; }
inline const MQId BHTopicCenterAddress() { return kBHTopicCenter; }
inline const MQId BHTopicBusAddress() { return kBHTopicBus; }
inline const MQId BHUniCenterAddress() { return kBHUniCenter; }
int64_t CalcAllocIndex(int64_t size);
int64_t GetAllocSize(int index);
const int kBHCenterPort = 24287;
const char kTopicSep = '.';
src/msg.cpp
@@ -17,8 +17,37 @@
 */
#include "msg.h"
#include "bh_util.h"
#include "socket.h"
namespace bhome_msg
{
ShmSocket &ShmMsg::Sender()
{
    static ShmSocket sender(shm(), false, BHGlobalSenderAddress(), 16);
    return sender;
}
int ShmMsg::Release()
{
    if (!valid()) {
        return 0;
    }
    auto n = meta()->count_.Dec();
    if (n == 0) {
        int64_t free_cmd = (id() << 4) | EncodeCmd(eCmdFree);
        Sender().Send(BHTopicCenterAddress(), free_cmd);
    } else if (n < 0) {
        throw -123;
    }
    return n;
}
void ShmMsg::Free()
{
    assert(valid());
    shm().Dealloc(meta());
    offset_ = 0;
    assert(!valid());
}
} // namespace bhome_msg
src/msg.h
@@ -26,6 +26,7 @@
#include <functional>
#include <stdint.h>
class ShmSocket;
namespace bhome_msg
{
using namespace bhome_shm;
@@ -35,8 +36,9 @@
class ShmMsg : private StaticDataRef<SharedMemory, ShmMsg>
{
private:
    static inline SharedMemory &shm() { return GetData(); }
    static ShmSocket &Sender();
    // store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object.
    class RefCount : private boost::noncopyable
    {
@@ -49,6 +51,7 @@
        int Dec() { return --num_; }
        int Get() { return num_.load(); }
    };
    typedef int64_t OffsetType;
    static OffsetType Addr(void *ptr) { return reinterpret_cast<OffsetType>(ptr); }
    static void *Ptr(const OffsetType offset) { return reinterpret_cast<void *>(offset); }
@@ -60,14 +63,22 @@
    static const uint32_t kMsgTag = 0xf1e2d3c4;
    struct Meta {
        static int64_t NewId()
        {
            static std::atomic<int64_t> id(0);
            return ++id;
        }
        RefCount count_;
        const uint32_t tag_ = kMsgTag;
        const uint32_t size_ = 0;
        const int64_t id_ = 0;
        std::atomic<int64_t> timestamp_;
        Meta(uint32_t size) :
            size_(size) {}
            size_(size), id_(NewId()), timestamp_(NowSec()) {}
    };
    OffsetType offset_;
    void *Alloc(const size_t size)
    static void *Alloc(const size_t size)
    {
        void *p = shm().Alloc(sizeof(Meta) + size);
        if (p) {
@@ -76,45 +87,33 @@
        }
        return p;
    }
    void Free()
    {
        assert(valid());
        shm().Dealloc(meta());
        offset_ = 0;
        assert(!valid());
    }
private:
    Meta *meta() const { return get<Meta>() - 1; }
    typedef std::function<void(void *p, int len)> ToArray;
    void *Pack(const uint32_t head_len, const ToArray &headToArray,
               const uint32_t body_len, const ToArray &bodyToArray)
    template <class Body>
    void *Pack(const BHMsgHead &head, const uint32_t head_len, const Body &body, const uint32_t body_len)
    {
        void *addr = Alloc(sizeof(head_len) + head_len + sizeof(body_len) + body_len);
        void *addr = get();
        if (addr) {
            auto p = static_cast<char *>(addr);
            auto Pack1 = [&p](auto len, auto &writer) {
            auto Pack1 = [&p](auto len, auto &&writer) {
                Put32(p, len);
                p += sizeof(len);
                writer(p, len);
                p += len;
            };
            Pack1(head_len, headToArray);
            Pack1(body_len, bodyToArray);
            Pack1(head_len, [&](void *p, int len) { head.SerializeToArray(p, len); });
            Pack1(body_len, [&](void *p, int len) { body.SerializeToArray(p, len); });
        }
        return addr;
    }
    template <class Body>
    void *Pack(const BHMsgHead &head, const Body &body)
    {
        return Pack(
            uint32_t(head.ByteSizeLong()), [&](void *p, int len) { head.SerializeToArray(p, len); },
            uint32_t(body.ByteSizeLong()), [&](void *p, int len) { body.SerializeToArray(p, len); });
    }
    void *Pack(const std::string &content)
    {
        void *addr = Alloc(content.size());
        void *addr = get();
        if (addr) {
            memcpy(addr, content.data(), content.size());
        }
@@ -133,36 +132,48 @@
        offset_(p ? (Addr(p) - BaseAddr()) : 0) {}
    template <class T = void>
    T *get() const { return static_cast<T *>(Ptr(offset_ + BaseAddr())); }
    T *get() const { return offset_ != 0 ? static_cast<T *>(Ptr(offset_ + BaseAddr())) : nullptr; }
public:
    static bool BindShm(SharedMemory &shm) { return SetData(shm); }
    ShmMsg() :
        ShmMsg(nullptr) {}
        offset_(0) {}
    explicit ShmMsg(const OffsetType offset) :
        offset_(offset) {}
    OffsetType Offset() const { return offset_; }
    OffsetType &OffsetRef() { return offset_; }
    void swap(ShmMsg &a) { std::swap(offset_, a.offset_); }
    bool valid() const { return static_cast<bool>(offset_) && meta()->tag_ == kMsgTag; }
    int AddRef() const { return valid() ? meta()->count_.Inc() : 1; }
    int Release()
    {
        if (!valid()) {
            return 0;
        }
        auto n = meta()->count_.Dec();
        if (n == 0) {
            Free();
        }
        return n;
    }
    bool valid() const { return offset_ != 0 && meta()->tag_ == kMsgTag; }
    int64_t id() const { return valid() ? meta()->id_ : 0; }
    int64_t timestamp() const { return valid() ? meta()->timestamp_.load() : 0; }
    size_t Size() const { return valid() ? meta()->size_ : 0; }
    int Count() const { return valid() ? meta()->count_.Get() : 1; }
    int AddRef() const { return valid() ? meta()->count_.Inc() : 1; }
    int Release();
    void Free();
    template <class Body>
    inline bool Make(const BHMsgHead &head, const Body &body) { return Make(Pack(head, body)); }
    inline bool Make(const std::string &content) { return Make(Pack(content)); }
    inline bool Make(const BHMsgHead &head, const Body &body)
    {
        uint32_t head_len = head.ByteSizeLong();
        uint32_t body_len = body.ByteSizeLong();
        uint32_t size = sizeof(head_len) + head_len + sizeof(body_len) + body_len;
        return Make(size) && Pack(head, head_len, body, body_len);
    }
    template <class Body>
    inline bool Fill(const BHMsgHead &head, const Body &body)
    {
        uint32_t head_len = head.ByteSizeLong();
        uint32_t body_len = body.ByteSizeLong();
        uint32_t size = sizeof(head_len) + head_len + sizeof(body_len) + body_len;
        return valid() && (meta()->size_ >= size) && Pack(head, head_len, body, body_len);
    }
    inline bool Make(const std::string &content) { return Make(content.size()) && Pack(content); }
    inline bool Fill(const std::string &content) { return valid() && (meta()->size_ >= content.size()) && Pack(content); }
    inline bool Make(const size_t size) { return Make(Alloc(size)); }
    template <class Body>
    static inline std::string Serialize(const BHMsgHead &head, const Body &body)
    {
@@ -208,6 +219,18 @@
typedef ShmMsg MsgI;
constexpr inline int EncodeCmd(int cmd) { return ((cmd & MaskBits(3)) << 1) | 1; }
constexpr inline int DecodeCmd(int64_t msg) { return (msg >> 1) & MaskBits(3); }
constexpr inline bool IsCmd(int64_t msg) { return (msg & 1) != 0; }
// int64_t pack format: cmd data ,3bit cmd, 1bit flag.
enum MsgCmd {
    eCmdNodeInit = 0,      // upto 59bit ssn id
    eCmdNodeInitReply = 1, // 31bit proc index,
    eCmdAllocRequest0 = 2, // 8bit size, 4bit socket index, 16bit proc index, 28bit id
    eCmdAllocReply0 = 3,   // 31bit ptr, 28bit id,
    eCmdFree = 4,          // upto 59bit msg id,
};
} // namespace bhome_msg
#endif // end of include guard: MSG_5BILLZET
src/proto.h
@@ -48,6 +48,8 @@
BHOME_SIMPLE_MAP_MSG(Publish);
BHOME_SIMPLE_MAP_MSG(Subscribe);
BHOME_SIMPLE_MAP_MSG(Unsubscribe);
BHOME_SIMPLE_MAP_MSG(ProcInit);
BHOME_SIMPLE_MAP_MSG(ProcInitReply);
#undef BHOME_SIMPLE_MAP_MSG
#undef BHOME_MAP_MSG_AND_TYPE
src/robust.cpp
@@ -35,24 +35,30 @@
bool FMutex::try_lock()
{
    if (mtx_.try_lock()) {
        if (flock(fd_, LOCK_EX | LOCK_NB) == 0) {
    if (flock(fd_, LOCK_EX | LOCK_NB) == 0) {
        ++count_;
        if (mtx_.try_lock()) {
            return true;
        } else {
            mtx_.unlock();
            if (--count_ == 0) {
                flock(fd_, LOCK_UN);
            }
        }
    }
    return false;
}
void FMutex::lock()
{
    mtx_.lock();
    flock(fd_, LOCK_EX);
    ++count_;
    mtx_.lock();
}
void FMutex::unlock()
{
    flock(fd_, LOCK_UN);
    mtx_.unlock();
    if (--count_ == 0) {
        flock(fd_, LOCK_UN);
    }
}
} // namespace robust
src/robust.h
@@ -19,6 +19,7 @@
#ifndef ROBUST_Q31RCWYU
#define ROBUST_Q31RCWYU
#include "bh_util.h"
#include "log.h"
#include <atomic>
#include <chrono>
@@ -37,8 +38,6 @@
using namespace std::chrono;
using namespace std::chrono_literals;
constexpr uint64_t MaskBits(int nbits) { return (uint64_t(1) << nbits) - 1; }
void QuickSleep();
class CasMutex
@@ -99,7 +98,7 @@
public:
    typedef uint64_t id_t;
    FMutex(id_t id) :
        id_(id), fd_(Open(id_))
        id_(id), fd_(Open(id_)), count_(0)
    {
        if (fd_ == -1) { throw "error create mutex!"; }
    }
@@ -117,11 +116,10 @@
    }
    static int Open(id_t id) { return open(GetPath(id).c_str(), O_CREAT | O_RDONLY, 0666); }
    static int Close(int fd) { return close(fd); }
    void FLock();
    void FUnlock();
    id_t id_;
    int fd_;
    std::mutex mtx_;
    std::atomic<int32_t> count_;
};
union semun {
@@ -310,5 +308,36 @@
    AData buf[capacity];
};
template <class Int>
class AtomicQueue<0, Int>
{
    typedef Int Data;
    typedef std::atomic<Data> AData;
    static_assert(sizeof(Data) == sizeof(AData));
public:
    AtomicQueue() { memset(this, 0, sizeof(*this)); }
    bool push(const Data d, bool try_more = false)
    {
        auto cur = buf.load();
        return Empty(cur) && buf.compare_exchange_strong(cur, Enc(d));
    }
    bool pop(Data &d, bool try_more = false)
    {
        Data cur = buf.load();
        bool r = !Empty(cur) && buf.compare_exchange_strong(cur, 0);
        if (r) { d = Dec(cur); }
        return r;
    }
    uint32_t head() const { return 0; }
    uint32_t tail() const { return 0; }
private:
    static inline bool Empty(const Data d) { return (d & 1) == 0; } // lowest bit 1 means data ok.
    static inline Data Enc(const Data d) { return (d << 1) | 1; }   // lowest bit 1 means data ok.
    static inline Data Dec(const Data d) { return d >> 1; }         // lowest bit 1 means data ok.
    AData buf;
};
} // namespace robust
#endif // end of include guard: ROBUST_Q31RCWYU
src/sendq.cpp
@@ -40,20 +40,24 @@
    }
    auto SendData = [&](Data &d) {
        auto TryLoop = [&](auto &&data) {
            for (int i = 0; i < 1; ++i) {
                if (mq.TrySend(remote, data)) {
                    return true;
                }
            }
            return false;
        };
        bool r = false;
        if (d.index() == 0) {
            auto &msg = boost::variant2::get<0>(pos->data().data_);
            r = mq.TrySend(remote, msg);
            r = TryLoop(msg);
            if (r) {
                msg.Release();
            }
        } else {
            auto &content = boost::variant2::get<1>(pos->data().data_);
            MsgI msg;
            if (msg.Make(content)) {
                DEFER1(msg.Release(););
                r = mq.TrySend(remote, msg);
            }
            auto command = boost::variant2::get<1>(pos->data().data_);
            r = TryLoop(command);
        }
        return r;
    };
@@ -110,4 +114,4 @@
    Collect();
    return !out_.empty();
}
}
src/sendq.h
@@ -37,7 +37,8 @@
    typedef MQId Remote;
    typedef bhome_msg::MsgI MsgI;
    typedef std::string Content;
    typedef boost::variant2::variant<MsgI, Content> Data;
    typedef int64_t Command;
    typedef boost::variant2::variant<MsgI, Command> Data;
    typedef std::function<void(const Data &)> OnMsgEvent;
    struct MsgInfo {
        Data data_;
@@ -47,23 +48,16 @@
    typedef TimedMsg::TimePoint TimePoint;
    typedef TimedMsg::Duration Duration;
    // template <class... Rest>
    // void Append(const MQId &id, Rest &&...rest)
    // {
    //     Append(std::string((const char *) &id, sizeof(id)), std::forward<decltype(rest)>(rest)...);
    // }
    void Append(const Remote addr, const MsgI msg, OnMsgEvent onExpire = OnMsgEvent())
    {
        msg.AddRef();
        AppendData(addr, Data(msg), DefaultExpire(), onExpire);
    }
    void Append(const Remote addr, Content &&content, OnMsgEvent onExpire = OnMsgEvent())
    void Append(const Remote addr, const Command command, OnMsgEvent onExpire = OnMsgEvent())
    {
        AppendData(addr, Data(std::move(content)), DefaultExpire(), onExpire);
        AppendData(addr, Data(command), DefaultExpire(), onExpire);
    }
    bool TrySend(ShmMsgQueue &mq);
    // bool empty() const { return store_.empty(); }
private:
    static TimePoint Now() { return TimedMsg::Clock::now(); }
src/shm_msg_queue.cpp
@@ -56,6 +56,7 @@
ShmMsgQueue::~ShmMsgQueue() {}
#ifndef BH_USE_ATOMIC_Q
ShmMsgQueue::Mutex &ShmMsgQueue::GetMutex(const MQId id)
{
    static std::unordered_map<MQId, std::shared_ptr<Mutex>> imm;
@@ -69,13 +70,19 @@
    }
    return *pos->second;
}
#endif
bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id)
{
    Queue *q = Find(shm, id);
    if (q) {
        MsgI msg;
        while (q->TryRead(msg.OffsetRef())) {
            msg.Release();
        RawData val = 0;
        while (q->TryRead(val)) {
            if (IsCmd(val)) {
                LOG_DEBUG() << "clsing queue " << id << ", has a cmd" << DecodeCmd(val);
            } else {
                MsgI(val).Release();
            }
        }
    }
    return Shmq::Remove(shm, MsgQIdToName(id));
@@ -86,19 +93,18 @@
    return Shmq::Find(shm, MsgQIdToName(remote_id));
}
bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg)
bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, int64_t val)
{
    bool r = false;
    try {
        ShmMsgQueue dest(remote_id, false, shm, 1);
        msg.AddRef();
        DEFER1(if (!r) { msg.Release(); });
#ifndef BH_USE_ATOMIC_Q
        Guard lock(GetMutex(remote_id));
        r = dest.queue().TryWrite(msg.Offset());
#endif
        return dest.queue().TryWrite(val);
    } catch (...) {
        // SetLastError(eNotFound, "remote not found");
        return false;
    }
    return r;
}
// Test shows that in the 2 cases:
src/shm_msg_queue.h
@@ -24,19 +24,25 @@
using namespace bhome_shm;
using namespace bhome_msg;
#define BH_USE_ATOMIC_Q
class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue>
{
    // typedef ShmObject<SharedQ63<4>> Shmq;
    typedef ShmObject<SharedQueue<int64_t>> Shmq;
    typedef Shmq::ShmType ShmType;
    typedef Shmq::Data Queue;
    typedef std::function<void()> OnSend;
    typedef robust::FMutex Mutex;
    // typedef robust::SemMutex Mutex;
    // typedef robust::NullMutex Mutex;
    typedef robust::Guard<Mutex> Guard;
public:
    typedef int64_t RawData;
#ifdef BH_USE_ATOMIC_Q
    typedef ShmObject<SharedQ63<0>> Shmq;
#else
    typedef ShmObject<SharedQueue<RawData>> Shmq;
    // typedef robust::FMutex Mutex;
    // typedef robust::SemMutex Mutex;
    typedef robust::NullMutex Mutex;
    typedef robust::Guard<Mutex> Guard;
#endif
    typedef Shmq::Data Queue;
    typedef Shmq::ShmType ShmType;
    typedef uint64_t MQId;
    static MQId NewId();
@@ -45,26 +51,45 @@
    ShmMsgQueue(const MQId id, const bool create_or_else_find, ShmType &segment, const int len);
    ShmMsgQueue(ShmType &segment, const int len);
    ~ShmMsgQueue();
    static bool Remove(SharedMemory &shm, const MQId id);
    static bool Remove(ShmType &shm, const MQId id);
    MQId Id() const { return id_; }
    ShmType &shm() const { return queue_.shm(); }
    bool Recv(MsgI &msg, const int timeout_ms)
    bool Recv(RawData &val, const int timeout_ms)
    {
#ifndef BH_USE_ATOMIC_Q
        Guard lock(GetMutex(Id()));
        return queue().Read(msg.OffsetRef(), timeout_ms);
#endif
        return queue().Read(val, timeout_ms);
    }
    bool TryRecv(MsgI &msg)
    bool TryRecv(RawData &val)
    {
#ifndef BH_USE_ATOMIC_Q
        Guard lock(GetMutex(Id()));
        return queue().TryRead(msg.OffsetRef());
#endif
        return queue().TryRead(val);
    }
    static Queue *Find(SharedMemory &shm, const MQId remote_id);
    static bool TrySend(SharedMemory &shm, const MQId remote_id, MsgI msg);
    bool Recv(MsgI &msg, const int timeout_ms) { return Recv(msg.OffsetRef(), timeout_ms); }
    bool TryRecv(MsgI &msg) { return TryRecv(msg.OffsetRef()); }
    static Queue *Find(ShmType &shm, const MQId remote_id);
    static bool TrySend(ShmType &shm, const MQId remote_id, const RawData val);
    static bool TrySend(ShmType &shm, const MQId remote_id, MsgI msg)
    {
        bool r = false;
        msg.AddRef(); // TODO check if we could avoid addref here.
        DEFER1(if (!r) { msg.Release(); });
        r = TrySend(shm, remote_id, msg.Offset());
        return r;
    }
    bool TrySend(const MQId remote_id, const MsgI &msg) { return TrySend(shm(), remote_id, msg); }
    bool TrySend(const MQId remote_id, const RawData val) { return TrySend(shm(), remote_id, val); }
private:
#ifndef BH_USE_ATOMIC_Q
    static Mutex &GetMutex(const MQId id);
#endif
    MQId id_;
    Queue &queue() { return *queue_.data(); }
    Shmq queue_;
src/shm_queue.h
@@ -76,7 +76,7 @@
private:
    Circular<D> queue_;
    bhome_shm::Mutex mutex_;
    // bhome_shm::Mutex mutex_;
};
template <int Power = 4>
@@ -92,11 +92,12 @@
        using namespace std::chrono;
        auto end_time = steady_clock::now() + milliseconds(timeout_ms);
        do {
            if (TryRead(d)) {
                return true;
            } else {
                robust::QuickSleep();
            for (int i = 0; i < 100; ++i) {
                if (TryRead(d)) {
                    return true;
                }
            }
            robust::QuickSleep();
        } while (steady_clock::now() < end_time);
        return false;
    }
src/socket.cpp
@@ -20,22 +20,25 @@
#include "bh_util.h"
#include "defs.h"
#include "msg.h"
#include <chrono>
using namespace std::chrono;
using namespace std::chrono_literals;
using namespace bhome_msg;
using namespace bhome_shm;
ShmSocket::ShmSocket(Shm &shm, const MQId id, const int len) :
    run_(false), mq_(id, shm, len)
    run_(false), mq_(id, shm, len), alloc_id_(0)
{
    Start();
}
ShmSocket::ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len) :
    run_(false), mq_(id, create_or_else_find, shm, len)
    run_(false), mq_(id, create_or_else_find, shm, len), alloc_id_(0)
{
    Start();
}
ShmSocket::ShmSocket(bhome_shm::SharedMemory &shm, const int len) :
    run_(false), mq_(shm, len)
    run_(false), mq_(shm, len), alloc_id_(0)
{
    Start();
}
@@ -45,50 +48,15 @@
    Stop();
}
bool ShmSocket::Start(const RawRecvCB &onData, const IdleCB &onIdle, int nworker)
bool ShmSocket::Start(int nworker, const RecvCB &onData, const RawRecvCB &onRaw, const IdleCB &onIdle)
{
    auto ioProc = [this, onData, onIdle]() {
    auto ioProc = [this, onData, onRaw, onIdle]() {
        auto DoSend = [this]() { return send_buffer_.TrySend(mq()); };
        auto DoRecv = [=] {
            // do not recv if no cb is set.
            if (!onData) {
                return false;
            }
            auto onMsg = [&](MsgI &imsg) {
                DEFER1(imsg.Release());
                onData(*this, imsg);
            };
            MsgI imsg;
            return mq().TryRecv(imsg) ? (onMsg(imsg), true) : false;
        };
            if (!onData && per_msg_cbs_->empty() && !onRaw && alloc_cbs_->empty()) { return false; }
        try {
            bool more_to_send = DoSend();
            bool more_to_recv = DoRecv();
            if (onIdle) { onIdle(*this); }
            if (!more_to_send && !more_to_recv) {
                robust::QuickSleep();
            }
        } catch (...) {
        }
    };
    std::lock_guard<std::mutex> lock(mutex_);
    StopNoLock();
    run_.store(true);
    for (int i = 0; i < nworker; ++i) {
        workers_.emplace_back([this, ioProc]() { while (run_) { ioProc(); } });
    }
    return true;
}
bool ShmSocket::Start(int nworker, const RecvCB &onData, const IdleCB &onIdle)
{
    auto ioProc = [this, onData, onIdle]() {
        auto DoSend = [this]() { return send_buffer_.TrySend(mq()); };
        auto DoRecv = [=] {
            auto onRecvWithPerMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
            auto onMsgCB = [this, onData](ShmSocket &socket, MsgI &imsg, BHMsgHead &head) {
                RecvCB cb;
                if (per_msg_cbs_->Pick(head.msg_id(), cb)) {
                    cb(socket, imsg, head);
@@ -96,20 +64,43 @@
                    onData(socket, imsg, head);
                }
            };
            // do not recv if no cb is set.
            if (!onData && per_msg_cbs_->empty()) {
                return false;
            }
            auto onMsg = [&](MsgI &imsg) {
                DEFER1(imsg.Release());
                BHMsgHead head;
                if (imsg.ParseHead(head)) {
                    onRecvWithPerMsgCB(*this, imsg, head);
            auto onCmdCB = [this, onRaw](ShmSocket &socket, int64_t val) {
                int cmd = DecodeCmd(val);
                if (cmd == eCmdAllocReply0) {
                    int id = (val >> 4) & MaskBits(28);
                    RawRecvCB cb;
                    if (alloc_cbs_->Pick(id, cb)) {
                        cb(socket, val);
                        return;
                    }
                }
                if (onRaw) {
                    onRaw(socket, val);
                }
            };
            MsgI imsg;
            return mq().TryRecv(imsg) ? (onMsg(imsg), true) : false;
            auto onRecv = [&](auto &val) {
                if (IsCmd(val)) {
                    onCmdCB(*this, val);
                } else {
                    MsgI imsg(val);
                    DEFER1(imsg.Release());
                    BHMsgHead head;
                    if (imsg.ParseHead(head)) {
                        onMsgCB(*this, imsg, head);
                    }
                }
            };
            ShmMsgQueue::RawData val = 0;
            auto TryRecvMore = [&]() {
                for (int i = 0; i < 100; ++i) {
                    if (mq().TryRecv(val)) {
                        return true;
                    }
                }
                return false;
            };
            return TryRecvMore() ? (onRecv(val), true) : false;
        };
        try {
@@ -126,9 +117,18 @@
    std::lock_guard<std::mutex> lock(mutex_);
    StopNoLock();
    auto worker_proc = [this, ioProc]() {
        while (run_) { ioProc(); }
        // try send pending msgs.
        auto end_time = steady_clock::now() + 3s;
        while (send_buffer_.TrySend(mq()) && steady_clock::now() < end_time) {
            // LOG_DEBUG() << "try send pending msgs.";
        }
    };
    run_.store(true);
    for (int i = 0; i < nworker; ++i) {
        workers_.emplace_back([this, ioProc]() { while (run_) { ioProc(); } });
        workers_.emplace_back(worker_proc);
    }
    return true;
}
@@ -153,6 +153,10 @@
    return false;
}
bool ShmSocket::SyncRecv(int64_t &cmd, const int timeout_ms)
{
    return (timeout_ms == 0) ? mq().TryRecv(cmd) : mq().Recv(cmd, timeout_ms);
}
//maybe reimplment, using async cbs?
bool ShmSocket::SyncRecv(bhome_msg::MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms)
{
@@ -167,3 +171,30 @@
    }
    return false;
}
bool ShmSocket::RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult)
{ // 8bit size, 4bit socket index, 16bit proc index, 28bit id, ,4bit cmd+flag
    // LOG_FUNCTION;
    if (node_proc_index_ == -1 || socket_index_ == -1) {
        return false;
    }
    int id = (++alloc_id_) & MaskBits(28);
    int64_t cmd = (CalcAllocIndex(size) << 52) |
                  ((socket_index_ & MaskBits(4)) << 48) |
                  ((node_proc_index_ & MaskBits(16)) << 32) |
                  (id << 4) |
                  EncodeCmd(eCmdAllocRequest0);
    auto rawCB = [onResult](ShmSocket &sock, int64_t &val) {
        MsgI msg((val >> 32) & MaskBits(31));
        DEFER1(msg.Release());
        onResult(msg);
        return true;
    };
    alloc_cbs_->Store(id, std::move(rawCB));
    auto onExpireRemoveCB = [this, id](SendQ::Data const &msg) {
        RawRecvCB cb_no_use;
        alloc_cbs_->Pick(id, cb_no_use);
    };
    return Send(BHTopicCenterAddress(), cmd, onExpireRemoveCB);
}
src/socket.h
@@ -42,7 +42,7 @@
public:
    typedef ShmMsgQueue::MQId MQId;
    typedef bhome_shm::SharedMemory Shm;
    typedef std::function<void(ShmSocket &sock, MsgI &imsg)> RawRecvCB;
    typedef std::function<void(ShmSocket &sock, Queue::RawData &val)> RawRecvCB;
    typedef std::function<void(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> RecvCB;
    typedef std::function<bool(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> PartialRecvCB;
    typedef std::function<void(ShmSocket &sock)> IdleCB;
@@ -54,39 +54,74 @@
    static bool Remove(SharedMemory &shm, const MQId id) { return Queue::Remove(shm, id); }
    bool Remove() { return Remove(shm(), id()); }
    MQId id() const { return mq().Id(); }
    void SetNodeProc(const int proc_index, const int socket_index)
    {
        node_proc_index_ = proc_index;
        socket_index_ = socket_index;
        LOG_DEBUG() << "Set Node Proc " << node_proc_index_ << ", " << socket_index_;
    }
    // start recv.
    bool Start(const RawRecvCB &onData, const IdleCB &onIdle, int nworker = 1);
    bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB());
    bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, onIdle); }
    bool Start(int nworker = 1, const RecvCB &onMsg = RecvCB(), const RawRecvCB &onRaw = RawRecvCB(), const IdleCB &onIdle = IdleCB());
    bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, RawRecvCB(), onIdle); }
    bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); }
    bool Stop();
    template <class Body>
    bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
    bool CenterSend(const MQId remote, BHMsgHead &head, Body &body)
    {
        try {
            if (!cb) {
                return SendImpl(remote, MsgI::Serialize(head, body));
            } else {
                std::string msg_id(head.msg_id());
                per_msg_cbs_->Store(msg_id, std::move(cb));
                auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
                    RecvCB cb_no_use;
                    per_msg_cbs_->Pick(msg_id, cb_no_use);
                };
                return SendImpl(remote, MsgI::Serialize(head, body), onExpireRemoveCB);
            }
            //TODO alloc outsiez and use send.
            MsgI msg;
            if (!msg.Make(head, body)) { return false; }
            DEFER1(msg.Release());
            return Send(remote, msg);
        } catch (...) {
            SetLastError(eError, "Send internal error.");
            return false;
        }
    }
    bool Send(const MQId remote, const MsgI &imsg)
    {
        return SendImpl(remote, imsg);
    }
    bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult);
    template <class Body>
    bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
    {
        std::string msg_id(head.msg_id());
        std::string content(MsgI::Serialize(head, body));
        size_t size = content.size();
        auto OnResult = [content = std::move(content), msg_id, remote, cb = std::move(cb), this](MsgI &msg) mutable {
            if (!msg.Fill(content)) { return; }
            try {
                if (!cb) {
                    Send(remote, msg);
                } else {
                    per_msg_cbs_->Store(msg_id, std::move(cb));
                    auto onExpireRemoveCB = [this, msg_id](SendQ::Data const &msg) {
                        RecvCB cb_no_use;
                        per_msg_cbs_->Pick(msg_id, cb_no_use);
                    };
                    Send(remote, msg, onExpireRemoveCB);
                }
            } catch (...) {
                SetLastError(eError, "Send internal error.");
            }
        };
        return RequestAlloc(size, OnResult);
    }
    template <class... T>
    bool Send(const MQId remote, const MsgI &imsg, T &&...t)
    {
        return SendImpl(remote, imsg, std::forward<decltype(t)>(t)...);
    }
    template <class... T>
    bool Send(const MQId remote, const int64_t cmd, T &&...t)
    {
        return SendImpl(remote, cmd, std::forward<decltype(t)>(t)...);
    }
    bool SyncRecv(int64_t &cmd, const int timeout_ms);
    bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms);
    template <class Body>
@@ -153,15 +188,15 @@
    std::atomic<bool> run_;
    Queue mq_;
    template <class Key>
    template <class Key, class CB>
    class CallbackRecords
    {
        std::unordered_map<Key, RecvCB> store_;
        std::unordered_map<Key, CB> store_;
    public:
        bool empty() const { return store_.empty(); }
        bool Store(const Key &id, RecvCB &&cb) { return store_.emplace(id, std::move(cb)).second; }
        bool Pick(const Key &id, RecvCB &cb)
        bool Store(const Key &id, CB &&cb) { return store_.emplace(id, std::move(cb)).second; }
        bool Pick(const Key &id, CB &cb)
        {
            auto pos = store_.find(id);
            if (pos != store_.end()) {
@@ -174,9 +209,14 @@
        }
    };
    Synced<CallbackRecords<std::string>> per_msg_cbs_;
    Synced<CallbackRecords<std::string, RecvCB>> per_msg_cbs_;
    Synced<CallbackRecords<int, RawRecvCB>> alloc_cbs_;
    SendQ send_buffer_;
    // node request center alloc memory.
    int node_proc_index_ = -1;
    int socket_index_ = -1;
    std::atomic<int> alloc_id_;
};
#endif // end of include guard: SOCKET_GWTJHBPO
src/topic_node.cpp
@@ -42,7 +42,6 @@
TopicNode::TopicNode(SharedMemory &shm) :
    shm_(shm), state_(eStateUnregistered)
{
    Init();
}
TopicNode::~TopicNode()
@@ -57,34 +56,79 @@
    if (Valid()) {
        return true;
    } else if (info_.proc_id().empty()) {
        return false;
    }
    if (ssn_id_ == 0) {
        ssn_id_ = ShmMsgQueue::NewId();
    }
    LOG_DEBUG() << "Node Init, id " << ssn_id_;
    MsgI msg;
    msg.OffsetRef() = ssn_id_;
    if (ShmMsgQueue::TrySend(shm(), BHInitAddress(), msg)) {
        auto end_time = steady_clock::now() + 3s;
        do {
            try {
                for (int i = eSockStart; i < eSockEnd; ++i) {
                    sockets_.emplace_back(new ShmSocket(shm_, false, ssn_id_ + i, kMqLen));
    auto NodeInit = [&]() {
        auto SendInitCmd = [&]() {
            int64_t init_cmd = ssn_id_ << 4 | EncodeCmd(eCmdNodeInit);
            auto end_time = steady_clock::now() + 3s;
            bool r = false;
            do {
                r = ShmMsgQueue::TrySend(shm(), BHTopicCenterAddress(), init_cmd);
            } while (!r && steady_clock::now() < end_time);
            return r;
        };
        if (SendInitCmd()) {
            LOG_DEBUG() << "node send init ok";
            auto end_time = steady_clock::now() + 3s;
            do {
                try {
                    for (int i = eSockStart; i < eSockEnd; ++i) {
                        sockets_.emplace_back(new ShmSocket(shm_, false, ssn_id_ + i, kMqLen));
                    }
                    break;
                } catch (...) {
                    sockets_.clear();
                    std::this_thread::sleep_for(100ms);
                }
                break;
            } catch (...) {
                sockets_.clear();
                std::this_thread::sleep_for(100ms);
            }
        } while (steady_clock::now() < end_time);
        if (!sockets_.empty()) {
            // recv msgs to avoid memory leak.
            auto default_ignore_msg = [](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) { return true; };
            SockNode().Start(default_ignore_msg);
            return true;
            } while (steady_clock::now() < end_time);
        }
    };
    if (sockets_.empty()) {
        NodeInit();
    }
    if (!sockets_.empty()) {
        LOG_DEBUG() << "node sockets ok";
        auto onNodeCmd = [this](ShmSocket &socket, int64_t &val) {
            LOG_DEBUG() << "node recv cmd: " << DecodeCmd(val);
            switch (DecodeCmd(val)) {
            case eCmdNodeInitReply: {
                MsgI msg(val >> 4);
                DEFER1(msg.Release());
                MsgProcInit body;
                auto head = InitMsgHead(GetType(body), info_.proc_id(), ssn_id_);
                head.add_route()->set_mq_id(ssn_id_);
                if (msg.Fill(head, body)) {
                    socket.Send(BHTopicCenterAddress(), msg);
                }
            } break;
            default:
                break;
            }
            return true;
        };
        // recv msgs to avoid memory leak.
        auto onMsg = [this](ShmSocket &sock, MsgI &imsg, BHMsgHead &head) {
            LOG_DEBUG() << "node recv type: " << head.type();
            if (head.type() == kMsgTypeProcInitReply) {
                LOG_DEBUG() << "got proc init reply";
                MsgProcInitReply reply;
                if (imsg.ParseBody(reply)) {
                    SetProcIndex(reply.proc_index());
                }
            }
            return true;
        };
        SockNode().Start(1, onMsg, onNodeCmd);
        LOG_DEBUG() << "sockets ok.";
        return true;
    }
    return false;
}
@@ -100,7 +144,7 @@
    } else if (nworker > 16) {
        nworker = 16;
    }
    SockNode().Start();
    // SockNode().Start();
    ServerStart(server_cb, nworker);
    SubscribeStartWorker(sub_cb, nworker);
    ClientStartWorker(client_cb, nworker);
@@ -114,12 +158,15 @@
bool TopicNode::Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms)
{
    {
        std::lock_guard<std::mutex> lk(mutex_);
        info_ = proc;
    }
    if (!Init()) {
        SetLastError(eError, kErrMsgNotInit);
        return false;
    }
    info_ = proc;
    auto &sock = SockNode();
    MsgRegister body;
src/topic_node.h
@@ -130,6 +130,14 @@
    ShmSocket &SockClient() { return *sockets_[eSockClient]; }
    ShmSocket &SockServer() { return *sockets_[eSockServer]; }
    void SetProcIndex(int index)
    {
        proc_index_ = index;
        for (int i = eSockStart; i < eSockEnd; ++i) {
            sockets_[i]->SetNodeProc(index, i);
        }
    }
    enum State {
        eStateUnregistered,
        eStateOnline,
@@ -144,6 +152,7 @@
    std::mutex mutex_;
    MQId ssn_id_ = 0;
    std::atomic<State> state_;
    int proc_index_ = -1;
    TopicQueryCache topic_query_cache_;
};
utest/api_test.cpp
@@ -293,7 +293,7 @@
    // }
    int same = 0;
    int64_t last = 0;
    uint64_t last = 0;
    while (last < nreq * ncli && same < 2) {
        Sleep(1s, false);
        auto cur = Status().nreply_.load();
utest/robust_test.cpp
@@ -39,19 +39,24 @@
    std::atomic<uint64_t> nwrite(0);
    std::atomic<uint64_t> writedone(0);
#if 0
    typedef AtomicQueue<4> Rcb;
#if 1
    const int kPower = 0;
    typedef AtomicQueue<kPower> Rcb;
    Rcb tmp;
    BOOST_CHECK(tmp.like_empty());
    // BOOST_CHECK(tmp.like_empty());
    BOOST_CHECK(tmp.push(1));
    BOOST_CHECK(tmp.tail() == 1);
    if (kPower != 0) {
        BOOST_CHECK(tmp.tail() == 1);
    }
    BOOST_CHECK(tmp.head() == 0);
    int64_t d;
    BOOST_CHECK(tmp.pop(d));
    BOOST_CHECK(tmp.like_empty());
    BOOST_CHECK(tmp.head() == 1);
    BOOST_CHECK(tmp.tail() == 1);
    if (kPower != 0) {
        // BOOST_CHECK(tmp.like_empty());
        BOOST_CHECK(tmp.head() == 1);
        BOOST_CHECK(tmp.tail() == 1);
    }
    ShmObject<Rcb> rcb(shm, "test_rcb");
    bool try_more = true;
@@ -166,18 +171,20 @@
BOOST_AUTO_TEST_CASE(MutexTest)
{
    {
        int fd = open("/tmp/test_fmutex", O_CREAT | O_RDONLY, 0666);
        flock(fd, LOCK_EX);
        printf("lock 1");
        int sem_id = semget(100, 1, 0666 | IPC_CREAT);
        auto P = [&]() {
            sembuf op = {0, -1, SEM_UNDO};
            semop(sem_id, &op, 1);
        };
        auto V = [&]() {
            sembuf op = {0, 1, SEM_UNDO};
            semop(sem_id, &op, 1);
        };
        for (int i = 0; i < 10; ++i) {
            V();
        }
        Sleep(10s);
        flock(fd, LOCK_EX);
        printf("lock 2");
        Sleep(10s);
        flock(fd, LOCK_UN);
        printf("un lock 2");
        Sleep(10s);
        flock(fd, LOCK_UN);
        printf("un lock 1");
        return;
    }
@@ -204,7 +211,7 @@
    std::mutex m;
    typedef std::chrono::steady_clock Clock;
    auto Now = []() { return Clock::now().time_since_epoch(); };
    if (pi) {
        auto old = *pi;
        printf("int : %d, add1: %d\n", old, ++*pi);
utest/speed_test.cpp
@@ -92,9 +92,9 @@
        }
    };
    int nwriters[] = {1, 10, 100};
    int nwriters[] = {1, 10, 100, 1000};
    int nreaders[] = {2};
    const int64_t total_msg = 1000 * 100;
    const int64_t total_msg = 1000 * 1000;
    auto Test = [&](auto &www, auto &rrr, bool isfork) {
        for (auto nreader : nreaders) {
@@ -127,12 +127,13 @@
    // typedef ThreadManager Manager;
    // const bool isfork = IsSameType<Manager, ProcessManager>::value;
    {
    if (0) {
        ThreadManager tw, tr;
        printf("---------------- Testing thread io:  -------------------------------------------------------\n");
        Test(tw, tr, false);
    }
    {
    if (1) {
        ProcessManager pw, pr;
        printf("================ Testing process io: =======================================================\n");
        Test(pw, pr, true);