lichao
2021-05-21 1ff714838c03cba1a18884d5b48a20ee6c4275ac
class MsgI, ShmMsgQueue, no bind to shm.
17个文件已修改
358 ■■■■ 已修改文件
box/center.cpp 13 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/node_center.cpp 17 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/node_center.h 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.cpp 52 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.h 23 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.cpp 9 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.h 37 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/sendq.cpp 28 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/sendq.h 48 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_msg_queue.cpp 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_msg_queue.h 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_socket.cpp 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_socket.h 23 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 52 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.h 4 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/simple_tests.cpp 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/speed_test.cpp 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.cpp
@@ -65,7 +65,7 @@
    return [&](auto &&rep_body) {
        auto reply_head(InitMsgHead(GetType(rep_body), center->id(), head.ssn_id(), head.msg_id()));
        MQInfo remote = {head.route(0).mq_id(), head.route(0).abs_addr()};
        MsgI msg;
        MsgI msg(socket.shm());
        if (msg.Make(reply_head, rep_body)) {
            DEFER1(msg.Release(););
            center->SendAllocMsg(socket, remote, msg);
@@ -73,7 +73,7 @@
    };
}
bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr)
bool AddCenter(std::shared_ptr<Synced<NodeCenter>> center_ptr, SharedMemory &shm)
{
    // command
    auto OnCommand = [center_ptr](ShmSocket &socket, ShmMsgQueue::RawData &cmd) -> bool {
@@ -87,7 +87,7 @@
        auto onInit = [&](const int64_t request) {
            return center->OnNodeInit(socket, request);
        };
        BHCenterHandleInit(onInit);
        BHCenterHandleInit(socket.shm(), onInit);
        center->OnTimer();
    };
@@ -106,7 +106,7 @@
        default: return false;
        }
    };
    BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(), 1000);
    BHCenter::Install("#center.main", OnCenter, OnCommand, OnCenterIdle, BHTopicCenterAddress(shm), 1000);
    auto OnBusIdle = [=](ShmSocket &socket) {};
    auto OnBusCmd = [=](ShmSocket &socket, ShmMsgQueue::RawData &val) { return false; };
