lichao
2021-04-25 1fbfef2a51db4a3bac9d8a5b87af94a40a913b7a
change mqid from uuid to uint64.
21个文件已修改
341 ■■■■ 已修改文件
box/center.cpp 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.h 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center_main.cc 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/status_main.cc 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/source/bhome_msg_api.proto 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_api.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/bh_util.h 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.cpp 19 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/defs.h 14 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/msg.h 24 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/sendq.cpp 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/sendq.h 22 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm.h 16 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.cpp 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/shm_queue.h 34 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.cpp 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/socket.h 34 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
src/topic_node.cpp 44 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/api_test.cpp 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/speed_test.cpp 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
utest/utest.cpp 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
box/center.cpp
@@ -37,9 +37,9 @@
{
public:
    typedef std::string ProcId;
    typedef std::string Address;
    typedef MQId Address;
    typedef bhome_msg::ProcInfo ProcInfo;
    typedef std::function<void(Address const &)> Cleaner;
    typedef std::function<void(Address const)> Cleaner;
private:
    enum {
@@ -84,7 +84,7 @@
        WeakNode weak_node_;
        bool operator<(const TopicDest &a) const { return mq_ < a.mq_; }
    };
    inline const std::string &SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); }
    inline MQId SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); }
    inline bool MatchAddr(std::set<Address> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); }
    NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time, const int64_t kill_time) :
