change mqid from uuid to uint64.
| | |
| | | { |
| | | public: |
| | | typedef std::string ProcId; |
| | | typedef std::string Address; |
| | | typedef MQId Address; |
| | | typedef bhome_msg::ProcInfo ProcInfo; |
| | | typedef std::function<void(Address const &)> Cleaner; |
| | | typedef std::function<void(Address const)> Cleaner; |
| | | |
| | | private: |
| | | enum { |
| | |
| | | WeakNode weak_node_; |
| | | bool operator<(const TopicDest &a) const { return mq_ < a.mq_; } |
| | | }; |
| | | inline const std::string &SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); } |
| | | inline MQId SrcAddr(const BHMsgHead &head) { return head.route(0).mq_id(); } |
| | | inline bool MatchAddr(std::set<Address> const &addrs, const Address &addr) { return addrs.find(addr) != addrs.end(); } |
| | | |
| | | NodeCenter(const std::string &id, const Cleaner &cleaner, const int64_t offline_time, const int64_t kill_time) : |
| | |
| | | { |
| | | return HandleMsg( |
| | | head, [&](Node node) -> MsgCommonReply { |
| | | auto &src = SrcAddr(head); |
| | | auto src = SrcAddr(head); |
| | | auto &topics = msg.topics().topic_list(); |
| | | node->services_[src].insert(topics.begin(), topics.end()); |
| | | TopicDest dest = {src, node}; |
| | |
| | | MsgCommonReply Subscribe(const BHMsgHead &head, const MsgSubscribe &msg) |
| | | { |
| | | return HandleMsg(head, [&](Node node) { |
| | | auto &src = SrcAddr(head); |
| | | auto src = SrcAddr(head); |
| | | auto &topics = msg.topics().topic_list(); |
| | | node->subscriptions_[src].insert(topics.begin(), topics.end()); |
| | | TopicDest dest = {src, node}; |
| | |
| | | MsgCommonReply Unsubscribe(const BHMsgHead &head, const MsgUnsubscribe &msg) |
| | | { |
| | | return HandleMsg(head, [&](Node node) { |
| | | auto &src = SrcAddr(head); |
| | | auto src = SrcAddr(head); |
| | | auto pos = node->subscriptions_.find(src); |
| | | |
| | | auto RemoveSubTopicDestRecord = [this](const Topic &topic, const TopicDest &dest) { |
| | |
| | | auto MakeReplyer = [](ShmSocket &socket, BHMsgHead &head, const std::string &proc_id) { |
| | | return [&](auto &&rep_body) { |
| | | auto reply_head(InitMsgHead(GetType(rep_body), proc_id, head.msg_id())); |
| | | auto &remote = head.route(0).mq_id(); |
| | | socket.Send(remote.data(), reply_head, rep_body); |
| | | auto remote = head.route(0).mq_id(); |
| | | socket.Send(remote, reply_head, rep_body); |
| | | }; |
| | | }; |
| | | |
| | |
| | | if (node) { |
| | | // should also make sure that mq is not killed before msg expires. |
| | | // it would be ok if (kill_time - offline_time) is longer than expire time. |
| | | socket.Send(cli.mq_.data(), msg); |
| | | socket.Send(cli.mq_, msg); |
| | | ++it; |
| | | } else { |
| | | it = clients.erase(it); |
| | |
| | | return rec; |
| | | } |
| | | |
| | | bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const std::string &mqid, const int mq_len) |
| | | bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len) |
| | | { |
| | | Centers()[name] = CenterInfo{name, handler, idle, mqid, mq_len}; |
| | | return true; |
| | | } |
| | | bool BHCenter::Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId &mqid, const int mq_len) |
| | | { |
| | | return Install(name, handler, idle, std::string((const char *) &mqid, sizeof(mqid)), mq_len); |
| | | } |
| | | |
| | | BHCenter::BHCenter(Socket::Shm &shm) |
| | | { |
| | | auto gc = [&](const std::string &id) { |
| | | auto r = ShmSocket::Remove(shm, *(MQId *) id.data()); |
| | | printf("remove mq : %s\n", r ? "ok" : "failed"); |
| | | auto gc = [&](const MQId id) { |
| | | auto r = ShmSocket::Remove(shm, id); |
| | | printf("remove mq %ld : %s\n", id, (r ? "ok" : "failed")); |
| | | }; |
| | | |
| | | AddCenter("#bhome_center", gc); |
| | | |
| | | for (auto &kv : Centers()) { |
| | | auto &info = kv.second; |
| | | sockets_[info.name_] = std::make_shared<ShmSocket>(shm, *(MQId *) info.mqid_.data(), info.mq_len_); |
| | | sockets_[info.name_] = std::make_shared<ShmSocket>(shm, info.mqid_, info.mq_len_); |
| | | } |
| | | } |
| | | |
| | |
| | | public: |
| | | typedef Socket::PartialRecvCB MsgHandler; |
| | | typedef Socket::IdleCB IdleHandler; |
| | | static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const std::string &mqid, const int mq_len); |
| | | static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId &mqid, const int mq_len); |
| | | static bool Install(const std::string &name, MsgHandler handler, IdleHandler idle, const MQId mqid, const int mq_len); |
| | | |
| | | BHCenter(Socket::Shm &shm); |
| | | ~BHCenter() { Stop(); } |
| | |
| | | std::string name_; |
| | | MsgHandler handler_; |
| | | IdleHandler idle_; |
| | | std::string mqid_; |
| | | MQId mqid_; |
| | | int mq_len_ = 0; |
| | | }; |
| | | typedef std::map<std::string, CenterInfo> CenterRecords; |
| | |
| | | return true; |
| | | } |
| | | |
| | | auto mtx(shm_.find_or_construct<Mutex>((name_ + "_mutex_0").c_str())()); |
| | | auto time_stamp(shm_.find_or_construct<int64_t>((name_ + "_timestamp_0").c_str())(0)); |
| | | auto mtx(shm_.FindOrCreate<Mutex>(name_ + "_mutex_0")); |
| | | auto time_stamp(shm_.FindOrCreate<int64_t>(name_ + "_timestamp_0", 0)); |
| | | |
| | | if (mtx && time_stamp) { |
| | | Guard lock(*mtx); |
| | |
| | | int center_main(int argc, const char *argv[]) |
| | | { |
| | | auto &shm = BHomeShm(); |
| | | MsgI::BindShm(shm); |
| | | GlobalInit(shm); |
| | | |
| | | AppArg args(argc, argv); |
| | | if (args.Has("remove")) { |
| | |
| | | return shm_name; |
| | | } |
| | | }; |
| | | printf("monitoring shm : %s, size : %dM\n", DisplayName().c_str(), shm_size); |
| | | printf("monitoring shm : %s, size : %ldM\n", DisplayName().c_str(), shm_size); |
| | | |
| | | SharedMemory shm(shm_name, 1024 * 1024 * shm_size); |
| | | std::atomic<bool> run(true); |
| | |
| | | package bhome_msg; |
| | | |
| | | message BHAddress { |
| | | bytes mq_id = 1; // mqid, uuid |
| | | bytes ip = 2; // |
| | | uint64 mq_id = 1; |
| | | bytes ip = 2; |
| | | int32 port = 3; |
| | | } |
| | | |
| | |
| | | { |
| | | TopicNode &ProcNode() |
| | | { |
| | | static bool init_bind_msg_shm = MsgI::BindShm(BHomeShm()); |
| | | static bool init = GlobalInit(BHomeShm()); |
| | | static TopicNode node(BHomeShm()); |
| | | return node; |
| | | } |
| | |
| | | } |
| | | }; |
| | | |
| | | template <class T, class Tag> |
| | | class StaticDataRef |
| | | { |
| | | typedef T *Ptr; |
| | | static inline Ptr &ptr() |
| | | { |
| | | static Ptr sp(nullptr); |
| | | return sp; |
| | | } |
| | | |
| | | protected: |
| | | static inline T &GetData() |
| | | { |
| | | if (!ptr()) { throw std::string("Must set ShmMsg shm before use!"); } |
| | | return *ptr(); |
| | | } |
| | | |
| | | public: |
| | | static bool SetData(T &t) |
| | | { |
| | | auto Bind = [&]() { ptr() = &t; return true; }; |
| | | return ptr() ? false : Bind(); |
| | | } |
| | | }; |
| | | |
| | | // macro helper |
| | | #define JOIN_IMPL(a, b) a##b |
| | | #define JOIN(a, b) JOIN_IMPL(a, b) |
| | |
| | | * ===================================================================================== |
| | | */ |
| | | #include "defs.h" |
| | | #include "shm.h" |
| | | #include "msg.h" |
| | | #include "shm_queue.h" |
| | | |
| | | namespace |
| | | { |
| | | |
| | | const MQId kBHTopicBus = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff"); |
| | | const MQId kBHTopicCenter = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff"); |
| | | const MQId kBHUniCenter = boost::uuids::string_generator()("87654321-89ab-cdef-8349-1234567890ff"); |
| | | |
| | | struct LastError { |
| | | int ec_ = 0; |
| | |
| | | |
| | | } // namespace |
| | | |
| | | const MQId &BHTopicBusAddress() { return kBHTopicBus; } |
| | | const MQId &BHTopicCenterAddress() { return kBHTopicCenter; } |
| | | const MQId &BHUniCenterAddress() { return kBHUniCenter; } |
| | | |
| | | bhome_shm::SharedMemory &BHomeShm() |
| | | { |
| | | static bhome_shm::SharedMemory shm("bhome_default_shm_v0", 1024 * 1024 * 512); |
| | | return shm; |
| | | } |
| | | |
| | | bool GlobalInit(bhome_shm::SharedMemory &shm) |
| | | { |
| | | MsgI::BindShm(shm); |
| | | typedef std::atomic<MQId> IdSrc; |
| | | IdSrc *psrc = shm.FindOrCreate<IdSrc>("shmqIdSrc0", 100000); |
| | | return ShmMsgQueue::SetData(*psrc); |
| | | } |
| | | |
| | | void SetLastError(const int ec, const std::string &msg) |
| | | { |
| | | LastErrorStore().ec_ = ec; |
| | |
| | | #ifndef DEFS_KP8LKGD0 |
| | | #define DEFS_KP8LKGD0 |
| | | |
| | | #include <boost/uuid/uuid.hpp> |
| | | #include <boost/uuid/uuid_generators.hpp> |
| | | #include <string> |
| | | |
| | | typedef boost::uuids::uuid MQId; |
| | | typedef uint64_t MQId; |
| | | |
| | | const MQId &BHTopicBusAddress(); |
| | | const MQId &BHTopicCenterAddress(); |
| | | const MQId &BHUniCenterAddress(); |
| | | const MQId kBHTopicCenter = 100; |
| | | const MQId kBHTopicBus = 101; |
| | | const MQId kBHUniCenter = 102; |
| | | inline const MQId BHTopicCenterAddress() { return kBHTopicCenter; } |
| | | inline const MQId BHTopicBusAddress() { return kBHTopicBus; } |
| | | inline const MQId BHUniCenterAddress() { return kBHUniCenter; } |
| | | |
| | | const int kBHCenterPort = 24287; |
| | | const char kTopicSep = '.'; |
| | |
| | | } // namespace bhome_shm |
| | | |
| | | bhome_shm::SharedMemory &BHomeShm(); |
| | | bool GlobalInit(bhome_shm::SharedMemory &shm); |
| | | typedef std::string Topic; |
| | | void SetLastError(const int ec, const std::string &msg); |
| | | void GetLastError(int &ec, std::string &msg); |
| | |
| | | #include "shm.h" |
| | | #include <atomic> |
| | | #include <boost/interprocess/offset_ptr.hpp> |
| | | #include <boost/uuid/uuid_generators.hpp> |
| | | #include <functional> |
| | | #include <stdint.h> |
| | | |
| | |
| | | // ShmMsg is safe to be stored in shared memory, so POD data or offset_ptr is required. |
| | | // message content layout: (meta) / header_size + header + data_size + data |
| | | |
| | | typedef boost::uuids::uuid MQId; |
| | | |
| | | class ShmMsg |
| | | class ShmMsg : private StaticDataRef<SharedMemory, ShmMsg> |
| | | { |
| | | private: |
| | | static inline SharedMemory &shm() { return GetData(); } |
| | | // store ref count, msgs shareing the same data should also hold a pointer of the same RefCount object. |
| | | class RefCount : private boost::noncopyable |
| | | { |
| | |
| | | { |
| | | static const Offset base = Addr(shm().get_address()); // cache value. |
| | | return base; |
| | | } |
| | | static inline SharedMemory &shm() |
| | | { |
| | | if (!pshm()) { throw std::string("Must set ShmMsg shm before use!"); } |
| | | return *pshm(); |
| | | } |
| | | static inline SharedMemory *&pshm() |
| | | { |
| | | static SharedMemory *pshm = 0; |
| | | return pshm; |
| | | } |
| | | |
| | | static const uint32_t kMsgTag = 0xf1e2d3c4; |
| | |
| | | T *get() const { return static_cast<T *>(Ptr(offset_ + BaseAddr())); } |
| | | |
| | | public: |
| | | static bool BindShm(SharedMemory &shm) |
| | | { |
| | | assert(!pshm()); |
| | | pshm() = &shm; |
| | | return true; |
| | | } |
| | | |
| | | static bool BindShm(SharedMemory &shm) { return SetData(shm); } |
| | | ShmMsg() : |
| | | ShmMsg(nullptr) {} |
| | | explicit ShmMsg(const size_t size) : |
| | |
| | | #include "shm_queue.h" |
| | | #include <chrono> |
| | | |
| | | int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr) |
| | | int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, Array &arr) |
| | | { |
| | | auto FirstNotExpired = [](Array &l) { |
| | | auto Less = [](const TimedMsg &msg, const TimePoint &tp) { return msg.expire() < tp; }; |
| | |
| | | bool r = false; |
| | | if (d.index() == 0) { |
| | | auto &msg = boost::variant2::get<0>(pos->data().data_); |
| | | r = mq.TrySend(*(MQId *) remote.data(), msg); |
| | | r = mq.TrySend(remote, msg); |
| | | if (r) { |
| | | msg.Release(); |
| | | } |
| | |
| | | MsgI msg; |
| | | if (msg.Make(content)) { |
| | | DEFER1(msg.Release();); |
| | | r = mq.TrySend(*(MQId *) remote.data(), msg); |
| | | r = mq.TrySend(remote, msg); |
| | | } |
| | | } |
| | | return r; |
| | |
| | | return nprocessed; |
| | | } |
| | | |
| | | int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, ArrayList &al) |
| | | int SendQ::DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, ArrayList &al) |
| | | { |
| | | int nsend = 0; |
| | | auto AllSent = [&](Array &arr) { |
| | |
| | | class SendQ |
| | | { |
| | | public: |
| | | typedef std::string Remote; |
| | | typedef MQId Remote; |
| | | typedef bhome_msg::MsgI MsgI; |
| | | typedef std::string Content; |
| | | typedef boost::variant2::variant<MsgI, Content> Data; |
| | |
| | | typedef TimedMsg::TimePoint TimePoint; |
| | | typedef TimedMsg::Duration Duration; |
| | | |
| | | template <class... Rest> |
| | | void Append(const MQId &id, Rest &&...rest) |
| | | { |
| | | Append(std::string((const char *) &id, sizeof(id)), std::forward<decltype(rest)>(rest)...); |
| | | } |
| | | // template <class... Rest> |
| | | // void Append(const MQId &id, Rest &&...rest) |
| | | // { |
| | | // Append(std::string((const char *) &id, sizeof(id)), std::forward<decltype(rest)>(rest)...); |
| | | // } |
| | | |
| | | void Append(const Remote &addr, const MsgI &msg, OnMsgEvent onExpire = OnMsgEvent()) |
| | | void Append(const Remote addr, const MsgI msg, OnMsgEvent onExpire = OnMsgEvent()) |
| | | { |
| | | msg.AddRef(); |
| | | AppendData(addr, Data(msg), DefaultExpire(), onExpire); |
| | | } |
| | | void Append(const Remote &addr, Content &&content, OnMsgEvent onExpire = OnMsgEvent()) |
| | | void Append(const Remote addr, Content &&content, OnMsgEvent onExpire = OnMsgEvent()) |
| | | { |
| | | AppendData(addr, Data(std::move(content)), DefaultExpire(), onExpire); |
| | | } |
| | |
| | | private: |
| | | static TimePoint Now() { return TimedMsg::Clock::now(); } |
| | | static TimePoint DefaultExpire() { return Now() + std::chrono::seconds(60); } |
| | | void AppendData(const Remote &addr, Data &&data, const TimePoint &expire, OnMsgEvent onExpire) |
| | | void AppendData(const Remote addr, Data &&data, const TimePoint &expire, OnMsgEvent onExpire) |
| | | { |
| | | //TODO simple queue, organize later ? |
| | | |
| | |
| | | typedef std::list<Array> ArrayList; |
| | | typedef std::unordered_map<Remote, ArrayList> Store; |
| | | |
| | | int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, Array &arr); |
| | | int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote &remote, ArrayList &arr); |
| | | int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, Array &arr); |
| | | int DoSend1Remote(bhome_shm::ShmMsgQueue &mq, const Remote remote, ArrayList &arr); |
| | | |
| | | std::mutex mutex_in_; |
| | | std::mutex mutex_out_; |
| | |
| | | #include <boost/interprocess/sync/interprocess_mutex.hpp> |
| | | #include <boost/interprocess/sync/scoped_lock.hpp> |
| | | #include <boost/noncopyable.hpp> |
| | | #include <boost/uuid/uuid.hpp> |
| | | #include <chrono> |
| | | #include <thread> |
| | | |
| | |
| | | ~SharedMemory(); |
| | | std::string name() const { return name_; } |
| | | bool Remove() { return Remove(name()); } |
| | | |
| | | template <class T, class... Params> |
| | | T *FindOrCreate(const std::string &name, Params &&...params) |
| | | { |
| | | return find_or_construct<T>(name.c_str(), std::nothrow)(std::forward<decltype(params)>(params)...); |
| | | } |
| | | template <class T, class... Params> |
| | | T *Create(const std::string &name, Params &&...params) |
| | | { |
| | | return construct<T>(name.c_str(), std::nothrow)(std::forward<decltype(params)>(params)...); |
| | | } |
| | | void *Alloc(const size_t size) { return allocate(size, std::nothrow); } |
| | | void Dealloc(void *p) |
| | | { |
| | |
| | | void Dealloc(offset_ptr<T> ptr) { return Dealloc(ptr.get()); } |
| | | |
| | | template <class T, class... Params> |
| | | T *New(Params const &...params) { return construct<T>(anonymous_instance, std::nothrow)(params...); } |
| | | T *New(Params &&...params) { return construct<T>(anonymous_instance, std::nothrow)(std::forward<decltype(params)>(params)...); } |
| | | template <class T> |
| | | void Delete(T *p) |
| | | { |
| | |
| | | ShmObject(ShmType &segment, const std::string &name, Params &&...t) : |
| | | shm_(segment), name_(name) |
| | | { |
| | | pdata_ = shm_.find_or_construct<Data>(ObjName(name_).c_str(), std::nothrow)(t...); |
| | | pdata_ = shm_.Create<Data>(ObjName(name_), std::forward<decltype(t)>(t)...); |
| | | if (!IsOk()) { |
| | | throw("Error: Not enough memory, can not allocate \"" + name_ + "\""); |
| | | } |
| | |
| | | |
| | | #include "shm_queue.h" |
| | | #include "bh_util.h" |
| | | #include <boost/uuid/uuid_generators.hpp> |
| | | #include <boost/uuid/uuid_io.hpp> |
| | | |
| | | namespace bhome_shm |
| | | { |
| | | using namespace bhome_msg; |
| | | using namespace boost::interprocess; |
| | | using namespace boost::uuids; |
| | | |
| | | namespace |
| | | { |
| | | std::string MsgQIdToName(const MQId &id) { return "shmq" + to_string(id); } |
| | | // MQId EmptyId() { return nil_uuid(); } |
| | | MQId NewId() { return random_generator()(); } |
| | | std::string MsgQIdToName(const ShmMsgQueue::MQId id) |
| | | { |
| | | char buf[40] = "mqOx"; |
| | | int n = sprintf(buf + 4, "%lx", id); |
| | | return std::string(buf, n + 4); |
| | | } |
| | | |
| | | const int AdjustMQLength(const int len) |
| | | { |
| | | const int kMaxLength = 10000; |
| | |
| | | |
| | | } // namespace |
| | | |
| | | ShmMsgQueue::MQId ShmMsgQueue::NewId() |
| | | { |
| | | static auto &id = GetData(); |
| | | return ++id; |
| | | } |
| | | // ShmMsgQueue memory usage: (320 + 16*length) bytes, length >= 2 |
| | | ShmMsgQueue::ShmMsgQueue(const MQId &id, ShmType &segment, const int len) : |
| | | ShmMsgQueue::ShmMsgQueue(const MQId id, ShmType &segment, const int len) : |
| | | Super(segment, MsgQIdToName(id), AdjustMQLength(len), segment.get_segment_manager()), |
| | | id_(id) |
| | | { |
| | |
| | | |
| | | ShmMsgQueue::~ShmMsgQueue() {} |
| | | |
| | | bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId &id) |
| | | bool ShmMsgQueue::Remove(SharedMemory &shm, const MQId id) |
| | | { |
| | | Queue *q = Find(shm, id); |
| | | if (q) { |
| | |
| | | return Super::Remove(shm, MsgQIdToName(id)); |
| | | } |
| | | |
| | | ShmMsgQueue::Queue *ShmMsgQueue::Find(SharedMemory &shm, const MQId &remote_id) |
| | | ShmMsgQueue::Queue *ShmMsgQueue::Find(SharedMemory &shm, const MQId remote_id) |
| | | { |
| | | return Super::Find(shm, MsgQIdToName(remote_id)); |
| | | } |
| | | |
| | | bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, OnSend const &onsend) |
| | | bool ShmMsgQueue::TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend) |
| | | { |
| | | Queue *remote = Find(shm, remote_id); |
| | | if (remote) { |
| | |
| | | |
| | | #include "msg.h" |
| | | #include "shm.h" |
| | | #include <atomic> |
| | | #include <boost/circular_buffer.hpp> |
| | | #include <boost/date_time/posix_time/posix_time.hpp> |
| | | |
| | |
| | | |
| | | template <class D> |
| | | using Circular = boost::circular_buffer<D, Allocator<D>>; |
| | | |
| | | typedef boost::uuids::uuid MQId; |
| | | |
| | | template <class D> |
| | | class SharedQueue : private Circular<D> |
| | |
| | | |
| | | using namespace bhome_msg; |
| | | |
| | | class ShmMsgQueue : private ShmObject<SharedQueue<MsgI>> |
| | | class ShmMsgQueue : private ShmObject<SharedQueue<MsgI>>, public StaticDataRef<std::atomic<uint64_t>, ShmMsgQueue> |
| | | { |
| | | typedef ShmObject<SharedQueue<MsgI>> Super; |
| | | typedef Super::Data Queue; |
| | | typedef std::function<void()> OnSend; |
| | | MQId id_; |
| | | |
| | | protected: |
| | | ShmMsgQueue(const std::string &raw_name, ShmType &segment, const int len); // internal use. |
| | | public: |
| | | ShmMsgQueue(const MQId &id, ShmType &segment, const int len); |
| | | typedef uint64_t MQId; |
| | | |
| | | static MQId NewId(); |
| | | |
| | | ShmMsgQueue(const MQId id, ShmType &segment, const int len); |
| | | ShmMsgQueue(ShmType &segment, const int len); |
| | | ~ShmMsgQueue(); |
| | | static bool Remove(SharedMemory &shm, const MQId &id); |
| | | const MQId &Id() const { return id_; } |
| | | static bool Remove(SharedMemory &shm, const MQId id); |
| | | MQId Id() const { return id_; } |
| | | using Super::shm; |
| | | |
| | | bool Recv(MsgI &msg, const int timeout_ms) { return data()->Read(msg, timeout_ms); } |
| | | bool TryRecv(MsgI &msg) { return data()->TryRead(msg); } |
| | | template <class OnData> |
| | | int TryRecvAll(OnData const &onData) { return data()->TryReadAll(onData); } |
| | | static Queue *Find(SharedMemory &shm, const MQId &remote_id); |
| | | // static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend = OnSend()); |
| | | static bool TrySend(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, OnSend const &onsend = OnSend()); |
| | | static Queue *Find(SharedMemory &shm, const MQId remote_id); |
| | | static bool TrySend(SharedMemory &shm, const MQId remote_id, const MsgI &msg, OnSend const &onsend = OnSend()); |
| | | template <class Iter> |
| | | static int TrySendAll(SharedMemory &shm, const MQId &remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend()) |
| | | static int TrySendAll(SharedMemory &shm, const MQId remote_id, const Iter begin, const Iter end, OnSend const &onsend = OnSend()) |
| | | { |
| | | Queue *remote = Find(shm, remote_id); |
| | | if (remote) { |
| | |
| | | } |
| | | } |
| | | |
| | | // template <class... Rest> |
| | | // bool Send(const MQId &remote_id, Rest const &...rest) { return Send(shm(), remote_id, rest...); } |
| | | template <class... Rest> |
| | | bool TrySend(const MQId &remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); } |
| | | bool TrySend(const MQId remote_id, Rest const &...rest) { return TrySend(shm(), remote_id, rest...); } |
| | | template <class... Rest> |
| | | int TrySendAll(const MQId &remote_id, Rest const &...rest) { return TrySendAll(shm(), remote_id, rest...); } |
| | | int TrySendAll(const MQId remote_id, Rest const &...rest) { return TrySendAll(shm(), remote_id, rest...); } |
| | | |
| | | size_t Pending() const { return data()->size(); } |
| | | private: |
| | | MQId id_; |
| | | }; |
| | | |
| | | } // namespace bhome_shm |
| | |
| | | using namespace bhome_msg; |
| | | using namespace bhome_shm; |
| | | |
| | | ShmSocket::ShmSocket(Shm &shm, const MQId &id, const int len) : |
| | | ShmSocket::ShmSocket(Shm &shm, const MQId id, const int len) : |
| | | run_(false), mq_(id, shm, len) |
| | | { |
| | | Start(); |
| | |
| | | #include <vector> |
| | | |
| | | using namespace bhome_msg; |
| | | |
| | | class ShmSocket : private boost::noncopyable |
| | | { |
| | | template <class... T> |
| | | bool SendImpl(const void *valid_remote, T &&...rest) |
| | | { |
| | | send_buffer_.Append(*static_cast<const MQId *>(valid_remote), std::forward<decltype(rest)>(rest)...); |
| | | return true; |
| | | } |
| | | |
| | | protected: |
| | | typedef bhome_shm::ShmMsgQueue Queue; |
| | | |
| | | public: |
| | | typedef ShmMsgQueue::MQId MQId; |
| | | typedef bhome_shm::SharedMemory Shm; |
| | | typedef std::function<void(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> RecvCB; |
| | | typedef std::function<bool(ShmSocket &sock, MsgI &imsg, BHMsgHead &head)> PartialRecvCB; |
| | | typedef std::function<void(ShmSocket &sock)> IdleCB; |
| | | |
| | | ShmSocket(Shm &shm, const MQId &id, const int len); |
| | | ShmSocket(Shm &shm, const MQId id, const int len); |
| | | ShmSocket(Shm &shm, const int len = 12); |
| | | ~ShmSocket(); |
| | | static bool Remove(SharedMemory &shm, const MQId &id) { return Queue::Remove(shm, id); } |
| | | static bool Remove(SharedMemory &shm, const MQId id) { return Queue::Remove(shm, id); } |
| | | bool Remove() { return Remove(shm(), id()); } |
| | | const MQId &id() const { return mq().Id(); } |
| | | MQId id() const { return mq().Id(); } |
| | | // start recv. |
| | | bool Start(int nworker = 1, const RecvCB &onData = RecvCB(), const IdleCB &onIdle = IdleCB()); |
| | | bool Start(const RecvCB &onData, const IdleCB &onIdle, int nworker = 1) { return Start(nworker, onData, onIdle); } |
| | | bool Start(const RecvCB &onData, int nworker = 1) { return Start(nworker, onData); } |
| | | bool Stop(); |
| | | size_t Pending() const { return mq().Pending(); } |
| | | |
| | | template <class Body> |
| | | bool Send(const void *valid_remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB()) |
| | | bool Send(const MQId remote, BHMsgHead &head, Body &body, RecvCB &&cb = RecvCB()) |
| | | { |
| | | try { |
| | | if (!cb) { |
| | | return SendImpl(valid_remote, MsgI::Serialize(head, body)); |
| | | return SendImpl(remote, MsgI::Serialize(head, body)); |
| | | } else { |
| | | std::string msg_id(head.msg_id()); |
| | | per_msg_cbs_->Store(msg_id, std::move(cb)); |
| | |
| | | RecvCB cb_no_use; |
| | | per_msg_cbs_->Pick(msg_id, cb_no_use); |
| | | }; |
| | | return SendImpl(valid_remote, MsgI::Serialize(head, body), onExpireRemoveCB); |
| | | return SendImpl(remote, MsgI::Serialize(head, body), onExpireRemoveCB); |
| | | } |
| | | } catch (...) { |
| | | SetLastError(eError, "Send internal error."); |
| | |
| | | } |
| | | } |
| | | |
| | | bool Send(const void *valid_remote, const MsgI &imsg) |
| | | bool Send(const MQId remote, const MsgI &imsg) |
| | | { |
| | | return SendImpl(valid_remote, imsg); |
| | | return SendImpl(remote, imsg); |
| | | } |
| | | |
| | | bool SyncRecv(MsgI &msg, bhome_msg::BHMsgHead &head, const int timeout_ms); |
| | | |
| | | template <class Body> |
| | | bool SendAndRecv(const void *remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms) |
| | | bool SendAndRecv(const MQId remote, BHMsgHead &head, Body &body, MsgI &reply, BHMsgHead &reply_head, const int timeout_ms) |
| | | { |
| | | struct State { |
| | | std::mutex mutex; |
| | |
| | | bool StopNoLock(); |
| | | bool RunningNoLock() { return !workers_.empty(); } |
| | | |
| | | template <class... Rest> |
| | | bool SendImpl(const MQId remote, Rest &&...rest) |
| | | { |
| | | send_buffer_.Append(remote, std::forward<decltype(rest)>(rest)...); |
| | | return true; |
| | | } |
| | | |
| | | std::vector<std::thread> workers_; |
| | | std::mutex mutex_; |
| | | std::atomic<bool> run_; |
| | |
| | | |
| | | namespace |
| | | { |
| | | inline void AddRoute(BHMsgHead &head, const MQId &id) { head.add_route()->set_mq_id(&id, sizeof(id)); } |
| | | inline void AddRoute(BHMsgHead &head, const MQId id) { head.add_route()->set_mq_id(id); } |
| | | |
| | | struct SrcInfo { |
| | | std::vector<BHAddress> route; |
| | |
| | | auto &sock = SockNode(); |
| | | MsgRegister body; |
| | | body.mutable_proc()->Swap(&proc); |
| | | auto AddId = [&](const MQId &id) { body.add_addrs()->set_mq_id(&id, sizeof(id)); }; |
| | | auto AddId = [&](const MQId id) { body.add_addrs()->set_mq_id(id); }; |
| | | AddId(SockNode().id()); |
| | | AddId(SockServer().id()); |
| | | AddId(SockClient().id()); |
| | |
| | | MsgCommonReply body; |
| | | CheckResult(imsg, head, body); |
| | | }; |
| | | return sock.Send(&BHTopicCenterAddress(), head, body, onResult); |
| | | return sock.Send(BHTopicCenterAddress(), head, body, onResult); |
| | | } else { |
| | | MsgI reply; |
| | | DEFER1(reply.Release();); |
| | | BHMsgHead reply_head; |
| | | bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | if (r) { |
| | | CheckResult(reply, reply_head, reply_body); |
| | | } |
| | |
| | | MsgCommonReply body; |
| | | CheckResult(imsg, head, body); |
| | | }; |
| | | return sock.Send(&BHTopicCenterAddress(), head, body, onResult); |
| | | return sock.Send(BHTopicCenterAddress(), head, body, onResult); |
| | | } else { |
| | | MsgI reply; |
| | | DEFER1(reply.Release();); |
| | | BHMsgHead reply_head; |
| | | bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | return r && CheckResult(reply, reply_head, reply_body); |
| | | } |
| | | } |
| | |
| | | AddRoute(head, sock.id()); |
| | | |
| | | if (timeout_ms == 0) { |
| | | return sock.Send(&BHTopicCenterAddress(), head, body); |
| | | return sock.Send(BHTopicCenterAddress(), head, body); |
| | | } else { |
| | | MsgI reply; |
| | | DEFER1(reply.Release();); |
| | | BHMsgHead reply_head; |
| | | bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | r = r && reply_head.type() == kMsgTypeCommonReply && reply.ParseBody(reply_body); |
| | | return (r && IsSuccess(reply_body.errmsg().errcode())); |
| | | } |
| | |
| | | MsgI reply; |
| | | DEFER1(reply.Release()); |
| | | BHMsgHead reply_head; |
| | | return (sock.SendAndRecv(&BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) && |
| | | return (sock.SendAndRecv(BHTopicCenterAddress(), head, query, reply, reply_head, timeout_ms) && |
| | | reply_head.type() == kMsgTypeQueryTopicReply && |
| | | reply.ParseBody(reply_body)); |
| | | } |
| | |
| | | AddRoute(head, sock.id()); |
| | | |
| | | if (timeout_ms == 0) { |
| | | return sock.Send(&BHTopicCenterAddress(), head, body); |
| | | return sock.Send(BHTopicCenterAddress(), head, body); |
| | | } else { |
| | | MsgI reply; |
| | | DEFER1(reply.Release();); |
| | | BHMsgHead reply_head; |
| | | bool r = sock.SendAndRecv(&BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | bool r = sock.SendAndRecv(BHTopicCenterAddress(), head, body, reply, reply_head, timeout_ms); |
| | | r = r && reply_head.type() == kMsgTypeCommonReply; |
| | | r = r && reply.ParseBody(reply_body); |
| | | return r; |
| | |
| | | for (int i = 0; i < head.route_size() - 1; ++i) { |
| | | reply_head.add_route()->Swap(head.mutable_route(i)); |
| | | } |
| | | auto &remote = head.route().rbegin()->mq_id(); |
| | | sock.Send(remote.data(), reply_head, reply_body); |
| | | auto remote = head.route().rbegin()->mq_id(); |
| | | sock.Send(remote, reply_head, reply_body); |
| | | } |
| | | }; |
| | | |
| | |
| | | for (unsigned i = 0; i < p->route.size() - 1; ++i) { |
| | | head.add_route()->Swap(&p->route[i]); |
| | | } |
| | | return sock.Send(p->route.back().mq_id().data(), head, body); |
| | | return sock.Send(p->route.back().mq_id(), head, body); |
| | | } |
| | | |
| | | bool TopicNode::ClientStartWorker(RequestResultCB const &cb, const int nworker) |
| | |
| | | } |
| | | } |
| | | }; |
| | | return sock.Send(addr.mq_id().data(), head, req, onRecv); |
| | | return sock.Send(addr.mq_id(), head, req, onRecv); |
| | | } else { |
| | | return sock.Send(addr.mq_id().data(), head, req); |
| | | return sock.Send(addr.mq_id(), head, req); |
| | | } |
| | | }; |
| | | |
| | |
| | | DEFER1(reply_msg.Release();); |
| | | BHMsgHead reply_head; |
| | | |
| | | if (sock.SendAndRecv(addr.mq_id().data(), head, request, reply_msg, reply_head, timeout_ms) && |
| | | if (sock.SendAndRecv(addr.mq_id(), head, request, reply_msg, reply_head, timeout_ms) && |
| | | reply_head.type() == kMsgTypeRequestTopicReply && |
| | | reply_msg.ParseBody(out_reply)) { |
| | | reply_head.mutable_proc_id()->swap(out_proc_id); |
| | |
| | | std::vector<NodeAddress> lst; |
| | | if (QueryRPCTopics(topic, lst, timeout_ms)) { |
| | | addr = lst.front().addr(); |
| | | if (!addr.mq_id().empty()) { |
| | | if (addr.mq_id() != 0) { |
| | | topic_query_cache_.Store(topic, addr); |
| | | return true; |
| | | } |
| | |
| | | AddRoute(head, sock.id()); |
| | | |
| | | if (timeout_ms == 0) { |
| | | return sock.Send(&BHTopicBusAddress(), head, pub); |
| | | return sock.Send(BHTopicBusAddress(), head, pub); |
| | | } else { |
| | | MsgI reply; |
| | | DEFER1(reply.Release();); |
| | | BHMsgHead reply_head; |
| | | MsgCommonReply reply_body; |
| | | return sock.SendAndRecv(&BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) && |
| | | return sock.SendAndRecv(BHTopicBusAddress(), head, pub, reply, reply_head, timeout_ms) && |
| | | reply_head.type() == kMsgTypeCommonReply && |
| | | reply.ParseBody(reply_body) && |
| | | IsSuccess(reply_body.errmsg().errcode()); |
| | |
| | | BHMsgHead head(InitMsgHead(GetType(sub), proc_id())); |
| | | AddRoute(head, sock.id()); |
| | | if (timeout_ms == 0) { |
| | | return sock.Send(&BHTopicBusAddress(), head, sub); |
| | | return sock.Send(BHTopicBusAddress(), head, sub); |
| | | } else { |
| | | MsgI reply; |
| | | DEFER1(reply.Release();); |
| | | BHMsgHead reply_head; |
| | | return sock.SendAndRecv(&BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) && |
| | | return sock.SendAndRecv(BHTopicBusAddress(), head, sub, reply, reply_head, timeout_ms) && |
| | | reply_head.type() == kMsgTypeCommonReply && |
| | | reply.ParseBody(reply_body) && |
| | | IsSuccess(reply_body.errmsg().errcode()); |
| | |
| | | |
| | | const std::string mtx_name("test_mutex"); |
| | | const std::string int_name("test_int"); |
| | | auto mtx = shm.find_or_construct<Mutex>(mtx_name.c_str())(); |
| | | auto pi = shm.find_or_construct<int>(int_name.c_str())(100); |
| | | auto mtx = shm.FindOrCreate<Mutex>(mtx_name); |
| | | auto pi = shm.FindOrCreate<int>(int_name, 100); |
| | | |
| | | printf("mutetx "); |
| | | PrintPtr(mtx); |
| | |
| | | SharedMemory &shm = TestShm(); |
| | | MsgI::BindShm(shm); |
| | | |
| | | MQId id = boost::uuids::random_generator()(); |
| | | MQId id = ShmMsgQueue::NewId(); |
| | | const int timeout = 1000; |
| | | const uint32_t data_size = 4000; |
| | | const std::string proc_id = "demo_proc"; |
| | |
| | | req_body.set_topic("topic"); |
| | | req_body.set_data(msg_content); |
| | | auto req_head(InitMsgHead(GetType(req_body), client_proc_id)); |
| | | req_head.add_route()->set_mq_id(&cli.id(), cli.id().size()); |
| | | return cli.Send(&srv.id(), req_head, req_body); |
| | | req_head.add_route()->set_mq_id(cli.id()); |
| | | return cli.Send(srv.id(), req_head, req_body); |
| | | }; |
| | | |
| | | Req(); |
| | |
| | | DEFER1(req.Release()); |
| | | |
| | | if (req.ParseHead(req_head) && req_head.type() == kMsgTypeRequestTopic) { |
| | | auto &mqid = req_head.route()[0].mq_id(); |
| | | MQId src_id; |
| | | memcpy(&src_id, mqid.data(), sizeof(src_id)); |
| | | auto src_id = req_head.route()[0].mq_id(); |
| | | auto Reply = [&]() { |
| | | MsgRequestTopic reply_body; |
| | | reply_body.set_topic("topic"); |
| | | reply_body.set_data(msg_content); |
| | | auto reply_head(InitMsgHead(GetType(reply_body), server_proc_id, req_head.msg_id())); |
| | | return srv.Send(&src_id, reply_head, reply_body); |
| | | return srv.Send(src_id, reply_head, reply_body); |
| | | }; |
| | | Reply(); |
| | | } |
| | |
| | | #include "defs.h" |
| | | #include "util.h" |
| | | #include <atomic> |
| | | #include <boost/uuid/uuid_generators.hpp> |
| | | #include <boost/uuid/uuid_io.hpp> |
| | | #include <condition_variable> |
| | | #include <stdio.h> |
| | | #include <string> |
| | |
| | | |
| | | auto Avail = [&]() { return shm.get_free_memory(); }; |
| | | auto init_avail = Avail(); |
| | | int *flag = shm.find_or_construct<int>("flag")(123); |
| | | int *flag = shm.FindOrCreate<int>("flag", 123); |
| | | printf("flag = %d\n", *flag); |
| | | ++*flag; |
| | | const std::string sub_proc_id = "subscriber"; |
| | |
| | | |
| | | auto Avail = [&]() { return shm.get_free_memory(); }; |
| | | auto init_avail = Avail(); |
| | | int *flag = shm.find_or_construct<int>("flag")(123); |
| | | int *flag = shm.FindOrCreate<int>("flag", 123); |
| | | printf("flag = %d\n", *flag); |
| | | ++*flag; |
| | | |