package com.basic.security.manager;

import android.os.SystemClock;
import android.text.TextUtils;

import com.basic.security.activity.MainActivity;
import com.basic.security.base.BaseApplication;
import com.basic.security.model.ClusterSetting;
import com.basic.security.model.ModelAdapter;
import com.basic.security.model.Node;
import com.basic.security.model.RowToSerf;
import com.basic.security.utils.ExceptionUtil;
import com.basic.security.utils.SqlSplit;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;

import androidSync.AndroidSync;

/**
 * Glue between the local database and the {@link AndroidSync} cluster library.
 *
 * <p>{@link #start()} launches two endless workers on the application executor:
 * <ul>
 *   <li>a cluster worker that (re)initialises {@link AndroidSync} from the cluster settings,
 *       pushes the list of alive nodes to the cluster fragment while it is the current fragment,
 *       and forwards rows queued through {@link #sendData(ModelAdapter)} with
 *       {@code AndroidSync.syncSql};</li>
 *   <li>a receive worker that applies SQL text handed over by {@link AndroidSync} through the
 *       callback registered with {@code registerReceiveSqlInterfaceFromJava}.</li>
 * </ul>
 *
 * <p>Each queue is guarded by its own lock object and uses {@code wait}/{@code notify} with a
 * three second timeout. The {@code *Copy} lists are drain buffers used by the workers.
 *
 * <p>NOTE(review): every call to {@link #start()} submits a fresh pair of workers; nothing in
 * this class stops earlier ones, so it looks like it is meant to be called once - confirm
 * against callers.
 */
public class ClusterSerfSyncManager {
    public static final Object serfQueueLock = new Object();
    public static final Object serfReceiveQueueLock = new Object();
    public static final Object serfSendQueueLock = new Object();

    /** Control queue; an entry arriving here lets the cluster worker re-run the join sequence. */
    public static Queue<String> serfQueue = new PriorityQueue<>();
    /** SQL payloads received from the cluster, waiting to be applied. */
    public static List<String> serfReceiveQueue = new ArrayList<>();
    /** Drain buffer of the receive worker. */
    public static List<String> serfReceiveQueueCopy = new ArrayList<>();
    /** Drain buffer of the cluster worker; holds rows that still have to be sent. */
    public static List<ModelAdapter> serfSendQueueCopy = new ArrayList<>();
    /** Rows queued by {@link #sendData(ModelAdapter)}. */
    public static List<ModelAdapter> serfSendQueue = new ArrayList<>();

    /** Orders node maps by their {@code Node.nodeID} entry; any failure (e.g. a missing id) compares as equal. */
    public static Comparator<Map<String, String>> mapComparator = (m1, m2) -> {
        try {
            return m1.get(Node.nodeID).compareTo(m2.get(Node.nodeID));
        } catch (Exception e) {
            return 0;
        }
    };

    /** Timeout used for every queue wait and as back-off after an unexpected failure. */
    private static final int POLL_INTERVAL_MS = 3 * 1000;
    /** Pause applied on the iteration in which the cluster was just initialised. */
    private static final int STARTUP_SETTLE_MS = 5 * 1000;

    static int count = 0;
    static boolean started = false;
    static boolean prevStarted = false;
    // Only ever cleared in this class. NOTE(review): the outer key type is assumed to be the
    // node id string - confirm against other users of this field.
    static Map<String, Map<String, String>> prevNodeIdNodeMap = new HashMap<>();
    static String serfQueueStr = "";

    /**
     * Resets the start flags and submits the cluster worker and the receive worker to the
     * application executor. Returns immediately; the workers run until their thread is
     * interrupted.
     */
    public static void start() {
        started = false;
        prevStarted = false;
        BaseApplication.getApplication().executorService.execute(ClusterSerfSyncManager::runClusterLoop);
        BaseApplication.getApplication().executorService.execute(ClusterSerfSyncManager::runReceiveLoop);
    }