@@ -182,7 +182,7 @@
    {
        return HandleMsg(
            head, [&](Node node) -> MsgCommonReply {
                auto &src = SrcAddr(head);
                auto src = SrcAddr(head);
                auto &topics = msg.topics().topic_list();
                node->services_[src].insert(topics.begin(), topics.end());
                TopicDest dest = {src, node};
@@ -240,7 +240,7 @@
    MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg)
    {
        return HandleMsg(head, [&](Node node) {
            auto &src = SrcAddr(head);
            auto src = SrcAddr(head);
            auto &topics = msg.topics().topic_list();
            node->subscriptions_[src].insert(topics.begin(), topics.end());
            TopicDest dest = {src, node};
@@ -253,7 +253,7 @@
    MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg)
    {
        return HandleMsg(head, [&](Node node) {
            auto &src = SrcAddr(head);
            auto src = SrcAddr(head);
            auto pos = node->subscriptions_.find(src);
            auto RemoveSubTopicDestRecord = [this](const Topic &topic, const TopicDest &dest) {
@@ -426,8 +426,8 @@
    auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
        return [&](auto &&rep_body) {
            auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
            auto &remote = head.route(0).mq_id();
            socket.Send(remote.data(), reply_head, rep_body);
            auto remote = head.route(0).mq_id();
            socket.Send(remote, reply_head, rep_body);
        };
    };
@@ -473,7 +473,7 @@
                    if (node) {
                        // should also make sure that mq is not killed before msg expires.
                        // it would be ok if (kill_time - offline_time) is longer than expire time.
                        socket.Send(cli.mq_.data(), msg);
                        socket.Send(cli.mq_, msg);
                        ++it;
                    } else {
                        it = clients.erase(it);
@@ -505,28 +505,24 @@
    return rec;
}
bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const std::string &mqid, const int mq_len)
bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len)
{
    Centers()[name] = CenterInfo{name, handler, idle, mqid, mq_len};
    return true;
}
bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId &mqid, const int mq_len)
{
    return Install(name, handler, idle, std::string((const char *) &mqid, sizeof(mqid)), mq_len);
}
BHCenter::BHCenter(Socket::Shm &shm)
{
    auto gc = [&](const std::string &id) {
        auto r = ShmSocket::Remove(shm, *(MQId *) id.data());
        printf("remove mq : %s\n", r ? "ok" : "failed");
    auto gc = [&](const MQId id) {
        auto r = ShmSocket::Remove(shm, id);
        printf("remove mq %ld : %s\n", id, (r ? "ok" : "failed"));
    };
    AddCenter("#bhome_center", gc);
    for (auto &kv : Centers()) {
        auto &info = kv.second;
        sockets_[info.name_] = std::make_shared<ShmSocket>(shm, *(MQId *) info.mqid_.data(), info.mq_len_);
        sockets_[info.name_] = std::make_shared<ShmSocket>(shm, info.mqid_, info.mq_len_);
    }
}
box/center.h
@@ -30,8 +30,7 @@
public:
    typedef Socket::PartialRecvCB MsgHandler;
    typedef Socket::IdleCB IdleHandler;
    static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const std::string &mqid, const int mq_len);
    static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId &mqid, const int mq_len);
    static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len);
    BHCenter(Socket::Shm &shm);
    ~BHCenter() { Stop(); }
@@ -43,7 +42,7 @@
        std::string name_;
        MsgHandler handler_;
        IdleHandler idle_;
        std::string mqid_;
        MQId mqid_;
        int mq_len_ = 0;
    };
    typedef std::map<std::string, CenterInfo> CenterRecords;
box/center_main.cc
@@ -44,8 +44,8 @@
            return true;
        }
        auto mtx(shm_.find_or_construct<Mutex>((name_ + "_mutex_0").c_str())());
        auto time_stamp(shm_.find_or_construct<int64_t>((name_ + "_timestamp_0").c_str())(0));
        auto mtx(shm_.FindOrCreate<Mutex>(name_ + "_mutex_0"));
        auto time_stamp(shm_.FindOrCreate<int64_t>(name_ + "_timestamp_0", 0));
        if (mtx && time_stamp) {
            Guard lock(*mtx);
@@ -86,7 +86,7 @@
int center_main(int argc, const char *argv[])
{
    auto &shm = BHomeShm();
    MsgI::BindShm(shm);
    GlobalInit(shm);
    AppArg args(argc, argv);
    if (args.Has("remove")) {
box/status_main.cc
@@ -44,7 +44,7 @@
            return shm_name;
        }
    };
    printf("monitoring shm : %s, size : %dM\n", DisplayName().c_str(), shm_size);
    printf("monitoring shm : %s, size : %ldM\n", DisplayName().c_str(), shm_size);
    SharedMemory shm(shm_name, 1024 * 1024 * shm_size);
    std::atomic<bool> run(true);
proto/source/bhome_msg_api.proto
@@ -8,8 +8,8 @@
package bhome_msg;
message BHAddress {
    bytes mq_id = 1; // mqid, uuid
    bytes ip = 2;   //
    uint64 mq_id = 1;
    bytes ip = 2;
    int32 port = 3;
}
src/bh_api.cpp
@@ -10,7 +10,7 @@
{
TopicNode &ProcNode()
{
    static bool init_bind_msg_shm = MsgI::BindShm(BHomeShm());
    static bool init = GlobalInit(BHomeShm());
    static TopicNode node(BHomeShm());
    return node;
}
src/bh_util.h
@@ -143,6 +143,31 @@
    }
};
template <class T, class Tag>
class StaticDataRef
{
    typedef T *Ptr;
    static inline Ptr &ptr()
    {
        static Ptr sp(nullptr);
        return sp;
    }
protected:
    static inline T &GetData()
    {
        if (!ptr()) { throw std::string("Must set ShmMsg shm before use!"); }
        return *ptr();
    }
public:
    static bool SetData(T &t)
    {
        auto Bind = [&]() { ptr() = &t; return true; };
        return ptr() ? false : Bind();
    }
};
// macro helper
#define JOIN_IMPL(a, b) a##b
#define JOIN(a, b) JOIN_IMPL(a, b)
src/defs.cpp
@@ -16,14 +16,11 @@
 * =====================================================================================
 */
