/*
|
* =====================================================================================
|
*
|
* Filename: topic_node.h
|
*
|
* Description:
|
*
|
* Version: 1.0
|
* Created: 2021年04月07日 09时05分26秒
|
* Revision: none
|
* Compiler: gcc
|
*
|
* Author: Li Chao (), lichao@aiotlink.com
|
* Organization:
|
*
|
* =====================================================================================
|
*/
|
#ifndef TOPIC_NODE_YVKWA6TF
|
#define TOPIC_NODE_YVKWA6TF
|
|
#include "msg.h"
|
#include "shm_socket.h"
|
#include <atomic>
|
#include <memory>
|
#include <mutex>
|
#include <vector>
|
|
using namespace bhome_shm;
|
using namespace bhome_msg;
|
|
// a node is a client.
|
class TopicNode
|
{
|
SharedMemory &shm_;
|
ProcInfo info_;
|
|
SharedMemory &shm() const { return shm_; }
|
const MQInfo &CenterAddr() const { return BHTopicCenterAddress(shm()); }
|
const MQInfo &BusAddr() const { return BHTopicBusAddress(shm()); }
|
|
public:
|
TopicNode(SharedMemory &shm, MQId ssn_id = 0);
|
~TopicNode();
|
|
// topic node
|
bool Register(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms) { return DoRegister(false, proc, reply_body, timeout_ms); }
|
bool DoRegister(const bool internal, ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
|
bool Unregister(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
|
bool Heartbeat(ProcInfo &proc, MsgCommonReply &reply_body, const int timeout_ms);
|
bool Heartbeat(const int timeout_ms);
|
bool QueryTopicAddress(BHAddress &dest, MsgQueryTopic &query, MsgQueryTopicReply &reply_body, const int timeout_ms);
|
bool QueryProcs(BHAddress &dest, MsgQueryProc &query, MsgQueryProcReply &reply_body, const int timeout_ms);
|
|
// topic rpc server
|
typedef std::function<bool(const std::string &client_proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply)> ServerSyncCB;
|
typedef std::function<void(void *src_info, std::string &client_proc_id, MsgRequestTopic &request)> ServerAsyncCB;
|
bool ServerStart(ServerSyncCB const &cb, const int nworker = 2);
|
bool ServerStart(ServerAsyncCB const &cb, const int nworker = 2);
|
bool ServerRegisterRPC(MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms) { return DoServerRegisterRPC(false, topics, reply, timeout_ms); }
|
bool DoServerRegisterRPC(const bool internal, MsgTopicList &topics, MsgCommonReply &reply, const int timeout_ms);
|
bool ServerRecvRequest(void *&src_info, std::string &proc_id, MsgRequestTopic &request, const int timeout_ms);
|
bool ServerSendReply(void *src_info, const MsgRequestTopicReply &reply);
|
|
// topic client
|
typedef std::function<void(const BHMsgHead &head, const MsgRequestTopicReply &reply)> RequestResultCB;
|
bool ClientStartWorker(RequestResultCB const &cb, const int nworker = 2);
|
bool ClientAsyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &request, std::string &msg_id, const RequestResultCB &rrcb = RequestResultCB());
|
bool ClientSyncRequest(const BHAddress &remote_addr, const MsgRequestTopic &request, std::string &proc_id, MsgRequestTopicReply &reply, const int timeout_ms);
|
|
// publish
|
bool Publish(const MsgPublish &pub, const int timeout_ms);
|
|
// subscribe
|
typedef std::function<void(const std::string &proc_id, const MsgPublish &data)> SubDataCB;
|
bool SubscribeStartWorker(const SubDataCB &tdcb, int nworker = 2);
|
bool Subscribe(MsgTopicList &topics, MsgCommonReply &reply_body, const int timeout_ms);
|
bool RecvSub(std::string &proc_id, MsgPublish &pub, const int timeout_ms);
|
|
void Start(ServerAsyncCB const &server_cb, SubDataCB const &sub_cb, RequestResultCB &client_cb, int nworker = 2);
|
void Stop();
|
|
private:
|
MQId ssn() { return SockNode().id(); }
|
bool ClientQueryRPCTopic(const Topic &topic, BHAddress &addr, const int timeout_ms);
|
typedef MsgQueryTopicReply::BHNodeAddress NodeAddress;
|
int QueryTopicServers(const Topic &topic, std::vector<NodeAddress> &addr, const int timeout_ms);
|
const std::string &proc_id() { return info_.proc_id(); }
|
|
typedef BHAddress Address;
|
class TopicQueryCache
|
{
|
class Impl
|
{
|
struct TimedRec {
|
Address addr_;
|
int64_t timestamp_;
|
};
|
typedef std::unordered_map<Topic, TimedRec> Records;
|
Records records_;
|
|
public:
|
bool Find(const Topic &topic, Address &addr)
|
{
|
auto pos = records_.find(topic);
|
if (pos != records_.end()) {
|
if (NowSec() - pos->second.timestamp_ < NodeTimeoutSec() / 2) {
|
addr = pos->second.addr_;
|
return true;
|
} else {
|
LOG_TRACE() << "topic dest cache timeout.";
|
}
|
}
|
return false;
|
}
|
bool Store(const Topic &topic, const Address &addr)
|
{
|
records_[topic] = {addr, NowSec()};
|
return true;
|
}
|
};
|
Synced<Impl> impl_;
|
|
public:
|
bool Find(const Topic &topic, Address &addr) { return impl_->Find(topic, addr); }
|
bool Store(const Topic &topic, const Address &addr) { return impl_->Store(topic, addr); }
|
};
|
|
// some sockets may be the same one, using functions make it easy to change.
|
enum { eSockStart,
|
eSockNode = eSockStart,
|
eSockPub = eSockNode,
|
eSockServer,
|
eSockClient,
|
eSockSub,
|
eSockEnd,
|
};
|
std::vector<std::shared_ptr<ShmSocket>> sockets_;
|
|
ShmSocket &SockNode() { return *sockets_[eSockNode]; }
|
ShmSocket &SockPub() { return *sockets_[eSockPub]; }
|
ShmSocket &SockSub() { return *sockets_[eSockSub]; }
|
ShmSocket &SockClient() { return *sockets_[eSockClient]; }
|
ShmSocket &SockServer() { return *sockets_[eSockServer]; }
|
|
void SetProcIndex(int index)
|
{
|
proc_index_ = index;
|
for (int i = eSockStart; i < eSockEnd; ++i) {
|
sockets_[i]->SetNodeProc(index, i);
|
}
|
}
|
|
enum State {
|
eStateUninited,
|
eStateUnregistered,
|
eStateOnline,
|
eStateOffline // heartbeat fail.
|
};
|
void state(const State st) { state_.store(st); }
|
void state_cas(State expected, const State val) { state_.compare_exchange_strong(expected, val); }
|
State state() const { return state_.load(); }
|
bool IsOnline() { return state() == eStateOnline; }
|
bool Init();
|
bool Valid() const { return state() != eStateUninited; }
|
std::mutex mutex_;
|
MQId ssn_id_ = 0;
|
std::atomic<State> state_;
|
int proc_index_ = -1;
|
|
TopicQueryCache topic_query_cache_;
|
|
class RecvQ
|
{
|
public:
|
void Write(BHMsgHead &&head, std::string &&body) { q_.push_back({std::move(head), std::move(body)}); }
|
bool Read(BHMsgHead &head, std::string &body)
|
{
|
if (q_.empty()) {
|
return false;
|
} else {
|
head = std::move(q_.front().head);
|
body = std::move(q_.front().body);
|
q_.pop_front();
|
return true;
|
}
|
}
|
|
private:
|
struct MsgData {
|
BHMsgHead head;
|
std::string body;
|
};
|
std::deque<MsgData> q_;
|
};
|
Synced<RecvQ> server_buffer_;
|
Synced<RecvQ> sub_buffer_;
|
};
|
|
#endif // end of include guard: TOPIC_NODE_YVKWA6TF
|