/*
 * =====================================================================================
 *
 *       Filename:  socket.h
 *
 *    Description:  Declaration of ShmSocket, a socket-style endpoint built on
 *                  a shared-memory message queue.
 *
 *        Version:  1.0
 *        Created:  2021-03-30 15:49:19
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (),
 *   Organization:
 *
 * =====================================================================================
 */
|
|
#ifndef SOCKET_GWTJHBPO
|
#define SOCKET_GWTJHBPO
|
|
#include "shm_queue.h"
#include <atomic>
#include <boost/noncopyable.hpp>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
|
|
class ShmSocket : private boost::noncopyable
|
{
|
typedef bhome_shm::ShmMsgQueue Queue;
|
|
public:
|
enum Type {
|
eSockRequest,
|
eSockReply,
|
eSockSubscribe,
|
eSockPublish,
|
eSockBus,
|
};
|
typedef std::function<void(bhome_msg::BHMsg &msg)> RecvCB;
|
typedef std::function<void(bhome_msg::MsgI &imsg)> RecvRawCB;
|
typedef std::function<void(const void *data, const size_t size)> RequestResultCB;
|
|
ShmSocket(Type type, bhome_shm::SharedMemory &shm);
|
ShmSocket(Type type);
|
~ShmSocket();
|
bool RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, const RequestResultCB &rrcb);
|
bool RequestRPC(const std::string &topic, const void *data, const size_t size, const int timeout_ms, std::string &out);
|
|
// bool HandleRequest(onData);
|
bool ReadRequest(); // exclude with HandleRequest
|
bool SendReply(); // exclude with HandleRequest
|
|
bool Publish(const std::string &topic, const void *data, const size_t size, const int timeout_ms);
|
bool Subscribe(const std::vector<std::string> &topics, const int timeout_ms);
|
bool RecvSub(std::string &topic, std::string &data, const int timeout_ms);
|
|
// start recv.
|
bool Start(const RecvCB &onData, int nworker = 1);
|
bool StartRaw(const RecvRawCB &onData, int nworker = 1);
|
bool StartAsync(int nworker = 2);
|
bool Stop();
|
size_t Pending() const { return mq_ ? mq_->Pending() : 0; }
|
|
private:
|
bool AsyncRequest(const void *remote, const void *msg, const int timeout_ms, const RecvCB &cb);
|
bool SyncRequest(const void *remote, const void *msg, void *result, const int timeout_ms);
|
bool QueryRPCTopic(const std::string &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
|
bool StopNoLock();
|
bhome_shm::SharedMemory &shm_;
|
const Type type_;
|
std::vector<std::thread> workers_;
|
std::mutex mutex_;
|
std::atomic<bool> run_;
|
|
std::unique_ptr<Queue> mq_;
|
std::unordered_map<std::string, RecvCB> async_cbs_;
|
};
|
|
#endif // end of include guard: SOCKET_GWTJHBPO
|