#include "defs.h"
#include "shm.h"
#include "msg.h"
#include "shm_queue.h"
namespace
{
const MQId kBHTopicBus = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
const MQId kBHTopicCenter = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff");
const MQId kBHUniCenter = boost::uuids::string_generator()("87654321-89ab-cdef-8349-1234567890ff");
struct LastError {
    int ec_ = 0;
@@ -38,16 +35,20 @@
} // namespace
const MQId &BHTopicBusAddress() { return kBHTopicBus; }
const MQId &BHTopicCenterAddress() { return kBHTopicCenter; }
const MQId &BHUniCenterAddress() { return kBHUniCenter; }
bhome_shm::SharedMemory &BHomeShm()
{
    static bhome_shm::SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 512);
    return shm;
}
bool GlobalInit(bhome_shm::SharedMemory &shm)
{
    MsgI::BindShm(shm);
    typedef std::atomic<MQId> IdSrc;
    IdSrc *psrc = shm.FindOrCreate<IdSrc>("shmqIdSrc0", 100000);
    return ShmMsgQueue::SetData(*psrc);
}
void SetLastError(const int ec, const std::string &msg)
{
    LastErrorStore().ec_ = ec;
src/defs.h
@@ -19,15 +19,16 @@
#ifndef DEFS_KP8LKGD0
#define DEFS_KP8LKGD0
#include <boost/uuid/uuid.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <string>
typedef boost::uuids::uuid MQId;
typedef uint64_t MQId;
const MQId &BHTopicBusAddress();
const MQId &BHTopicCenterAddress();
const MQId &BHUniCenterAddress();
const MQId kBHTopicCenter = 100;
const MQId kBHTopicBus = 101;
const MQId kBHUniCenter = 102;
inline const MQId BHTopicCenterAddress() { return kBHTopicCenter; }
inline const MQId BHTopicBusAddress() { return kBHTopicBus; }
inline const MQId BHUniCenterAddress() { return kBHUniCenter; }
const int kBHCenterPort = 24287;
const char kTopicSep = '.';
@@ -37,6 +38,7 @@
} // namespace bhome_shm
bhome_shm::SharedMemory &BHomeShm();
bool GlobalInit(bhome_shm::SharedMemory &shm);
typedef std::string Topic;
void SetLastError(const int ec, const std::string &msg);
void GetLastError(int &ec, std::string &msg);
src/msg.h
@@ -23,7 +23,6 @@
#include "shm.h"
#include <atomic>
#include <boost/interprocess/offset_ptr.hpp>
#include <boost/uuid/uuid_generators.hpp>
#include <functional>
#include <stdint.h>
@@ -34,11 +33,10 @@
// ShmMsg is safe to be stored in shared memory, so POD data or offset_ptr is required.
// message content layout: (meta) / header_size + header + data_size + data
typedef boost::uuids::uuid MQId;
class ShmMsg
class ShmMsg : private StaticDataRef<SharedMemory, ShmMsg>
{
private:
    static inline SharedMemory &shm() { return GetData(); }
    // store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object.
    class RefCount : private boost::noncopyable
    {
@@ -58,16 +56,6 @@
    {
        static const Offset base = Addr(shm().get_address()); // cache value.
        return base;
    }
    static inline SharedMemory &shm()
    {
        if (!pshm()) { throw std::string("Must set ShmMsg shm before use!"); }
        return *pshm();
    }
    static inline SharedMemory *&pshm()
    {
        static SharedMemory *pshm = 0;
        return pshm;
    }
    static const uint32_t kMsgTag = 0xf1e2d3c4;
@@ -145,13 +133,7 @@
    T *get() const { return static_cast<T *>(Ptr(offset_ + BaseAddr())); }
public:
    static bool BindShm(SharedMemory &shm)
    {
        assert(!pshm());
        pshm() = &shm;
        return true;
    }
    static bool BindShm(SharedMemory &shm) { return SetData(shm); }
    ShmMsg() :
        ShmMsg(nullptr) {}
    explicit ShmMsg(const size_t size) :
src/sendq.cpp
@@ -19,7 +19,7 @@
#include "shm_queue.h"
#include <chrono>
int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr)
int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, Array &arr)
{
    auto FirstNotExpired = [](Array &l) {
        auto Less = [](const TimedMsg &msg, const TimePoint &tp) { return msg.expire() < tp; };
@@ -41,7 +41,7 @@
        bool r = false;
        if (d.index() == 0) {
            auto &msg = boost::variant2::get<0>(pos->data().data_);
            r = mq.TrySend(*(MQId *) remote.data(), msg);
            r = mq.TrySend(remote, msg);
            if (r) {
                msg.Release();
            }
@@ -50,7 +50,7 @@
            MsgI msg;
            if (msg.Make(content)) {
                DEFER1(msg.Release(););
                r = mq.TrySend(*(MQId *) remote.data(), msg);
                r = mq.TrySend(remote, msg);
            }
        }
        return r;
@@ -65,7 +65,7 @@
    return nprocessed;
}
int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, ArrayList &al)
int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, ArrayList &al)
{
    int nsend = 0;
    auto AllSent = [&](Array &arr) {
src/sendq.h
@@ -37,7 +37,7 @@
class SendQ
{
public:
    typedef std::string Remote;
    typedef MQId Remote;
    typedef bhome_msg::MsgI MsgI;
    typedef std::string Content;
    typedef boost::variant2::variant<MsgI, Content> Data;
@@ -50,18 +50,18 @@
    typedef TimedMsg::TimePoint TimePoint;
    typedef TimedMsg::Duration Duration;
    template <class... Rest>
    void Append(const MQId &id, Rest &&...rest)
    {
        Append(std::string((const char *) &id, sizeof(id)), std::forward<decltype(rest)>(rest)...);
    }
    // template <class... Rest>
    // void Append(const MQId &id, Rest &&...rest)
    // {
    //     Append(std::string((const char *) &id, sizeof(id)), std::forward<decltype(rest)>(rest)...);
    // }
    void Append(const Remote &addr, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent())
    void Append(const Remote addr, const MsgI msg, OnMsgEvent onExpire = OnMsgEvent())
    {
        msg.AddRef();
        AppendData(addr, Data(msg), DefaultExpire(), onExpire);
    }
    void Append(const Remote &addr, Content &&content, OnMsgEvent onExpire = OnMsgEvent())
    void Append(const Remote addr, Content &&content, OnMsgEvent onExpire = OnMsgEvent())
    {
        AppendData(addr, Data(std::move(content)), DefaultExpire(), onExpire);
    }
@@ -71,7 +71,7 @@
private:
    static TimePoint Now() { return TimedMsg::Clock::now(); }
    static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(60); }
    void AppendData(const Remote &addr, Data &&data, const TimePoint &expire, OnMsgEvent onExpire)
    void AppendData(const Remote addr, Data &&data, const TimePoint &expire, OnMsgEvent onExpire)
    {
        //TODO simple queue, organize later ?
@@ -88,8 +88,8 @@
    typedef std::list<Array> ArrayList;
    typedef std::unordered_map<Remote, ArrayList> Store;
    int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr);
    int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, ArrayList &arr);
    int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, Array &arr);
    int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, ArrayList &arr);
    std::mutex mutex_in_;
    std::mutex mutex_out_;