    /** Cluster worker: join/leave handling, node list refresh and outbound SQL delivery. */
    private static void runClusterLoop() {
        // True when the previous wait on serfQueue timed out; suppresses re-running the join.
        boolean fromTimeOut = false;
        while (!Thread.currentThread().isInterrupted()) {
            try {
                ModelAdapter cluster = ClusterSettingManager.cluster;
                if (cluster.getBool(ClusterSetting.exit)) {
                    publishDeviceList(new ArrayList<>());
                    prevNodeIdNodeMap.clear();
                    started = false;
                    AndroidSync.leave();
                }
                prevStarted = started;
                if (!cluster.getBool(ClusterSetting.exit) && !fromTimeOut) {
                    joinClusterIfConfigured();
                }
                if (started) {
                    String nodesJson = new String(AndroidSync.getNodes());
                    List<Map<String, String>> availableNodes = refreshAvailableNodes(nodesJson);
                    if (availableNodes.size() <= 1) {
                        // At most one alive node is known: try the configured peer address.
                        String otherNodeIp = ClusterSettingManager.getOtherNodeIp();
                        if (!TextUtils.isEmpty(otherNodeIp)) {
                            AndroidSync.joinByNodeAddrs(otherNodeIp + ":30190");
                        }
                    }
                    try {
                        flushSendQueue();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    } catch (Exception e) {
                        ExceptionUtil.printException(e);
                    }
                    if (prevStarted != started) {
                        SystemClock.sleep(STARTUP_SETTLE_MS);
                    }
                    count++;
                }
                serfQueueStr = "";
                synchronized (serfQueueLock) {
                    if (serfQueue.peek() == null) {
                        serfQueueLock.wait(POLL_INTERVAL_MS);
                    }
                    if (serfQueue.size() > 0) {
                        serfQueueStr = serfQueue.remove();
                        fromTimeOut = false;
                    } else {
                        fromTimeOut = true;
                    }
                }
            } catch (InterruptedException e) {
                // Restore the flag and stop so the executor can shut this worker down.
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                ExceptionUtil.printException(e);
                // A failure before the timed wait above would otherwise retry in a tight loop.
                SystemClock.sleep(POLL_INTERVAL_MS);
            }
        }
    }

    /**
     * Runs the leave/register/init sequence on {@link AndroidSync} when the local node id,
     * cluster id and password are all non-empty, and marks the manager as started.
     */
    private static void joinClusterIfConfigured() throws Exception {
        String localNodeId = ClusterSettingManager.getLocalNodeId();
        String clusterId = ClusterSettingManager.getClusterId();
        String password = ClusterSettingManager.getPassword();
        if (TextUtils.isEmpty(localNodeId) || TextUtils.isEmpty(clusterId) || TextUtils.isEmpty(password)) {
            return;
        }
        AndroidSync.leave();
        AndroidSync.registerReceiveSqlInterfaceFromJava(ClusterSerfSyncManager::receiveData);
        AndroidSync.initCluster();
        AndroidSync.initAgent(localNodeId);
        AndroidSync.syncInit(clusterId, password, localNodeId, "", "/sdcard/");
        started = true;
    }

    /**
     * Parses the node list JSON, keeps the nodes whose {@code isAlive} entry is {@code "1"} and
     * shows them in the cluster fragment. Failures are logged; whatever was collected up to that
     * point is returned.
     *
     * @param nodesJson JSON array of string-to-string objects as returned by {@code AndroidSync.getNodes()}
     * @return the alive nodes, never {@code null}
     */
    private static List<Map<String, String>> refreshAvailableNodes(String nodesJson) {
        List<Map<String, String>> availableNodes = new ArrayList<>();
        try {
            Type nodeListType = new TypeToken<List<Map<String, String>>>() { }.getType();
            List<Map<String, String>> nodes = new Gson().fromJson(nodesJson, nodeListType);
            if (nodes != null) {
                for (Map<String, String> node : nodes) {
                    if ("1".equals(node.get("isAlive"))) {
                        availableNodes.add(node);
                    }
                }
            }
            publishDeviceList(availableNodes);
        } catch (Exception e) {
            ExceptionUtil.printException(e);
        }
        return availableNodes;
    }

    /**
     * Sorts {@code nodes} with {@link #mapComparator} and hands them to the cluster fragment,
     * but only while that fragment is the activity's current fragment. The activity is looked up
     * on every call so a replaced or not-yet-created activity is handled.
     */
    private static void publishDeviceList(List<Map<String, String>> nodes) {
        MainActivity activity = BaseApplication.getApplication().activity;
        if (activity == null || activity.fragment_cluster == null) {
            return;
        }
        if (activity.currentFragment != activity.fragment_cluster) {
            return;
        }
        Collections.sort(nodes, mapComparator);
        activity.fragment_cluster.updateDeviceList(nodes);
    }

    /**
     * Waits up to {@link #POLL_INTERVAL_MS} for queued rows, then sends each one with
     * {@code AndroidSync.syncSql} and confirms it through {@code RowToSerfManager.confirmSend}.
     * Rows that were sent and confirmed are dropped from the buffer even when a later row fails,
     * so the next pass retries only the remainder instead of re-sending confirmed rows.
     */
    private static void flushSendQueue() throws Exception {
        synchronized (serfSendQueueLock) {
            if (serfSendQueue.isEmpty()) {
                serfSendQueueLock.wait(POLL_INTERVAL_MS);
            }
            if (!serfSendQueue.isEmpty()) {
                serfSendQueueCopy.addAll(serfSendQueue);
                serfSendQueue.clear();
            }
        }
        int sent = 0;
        try {
            for (ModelAdapter row : serfSendQueueCopy) {
                AndroidSync.syncSql(row.getString(RowToSerf.sql));
                RowToSerfManager.confirmSend(row);
                sent++;
            }
        } finally {
            serfSendQueueCopy.subList(0, sent).clear();
        }
    }

