进出入完善组织机构并加入导入人员和机构功能
554325746@qq.com
2019-08-07 07a66e53d2b4126c2004870d81a379d8ef0071da
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
-module(syncToolEsql).
-compile(export_all).
-include("esqltool.hrl").
 
%% 执行其他节点传输过来的sql语句
rpcExecuteSqlLocalyAndSaveToCacheAndSendSql(Sql, Func, Node) ->
%%  io:format("rpcExecuteSqlLocalyAndSaveToCacheAndSendSql start ~p ", [node()]),
  executeSqlLocalyAndSaveToCache(Sql, Node),
  ok.
 
%% 调用其他节点执行sql语句
rpcSendSqlToOtherNode(Sql, Func, []) ->
%%   io:format("rpcSendSqlToOtherNode empty\n"),
  [];
rpcSendSqlToOtherNode(Sql, Func, [Node | Lists]) ->
%%   io:format("rpcSendSqlToOtherNode not empty ~p \n",Node),
  rpc:call(Node, syncToolEsql, rpcExecuteSqlLocalyAndSaveToCacheAndSendSql, [Sql, Func, node()]),
  rpcSendSqlToOtherNode(Sql, Func, Lists).
 
%% 启动一个进程,调用其他节点执行sql语句
sendSqlToOtherNode(Sql, Func) ->
  rpcSendSqlToOtherNode(Sql, Func, nodes()).
 
%% 获取本机的SqlCahe记录
rpcGetSqlCacheLists(StartTime, EndTime) ->
  io:format("rpcGetSqlCacheLists start "),
  Sql = ["SELECT * FROM SqlCache where create_time > '", StartTime, "' AND create_time <= '", EndTime, "';"],
  io:format("Sql is ~p", [Sql]),
  {ok, DB} = esqlite3:open(database_name:getCacheDatabaseName()),
  io:format("DB is ~p", [DB]),
  Maps = selectInfoFromCache(DB, Sql),
  esqlite3:close(DB),
  io:format("Maps is ~p", [Maps]),
  Maps.
 
%% rpc:call other node 获取sqlCahe
rpcGetNodeSqlCache(StartTime, EndTime, []) ->
  [];
%%  本节点不操作
rpcGetNodeSqlCache(StartTime, EndTime, [Node | List]) when Node == node() ->
  rpcGetNodeSqlCache(StartTime, EndTime, List);
rpcGetNodeSqlCache(StartTime, EndTime, [Node | List]) ->
  Sqlmaps = rpc:call(Node, syncToolEsql, rpcGetSqlCacheLists, [StartTime, EndTime]),
  case Sqlmaps of
    [] ->
      rpcGetNodeSqlCache(StartTime, EndTime, List);
    _ ->
      Sqlmaps
  end.
 
getAndExecuteOthNodeSqlCache() ->
  DeviceInfoList = device_info_esql:findAllDeviceList(),
  DeviceInfoCount = length(DeviceInfoList),
  case DeviceInfoCount of
    0 ->
      io:format("device_info count = 0\n");
    1 ->
      io:format("device info count = 1\n");
    _ ->
      {ok, DB} = esqlite3:open(database_name:getCacheDatabaseName()),
      [[{"create_time", StartTime}]] = selectInfoFromCache(DB, ["SELECT create_time FROM SqlCache order by create_time desc LIMIT 1;\n"]),
      connectAllNode(DeviceInfoList),
      SqlMaps = rpcGetNodeSqlCache(StartTime, syncTool:getTimestampStr(), nodes()),
      execCacheSql(DB, SqlMaps),
      ok
  end.
 
%% 连接集群的其他节点
connectAllNode([]) ->
  [];
connectAllNode([Rec | Maps]) ->
  [{_, NodeName}, _] = Rec,
  Node = list_to_atom(NodeName),
  LocalNode = node(),
  case Node of
    LocalNode ->
      [];
    _ ->
      net_kernel:connect_node(Node)
  end,
  connectAllNode(Maps).
 
 
%% 保存sql到SQLCache数据库
saveSqlToCache(InSql) ->
%%   io:format("saveSqlToCache ~p \n",InSql),
  {ok, DB} = esqlite3:open(database_name:getCacheDatabaseName()),
  ok = esqlite3:exec(InSql, DB),
  esqlite3:close(DB).
 
saveSqlToSync(InSql) ->
%%   io:format("saveSqlToCache ~p \n",InSql),
  {ok, DB} = esqlite3:open(database_name:getSyncDatabaseName()),
  ok = esqlite3:exec(InSql, DB),
  esqlite3:close(DB).