1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
| %% Atom key 'sqlite3ConRef'. It is not referenced in the code visible here
| %% (presumably a key for a stored sqlite3 connection handle -- TODO confirm).
| -define(ESQLDBKEY, 'sqlite3ConRef').
| %% Expands to syncTool:change2Str(node()); evaluated at every use site.
| -define(StrNodeName, syncTool:change2Str(node())).
| %% Expands to database_name:getCacheDatabaseName(); evaluated at every use site.
| -define(LocalSqlCacheDB, database_name:getCacheDatabaseName()).
|
| %% Placeholder: always returns the atom ok and never builds a time string.
| %% NOTE(review): the name suggests a formatted timestamp was intended --
| %% confirm with callers before changing the return value.
| getTimeStr() ->
| ok.
|
| %% Opens the sync database through esqlite3.
| %%
| %% The database name comes from database_name:getSyncDatabaseName/0.
| %% Returns whatever esqlite3:open/1 returns for that name, or {error, false}
| %% (after logging) when the name is undefined.
| getESqlDBRef() ->
|     case database_name:getSyncDatabaseName() of
|         undefined = Missing ->
|             lager:error("BerPid is ~p ", [Missing]),
|             {error, false};
|         DbName ->
|             esqlite3:open(DbName)
|     end.
|
| %% Converts every element of a list to a flat string, preserving order.
| %%
| %%   * binaries are converted byte-for-byte with binary:bin_to_list/1;
| %%   * atoms go through atomToStr/1;
| %%   * anything else (integers, floats, ...) is printed with ~w.
| %%
| %% The binary test is is_binary/1 rather than is_bitstring/1: a bitstring
| %% whose size is not a whole number of bytes makes binary:bin_to_list/1 raise
| %% badarg, so such values now take the ~w fallback instead of crashing.
| bitStrToStr([]) ->
|     [];
| bitStrToStr([Value | T]) when is_binary(Value) ->
|     [binary:bin_to_list(Value) | bitStrToStr(T)];
| bitStrToStr([Value | T]) when is_atom(Value) ->
|     [atomToStr(Value) | bitStrToStr(T)];
| bitStrToStr([Value | T]) ->
|     [lists:flatten(io_lib:format("~w", [Value])) | bitStrToStr(T)].
|
|
| %% Returns the textual name of an atom as a flat string.
| %%
| %% Atoms are converted with atom_to_list/1, which yields the bare name without
| %% the quotes or escape sequences that ~w adds. The earlier approach printed
| %% with ~w and stripped apostrophes from both ends, which left backslash
| %% escapes in names containing ' or \ and also removed apostrophes that were
| %% genuinely part of the name.
| %%
| %% Non-atom terms keep the previous behaviour and are printed with ~w.
| atomToStr(Value) when is_atom(Value) ->
|     atom_to_list(Value);
| atomToStr(Value) ->
|     lists:flatten(io_lib:format("~w", [Value])).
|
| %% Applies atomToStr/1 to every element of a list, preserving order.
| atomListToStrList(Atoms) ->
|     lists:map(fun atomToStr/1, Atoms).
|
| %% Pairs each column name with the matching cell of a row.
| %%
| %% Names and Row are tuples. Names are converted with atomListToStrList/1 and
| %% cells with bitStrToStr/1; the result is a list of {NameStr, ValueStr}
| %% pairs. lists:zip/2 fails if the two tuples differ in size.
| rowsToList(Names, Row) ->
|     lists:zip(
|         atomListToStrList(tuple_to_list(Names)),
|         bitStrToStr(tuple_to_list(Row))
|     ).
|
| %% Removes apostrophes from both ends of a string, but only when the string
| %% starts with one; any other string is returned unchanged.
| stripStrBothApostrophe(Element) ->
|     case string:str(Element, "'") of
|         1 -> string:strip(Element, both, $');
|         _ -> Element
|     end.
|
|
| %% Runs a query against the sync database and returns its rows.
| %%
| %% Each row is converted by rowsToList/2, so on success the result is whatever
| %% esqlite3:map/3 returns for that callback. When the database cannot be
| %% opened, the failure is logged and one of these is returned instead:
| %%   {error, {msg, connDberror}}  - getESqlDBRef/0 returned {error, false}
| %%   {abort, {msg, notKnowWhy}}   - getESqlDBRef/0 returned anything else
| %%
| %% The connection is closed in an `after` section so it is released even when
| %% esqlite3:map/3 raises; previously an exception leaked the open handle.
| selectSomeInfoWithSql(Sql) ->
|     case getESqlDBRef() of
|         {ok, Db} ->
|             try
|                 esqlite3:map(fun rowsToList/2, Sql, Db)
|             after
|                 esqlite3:close(Db)
|             end;
|         {error, false} = Conn ->
|             lager:error("Conn is ~p ~n", [Conn]),
|             {error, {msg, connDberror}};
|         Conn ->
|             lager:error("Conn is ~p ~n", [Conn]),
|             {abort, {msg, notKnowWhy}}
|     end.
|
| %% Executes Sql on this node, then hands it to
| %% syncToolEsql:sendSqlToOtherNode/2 so it is sent to the other machines.
| %%
| %% The local execution (executeSqlLocalyAndSaveToCache/2) runs synchronously.
| %% After a 200 ms pause the forwarding is started in a separate, unlinked
| %% process, so its outcome is not reflected in the return value, which is
| %% always {atomic, ok}.
| %% NOTE(review): the reason for the 200 ms pause is not visible here.
| executeSqlLocalyAndSaveToCacheAndSendSql(Sql) ->
|     executeSqlLocalyAndSaveToCache(Sql, node()),
|     timer:sleep(200),
|     SendArgs = [Sql, "rpcExecuteSqlLocalyAndSaveToCacheAndSendSql"],
|     spawn(node(), syncToolEsql, sendSqlToOtherNode, SendArgs),
|     {atomic, ok}.
|
|
| %% Runs Sql on the already-open connection DB and returns the result of
| %% esqlite3:map/3, with every row converted by rowsToList/2.
| %% The connection is not closed here; that is left to the caller.
| selectInfoFromCache(DB, Sql) ->
|     esqlite3:map(fun rowsToList/2, Sql, DB).
|
|
| %% Executes ordinary SQL on the local sync database; nothing is sent to other
| %% machines from here.
| %%
| %% After the statement succeeds, an INSERT describing it (uuid, func, sql,
| %% create_by, create_time) is built and passed to
| %% syncToolEsql:saveSqlToCache/1 in a separate process. Node is recorded as
| %% create_by.
| %%
| %% Returns ok. Crashes with badmatch if the database cannot be opened or the
| %% statement does not return ok. The connection is closed in both cases;
| %% previously a failing statement leaked the open handle.
| executeSqlLocalyAndSaveToCache(Sql, Node) ->
|     {ok, DB} = getESqlDBRef(),
|     try
|         ok = esqlite3:exec(Sql, DB)
|     after
|         esqlite3:close(DB)
|     end,
|     StrUuid = syncTool:getUUIDString(),
|     StrCreateUser = syncTool:change2Str(Node),
|     StrTimestamp = syncTool:getTimestampStr(),
|     %% NOTE(review): the func column is hard-coded to "test".
|     Func = "test",
|     %% NOTE(review): values are spliced between double quotes without any
|     %% escaping, so a statement that itself contains " yields a malformed
|     %% INSERT -- consider escaping or bound parameters.
|     InSql = "INSERT INTO SqlCache (uuid, func, sql, create_by,create_time) values (\"" ++ StrUuid ++ "\",\"" ++
|         Func ++ "\" ,\"" ++ syncTool:change2Str(Sql) ++ "\",\"" ++ StrCreateUser ++ "\",\"" ++ StrTimestamp ++ "\");",
|     spawn(node(), syncToolEsql, saveSqlToCache, [InSql]),
|     ok.
|
| %% Replays SQL statements that were fetched from the SqlCache table.
| %%
| %% Each record must be exactly
| %%   [{"uuid", _}, {"func", _}, {"sql", _}, {"create_time", _}, {"create_by", _}]
| %% in that order; any other shape crashes with badmatch. For every record the
| %% cached statement is executed on the sync database, then an INSERT for the
| %% same row is passed to syncToolEsql:saveSqlToCache/1.
| %%
| %% CacheDb is only threaded through the recursion; it is not used otherwise.
| %% Returns [] once all records have been processed.
| %%
| %% The sync database connection opened for each record is closed again in an
| %% `after` section (also when the statement fails); previously one handle was
| %% left open per record.
| execCacheSql(_CacheDb, []) ->
|     [];
| execCacheSql(CacheDb, [Rec | RecLists]) ->
|     lager:info(" Rec len is ~p !!!!", [length(Rec)]),
|     [
|         {"uuid", UUID},
|         {"func", Func},
|         {"sql", CacheSql},
|         {"create_time", CreateTime},
|         {"create_by", CreateBy}
|     ] = Rec,
|     lager:info(" UUID is ~p !!!!", [UUID]),
|     {ok, SyncDB} = getESqlDBRef(),
|     lager:info(" SyncDB is ~p !!!!", [SyncDB]),
|     try
|         ok = esqlite3:exec(CacheSql, SyncDB)
|     after
|         esqlite3:close(SyncDB)
|     end,
|     InSql = ["INSERT INTO SqlCache (uuid, func, sql, create_by, create_time) values (\"", UUID, "\",\"", Func, "\" ,\"",
|         CacheSql, "\",\"", CreateBy, "\",\"", CreateTime, "\");"],
|     syncToolEsql:saveSqlToCache(InSql),
|     execCacheSql(CacheDb, RecLists).
|
|