#include "bn_api.h"

// NOTE(review): the standard header names were lost in this copy of the file;
// the list below is what the code in this file uses — confirm against the
// original include list.
#include <cstddef>
#include <cstdio>
#include <string>
#include <unordered_map>
using namespace std;

#include "nng_wrap.h"
using namespace nng_wrap;

#include "bhome_msg.pb.h"
#include "bhome_msg_api.pb.h"
using namespace bhome_msg;

// Keys into map_url: one entry per center endpoint.
enum {
    URLReg,
    URLDeReg,
    URLRegTopic,
    URLQueryTopic,
    URLQueryProcs,
    URLSubLocal,
    URLSubNet,
    URLPubProxy,
    URLSubQueue,
    URLHeartBeat,
};

static const char* const IPC_REGISTER      = "ipc:///tmp/bhnng-center-reg.ipc";           // process registration
static const char* const IPC_UNREGISTER    = "ipc:///tmp/bhnng-center-unregister.ipc";    // process unregistration
static const char* const IPC_REGTOPIC      = "ipc:///tmp/bhnng-center-regtopic.ipc";      // topic registration
static const char* const IPC_QUERYTOPIC    = "ipc:///tmp/bhnng-center-querytopic.ipc";    // query a given topic
static const char* const IPC_QUERYPROC     = "ipc:///tmp/bhnng-center-queryproc.ipc";     // query all registered processes
static const char* const IPC_SUBLOCALTOPIC = "ipc:///tmp/bhnng-center-sublocaltopic.ipc"; // subscribe to local topics
static const char* const IPC_SUBNETTOPIC   = "ipc:///tmp/bhnng-center-subnettopic.ipc";   // subscribe to network topics
static const char* const IPC_HEARTBEAT     = "ipc:///tmp/bhnng-center-hb.ipc";
static const char* const IPC_PUB_PROXY     = "ipc:///tmp/bhnng-center-pub-proxy.ipc";     // proxy center: receives messages waiting to be published
static const char* const IPC_SUB_QUEUE     = "ipc:///tmp/bhnng-center-sub-queue.ipc";     // channel clients use to subscribe from the center

// NOTE(review): the template arguments were lost in this copy of the file;
// <int, string> is what get_url() requires — confirm against the original.
static const unordered_map<int, string> map_url{
    {URLReg,        IPC_REGISTER},
    {URLDeReg,      IPC_UNREGISTER},
    {URLRegTopic,   IPC_REGTOPIC},
    {URLQueryTopic, IPC_QUERYTOPIC},
    {URLQueryProcs, IPC_QUERYPROC},
    {URLSubLocal,   IPC_SUBLOCALTOPIC},
    {URLSubNet,     IPC_SUBNETTOPIC},
    {URLPubProxy,   IPC_PUB_PROXY},
    {URLSubQueue,   IPC_SUB_QUEUE},
    {URLHeartBeat,  IPC_HEARTBEAT},
};

// Returns the endpoint URL registered for `type`, or an empty string when
// `type` is not one of the URL* keys.
static string get_url(const int type)
{
    const auto iter = map_url.find(type);
    if (iter == map_url.end()) {
        return {};
    }
    return iter->second;
}

// Remembers the first non-empty proc id it is given. On later calls (once an
// id is stored) `pid` is overwritten with the stored id instead.
// The cache is a plain function-local static with no locking.
static void fetch_set_proc_id(std::string& pid)
{
    static string cur_proc_id{};
    if (cur_proc_id.empty()) {
        cur_proc_id = pid;
    } else {
        pid = cur_proc_id;
    }
}

// Parses `remote` as a BHAddress and, when it carries a non-empty ip and a
// positive port, logs that `caller` was given a remote address.
// The TCP override is currently disabled, so `url` is only printed, never
// changed:
//   url = "tcp://" + addr.ip() + ":" + to_string(addr.port());
static void log_remote_address(const char* caller, const void* remote, const int remote_len, const string& url)
{
    if (!remote || remote_len <= 0) return;

    BHAddress addr;
    if (!addr.ParseFromArray(remote, remote_len)) return;
    if (addr.ip().empty() || !(addr.port() > 0)) return;

    printf("======>> %s use remote address %s\n", caller, url.c_str());
}

/////////////////////////////////////////////

