add async recv suport; sync by waiting for async.
| | |
| | | kMsgTypePublish = 3; |
| | | kMsgTypeSubscribe = 4; |
| | | kMsgTypeUnsubscribe = 5; |
| | | kMsgTypeQueryTopic = 6; |
| | | kMsgTypeQueryTopicReply = 7; |
| | | } |
| | | |
| | | message DataPub { |
| | |
| | | message DataSub { |
| | | repeated bytes topics = 1; |
| | | } |
| | | |
| | | message DataRequest { |
| | | bytes topic = 1; |
| | | bytes data = 2; |
| | | } |
| | | |
| | | message DataReply { |
| | | bytes data = 1; |
| | | } |
| | | |
| | | message DataQueryTopic { |
| | | bytes topic = 1; |
| | | } |
| | | |
| | | message DataQueryTopicReply { |
| | | BHAddress address = 1; |
| | | } |
| | |
| | | typedef boost::uuids::uuid MQId; |
| | | |
| | | const MQId kBHBusQueueId = boost::uuids::string_generator()("01234567-89ab-cdef-8349-1234567890ff"); |
| | | const MQId kBHTopicRPCId = boost::uuids::string_generator()("12345670-89ab-cdef-8349-1234567890ff"); |
| | | const int kBHCenterPort = 24287; |
| | | const char kTopicSep = '.'; |
| | | namespace bhome_shm |
| | |
| | | msg.add_route()->set_mq_id(&src_id, sizeof(src_id)); |
| | | return msg; |
| | | } |
| | | BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size) |
| | | { |
| | | DataRequest req; |
| | | req.set_topic(topic); |
| | | req.set_data(data, size); |
| | | const std::string &body(req.SerializeAsString()); |
| | | return MakeRequest(src_id, body.data(), body.size()); |
| | | } |
| | | |
| | | BHMsg MakeReply(const void *data, const size_t size) |
| | | { |
| | | assert(data && size); |
| | | BHMsg msg(InitMsg(kMsgTypeReply)); |
| | | msg.set_body(data, size); |
| | | DataReply reply; |
| | | reply.set_data(data, size); |
| | | msg.set_body(reply.SerializeAsString()); |
| | | return msg; |
| | | } |
| | | |
| | |
| | | return msg; |
| | | } |
| | | |
| | | BHMsg MakeQueryTopic(const std::string &topic) |
| | | { |
| | | BHMsg msg(InitMsg(kMsgTypeQueryTopic)); |
| | | DataQueryTopic query; |
| | | query.set_topic(topic); |
| | | msg.set_body(query.SerializeAsString()); |
| | | return msg; |
| | | } |
| | | |
| | | void *Pack(SharedMemory &shm, const BHMsg &msg) |
| | | { |
| | | uint32_t msg_size = msg.ByteSizeLong(); |
| | |
| | | int num_ = 1; |
| | | }; |
| | | |
| | | BHMsg MakeQueryTopic(const std::string &topic); |
| | | BHMsg MakeRequest(const MQId &src_id, const void *data, const size_t size); |
| | | BHMsg MakeRequest(const MQId &src_id, const std::string &topic, const void *data, const size_t size); |
| | | BHMsg MakeReply(const void *data, const size_t size); |
| | | BHMsg MakeSub(const MQId &client, const std::vector<std::string> &topics); |
| | | BHMsg MakeUnsub(const MQId &client, const std::vector<std::string> &topics); |
| | |
| | | Remove(); |
| | | } |
| | | |
| | | bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms) |
| | | bool ShmMsgQueue::Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend) |
| | | { |
| | | Queue *remote = Find(shm, MsgQIdToName(remote_id)); |
| | | return remote && remote->Write(msg, timeout_ms, [](const MsgI &msg) { msg.AddRef(); }); |
| | | return remote && remote->Write(msg, timeout_ms, [&onsend](const MsgI &msg) { onsend(); msg.AddRef(); }); |
| | | } |
| | | |
| | | // bool ShmMsgQueue::Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms) |
| | | // { |
| | | // Queue *remote = Find(MsgQIdToName(remote_id)); |
| | | // return remote && remote->Write(msg, timeout_ms, [](const MsgI&msg){msg.AddRef();}); |
| | | // } |
| | | // Test shows that in the 2 cases: |
| | | // 1) build msg first, then find remote queue; |
| | | // 2) find remote queue first, then build msg; |
| | | // 1 is about 50% faster than 2, maybe cache related. |
| | | |
| | | bool ShmMsgQueue::Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms) |
| | | bool ShmMsgQueue::Send(const MQId &remote_id, const BHMsg &data, const int timeout_ms, const std::function<void()> &onsend) |
| | | { |
| | | MsgI msg; |
| | | if (msg.Make(shm(), data)) { |
| | | if (Send(remote_id, msg, timeout_ms)) { |
| | | if (Send(remote_id, msg, timeout_ms, onsend)) { |
| | | return true; |
| | | } else { |
| | | msg.Release(shm()); |
| | |
| | | return false; |
| | | } |
| | | |
| | | /* |
| | | bool ShmMsgQueue::Send(const MQId &remote_id, const void *data, const size_t size, const int timeout_ms) |
| | | { |
| | | // Test shows that in the 2 cases: |
| | | // 1) build msg first, then find remote queue; |
| | | // 2) find remote queue first, then build msg; |
| | | // 1 is about 50% faster than 2, maybe cache related. |
| | | |
| | | MsgI msg; |
| | | if(msg.BuildRequest(shm(), Id(), data, size)) { |
| | | if(Send(remote_id, msg, timeout_ms)) { |
| | | return true; |
| | | } else { |
| | | msg.Release(shm()); |
| | | } |
| | | } |
| | | return false; |
| | | } |
| | | //*/ |
| | | bool ShmMsgQueue::Recv(BHMsg &msg, const int timeout_ms) |
| | | { |
| | | MsgI imsg; |
| | |
| | | { |
| | | typedef ShmObject<SharedQueue<MsgI>> Super; |
| | | typedef Super::Data Queue; |
| | | typedef std::function<void()> OnSend; |
| | | bool Write(const MsgI &buf, const int timeout_ms) { return data()->Write(buf, timeout_ms); } |
| | | bool Read(MsgI &buf, const int timeout_ms) { return data()->Read(buf, timeout_ms); } |
| | | MQId id_; |
| | |
| | | |
| | | bool Recv(BHMsg &msg, const int timeout_ms); |
| | | bool Recv(MsgI &msg, const int timeout_ms) { return Read(msg, timeout_ms); } |
| | | bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms); |
| | | static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms); |
| | | static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend); |
| | | static bool Send(SharedMemory &shm, const MQId &remote_id, const MsgI &msg, const int timeout_ms) |
| | | { |
| | | return Send(shm, remote_id, msg, timeout_ms, []() {}); |
| | | } |
| | | bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms, OnSend const &onsend); |
| | | bool Send(const MQId &remote_id, const BHMsg &msg, const int timeout_ms) |
| | | { |
| | | return Send(remote_id, msg, timeout_ms, []() {}); |
| | | } |
| | | bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms, OnSend const &onsend) |
| | | { |
| | | return Send(shm(), remote_id, msg, timeout_ms, onsend); |
| | | } |
| | | bool Send(const MQId &remote_id, const MsgI &msg, const int timeout_ms) |
| | | { |
| | | return Send(shm(), remote_id, msg, timeout_ms); |
| | |
| | | #include "bh_util.h" |
| | | #include "defs.h" |
| | | #include "msg.h" |
| | | #include <chrono> |
| | | #include <condition_variable> |
| | | |
| | | using namespace bhome_msg; |
| | | using namespace bhome_shm; |
| | |
| | | { |
| | | |
| | | } // namespace |
| | | |
| | | //TODO maybe change to base class, each type is a sub class. |
| | | |
| | | ShmSocket::ShmSocket(Type type, bhome_shm::SharedMemory &shm) : |
| | | shm_(shm), type_(type), run_(false) |
| | |
| | | return StartRaw([this, onData](MsgI &imsg) { BHMsg m; if (imsg.Unpack(m)) { onData(m); } }, nworker); |
| | | } |
| | | |
| | | bool ShmSocket::StartAsync(int nworker) |
| | | { |
| | | auto AsyncRecvProc = [this](BHMsg &msg) { |
| | | auto Find = [&](RecvCB &cb) { |
| | | std::lock_guard<std::mutex> lock(mutex_); |
| | | const std::string &msgid = msg.msg_id(); |
| | | auto pos = async_cbs_.find(msgid); |
| | | if (pos != async_cbs_.end()) { |
| | | cb.swap(pos->second); |
| | | async_cbs_.erase(pos); |
| | | return true; |
| | | } else { |
| | | return false; |
| | | } |
| | | }; |
| | | |
| | | RecvCB cb; |
| | | if (Find(cb) && cb) { |
| | | cb(msg); |
| | | } |
| | | }; |
| | | |
| | | return Start(AsyncRecvProc, nworker); |
| | | } |
| | | |
| | | bool ShmSocket::Stop() |
| | | { |
| | | std::lock_guard<std::mutex> lock(mutex_); |
| | |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | bool ShmSocket::AsyncRequest(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb) |
| | | { |
| | | if (type_ != eSockRequest) { |
| | | return false; |
| | | } |
| | | assert(remote && pmsg && !mq_); |
| | | try { |
| | | const BHMsg &msg = *static_cast<const BHMsg *>(pmsg); |
| | | auto RegisterCB = [&]() { |
| | | std::lock_guard<std::mutex> lock(mutex_); |
| | | async_cbs_.emplace(msg.msg_id(), cb); |
| | | }; |
| | | |
| | | return mq_->Send(*static_cast<const MQId *>(remote), msg, timeout_ms, RegisterCB); |
| | | } catch (...) { |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | bool ShmSocket::SyncRequest(const void *remote, const void *msg, void *result, const int timeout_ms) |
| | | { |
| | | struct State { |
| | | std::mutex mutex; |
| | | std::condition_variable cv; |
| | | bool canceled = false; |
| | | }; |
| | | |
| | | try { |
| | | std::shared_ptr<State> st(new State); |
| | | auto OnRecv = [=](BHMsg &msg) { |
| | | std::unique_lock<std::mutex> lk(st->mutex); |
| | | if (!st->canceled) { |
| | | static_cast<BHMsg *>(result)->Swap(&msg); |
| | | st->cv.notify_one(); |
| | | } |
| | | }; |
| | | |
| | | std::unique_lock<std::mutex> lk(st->mutex); |
| | | auto end = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms); |
| | | if (AsyncRequest(remote, msg, timeout_ms, OnRecv) && st->cv.wait_until(lk, end) == std::cv_status::no_timeout) { |
| | | return true; |
| | | } else { |
| | | st->canceled = true; |
| | | return false; |
| | | } |
| | | } catch (...) { |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | bool ShmSocket::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms) |
| | | { |
| | | BHMsg result; |
| | | const BHMsg &msg = MakeQueryTopic(topic); |
| | | if (SyncRequest(&kBHTopicRPCId, &msg, &result, timeout_ms)) { |
| | | if (result.type() == kMsgTypeQueryTopicReply) { |
| | | DataQueryTopicReply reply; |
| | | if (reply.ParseFromString(result.body())) { |
| | | addr = reply.address(); |
| | | return !addr.mq_id().empty(); |
| | | } |
| | | } |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | bool ShmSocket::RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb) |
| | | { |
| | | auto Call = [&](const void *remote) { |
| | | const BHMsg &msg(MakeRequest(mq_->Id(), topic, data, size)); |
| | | auto onRecv = [cb](BHMsg &msg) { |
| | | if (msg.type() == kMsgTypeReply) { |
| | | DataReply reply; |
| | | if (reply.ParseFromString(msg.body())) { |
| | | cb(reply.data().data(), reply.data().size()); |
| | | } |
| | | } |
| | | }; |
| | | return AsyncRequest(remote, &msg, timeout_ms, onRecv); |
| | | }; |
| | | |
| | | try { |
| | | BHAddress addr; |
| | | if (QueryRPCTopic(topic, addr, timeout_ms)) { |
| | | return Call(addr.mq_id().data()); |
| | | } |
| | | } catch (...) { |
| | | return false; |
| | | } |
| | | } |
| | | |
| | | bool ShmSocket::RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out) |
| | | { |
| | | try { |
| | | BHAddress addr; |
| | | if (QueryRPCTopic(topic, addr, timeout_ms)) { |
| | | const BHMsg &msg(MakeRequest(mq_->Id(), topic, data, size)); |
| | | BHMsg reply; |
| | | if (SyncRequest(addr.mq_id().data(), &msg, &reply, timeout_ms) && reply.type() == kMsgTypeReply) { |
| | | DataReply dr; |
| | | if (dr.ParseFromString(msg.body())) { |
| | | dr.mutable_data()->swap(out); |
| | | return true; |
| | | } |
| | | } |
| | | } |
| | | } catch (...) { |
| | | return false; |
| | | } |
| | | } |
| | |
| | | #include <memory> |
| | | #include <mutex> |
| | | #include <thread> |
| | | #include <unordered_map> |
| | | #include <vector> |
| | | |
| | | class ShmSocket : private boost::noncopyable |
| | |
| | | }; |
| | | typedef std::function<void(bhome_msg::BHMsg &msg)> RecvCB; |
| | | typedef std::function<void(bhome_msg::MsgI &imsg)> RecvRawCB; |
| | | typedef std::function<void(const void *data, const size_t size)> RequestResultCB; |
| | | |
| | | ShmSocket(Type type, bhome_shm::SharedMemory &shm); |
| | | ShmSocket(Type type); |
| | | ~ShmSocket(); |
| | | // bool Request(const std::string &topic, const void *data, const size_t size, onReply); |
| | | bool RequestAndWait() { return false; } // call Request, and wait onReply notify cv |
| | | bool RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb); |
| | | bool RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out); |
| | | |
| | | // bool HandleRequest(onData); |
| | | bool ReadRequest(); // exclude with HandleRequest |
| | |
| | | // start recv. |
| | | bool Start(const RecvCB &onData, int nworker = 1); |
| | | bool StartRaw(const RecvRawCB &onData, int nworker = 1); |
| | | bool StartAsync(int nworker = 2); |
| | | bool Stop(); |
| | | size_t Pending() const { return mq_ ? mq_->Pending() : 0; } |
| | | |
| | | private: |
| | | bool AsyncRequest(const void *remote, const void *msg, const int timeout_ms, const RecvCB &cb); |
| | | bool SyncRequest(const void *remote, const void *msg, void *result, const int timeout_ms); |
| | | bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms); |
| | | bool StopNoLock(); |
| | | bhome_shm::SharedMemory &shm_; |
| | | const Type type_; |
| | |
| | | std::atomic<bool> run_; |
| | | |
| | | std::unique_ptr<Queue> mq_; |
| | | std::unordered_map<std::string, RecvCB> async_cbs_; |
| | | }; |
| | | |
| | | #endif // end of include guard: SOCKET_GWTJHBPO |
New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: topic_rpc.cpp |
| | | * |
| | | * Description: topic request/reply manager |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021年03月31日 16时29分31秒 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: YOUR NAME (), |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | #include "topic_rpc.h" |
| | | |
| | | |
| | | |
New file |
| | |
| | | /* |
| | | * ===================================================================================== |
| | | * |
| | | * Filename: topic_rpc.h |
| | | * |
| | | * Description: |
| | | * |
| | | * Version: 1.0 |
| | | * Created: 2021年03月31日 16时30分10秒 |
| | | * Revision: none |
| | | * Compiler: gcc |
| | | * |
| | | * Author: YOUR NAME (), |
| | | * Organization: |
| | | * |
| | | * ===================================================================================== |
| | | */ |
| | | #ifndef TOPIC_RPC_JU1AYN5L |
| | | #define TOPIC_RPC_JU1AYN5L |
| | | |
| | | #include "socket.h" |
| | | |
| | | // request/reply topic manager |
| | | class RPCManager |
| | | { |
| | | ShmSocket socket_; |
| | | |
| | | public: |
| | | }; |
| | | |
| | | #endif // end of include guard: TOPIC_RPC_JU1AYN5L |