| | |
| | | #include "bn_api.h" |
| | | |
| | | #include <string.h> |
| | | #include <string> |
| | | #include <unordered_map> |
| | | #include <memory> |
| | | #include <thread> |
| | | using namespace std; |
| | | |
| | | #include "nng_wrap.h" |
| | | using namespace nng_wrap; |
| | | |
| | | #include "common.h" |
| | | using namespace std; |
| | | |
| | | #include "bhome_msg.pb.h" |
| | | #include "bhome_msg_api.pb.h" |
| | | using namespace bhome_msg; |
| | | |
// Identifiers for the center's IPC endpoints.
// Each value is used as an integer key into map_url (see get_url()).
// The numeric values are spelled out explicitly; they match the implicit
// 0..9 numbering an unadorned enumerator list would receive.
enum{
    URLReg        = 0,  // -> IPC_REGISTER
    URLDeReg      = 1,  // -> IPC_UNREGISTER
    URLRegTopic   = 2,  // -> IPC_REGTOPIC
    URLQueryTopic = 3,  // -> IPC_QUERYTOPIC
    URLQueryProcs = 4,  // -> IPC_QUERYPROC
    URLSubLocal   = 5,  // -> IPC_SUBLOCALTOPIC
    URLSubNet     = 6,  // -> IPC_SUBNETTOPIC
    URLPubProxy   = 7,  // -> IPC_PUB_PROXY
    URLSubQueue   = 8,  // -> IPC_SUB_QUEUE
    URLHeartBeat  = 9,  // -> IPC_HEARTBEAT
};
| | | |
// IPC endpoint URLs of the center, one per service.
// The pointers keep their historical `char*` type, but they refer to string
// literals: the pointed-to characters must never be written through them.
static char* IPC_REGISTER      = const_cast<char*>("ipc:///tmp/bhnng-center-reg.ipc");            // process registration
static char* IPC_UNREGISTER    = const_cast<char*>("ipc:///tmp/bhnng-center-unregister.ipc");     // process unregistration
static char* IPC_REGTOPIC      = const_cast<char*>("ipc:///tmp/bhnng-center-regtopic.ipc");       // topic registration
static char* IPC_QUERYTOPIC    = const_cast<char*>("ipc:///tmp/bhnng-center-querytopic.ipc");     // query a specified topic
static char* IPC_QUERYPROC     = const_cast<char*>("ipc:///tmp/bhnng-center-queryproc.ipc");      // query all registered processes
static char* IPC_SUBLOCALTOPIC = const_cast<char*>("ipc:///tmp/bhnng-center-sublocaltopic.ipc");  // subscribe to local topics
static char* IPC_SUBNETTOPIC   = const_cast<char*>("ipc:///tmp/bhnng-center-subnettopic.ipc");    // subscribe to network topics
static char* IPC_HEARTBEAT     = const_cast<char*>("ipc:///tmp/bhnng-center-hb.ipc");
static char* IPC_PUB_PROXY     = const_cast<char*>("ipc:///tmp/bhnng-center-pub-proxy.ipc");      // proxy center: receives messages waiting to be published
static char* IPC_SUB_QUEUE     = const_cast<char*>("ipc:///tmp/bhnng-center-sub-queue.ipc");      // channel through which clients subscribe from the center
| | | |
| | | static const unordered_map<int, string> map_url{ |
| | | {URLReg, IPC_REGISTER}, |
| | | {URLDeReg, IPC_UNREGISTER}, |
| | | {URLRegTopic, IPC_REGTOPIC}, |
| | | {URLQueryTopic, IPC_QUERYTOPIC}, |
| | | {URLQueryProcs, IPC_QUERYPROC}, |
| | | {URLSubLocal, IPC_SUBLOCALTOPIC}, |
| | | {URLSubNet, IPC_SUBNETTOPIC}, |
| | | {URLPubProxy, IPC_PUB_PROXY}, |
| | | {URLSubQueue, IPC_SUB_QUEUE}, |
| | | {URLHeartBeat, IPC_HEARTBEAT}, |
| | | }; |
| | | static string get_url(const int type){ |
| | | auto iter = map_url.find(type); |
| | | if (iter != map_url.end()){ |
| | | return iter->second; |
| | | } |
| | | return {}; |
| | | } |
| | | |
| | | static void fetch_set_proc_id(std::string& pid){ |
| | | static string cur_proc_id{}; |
| | |
| | | pid = cur_proc_id; |
| | | } |
| | | } |
| | | |
| | | ///////////////////////////////////////////// |
| | | |
| | | int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms) |
| | | { |
| | |
| | | { |
| | | if (!topic || topic_len <= 0) return false; |
| | | |
| | | return simple_request(get_url(URLQueryTopic), topic, topic_len, reply, reply_len, timeout_ms); |
| | | auto url(get_url(URLQueryTopic)); |
| | | |
| | | if (remote && remote_len > 0){ |
| | | BHAddress addr; |
| | | if (addr.ParseFromArray(remote, remote_len)){ |
| | | if (!addr.ip().empty() && addr.port() > 0){ |
| | | // url = "tcp://" + addr.ip() + ":" + to_string(addr.port()); |
| | | printf("======>> BHQueryTopicAddress use remote address %s\n", url.c_str()); |
| | | } |
| | | } |
| | | } |
| | | |
| | | return simple_request(url, topic, topic_len, reply, reply_len, timeout_ms); |
| | | } |
| | | |
| | | // 请求在线进程 request |
| | |
| | | int *reply_len, |
| | | const int timeout_ms) |
| | | { |
| | | if (!query || query_len <= 0) return false; |
| | | // if (!query || query_len <= 0) return false; |
| | | |
| | | auto ret = simple_request(get_url(URLQueryProcs), query, query_len, reply, reply_len, timeout_ms); |
| | | // printf("======>> BHQueryProcs *reply %p reply_len %d\n", *reply, *reply_len); |
| | | return ret; |
| | | auto url(get_url(URLQueryProcs)); |
| | | |
| | | if (remote && remote_len > 0){ |
| | | BHAddress addr; |
| | | if (addr.ParseFromArray(remote, remote_len)){ |
| | | if (!addr.ip().empty() && addr.port() > 0){ |
| | | // url = "tcp://" + addr.ip() + ":" + to_string(addr.port()); |
| | | printf("======>> BHQueryProcs use remote address %s\n", url.c_str()); |
| | | } |
| | | } |
| | | } |
| | | |
| | | return simple_request(url, query, query_len, reply, reply_len, timeout_ms); |
| | | } |
| | | |
| | | // above communicate with center |
| | |
| | | // return true; |
| | | } |
| | | |
| | | static int get_proc_id_from_MsgRequestTopic( |
| | | const void* request, const int request_len, const int timeout_ms, |
| | | string* proc_id){ |
| | | int get_proc_id_from_MsgRequestTopic( |
| | | const void* request, const int request_len, const int timeout_ms, string* proc_id) |
| | | { |
| | | |
| | | // BHQueryTopicAddress获取proc_id |
| | | bhome_msg::MsgRequestTopic req; |
| | |
| | | |
| | | auto url("ipc:///tmp/" + procid); |
| | | |
| | | BHAddress addr; |
| | | if (addr.ParseFromArray(remote, remote_len)){ |
| | | if (!addr.ip().empty() && addr.port() > 0){ |
| | | url = "tcp://" + addr.ip() + ":" + to_string(addr.port()); |
| | | printf("======>>use remote address %s\n", url.c_str()); |
| | | if (remote && remote_len > 0){ |
| | | BHAddress addr; |
| | | if (addr.ParseFromArray(remote, remote_len)){ |
| | | if (!addr.ip().empty() && addr.port() > 0){ |
| | | // url = "tcp://" + addr.ip() + ":" + to_string(addr.port()); |
| | | printf("======>> BHRequest use remote address %s\n", url.c_str()); |
| | | } |
| | | } |
| | | } |
| | | // 使用procid作为ipc通信 |