// Registers this process with the center.
//
// proc_info/proc_info_len: serialized ProcInfo; its proc_id is cached for
//                          later calls when it parses successfully.
// reply/reply_len:         receive the center's reply with the leading
//                          unsigned-short prefix stripped.
// Returns the result of the registration request (non-zero on success), or
// false when the arguments are invalid or the reply is too short to contain
// the prefix.
//
// On success the publish, subscribe, heartbeat-survey and reply endpoints are
// also started for this process.
int BHRegister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
    if (!proc_info || proc_info_len <= 0) return false;

    bhome_msg::ProcInfo pi;
    if (pi.ParseFromArray(proc_info, proc_info_len)) {
        string tmp(pi.proc_id());
        fetch_set_proc_id(tmp);
    }

    void* out{};
    int out_len{};
    const auto ret = simple_request(get_url(URLReg), proc_info, proc_info_len, &out, &out_len, timeout_ms);
    if (!ret) return ret;

    // The reply starts with an unsigned short (a port, per the original code)
    // that is not forwarded to the caller. Refuse replies that cannot hold it
    // rather than reading past the buffer / passing a negative length on.
    constexpr int port_size = sizeof(unsigned short);
    if (!out || out_len < port_size) {
        if (out) BHFree(out, out_len);
        return false;
    }
    copy_memory(reply, reply_len, static_cast<char*>(out) + port_size, out_len - port_size);
    BHFree(out, out_len);

    /////////////////////////////////////////////////////////////////////////
    // Publishing with no payload — presumably this only sets up the
    // publisher connection; confirm against nng_wrap.
    publish(get_url(URLPubProxy), NULL, 0);
    subscribe_center(get_url(URLSubQueue));
    respond_survey(get_url(URLHeartBeat),
                   string{static_cast<const char*>(proc_info), static_cast<size_t>(proc_info_len)});
    start_reply(pi.proc_id(), 0);

    return ret;
}

// Unregisters this process: forwards the serialized ProcInfo to the center's
// unregister endpoint and returns the request result.
int BHUnregister(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
    if (!proc_info || proc_info_len <= 0) return false;
    return simple_request(get_url(URLDeReg), proc_info, proc_info_len, reply, reply_len, timeout_ms);
}

// No-op that always reports success. The explicit heartbeat request is
// disabled; BHRegister answers the center's heartbeat endpoint through
// respond_survey() instead.
int BHHeartbeatEasy(const int timeout_ms)
{
    return 1;
}

// No-op that always reports success; `reply`/`reply_len` are not written.
// (The request to the heartbeat endpoint is disabled, see BHHeartbeatEasy.)
int BHHeartbeat(const void *proc_info, const int proc_info_len, void **reply, int *reply_len, const int timeout_ms)
{
    return 1;
}

// Registers request/reply topics with the center.
//
// `topics` is a serialized MsgTopicList. The list actually sent to the center
// has the cached proc id inserted as its first entry, followed by the
// caller's topics.
int BHRegisterTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
    if (!topics || topics_len <= 0) return false;

    bhome_msg::MsgTopicList requested;
    if (!requested.ParseFromArray(topics, topics_len)) {
        return false;
    }

    string pid;
    fetch_set_proc_id(pid);

    bhome_msg::MsgTopicList outgoing;
    outgoing.add_topic_list(pid);
    for (const auto& topic : requested.topic_list()) {
        outgoing.add_topic_list(topic);
    }

    const string msg(outgoing.SerializeAsString());
    return simple_request(get_url(URLRegTopic), msg.data(), msg.size(), reply, reply_len, timeout_ms);
}

// Talks to the center: looks up the process info for a topic and returns it
// in `reply` (BHRequest extracts the proc_id from that reply).
// `remote` is only parsed and logged; the request always goes to the local
// query-topic endpoint.
int BHQueryTopicAddress(const void *remote, const int remote_len, const void *topic, const int topic_len,
                        void **reply, int *reply_len, const int timeout_ms)
{
    if (!topic || topic_len <= 0) return false;

    const string url(get_url(URLQueryTopic));
    log_remote_address("BHQueryTopicAddress", remote, remote_len, url);

    return simple_request(url, topic, topic_len, reply, reply_len, timeout_ms);
}

// Asks the center for the online processes.
// `remote` is only parsed and logged; the request always goes to the local
// query-procs endpoint.
int BHQueryProcs(const void *remote, const int remote_len, const void *query, const int query_len,
                 void **reply, int *reply_len, const int timeout_ms)
{
    if (!query || query_len <= 0) return false;

    const string url(get_url(URLQueryProcs));
    log_remote_address("BHQueryProcs", remote, remote_len, url);

    return simple_request(url, query, query_len, reply, reply_len, timeout_ms);
}

// above communicate with center
//////////////////////////////////////////////////////////////
// below communicate with peer

// Publishes a message.
//
// `msgpub` is a serialized MsgPublish. It is re-wrapped before sending: the
// wrapper's topic field carries the cached proc id and its data field carries
// the caller's original bytes (BHReadSub unwraps this). The wrapper is
// published under the caller's topic.
// Returns true unless the arguments are invalid, parsing fails, or publish()
// returns 0.
int BHPublish(const void *msgpub, const int msgpub_len, const int timeout_ms)
{
    if (!msgpub || msgpub_len <= 0) return false;

    bhome_msg::MsgPublish pub;
    if (!pub.ParseFromArray(msgpub, msgpub_len)) {
        return false;
    }

    string pid;
    fetch_set_proc_id(pid);

    MsgPublish wrapped;
    wrapped.set_topic(pid);
    wrapped.set_data(string{static_cast<const char*>(msgpub), static_cast<size_t>(msgpub_len)});

    const string msg(wrapped.SerializeAsString());
    const auto ret = publish(pub.topic(), msg.data(), msg.size());
    return ret != 0;
}