src/shm.h
@@ -25,7 +25,6 @@
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/noncopyable.hpp>
#include <boost/uuid/uuid.hpp>
#include <chrono>
#include <thread>
@@ -103,7 +102,16 @@
    ~SharedMemory();
    std::string name() const { return name_; }
    bool Remove() { return Remove(name()); }
    template <class T, class... Params>
    T *FindOrCreate(const std::string &name, Params &&...params)
    {
        return find_or_construct<T>(name.c_str(), std::nothrow)(std::forward<decltype(params)>(params)...);
    }
    template <class T, class... Params>
    T *Create(const std::string &name, Params &&...params)
    {
        return construct<T>(name.c_str(), std::nothrow)(std::forward<decltype(params)>(params)...);
    }
    void *Alloc(const size_t size) { return allocate(size, std::nothrow); }
    void Dealloc(void *p)
    {
@@ -113,7 +121,7 @@
    void Dealloc(offset_ptr<T> ptr) { return Dealloc(ptr.get()); }
    template <class T, class... Params>
    T *New(Params const &...params) { return construct<T>(anonymous_instance, std::nothrow)(params...); }
    T *New(Params &&...params) { return construct<T>(anonymous_instance, std::nothrow)(std::forward<decltype(params)>(params)...); }
    template <class T>
    void Delete(T *p)
    {
@@ -157,7 +165,7 @@
    ShmObject(ShmType &segment, const std::string &name, Params &&...t) :
        shm_(segment), name_(name)
    {
        pdata_ = shm_.find_or_construct<Data>(ObjName(name_).c_str(), std::nothrow)(t...);
        pdata_ = shm_.Create<Data>(ObjName(name_), std::forward<decltype(t)>(t)...);
        if (!IsOk()) {
            throw("Error: Not enough memory, can not allocate \"" + name_ + "\"");
        }
src/shm_queue.cpp
@@ -18,20 +18,21 @@
#include "shm_queue.h"
#include "bh_util.h"
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
namespace bhome_shm
{
using namespace bhome_msg;
using namespace boost::interprocess;
using namespace boost::uuids;
namespace
{
std::string MsgQIdToName(const MQId &id) { return "shmq" + to_string(id); }
// MQId EmptyId() { return nil_uuid(); }
MQId NewId() { return random_generator()(); }
std::string MsgQIdToName(const ShmMsgQueue::MQId id)
{
    char buf[40] = "mqOx";
    int n = sprintf(buf + 4, "%lx", id);
    return std::string(buf, n + 4);
}
const int AdjustMQLength(const int len)
{
    const int kMaxLength = 10000;
@@ -47,8 +48,13 @@
} // namespace
ShmMsgQueue::MQId ShmMsgQueue::NewId()
{
    static auto &id = GetData();
    return ++id;
}
// ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2
ShmMsgQueue::ShmMsgQueue(const MQId &id, ShmType &segment, const int len) :
ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) :
    Super(segment, MsgQIdToName(id), AdjustMQLength(len), segment.get_segment_manager()),
    id_(id)
{
@@ -59,7 +65,7 @@
ShmMsgQueue::~ShmMsgQueue() {}
bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId &id)
bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id)
{
    Queue *q = Find(shm, id);
    if (q) {
@@ -71,12 +77,12 @@
    return Super::Remove(shm, MsgQIdToName(id));
}
ShmMsgQueue::Queue *ShmMsgQueue::Find(SharedMemory &shm, const MQId &remote_id)
ShmMsgQueue::Queue *ShmMsgQueue::Find(SharedMemory &shm, const MQId remote_id)
{
    return Super::Find(shm, MsgQIdToName(remote_id));
}
bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, OnSend const &onsend)
bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend)
{
    Queue *remote = Find(shm, remote_id);
    if (remote) {
src/shm_queue.h
@@ -21,6 +21,7 @@
#include "msg.h"
#include "shm.h"
#include <atomic>
#include <boost/circular_buffer.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
@@ -29,8 +30,6 @@
template <class D>
using Circular = boost::circular_buffer<D, Allocator<D>>;
typedef boost::uuids::uuid MQId;
template <class D>
class SharedQueue : private Circular<D>
@@ -137,32 +136,32 @@
using namespace bhome_msg;
class ShmMsgQueue : private ShmObject<SharedQueue<MsgI>>
class ShmMsgQueue : private ShmObject<SharedQueue<MsgI>>, public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue>
{
    typedef ShmObject<SharedQueue<MsgI>> Super;
    typedef Super::Data Queue;
    typedef std::function<void()> OnSend;
    MQId id_;
protected:
    ShmMsgQueue(const std::string &raw_name, ShmType &segment, const int len); // internal use.
public:
    ShmMsgQueue(const MQId &id, ShmType &segment, const int len);
    typedef uint64_t MQId;
    static MQId NewId();
    ShmMsgQueue(const MQId id, ShmType &segment, const int len);
    ShmMsgQueue(ShmType &segment, const int len);
    ~ShmMsgQueue();
    static bool Remove(SharedMemory &shm, const MQId &id);
    const MQId &Id() const { return id_; }
    static bool Remove(SharedMemory &shm, const MQId id);
    MQId Id() const { return id_; }
    using Super::shm;
    bool Recv(MsgI &msg, const int timeout_ms) { return data()->Read(msg, timeout_ms); }
    bool TryRecv(MsgI &msg) { return data()->TryRead(msg); }
    template <class OnData>
    int TryRecvAll(OnData const &onData) { return data()->TryReadAll(onData); }
    static Queue *Find(SharedMemory &shm, const MQId &remote_id);
    // static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend = OnSend());
    static bool TrySend(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, OnSend const &onsend = OnSend());
    static Queue *Find(SharedMemory &shm, const MQId remote_id);
    static bool TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend = OnSend());
    template <class Iter>
    static int TrySendAll(SharedMemory &shm, const MQId &remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend())
    static int TrySendAll(SharedMemory &shm, const MQId remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend())
    {
        Queue *remote = Find(shm, remote_id);
        if (remote) {
@@ -177,14 +176,13 @@
        }
    }
    // template <class... Rest>
    // bool Send(const MQId &remote_id, Rest const &...rest) { return Send(shm(), remote_id, rest...); }
    template <class... Rest>
    bool TrySend(const MQId &remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); }
    bool TrySend(const MQId remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); }
    template <class... Rest>
    int TrySendAll(const MQId &remote_id, Rest const &...rest) { return TrySendAll(shm(), remote_id, rest...); }
    int TrySendAll(const MQId remote_id, Rest const &...rest) { return TrySendAll(shm(), remote_id, rest...); }
    size_t Pending() const { return data()->size(); }
private:
    MQId id_;
};
} // namespace bhome_shm
src/socket.cpp
@@ -24,7 +24,7 @@
using namespace bhome_msg;
using namespace bhome_shm;
ShmSocket::ShmSocket(Shm &shm, const MQId &id, const int len) :
ShmSocket::ShmSocket(Shm &shm, const MQId id, const int len) :
    run_(false), mq_(id, shm, len)
{
    Start();
src/socket.h
@@ -33,44 +33,37 @@
#include <vector>
using namespace bhome_msg;
class ShmSocket : private boost::noncopyable
{
    template <class... T>
    bool SendImpl(const void *valid_remote, T &&...rest)
    {
        send_buffer_.Append(*static_cast<const MQId *>(valid_remote), std::forward<decltype(rest)>(rest)...);
        return true;
    }
protected:
    typedef bhome_shm::ShmMsgQueue Queue;
public:
    typedef ShmMsgQueue::MQId MQId;
    typedef bhome_shm::SharedMemory Shm;
    typedef std::function<void(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> RecvCB;
    typedef std::function<bool(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> PartialRecvCB;
    typedef std::function<void(ShmSocket &sock)> IdleCB;
    ShmSocket(Shm &shm, const MQId &id, const int len);
    ShmSocket(Shm &shm, const MQId id, const int len);
    ShmSocket(Shm &shm, const int len = 12);
    ~ShmSocket();
    static bool Remove(SharedMemory &shm, const MQId &id) { return Queue::Remove(shm, id); }
    static bool Remove(SharedMemory &shm, const MQId id) { return Queue::Remove(shm, id); }
    bool Remove() { return Remove(shm(), id()); }
    const MQId &id() const { return mq().Id(); }
    MQId id() const { return mq().Id(); }
    // start recv.
    bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB());
    bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, onIdle); }
    bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); }
    bool Stop();
    size_t Pending() const { return mq().Pending(); }
    template <class Body>
    bool Send(const void *valid_remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
    bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
    {
        try {
            if (!cb) {
                return SendImpl(valid_remote, MsgI::Serialize(head, body));
                return SendImpl(remote, MsgI::Serialize(head, body));
            } else {
                std::string msg_id(head.msg_id());
                per_msg_cbs_->Store(msg_id, std::move(cb));
@@ -78,7 +71,7 @@
                    RecvCB cb_no_use;
                    per_msg_cbs_->Pick(msg_id, cb_no_use);
                };
                return SendImpl(valid_remote, MsgI::Serialize(head, body), onExpireRemoveCB);
                return SendImpl(remote, MsgI::Serialize(head, body), onExpireRemoveCB);
            }
        } catch (...) {
            SetLastError(eError, "Send internal error.");
@@ -86,15 +79,15 @@
        }
    }
    bool Send(const void *valid_remote, const MsgI &imsg)
    bool Send(const MQId remote, const MsgI &imsg)
    {
        return SendImpl(valid_remote, imsg);
        return SendImpl(remote, imsg);
    }
    bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms);
    template <class Body>
    bool SendAndRecv(const void *remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
    bool SendAndRecv(const MQId remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
    {
        struct State {
            std::mutex mutex;
@@ -144,6 +137,13 @@
    bool StopNoLock();
    bool RunningNoLock() { return !workers_.empty(); }
    template <class... Rest>
    bool SendImpl(const MQId remote, Rest &&...rest)
    {
        send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...);
        return true;
    }
    std::vector<std::thread> workers_;
    std::mutex mutex_;
    std::atomic<bool> run_;
src/topic_node.cpp
@@ -25,7 +25,7 @@
namespace
{
inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); }
inline void AddRoute(BHMsgHead &head, const MQId id) { head.add_route()->set_mq_id(id); }
struct SrcInfo {
    std::vector<BHAddress> route;
@@ -82,7 +82,7 @@
    auto &sock = SockNode();
    MsgRegister body;
    body.mutable_proc()->Swap(&proc);
    auto AddId = [&](const MQId &id) { body.add_addrs()->set_mq_id(&id, sizeof(id)); };
    auto AddId = [&](const MQId id) { body.add_addrs()->set_mq_id(id); };
    AddId(SockNode().id());
    AddId(SockServer().id());
    AddId(SockClient().id());
@@ -108,12 +108,12 @@
            MsgCommonReply body;
            CheckResult(imsg, head, body);
        };
        return sock.Send(&BHTopicCenterAddress(), head, body, onResult);
        return sock.Send(BHTopicCenterAddress(), head, body, onResult);
    } else {
        MsgI reply;
        DEFER1(reply.Release(););
        BHMsgHead reply_head;
        bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
        bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
        if (r) {
            CheckResult(reply, reply_head, reply_body);
        }
@@ -144,12 +144,12 @@
            MsgCommonReply body;
            CheckResult(imsg, head, body);
        };
        return sock.Send(&BHTopicCenterAddress(), head, body, onResult);
        return sock.Send(BHTopicCenterAddress(), head, body, onResult);
    } else {
        MsgI reply;
        DEFER1(reply.Release(););
        BHMsgHead reply_head;
        bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
        bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
        return r && CheckResult(reply, reply_head, reply_body);
    }
}
@@ -169,12 +169,12 @@
    AddRoute(head, sock.id());
    if (timeout_ms == 0) {
        return sock.Send(&BHTopicCenterAddress(), head, body);
        return sock.Send(BHTopicCenterAddress(), head, body);
    } else {
        MsgI reply;
        DEFER1(reply.Release(););
        BHMsgHead reply_head;
        bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
        bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
        r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
        return (r && IsSuccess(reply_body.errmsg().errcode()));
    }
