From 1fbfef2a51db4a3bac9d8a5b87af94a40a913b7a Mon Sep 17 00:00:00 2001
From: lichao <lichao@aiotlink.com>
Date: 星期日, 25 四月 2021 15:33:40 +0800
Subject: [PATCH] change mqid from uuid to uint64.
---
src/shm.h | 16 ++
src/socket.h | 34 ++--
box/center.cpp | 32 ++---
src/msg.h | 24 ---
src/socket.cpp | 2
src/defs.h | 14 +-
utest/utest.cpp | 6
box/center.h | 5
src/sendq.cpp | 8
src/shm_queue.cpp | 26 ++-
src/topic_node.cpp | 44 +++---
src/defs.cpp | 19 +-
box/center_main.cc | 6
utest/speed_test.cpp | 12 -
proto/source/bhome_msg_api.proto | 4
utest/api_test.cpp | 4
box/status_main.cc | 2
src/bh_util.h | 25 ++++
src/shm_queue.h | 34 ++---
src/sendq.h | 22 +-
src/bh_api.cpp | 2
21 files changed, 177 insertions(+), 164 deletions(-)
diff --git a/box/center.cpp b/box/center.cpp
index badfbfe..d920ff7 100644
--- a/box/center.cpp
+++ b/box/center.cpp
@@ -37,9 +37,9 @@
{
public:
typedef std::string ProcId;
- typedef std::string Address;
+ typedef MQId Address;
typedef bhome_msg::ProcInfo ProcInfo;
- typedef std::function<void(Address const &)> Cleaner;
+ typedef std::function<void(Address const)> Cleaner;
private:
enum {
@@ -84,7 +84,7 @@
WeakNode weak_node_;
bool operator<(const TopicDest &a) const { return mq_ < a.mq_; }
};
- inline const std::string &SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); }
+ inline MQId SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); }
inline bool MatchAddr(std::set<Address> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); }
NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time, const int64_t kill_time) :
@@ -182,7 +182,7 @@
{
return HandleMsg(
head, [&](Node node) -> MsgCommonReply {
- auto &src = SrcAddr(head);
+ auto src = SrcAddr(head);
auto &topics = msg.topics().topic_list();
node->services_[src].insert(topics.begin(), topics.end());
TopicDest dest = {src, node};
@@ -240,7 +240,7 @@
MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg)
{
return HandleMsg(head, [&](Node node) {
- auto &src = SrcAddr(head);
+ auto src = SrcAddr(head);
auto &topics = msg.topics().topic_list();
node->subscriptions_[src].insert(topics.begin(), topics.end());
TopicDest dest = {src, node};
@@ -253,7 +253,7 @@
MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg)
{
return HandleMsg(head, [&](Node node) {
- auto &src = SrcAddr(head);
+ auto src = SrcAddr(head);
auto pos = node->subscriptions_.find(src);
auto RemoveSubTopicDestRecord = [this](const Topic &topic, const TopicDest &dest) {
@@ -426,8 +426,8 @@
auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) {
return [&](auto &&rep_body) {
auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id()));
- auto &remote = head.route(0).mq_id();
- socket.Send(remote.data(), reply_head, rep_body);
+ auto remote = head.route(0).mq_id();
+ socket.Send(remote, reply_head, rep_body);
};
};
@@ -473,7 +473,7 @@
if (node) {
// should also make sure that mq is not killed before msg expires.
// it would be ok if (kill_time - offline_time) is longer than expire time.
- socket.Send(cli.mq_.data(), msg);
+ socket.Send(cli.mq_, msg);
++it;
} else {
it = clients.erase(it);
@@ -505,28 +505,24 @@
return rec;
}
-bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const std::string &mqid, const int mq_len)
+bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len)
{
Centers()[name] = CenterInfo{name, handler, idle, mqid, mq_len};
return true;
}
-bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId &mqid, const int mq_len)
-{
- return Install(name, handler, idle, std::string((const char *) &mqid, sizeof(mqid)), mq_len);
-}
BHCenter::BHCenter(Socket::Shm &shm)
{
- auto gc = [&](const std::string &id) {
- auto r = ShmSocket::Remove(shm, *(MQId *) id.data());
- printf("remove mq : %s\n", r ? "ok" : "failed");
+ auto gc = [&](const MQId id) {
+ auto r = ShmSocket::Remove(shm, id);
+ printf("remove mq %ld : %s\n", id, (r ? "ok" : "failed"));
};
AddCenter("#bhome_center", gc);
for (auto &kv : Centers()) {
auto &info = kv.second;
- sockets_[info.name_] = std::make_shared<ShmSocket>(shm, *(MQId *) info.mqid_.data(), info.mq_len_);
+ sockets_[info.name_] = std::make_shared<ShmSocket>(shm, info.mqid_, info.mq_len_);
}
}
diff --git a/box/center.h b/box/center.h
index 60639d5..ab8b15f 100644
--- a/box/center.h
+++ b/box/center.h
@@ -30,8 +30,7 @@
public:
typedef Socket::PartialRecvCB MsgHandler;
typedef Socket::IdleCB IdleHandler;
- static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const std::string &mqid, const int mq_len);
- static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId &mqid, const int mq_len);
+ static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len);
BHCenter(Socket::Shm &shm);
~BHCenter() { Stop(); }
@@ -43,7 +42,7 @@
std::string name_;
MsgHandler handler_;
IdleHandler idle_;
- std::string mqid_;
+ MQId mqid_;
int mq_len_ = 0;
};
typedef std::map<std::string, CenterInfo> CenterRecords;
diff --git a/box/center_main.cc b/box/center_main.cc
index 7f4b26b..fdda2cd 100644
--- a/box/center_main.cc
+++ b/box/center_main.cc
@@ -44,8 +44,8 @@
return true;
}
- auto mtx(shm_.find_or_construct<Mutex>((name_ + "_mutex_0").c_str())());
- auto time_stamp(shm_.find_or_construct<int64_t>((name_ + "_timestamp_0").c_str())(0));
+ auto mtx(shm_.FindOrCreate<Mutex>(name_ + "_mutex_0"));
+ auto time_stamp(shm_.FindOrCreate<int64_t>(name_ + "_timestamp_0", 0));
if (mtx && time_stamp) {
Guard lock(*mtx);
@@ -86,7 +86,7 @@
int center_main(int argc, const char *argv[])
{
auto &shm = BHomeShm();
- MsgI::BindShm(shm);
+ GlobalInit(shm);
AppArg args(argc, argv);
if (args.Has("remove")) {
diff --git a/box/status_main.cc b/box/status_main.cc
index a435c2f..e0fb932 100644
--- a/box/status_main.cc
+++ b/box/status_main.cc
@@ -44,7 +44,7 @@
return shm_name;
}
};
- printf("monitoring shm : %s, size : %dM\n", DisplayName().c_str(), shm_size);
+ printf("monitoring shm : %s, size : %ldM\n", DisplayName().c_str(), shm_size);
SharedMemory shm(shm_name, 1024 * 1024 * shm_size);
std::atomic<bool> run(true);
diff --git a/proto/source/bhome_msg_api.proto b/proto/source/bhome_msg_api.proto
index 838c228..94bc82e 100644
--- a/proto/source/bhome_msg_api.proto
+++ b/proto/source/bhome_msg_api.proto
@@ -8,8 +8,8 @@
package bhome_msg;
message BHAddress {
- bytes mq_id = 1; // mqid, uuid
- bytes ip = 2; //
+ uint64 mq_id = 1;
+ bytes ip = 2;
int32 port = 3;
}
diff --git a/src/bh_api.cpp b/src/bh_api.cpp
index c4ac9c9..7e7b2e9 100644
--- a/src/bh_api.cpp
+++ b/src/bh_api.cpp
@@ -10,7 +10,7 @@
{
TopicNode &ProcNode()
{
- static bool init_bind_msg_shm = MsgI::BindShm(BHomeShm());
+ static bool init = GlobalInit(BHomeShm());
static TopicNode node(BHomeShm());
return node;
}
diff --git a/src/bh_util.h b/src/bh_util.h
index e3ab70b..c419a59 100644
--- a/src/bh_util.h
+++ b/src/bh_util.h
@@ -143,6 +143,31 @@
}
};
+template <class T, class Tag>
+class StaticDataRef
+{
+ typedef T *Ptr;
+ static inline Ptr &ptr()
+ {
+ static Ptr sp(nullptr);
+ return sp;
+ }
+
+protected:
+ static inline T &GetData()
+ {
+ if (!ptr()) { throw std::string("Must set ShmMsg shm before use!"); }
+ return *ptr();
+ }
+
+public:
+ static bool SetData(T &t)
+ {
+ auto Bind = [&]() { ptr() = &t; return true; };
+ return ptr() ? false : Bind();
+ }
+};
+
// macro helper
#define JOIN_IMPL(a, b) a##b
#define JOIN(a, b) JOIN_IMPL(a, b)
diff --git a/src/defs.cpp b/src/defs.cpp
index 0ff671b..0ca82bf 100644
--- a/src/defs.cpp
+++ b/src/defs.cpp
@@ -16,14 +16,11 @@
* =====================================================================================
*/
#include "defs.h"
-#include "shm.h"
+#include "msg.h"
+#include "shm_queue.h"
namespace
{
-
-const MQId kBHTopicBus = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff");
-const MQId kBHTopicCenter = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff");
-const MQId kBHUniCenter = boost::uuids::string_generator()("87654321-89ab-cdef-8349-1234567890ff");
struct LastError {
int ec_ = 0;
@@ -38,16 +35,20 @@
} // namespace
-const MQId &BHTopicBusAddress() { return kBHTopicBus; }
-const MQId &BHTopicCenterAddress() { return kBHTopicCenter; }
-const MQId &BHUniCenterAddress() { return kBHUniCenter; }
-
bhome_shm::SharedMemory &BHomeShm()
{
static bhome_shm::SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 512);
return shm;
}
+bool GlobalInit(bhome_shm::SharedMemory &shm)
+{
+ MsgI::BindShm(shm);
+ typedef std::atomic<MQId> IdSrc;
+ IdSrc *psrc = shm.FindOrCreate<IdSrc>("shmqIdSrc0", 100000);
+ return ShmMsgQueue::SetData(*psrc);
+}
+
void SetLastError(const int ec, const std::string &msg)
{
LastErrorStore().ec_ = ec;
diff --git a/src/defs.h b/src/defs.h
index 08181d8..1c9e663 100644
--- a/src/defs.h
+++ b/src/defs.h
@@ -19,15 +19,16 @@
#ifndef DEFS_KP8LKGD0
#define DEFS_KP8LKGD0
-#include <boost/uuid/uuid.hpp>
-#include <boost/uuid/uuid_generators.hpp>
#include <string>
-typedef boost::uuids::uuid MQId;
+typedef uint64_t MQId;
-const MQId &BHTopicBusAddress();
-const MQId &BHTopicCenterAddress();
-const MQId &BHUniCenterAddress();
+const MQId kBHTopicCenter = 100;
+const MQId kBHTopicBus = 101;
+const MQId kBHUniCenter = 102;
+inline const MQId BHTopicCenterAddress() { return kBHTopicCenter; }
+inline const MQId BHTopicBusAddress() { return kBHTopicBus; }
+inline const MQId BHUniCenterAddress() { return kBHUniCenter; }
const int kBHCenterPort = 24287;
const char kTopicSep = '.';
@@ -37,6 +38,7 @@
} // namespace bhome_shm
bhome_shm::SharedMemory &BHomeShm();
+bool GlobalInit(bhome_shm::SharedMemory &shm);
typedef std::string Topic;
void SetLastError(const int ec, const std::string &msg);
void GetLastError(int &ec, std::string &msg);
diff --git a/src/msg.h b/src/msg.h
index 6ce4902..e332a5d 100644
--- a/src/msg.h
+++ b/src/msg.h
@@ -23,7 +23,6 @@
#include "shm.h"
#include <atomic>
#include <boost/interprocess/offset_ptr.hpp>
-#include <boost/uuid/uuid_generators.hpp>
#include <functional>
#include <stdint.h>
@@ -34,11 +33,10 @@
// ShmMsg is safe to be stored in shared memory, so POD data or offset_ptr is required.
// message content layout: (meta) / header_size + header + data_size + data
-typedef boost::uuids::uuid MQId;
-
-class ShmMsg
+class ShmMsg : private StaticDataRef<SharedMemory, ShmMsg>
{
private:
+ static inline SharedMemory &shm() { return GetData(); }
// store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object.
class RefCount : private boost::noncopyable
{
@@ -58,16 +56,6 @@
{
static const Offset base = Addr(shm().get_address()); // cache value.
return base;
- }
- static inline SharedMemory &shm()
- {
- if (!pshm()) { throw std::string("Must set ShmMsg shm before use!"); }
- return *pshm();
- }
- static inline SharedMemory *&pshm()
- {
- static SharedMemory *pshm = 0;
- return pshm;
}
static const uint32_t kMsgTag = 0xf1e2d3c4;
@@ -145,13 +133,7 @@
T *get() const { return static_cast<T *>(Ptr(offset_ + BaseAddr())); }
public:
- static bool BindShm(SharedMemory &shm)
- {
- assert(!pshm());
- pshm() = &shm;
- return true;
- }
-
+ static bool BindShm(SharedMemory &shm) { return SetData(shm); }
ShmMsg() :
ShmMsg(nullptr) {}
explicit ShmMsg(const size_t size) :
diff --git a/src/sendq.cpp b/src/sendq.cpp
index 54de419..5b57d72 100644
--- a/src/sendq.cpp
+++ b/src/sendq.cpp
@@ -19,7 +19,7 @@
#include "shm_queue.h"
#include <chrono>
-int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr)
+int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, Array &arr)
{
auto FirstNotExpired = [](Array &l) {
auto Less = [](const TimedMsg &msg, const TimePoint &tp) { return msg.expire() < tp; };
@@ -41,7 +41,7 @@
bool r = false;
if (d.index() == 0) {
auto &msg = boost::variant2::get<0>(pos->data().data_);
- r = mq.TrySend(*(MQId *) remote.data(), msg);
+ r = mq.TrySend(remote, msg);
if (r) {
msg.Release();
}
@@ -50,7 +50,7 @@
MsgI msg;
if (msg.Make(content)) {
DEFER1(msg.Release(););
- r = mq.TrySend(*(MQId *) remote.data(), msg);
+ r = mq.TrySend(remote, msg);
}
}
return r;
@@ -65,7 +65,7 @@
return nprocessed;
}
-int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, ArrayList &al)
+int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, ArrayList &al)
{
int nsend = 0;
auto AllSent = [&](Array &arr) {
diff --git a/src/sendq.h b/src/sendq.h
index bba44af..0699df7 100644
--- a/src/sendq.h
+++ b/src/sendq.h
@@ -37,7 +37,7 @@
class SendQ
{
public:
- typedef std::string Remote;
+ typedef MQId Remote;
typedef bhome_msg::MsgI MsgI;
typedef std::string Content;
typedef boost::variant2::variant<MsgI, Content> Data;
@@ -50,18 +50,18 @@
typedef TimedMsg::TimePoint TimePoint;
typedef TimedMsg::Duration Duration;
- template <class... Rest>
- void Append(const MQId &id, Rest &&...rest)
- {
- Append(std::string((const char *) &id, sizeof(id)), std::forward<decltype(rest)>(rest)...);
- }
+ // template <class... Rest>
+ // void Append(const MQId &id, Rest &&...rest)
+ // {
+ // Append(std::string((const char *) &id, sizeof(id)), std::forward<decltype(rest)>(rest)...);
+ // }
- void Append(const Remote &addr, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent())
+ void Append(const Remote addr, const MsgI msg, OnMsgEvent onExpire = OnMsgEvent())
{
msg.AddRef();
AppendData(addr, Data(msg), DefaultExpire(), onExpire);
}
- void Append(const Remote &addr, Content &&content, OnMsgEvent onExpire = OnMsgEvent())
+ void Append(const Remote addr, Content &&content, OnMsgEvent onExpire = OnMsgEvent())
{
AppendData(addr, Data(std::move(content)), DefaultExpire(), onExpire);
}
@@ -71,7 +71,7 @@
private:
static TimePoint Now() { return TimedMsg::Clock::now(); }
static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(60); }
- void AppendData(const Remote &addr, Data &&data, const TimePoint &expire, OnMsgEvent onExpire)
+ void AppendData(const Remote addr, Data &&data, const TimePoint &expire, OnMsgEvent onExpire)
{
//TODO simple queue, organize later ?
@@ -88,8 +88,8 @@
typedef std::list<Array> ArrayList;
typedef std::unordered_map<Remote, ArrayList> Store;
- int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr);
- int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, ArrayList &arr);
+ int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, Array &arr);
+ int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, ArrayList &arr);
std::mutex mutex_in_;
std::mutex mutex_out_;
diff --git a/src/shm.h b/src/shm.h
index 0e834c3..a70afcb 100644
--- a/src/shm.h
+++ b/src/shm.h
@@ -25,7 +25,6 @@
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/noncopyable.hpp>
-#include <boost/uuid/uuid.hpp>
#include <chrono>
#include <thread>
@@ -103,7 +102,16 @@
~SharedMemory();
std::string name() const { return name_; }
bool Remove() { return Remove(name()); }
-
+ template <class T, class... Params>
+ T *FindOrCreate(const std::string &name, Params &&...params)
+ {
+ return find_or_construct<T>(name.c_str(), std::nothrow)(std::forward<decltype(params)>(params)...);
+ }
+ template <class T, class... Params>
+ T *Create(const std::string &name, Params &&...params)
+ {
+ return construct<T>(name.c_str(), std::nothrow)(std::forward<decltype(params)>(params)...);
+ }
void *Alloc(const size_t size) { return allocate(size, std::nothrow); }
void Dealloc(void *p)
{
@@ -113,7 +121,7 @@
void Dealloc(offset_ptr<T> ptr) { return Dealloc(ptr.get()); }
template <class T, class... Params>
- T *New(Params const &...params) { return construct<T>(anonymous_instance, std::nothrow)(params...); }
+ T *New(Params &&...params) { return construct<T>(anonymous_instance, std::nothrow)(std::forward<decltype(params)>(params)...); }
template <class T>
void Delete(T *p)
{
@@ -157,7 +165,7 @@
ShmObject(ShmType &segment, const std::string &name, Params &&...t) :
shm_(segment), name_(name)
{
- pdata_ = shm_.find_or_construct<Data>(ObjName(name_).c_str(), std::nothrow)(t...);
+ pdata_ = shm_.Create<Data>(ObjName(name_), std::forward<decltype(t)>(t)...);
if (!IsOk()) {
throw("Error: Not enough memory, can not allocate \"" + name_ + "\"");
}
diff --git a/src/shm_queue.cpp b/src/shm_queue.cpp
index 215a8ac..1be8021 100644
--- a/src/shm_queue.cpp
+++ b/src/shm_queue.cpp
@@ -18,20 +18,21 @@
#include "shm_queue.h"
#include "bh_util.h"
-#include <boost/uuid/uuid_generators.hpp>
-#include <boost/uuid/uuid_io.hpp>
namespace bhome_shm
{
using namespace bhome_msg;
using namespace boost::interprocess;
-using namespace boost::uuids;
namespace
{
-std::string MsgQIdToName(const MQId &id) { return "shmq" + to_string(id); }
-// MQId EmptyId() { return nil_uuid(); }
-MQId NewId() { return random_generator()(); }
+std::string MsgQIdToName(const ShmMsgQueue::MQId id)
+{
+ char buf[40] = "mqOx";
+ int n = sprintf(buf + 4, "%lx", id);
+ return std::string(buf, n + 4);
+}
+
const int AdjustMQLength(const int len)
{
const int kMaxLength = 10000;
@@ -47,8 +48,13 @@
} // namespace
+ShmMsgQueue::MQId ShmMsgQueue::NewId()
+{
+ static auto &id = GetData();
+ return ++id;
+}
// ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2
-ShmMsgQueue::ShmMsgQueue(const MQId &id, ShmType &segment, const int len) :
+ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) :
Super(segment, MsgQIdToName(id), AdjustMQLength(len), segment.get_segment_manager()),
id_(id)
{
@@ -59,7 +65,7 @@
ShmMsgQueue::~ShmMsgQueue() {}
-bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId &id)
+bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id)
{
Queue *q = Find(shm, id);
if (q) {
@@ -71,12 +77,12 @@
return Super::Remove(shm, MsgQIdToName(id));
}
-ShmMsgQueue::Queue *ShmMsgQueue::Find(SharedMemory &shm, const MQId &remote_id)
+ShmMsgQueue::Queue *ShmMsgQueue::Find(SharedMemory &shm, const MQId remote_id)
{
return Super::Find(shm, MsgQIdToName(remote_id));
}
-bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, OnSend const &onsend)
+bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend)
{
Queue *remote = Find(shm, remote_id);
if (remote) {
diff --git a/src/shm_queue.h b/src/shm_queue.h
index 93d77df..70039b5 100644
--- a/src/shm_queue.h
+++ b/src/shm_queue.h
@@ -21,6 +21,7 @@
#include "msg.h"
#include "shm.h"
+#include <atomic>
#include <boost/circular_buffer.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
@@ -29,8 +30,6 @@
template <class D>
using Circular = boost::circular_buffer<D, Allocator<D>>;
-
-typedef boost::uuids::uuid MQId;
template <class D>
class SharedQueue : private Circular<D>
@@ -137,32 +136,32 @@
using namespace bhome_msg;
-class ShmMsgQueue : private ShmObject<SharedQueue<MsgI>>
+class ShmMsgQueue : private ShmObject<SharedQueue<MsgI>>, public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue>
{
typedef ShmObject<SharedQueue<MsgI>> Super;
typedef Super::Data Queue;
typedef std::function<void()> OnSend;
- MQId id_;
-protected:
- ShmMsgQueue(const std::string &raw_name, ShmType &segment, const int len); // internal use.
public:
- ShmMsgQueue(const MQId &id, ShmType &segment, const int len);
+ typedef uint64_t MQId;
+
+ static MQId NewId();
+
+ ShmMsgQueue(const MQId id, ShmType &segment, const int len);
ShmMsgQueue(ShmType &segment, const int len);
~ShmMsgQueue();
- static bool Remove(SharedMemory &shm, const MQId &id);
- const MQId &Id() const { return id_; }
+ static bool Remove(SharedMemory &shm, const MQId id);
+ MQId Id() const { return id_; }
using Super::shm;
bool Recv(MsgI &msg, const int timeout_ms) { return data()->Read(msg, timeout_ms); }
bool TryRecv(MsgI &msg) { return data()->TryRead(msg); }
template <class OnData>
int TryRecvAll(OnData const &onData) { return data()->TryReadAll(onData); }
- static Queue *Find(SharedMemory &shm, const MQId &remote_id);
- // static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend = OnSend());
- static bool TrySend(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, OnSend const &onsend = OnSend());
+ static Queue *Find(SharedMemory &shm, const MQId remote_id);
+ static bool TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend = OnSend());
template <class Iter>
- static int TrySendAll(SharedMemory &shm, const MQId &remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend())
+ static int TrySendAll(SharedMemory &shm, const MQId remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend())
{
Queue *remote = Find(shm, remote_id);
if (remote) {
@@ -177,14 +176,13 @@
}
}
- // template <class... Rest>
- // bool Send(const MQId &remote_id, Rest const &...rest) { return Send(shm(), remote_id, rest...); }
template <class... Rest>
- bool TrySend(const MQId &remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); }
+ bool TrySend(const MQId remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); }
template <class... Rest>
- int TrySendAll(const MQId &remote_id, Rest const &...rest) { return TrySendAll(shm(), remote_id, rest...); }
+ int TrySendAll(const MQId remote_id, Rest const &...rest) { return TrySendAll(shm(), remote_id, rest...); }
- size_t Pending() const { return data()->size(); }
+private:
+ MQId id_;
};
} // namespace bhome_shm
diff --git a/src/socket.cpp b/src/socket.cpp
index 313c212..e471633 100644
--- a/src/socket.cpp
+++ b/src/socket.cpp
@@ -24,7 +24,7 @@
using namespace bhome_msg;
using namespace bhome_shm;
-ShmSocket::ShmSocket(Shm &shm, const MQId &id, const int len) :
+ShmSocket::ShmSocket(Shm &shm, const MQId id, const int len) :
run_(false), mq_(id, shm, len)
{
Start();
diff --git a/src/socket.h b/src/socket.h
index 1ba10cb..cd6bfee 100644
--- a/src/socket.h
+++ b/src/socket.h
@@ -33,44 +33,37 @@
#include <vector>
using namespace bhome_msg;
-
class ShmSocket : private boost::noncopyable
{
- template <class... T>
- bool SendImpl(const void *valid_remote, T &&...rest)
- {
- send_buffer_.Append(*static_cast<const MQId *>(valid_remote), std::forward<decltype(rest)>(rest)...);
- return true;
- }
protected:
typedef bhome_shm::ShmMsgQueue Queue;
public:
+ typedef ShmMsgQueue::MQId MQId;
typedef bhome_shm::SharedMemory Shm;
typedef std::function<void(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> RecvCB;
typedef std::function<bool(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> PartialRecvCB;
typedef std::function<void(ShmSocket &sock)> IdleCB;
- ShmSocket(Shm &shm, const MQId &id, const int len);
+ ShmSocket(Shm &shm, const MQId id, const int len);
ShmSocket(Shm &shm, const int len = 12);
~ShmSocket();
- static bool Remove(SharedMemory &shm, const MQId &id) { return Queue::Remove(shm, id); }
+ static bool Remove(SharedMemory &shm, const MQId id) { return Queue::Remove(shm, id); }
bool Remove() { return Remove(shm(), id()); }
- const MQId &id() const { return mq().Id(); }
+ MQId id() const { return mq().Id(); }
// start recv.
bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB());
bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, onIdle); }
bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); }
bool Stop();
- size_t Pending() const { return mq().Pending(); }
template <class Body>
- bool Send(const void *valid_remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
+ bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB())
{
try {
if (!cb) {
- return SendImpl(valid_remote, MsgI::Serialize(head, body));
+ return SendImpl(remote, MsgI::Serialize(head, body));
} else {
std::string msg_id(head.msg_id());
per_msg_cbs_->Store(msg_id, std::move(cb));
@@ -78,7 +71,7 @@
RecvCB cb_no_use;
per_msg_cbs_->Pick(msg_id, cb_no_use);
};
- return SendImpl(valid_remote, MsgI::Serialize(head, body), onExpireRemoveCB);
+ return SendImpl(remote, MsgI::Serialize(head, body), onExpireRemoveCB);
}
} catch (...) {
SetLastError(eError, "Send internal error.");
@@ -86,15 +79,15 @@
}
}
- bool Send(const void *valid_remote, const MsgI &imsg)
+ bool Send(const MQId remote, const MsgI &imsg)
{
- return SendImpl(valid_remote, imsg);
+ return SendImpl(remote, imsg);
}
bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms);
template <class Body>
- bool SendAndRecv(const void *remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
+ bool SendAndRecv(const MQId remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms)
{
struct State {
std::mutex mutex;
@@ -144,6 +137,13 @@
bool StopNoLock();
bool RunningNoLock() { return !workers_.empty(); }
+ template <class... Rest>
+ bool SendImpl(const MQId remote, Rest &&...rest)
+ {
+ send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...);
+ return true;
+ }
+
std::vector<std::thread> workers_;
std::mutex mutex_;
std::atomic<bool> run_;
diff --git a/src/topic_node.cpp b/src/topic_node.cpp
index 00db773..9398318 100644
--- a/src/topic_node.cpp
+++ b/src/topic_node.cpp
@@ -25,7 +25,7 @@
namespace
{
-inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); }
+inline void AddRoute(BHMsgHead &head, const MQId id) { head.add_route()->set_mq_id(id); }
struct SrcInfo {
std::vector<BHAddress> route;
@@ -82,7 +82,7 @@
auto &sock = SockNode();
MsgRegister body;
body.mutable_proc()->Swap(&proc);
- auto AddId = [&](const MQId &id) { body.add_addrs()->set_mq_id(&id, sizeof(id)); };
+ auto AddId = [&](const MQId id) { body.add_addrs()->set_mq_id(id); };
AddId(SockNode().id());
AddId(SockServer().id());
AddId(SockClient().id());
@@ -108,12 +108,12 @@
MsgCommonReply body;
CheckResult(imsg, head, body);
};
- return sock.Send(&BHTopicCenterAddress(), head, body, onResult);
+ return sock.Send(BHTopicCenterAddress(), head, body, onResult);
} else {
MsgI reply;
DEFER1(reply.Release(););
BHMsgHead reply_head;
- bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
+ bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
if (r) {
CheckResult(reply, reply_head, reply_body);
}
@@ -144,12 +144,12 @@
MsgCommonReply body;
CheckResult(imsg, head, body);
};
- return sock.Send(&BHTopicCenterAddress(), head, body, onResult);
+ return sock.Send(BHTopicCenterAddress(), head, body, onResult);
} else {
MsgI reply;
DEFER1(reply.Release(););
BHMsgHead reply_head;
- bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
+ bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
return r && CheckResult(reply, reply_head, reply_body);
}
}
@@ -169,12 +169,12 @@
AddRoute(head, sock.id());
if (timeout_ms == 0) {
- return sock.Send(&BHTopicCenterAddress(), head, body);
+ return sock.Send(BHTopicCenterAddress(), head, body);
} else {
MsgI reply;
DEFER1(reply.Release(););
BHMsgHead reply_head;
- bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
+ bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body);
return (r && IsSuccess(reply_body.errmsg().errcode()));
}
@@ -201,7 +201,7 @@
MsgI reply;
DEFER1(reply.Release());
BHMsgHead reply_head;
- return (sock.SendAndRecv(&BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) &&
+ return (sock.SendAndRecv(BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) &&
reply_head.type() == kMsgTypeQueryTopicReply &&
reply.ParseBody(reply_body));
}
@@ -221,12 +221,12 @@
AddRoute(head, sock.id());
if (timeout_ms == 0) {
- return sock.Send(&BHTopicCenterAddress(), head, body);
+ return sock.Send(BHTopicCenterAddress(), head, body);
} else {
MsgI reply;
DEFER1(reply.Release(););
BHMsgHead reply_head;
- bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
+ bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms);
r = r && reply_head.type() == kMsgTypeCommonReply;
r = r && reply.ParseBody(reply_body);
return r;
@@ -247,8 +247,8 @@
for (int i = 0; i < head.route_size() - 1; ++i) {
reply_head.add_route()->Swap(head.mutable_route(i));
}
- auto &remote = head.route().rbegin()->mq_id();
- sock.Send(remote.data(), reply_head, reply_body);
+ auto remote = head.route().rbegin()->mq_id();
+ sock.Send(remote, reply_head, reply_body);
}
};
@@ -315,7 +315,7 @@
for (unsigned i = 0; i < p->route.size() - 1; ++i) {
head.add_route()->Swap(&p->route[i]);
}
- return sock.Send(p->route.back().mq_id().data(), head, body);
+ return sock.Send(p->route.back().mq_id(), head, body);
}
bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker)
@@ -361,9 +361,9 @@
}
}
};
- return sock.Send(addr.mq_id().data(), head, req, onRecv);
+ return sock.Send(addr.mq_id(), head, req, onRecv);
} else {
- return sock.Send(addr.mq_id().data(), head, req);
+ return sock.Send(addr.mq_id(), head, req);
}
};
@@ -396,7 +396,7 @@
DEFER1(reply_msg.Release(););
BHMsgHead reply_head;
- if (sock.SendAndRecv(addr.mq_id().data(), head, request, reply_msg, reply_head, timeout_ms) &&
+ if (sock.SendAndRecv(addr.mq_id(), head, request, reply_msg, reply_head, timeout_ms) &&
reply_head.type() == kMsgTypeRequestTopicReply &&
reply_msg.ParseBody(out_reply)) {
reply_head.mutable_proc_id()->swap(out_proc_id);
@@ -441,7 +441,7 @@
std::vector<NodeAddress> lst;
if (QueryRPCTopics(topic, lst, timeout_ms)) {
addr = lst.front().addr();
- if (!addr.mq_id().empty()) {
+ if (addr.mq_id() != 0) {
topic_query_cache_.Store(topic, addr);
return true;
}
@@ -464,13 +464,13 @@
AddRoute(head, sock.id());
if (timeout_ms == 0) {
- return sock.Send(&BHTopicBusAddress(), head, pub);
+ return sock.Send(BHTopicBusAddress(), head, pub);
} else {
MsgI reply;
DEFER1(reply.Release(););
BHMsgHead reply_head;
MsgCommonReply reply_body;
- return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) &&
+ return sock.SendAndRecv(BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) &&
reply_head.type() == kMsgTypeCommonReply &&
reply.ParseBody(reply_body) &&
IsSuccess(reply_body.errmsg().errcode());
@@ -497,12 +497,12 @@
BHMsgHead head(InitMsgHead(GetType(sub), proc_id()));
AddRoute(head, sock.id());
if (timeout_ms == 0) {
- return sock.Send(&BHTopicBusAddress(), head, sub);
+ return sock.Send(BHTopicBusAddress(), head, sub);
} else {
MsgI reply;
DEFER1(reply.Release(););
BHMsgHead reply_head;
- return sock.SendAndRecv(&BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) &&
+ return sock.SendAndRecv(BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) &&
reply_head.type() == kMsgTypeCommonReply &&
reply.ParseBody(reply_body) &&
IsSuccess(reply_body.errmsg().errcode());
diff --git a/utest/api_test.cpp b/utest/api_test.cpp
index 5d65bd5..dd59b09 100644
--- a/utest/api_test.cpp
+++ b/utest/api_test.cpp
@@ -198,8 +198,8 @@
const std::string mtx_name("test_mutex");
const std::string int_name("test_int");
- auto mtx = shm.find_or_construct<Mutex>(mtx_name.c_str())();
- auto pi = shm.find_or_construct<int>(int_name.c_str())(100);
+ auto mtx = shm.FindOrCreate<Mutex>(mtx_name);
+ auto pi = shm.FindOrCreate<int>(int_name, 100);
printf("mutetx ");
PrintPtr(mtx);
diff --git a/utest/speed_test.cpp b/utest/speed_test.cpp
index 4615c53..d145ab4 100644
--- a/utest/speed_test.cpp
+++ b/utest/speed_test.cpp
@@ -26,7 +26,7 @@
SharedMemory &shm = TestShm();
MsgI::BindShm(shm);
- MQId id = boost::uuids::random_generator()();
+ MQId id = ShmMsgQueue::NewId();
const int timeout = 1000;
const uint32_t data_size = 4000;
const std::string proc_id = "demo_proc";
@@ -157,8 +157,8 @@
req_body.set_topic("topic");
req_body.set_data(msg_content);
auto req_head(InitMsgHead(GetType(req_body), client_proc_id));
- req_head.add_route()->set_mq_id(&cli.id(), cli.id().size());
- return cli.Send(&srv.id(), req_head, req_body);
+ req_head.add_route()->set_mq_id(cli.id());
+ return cli.Send(srv.id(), req_head, req_body);
};
Req();
@@ -175,15 +175,13 @@
DEFER1(req.Release());
if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) {
- auto &mqid = req_head.route()[0].mq_id();
- MQId src_id;
- memcpy(&src_id, mqid.data(), sizeof(src_id));
+ auto src_id = req_head.route()[0].mq_id();
auto Reply = [&]() {
MsgRequestTopic reply_body;
reply_body.set_topic("topic");
reply_body.set_data(msg_content);
auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, req_head.msg_id()));
- return srv.Send(&src_id, reply_head, reply_body);
+ return srv.Send(src_id, reply_head, reply_body);
};
Reply();
}
diff --git a/utest/utest.cpp b/utest/utest.cpp
index ff5d2ed..d058471 100644
--- a/utest/utest.cpp
+++ b/utest/utest.cpp
@@ -2,8 +2,6 @@
#include "defs.h"
#include "util.h"
#include <atomic>
-#include <boost/uuid/uuid_generators.hpp>
-#include <boost/uuid/uuid_io.hpp>
#include <condition_variable>
#include <stdio.h>
#include <string>
@@ -96,7 +94,7 @@
auto Avail = [&]() { return shm.get_free_memory(); };
auto init_avail = Avail();
- int *flag = shm.find_or_construct<int>("flag")(123);
+ int *flag = shm.FindOrCreate<int>("flag", 123);
printf("flag = %d\n", *flag);
++*flag;
const std::string sub_proc_id = "subscriber";
@@ -207,7 +205,7 @@
auto Avail = [&]() { return shm.get_free_memory(); };
auto init_avail = Avail();
- int *flag = shm.find_or_construct<int>("flag")(123);
+ int *flag = shm.FindOrCreate<int>("flag", 123);
printf("flag = %d\n", *flag);
++*flag;
--
Gitblit v1.8.0