#include "bh_api.h"
|
#include "defs.h"
|
#include "topic_node.h"
|
#include <chrono>
|
#include <cstdio>
|
#include <memory>
|
|
using namespace std::chrono_literals;
|
using namespace bhome_shm;
|
using namespace bhome_msg;
|
|
namespace
|
{
|
std::string GetProcExe()
|
{
|
auto f = fopen("/proc/self/stat", "rb");
|
if (f) {
|
DEFER1(fclose(f));
|
char buf[100] = {0};
|
int n = fread(buf, 1, sizeof(buf), f);
|
if (n > 0) {
|
std::string s(buf, n);
|
auto start = s.find('(');
|
if (start != std::string::npos) {
|
++start;
|
auto end = s.find(')', start);
|
return s.substr(start, end - start);
|
}
|
}
|
}
|
return std::to_string(getpid());
|
}
|
std::unique_ptr<TopicNode> &ProcNodePtr()
|
{
|
// client side init here.
|
static std::mutex mtx;
|
auto InitLog = []() {
|
ns_log::AddLog(BHLogDir() + "bhshmq_node_" + GetProcExe() + ".log", true);
|
return true;
|
};
|
static bool init_log = InitLog();
|
|
static std::string shm_name;
|
static std::unique_ptr<TopicNode> ptr;
|
|
std::lock_guard<std::mutex> lk(mtx);
|
if (shm_name != BHomeShmName()) {
|
shm_name = BHomeShmName();
|
LOG_INFO() << "using shm " << shm_name;
|
|
ptr.reset();
|
// must reset/stop node before call BHomeShm() which resets shm.
|
|
auto &shm = BHomeShm();
|
for (int i = 0; !ptr && i < 3; ++i) {
|
if (GlobalInit(shm)) {
|
ptr.reset(new TopicNode(shm));
|
} else {
|
std::this_thread::sleep_for(1s); // make sure shm init done.
|
}
|
}
|
}
|
return ptr;
|
}
|
TopicNode &ProcNode()
|
{
|
return *ProcNodePtr();
|
}
|
|
class TmpPtr : private boost::noncopyable
|
{
|
void *ptr_ = 0;
|
size_t size_ = 0;
|
|
public:
|
explicit TmpPtr(const size_t size) :
|
ptr_(malloc(size)), size_(size) {}
|
explicit TmpPtr(const std::string &str) :
|
TmpPtr(str.size())
|
{
|
if (ptr_) {
|
memcpy(ptr_, str.data(), str.size());
|
}
|
}
|
~TmpPtr() { free(ptr_); }
|
void *get() const { return ptr_; }
|
void *release()
|
{
|
void *tmp = ptr_;
|
ptr_ = 0;
|
return tmp;
|
}
|
size_t size() const { return size_; }
|
operator bool() const { return ptr_; }
|
bool ReleaseTo(void **pdata, int *psize)
|
{
|
if (!ptr_) {
|
return false;
|
}
|
if (pdata && psize) {
|
*psize = size();
|
*pdata = release();
|
}
|
return true;
|
}
|
};
|
|
template <class Msg>
|
bool PackOutput(const Msg &msg, void **out, int *out_len)
|
{
|
if (!out || !out_len) {
|
return true; // not wanted.
|
}
|
auto size = msg.ByteSizeLong();
|
TmpPtr p(size);
|
if (!p) {
|
SetLastError(ENOMEM, "not enough memory.");
|
return false;
|
}
|
msg.SerializePartialToArray(p.get(), size);
|
p.ReleaseTo(out, out_len);
|
return true;
|
}
|
|
template <class MsgIn, class MsgOut = MsgCommonReply>
|
bool BHApi_In1_Out1(bool (TopicNode::*mfunc)(MsgIn &, MsgOut &, const int),
|
const void *request, const int request_len,
|
void **reply, int *reply_len,
|
const int timeout_ms)
|
{
|
MsgIn input;
|
if (!input.ParseFromArray(request, request_len)) {
|
SetLastError(eInvalidInput, "invalid input.");
|
return false;
|
}
|
MsgOut msg_reply;
|
auto &ptr = ProcNodePtr();
|
if (!ptr) {
|
SetLastError(eNotFound, "center not started.");
|
return 0;
|
}
|
|
return (ProcNode().*mfunc)(input, msg_reply, timeout_ms) &&
|
PackOutput(msg_reply, reply, reply_len);
|
}
|
|
template <class MsgIn0, class MsgIn1, class MsgOut = MsgCommonReply>
|
bool BHApi_In2_Out1(bool (TopicNode::*mfunc)(MsgIn0 &, MsgIn1 &, MsgOut &, const int),
|
const void *in0, const int in0_len,
|
const void *in1, const int in1_len,
|
void **reply, int *reply_len,
|
const int timeout_ms)
|
{
|
MsgIn0 input0;
|
MsgIn1 input1;
|
if (!input0.ParseFromArray(in0, in0_len) ||
|
!input1.ParseFromArray(in1, in1_len)) {
|
SetLastError(eInvalidInput, "invalid input.");
|
return false;
|
}
|
MsgOut msg_reply;
|
return (ProcNode().*mfunc)(input0, input1, msg_reply, timeout_ms) &&
|
PackOutput(msg_reply, reply, reply_len);
|
}
|
|
} // namespace
|
|
int BHApiIn1Out1Proxy(FBHApiIn1Out1 func,
|
const void *request,
|
const int request_len,
|
void **reply,
|
int *reply_len,
|
const int timeout_ms)
|
{
|
return (*func)(request, request_len, reply, reply_len, timeout_ms);
|
}
|
|
int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
|
{
|
return BHApi_In1_Out1<ProcInfo>(&TopicNode::Register, proc_info, proc_info_len, reply, reply_len, timeout_ms);
|
}
|
int BHUnregister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
|
{
|
return BHApi_In1_Out1<ProcInfo>(&TopicNode::Unregister, proc_info, proc_info_len, reply, reply_len, timeout_ms);
|
}
|
|
int BHHeartbeatEasy(const int timeout_ms)
|
{
|
return ProcNode().Heartbeat(timeout_ms);
|
}
|
|
int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
|
{
|
return BHApi_In1_Out1<ProcInfo>(&TopicNode::Heartbeat, proc_info, proc_info_len, reply, reply_len, timeout_ms);
|
}
|
|
int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
|
{
|
return BHApi_In1_Out1<MsgTopicList>(&TopicNode::ServerRegisterRPC, topics, topics_len, reply, reply_len, timeout_ms);
|
}
|
|
int BHQueryTopicAddress(const void *remote, const int remote_len,
|
const void *topic, const int topic_len,
|
void **reply, int *reply_len,
|
const int timeout_ms)
|
{
|
return BHApi_In2_Out1<BHAddress, MsgQueryTopic, MsgQueryTopicReply>(
|
&TopicNode::QueryTopicAddress,
|
remote, remote_len, topic, topic_len, reply, reply_len, timeout_ms);
|
}
|
|
int BHQueryProcs(const void *remote,
|
const int remote_len,
|
const void *query,
|
const int query_len,
|
void **reply,
|
int *reply_len,
|
const int timeout_ms)
|
{
|
return BHApi_In2_Out1<BHAddress, MsgQueryProc, MsgQueryProcReply>(
|
&TopicNode::QueryProcs,
|
remote, remote_len, query, query_len, reply, reply_len, timeout_ms);
|
}
|
|
int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
|
{
|
return BHApi_In1_Out1<MsgTopicList>(&TopicNode::Subscribe, topics, topics_len, reply, reply_len, timeout_ms);
|
}
|
|
int BHPublish(const void *msgpub,
|
const int msgpub_len,
|
const int timeout_ms)
|
{
|
MsgPublish pub;
|
if (!pub.ParseFromArray(msgpub, msgpub_len)) {
|
SetLastError(eInvalidInput, "invalid input.");
|
return false;
|
}
|
return ProcNode().Publish(pub, timeout_ms);
|
}
|
|
int BHReadSub(void **proc_id,
|
int *proc_id_len,
|
void **msgpub,
|
int *msgpub_len,
|
const int timeout_ms)
|
{
|
std::string proc;
|
MsgPublish pub;
|
|
if (ProcNode().RecvSub(proc, pub, timeout_ms)) {
|
TmpPtr pproc(proc);
|
if (pproc && PackOutput(pub, msgpub, msgpub_len)) {
|
pproc.ReleaseTo(proc_id, proc_id_len);
|
return true;
|
} else {
|
SetLastError(ENOMEM, "out of mem");
|
}
|
}
|
return false;
|
}
|
|
int BHAsyncRequest(const void *remote,
|
const int remote_len,
|
const void *request,
|
const int request_len,
|
void **msg_id,
|
int *msg_id_len)
|
{
|
BHAddress dest;
|
MsgRequestTopic req;
|
if (!dest.ParseFromArray(remote, remote_len) ||
|
!req.ParseFromArray(request, request_len)) {
|
SetLastError(eInvalidInput, "invalid input.");
|
return false;
|
}
|
std::string str_msg_id;
|
MsgRequestTopicReply out_msg;
|
if (ProcNode().ClientAsyncRequest(dest, req, str_msg_id)) {
|
if (!msg_id || !msg_id_len) {
|
return true;
|
}
|
TmpPtr ptr(str_msg_id);
|
if (ptr) {
|
ptr.ReleaseTo(msg_id, msg_id_len);
|
return true;
|
} else {
|
SetLastError(ENOMEM, "out of mem");
|
}
|
}
|
return false;
|
}
|
|
int BHRequest(const void *remote,
|
const int remote_len,
|
const void *request,
|
const int request_len,
|
void **proc_id,
|
int *proc_id_len,
|
void **reply,
|
int *reply_len,
|
const int timeout_ms)
|
{
|
BHAddress dest;
|
MsgRequestTopic req;
|
if (!dest.ParseFromArray(remote, remote_len) ||
|
!req.ParseFromArray(request, request_len)) {
|
SetLastError(eInvalidInput, "invalid input.");
|
return false;
|
}
|
std::string proc;
|
MsgRequestTopicReply out_msg;
|
if (ProcNode().ClientSyncRequest(dest, req, proc, out_msg, timeout_ms)) {
|
TmpPtr pproc(proc);
|
if (pproc && PackOutput(out_msg, reply, reply_len)) {
|
pproc.ReleaseTo(proc_id, proc_id_len);
|
return true;
|
} else {
|
SetLastError(ENOMEM, "out of mem");
|
}
|
}
|
return false;
|
}
|
|
int BHReadRequest(void **proc_id,
|
int *proc_id_len,
|
void **request,
|
int *request_len,
|
void **src,
|
const int timeout_ms)
|
{
|
void *src_info = 0;
|
std::string proc;
|
MsgRequestTopic out_msg;
|
if (ProcNode().ServerRecvRequest(src_info, proc, out_msg, timeout_ms)) {
|
TmpPtr pproc(proc);
|
if (pproc && PackOutput(out_msg, request, request_len)) {
|
pproc.ReleaseTo(proc_id, proc_id_len);
|
*src = src_info;
|
return true;
|
} else {
|
SetLastError(ENOMEM, "out of mem");
|
}
|
}
|
return false;
|
}
|
|
int BHSendReply(void *src,
|
const void *reply,
|
const int reply_len)
|
{
|
MsgRequestTopicReply rep;
|
if (!rep.ParseFromArray(reply, reply_len)) {
|
SetLastError(eInvalidInput, "invalid input.");
|
return false;
|
}
|
return ProcNode().ServerSendReply(src, rep);
|
}
|
|
void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb)
|
{
|
TopicNode::ServerAsyncCB on_req;
|
TopicNode::SubDataCB on_sub;
|
TopicNode::RequestResultCB on_reply;
|
if (server_cb) {
|
on_req = [server_cb](void *src, std::string &proc_id, const MsgRequestTopic &request) {
|
std::string sreq(request.SerializeAsString());
|
server_cb(proc_id.data(), proc_id.size(), sreq.data(), sreq.size(), src);
|
};
|
}
|
if (sub_cb) {
|
on_sub = [sub_cb](const std::string &proc_id, const MsgPublish &pub) {
|
std::string s(pub.SerializeAsString());
|
sub_cb(proc_id.data(), proc_id.size(), s.data(), s.size());
|
};
|
}
|
if (client_cb) {
|
on_reply = [client_cb](const BHMsgHead &head, const MsgRequestTopicReply &rep) {
|
std::string s(rep.SerializeAsString());
|
client_cb(head.proc_id().data(), head.proc_id().size(),
|
head.msg_id().data(), head.msg_id().size(),
|
s.data(), s.size());
|
};
|
}
|
|
ProcNode().Start(on_req, on_sub, on_reply);
|
}
|
|
void BHFree(void *data, int size)
|
{
|
free(data);
|
}
|
|
int BHCleanup()
|
{
|
ProcNodePtr().reset();
|
return 0;
|
}
|
|
int BHGetLastError(void **msg, int *msg_len)
|
{
|
int ec = 0;
|
if (msg && msg_len) {
|
std::string err_msg;
|
GetLastError(ec, err_msg);
|
TmpPtr p(err_msg);
|
p.ReleaseTo(msg, msg_len);
|
}
|
return ec;
|
}
|
|
#undef BH_SOCKET_MEMF_CALL
|