@@ -201,7 +201,7 @@
    MsgI reply;
    DEFER1(reply.Release());
    BHMsgHead reply_head;
    return (sock.SendAndRecv(&BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) &&
    return (sock.SendAndRecv(BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) &&
            reply_head.type() == kMsgTypeQueryTopicReply &&
            reply.ParseBody(reply_body));
}
@@ -221,12 +221,12 @@
    AddRoute(head, sock.id());
    if (timeout_ms == 0) {
        return sock.Send(&BHTopicCenterAddress(), head, body);
        return sock.Send(BHTopicCenterAddress(), head, body);
    } else {
        MsgI reply;
        DEFER1(reply.Release(););
        BHMsgHead reply_head;
        bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
        bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
        r = r && reply_head.type() == kMsgTypeCommonReply;
        r = r && reply.ParseBody(reply_body);
        return r;
@@ -247,8 +247,8 @@
            for (int i = 0; i < head.route_size() - 1; ++i) {
                reply_head.add_route()->Swap(head.mutable_route(i));
            }
            auto &remote = head.route().rbegin()->mq_id();
            sock.Send(remote.data(), reply_head, reply_body);
            auto remote = head.route().rbegin()->mq_id();
            sock.Send(remote, reply_head, reply_body);
        }
    };
@@ -315,7 +315,7 @@
    for (unsigned i = 0; i < p->route.size() - 1; ++i) {
        head.add_route()->Swap(&p->route[i]);
    }
    return sock.Send(p->route.back().mq_id().data(), head, body);
    return sock.Send(p->route.back().mq_id(), head, body);
}
bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker)
@@ -361,9 +361,9 @@
                    }
                }
            };
            return sock.Send(addr.mq_id().data(), head, req, onRecv);
            return sock.Send(addr.mq_id(), head, req, onRecv);
        } else {
            return sock.Send(addr.mq_id().data(), head, req);
            return sock.Send(addr.mq_id(), head, req);
        }
    };
