/* * ===================================================================================== * * Filename: socket.cpp * * Description: * * Version: 1.0 * Created: 2021年03月30日 15时48分58秒 * Revision: none * Compiler: gcc * * Author: Li Chao (), * Organization: * * ===================================================================================== */ #include "socket.h" #include "bh_util.h" #include "defs.h" #include "msg.h" #include #include using namespace bhome_msg; using namespace bhome_shm; namespace { } // namespace //TODO maybe change to base class, each type is a sub class. ShmSocket::ShmSocket(Type type, bhome_shm::SharedMemory &shm) : shm_(shm), type_(type), run_(false) { switch (type) { case eSockBus: mq_.reset(new Queue(kBHBusQueueId, shm_, 1000)); break; case eSockRequest: mq_.reset(new Queue(shm_, 12)); break; case eSockReply: mq_.reset(new Queue(shm_, 64)); break; case eSockSubscribe: mq_.reset(new Queue(shm_, 64)); break; case eSockPublish: break; // no recv mq needed default: break; } } ShmSocket::ShmSocket(Type type) : ShmSocket(type, BHomeShm()) {} ShmSocket::~ShmSocket() { Stop(); } bool ShmSocket::Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms) { if (type_ != eSockPublish) { return false; } assert(!mq_); try { MsgI imsg; if (!imsg.MakeRC(shm_, MakePub(topic, data, size))) { return false; } DEFER1(imsg.Release(shm_)); return Queue::Send(shm_, kBHBusQueueId, imsg, timeout_ms); } catch (...) { return false; } } bool ShmSocket::Subscribe(const std::vector &topics, const int timeout_ms) { if (type_ != eSockSubscribe) { return false; } assert(mq_); try { return mq_->Send(kBHBusQueueId, MakeSub(mq_->Id(), topics), timeout_ms); } catch (...) { return false; } } bool ShmSocket::StartRaw(const RecvRawCB &onData, int nworker) { auto CanRecv = [this]() { switch (type_) { case eSockRequest: case eSockReply: case eSockBus: case eSockSubscribe: return true; default: return false; } }; if (!CanRecv()) { return false; } std::lock_guard lock(mutex_); StopNoLock(); auto RecvProc = [this, onData]() { while (run_) { try { MsgI imsg; DEFER1(imsg.Release(shm_)); if (mq_->Recv(imsg, 100)) { onData(imsg); } } catch (...) { } } }; run_.store(true); for (int i = 0; i < nworker; ++i) { workers_.emplace_back(RecvProc); } return true; } bool ShmSocket::Start(const RecvCB &onData, int nworker) { return StartRaw([this, onData](MsgI &imsg) { BHMsg m; if (imsg.Unpack(m)) { onData(m); } }, nworker); } bool ShmSocket::StartAsync(int nworker) { auto AsyncRecvProc = [this](BHMsg &msg) { auto Find = [&](RecvCB &cb) { std::lock_guard lock(mutex_); const std::string &msgid = msg.msg_id(); auto pos = async_cbs_.find(msgid); if (pos != async_cbs_.end()) { cb.swap(pos->second); async_cbs_.erase(pos); return true; } else { return false; } }; RecvCB cb; if (Find(cb) && cb) { cb(msg); } }; return Start(AsyncRecvProc, nworker); } bool ShmSocket::Stop() { std::lock_guard lock(mutex_); return StopNoLock(); } bool ShmSocket::StopNoLock() { if (run_.exchange(false)) { for (auto &w : workers_) { if (w.joinable()) { w.join(); } } return true; } return false; } bool ShmSocket::AsyncRequest(const void *remote, const void *pmsg, const int timeout_ms, const RecvCB &cb) { if (type_ != eSockRequest) { return false; } assert(remote && pmsg && !mq_); try { const BHMsg &msg = *static_cast(pmsg); auto RegisterCB = [&]() { std::lock_guard lock(mutex_); async_cbs_.emplace(msg.msg_id(), cb); }; return mq_->Send(*static_cast(remote), msg, timeout_ms, RegisterCB); } catch (...) { return false; } } bool ShmSocket::SyncRequest(const void *remote, const void *msg, void *result, const int timeout_ms) { struct State { std::mutex mutex; std::condition_variable cv; bool canceled = false; }; try { std::shared_ptr st(new State); auto OnRecv = [=](BHMsg &msg) { std::unique_lock lk(st->mutex); if (!st->canceled) { static_cast(result)->Swap(&msg); st->cv.notify_one(); } }; std::unique_lock lk(st->mutex); auto end = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms); if (AsyncRequest(remote, msg, timeout_ms, OnRecv) && st->cv.wait_until(lk, end) == std::cv_status::no_timeout) { return true; } else { st->canceled = true; return false; } } catch (...) { return false; } } bool ShmSocket::QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms) { BHMsg result; const BHMsg &msg = MakeQueryTopic(topic); if (SyncRequest(&kBHTopicRPCId, &msg, &result, timeout_ms)) { if (result.type() == kMsgTypeQueryTopicReply) { DataQueryTopicReply reply; if (reply.ParseFromString(result.body())) { addr = reply.address(); return !addr.mq_id().empty(); } } } return false; } bool ShmSocket::RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &cb) { auto Call = [&](const void *remote) { const BHMsg &msg(MakeRequest(mq_->Id(), topic, data, size)); auto onRecv = [cb](BHMsg &msg) { if (msg.type() == kMsgTypeReply) { DataReply reply; if (reply.ParseFromString(msg.body())) { cb(reply.data().data(), reply.data().size()); } } }; return AsyncRequest(remote, &msg, timeout_ms, onRecv); }; try { BHAddress addr; if (QueryRPCTopic(topic, addr, timeout_ms)) { return Call(addr.mq_id().data()); } } catch (...) { return false; } } bool ShmSocket::RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out) { try { BHAddress addr; if (QueryRPCTopic(topic, addr, timeout_ms)) { const BHMsg &msg(MakeRequest(mq_->Id(), topic, data, size)); BHMsg reply; if (SyncRequest(addr.mq_id().data(), &msg, &reply, timeout_ms) && reply.type() == kMsgTypeReply) { DataReply dr; if (dr.ParseFromString(msg.body())) { dr.mutable_data()->swap(out); return true; } } } } catch (...) { return false; } }