-module(syncToolEsql). -compile(export_all). -include("esqltool.hrl"). %% 执行其他节点传输过来的sql语句 rpcExecuteSqlLocalyAndSaveToCacheAndSendSql(Sql, Func, Node) -> %% io:format("rpcExecuteSqlLocalyAndSaveToCacheAndSendSql start ~p ", [node()]), executeSqlLocalyAndSaveToCache(Sql, Node), ok. %% 调用其他节点执行sql语句 rpcSendSqlToOtherNode(Sql, Func, []) -> %% io:format("rpcSendSqlToOtherNode empty\n"), []; rpcSendSqlToOtherNode(Sql, Func, [Node | Lists]) -> %% io:format("rpcSendSqlToOtherNode not empty ~p \n",Node), rpc:call(Node, syncToolEsql, rpcExecuteSqlLocalyAndSaveToCacheAndSendSql, [Sql, Func, node()]), rpcSendSqlToOtherNode(Sql, Func, Lists). %% 启动一个进程,调用其他节点执行sql语句 sendSqlToOtherNode(Sql, Func) -> rpcSendSqlToOtherNode(Sql, Func, nodes()). %% 获取本机的SqlCahe记录 rpcGetSqlCacheLists(StartTime, EndTime) -> io:format("rpcGetSqlCacheLists start "), Sql = ["SELECT * FROM SqlCache where create_time > '", StartTime, "' AND create_time <= '", EndTime, "';"], io:format("Sql is ~p", [Sql]), {ok, DB} = esqlite3:open(database_name:getCacheDatabaseName()), io:format("DB is ~p", [DB]), Maps = selectInfoFromCache(DB, Sql), esqlite3:close(DB), io:format("Maps is ~p", [Maps]), Maps. %% rpc:call other node 获取sqlCahe rpcGetNodeSqlCache(StartTime, EndTime, []) -> []; %% 本节点不操作 rpcGetNodeSqlCache(StartTime, EndTime, [Node | List]) when Node == node() -> rpcGetNodeSqlCache(StartTime, EndTime, List); rpcGetNodeSqlCache(StartTime, EndTime, [Node | List]) -> Sqlmaps = rpc:call(Node, syncToolEsql, rpcGetSqlCacheLists, [StartTime, EndTime]), case Sqlmaps of [] -> rpcGetNodeSqlCache(StartTime, EndTime, List); _ -> Sqlmaps end. getAndExecuteOthNodeSqlCache() -> DeviceInfoList = device_info_esql:findAllDeviceList(), DeviceInfoCount = length(DeviceInfoList), case DeviceInfoCount of 0 -> io:format("device_info count = 0\n"); 1 -> io:format("device info count = 1\n"); _ -> {ok, DB} = esqlite3:open(database_name:getCacheDatabaseName()), [[{"create_time", StartTime}]] = selectInfoFromCache(DB, ["SELECT create_time FROM SqlCache order by create_time desc LIMIT 1;\n"]), connectAllNode(DeviceInfoList), SqlMaps = rpcGetNodeSqlCache(StartTime, syncTool:getTimestampStr(), nodes()), execCacheSql(DB, SqlMaps), ok end. %% 连接集群的其他节点 connectAllNode([]) -> []; connectAllNode([Rec | Maps]) -> [{_, NodeName}, _] = Rec, Node = list_to_atom(NodeName), LocalNode = node(), case Node of LocalNode -> []; _ -> net_kernel:connect_node(Node) end, connectAllNode(Maps). %% 保存sql到SQLCache数据库 saveSqlToCache(InSql) -> %% io:format("saveSqlToCache ~p \n",InSql), {ok, DB} = esqlite3:open(database_name:getCacheDatabaseName()), ok = esqlite3:exec(InSql, DB), esqlite3:close(DB). saveSqlToSync(InSql) -> %% io:format("saveSqlToCache ~p \n",InSql), {ok, DB} = esqlite3:open(database_name:getSyncDatabaseName()), ok = esqlite3:exec(InSql, DB), esqlite3:close(DB).