@@ -396,7 +396,7 @@
            DEFER1(reply_msg.Release(););
            BHMsgHead reply_head;
            if (sock.SendAndRecv(addr.mq_id().data(), head, request, reply_msg, reply_head, timeout_ms) &&
            if (sock.SendAndRecv(addr.mq_id(), head, request, reply_msg, reply_head, timeout_ms) &&
                reply_head.type() == kMsgTypeRequestTopicReply &&
                reply_msg.ParseBody(out_reply)) {
                reply_head.mutable_proc_id()->swap(out_proc_id);
@@ -441,7 +441,7 @@
    std::vector<NodeAddress> lst;
    if (QueryRPCTopics(topic, lst, timeout_ms)) {
        addr = lst.front().addr();
        if (!addr.mq_id().empty()) {
        if (addr.mq_id() != 0) {
            topic_query_cache_.Store(topic, addr);
            return true;
        }
@@ -464,13 +464,13 @@
        AddRoute(head, sock.id());
        if (timeout_ms == 0) {
            return sock.Send(&BHTopicBusAddress(), head, pub);
            return sock.Send(BHTopicBusAddress(), head, pub);
        } else {
            MsgI reply;
            DEFER1(reply.Release(););
            BHMsgHead reply_head;
            MsgCommonReply reply_body;
            return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) &&
            return sock.SendAndRecv(BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) &&
                   reply_head.type() == kMsgTypeCommonReply &&
                   reply.ParseBody(reply_body) &&
                   IsSuccess(reply_body.errmsg().errcode());
@@ -497,12 +497,12 @@
        BHMsgHead head(InitMsgHead(GetType(sub), proc_id()));
        AddRoute(head, sock.id());
        if (timeout_ms == 0) {
            return sock.Send(&BHTopicBusAddress(), head, sub);
            return sock.Send(BHTopicBusAddress(), head, sub);
        } else {
            MsgI reply;
            DEFER1(reply.Release(););
            BHMsgHead reply_head;
            return sock.SendAndRecv(&BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) &&
            return sock.SendAndRecv(BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) &&
                   reply_head.type() == kMsgTypeCommonReply &&
                   reply.ParseBody(reply_body) &&
                   IsSuccess(reply_body.errmsg().errcode());
utest/api_test.cpp
@@ -198,8 +198,8 @@
    const std::string mtx_name("test_mutex");
    const std::string int_name("test_int");
    auto mtx = shm.find_or_construct<Mutex>(mtx_name.c_str())();
    auto pi = shm.find_or_construct<int>(int_name.c_str())(100);
    auto mtx = shm.FindOrCreate<Mutex>(mtx_name);
    auto pi = shm.FindOrCreate<int>(int_name, 100);
    printf("mutetx ");
    PrintPtr(mtx);
utest/speed_test.cpp
@@ -26,7 +26,7 @@
    SharedMemory &shm = TestShm();
    MsgI::BindShm(shm);
    MQId id = boost::uuids::random_generator()();
    MQId id = ShmMsgQueue::NewId();
    const int timeout = 1000;
    const uint32_t data_size = 4000;
    const std::string proc_id = "demo_proc";
@@ -157,8 +157,8 @@
                req_body.set_topic("topic");
                req_body.set_data(msg_content);
                auto req_head(InitMsgHead(GetType(req_body), client_proc_id));
                req_head.add_route()->set_mq_id(&cli.id(), cli.id().size());
                return cli.Send(&srv.id(), req_head, req_body);
                req_head.add_route()->set_mq_id(cli.id());
                return cli.Send(srv.id(), req_head, req_body);
            };
            Req();
@@ -175,15 +175,13 @@
                DEFER1(req.Release());
                if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) {
                    auto &mqid = req_head.route()[0].mq_id();
                    MQId src_id;
                    memcpy(&src_id, mqid.data(), sizeof(src_id));
                    auto src_id = req_head.route()[0].mq_id();
                    auto Reply = [&]() {
                        MsgRequestTopic reply_body;
                        reply_body.set_topic("topic");
                        reply_body.set_data(msg_content);
                        auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, req_head.msg_id()));
                        return srv.Send(&src_id, reply_head, reply_body);
                        return srv.Send(src_id, reply_head, reply_body);
                    };
                    Reply();
                }
utest/utest.cpp
@@ -2,8 +2,6 @@
#include "defs.h"
#include "util.h"
#include <atomic>
#include <boost/uuid/uuid_generators.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <condition_variable>
#include <stdio.h>
#include <string>
@@ -96,7 +94,7 @@
    auto Avail = [&]() { return shm.get_free_memory(); };
    auto init_avail = Avail();
    int *flag = shm.find_or_construct<int>("flag")(123);
    int *flag = shm.FindOrCreate<int>("flag", 123);
    printf("flag = %d\n", *flag);
    ++*flag;
    const std::string sub_proc_id = "subscriber";
@@ -207,7 +205,7 @@
    auto Avail = [&]() { return shm.get_free_memory(); };
    auto init_avail = Avail();
    int *flag = shm.find_or_construct<int>("flag")(123);
    int *flag = shm.FindOrCreate<int>("flag", 123);
    printf("flag = %d\n", *flag);
    ++*flag;