// Subscribes to every topic in the serialized MsgTopicList `topics`.
// Returns false when the list cannot be parsed or is empty, true otherwise.
// `reply`, `reply_len` and `timeout_ms` are not used: no request is sent to
// the center and the reply out-parameters are left untouched.
int BHSubscribeTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
    if (!topics || topics_len <= 0) return false;

    bhome_msg::MsgTopicList mtl;
    if (!mtl.ParseFromArray(topics, topics_len)) {
        printf("BHSubscribeTopics ParseFromArray failed\n");
        return false;
    }
    if (mtl.topic_list_size() == 0) {
        printf("BHSubscribeTopics topic_list_size zero\n");
        return false;
    }

    for (const auto& topic : mtl.topic_list()) {
        subscribe_topic(topic);
    }
    return true;
}

// Network subscription is not implemented separately; it behaves exactly like
// BHSubscribeTopics.
int BHSubscribeNetTopics(const void *topics, const int topics_len, void **reply, int *reply_len, const int timeout_ms)
{
    return BHSubscribeTopics(topics, topics_len, reply, reply_len, timeout_ms);
}

// Reads one subscribed message.
//
// The received payload is expected to be the wrapper built by BHPublish:
// `proc_id` receives the wrapper's topic field (the publisher's proc id) and
// `msgpub` receives its data field (the publisher's original MsgPublish
// bytes).
// Returns false when the read fails (negative result) or the payload does not
// parse as a MsgPublish.
int BHReadSub(void **proc_id, int *proc_id_len, void **msgpub, int *msgpub_len, const int timeout_ms)
{
    string topic, msg;
    const auto ret = subscribe_read(&topic, &msg, timeout_ms);
    if (ret < 0) return false;

    MsgPublish wrapped;
    if (!wrapped.ParseFromString(msg)) {
        return false;
    }

    copy_memory(proc_id, proc_id_len, wrapped.topic().data(), wrapped.topic().size());
    copy_memory(msgpub, msgpub_len, wrapped.data().data(), wrapped.data().size());
    return true;
}

// Resolves the proc id serving the topic named in a serialized
// MsgRequestTopic by querying the center (BHQueryTopicAddress) and taking the
// proc_id of the first node address in the reply.
// Returns true and fills `*proc_id` on success; false on any parse/query
// failure or when the reply lists no node address.
static int get_proc_id_from_MsgRequestTopic(const void* request, const int request_len,
                                            const int timeout_ms, string* proc_id)
{
    bhome_msg::MsgRequestTopic req;
    if (!req.ParseFromArray(request, request_len)) {
        return false;
    }

    bhome_msg::MsgQueryTopic query;
    query.set_topic(req.topic());
    const string serialized(query.SerializeAsString());

    // No remote address is supplied: BHQueryTopicAddress only logs it and
    // always queries the local center endpoint.
    void* reply = nullptr;
    int reply_len = 0;
    const int ret = BHQueryTopicAddress(nullptr, 0, serialized.data(), static_cast<int>(serialized.size()),
                                        &reply, &reply_len, timeout_ms);
    if (!ret) return false;

    bhome_msg::MsgQueryTopicReply mr;
    const bool parsed = mr.ParseFromArray(reply, reply_len);
    BHFree(reply, reply_len);
    if (!parsed) return false;

    if (mr.node_address_size() == 0) {
        return false;
    }

    *proc_id = mr.node_address(0).proc_id();
    return true;
}

// Sends a request for a request/reply topic (see BHRegisterTopics).
//
// The serving process is looked up through the center and then contacted
// directly on "ipc:///tmp/<proc id>". `remote` is only parsed and logged.
// On success `reply` holds the peer's answer and `proc_id` the id of the
// process that served it. Returns false when the lookup fails or request2()
// returns a negative value.
int BHRequest(const void *remote, const int remote_len, const void *request, const int request_len,
              void **proc_id, int *proc_id_len, void **reply, int *reply_len, const int timeout_ms)
{
    if (!request || request_len <= 0) return false;

    string procid{};
    if (!get_proc_id_from_MsgRequestTopic(request, request_len, timeout_ms, &procid)) {
        return false;
    }

    // The proc id doubles as the ipc address of the serving process.
    const string url("ipc:///tmp/" + procid);
    log_remote_address("BHRequest", remote, remote_len, url);

    const int rc = request2(url, request, request_len, reply, reply_len, timeout_ms);
    if (rc < 0) return false;

    copy_memory(proc_id, proc_id_len, procid.data(), procid.size());
    return true;
}

