package com.basic.security.manager; import android.os.SystemClock; import android.text.TextUtils; import com.basic.security.activity.MainActivity; import com.basic.security.base.BaseApplication; import com.basic.security.model.ClusterSetting; import com.basic.security.model.ModelAdapter; import com.basic.security.model.Node; import com.basic.security.model.RowToSerf; import com.basic.security.utils.SqlSplit; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import java.lang.reflect.Type; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.PriorityQueue; import java.util.Queue; import androidSync.AndroidSync; public class ClusterSerfSyncManager { public static final Object serfQueueLock = new Object(); public static final Object serfReceiveQueueLock = new Object(); public static final Object serfSendQueueLock = new Object(); public static Queue serfQueue = new PriorityQueue<>(); public static List serfReceiveQueue = new ArrayList<>(); public static List serfReceiveQueueCopy = new ArrayList<>(); public static List serfSendQueueCopy = new ArrayList<>(); public static List serfSendQueue = new ArrayList<>(); public static Comparator> mapComparator = (m1, m2) -> { try { return m1.get(Node.nodeID).compareTo(m2.get(Node.nodeID)); } catch (Exception e) { return 0; } }; static int count = 0; static boolean started = false; static boolean prevStarted = false; static Map> prevNodeIdNodeMap = new HashMap<>(); static String serfQueueStr = ""; public static void start() { started = false; prevStarted = false; BaseApplication.getApplication().executorService.execute(() -> { boolean fromTimeOut = false; MainActivity mainActivity = BaseApplication.getApplication().activity; while (true) { try { // System1.out.println("ClusterSerfSyncManager.start 2 "); ModelAdapter cluster = ClusterSettingManager.cluster; if (cluster.getBool(ClusterSetting.exit)) { if (mainActivity.currentFragment == mainActivity.fragment_cluster) { BaseApplication.getApplication().activity.fragment_cluster.updateDeviceList(new ArrayList<>()); } prevNodeIdNodeMap.clear(); started = false; AndroidSync.leave(); } prevStarted = started; if (!cluster.getBool(ClusterSetting.exit) && !fromTimeOut) { String localNodeId = ClusterSettingManager.getLocalNodeId(); String getClusterId = ClusterSettingManager.getClusterId(); String password = ClusterSettingManager.getPassword(); if ( !TextUtils.isEmpty(localNodeId) && !TextUtils.isEmpty(getClusterId) && !TextUtils.isEmpty(password) ) { AndroidSync.leave(); AndroidSync.registerReceiveSqlInterfaceFromJava(ClusterSerfSyncManager::receiveData); AndroidSync.initCluster(); AndroidSync.initAgent(localNodeId); AndroidSync.syncInit(getClusterId, password, localNodeId, ""); // System1.out.println("ClusterSerfSyncManager.start 0 " + getClusterId + " " + password + " " + localNodeId + " "); // System1.out.println("ClusterSerfSyncManager.start 3 " + cluster.getBool(ClusterSetting.exit)); started = true; } } if (started) { String otherNodeIp = ClusterSettingManager.getOtherNodeIp(); if (!TextUtils.isEmpty(otherNodeIp)) { // AndroidSync.joinByNodeAddrs(otherNodeIp + ":30190"); } String nodesStr = new String(AndroidSync.getNodes()); List> availableNodes = new ArrayList<>(); try { Gson gson = new Gson(); Type typeOfListOfTypeX = new TypeToken>>() { }.getType(); List> nodes = gson.fromJson(nodesStr, typeOfListOfTypeX); for (Map node : nodes) { if ("1".equals(node.get("isAlive"))) { availableNodes.add(node); } } if (mainActivity.currentFragment == mainActivity.fragment_cluster) { Collections.sort(availableNodes, mapComparator); BaseApplication.getApplication().activity.fragment_cluster.updateDeviceList(availableNodes); } } catch (Exception e) { e.printStackTrace(); } if (availableNodes.size() <= 1) { otherNodeIp = ClusterSettingManager.getOtherNodeIp(); if (!TextUtils.isEmpty(otherNodeIp)) { AndroidSync.joinByNodeAddrs(otherNodeIp + ":30190"); // System1.out.println("ClusterSerfSyncManager.start 1 " + otherNodeIp + ":30190"); } } try { synchronized (serfSendQueueLock) { if (serfSendQueue.size() == 0) { serfSendQueueLock.wait(3 * 1000); } if (serfSendQueue.size() > 0) { serfSendQueueCopy.addAll(serfSendQueue); serfSendQueue.clear(); } } for (ModelAdapter serfSendCopy : serfSendQueueCopy) { AndroidSync.syncSql(serfSendCopy.getString(RowToSerf.sql)); RowToSerfManager.confirmSend(serfSendCopy); try { System1.out.println("ClusterSerfSyncManager.start serfSendCopy=" + serfSendCopy.getString(RowToSerf.sql).substring(0, 30)); } catch (Exception e) { System1.out.println("ClusterSerfSyncManager.start " + e.getMessage()); } } serfSendQueueCopy.clear(); } catch (Exception e) { e.printStackTrace(); } if (prevStarted != started) { SystemClock.sleep(5 * 1000); } count++; } serfQueueStr = ""; synchronized (serfQueueLock) { if (serfQueue.peek() == null) { serfQueueLock.wait(3 * 1000); } if (serfQueue.size() > 0) { serfQueueStr = serfQueue.remove(); fromTimeOut = false; } else { fromTimeOut = true; } } } catch (Exception e) { e.printStackTrace(); } } }); // BaseApplication.getApplication().executorService.execute(() -> { // while (true) { // sendData("select * from table1 count=" + count + ", node=node3"); // SystemClock.sleep(4 * 1000); // } // }); BaseApplication.getApplication().executorService.execute(() -> { while (true) { try { synchronized (serfReceiveQueueLock) { if (serfReceiveQueue.size() == 0) { serfReceiveQueueLock.wait(3 * 1000); } if (serfReceiveQueue.size() > 0) { serfReceiveQueueCopy.addAll(serfReceiveQueue); serfReceiveQueue.clear(); } } for (String sqls : serfReceiveQueueCopy) { List sqlList = new ArrayList<>(); if (sqls.contains(";")) { String[] sqlArray = sqls.split(";"); sqlList.addAll(Arrays.asList(sqlArray)); } else { sqlList.add(sqls); } for (String sql : sqlList) { sql = sql.trim(); try { if (sql.contains("cameras")) { continue; } System1.out.println("ClusterSerfSyncManager.start tableName=" + SqlSplit.getTableName(sql) + " " + SqlSplit.containsTable(sql)); if (SqlSplit.containsTable(sql)) { System1.out.println("ClusterSerfSyncManager.start remoteSql=" + (sql.length() > 30 ? sql.substring(0, 30) : sql) + "..."); SerfToRowManager.addSerfToRow(sql); // DatabaseManager.execSQL(sql); // PersonAManager.afterExecutedSql(sql); // PersonATypeManager.afterExecutedSql(sql); } } catch (Exception e) { e.printStackTrace(); System1.out.println("ClusterSerfSyncManager.start " + e.getMessage()); } } } serfReceiveQueueCopy.clear(); } catch (Exception e) { e.printStackTrace(); } } }); } private static void receiveData(String content) { try { synchronized (serfReceiveQueueLock) { // System1.out.println("ClusterSerfSyncManager.receiveData " + content); serfReceiveQueue.add(content); serfReceiveQueueLock.notify(); } } catch (Exception e) { e.printStackTrace(); } } public static void sendData(ModelAdapter rowToSerf) { // System1.out.println("ClusterSerfSyncManager.sendData sql="+content); BaseApplication.getApplication().executorService.execute(new Runnable() { @Override public void run() { try { synchronized (serfSendQueueLock) { serfSendQueue.add(rowToSerf); serfSendQueueLock.notify(); } } catch (Exception e) { e.printStackTrace(); } } }); } }