/*
|
* =====================================================================================
|
*
|
* Filename: reqrep.h
|
*
|
* Description: topic request/reply sockets
|
*
|
* Version: 1.0
|
* Created: 2021年04月01日 09时36分06秒
|
* Revision: none
|
* Compiler: gcc
|
*
|
* Author: Li Chao (),
|
* Organization:
|
*
|
* =====================================================================================
|
*/
|
#ifndef REQREP_ACEH09NK
|
#define REQREP_ACEH09NK
|
|
#include "bh_util.h"
|
#include "defs.h"
|
#include "msg.h"
|
#include "socket.h"
|
#include <functional>
|
#include <unordered_map>
|
|
using bhome::msg::ProcInfo;
|
|
class SocketRequest : private ShmSocket
|
{
|
typedef ShmSocket Socket;
|
|
public:
|
SocketRequest(Socket::Shm &shm) :
|
Socket(shm, 64) { StartWorker(); }
|
SocketRequest() :
|
SocketRequest(BHomeShm()) {}
|
~SocketRequest() { Stop(); }
|
|
typedef std::function<void(const std::string &data)> RequestResultCB;
|
bool StartWorker(const RequestResultCB &rrcb, int nworker = 2);
|
bool StartWorker(int nworker = 2) { return StartWorker(RequestResultCB(), nworker); }
|
bool Stop() { return Socket::Stop(); }
|
bool AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb);
|
bool AsyncRequest(const std::string &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb)
|
{
|
return AsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb);
|
}
|
bool SyncRequest(const std::string &topic, const void *data, const size_t size, std::string &out, const int timeout_ms);
|
bool SyncRequest(const std::string &topic, const std::string &data, std::string &out, const int timeout_ms)
|
{
|
return SyncRequest(topic, data.data(), data.size(), out, timeout_ms);
|
}
|
|
private:
|
bool AsyncSend(const void *remote, const void *msg, const int timeout_ms, const RecvCB &cb);
|
bool SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms);
|
bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
|
std::unordered_map<std::string, RecvCB> async_cbs_;
|
|
typedef bhome_msg::BHAddress Address;
|
class TopicCache
|
{
|
class Impl
|
{
|
typedef std::unordered_map<std::string, Address> Store;
|
Store store_;
|
|
public:
|
bool Find(const std::string &topic, Address &addr)
|
{
|
auto pos = store_.find(topic);
|
if (pos != store_.end()) {
|
addr = pos->second;
|
return true;
|
} else {
|
return false;
|
}
|
}
|
bool Update(const std::string &topic, const Address &addr)
|
{
|
store_[topic] = addr;
|
return true;
|
}
|
};
|
Synced<Impl> impl_;
|
// Impl &impl()
|
// {
|
// thread_local Impl impl;
|
// return impl;
|
// }
|
|
public:
|
bool Find(const std::string &topic, Address &addr) { return impl_->Find(topic, addr); }
|
bool Update(const std::string &topic, const Address &addr) { return impl_->Update(topic, addr); }
|
};
|
TopicCache topic_cache_;
|
};
|
|
class SocketReply : private ShmSocket
|
{
|
typedef ShmSocket Socket;
|
|
public:
|
SocketReply(Socket::Shm &shm) :
|
Socket(shm, 64) {}
|
SocketReply() :
|
SocketReply(BHomeShm()) {}
|
~SocketReply() { Stop(); }
|
|
typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest;
|
bool StartWorker(const OnRequest &rcb, int nworker = 2);
|
bool Stop() { return Socket::Stop(); }
|
bool RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms);
|
bool SendReply(void *src_info, const std::string &data, const int timeout_ms);
|
bool Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms);
|
bool Heartbeat(const ProcInfo &proc_info, const int timeout_ms);
|
|
private:
|
};
|
|
#endif // end of include guard: REQREP_ACEH09NK
|