QiaoJiaSystem/DataManagerServer/http_configserver.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
QiaoJiaSystem/DataManagerServer/http_configserver.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
syncDBTool/ErlangDbTool.cpp | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
syncDBTool/ErlangDbTool.h | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
QiaoJiaSystem/DataManagerServer/http_configserver.cpp
@@ -1761,6 +1761,13 @@ return out; } //{ // "cookie": "abc", //集群cookie // "fatherNodeName": "main@192.168.1.186", //引导节点名称,添加引导节点时该值为" "(内为空格) // "culID" : "uuid", //集群id // "culName" : "name", //集群名称 // "devID" : "uuid" //设备id //} std::string devHttpServer_c::addNode(std::string ip, unsigned int port, std::string content, PResponse &response) { DBG("ip:" << ip << "; port:" << port); DBG("content: " << content); @@ -1773,25 +1780,32 @@ //#todo nodes have counter? //#todo nodeName Automatic generated? //#todo devId+num // 生成节点id - uuid std::string uuid = GetUUId::getUUID(); // 获取当前机器ip unsigned char ip_old[15] = {0}; std::string str_netIfName = appConfig.getStringProperty("netIfName"); GetIpAddress(str_netIfName.c_str(), ip_old); std::string str_ip((char *) ip_old); // 节点名称 std::string nodeName = uuid.append("@" + str_ip); std::string cookie = value["cookie"].asString(); // 引导节点 std::string FatherNodeName = value["fatherNodeName"].asString(); //#todo ClusterID ClusterName // 集群id std::string clusterID = value["culID"].asString(); // 为空则生成,否则直接使用 clusterID = clusterID.size() > 0 ? clusterID : GetUUId::getUUID(); std::string clusterName = value["culName"].asString(); std::string devID = value["devID"].asString(); std::string devName = value["devName"].asString(); // erlang节点保存路径/opt/erlang/${uuid} std::string path = std::string("/opt/erlang/").append(nodeName.substr(0, nodeName.find("@"))); if (erlangDbTool != nullptr) { //gaunbi node @@ -1803,6 +1817,7 @@ if (cookie.size() > 0 && (clusterID.size() > 0 || clusterName.size() > 0)) { // erlangDbTool = new ErlangTool::ErlangDbTool(path, nodeName, cookie); // #todo 进程名 -> get from argv[0] string str_tmp = "DataWebserver"; erlangDbTool = new ErlangTool::ErlangDbTool(path, nodeName, cookie, clusterID, clusterName, str_tmp); bool ret = erlangDbTool->initErlang(); @@ -1811,6 +1826,7 @@ ret = erlangDbTool->startNodeDb(FatherNodeName, devID); if (ret) { //如果成功则更新配置文件 appConfig.setStringProperty("erlNode", nodeName); appConfig.setStringProperty("erlCookie", cookie); appConfig.setStringProperty("erlPath", path); @@ -1823,6 +1839,7 @@ runAllApp(); // sleep(4); } else { // 否则则置空配置文件 appConfig.setStringProperty("erlNode", ""); appConfig.setStringProperty("erlCookie", ""); appConfig.setStringProperty("erlPath", ""); @@ -1855,6 +1872,7 @@ ret = erlangDbTool->removeNode(); // 置空配置文件 appConfig.setStringProperty("erlNode", ""); appConfig.setStringProperty("erlCookie", ""); appConfig.setStringProperty("erlPath", ""); @@ -1972,15 +1990,20 @@ std::string Uuid = value["uuid"].asString(); std::string TableType = value["TableType"].asString(); std::string TableName = value["TableName"].asString(); // 集群内同步 int SyncType = atoi(value["SyncType"].asCString()); std::string BwType = value["BwType"].asString(); std::string StartTime = value["StartTime"].asString(); std::string EndTime = value["EndTime"].asString(); // 是否上传 std::string UploadFlag = value["IsSync"].asString(); UploadFlag = UploadFlag.empty() ? "0" : UploadFlag; // 底库阈值 std::string CmpThreshold = value["threshold"].asString(); CmpThreshold = CmpThreshold.empty() ? "60" : CmpThreshold; // 是否启用 std::string Enabled = value["enabled"].asString(); Enabled = Enabled.empty() ? "1" : Enabled; @@ -2012,9 +2035,7 @@ fieldValues.insert(std::make_pair("bwType", BwType)); fieldValues.insert(std::make_pair("startTime", StartTime)); fieldValues.insert(std::make_pair("endTime", EndTime)); // #todo fieldValues.insert(std::make_pair("create_by", createBy)); fieldValues.insert(std::make_pair("uploadFlag", UploadFlag)); fieldValues.insert(std::make_pair("cmpThreshold", CmpThreshold)); fieldValues.insert(std::make_pair("enabled", Enabled)); @@ -2039,7 +2060,6 @@ return ""; } //// #todo 上传。仅黑名单。 //// #TODO 根据createBy判断是否需要上传 // if (createBy == "analyDev" && BwType == "1") { @@ -2050,7 +2070,7 @@ // Enabled); // } // send message to other CNode erlangDbTool->sendMessage(); std::string str_result = std::string("{\"result\":").append("\"" + std::to_string(ret) + "\"}"); return str_result; QiaoJiaSystem/DataManagerServer/http_configserver.h
@@ -78,16 +78,56 @@ std::string netconfig_show(std::string ip, unsigned int port, std::string content, PResponse &response); /*** * 创建集群节点 * @param ip * @param port * @param content * @param response * @return */ std::string addNode(std::string ip, unsigned int port, std::string content, PResponse &response); /*** * 查询在线节点 * @param ip * @param port * @param content * @param response * @return */ std::string searchNode(std::string ip, unsigned int port, std::string content, PResponse &response); std::string modifyCluName(std::string ip, unsigned int port, std::string content, PResponse &response); /*** * 退出集群 * @param ip * @param port * @param content * @param response * @return */ std::string removeNode(std::string ip, unsigned int port, std::string content, PResponse &response); /** *创建数据底库 * @param ip * @param port * @param content * @param response * @return */ std::string createDatabase(std::string ip, unsigned int port, std::string content, PResponse &response); /** * * @param ip * @param port * @param content * @param response * @return */ std::string deleteDatabase(std::string ip, unsigned int port, std::string content, PResponse &response); std::string updateDatabase(std::string ip, unsigned int port, std::string content, PResponse &response); syncDBTool/ErlangDbTool.cpp
@@ -71,6 +71,7 @@ return true; } // 检测返回的数据的数据类型 static int ErlangTool::checkETERMType(ETERM *elemen) { if (elemen == NULL) { return -1; @@ -187,9 +188,10 @@ g_syncDbFile.setDatabaseName(str_SyncDbFile.c_str()); g_syncDbFile.open(); // c erlang init erl_init(NULL, 0); struct in_addr addr; //#todo addr.s_addr = inet_addr("127.0.0.1"); m_pName.append(std::to_string((int) getpid())); @@ -198,6 +200,8 @@ t_cNodeName.append("@127.0.0.1"); m_cNodeName.swap(t_cNodeName); //m_pName //int erl_connect_init(number, cookie, creation)int erl_connect_xinit(host, alive, node, addr, cookie, creation) // https://cloud.tencent.com/developer/section/1123179 if (erl_connect_xinit(const_cast<char *>(m_pName.c_str()), const_cast<char *>(m_pName.c_str()), const_cast<char *>(m_cNodeName.c_str()), &addr, const_cast<char *>(m_cookie.c_str()), 0) == -1) { @@ -214,16 +218,20 @@ ERR("m_nodeName is null "); return 3; } // 判断节点是否启动 m_ret = pingNode(m_nodeName, m_cookie); if (!m_ret) { if (dir_file_exists(m_path, true)) { // 启动erlang节点的bash命令 std::string cmd = std::string("cd " + m_path + " && erl -name " + m_nodeName + " -setcookie " + m_cookie + " -mnesia dir '\"" + m_path + "\"' -detached -noshell");// >>log std::cout << cmd << std::endl; system(cmd.c_str()); std::cout << m_ret << std::endl; // 判断节点是否启动成功 m_ret = waitNode(); // erl_close_connection(m_fd); resetConn(); return m_ret; } else { @@ -264,7 +272,7 @@ } ErlMessage emsg; /* Incoming message */ //hanshu canshu // 拼接函数参数 ETERM *arrlist[5]; arrlist[0] = erl_mk_atom(m_nodeName.c_str()); arrlist[1] = erl_mk_atom(FatherNodeName.c_str()); @@ -282,6 +290,8 @@ //erl_close_connection(m_fd); // m_mutex.unlock(); if (ret == ERL_MSG) { // 消息解析 // 消息体为链表结构 std::map<std::string, std::string> t_results; ETERM *key, *value; ETERM *tuplep[6]; @@ -313,7 +323,7 @@ INFO("item is " << item.first << " " << item.second); } } }//ret == ERL_MSG end } //ret == ERL_MSG end else { ERR(" ret is " << ret); @@ -329,18 +339,14 @@ // m_mutex.lock(); m_ret = resetConn(); if (!m_ret) { //#todo error message std::cout << __FILE__ << __FUNCTION__ << __LINE__ << " error " << m_ret << " " << m_fd << std::endl; ERR(" error " << m_ret << " " << m_fd); return false; } int ret = -1; ErlMessage emsg; /* Incoming message */ //hanshu canshu // ETERM *arrlist[1]; // arrlist[0] = erl_mk_atom(m_nodeName.c_str()); ETERM *list = erl_mk_empty_list();// (arrlist, 0); // std::cout << __FILE__ << __FUNCTION__ <<__LINE__ << " " << m_fd << std::endl; ret = erl_rpc_to(m_fd, "syncDB", "removeNode", list); if (ret == ERL_TICK) { ret = erl_rpc_from(m_fd, TIMEOUT, &emsg); @@ -364,7 +370,7 @@ break; } default: printf("error add case todo \n\r"); ERR("error add case todo \n\r"); } auto it = t_results.find("atomic"); if (t_results.size() > 0 && t_results.end() != it && it->second == "ok") { @@ -471,9 +477,7 @@ map_DevDataCache devDataCache; int ret = -1; ErlMessage emsg; /* Incoming message */ //hanshu canshu // ETERM *arrlist[0]; // arrlist[0] = erl_mk_atom(m_nodeName.c_str()); ETERM *list = erl_mk_empty_list();//erl_mk_list(arrlist, 0); ret = erl_rpc_to(m_fd, "syncDB", "findAllNode", list); @@ -534,7 +538,7 @@ erlangFreeEterm(3, key, value, tuplep[j]); } // printf("\none list end\n\n\n\n"); // #todo this is have a bug // #todo this is have a bug 内存泄露?或者地址空间越界? // device_info.create_by = ""; devDataCache.insert(std::make_pair(device_info.uuid, device_info)); erlangFreeEterm(1, tail_tuple); @@ -1686,13 +1690,14 @@ bool ErlangTool::ErlangDbTool::resetConn() { //#todo if (m_pName.size() <= 0) { // 根据当前时间设置种子,秒级 srand(time(0)); // 生成随机数 m_loop = rand() % 1000; int ret = -1; erl_init(NULL, 0); // m_loop %= 10; // m_loop++; // 根据随机数生成c节点 ret = erl_connect_init(m_loop, const_cast<char *>(m_cookie.c_str()), 0); DBG("node name is " << m_loop); if (-1 == ret) { @@ -1700,26 +1705,22 @@ return false; } // std::cout << __FILE__ << __FUNCTION__ <<__LINE__ << " " << m_fd << std::endl; // 关闭上次链接的文件描述符 #TODO 第一次创建节点有概率出现短时间内无法查询的bug erl_close_connection(m_fd); // std::cout << __FILE__ << __FUNCTION__ <<__LINE__ << " " << m_fd << std::endl; m_fd = erl_connect(const_cast<char *>(m_nodeName.c_str())); // std::cout << __FILE__ << __FUNCTION__ <<__LINE__ << " " << m_fd << std::endl; if (0 > m_fd) { // erl_err_quit("erl_connect"); return false; } return true; } else { // 设置了 mPName erl_close_connection(m_fd); m_fd = erl_connect(const_cast<char *>(m_nodeName.c_str())); if (0 > m_fd) { // erl_err_quit("erl_connect"); // std::cout << __FILE__ << __FUNCTION__ <<__LINE__ << " error " << fd << std::endl; ERR("m_fd is " << m_fd << " m_nodeName is" << m_nodeName); return false; } // std::cout << __FILE__ << __FUNCTION__ <<__LINE__ << " " << fd << std::endl; INFO("m_fd is " << m_fd << " m_nodeName is" << m_nodeName); return true; } @@ -1738,15 +1739,17 @@ loop++; } } std::cout << "start waitNode" << std::endl; INFO("start waitNode"); return m_ret; } void ErlangTool::ErlangDbTool::test(pthread_cond_t *t_pthcon, pthread_mutex_t *t_pthmut) { bool ret = setCNodeName(); { // 监听socket的线程 std::thread th1([&] { int res; // 外部传入的信号和锁 pthread_cond_t *t_pthCon = t_pthcon; pthread_mutex_t *t_pthMut = t_pthmut; @@ -1786,7 +1789,8 @@ } else if (got == ERL_ERROR) { if ((fd = erl_accept(listen, &conn)) == ERL_ERROR) erl_err_quit("erl_accept"); INFO("Connected to " << conn.nodename); else INFO("Connected to " << conn.nodename); } else { if (emsg.type == ERL_REG_SEND) { @@ -1803,6 +1807,7 @@ pthread_cond_signal(t_pthCon); pthread_mutex_unlock(t_pthMut); //call back func //#TODO 消息解析 } else { ERR("message not is update"); } syncDBTool/ErlangDbTool.h
@@ -136,17 +136,18 @@ private: void initCNode(); // 向erlang端直接发送sql语句 bool getExecSqlResult(const std::string &strSql); public: //#todo setCNodeName // 将c节点名称设置给erlang使其可以在接收到消息时,能通知到本节点 bool setCNodeName(); // 向erlang发送消息, #TODO 目前仅实现了信号,待实现消息体 bool sendMessage(); //#todo sendMessage // 设置匿名c节点接受erlang节点向c节点发送的消息 void test(pthread_cond_t *, pthread_mutex_t *); /*** @@ -174,7 +175,12 @@ */ bool removeNode(); /** * 修改集群名称 * @param CluId * @param CluName * @return */ bool modifyCluName(std::string CluId, std::string CluName); @@ -303,7 +309,7 @@ std::string PerPicUrl, std::string PerFea); // private: /*** * 重新连接节点 * 为了防止过长时间连接被关闭,主动重新连接节点 * @return 连接状态 */ bool resetConn(); @@ -319,8 +325,11 @@ private: //存储路径 std::string m_path; // erlang 节点名称 std::string m_nodeName; // 程序内的c节点名称 进程名+pid@127.0.0.1 std::string m_cNodeName; // 进程名+pid?#TODO std::string m_pName; std::string m_cookie;