// Reads one incoming request.
//
// The requester's proc id is not available; `proc_id` instead receives the id
// resolved from the request's topic via the center. `src` receives the
// requester's key, to be handed to BHSendReply.
// Returns false when read_request() reports a non-zero result or the topic
// lookup fails.
// NOTE(review): on the lookup-failure path `*src` has already been produced by
// read_request() and is not released here — confirm who owns it.
int BHReadRequest(void **proc_id, int *proc_id_len, void **request, int *request_len,
                  void **src, const int timeout_ms)
{
    string msg;
    const auto ret = read_request(src, &msg, timeout_ms);
    if (ret != 0) return false;

    string procid{};
    if (!get_proc_id_from_MsgRequestTopic(msg.data(), static_cast<int>(msg.size()), timeout_ms, &procid)) {
        return false;
    }

    copy_memory(proc_id, proc_id_len, procid.data(), procid.size());
    copy_memory(request, request_len, msg.data(), msg.size());
    return true;
}

// Sends `reply` back to the requester identified by `src` (as obtained from
// BHReadRequest). Returns false on invalid arguments or when send_reply()
// returns a negative value.
int BHSendReply(void *src, const void *reply, const int reply_len)
{
    if (!src || !reply || reply_len <= 0) return false;

    const auto ret = send_reply(src, reply, reply_len);
    return !(ret < 0);
}

// Releases a buffer handed out by this API (delegates to free_nng).
void BHFree(void *data, int size)
{
    free_nng(data, size);
}

// Nothing to clean up; always returns 1.
int BHCleanup()
{
    return 1;
}

// Fetches the last error message into `msg`/`msg_len` (the error code itself
// is discarded). Returns false when either out-parameter is null, true
// otherwise.
int BHGetLastError(void **msg, int *msg_len)
{
    if (!msg || !msg_len) return false;

    *msg = NULL;
    *msg_len = 0;
    int ec;
    get_last_error(&ec, msg, msg_len);
    return true;
}

//////////////////////////////////////////////////////////////
// Unused: kept only so the API surface is complete.

// Not implemented; always returns 0 and writes nothing.
int BHAsyncRequest(const void *remote, const int remote_len, const void *request, const int request_len,
                   void **msg_id, int *msg_id_len)
{
    return 0;
}

// Not implemented; the callbacks are ignored.
void BHStartWorker(FServerCallback server_cb, FSubDataCallback sub_cb, FClientCallback client_cb)
{
}
////////////////////////////////////////////////////////////// void TestRequest(int type, const char* msg, const int len){ const auto& topic = get_url(type); void *reply; int reply_len; int rc = request2(topic, msg, len, &reply, &reply_len, 10000); if (rc < 0) { printf("TestRequest failed\n"); return; } printf("------>>TestRequest send %s reply %s len %d\n", msg, (char*)reply, reply_len); BHFree(reply, reply_len); } static int test_read_request(void** request, int* request_len, void** src, const int to_ms){ string msg; auto ret = read_request(src, &msg, to_ms); if (ret != 0) return false; copy_memory(request, request_len, msg.data(), msg.size()); return true; } void TestReply(int type, int count) { const auto& in = get_url(type); // printf("url %s\n", (char*)in.data); start_reply(in, 0); // printf("start read %s\n", (char*)in.data); char *request, *src; int request_len; if (!test_read_request((void**)&request, &request_len, (void**)&src, 10000)){ printf("test_read_request failed\n"); return; } string msg{request, (size_t)request_len}; BHFree(request, request_len); // printf("------>>TestReply recv msg %s src %lu\n", msg.c_str(), *(uint64_t*)src); auto back_msg = "back:" + msg; BHSendReply(src, back_msg.c_str(), back_msg.size()); BHFree(src, 0); } void TestPub(const char* t, const int t_l, const char* d, const int d_l){ publish(t, d, d_l); } void TestSub(const char* t, const int t_l, void** d, int* d_l){ if (d == 0 && d_l == 0){ subscribe_topic(t); }else{ void* proc_id, *msgpub; int proc_id_len, msgpub_len; int timeout_ms = -1; BHReadSub(&proc_id, &proc_id_len, &msgpub, &msgpub_len, timeout_ms); if (BHReadSub(&proc_id, &proc_id_len, &msgpub, &msgpub_len, timeout_ms)){ string t((char*)proc_id, proc_id_len); BHFree(proc_id, proc_id_len); string m((char*)msgpub, msgpub_len); BHFree(msgpub, msgpub_len); printf("readsubmsg topic %s data %s\n", t.c_str(), m.c_str()); }else{ printf("readsubmsg failed\n"); } } }