/*
 * =====================================================================================
 *
 *       Filename:  topic_node.h
 *
 *    Description:  Declares TopicNode: node register/heartbeat, topic RPC
 *                  server and client calls, publish and subscribe.
 *
 *        Version:  1.0
 *        Created:  2021-04-07 09:05:26
 *       Revision:  none
 *       Compiler:  gcc
 *
 *         Author:  Li Chao (), lichao@aiotlink.com
 *   Organization:
 *
 * =====================================================================================
 */
|
#ifndef TOPIC_NODE_YVKWA6TF
|
#define TOPIC_NODE_YVKWA6TF
|
|
#include "msg.h"
#include "socket.h"
#include <functional>
#include <memory>
#include <string>
#include <unordered_map>
|
|
using namespace bhome_shm;
|
using namespace bhome_msg;
|
|
// a node is a client.
|
class TopicNode
|
{
|
SharedMemory &shm_;
|
MsgRegister info_;
|
|
SharedMemory &shm() { return shm_; }
|
|
public:
|
typedef std::function<void(std::string &proc_id, const void *data, const int len)> DataCB;
|
TopicNode(SharedMemory &shm);
|
~TopicNode();
|
|
// topic node
|
bool Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
|
bool Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
|
bool Heartbeat(const int timeout_ms);
|
|
// topic rpc server
|
typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerCB;
|
bool ServerStart(ServerCB const &cb, const int nworker = 2);
|
bool ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms);
|
bool ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms);
|
bool ServerSendReply(void *src_info, const MsgRequestTopicReply &reply, const int timeout_ms);
|
|
// topic client
|
typedef std::function<void(const std::string &proc_id, const MsgRequestTopicReply &reply)> RequestResultCB;
|
bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2);
|
bool ClientAsyncRequest(const MsgRequestTopic &request, const int timeout_ms, const RequestResultCB &rrcb = RequestResultCB());
|
bool ClientSyncRequest(const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms);
|
|
// publish
|
bool Publish(const MsgPublish &pub, const int timeout_ms);
|
|
// subscribe
|
typedef std::function<void(const std::string &proc_id, const MsgPublish &data)> SubDataCB;
|
bool SubscribeStartWorker(const SubDataCB &tdcb, int nworker = 2);
|
bool Subscribe(MsgTopicList &topics, const int timeout_ms);
|
bool RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms);
|
|
void Start(ServerCB const &server_cb, SubDataCB const &sub_cb);
|
void Stop();
|
|
private:
|
bool ClientQueryRPCTopic(const Topic &topic, bhome::msg::BHAddress &addr, const int timeout_ms);
|
const std::string &proc_id() { return info_.proc().proc_id(); }
|
|
typedef bhome_msg::BHAddress Address;
|
class TopicQueryCache
|
{
|
class Impl
|
{
|
typedef std::unordered_map<Topic, Address> Store;
|
Store store_;
|
|
public:
|
bool Find(const Topic &topic, Address &addr)
|
{
|
auto pos = store_.find(topic);
|
if (pos != store_.end()) {
|
addr = pos->second;
|
return true;
|
} else {
|
return false;
|
}
|
}
|
bool Update(const Topic &topic, const Address &addr)
|
{
|
store_[topic] = addr;
|
return true;
|
}
|
};
|
Synced<Impl> impl_;
|
// Impl &impl()
|
// {
|
// thread_local Impl impl;
|
// return impl;
|
// }
|
|
public:
|
bool Find(const Topic &topic, Address &addr) { return impl_->Find(topic, addr); }
|
bool Update(const Topic &topic, const Address &addr) { return impl_->Update(topic, addr); }
|
};
|
|
// some sockets may be the same one, using functions make it easy to change.
|
|
auto &SockNode() { return sock_node_; }
|
auto &SockPub() { return SockNode(); }
|
auto &SockSub() { return sock_sub_; }
|
auto &SockRequest() { return sock_request_; }
|
auto &SockClient() { return SockRequest(); }
|
auto &SockReply() { return sock_reply_; }
|
auto &SockServer() { return SockReply(); }
|
|
ShmSocket sock_node_;
|
ShmSocket sock_request_;
|
ShmSocket sock_reply_;
|
ShmSocket sock_sub_;
|
|
TopicQueryCache topic_query_cache_;
|
};
|
|
#endif // end of include guard: TOPIC_NODE_YVKWA6TF
|