package com.basic.security.manager.erlang; import com.basic.security.base.BaseApplication; import com.basic.security.manager.BaseManager; import com.basic.security.manager.DeviceManager; import com.basic.security.utils.Constants; import com.basic.security.utils.DateUtil; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; public class ClusterManager extends BaseManager { /** * 创建集群 */ public static void createCluster(String clusterId, String clusterName, String cookie) { createClusterAndDeviceInfo(" ", clusterId, clusterName, cookie, "create"); } /** * 加入集群 */ public static void joinCluster(String existNodeName, String clusterId, String clusterName) { createClusterAndDeviceInfo(existNodeName, clusterId, clusterName, "123", "join"); } private static void createClusterAndDeviceInfo(String fatherNode, String clusterId, String clusterName, String cookie, String type) { BaseApplication.getApplication().executorService.execute(() -> { class RunHeler { void insertIntoDeviceInfo(String clusterId, String existNodeName) { String deviceInsertSql = "INSERT OR REPLACE INTO device_info " + "(father_node, uuid, node_id, device_id, cluster_id, update_time, create_by, del_flag) " + "VALUES ('" + existNodeName + "', '" + DeviceManager.getDeviceId() + "', '" + Constants.erlangLocalNode + "', '" + DeviceManager.getDeviceId() + "', '" + clusterId + "', '" + DateUtil.getDateTimeStrFull() + "', '" + Constants.erlangLocalNode + "','" + Constants.delFlag + "');"; SqliteManager.executeInSync(ErlangConnection.getLocalConnection(), deviceInsertSql); SqliteManager.executeInCache(ErlangConnection.getLocalConnection(), deviceInsertSql); if (existNodeName.length() > 5) { com.basic.security.manager.erlang.DeviceManager.downloadDeviceInfoFromOtherDevices(existNodeName); SqliteManager.executeInSync(ErlangConnection.getRemoteConnection(existNodeName), deviceInsertSql); SqliteManager.executeInCache(ErlangConnection.getRemoteConnection(existNodeName), deviceInsertSql); } } void insertIntoClusterInfo(String clusterId, String clusterName) { String clusterInsertSql = "INSERT OR REPLACE INTO cluster_info " + "(cluster_id, cluster_name, update_time, create_by, del_flag)" + "VALUES ('" + clusterId + "','" + clusterName + "','" + DateUtil.getDateTimeStrFull() + "','" + Constants.erlangLocalNode + "','" + Constants.delFlag + "');"; SqliteManager.executeInSync(ErlangConnection.getLocalConnection(), clusterInsertSql); SqliteManager.executeInCache(ErlangConnection.getLocalConnection(), clusterInsertSql); } public void execute() { if ("create".equals(type)) { insertIntoClusterInfo(clusterId, clusterName); } insertIntoDeviceInfo(clusterId, fatherNode); } } new RunHeler().execute(); }); } /** * 退出集群 * 逻辑删除:0:未删除 1:已删除 */ public static void exitCluster(String deviceId) { BaseApplication.getApplication().executorService.execute(() -> { class RunHandle { void updateDeviceInfo(String deviceId) { String sql = "update device_info SET del_flag=1 WHERE 1=1 AND device_id='" + deviceId + "'"; SqliteManager.executeInSync(ErlangConnection.getLocalConnection(), sql); } public void execute() { updateDeviceInfo(deviceId); } } new RunHandle().execute(); }); } /** * 退出集群 * 物理删除:delete */ public static void phyDelDeviceInfo(String deviceId) { BaseApplication.getApplication().executorService.execute(() -> { class RunHandle { void delete(String deviceId) { // String sql = "delete from device_info where device_id='"+deviceId+"'"; String sql = "delete from device_info"; SqliteManager.executeInSync(ErlangConnection.getLocalConnection(), sql); } public void execute() { delete(deviceId); } } new RunHandle().execute(); }); } /** * 查询集群信息 */ public static List> queryCluster(String cluster_name, String cluster_cookie) { class ThreadCallable implements Callable>> { @Override public List> call() throws Exception { List> list = new ArrayList<>(); String sql = "SELECT * FROM cluster_info where cluster_name='" + cluster_name + "' and cluster_id='" + cluster_cookie + "'"; list = SqliteManager.queryFromSync(ErlangConnection.getLocalConnection(), sql); return list; } } // 创建一个大小为10的线程池 ExecutorService executor = Executors.newFixedThreadPool(10); Future>> future = executor.submit(new ThreadCallable()); List> list = new ArrayList<>(); try { //调用gei方法获取线程执行结果,在线程执行完成前该方法会一直阻塞 list = future.get(); } catch (ExecutionException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return list; } /** * 查询当前设备是否已创建集群,一个设备只能创建一个集群 */ public static List> queryClusterByDevice() { class ThreadCallable implements Callable>> { @Override public List> call() throws Exception { List> list = new ArrayList<>(); String sql = "select * from cluster_info where create_by='" + Constants.erlangLocalNode + "'"; list = SqliteManager.queryFromSync(ErlangConnection.getLocalConnection(), sql); return list; } } // 创建一个大小为10的线程池 ExecutorService executor = Executors.newFixedThreadPool(10); Future>> future = executor.submit(new ThreadCallable()); List> list = new ArrayList<>(); try { list = future.get(); } catch (ExecutionException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return list; } }