From 603cb36a5123e46656b06a5deb8d7ac7ff81307f Mon Sep 17 00:00:00 2001 From: 554325746@qq.com <554325746@qq.com> Date: 星期三, 25 十二月 2019 08:48:51 +0800 Subject: [PATCH] a --- app/src/main/java/com/basic/security/manager/ClusterSerfSyncManager.java | 78 +++++++++++++++++++++++++++------------ 1 files changed, 54 insertions(+), 24 deletions(-) diff --git a/app/src/main/java/com/basic/security/manager/ClusterSerfSyncManager.java b/app/src/main/java/com/basic/security/manager/ClusterSerfSyncManager.java index f0938d5..a58e5e7 100644 --- a/app/src/main/java/com/basic/security/manager/ClusterSerfSyncManager.java +++ b/app/src/main/java/com/basic/security/manager/ClusterSerfSyncManager.java @@ -8,11 +8,14 @@ import com.basic.security.model.ClusterSetting; import com.basic.security.model.ModelAdapter; import com.basic.security.model.Node; +import com.basic.security.model.RowToSerf; +import com.basic.security.utils.SqlSplit; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import java.lang.reflect.Type; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -30,8 +33,8 @@ public static Queue<String> serfQueue = new PriorityQueue<>(); public static List<String> serfReceiveQueue = new ArrayList<>(); public static List<String> serfReceiveQueueCopy = new ArrayList<>(); - public static List<String> serfSendQueueCopy = new ArrayList<>(); - public static List<String> serfSendQueue = new ArrayList<>(); + public static List<ModelAdapter> serfSendQueueCopy = new ArrayList<>(); + public static List<ModelAdapter> serfSendQueue = new ArrayList<>(); public static Comparator<Map<String, String>> mapComparator = (m1, m2) -> { try { return m1.get(Node.nodeID).compareTo(m2.get(Node.nodeID)); @@ -53,7 +56,7 @@ MainActivity mainActivity = BaseApplication.getApplication().activity; while (true) { try { -// System.out.println("ClusterSerfSyncManager.start 2 "); +// System1.out.println("ClusterSerfSyncManager.start 2 "); ModelAdapter cluster = ClusterSettingManager.cluster; if (cluster.getBool(ClusterSetting.exit)) { if (mainActivity.currentFragment == mainActivity.fragment_cluster) { @@ -78,8 +81,8 @@ AndroidSync.initCluster(); AndroidSync.initAgent(localNodeId); AndroidSync.syncInit(getClusterId, password, localNodeId, ""); -// System.out.println("ClusterSerfSyncManager.start 0 " + getClusterId + " " + password + " " + localNodeId + " "); -// System.out.println("ClusterSerfSyncManager.start 3 " + cluster.getBool(ClusterSetting.exit)); +// System1.out.println("ClusterSerfSyncManager.start 0 " + getClusterId + " " + password + " " + localNodeId + " "); +// System1.out.println("ClusterSerfSyncManager.start 3 " + cluster.getBool(ClusterSetting.exit)); started = true; } } @@ -111,7 +114,7 @@ otherNodeIp = ClusterSettingManager.getOtherNodeIp(); if (!TextUtils.isEmpty(otherNodeIp)) { AndroidSync.joinByNodeAddrs(otherNodeIp + ":30190"); -// System.out.println("ClusterSerfSyncManager.start 1 " + otherNodeIp + ":30190"); +// System1.out.println("ClusterSerfSyncManager.start 1 " + otherNodeIp + ":30190"); } } try { @@ -124,8 +127,14 @@ serfSendQueue.clear(); } } - for (String serfSendCopy : serfSendQueueCopy) { -// System.out.println("ClusterSerfSyncManager.start serfSendCopy=" + serfSendCopy); + for (ModelAdapter serfSendCopy : serfSendQueueCopy) { + AndroidSync.syncSql(serfSendCopy.getString(RowToSerf.sql)); + RowToSerfManager.confirmSend(serfSendCopy); + try { + System1.out.println("ClusterSerfSyncManager.start serfSendCopy=" + serfSendCopy.getString(RowToSerf.sql).substring(0, 30)); + } catch (Exception e) { + System1.out.println("ClusterSerfSyncManager.start " + e.getMessage()); + } } serfSendQueueCopy.clear(); } catch (Exception e) { @@ -153,12 +162,12 @@ } } }); - BaseApplication.getApplication().executorService.execute(() -> { - while (true) { - sendData("select * from table1 count=" + count + ", node=node3"); - SystemClock.sleep(4 * 1000); - } - }); +// BaseApplication.getApplication().executorService.execute(() -> { +// while (true) { +// sendData("select * from table1 count=" + count + ", node=node3"); +// SystemClock.sleep(4 * 1000); +// } +// }); BaseApplication.getApplication().executorService.execute(() -> { while (true) { try { @@ -171,15 +180,34 @@ serfReceiveQueue.clear(); } } - for (String serfReceiveCopy : serfReceiveQueueCopy) { - System.out.println("ClusterSerfSyncManager.start serfReceiveCopy=" + serfReceiveCopy); - try { - DatabaseManager.execSQL(serfReceiveCopy); - PersonAManager.afterExecutedSql(serfReceiveCopy); - PersonATypeManager.afterExecutedSql(serfReceiveCopy); - } catch (Exception e) { - e.printStackTrace(); + for (String sqls : serfReceiveQueueCopy) { + List<String> sqlList = new ArrayList<>(); + if (sqls.contains(";")) { + String[] sqlArray = sqls.split(";"); + sqlList.addAll(Arrays.asList(sqlArray)); + } else { + sqlList.add(sqls); } + for (String sql : sqlList) { + sql = sql.trim(); + try { + if (sql.contains("cameras")) { + continue; + } + System1.out.println("ClusterSerfSyncManager.start tableName=" + SqlSplit.getTableName(sql) + " " + SqlSplit.containsTable(sql)); + if (SqlSplit.containsTable(sql)) { + System1.out.println("ClusterSerfSyncManager.start remoteSql=" + (sql.length() > 30 ? sql.substring(0, 30) : sql) + "..."); + SerfToRowManager.addSerfToRow(sql); +// DatabaseManager.execSQL(sql); +// PersonAManager.afterExecutedSql(sql); +// PersonATypeManager.afterExecutedSql(sql); + } + } catch (Exception e) { + e.printStackTrace(); + System1.out.println("ClusterSerfSyncManager.start " + e.getMessage()); + } + } + } serfReceiveQueueCopy.clear(); } catch (Exception e) { @@ -192,6 +220,7 @@ private static void receiveData(String content) { try { synchronized (serfReceiveQueueLock) { +// System1.out.println("ClusterSerfSyncManager.receiveData " + content); serfReceiveQueue.add(content); serfReceiveQueueLock.notify(); } @@ -200,13 +229,14 @@ } } - public static void sendData(String content) { + public static void sendData(ModelAdapter rowToSerf) { +// System1.out.println("ClusterSerfSyncManager.sendData sql="+content); BaseApplication.getApplication().executorService.execute(new Runnable() { @Override public void run() { try { synchronized (serfSendQueueLock) { - serfSendQueue.add(content); + serfSendQueue.add(rowToSerf); serfSendQueueLock.notify(); } } catch (Exception e) { -- Gitblit v1.8.0