@@ -142,7 +142,7 @@
        }
    };
    BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(), 1000);
    BHCenter::Install("#center.bus", OnPubSub, OnBusCmd, OnBusIdle, BHTopicBusAddress(shm), 1000);
    return true;
}
@@ -167,7 +167,7 @@
{
    auto nsec = NodeTimeoutSec();
    auto center_ptr = std::make_shared<Synced<NodeCenter>>("#bhome_center", nsec, nsec * 3); // *3 to allow other clients to finish sending msgs.
    AddCenter(center_ptr);
    AddCenter(center_ptr, shm);
    for (auto &kv : Centers()) {
        auto &info = kv.second;
@@ -176,6 +176,7 @@
    topic_node_.reset(new CenterTopicNode(center_ptr, shm));
}
BHCenter::~BHCenter() { Stop(); }
bool BHCenter::Start()
box/node_center.cpp
@@ -57,7 +57,7 @@
{
    auto pos = msgs_.find(id);
    if (pos != msgs_.end()) {
        ShmMsg(pos->second).Free();
        pos->second.Free();
        msgs_.erase(pos);
    } else {
        LOG_TRACE() << "ignore late free request.";
@@ -101,9 +101,9 @@
    int i = 0;
    int total_count = 0;
    for (auto &kv : msgs_) {
        MsgI msg(kv.second);
        auto &msg = kv.second;
        total_count += msg.Count();
        LOG_TRACE() << "  " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second << ", count: " << msg.Count() << ", size: " << msg.Size();
        LOG_TRACE() << "  " << i++ << ": msg id: " << kv.first << ", offset: " << kv.second.Offset() << ", count: " << msg.Count() << ", size: " << msg.Size();
    }
    LOG_TRACE() << "total count: " << total_count;
}
@@ -173,7 +173,7 @@
    auto PrepareProcInit = [&](Node &node) {
        bool r = false;
        ShmMsg init_msg;
        ShmMsg init_msg(shm);
        DEFER1(init_msg.Release());
        MsgProcInit body;
        auto head = InitMsgHead(GetType(body), id(), ssn);
@@ -238,7 +238,7 @@
    if (!FindMq()) { return; }
    auto size = GetAllocSize((val >> 52) & MaskBits(8));
    MsgI new_msg;
    MsgI new_msg(socket.shm());
    if (new_msg.Make(size)) {
        // 31bit proc index, 28bit id, ,4bit cmd+flag
        int64_t reply = (new_msg.Offset() << 32) | (msg_id << 4) | EncodeCmd(eCmdAllocReply0);
@@ -612,18 +612,15 @@
        pub.set_topic(topic);
        pub.set_data(content);
        BHMsgHead head(InitMsgHead(GetType(pub), id(), 0));
        MsgI msg;
        MsgI msg(shm);
        if (msg.Make(head, pub)) {
            DEFER1(msg.Release());
            RecordMsg(msg);
            auto &mq = GetCenterInfo(shm)->mq_sender_;
            ShmSocket sender(mq.offset_, shm, mq.id_);
            for (auto &cli : clients) {
                auto node = cli.weak_node_.lock();
                if (node && node->state_.flag_ == kStateNormal) {
                    sender.Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
                    DefaultSender(shm).Send({cli.mq_id_, cli.mq_abs_addr_}, msg);
                }
            }
        }
box/node_center.h
@@ -51,14 +51,14 @@
    typedef int64_t Offset;
public:
    void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg.Offset()); }
    void RecordMsg(const MsgI &msg) { msgs_.emplace(msg.id(), msg); }
    void FreeMsg(MsgId id);
    void AutoRemove();
    size_t size() const { return msgs_.size(); }
    void DebugPrint() const;
private:
    std::unordered_map<MsgId, Offset> msgs_;
    std::unordered_map<MsgId, MsgI> msgs_;
    int64_t time_to_clean_ = 0;
};
src/defs.cpp
@@ -90,7 +90,7 @@
} // namespace
CenterInfo *GetCenterInfo(bhome_shm::SharedMemory &shm)
CenterInfo *GetCenterInfo(SharedMemory &shm)
{
    auto pmeta = Ptr<CenterMetaInfo>(kCenterInfoFixedAddress + Addr(shm.get_address()));
    if (pmeta->tag_ == kCenterInfoTag) {
@@ -98,11 +98,33 @@
    }
    return nullptr;
}
ShmSocket &DefaultSender(SharedMemory &shm)
{
    typedef std::pair<void *, std::shared_ptr<ShmSocket>> Pair;
    static std::vector<Pair> store;
    static std::mutex s_mtx;
    thread_local Pair local_cache;
    if (local_cache.first == &shm) {
        return *local_cache.second;
    }
    std::lock_guard<std::mutex> lk(s_mtx);
    for (auto &kv : store) {
        if (kv.first == &shm) {
            local_cache = kv;
            return *local_cache.second;
        }
    }
    auto &mq = GetCenterInfo(shm)->mq_sender_;
    store.emplace_back(&shm, new ShmSocket(mq.offset_, shm, mq.id_));
    local_cache = store.back();
    return *local_cache.second;
}
// put center info at fixed memory position.
// as boost shm find object (find socket/mq by id, etc...) also locks inside,
// which node might crash inside and cause deadlock.
bool CenterInit(bhome_shm::SharedMemory &shm)
bool CenterInit(SharedMemory &shm)
{
    Mutex *mutex = shm.FindOrCreate<Mutex>("shm_center_lock");
    if (!mutex || !mutex->try_lock()) {
@@ -140,16 +162,15 @@
    return false;
}
const MQInfo &BHGlobalSenderAddress() { return GetCenterInfo(BHomeShm())->mq_sender_; }
const MQInfo &BHTopicCenterAddress() { return GetCenterInfo(BHomeShm())->mq_center_; }
const MQInfo &BHTopicBusAddress() { return GetCenterInfo(BHomeShm())->mq_bus_; }
bool BHNodeInit(const int64_t request, int64_t &reply)
const MQInfo &BHTopicCenterAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_center_; }
const MQInfo &BHTopicBusAddress(SharedMemory &shm) { return GetCenterInfo(shm)->mq_bus_; }
bool BHNodeInit(SharedMemory &shm, const int64_t request, int64_t &reply)
{
    return GetCenterInfo(BHomeShm())->init_rr_.ClientRequest(request, reply);
    return GetCenterInfo(shm)->init_rr_.ClientRequest(request, reply);
}
void BHCenterHandleInit(std::function<int64_t(const int64_t)> const &onReq)
void BHCenterHandleInit(SharedMemory &shm, std::function<int64_t(const int64_t)> const &onReq)
{
    GetCenterInfo(BHomeShm())->init_rr_.ServerProcess(onReq);
    GetCenterInfo(shm)->init_rr_.ServerProcess(onReq);
}
int64_t CalcAllocIndex(int64_t size)
@@ -164,18 +185,15 @@
{
    return "bhome_default_shm_v0";
}
bhome_shm::SharedMemory &BHomeShm()
SharedMemory &BHomeShm()
{
    static bhome_shm::SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512);
    static SharedMemory shm(BHomeShmName(), 1024 * 1024 * 512);
    return shm;
}
bool GlobalInit(bhome_shm::SharedMemory &shm)
{
    MsgI::BindShm(shm);
    CenterInfo *pinfo = GetCenterInfo(shm);
    return pinfo && ShmMsgQueue::SetData(pinfo->mqid_);
}
bool GlobalInit(SharedMemory &shm) { return GetCenterInfo(shm); }
MQId NewSession() { return 10 * (++GetCenterInfo(BHomeShm())->mqid_); }
void SetLastError(const int ec, const std::string &msg)
{
src/defs.h
@@ -23,6 +23,7 @@
#include <atomic>
#include <string>
class ShmSocket;
typedef uint64_t MQId;
int64_t CalcAllocIndex(int64_t size);
@@ -50,21 +51,25 @@
class SharedMemory;
} // namespace bhome_shm
using bhome_shm::SharedMemory;
std::string BHomeShmName();
bhome_shm::SharedMemory &BHomeShm();
CenterInfo *GetCenterInfo(bhome_shm::SharedMemory &shm);
bool CenterInit(bhome_shm::SharedMemory &shm);
bool GlobalInit(bhome_shm::SharedMemory &shm);
SharedMemory &BHomeShm();
CenterInfo *GetCenterInfo(SharedMemory &shm);
ShmSocket &DefaultSender(SharedMemory &shm);
MQId NewSession();
bool CenterInit(SharedMemory &shm);
bool GlobalInit(SharedMemory &shm);
typedef std::string Topic;
void SetLastError(const int ec, const std::string &msg);
void GetLastError(int &ec, std::string &msg);
//TODO center can check shm for previous crash.
const MQInfo &BHGlobalSenderAddress();
const MQInfo &BHTopicCenterAddress();
const MQInfo &BHTopicBusAddress();
bool BHNodeInit(const int64_t request, int64_t &reply);
void BHCenterHandleInit(std::function<int64_t(const int64_t)> const &onReq);
const MQInfo &BHTopicCenterAddress(SharedMemory &shm);
const MQInfo &BHTopicBusAddress(SharedMemory &shm);
bool BHNodeInit(SharedMemory &shm, const int64_t request, int64_t &reply);
void BHCenterHandleInit(SharedMemory &shm, std::function<int64_t(const int64_t)> const &onReq);
// node mq is avail with in timeout; after that may get killed.
int NodeTimeoutSec();
src/msg.cpp
@@ -23,13 +23,6 @@
namespace bhome_msg
{
ShmSocket &ShmMsg::Sender()
{
    static auto &mq = GetCenterInfo(shm())->mq_sender_;
    static ShmSocket sender(mq.offset_, shm(), mq.id_);
    return sender;
}
int ShmMsg::Release()
{
    if (!valid()) {
@@ -39,7 +32,7 @@
    if (n == 0) {
        if (meta()->managed_) {
            int64_t free_cmd = (id() << 4) | EncodeCmd(eCmdFree);
            Sender().Send(BHTopicCenterAddress(), free_cmd);
            DefaultSender(shm()).Send(BHTopicCenterAddress(shm()), free_cmd);
        } else {
            Free();
        }
src/msg.h
@@ -34,14 +34,9 @@
// ShmMsg is safe to be stored in shared memory, so POD data or offset_ptr is required.
// message content layout: (meta) / header_size + header + data_size + data
class ShmMsg : private StaticDataRef<SharedMemory, ShmMsg>
class ShmMsg
{
public:
    static inline SharedMemory &shm() { return GetData(); }
private:
    static ShmSocket &Sender();
    // store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object.
    class RefCount : private boost::noncopyable
    {
@@ -58,11 +53,7 @@
    typedef int64_t OffsetType;
    static OffsetType Addr(void *ptr) { return reinterpret_cast<OffsetType>(ptr); }
    static void *Ptr(const OffsetType offset) { return reinterpret_cast<void *>(offset); }
    static inline OffsetType BaseAddr()
    {
        static const OffsetType base = Addr(shm().get_address()); // cache value.
        return base;
    }
    OffsetType BaseAddr() const { return Addr(shm().get_address()); }
    static const uint32_t kMsgTag = 0xf1e2d3c4;
    struct Meta {
@@ -83,7 +74,8 @@
            capacity_(size), id_(NewId()), timestamp_(NowSec()) {}
    };
    OffsetType offset_;
    static void *Alloc(const size_t size)
    SharedMemory *pshm_;
    void *Alloc(const size_t size)
    {
        void *p = shm().Alloc(sizeof(Meta) + size);
        if (p) {
@@ -132,24 +124,27 @@
        if (!addr) {
            return false;
        }
        ShmMsg(addr).swap(*this);
        offset_ = Addr(addr) - BaseAddr();
        return true;
    }
    ShmMsg(void *p) :
        offset_(p ? (Addr(p) - BaseAddr()) : 0) {}
    template <class T = void>
    T *get() const { return offset_ != 0 ? static_cast<T *>(Ptr(offset_ + BaseAddr())) : nullptr; }
public:
    static bool BindShm(SharedMemory &shm) { return SetData(shm); }
    ShmMsg() :
        offset_(0) {}
    explicit ShmMsg(const OffsetType offset) :
        offset_(offset) {}
    explicit ShmMsg(SharedMemory &shm) :
        offset_(0), pshm_(&shm) {}
    ShmMsg(const OffsetType offset, SharedMemory &shm) :
        offset_(offset), pshm_(&shm) {}
    OffsetType Offset() const { return offset_; }
    OffsetType &OffsetRef() { return offset_; }
    void swap(ShmMsg &a) { std::swap(offset_, a.offset_); }
    SharedMemory &shm() const { return *pshm_; }
    void swap(ShmMsg &a)
    {
        std::swap(offset_, a.offset_);
        std::swap(pshm_, a.pshm_);
    }
    bool valid() const { return offset_ != 0 && meta()->tag_ == kMsgTag; }
    int64_t id() const { return valid() ? meta()->id_ : 0; }
    int64_t timestamp() const { return valid() ? meta()->timestamp_.load() : 0; }
src/sendq.cpp
@@ -33,13 +33,14 @@
        } else {
            al.insert(al.begin(), Array())->emplace_back(std::move(tmp));
        }
        count_in_.Count1();
    } catch (std::exception &e) {
        LOG_ERROR() << "sendq error: " << e.what();
        throw e;
    }
}
int SendQ::DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr)
int SendQ::DoSend1Remote(const Remote remote, Array &arr)
{
    auto FirstNotExpired = [](Array &l) {
        auto Less = [](const TimedMsg &msg, const TimePoint &tp) { return msg.expire() < tp; };
@@ -53,8 +54,8 @@
            info.on_expire_(info.data_);
        }
    }
    while (pos != arr.end() && mq.TrySend(pos->data().mq_, pos->data().data_)) {
    auto TrySend1 = [this](MsgInfo const &info) { return ShmMsgQueue::TrySend(shm_, info.mq_, info.data_); };
    while (pos != arr.end() && TrySend1(pos->data())) {
        ++pos;
    }
@@ -63,27 +64,26 @@
    return nprocessed;
}
int SendQ::DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &al)
int SendQ::DoSend1Remote(const Remote remote, ArrayList &al)
{
    int nsend = 0;
    auto AllSent = [&](Array &arr) {
        nsend += DoSend1Remote(mq, remote, arr);
        nsend += DoSend1Remote(remote, arr);
        return arr.empty();
    };
    for (auto it = al.begin(); it != al.end() && AllSent(*it); it = al.erase(it)) {}
    return nsend;
}
bool SendQ::TrySend(ShmMsgQueue &mq)
bool SendQ::TrySend()
{
    std::unique_lock<std::mutex> lock(mutex_out_);
    // if (TooFast()) { return false; }
    size_t nsend = 0;
    if (!out_.empty()) {
        auto rec = out_.begin();
        do {
            nsend += DoSend1Remote(mq, rec->first, rec->second);
            nsend += DoSend1Remote(rec->first, rec->second);
            if (rec->second.empty()) {
                rec = out_.erase(rec);
            } else {
@@ -91,6 +91,7 @@
            }
        } while (rec != out_.end());
    }
    count_out_.Count(nsend);
    auto Collect = [&]() {
        std::unique_lock<std::mutex> lock(mutex_in_);
@@ -109,14 +110,3 @@
    return !out_.empty();
}
bool SendQ::TooFast()
{
    auto cur = NowSec();
    if (cur > last_time_) {
        last_time_ = cur;
        count_ = 0;
    }
    return ++count_ > 1000 * 100;
} // not accurate in multi-thread.
src/sendq.h
@@ -46,11 +46,13 @@
    typedef TimedData<MsgInfo> TimedMsg;
    typedef TimedMsg::TimePoint TimePoint;
    typedef TimedMsg::Duration Duration;
    SendQ(SharedMemory &shm) :
        shm_(shm) {}
    bool Append(const MQInfo &mq, MsgI msg)
    {
        msg.AddRef();
        auto onMsgExpire = [](const Data &d) { MsgI(d).Release(); };
        auto onMsgExpire = [msg](const Data &d) mutable { msg.Release(); };
        try {
            AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire);
            return true;
@@ -63,9 +65,9 @@
    bool Append(const MQInfo &mq, MsgI msg, OnMsgEvent onExpire)
    {
        msg.AddRef();
        auto onMsgExpire = [onExpire](const Data &d) {
        auto onMsgExpire = [onExpire, msg](const Data &d) mutable {
            onExpire(d);
            MsgI(d).Release();
            msg.Release();
        };
        try {
            AppendData(mq, msg.Offset(), DefaultExpire(), onMsgExpire);
@@ -85,7 +87,7 @@
            return false;
        }
    }
    bool TrySend(ShmMsgQueue &mq);
    bool TrySend();
private:
    static TimePoint Now() { return TimedMsg::Clock::now(); }
@@ -96,18 +98,48 @@
    typedef std::list<Array> ArrayList;
    typedef std::unordered_map<Remote, ArrayList> Store;
    int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, Array &arr);
    int DoSend1Remote(ShmMsgQueue &mq, const Remote remote, ArrayList &arr);
    int DoSend1Remote(const Remote remote, Array &arr);
    int DoSend1Remote(const Remote remote, ArrayList &arr);
    bool TooFast();
    SharedMemory &shm_;
    std::mutex mutex_in_;
    std::mutex mutex_out_;
    Store in_;
    Store out_;
    int64_t count_ = 0;
    int64_t last_time_ = 0;
    struct Counter {
        std::atomic<int64_t> count_;
        std::atomic<int64_t> count_1sec_;
        std::atomic<int64_t> last_time_;
        Counter() :
            count_(0), count_1sec_(0), last_time_(0) {}
        void Count1()
        {
            CheckTime();
            ++count_1sec_;
            ++count_;
        }
        void Count(int n)
        {
            CheckTime();
            count_1sec_ += n;
            count_ += n;
        }
        void CheckTime()
        {
            auto cur = NowSec();
            if (cur > last_time_) {
                count_1sec_ = 0;
                last_time_ = cur;
            }
        }
        int64_t GetCount() const { return count_.load(); }
        int64_t LastSec() const { return count_1sec_.load(); }
    };
    Counter count_in_;
    Counter count_out_;
};
#endif // end of include guard: SENDQ_IWKMSK7M
src/shm_msg_queue.cpp
@@ -31,12 +31,6 @@
} // namespace
ShmMsgQueue::MQId ShmMsgQueue::NewId()
{
    static auto &id = GetData("Must init shared memory before use! Please make sure center is running.");
    return (++id) * 10;
}
ShmMsgQueue::ShmMsgQueue(ShmType &segment, const MQId id, const int len) :
    id_(id),
    queue_(segment, MsgQIdToName(id_), len, segment.get_segment_manager())
@@ -84,7 +78,7 @@
            if (IsCmd(val)) {
                LOG_DEBUG() << "clsing queue " << id << ", has a cmd" << DecodeCmd(val);
            } else {
                MsgI(val).Release();
                MsgI(val, shm).Release();
            }
        }
    }
