#include "bh_api.h"
|
#include "defs.h"
|
#include "topic_node.h"
|
#include <memory>
|
|
using namespace bhome_shm;
|
using namespace bhome_msg;
|
|
namespace
|
{
|
// Accessor for the single TopicNode shared by the API functions in this file.
// The node is a function-local static constructed from BHomeShm() the first
// time this accessor runs; every later call returns the same object.
TopicNode &ProcNode()
{
	static TopicNode shared_node(BHomeShm());
	return shared_node;
}
|
|
// Scoped owner of a malloc()'d buffer that is destined for a C caller.
//
// The buffer is free()'d by the destructor unless release() has been called,
// in which case ownership has moved to whoever received the raw pointer.
// size() keeps reporting the requested byte count even after release(), so
// callers may release first and read the length afterwards.
//
// Copying is disabled: two owners of the same pointer would double-free it.
class TmpPtr
{
	void *ptr_ = nullptr;
	size_t size_ = 0;

public:
	// Allocates `size` bytes. A zero-byte request still allocates one byte:
	// malloc(0) is allowed to return a null pointer, and a null pointer here
	// would be indistinguishable from an out-of-memory failure. size() still
	// reports the requested (possibly zero) length.
	explicit TmpPtr(const size_t size) :
	    ptr_(malloc(size != 0 ? size : 1)), size_(size) {}

	// Allocates str.size() bytes and copies the string's bytes into them.
	// No terminating NUL is appended.
	explicit TmpPtr(const std::string &str) :
	    TmpPtr(str.size())
	{
		if (ptr_ && !str.empty()) {
			memcpy(ptr_, str.data(), str.size());
		}
	}

	TmpPtr(const TmpPtr &) = delete;
	TmpPtr &operator=(const TmpPtr &) = delete;

	// free(nullptr) is a no-op, so this is safe after release() or a failed
	// allocation.
	~TmpPtr() { free(ptr_); }

	// Borrowed view of the buffer; null after release() or failed allocation.
	void *get() const { return ptr_; }

	// Gives up ownership: returns the buffer and leaves this object empty.
	// The caller becomes responsible for free()'ing the returned pointer.
	void *release()
	{
		void *tmp = ptr_;
		ptr_ = nullptr;
		return tmp;
	}

	// Byte count requested at construction (unchanged by release()).
	size_t size() const { return size_; }

	// True while a buffer is owned. Explicit so the object can be tested in
	// conditions without silently converting to an integer elsewhere.
	explicit operator bool() const { return ptr_ != nullptr; }
};
|
|
template <class Msg>
|
bool PackOutput(const Msg &msg, void **out, int *out_len)
|
{
|
auto size = msg.ByteSizeLong();
|
TmpPtr p(size);
|
if (!p) {
|
SetLastError(ENOMEM, "not enough memory.");
|
return false;
|
}
|
msg.SerializePartialToArray(p.get(), size);
|
*out = p.release();
|
*out_len = size;
|
return true;
|
}
|
|
} // namespace
|
|
bool BHRegister(const void *proc_info,
|
const int proc_info_len,
|
void **reply,
|
int *reply_len,
|
const int timeout_ms)
|
{
|
ProcInfo pi;
|
if (!pi.ParseFromArray(proc_info, proc_info_len)) {
|
SetLastError(eInvalidInput, "invalid input.");
|
return false;
|
}
|
MsgCommonReply msg_reply;
|
if (ProcNode().Register(pi, msg_reply, timeout_ms)) {
|
return PackOutput(msg_reply, reply, reply_len);
|
} else {
|
return false;
|
}
|
}
|
|
bool BHHeartBeatEasy(const int timeout_ms)
|
{
|
return ProcNode().Heartbeat(timeout_ms);
|
}
|
|
bool BHHeartBeat(const void *proc_info,
|
const int proc_info_len,
|
void **reply,
|
int *reply_len,
|
const int timeout_ms)
|
{
|
ProcInfo pi;
|
if (!pi.ParseFromArray(proc_info, proc_info_len)) {
|
SetLastError(eInvalidInput, "invalid input.");
|
return false;
|
}
|
MsgCommonReply msg_reply;
|
if (ProcNode().Heartbeat(pi, msg_reply, timeout_ms)) {
|
return PackOutput(msg_reply, reply, reply_len);
|
} else {
|
return false;
|
}
|
}
|
|
bool BHPublish(const void *msgpub,
|
const int msgpub_len,
|
const int timeout_ms)
|
{
|
MsgPublish pub;
|
if (!pub.ParseFromArray(msgpub, msgpub_len)) {
|
SetLastError(eInvalidInput, "invalid input.");
|
return false;
|
}
|
return ProcNode().Publish(pub, timeout_ms);
|
}
|
|
bool BHReadSub(void **proc_id,
|
int *proc_id_len,
|
void **msgpub,
|
int *msgpub_len,
|
const int timeout_ms)
|
{
|
std::string proc;
|
MsgPublish pub;
|
|
if (ProcNode().RecvSub(proc, pub, timeout_ms)) {
|
TmpPtr pproc(proc);
|
if (pproc && PackOutput(pub, msgpub, msgpub_len)) {
|
*proc_id = pproc.release();
|
*proc_id_len = pproc.size();
|
} else {
|
SetLastError(ENOMEM, "out of mem");
|
}
|
}
|
return false;
|
}
|
|
bool BHRequest(const void *request,
|
const int request_len,
|
void **proc_id,
|
int *proc_id_len,
|
void **reply,
|
int *reply_len,
|
const int timeout_ms)
|
{
|
MsgRequestTopic req;
|
if (!req.ParseFromArray(request, request_len)) {
|
SetLastError(eInvalidInput, "invalid input.");
|
return false;
|
}
|
std::string proc;
|
MsgRequestTopicReply out_msg;
|
if (ProcNode().ClientSyncRequest(req, proc, out_msg, timeout_ms)) {
|
TmpPtr pproc(proc);
|
if (pproc && PackOutput(out_msg, reply, reply_len)) {
|
*proc_id = pproc.release();
|
*proc_id_len = pproc.size();
|
} else {
|
SetLastError(ENOMEM, "out of mem");
|
}
|
}
|
return false;
|
}
|
|
bool BHReadRequest(void **proc_id,
|
int *proc_id_len,
|
void **request,
|
int *request_len,
|
void **src,
|
const int timeout_ms)
|
{
|
void *src_info = 0;
|
std::string proc;
|
MsgRequestTopic out_msg;
|
if (ProcNode().ServerRecvRequest(src_info, proc, out_msg, timeout_ms)) {
|
TmpPtr pproc(proc);
|
if (pproc && PackOutput(out_msg, request, request_len)) {
|
*proc_id = pproc.release();
|
*proc_id_len = pproc.size();
|
*src = src_info;
|
} else {
|
SetLastError(ENOMEM, "out of mem");
|
}
|
}
|
return false;
|
}
|
|
bool BHSendReply(void *src,
|
const void *reply,
|
const int reply_len)
|
{
|
MsgRequestTopicReply rep;
|
if (!rep.ParseFromArray(reply, reply_len)) {
|
SetLastError(eInvalidInput, "invalid input.");
|
return false;
|
}
|
return ProcNode().ServerSendReply(src, rep);
|
}
|
|
// Currently performs no work and always returns 0.
int BHCleanUp()
{
	const int status = 0;
	return status;
}
|
|
namespace
{
// Callable that accepts a reply as (bytes, length) and reports success.
// BHStartWorker() passes the address of one of these to the server callback
// disguised as a BHServerCallbackTag pointer; BHServerCallbackReply() casts
// the tag back to this type and invokes it.
using ServerSender = std::function<bool(const void *, const int)>;
} // namespace
|
|
// Adapts the C callbacks to TopicNode's callback types and starts the node.
//
// server_cb (optional): wrapped so that each incoming request is serialized
//   to bytes and passed to server_cb along with an opaque tag. The tag is the
//   address of a stack-local ServerSender; the callback is expected to answer
//   through BHServerCallbackReply(tag, ...), which parses the reply bytes into
//   the outgoing MsgRequestTopicReply. The wrapper returns whether that parse
//   happened and succeeded (false if the callback never replied).
// sub_cb (optional): wrapped so that each publication is serialized to bytes
//   and passed to sub_cb with the sender's proc id.
//
// A null callback leaves the corresponding TopicNode handler
// default-constructed.
void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb)
{
	TopicNode::ServerCB request_handler;
	TopicNode::SubDataCB publish_handler;

	if (sub_cb) {
		publish_handler = [sub_cb](const std::string &proc_id, const MsgPublish &pub) {
			std::string pub_bytes(pub.SerializeAsString());
			sub_cb(proc_id.data(), proc_id.size(), pub_bytes.data(), pub_bytes.size());
		};
	}

	if (server_cb) {
		request_handler = [server_cb](const std::string &proc_id, const MsgRequestTopic &request, MsgRequestTopicReply &reply) {
			std::string request_bytes(request.SerializeAsString());
			bool replied = false;
			// The sink lives only for the duration of server_cb(); the tag
			// must not be used after the callback returns.
			ServerSender reply_sink = [&replied, &reply](const void *p, const int len) {
				replied = reply.ParseFromArray(p, len);
				return replied;
			};
			server_cb(proc_id.data(), proc_id.size(),
			          request_bytes.data(), request_bytes.size(),
			          reinterpret_cast<BHServerCallbackTag *>(&reply_sink));
			return replied;
		};
	}

	ProcNode().Start(request_handler, publish_handler);
}
|
bool BHServerCallbackReply(const BHServerCallbackTag *tag,
|
const void *data,
|
const int data_len)
|
{
|
auto &sender = *(const ServerSender *) (tag);
|
return sender(data, data_len);
|
}
|
|
// Releases a buffer with free(). `size` is accepted but not used.
// Passing a null pointer is harmless because free(nullptr) does nothing.
void BHFree(void *data, int size)
{
	(void) size; // intentionally unused
	free(data);
}
|
|
int BHGetLastError(void **msg, int *msg_len)
|
{
|
int ec = 0;
|
if (msg && msg_len) {
|
std::string err_msg;
|
GetLastError(ec, err_msg);
|
TmpPtr p(err_msg);
|
if (p) {
|
*msg = p.release();
|
*msg_len = p.size();
|
}
|
}
|
return ec;
|
}
|
|
#undef BH_SOCKET_MEMF_CALL
|