From 1fbfef2a51db4a3bac9d8a5b87af94a40a913b7a Mon Sep 17 00:00:00 2001 From: lichao <lichao@aiotlink.com> Date: 星期日, 25 四月 2021 15:33:40 +0800 Subject: [PATCH] change mqid from uuid to uint64. --- src/shm.h | 16 ++ src/socket.h | 34 ++-- box/center.cpp | 32 ++--- src/msg.h | 24 --- src/socket.cpp | 2 src/defs.h | 14 +- utest/utest.cpp | 6 box/center.h | 5 src/sendq.cpp | 8 src/shm_queue.cpp | 26 ++- src/topic_node.cpp | 44 +++--- src/defs.cpp | 19 +- box/center_main.cc | 6 utest/speed_test.cpp | 12 - proto/source/bhome_msg_api.proto | 4 utest/api_test.cpp | 4 box/status_main.cc | 2 src/bh_util.h | 25 ++++ src/shm_queue.h | 34 ++--- src/sendq.h | 22 +- src/bh_api.cpp | 2 21 files changed, 177 insertions(+), 164 deletions(-) diff --git a/box/center.cpp b/box/center.cpp index badfbfe..d920ff7 100644 --- a/box/center.cpp +++ b/box/center.cpp @@ -37,9 +37,9 @@ { public: typedef std::string ProcId; - typedef std::string Address; + typedef MQId Address; typedef bhome_msg::ProcInfo ProcInfo; - typedef std::function<void(Address const &)> Cleaner; + typedef std::function<void(Address const)> Cleaner; private: enum { @@ -84,7 +84,7 @@ WeakNode weak_node_; bool operator<(const TopicDest &a) const { return mq_ < a.mq_; } }; - inline const std::string &SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); } + inline MQId SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); } inline bool MatchAddr(std::set<Address> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); } NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time, const int64_t kill_time) : @@ -182,7 +182,7 @@ { return HandleMsg( head, [&](Node node) -> MsgCommonReply { - auto &src = SrcAddr(head); + auto src = SrcAddr(head); auto &topics = msg.topics().topic_list(); node->services_[src].insert(topics.begin(), topics.end()); TopicDest dest = {src, node}; @@ -240,7 +240,7 @@ MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg) { return HandleMsg(head, [&](Node node) { - auto &src = SrcAddr(head); + auto src = SrcAddr(head); auto &topics = msg.topics().topic_list(); node->subscriptions_[src].insert(topics.begin(), topics.end()); TopicDest dest = {src, node}; @@ -253,7 +253,7 @@ MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg) { return HandleMsg(head, [&](Node node) { - auto &src = SrcAddr(head); + auto src = SrcAddr(head); auto pos = node->subscriptions_.find(src); auto RemoveSubTopicDestRecord = [this](const Topic &topic, const TopicDest &dest) { @@ -426,8 +426,8 @@ auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) { return [&](auto &&rep_body) { auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id())); - auto &remote = head.route(0).mq_id(); - socket.Send(remote.data(), reply_head, rep_body); + auto remote = head.route(0).mq_id(); + socket.Send(remote, reply_head, rep_body); }; }; @@ -473,7 +473,7 @@ if (node) { // should also make sure that mq is not killed before msg expires. // it would be ok if (kill_time - offline_time) is longer than expire time. - socket.Send(cli.mq_.data(), msg); + socket.Send(cli.mq_, msg); ++it; } else { it = clients.erase(it); @@ -505,28 +505,24 @@ return rec; } -bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const std::string &mqid, const int mq_len) +bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len) { Centers()[name] = CenterInfo{name, handler, idle, mqid, mq_len}; return true; } -bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId &mqid, const int mq_len) -{ - return Install(name, handler, idle, std::string((const char *) &mqid, sizeof(mqid)), mq_len); -} BHCenter::BHCenter(Socket::Shm &shm) { - auto gc = [&](const std::string &id) { - auto r = ShmSocket::Remove(shm, *(MQId *) id.data()); - printf("remove mq : %s\n", r ? "ok" : "failed"); + auto gc = [&](const MQId id) { + auto r = ShmSocket::Remove(shm, id); + printf("remove mq %ld : %s\n", id, (r ? "ok" : "failed")); }; AddCenter("#bhome_center", gc); for (auto &kv : Centers()) { auto &info = kv.second; - sockets_[info.name_] = std::make_shared<ShmSocket>(shm, *(MQId *) info.mqid_.data(), info.mq_len_); + sockets_[info.name_] = std::make_shared<ShmSocket>(shm, info.mqid_, info.mq_len_); } } diff --git a/box/center.h b/box/center.h index 60639d5..ab8b15f 100644 --- a/box/center.h +++ b/box/center.h @@ -30,8 +30,7 @@ public: typedef Socket::PartialRecvCB MsgHandler; typedef Socket::IdleCB IdleHandler; - static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const std::string &mqid, const int mq_len); - static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId &mqid, const int mq_len); + static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len); BHCenter(Socket::Shm &shm); ~BHCenter() { Stop(); } @@ -43,7 +42,7 @@ std::string name_; MsgHandler handler_; IdleHandler idle_; - std::string mqid_; + MQId mqid_; int mq_len_ = 0; }; typedef std::map<std::string, CenterInfo> CenterRecords; diff --git a/box/center_main.cc b/box/center_main.cc index 7f4b26b..fdda2cd 100644 --- a/box/center_main.cc +++ b/box/center_main.cc @@ -44,8 +44,8 @@ return true; } - auto mtx(shm_.find_or_construct<Mutex>((name_ + "_mutex_0").c_str())()); - auto time_stamp(shm_.find_or_construct<int64_t>((name_ + "_timestamp_0").c_str())(0)); + auto mtx(shm_.FindOrCreate<Mutex>(name_ + "_mutex_0")); + auto time_stamp(shm_.FindOrCreate<int64_t>(name_ + "_timestamp_0", 0)); if (mtx && time_stamp) { Guard lock(*mtx); @@ -86,7 +86,7 @@ int center_main(int argc, const char *argv[]) { auto &shm = BHomeShm(); - MsgI::BindShm(shm); + GlobalInit(shm); AppArg args(argc, argv); if (args.Has("remove")) { diff --git a/box/status_main.cc b/box/status_main.cc index a435c2f..e0fb932 100644 --- a/box/status_main.cc +++ b/box/status_main.cc @@ -44,7 +44,7 @@ return shm_name; } }; - printf("monitoring shm : %s, size : %dM\n", DisplayName().c_str(), shm_size); + printf("monitoring shm : %s, size : %ldM\n", DisplayName().c_str(), shm_size); SharedMemory shm(shm_name, 1024 * 1024 * shm_size); std::atomic<bool> run(true); diff --git a/proto/source/bhome_msg_api.proto b/proto/source/bhome_msg_api.proto index 838c228..94bc82e 100644 --- a/proto/source/bhome_msg_api.proto +++ b/proto/source/bhome_msg_api.proto @@ -8,8 +8,8 @@ package bhome_msg; message BHAddress { - bytes mq_id = 1; // mqid, uuid - bytes ip = 2; // + uint64 mq_id = 1; + bytes ip = 2; int32 port = 3; } diff --git a/src/bh_api.cpp b/src/bh_api.cpp index c4ac9c9..7e7b2e9 100644 --- a/src/bh_api.cpp +++ b/src/bh_api.cpp @@ -10,7 +10,7 @@ { TopicNode &ProcNode() { - static bool init_bind_msg_shm = MsgI::BindShm(BHomeShm()); + static bool init = GlobalInit(BHomeShm()); static TopicNode node(BHomeShm()); return node; } diff --git a/src/bh_util.h b/src/bh_util.h index e3ab70b..c419a59 100644 --- a/src/bh_util.h +++ b/src/bh_util.h @@ -143,6 +143,31 @@ } }; +template <class T, class Tag> +class StaticDataRef +{ + typedef T *Ptr; + static inline Ptr &ptr() + { + static Ptr sp(nullptr); + return sp; + } + +protected: + static inline T &GetData() + { + if (!ptr()) { throw std::string("Must set ShmMsg shm before use!"); } + return *ptr(); + } + +public: + static bool SetData(T &t) + { + auto Bind = [&]() { ptr() = &t; return true; }; + return ptr() ? false : Bind(); + } +}; + // macro helper #define JOIN_IMPL(a, b) a##b #define JOIN(a, b) JOIN_IMPL(a, b) diff --git a/src/defs.cpp b/src/defs.cpp index 0ff671b..0ca82bf 100644 --- a/src/defs.cpp +++ b/src/defs.cpp @@ -16,14 +16,11 @@ * ===================================================================================== */ #include "defs.h" -#include "shm.h" +#include "msg.h" +#include "shm_queue.h" namespace { - -const MQId kBHTopicBus = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff"); -const MQId kBHTopicCenter = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff"); -const MQId kBHUniCenter = boost::uuids::string_generator()("87654321-89ab-cdef-8349-1234567890ff"); struct LastError { int ec_ = 0; @@ -38,16 +35,20 @@ } // namespace -const MQId &BHTopicBusAddress() { return kBHTopicBus; } -const MQId &BHTopicCenterAddress() { return kBHTopicCenter; } -const MQId &BHUniCenterAddress() { return kBHUniCenter; } - bhome_shm::SharedMemory &BHomeShm() { static bhome_shm::SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 512); return shm; } +bool GlobalInit(bhome_shm::SharedMemory &shm) +{ + MsgI::BindShm(shm); + typedef std::atomic<MQId> IdSrc; + IdSrc *psrc = shm.FindOrCreate<IdSrc>("shmqIdSrc0", 100000); + return ShmMsgQueue::SetData(*psrc); +} + void SetLastError(const int ec, const std::string &msg) { LastErrorStore().ec_ = ec; diff --git a/src/defs.h b/src/defs.h index 08181d8..1c9e663 100644 --- a/src/defs.h +++ b/src/defs.h @@ -19,15 +19,16 @@ #ifndef DEFS_KP8LKGD0 #define DEFS_KP8LKGD0 -#include <boost/uuid/uuid.hpp> -#include <boost/uuid/uuid_generators.hpp> #include <string> -typedef boost::uuids::uuid MQId; +typedef uint64_t MQId; -const MQId &BHTopicBusAddress(); -const MQId &BHTopicCenterAddress(); -const MQId &BHUniCenterAddress(); +const MQId kBHTopicCenter = 100; +const MQId kBHTopicBus = 101; +const MQId kBHUniCenter = 102; +inline const MQId BHTopicCenterAddress() { return kBHTopicCenter; } +inline const MQId BHTopicBusAddress() { return kBHTopicBus; } +inline const MQId BHUniCenterAddress() { return kBHUniCenter; } const int kBHCenterPort = 24287; const char kTopicSep = '.'; @@ -37,6 +38,7 @@ } // namespace bhome_shm bhome_shm::SharedMemory &BHomeShm(); +bool GlobalInit(bhome_shm::SharedMemory &shm); typedef std::string Topic; void SetLastError(const int ec, const std::string &msg); void GetLastError(int &ec, std::string &msg); diff --git a/src/msg.h b/src/msg.h index 6ce4902..e332a5d 100644 --- a/src/msg.h +++ b/src/msg.h @@ -23,7 +23,6 @@ #include "shm.h" #include <atomic> #include <boost/interprocess/offset_ptr.hpp> -#include <boost/uuid/uuid_generators.hpp> #include <functional> #include <stdint.h> @@ -34,11 +33,10 @@ // ShmMsg is safe to be stored in shared memory, so POD data or offset_ptr is required. // message content layout: (meta) / header_size + header + data_size + data -typedef boost::uuids::uuid MQId; - -class ShmMsg +class ShmMsg : private StaticDataRef<SharedMemory, ShmMsg> { private: + static inline SharedMemory &shm() { return GetData(); } // store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object. class RefCount : private boost::noncopyable { @@ -58,16 +56,6 @@ { static const Offset base = Addr(shm().get_address()); // cache value. return base; - } - static inline SharedMemory &shm() - { - if (!pshm()) { throw std::string("Must set ShmMsg shm before use!"); } - return *pshm(); - } - static inline SharedMemory *&pshm() - { - static SharedMemory *pshm = 0; - return pshm; } static const uint32_t kMsgTag = 0xf1e2d3c4; @@ -145,13 +133,7 @@ T *get() const { return static_cast<T *>(Ptr(offset_ + BaseAddr())); } public: - static bool BindShm(SharedMemory &shm) - { - assert(!pshm()); - pshm() = &shm; - return true; - } - + static bool BindShm(SharedMemory &shm) { return SetData(shm); } ShmMsg() : ShmMsg(nullptr) {} explicit ShmMsg(const size_t size) : diff --git a/src/sendq.cpp b/src/sendq.cpp index 54de419..5b57d72 100644 --- a/src/sendq.cpp +++ b/src/sendq.cpp @@ -19,7 +19,7 @@ #include "shm_queue.h" #include <chrono> -int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr) +int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, Array &arr) { auto FirstNotExpired = [](Array &l) { auto Less = [](const TimedMsg &msg, const TimePoint &tp) { return msg.expire() < tp; }; @@ -41,7 +41,7 @@ bool r = false; if (d.index() == 0) { auto &msg = boost::variant2::get<0>(pos->data().data_); - r = mq.TrySend(*(MQId *) remote.data(), msg); + r = mq.TrySend(remote, msg); if (r) { msg.Release(); } @@ -50,7 +50,7 @@ MsgI msg; if (msg.Make(content)) { DEFER1(msg.Release();); - r = mq.TrySend(*(MQId *) remote.data(), msg); + r = mq.TrySend(remote, msg); } } return r; @@ -65,7 +65,7 @@ return nprocessed; } -int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, ArrayList &al) +int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, ArrayList &al) { int nsend = 0; auto AllSent = [&](Array &arr) { diff --git a/src/sendq.h b/src/sendq.h index bba44af..0699df7 100644 --- a/src/sendq.h +++ b/src/sendq.h @@ -37,7 +37,7 @@ class SendQ { public: - typedef std::string Remote; + typedef MQId Remote; typedef bhome_msg::MsgI MsgI; typedef std::string Content; typedef boost::variant2::variant<MsgI, Content> Data; @@ -50,18 +50,18 @@ typedef TimedMsg::TimePoint TimePoint; typedef TimedMsg::Duration Duration; - template <class... Rest> - void Append(const MQId &id, Rest &&...rest) - { - Append(std::string((const char *) &id, sizeof(id)), std::forward<decltype(rest)>(rest)...); - } + // template <class... Rest> + // void Append(const MQId &id, Rest &&...rest) + // { + // Append(std::string((const char *) &id, sizeof(id)), std::forward<decltype(rest)>(rest)...); + // } - void Append(const Remote &addr, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent()) + void Append(const Remote addr, const MsgI msg, OnMsgEvent onExpire = OnMsgEvent()) { msg.AddRef(); AppendData(addr, Data(msg), DefaultExpire(), onExpire); } - void Append(const Remote &addr, Content &&content, OnMsgEvent onExpire = OnMsgEvent()) + void Append(const Remote addr, Content &&content, OnMsgEvent onExpire = OnMsgEvent()) { AppendData(addr, Data(std::move(content)), DefaultExpire(), onExpire); } @@ -71,7 +71,7 @@ private: static TimePoint Now() { return TimedMsg::Clock::now(); } static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(60); } - void AppendData(const Remote &addr, Data &&data, const TimePoint &expire, OnMsgEvent onExpire) + void AppendData(const Remote addr, Data &&data, const TimePoint &expire, OnMsgEvent onExpire) { //TODO simple queue, organize later ? @@ -88,8 +88,8 @@ typedef std::list<Array> ArrayList; typedef std::unordered_map<Remote, ArrayList> Store; - int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr); - int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, ArrayList &arr); + int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, Array &arr); + int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, ArrayList &arr); std::mutex mutex_in_; std::mutex mutex_out_; diff --git a/src/shm.h b/src/shm.h index 0e834c3..a70afcb 100644 --- a/src/shm.h +++ b/src/shm.h @@ -25,7 +25,6 @@ #include <boost/interprocess/sync/interprocess_mutex.hpp> #include <boost/interprocess/sync/scoped_lock.hpp> #include <boost/noncopyable.hpp> -#include <boost/uuid/uuid.hpp> #include <chrono> #include <thread> @@ -103,7 +102,16 @@ ~SharedMemory(); std::string name() const { return name_; } bool Remove() { return Remove(name()); } - + template <class T, class... Params> + T *FindOrCreate(const std::string &name, Params &&...params) + { + return find_or_construct<T>(name.c_str(), std::nothrow)(std::forward<decltype(params)>(params)...); + } + template <class T, class... Params> + T *Create(const std::string &name, Params &&...params) + { + return construct<T>(name.c_str(), std::nothrow)(std::forward<decltype(params)>(params)...); + } void *Alloc(const size_t size) { return allocate(size, std::nothrow); } void Dealloc(void *p) { @@ -113,7 +121,7 @@ void Dealloc(offset_ptr<T> ptr) { return Dealloc(ptr.get()); } template <class T, class... Params> - T *New(Params const &...params) { return construct<T>(anonymous_instance, std::nothrow)(params...); } + T *New(Params &&...params) { return construct<T>(anonymous_instance, std::nothrow)(std::forward<decltype(params)>(params)...); } template <class T> void Delete(T *p) { @@ -157,7 +165,7 @@ ShmObject(ShmType &segment, const std::string &name, Params &&...t) : shm_(segment), name_(name) { - pdata_ = shm_.find_or_construct<Data>(ObjName(name_).c_str(), std::nothrow)(t...); + pdata_ = shm_.Create<Data>(ObjName(name_), std::forward<decltype(t)>(t)...); if (!IsOk()) { throw("Error: Not enough memory, can not allocate \"" + name_ + "\""); } diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp index 215a8ac..1be8021 100644 --- a/src/shm_queue.cpp +++ b/src/shm_queue.cpp @@ -18,20 +18,21 @@ #include "shm_queue.h" #include "bh_util.h" -#include <boost/uuid/uuid_generators.hpp> -#include <boost/uuid/uuid_io.hpp> namespace bhome_shm { using namespace bhome_msg; using namespace boost::interprocess; -using namespace boost::uuids; namespace { -std::string MsgQIdToName(const MQId &id) { return "shmq" + to_string(id); } -// MQId EmptyId() { return nil_uuid(); } -MQId NewId() { return random_generator()(); } +std::string MsgQIdToName(const ShmMsgQueue::MQId id) +{ + char buf[40] = "mqOx"; + int n = sprintf(buf + 4, "%lx", id); + return std::string(buf, n + 4); +} + const int AdjustMQLength(const int len) { const int kMaxLength = 10000; @@ -47,8 +48,13 @@ } // namespace +ShmMsgQueue::MQId ShmMsgQueue::NewId() +{ + static auto &id = GetData(); + return ++id; +} // ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2 -ShmMsgQueue::ShmMsgQueue(const MQId &id, ShmType &segment, const int len) : +ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) : Super(segment, MsgQIdToName(id), AdjustMQLength(len), segment.get_segment_manager()), id_(id) { @@ -59,7 +65,7 @@ ShmMsgQueue::~ShmMsgQueue() {} -bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId &id) +bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id) { Queue *q = Find(shm, id); if (q) { @@ -71,12 +77,12 @@ return Super::Remove(shm, MsgQIdToName(id)); } -ShmMsgQueue::Queue *ShmMsgQueue::Find(SharedMemory &shm, const MQId &remote_id) +ShmMsgQueue::Queue *ShmMsgQueue::Find(SharedMemory &shm, const MQId remote_id) { return Super::Find(shm, MsgQIdToName(remote_id)); } -bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, OnSend const &onsend) +bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend) { Queue *remote = Find(shm, remote_id); if (remote) { diff --git a/src/shm_queue.h b/src/shm_queue.h index 93d77df..70039b5 100644 --- a/src/shm_queue.h +++ b/src/shm_queue.h @@ -21,6 +21,7 @@ #include "msg.h" #include "shm.h" +#include <atomic> #include <boost/circular_buffer.hpp> #include <boost/date_time/posix_time/posix_time.hpp> @@ -29,8 +30,6 @@ template <class D> using Circular = boost::circular_buffer<D, Allocator<D>>; - -typedef boost::uuids::uuid MQId; template <class D> class SharedQueue : private Circular<D> @@ -137,32 +136,32 @@ using namespace bhome_msg; -class ShmMsgQueue : private ShmObject<SharedQueue<MsgI>> +class ShmMsgQueue : private ShmObject<SharedQueue<MsgI>>, public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue> { typedef ShmObject<SharedQueue<MsgI>> Super; typedef Super::Data Queue; typedef std::function<void()> OnSend; - MQId id_; -protected: - ShmMsgQueue(const std::string &raw_name, ShmType &segment, const int len); // internal use. public: - ShmMsgQueue(const MQId &id, ShmType &segment, const int len); + typedef uint64_t MQId; + + static MQId NewId(); + + ShmMsgQueue(const MQId id, ShmType &segment, const int len); ShmMsgQueue(ShmType &segment, const int len); ~ShmMsgQueue(); - static bool Remove(SharedMemory &shm, const MQId &id); - const MQId &Id() const { return id_; } + static bool Remove(SharedMemory &shm, const MQId id); + MQId Id() const { return id_; } using Super::shm; bool Recv(MsgI &msg, const int timeout_ms) { return data()->Read(msg, timeout_ms); } bool TryRecv(MsgI &msg) { return data()->TryRead(msg); } template <class OnData> int TryRecvAll(OnData const &onData) { return data()->TryReadAll(onData); } - static Queue *Find(SharedMemory &shm, const MQId &remote_id); - // static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend = OnSend()); - static bool TrySend(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, OnSend const &onsend = OnSend()); + static Queue *Find(SharedMemory &shm, const MQId remote_id); + static bool TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend = OnSend()); template <class Iter> - static int TrySendAll(SharedMemory &shm, const MQId &remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend()) + static int TrySendAll(SharedMemory &shm, const MQId remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend()) { Queue *remote = Find(shm, remote_id); if (remote) { @@ -177,14 +176,13 @@ } } - // template <class... Rest> - // bool Send(const MQId &remote_id, Rest const &...rest) { return Send(shm(), remote_id, rest...); } template <class... Rest> - bool TrySend(const MQId &remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); } + bool TrySend(const MQId remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); } template <class... Rest> - int TrySendAll(const MQId &remote_id, Rest const &...rest) { return TrySendAll(shm(), remote_id, rest...); } + int TrySendAll(const MQId remote_id, Rest const &...rest) { return TrySendAll(shm(), remote_id, rest...); } - size_t Pending() const { return data()->size(); } +private: + MQId id_; }; } // namespace bhome_shm diff --git a/src/socket.cpp b/src/socket.cpp index 313c212..e471633 100644 --- a/src/socket.cpp +++ b/src/socket.cpp @@ -24,7 +24,7 @@ using namespace bhome_msg; using namespace bhome_shm; -ShmSocket::ShmSocket(Shm &shm, const MQId &id, const int len) : +ShmSocket::ShmSocket(Shm &shm, const MQId id, const int len) : run_(false), mq_(id, shm, len) { Start(); diff --git a/src/socket.h b/src/socket.h index 1ba10cb..cd6bfee 100644 --- a/src/socket.h +++ b/src/socket.h @@ -33,44 +33,37 @@ #include <vector> using namespace bhome_msg; - class ShmSocket : private boost::noncopyable { - template <class... T> - bool SendImpl(const void *valid_remote, T &&...rest) - { - send_buffer_.Append(*static_cast<const MQId *>(valid_remote), std::forward<decltype(rest)>(rest)...); - return true; - } protected: typedef bhome_shm::ShmMsgQueue Queue; public: + typedef ShmMsgQueue::MQId MQId; typedef bhome_shm::SharedMemory Shm; typedef std::function<void(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> RecvCB; typedef std::function<bool(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> PartialRecvCB; typedef std::function<void(ShmSocket &sock)> IdleCB; - ShmSocket(Shm &shm, const MQId &id, const int len); + ShmSocket(Shm &shm, const MQId id, const int len); ShmSocket(Shm &shm, const int len = 12); ~ShmSocket(); - static bool Remove(SharedMemory &shm, const MQId &id) { return Queue::Remove(shm, id); } + static bool Remove(SharedMemory &shm, const MQId id) { return Queue::Remove(shm, id); } bool Remove() { return Remove(shm(), id()); } - const MQId &id() const { return mq().Id(); } + MQId id() const { return mq().Id(); } // start recv. bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB()); bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, onIdle); } bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); } bool Stop(); - size_t Pending() const { return mq().Pending(); } template <class Body> - bool Send(const void *valid_remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB()) + bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB()) { try { if (!cb) { - return SendImpl(valid_remote, MsgI::Serialize(head, body)); + return SendImpl(remote, MsgI::Serialize(head, body)); } else { std::string msg_id(head.msg_id()); per_msg_cbs_->Store(msg_id, std::move(cb)); @@ -78,7 +71,7 @@ RecvCB cb_no_use; per_msg_cbs_->Pick(msg_id, cb_no_use); }; - return SendImpl(valid_remote, MsgI::Serialize(head, body), onExpireRemoveCB); + return SendImpl(remote, MsgI::Serialize(head, body), onExpireRemoveCB); } } catch (...) { SetLastError(eError, "Send internal error."); @@ -86,15 +79,15 @@ } } - bool Send(const void *valid_remote, const MsgI &imsg) + bool Send(const MQId remote, const MsgI &imsg) { - return SendImpl(valid_remote, imsg); + return SendImpl(remote, imsg); } bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms); template <class Body> - bool SendAndRecv(const void *remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms) + bool SendAndRecv(const MQId remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms) { struct State { std::mutex mutex; @@ -144,6 +137,13 @@ bool StopNoLock(); bool RunningNoLock() { return !workers_.empty(); } + template <class... Rest> + bool SendImpl(const MQId remote, Rest &&...rest) + { + send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...); + return true; + } + std::vector<std::thread> workers_; std::mutex mutex_; std::atomic<bool> run_; diff --git a/src/topic_node.cpp b/src/topic_node.cpp index 00db773..9398318 100644 --- a/src/topic_node.cpp +++ b/src/topic_node.cpp @@ -25,7 +25,7 @@ namespace { -inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); } +inline void AddRoute(BHMsgHead &head, const MQId id) { head.add_route()->set_mq_id(id); } struct SrcInfo { std::vector<BHAddress> route; @@ -82,7 +82,7 @@ auto &sock = SockNode(); MsgRegister body; body.mutable_proc()->Swap(&proc); - auto AddId = [&](const MQId &id) { body.add_addrs()->set_mq_id(&id, sizeof(id)); }; + auto AddId = [&](const MQId id) { body.add_addrs()->set_mq_id(id); }; AddId(SockNode().id()); AddId(SockServer().id()); AddId(SockClient().id()); @@ -108,12 +108,12 @@ MsgCommonReply body; CheckResult(imsg, head, body); }; - return sock.Send(&BHTopicCenterAddress(), head, body, onResult); + return sock.Send(BHTopicCenterAddress(), head, body, onResult); } else { MsgI reply; DEFER1(reply.Release();); BHMsgHead reply_head; - bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); + bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); if (r) { CheckResult(reply, reply_head, reply_body); } @@ -144,12 +144,12 @@ MsgCommonReply body; CheckResult(imsg, head, body); }; - return sock.Send(&BHTopicCenterAddress(), head, body, onResult); + return sock.Send(BHTopicCenterAddress(), head, body, onResult); } else { MsgI reply; DEFER1(reply.Release();); BHMsgHead reply_head; - bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); + bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); return r && CheckResult(reply, reply_head, reply_body); } } @@ -169,12 +169,12 @@ AddRoute(head, sock.id()); if (timeout_ms == 0) { - return sock.Send(&BHTopicCenterAddress(), head, body); + return sock.Send(BHTopicCenterAddress(), head, body); } else { MsgI reply; DEFER1(reply.Release();); BHMsgHead reply_head; - bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); + bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); return (r && IsSuccess(reply_body.errmsg().errcode())); } @@ -201,7 +201,7 @@ MsgI reply; DEFER1(reply.Release()); BHMsgHead reply_head; - return (sock.SendAndRecv(&BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) && + return (sock.SendAndRecv(BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) && reply_head.type() == kMsgTypeQueryTopicReply && reply.ParseBody(reply_body)); } @@ -221,12 +221,12 @@ AddRoute(head, sock.id()); if (timeout_ms == 0) { - return sock.Send(&BHTopicCenterAddress(), head, body); + return sock.Send(BHTopicCenterAddress(), head, body); } else { MsgI reply; DEFER1(reply.Release();); BHMsgHead reply_head; - bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); + bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); r = r && reply_head.type() == kMsgTypeCommonReply; r = r && reply.ParseBody(reply_body); return r; @@ -247,8 +247,8 @@ for (int i = 0; i < head.route_size() - 1; ++i) { reply_head.add_route()->Swap(head.mutable_route(i)); } - auto &remote = head.route().rbegin()->mq_id(); - sock.Send(remote.data(), reply_head, reply_body); + auto remote = head.route().rbegin()->mq_id(); + sock.Send(remote, reply_head, reply_body); } }; @@ -315,7 +315,7 @@ for (unsigned i = 0; i < p->route.size() - 1; ++i) { head.add_route()->Swap(&p->route[i]); } - return sock.Send(p->route.back().mq_id().data(), head, body); + return sock.Send(p->route.back().mq_id(), head, body); } bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker) @@ -361,9 +361,9 @@ } } }; - return sock.Send(addr.mq_id().data(), head, req, onRecv); + return sock.Send(addr.mq_id(), head, req, onRecv); } else { - return sock.Send(addr.mq_id().data(), head, req); + return sock.Send(addr.mq_id(), head, req); } }; @@ -396,7 +396,7 @@ DEFER1(reply_msg.Release();); BHMsgHead reply_head; - if (sock.SendAndRecv(addr.mq_id().data(), head, request, reply_msg, reply_head, timeout_ms) && + if (sock.SendAndRecv(addr.mq_id(), head, request, reply_msg, reply_head, timeout_ms) && reply_head.type() == kMsgTypeRequestTopicReply && reply_msg.ParseBody(out_reply)) { reply_head.mutable_proc_id()->swap(out_proc_id); @@ -441,7 +441,7 @@ std::vector<NodeAddress> lst; if (QueryRPCTopics(topic, lst, timeout_ms)) { addr = lst.front().addr(); - if (!addr.mq_id().empty()) { + if (addr.mq_id() != 0) { topic_query_cache_.Store(topic, addr); return true; } @@ -464,13 +464,13 @@ AddRoute(head, sock.id()); if (timeout_ms == 0) { - return sock.Send(&BHTopicBusAddress(), head, pub); + return sock.Send(BHTopicBusAddress(), head, pub); } else { MsgI reply; DEFER1(reply.Release();); BHMsgHead reply_head; MsgCommonReply reply_body; - return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) && + return sock.SendAndRecv(BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body) && IsSuccess(reply_body.errmsg().errcode()); @@ -497,12 +497,12 @@ BHMsgHead head(InitMsgHead(GetType(sub), proc_id())); AddRoute(head, sock.id()); if (timeout_ms == 0) { - return sock.Send(&BHTopicBusAddress(), head, sub); + return sock.Send(BHTopicBusAddress(), head, sub); } else { MsgI reply; DEFER1(reply.Release();); BHMsgHead reply_head; - return sock.SendAndRecv(&BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) && + return sock.SendAndRecv(BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body) && IsSuccess(reply_body.errmsg().errcode()); diff --git a/utest/api_test.cpp b/utest/api_test.cpp index 5d65bd5..dd59b09 100644 --- a/utest/api_test.cpp +++ b/utest/api_test.cpp @@ -198,8 +198,8 @@ const std::string mtx_name("test_mutex"); const std::string int_name("test_int"); - auto mtx = shm.find_or_construct<Mutex>(mtx_name.c_str())(); - auto pi = shm.find_or_construct<int>(int_name.c_str())(100); + auto mtx = shm.FindOrCreate<Mutex>(mtx_name); + auto pi = shm.FindOrCreate<int>(int_name, 100); printf("mutetx "); PrintPtr(mtx); diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp index 4615c53..d145ab4 100644 --- a/utest/speed_test.cpp +++ b/utest/speed_test.cpp @@ -26,7 +26,7 @@ SharedMemory &shm = TestShm(); MsgI::BindShm(shm); - MQId id = boost::uuids::random_generator()(); + MQId id = ShmMsgQueue::NewId(); const int timeout = 1000; const uint32_t data_size = 4000; const std::string proc_id = "demo_proc"; @@ -157,8 +157,8 @@ req_body.set_topic("topic"); req_body.set_data(msg_content); auto req_head(InitMsgHead(GetType(req_body), client_proc_id)); - req_head.add_route()->set_mq_id(&cli.id(), cli.id().size()); - return cli.Send(&srv.id(), req_head, req_body); + req_head.add_route()->set_mq_id(cli.id()); + return cli.Send(srv.id(), req_head, req_body); }; Req(); @@ -175,15 +175,13 @@ DEFER1(req.Release()); if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) { - auto &mqid = req_head.route()[0].mq_id(); - MQId src_id; - memcpy(&src_id, mqid.data(), sizeof(src_id)); + auto src_id = req_head.route()[0].mq_id(); auto Reply = [&]() { MsgRequestTopic reply_body; reply_body.set_topic("topic"); reply_body.set_data(msg_content); auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, req_head.msg_id())); - return srv.Send(&src_id, reply_head, reply_body); + return srv.Send(src_id, reply_head, reply_body); }; Reply(); } diff --git a/utest/utest.cpp b/utest/utest.cpp index ff5d2ed..d058471 100644 --- a/utest/utest.cpp +++ b/utest/utest.cpp @@ -2,8 +2,6 @@ #include "defs.h" #include "util.h" #include <atomic> -#include <boost/uuid/uuid_generators.hpp> -#include <boost/uuid/uuid_io.hpp> #include <condition_variable> #include <stdio.h> #include <string> @@ -96,7 +94,7 @@ auto Avail = [&]() { return shm.get_free_memory(); }; auto init_avail = Avail(); - int *flag = shm.find_or_construct<int>("flag")(123); + int *flag = shm.FindOrCreate<int>("flag", 123); printf("flag = %d\n", *flag); ++*flag; const std::string sub_proc_id = "subscriber"; @@ -207,7 +205,7 @@ auto Avail = [&]() { return shm.get_free_memory(); }; auto init_avail = Avail(); - int *flag = shm.find_or_construct<int>("flag")(123); + int *flag = shm.FindOrCreate<int>("flag", 123); printf("flag = %d\n", *flag); ++*flag; -- Gitblit v1.8.0