lichao
2021-04-01 d26327b3cde043a9470dcd7fea8e704ea517fdae
src/reqrep.h
@@ -19,9 +19,12 @@
#define REQREP_ACEH09NK
#include "defs.h"
#include "msg.h"
#include "socket.h"
#include <functional>
#include <unordered_map>
using bhome::msg::ProcInfo;
class SocketRequest : private ShmSocket
{
@@ -32,19 +35,21 @@
       Socket(shm, 64) { StartWorker(); }
   SocketRequest() :
       SocketRequest(BHomeShm()) {}
   ~SocketRequest() { Stop(); }
   typedef std::function<void(const std::string &data)> RequestResultCB;
   bool StartWorker(const RequestResultCB &rrcb, int nworker = 2);
   bool StartWorker(int nworker = 2) { return StartWorker(RequestResultCB(), nworker); }
   bool Stop() { return Socket::Stop(); }
   bool AsyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb);
   bool AsyncRequest(const std::string &topic, const std::string &data, const int timeout_ms, const RequestResultCB &rrcb)
   {
      return AsyncRequest(topic, data.data(), data.size(), timeout_ms, rrcb);
   }
   bool SyncRequest(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out);
   bool SyncRequest(const std::string &topic, const std::string &data, const int timeout_ms, std::string &out)
   bool SyncRequest(const std::string &topic, const void *data, const size_t size, std::string &out, const int timeout_ms);
   bool SyncRequest(const std::string &topic, const std::string &data, std::string &out, const int timeout_ms)
   {
      return SyncRequest(topic, data.data(), data.size(), timeout_ms, out);
      return SyncRequest(topic, data.data(), data.size(), out, timeout_ms);
   }
private:
@@ -52,6 +57,30 @@
   bool SyncSendAndRecv(const void *remote, const void *msg, void *result, const int timeout_ms);
   bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
   std::unordered_map<std::string, RecvCB> async_cbs_;
   std::pair<std::string, bhome::msg::BHAddress> tmp_cache_;
};
class SocketReply : private ShmSocket
{
   typedef ShmSocket Socket;
public:
   SocketReply(Socket::Shm &shm) :
       Socket(shm, 64) {}
   SocketReply() :
       SocketReply(BHomeShm()) {}
   ~SocketReply() { Stop(); }
   typedef std::function<bool(const std::string &topic, const std::string &data, std::string &reply)> OnRequest;
   bool StartWorker(const OnRequest &rcb, int nworker = 2);
   bool Stop() { return Socket::Stop(); }
   bool RecvRequest(void *&src_info, std::string &topic, std::string &data, const int timeout_ms);
   bool SendReply(void *src_info, const std::string &data, const int timeout_ms);
   bool Register(const ProcInfo &proc_info, const std::vector<std::string> &topics, const int timeout_ms);
   bool Heartbeat(const ProcInfo &proc_info, const int timeout_ms);
private:
};
#endif // end of include guard: REQREP_ACEH09NK