-module(syncToolEsql).
|
-compile(export_all).
|
-include("esqltool.hrl").
|
|
%% Entry point invoked through rpc by another node: run a SQL statement that
%% the node `Node` sent to us.
%%
%% Sql   - the SQL statement to execute locally.
%% _Func - accepted so the arity matches what senders pass; it is not used here.
%% Node  - passed through to executeSqlLocalyAndSaveToCache/2 unchanged.
%%
%% The result of executeSqlLocalyAndSaveToCache/2 is discarded; always
%% returns ok unless that call raises.
rpcExecuteSqlLocalyAndSaveToCacheAndSendSql(Sql, _Func, Node) ->
    executeSqlLocalyAndSaveToCache(Sql, Node),
    ok.
|
|
%% Ask each node in Nodes, in list order, to execute Sql by calling
%% syncToolEsql:rpcExecuteSqlLocalyAndSaveToCacheAndSendSql/3 over rpc.
%%
%% Sql   - the SQL statement to forward.
%% Func  - forwarded untouched to the remote function.
%% Nodes - list of node names to contact.
%%
%% The calls are synchronous and their results (including {badrpc, _}) are
%% ignored. Always returns [].
rpcSendSqlToOtherNode(Sql, Func, Nodes) ->
    lists:foreach(
        fun(Target) ->
            rpc:call(
                Target,
                syncToolEsql,
                rpcExecuteSqlLocalyAndSaveToCacheAndSendSql,
                [Sql, Func, node()]
            )
        end,
        Nodes
    ),
    [].
|
|
%% Forward Sql (together with Func) to every node currently connected to this
%% one, as reported by nodes/0.
%%
%% Despite what the name may suggest, no process is spawned: the remote calls
%% are made one after another in the calling process.
sendSqlToOtherNode(Sql, Func) ->
    ConnectedNodes = nodes(),
    rpcSendSqlToOtherNode(Sql, Func, ConnectedNodes).
|
|
%% Return the rows of the local SqlCache table whose create_time lies in the
%% interval (StartTime, EndTime].
%%
%% StartTime, EndTime - iodata spliced verbatim into the query text.
%%
%% Returns whatever selectInfoFromCache/2 returns for the query. Crashes with
%% badmatch if the cache database cannot be opened.
%%
%% NOTE(review): the query is built by concatenation, so StartTime/EndTime
%% must never come from untrusted input; bind parameters would be safer if
%% selectInfoFromCache/2 supports them.
rpcGetSqlCacheLists(StartTime, EndTime) ->
    io:format("rpcGetSqlCacheLists start "),
    Sql = ["SELECT * FROM SqlCache where create_time > '", StartTime, "' AND create_time <= '", EndTime, "';"],
    io:format("Sql is ~p", [Sql]),
    {ok, DB} = esqlite3:open(database_name:getCacheDatabaseName()),
    io:format("DB is ~p", [DB]),
    %% Close the handle even when the query raises, so a failing query does
    %% not leak an open database connection.
    Maps =
        try
            selectInfoFromCache(DB, Sql)
        after
            esqlite3:close(DB)
        end,
    io:format("Maps is ~p", [Maps]),
    Maps.
|
|
%% Ask the given nodes, one at a time, for their SqlCache rows created in the
%% interval (StartTime, EndTime] and return the first non-empty answer.
%%
%% StartTime, EndTime - passed unchanged to the remote rpcGetSqlCacheLists/2.
%% Nodes              - list of candidate node names.
%%
%% The local node is skipped. A node that answers [] is skipped as well, and
%% so is a node whose call fails with {badrpc, Reason} (node down, remote
%% crash, ...): the failure is printed and the next node is tried, so an rpc
%% error tuple is never handed back to the caller as if it were a row set.
%% Returns [] when no node supplied any rows.
rpcGetNodeSqlCache(_StartTime, _EndTime, []) ->
    [];
rpcGetNodeSqlCache(StartTime, EndTime, [Node | Rest]) when Node =:= node() ->
    %% Never query ourselves.
    rpcGetNodeSqlCache(StartTime, EndTime, Rest);