    /** Receive worker: drains {@link #serfReceiveQueue} and applies every statement it contains. */
    private static void runReceiveLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                synchronized (serfReceiveQueueLock) {
                    if (serfReceiveQueue.isEmpty()) {
                        serfReceiveQueueLock.wait(POLL_INTERVAL_MS);
                    }
                    if (!serfReceiveQueue.isEmpty()) {
                        serfReceiveQueueCopy.addAll(serfReceiveQueue);
                        serfReceiveQueue.clear();
                    }
                }
                try {
                    for (String sqls : serfReceiveQueueCopy) {
                        if (sqls == null) {
                            continue;
                        }
                        for (String sql : splitStatements(sqls)) {
                            applyRemoteSql(sql);
                        }
                    }
                } finally {
                    // Always drop the batch so one bad payload cannot be replayed forever.
                    serfReceiveQueueCopy.clear();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e) {
                ExceptionUtil.printException(e);
            }
        }
    }

    /**
     * Splits a payload into trimmed, non-empty statements at every {@code ;} that is not inside
     * a single-quoted SQL string literal. A doubled quote ({@code ''}) inside a literal toggles
     * the state twice and therefore stays inside the literal.
     */
    private static List<String> splitStatements(String sqls) {
        List<String> statements = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inLiteral = false;
        for (int i = 0; i < sqls.length(); i++) {
            char c = sqls.charAt(i);
            if (c == '\'') {
                inLiteral = !inLiteral;
            }
            if (c == ';' && !inLiteral) {
                addStatement(statements, current);
            } else {
                current.append(c);
            }
        }
        addStatement(statements, current);
        return statements;
    }

    /** Appends the trimmed buffer content to {@code statements} unless it is empty, then resets the buffer. */
    private static void addStatement(List<String> statements, StringBuilder buffer) {
        String statement = buffer.toString().trim();
        if (!statement.isEmpty()) {
            statements.add(statement);
        }
        buffer.setLength(0);
    }

    /**
     * Applies one statement received from the cluster. Statements mentioning {@code cameras} are
     * skipped; the rest are executed only when {@code SqlSplit.containsTable} accepts them.
     * Failures are logged and do not affect the remaining statements.
     *
     * <p>NOTE(review): this executes SQL text supplied by other nodes; {@code SqlSplit.containsTable}
     * is the only filter visible here - confirm it is a sufficient allow-list.
     */
    private static void applyRemoteSql(String sql) {
        try {
            if (sql.contains("cameras")) {
                return;
            }
            System1.out.println("ClusterSerfSyncManager.start tableName=" + SqlSplit.getTableName(sql) + " " + SqlSplit.containsTable(sql));
            if (SqlSplit.containsTable(sql)) {
                System1.out.println("ClusterSerfSyncManager.start remoteSql=" + (sql.length() > 30 ? sql.substring(0, 30) : sql) + "...");
                SerfToRowManager.addSerfToRow(sql);
                DatabaseManager.execSQL(sql);
                PersonAManager.afterExecutedSql(sql);
                PersonATypeManager.afterExecutedSql(sql);
            }
        } catch (Exception e) {
            ExceptionUtil.printException(e);
            System1.out.println("ClusterSerfSyncManager.start " + e.getMessage());
        }
    }

    /** Callback registered with {@link AndroidSync}: queues the received payload and wakes the receive worker. */
    private static void receiveData(String content) {
        try {
            synchronized (serfReceiveQueueLock) {
                serfReceiveQueue.add(content);
                serfReceiveQueueLock.notify();
            }
        } catch (Exception e) {
            ExceptionUtil.printException(e);
        }
    }

    /**
     * Queues a row for delivery to the cluster and wakes the cluster worker. The enqueue itself
     * runs on the application executor, so the caller never blocks on the queue lock.
     *
     * @param rowToSerf row whose {@code RowToSerf.sql} value will be sent
     */
    public static void sendData(ModelAdapter rowToSerf) {
        BaseApplication.getApplication().executorService.execute(() -> {
            try {
                synchronized (serfSendQueueLock) {
                    serfSendQueue.add(rowToSerf);
                    serfSendQueueLock.notify();
                }
            } catch (Exception e) {
                ExceptionUtil.printException(e);
            }
        });
    }
}