package com.basic.security.manager.impl.erlang;

import com.basic.security.manager.impl.sqlite.SlBaseManager;
import com.basic.security.manager.impl.sqlite.SlDeviceManager;
import com.basic.security.utils.Constants;
import com.basic.security.utils.DateUtil;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * Static helpers that maintain the {@code cluster_info} and {@code device_info} tables
 * through {@link SqliteManager} over connections obtained from {@link ErlangConnection}.
 *
 * <p>Write operations are fire-and-forget: each one starts its own thread and returns
 * immediately. Query operations run the query on a worker thread and block the caller
 * until the result is available.
 */
public class ClusterManager extends SlBaseManager {

    /** Operation type that additionally writes the {@code cluster_info} row. */
    private static final String TYPE_CREATE = "create";

    /** Operation type that only registers this device in {@code device_info}. */
    private static final String TYPE_JOIN = "join";

    /**
     * A father-node name must be longer than this to be treated as a real remote node.
     * {@link #createCluster} passes a single blank, which fails the check.
     */
    private static final int MIN_REMOTE_NODE_NAME_LENGTH = 5;

    /**
     * Creates a cluster: writes the {@code cluster_info} row and registers this device
     * in {@code device_info}, asynchronously.
     *
     * @param clusterId   id of the new cluster
     * @param clusterName display name of the new cluster
     * @param cookie      accepted for API compatibility; not read by the current implementation
     */
    public static void createCluster(String clusterId, String clusterName, String cookie) {
        createClusterAndDeviceInfo(" ", clusterId, clusterName, cookie, TYPE_CREATE);
    }

    /**
     * Joins an existing cluster: registers this device in {@code device_info} locally and,
     * when {@code existNodeName} is long enough to be a node name, on that node as well.
     *
     * @param existNodeName name of a node that already belongs to the cluster
     * @param clusterId     id of the cluster to join
     * @param clusterName   display name of the cluster (not written when joining)
     */
    public static void joinCluster(String existNodeName, String clusterId, String clusterName) {
        // The cookie value is a fixed placeholder; it is not read downstream.
        createClusterAndDeviceInfo(existNodeName, clusterId, clusterName, "123", TYPE_JOIN);
    }

    /**
     * Starts a thread that writes the cluster row (only for {@code "create"}) and then the
     * device row.
     */
    private static void createClusterAndDeviceInfo(String fatherNode, String clusterId, String clusterName,
                                                   String cookie, String type) {
        new Thread(() -> {
            if (TYPE_CREATE.equals(type)) {
                insertClusterInfo(clusterId, clusterName);
            }
            insertDeviceInfo(clusterId, fatherNode);
        }).start();
    }

    /**
     * Inserts (or replaces) this device's {@code device_info} row locally and, for a real
     * father node, on that remote node too.
     */
    private static void insertDeviceInfo(String clusterId, String existNodeName) {
        String deviceInsertSql = "INSERT OR REPLACE INTO device_info "
                + "(father_node, uuid, node_id, device_id, cluster_id, update_time, create_by, del_flag) "
                + "VALUES ('" + escapeSqlLiteral(existNodeName)
                + "', '" + SlDeviceManager.getDeviceId()
                + "', '" + Constants.erlangLocalNode
                + "', '" + SlDeviceManager.getDeviceId()
                + "', '" + escapeSqlLiteral(clusterId)
                + "', '" + DateUtil.getDateTimeStrFull()
                + "', '" + Constants.erlangLocalNode
                + "','" + Constants.delFlag + "');";

        SqliteManager.executeInSync(ErlangConnection.getLocalConnection(), deviceInsertSql);
        SqliteManager.executeInCache(ErlangConnection.getLocalConnection(), deviceInsertSql);

        // Null guard: without it a null node name would kill the worker thread here.
        if (existNodeName != null && existNodeName.length() > MIN_REMOTE_NODE_NAME_LENGTH) {
            DeviceManager.downloadDeviceInfoFromOtherDevices(existNodeName);
            SqliteManager.executeInSync(ErlangConnection.getRemoteConnection(existNodeName), deviceInsertSql);
            SqliteManager.executeInCache(ErlangConnection.getRemoteConnection(existNodeName), deviceInsertSql);
        }
    }