src/shm_msg_queue.h
@@ -27,7 +27,7 @@
#define BH_USE_ATOMIC_Q
class ShmMsgQueue : public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue>
class ShmMsgQueue
{
public:
    typedef int64_t RawData;
@@ -45,8 +45,6 @@
    typedef Shmq::Data Queue;
    typedef Shmq::ShmType ShmType;
    typedef uint64_t MQId;
    static MQId NewId();
    ShmMsgQueue(ShmType &segment, const MQId id, const int len);
    ShmMsgQueue(ShmType &segment, const bool create_or_else_find, const MQId id, const int len);
src/shm_socket.cpp
@@ -30,18 +30,18 @@
using namespace bhome_shm;
ShmSocket::ShmSocket(Shm &shm, const MQId id, const int len) :
    run_(false), mq_(shm, id, len), alloc_id_(0) { Start(); }
    run_(false), mq_(shm, id, len), alloc_id_(0), send_buffer_(shm) { Start(); }
ShmSocket::ShmSocket(Shm &shm, const bool create_or_else_find, const MQId id, const int len) :
    run_(false), mq_(shm, create_or_else_find, id, len), alloc_id_(0) { Start(); }
    run_(false), mq_(shm, create_or_else_find, id, len), alloc_id_(0), send_buffer_(shm) { Start(); }
ShmSocket::ShmSocket(int64_t abs_addr, Shm &shm, const MQId id) :
    run_(false), mq_(abs_addr, shm, id), alloc_id_(0) { Start(); }
    run_(false), mq_(abs_addr, shm, id), alloc_id_(0), send_buffer_(shm) { Start(); }
ShmSocket::~ShmSocket() { Stop(); }
bool ShmSocket::Start(int nworker, const RecvCB &onData, const RawRecvCB &onRaw, const IdleCB &onIdle)
{
    auto ioProc = [this, onData, onRaw, onIdle]() {
        auto DoSend = [this]() { return send_buffer_.TrySend(mq()); };
        auto DoSend = [this]() { return send_buffer_.TrySend(); };
        auto DoRecv = [=] {
            // do not recv if no cb is set.
            if (!onData && per_msg_cbs_->empty() && !onRaw && alloc_cbs_->empty()) { return false; }
@@ -73,7 +73,7 @@
                if (IsCmd(val)) {
                    onCmdCB(*this, val);
                } else {
                    MsgI imsg(val);
                    MsgI imsg(val, shm());
                    DEFER1(imsg.Release());
                    BHMsgHead head;
                    if (imsg.ParseHead(head)) {
@@ -113,7 +113,7 @@
        while (run_) { ioProc(); }
        // try send pending msgs.
        auto end_time = steady_clock::now() + 3s;
        while (send_buffer_.TrySend(mq()) && steady_clock::now() < end_time) {
        while (send_buffer_.TrySend() && steady_clock::now() < end_time) {
            // LOG_DEBUG() << "try send pending msgs.";
        }
    };
@@ -170,7 +170,7 @@
    };
#if 0
    // self alloc
    MsgI msg;
    MsgI msg(shm());
    if (msg.Make(size)) {
        DEFER1(msg.Release());
        return OnResult(msg);
@@ -194,7 +194,7 @@
                  (id << 4) |
                  EncodeCmd(eCmdAllocRequest0);
    auto rawCB = [onResult](ShmSocket &sock, int64_t &val) {
        MsgI msg((val >> 32) & MaskBits(31));
        MsgI msg(((val >> 32) & MaskBits(31)), sock.shm());
        DEFER1(msg.Release());
        onResult(msg);
        return true;
@@ -206,5 +206,5 @@
        alloc_cbs_->Pick(id, cb_no_use);
    };
    return Send(BHTopicCenterAddress(), cmd, onExpireRemoveCB);
    return Send(BHTopicCenterAddress(shm()), cmd, onExpireRemoveCB);
}
src/shm_socket.h
@@ -66,22 +66,6 @@
    bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); }
    bool Stop();
    template <class Body>
    bool CenterSend(const MQInfo &remote, BHMsgHead &head, Body &body)
    {
        try {
            //TODO alloc outsiez and use send.
            MsgI msg;
            if (!msg.Make(head, body)) { return false; }
            DEFER1(msg.Release());
            return Send(remote, msg);
        } catch (...) {
            SetLastError(eError, "Send internal error.");
            return false;
        }
    }
    bool RequestAlloc(const int64_t size, std::function<void(MsgI &msg)> const &onResult);
    template <class Body>
@@ -155,9 +139,9 @@
    bool Send(const MQInfo &remote, std::string &&content, const std::string &msg_id, RecvCB &&cb = RecvCB());
    template <class... Rest>
    bool SendImpl(const MQInfo &remote, Rest &&...rest)
    bool SendImpl(Rest &&...rest)
    {
        return send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...);
        return send_buffer_.Append(std::forward<decltype(rest)>(rest)...);
    }
    std::vector<std::thread> workers_;
@@ -188,12 +172,13 @@
    Synced<CallbackRecords<std::string, RecvCB>> per_msg_cbs_;
    Synced<CallbackRecords<int, RawRecvCB>> alloc_cbs_;
    SendQ send_buffer_;
    // node request center alloc memory.
    int node_proc_index_ = -1;
    int socket_index_ = -1;
    std::atomic<int> alloc_id_;
    SendQ send_buffer_;
};
#endif // end of include guard: SHM_SOCKET_GWTJHBPO
src/topic_node.cpp
@@ -66,13 +66,13 @@
    }
    if (ssn_id_ == 0) {
        ssn_id_ = ShmMsgQueue::NewId();
        ssn_id_ = NewSession();
    }
    LOG_DEBUG() << "Node Init, id " << ssn_id_;
    auto NodeInit = [&]() {
        int64_t init_request = ssn_id_ << 4 | EncodeCmd(eCmdNodeInit);
        int64_t reply = 0;
        if (BHNodeInit(init_request, reply) && DecodeCmd(reply) == eCmdNodeInitReply) {
        if (BHNodeInit(shm(), init_request, reply) && DecodeCmd(reply) == eCmdNodeInitReply) {
            int64_t abs_addr = reply >> 4;
            sockets_.emplace_back(new ShmSocket(abs_addr, shm_, ssn_id_));
            LOG_DEBUG() << "node init ok";
@@ -94,7 +94,7 @@
                auto head = InitMsgHead(GetType(body), info_.proc_id(), ssn_id_);
                AddRoute(head, socket);
                if (imsg.Fill(head, body)) {
                    socket.Send(BHTopicCenterAddress(), imsg);
                    socket.Send(CenterAddr(), imsg);
                }
            } break;
            case kMsgTypeProcInitReply: {
@@ -187,12 +187,12 @@
            MsgCommonReply body;
            CheckResult(imsg, head, body);
        };
        return sock.Send(BHTopicCenterAddress(), head, body, onResult);
        return sock.Send(CenterAddr(), head, body, onResult);
    } else {
        MsgI reply;
        MsgI reply(shm());
        DEFER1(reply.Release(););
        BHMsgHead reply_head;
        bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
        bool r = sock.SendAndRecv(CenterAddr(), head, body, reply, reply_head, timeout_ms);
        if (r) {
            CheckResult(reply, reply_head, reply_body);
        }
@@ -228,12 +228,12 @@
            MsgCommonReply body;
            CheckResult(imsg, head, body);
        };
        return sock.Send(BHTopicCenterAddress(), head, body, onResult);
        return sock.Send(CenterAddr(), head, body, onResult);
    } else {
        MsgI reply;
        MsgI reply(shm());
        DEFER1(reply.Release(););
        BHMsgHead reply_head;
        bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
        bool r = sock.SendAndRecv(CenterAddr(), head, body, reply, reply_head, timeout_ms);
        return r && CheckResult(reply, reply_head, reply_body);
    }
}
@@ -253,12 +253,12 @@
    AddRoute(head, sock);
    if (timeout_ms == 0) {
        return sock.Send(BHTopicCenterAddress(), head, body);
        return sock.Send(CenterAddr(), head, body);
    } else {
        MsgI reply;
        MsgI reply(shm());
        DEFER1(reply.Release(););
        BHMsgHead reply_head;
        bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
        bool r = sock.SendAndRecv(CenterAddr(), head, body, reply, reply_head, timeout_ms);
        r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
        return (r && IsSuccess(reply_body.errmsg().errcode()));
    }