rpcGetNodeSqlCache(StartTime, EndTime, [Node | Rest]) ->
    case rpc:call(Node, syncToolEsql, rpcGetSqlCacheLists, [StartTime, EndTime]) of
        [] ->
            rpcGetNodeSqlCache(StartTime, EndTime, Rest);
        {badrpc, Reason} ->
            io:format("rpcGetNodeSqlCache: node ~p failed: ~p~n", [Node, Reason]),
            rpcGetNodeSqlCache(StartTime, EndTime, Rest);
        SqlMaps ->
            SqlMaps
    end.
|
|
%% Fetch SqlCache rows from the other cluster nodes that are newer than the
%% newest row in the local cache, and hand them to execCacheSql/2.
%%
%% With zero or one entry in the device list only a message is printed.
%% Otherwise the local cache database is opened, the newest local
%% create_time is used as the lower bound and the current timestamp as the
%% upper bound, all listed nodes are connected, and the first non-empty
%% answer from rpcGetNodeSqlCache/3 is passed to execCacheSql/2.
%%
%% Returns ok. Crashes with badmatch when the database cannot be opened or
%% when the newest-row query does not yield exactly one row (e.g. the local
%% SqlCache table is empty).
getAndExecuteOthNodeSqlCache() ->
    case device_info_esql:findAllDeviceList() of
        [] ->
            io:format("device_info count = 0\n");
        [_] ->
            io:format("device info count = 1\n");
        [_, _ | _] = DeviceInfoList ->
            {ok, DB} = esqlite3:open(database_name:getCacheDatabaseName()),
            %% The handle used to be left open; close it on every exit path.
            try
                [[{"create_time", StartTime}]] =
                    selectInfoFromCache(DB, ["SELECT create_time FROM SqlCache order by create_time desc LIMIT 1;\n"]),
                connectAllNode(DeviceInfoList),
                SqlMaps = rpcGetNodeSqlCache(StartTime, syncTool:getTimestampStr(), nodes()),
                execCacheSql(DB, SqlMaps),
                ok
            after
                esqlite3:close(DB)
            end
    end.
|
|
%% Connect to the other nodes of the cluster.
%%
%% DeviceRows - list of rows; each row must be a two-element list whose first
%%              element is a {_Key, NodeName} pair, NodeName being the node
%%              name as a string. Any other row shape crashes with badmatch.
%%
%% The local node is skipped; for every other node
%% net_kernel:connect_node/1 is called and its result ignored.
%% Always returns [].
%%
%% NOTE(review): list_to_atom/1 creates atoms from stored data; make sure the
%% node names come from a trusted source.
connectAllNode(DeviceRows) ->
    lists:foreach(
        fun(Row) ->
            [{_, NodeName}, _] = Row,
            Target = list_to_atom(NodeName),
            case Target =:= node() of
                true ->
                    [];
                false ->
                    net_kernel:connect_node(Target)
            end
        end,
        DeviceRows
    ),
    [].
|
|
|
%% Execute InSql against the cache database (the SqlCache store).
%%
%% InSql - the SQL statement to execute.
%%
%% Returns ok. Crashes with badmatch when the database cannot be opened or
%% when esqlite3:exec/2 does not return ok; in the latter case the handle is
%% still closed, so a failing statement no longer leaks a connection.
saveSqlToCache(InSql) ->
    {ok, DB} = esqlite3:open(database_name:getCacheDatabaseName()),
    try
        ok = esqlite3:exec(InSql, DB)
    after
        esqlite3:close(DB)
    end.
|
|
%% Execute InSql against the sync database.
%%
%% InSql - the SQL statement to execute.
%%
%% Returns ok. Crashes with badmatch when the database cannot be opened or
%% when esqlite3:exec/2 does not return ok; in the latter case the handle is
%% still closed, so a failing statement no longer leaks a connection.
saveSqlToSync(InSql) ->
    {ok, DB} = esqlite3:open(database_name:getSyncDatabaseName()),
    try
        ok = esqlite3:exec(InSql, DB)
    after
        esqlite3:close(DB)
    end.
|