    /** Inserts (or replaces) the {@code cluster_info} row on the local connection. */
    private static void insertClusterInfo(String clusterId, String clusterName) {
        String clusterInsertSql = "INSERT OR REPLACE INTO cluster_info "
                + "(cluster_id, cluster_name, update_time, create_by, del_flag)"
                + "VALUES ('" + escapeSqlLiteral(clusterId)
                + "','" + escapeSqlLiteral(clusterName)
                + "','" + DateUtil.getDateTimeStrFull()
                + "','" + Constants.erlangLocalNode
                + "','" + Constants.delFlag + "');";

        SqliteManager.executeInSync(ErlangConnection.getLocalConnection(), clusterInsertSql);
        SqliteManager.executeInCache(ErlangConnection.getLocalConnection(), clusterInsertSql);
    }

    /**
     * Leaves the cluster by logical deletion: sets {@code del_flag=1} on the device's row
     * (0 = not deleted, 1 = deleted), asynchronously.
     *
     * @param deviceId id of the device whose row is flagged as deleted
     */
    public static void exitCluster(String deviceId) {
        new Thread(() -> {
            String sql = "update device_info SET del_flag=1 WHERE 1=1 AND device_id='"
                    + escapeSqlLiteral(deviceId) + "'";
            SqliteManager.executeInSync(ErlangConnection.getLocalConnection(), sql);
        }).start();
    }

    /**
     * Leaves the cluster by physical deletion, asynchronously.
     *
     * <p>NOTE(review): the statement has no WHERE clause, so it removes every row of
     * {@code device_info} and {@code deviceId} is ignored. An earlier per-device filter was
     * disabled in the original code; confirm that wiping the whole table is intended.
     *
     * @param deviceId currently unused
     */
    public static void phyDelDeviceInfo(String deviceId) {
        new Thread(() -> {
            String sql = "delete from device_info";
            SqliteManager.executeInSync(ErlangConnection.getLocalConnection(), sql);
        }).start();
    }

    /**
     * Looks up cluster rows by name and id on the local connection, blocking until done.
     *
     * <p>NOTE(review): {@code cluster_cookie} is compared against the {@code cluster_id}
     * column; confirm the parameter name versus the intended column.
     *
     * @param cluster_name   value matched against {@code cluster_name}
     * @param cluster_cookie value matched against {@code cluster_id}
     * @return the matching rows, or an empty list when the query fails or is interrupted
     */
    public static List<Map<String, String>> queryCluster(String cluster_name, String cluster_cookie) {
        String sql = "SELECT * FROM cluster_info where cluster_name='" + escapeSqlLiteral(cluster_name)
                + "' and cluster_id='" + escapeSqlLiteral(cluster_cookie) + "'";
        return queryLocal(sql);
    }

    /**
     * Returns the clusters created by this device (rows whose {@code create_by} equals the
     * local node name), blocking until done. A device may only create one cluster, so
     * callers use this to check whether one already exists.
     *
     * @return the matching rows, or an empty list when the query fails or is interrupted
     */
    public static List<Map<String, String>> queryClusterByDevice() {
        String sql = "select * from cluster_info where create_by='" + Constants.erlangLocalNode + "'";
        return queryLocal(sql);
    }

    /**
     * Runs a query against the local connection on a worker thread and waits for the result.
     *
     * <p>The executor is created per call and always shut down, so no worker thread outlives
     * the query. NOTE(review): the row type {@code Map<String, String>} was reconstructed;
     * confirm it against {@code SqliteManager.queryFromSync}.
     */
    private static List<Map<String, String>> queryLocal(final String sql) {
        Callable<List<Map<String, String>>> task =
                () -> SqliteManager.queryFromSync(ErlangConnection.getLocalConnection(), sql);

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<List<Map<String, String>>> future = executor.submit(task);
            // Blocks until the worker has finished the query.
            return future.get();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
        return new ArrayList<>();
    }

    /**
     * Escapes a value for embedding inside a single-quoted SQL string literal by doubling
     * every single quote. A {@code null} value yields the text {@code "null"}, matching
     * what plain string concatenation would have produced.
     */
    private static String escapeSqlLiteral(String value) {
        if (value == null) {
            return "null";
        }
        return value.replace("'", "''");
    }
}