@@ -282,10 +282,10 @@
    BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn()));
    AddRoute(head, sock);
    MsgI reply;
    MsgI reply(shm());
    DEFER1(reply.Release());
    BHMsgHead reply_head;
    return (sock.SendAndRecv(BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) &&
    return (sock.SendAndRecv(CenterAddr(), head, query, reply, reply_head, timeout_ms) &&
            reply_head.type() == kMsgTypeQueryTopicReply &&
            reply.ParseBody(reply_body));
}
@@ -301,10 +301,10 @@
    BHMsgHead head(InitMsgHead(GetType(query), proc_id(), ssn()));
    AddRoute(head, sock);
    MsgI reply;
    MsgI reply(shm());
    DEFER1(reply.Release());
    BHMsgHead reply_head;
    return (sock.SendAndRecv(BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) &&
    return (sock.SendAndRecv(CenterAddr(), head, query, reply, reply_head, timeout_ms) &&
            reply_head.type() == kMsgTypeQueryProcReply &&
            reply.ParseBody(reply_body));
}
@@ -324,12 +324,12 @@
    AddRoute(head, sock);
    if (timeout_ms == 0) {
        return sock.Send(BHTopicCenterAddress(), head, body);
        return sock.Send(CenterAddr(), head, body);
    } else {
        MsgI reply;
        MsgI reply(shm());
        DEFER1(reply.Release(););
        BHMsgHead reply_head;
        bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
        bool r = sock.SendAndRecv(CenterAddr(), head, body, reply, reply_head, timeout_ms);
        r = r && reply_head.type() == kMsgTypeCommonReply;
        r = r && reply.ParseBody(reply_body);
        return r;
@@ -525,7 +525,7 @@
            AddRoute(head, sock);
            head.set_topic(request.topic());
            MsgI reply_msg;
            MsgI reply_msg(shm());
            DEFER1(reply_msg.Release(););
            BHMsgHead reply_head;
@@ -596,13 +596,13 @@
        AddRoute(head, sock);
        if (timeout_ms == 0) {
            return sock.Send(BHTopicBusAddress(), head, pub);
            return sock.Send(BusAddr(), head, pub);
        } else {
            MsgI reply;
            MsgI reply(shm());
            DEFER1(reply.Release(););
            BHMsgHead reply_head;
            MsgCommonReply reply_body;
            return sock.SendAndRecv(BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) &&
            return sock.SendAndRecv(BusAddr(), head, pub, reply, reply_head, timeout_ms) &&
                   reply_head.type() == kMsgTypeCommonReply &&
                   reply.ParseBody(reply_body) &&
                   IsSuccess(reply_body.errmsg().errcode());
@@ -629,12 +629,12 @@
        BHMsgHead head(InitMsgHead(GetType(sub), proc_id(), ssn()));
        AddRoute(head, sock);
        if (timeout_ms == 0) {
            return sock.Send(BHTopicBusAddress(), head, sub);
            return sock.Send(BusAddr(), head, sub);
        } else {
            MsgI reply;
            MsgI reply(shm());
            DEFER1(reply.Release(););
            BHMsgHead reply_head;
            return sock.SendAndRecv(BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) &&
            return sock.SendAndRecv(BusAddr(), head, sub, reply, reply_head, timeout_ms) &&
                   reply_head.type() == kMsgTypeCommonReply &&
                   reply.ParseBody(reply_body) &&
                   IsSuccess(reply_body.errmsg().errcode());
src/topic_node.h
@@ -34,7 +34,9 @@
    SharedMemory &shm_;
    ProcInfo info_;
    SharedMemory &shm() { return shm_; }
    SharedMemory &shm() const { return shm_; }
    const MQInfo &CenterAddr() const { return BHTopicCenterAddress(shm()); }
    const MQInfo &BusAddr() const { return BHTopicBusAddress(shm()); }
public:
    TopicNode(SharedMemory &shm);
utest/simple_tests.cpp
@@ -108,12 +108,12 @@
{
    SharedMemory &shm = TestShm();
    GlobalInit(shm);
    ShmMsgQueue q(shm, ShmMsgQueue::NewId(), 64);
    ShmMsgQueue q(shm, NewSession(), 64);
    for (int i = 0; i < 2; ++i) {
        int ms = i * 100;
        printf("Timeout Test %4d: ", ms);
        boost::timer::auto_cpu_timer timer;
        MsgI msg;
        MsgI msg(shm);
        bool r = q.Recv(msg, ms);
        BOOST_CHECK(!r);
    }
@@ -125,7 +125,7 @@
    typedef MsgI Msg;
    GlobalInit(shm);
    Msg m0(1000);
    Msg m0(1000, shm);
    BOOST_CHECK(m0.valid());
    BOOST_CHECK_EQUAL(m0.Count(), 1);
    Msg m1 = m0;
utest/speed_test.cpp
@@ -24,7 +24,7 @@
{
    SharedMemory &shm = TestShm();
    GlobalInit(shm);
    MQId server_id = ShmMsgQueue::NewId();
    MQId server_id = NewSession();
    ShmMsgQueue server(server_id, shm, 1000);
    const int timeout = 1000;
@@ -35,10 +35,10 @@
    std::string str(data_size, 'a');
    auto Writer = [&](int writer_id, uint64_t n) {
        MQId cli_id = ShmMsgQueue::NewId();
        MQId cli_id = NewSession();
        ShmMsgQueue mq(cli_id, shm, 64);
        MsgI msg;
        MsgI msg(shm);
        MsgRequestTopic body;
        body.set_topic("topic");
        body.set_data(str);
@@ -58,7 +58,7 @@
        auto now = []() { return steady_clock::now(); };
        auto tm = now();
        while (*run) {
            MsgI msg;
            MsgI msg(shm);
            BHMsgHead head;
            if (mq.TryRecv(msg)) {
                DEFER1(msg.Release());
@@ -149,8 +149,8 @@
    auto Avail = [&]() { return shm.get_free_memory(); };
    auto init_avail = Avail();
    ShmSocket srv(shm, ShmMsgQueue::NewId(), qlen);
    ShmSocket cli(shm, ShmMsgQueue::NewId(), qlen);
    ShmSocket srv(shm, NewSession(), qlen);
    ShmSocket cli(shm, NewSession(), qlen);
    int ncli = 1;
    uint64_t nmsg = 1000 * 1000 * 1;