package com.basic.security.manager.erlang;
|
|
import com.basic.security.base.BaseApplication;
|
import com.basic.security.manager.BaseManager;
|
import com.basic.security.manager.DeviceManager;
|
import com.basic.security.utils.Constants;
|
import com.basic.security.utils.DateUtil;
|
|
import java.util.ArrayList;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.concurrent.Callable;
|
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
import java.util.concurrent.Future;
|
|
public class ClusterManager extends BaseManager {
|
|
/**
|
* 创建集群
|
*/
|
public static void createCluster(String clusterId, String clusterName, String cookie) {
|
createClusterAndDeviceInfo(" ", clusterId, clusterName, cookie, "create");
|
}
|
|
/**
|
* 加入集群
|
*/
|
public static void joinCluster(String existNodeName, String clusterId, String clusterName) {
|
createClusterAndDeviceInfo(existNodeName, clusterId, clusterName, "123", "join");
|
}
|
|
private static void createClusterAndDeviceInfo(String fatherNode, String clusterId, String clusterName, String cookie, String type) {
|
BaseApplication.getApplication().executorService.execute(() -> {
|
class RunHeler {
|
void insertIntoDeviceInfo(String clusterId, String existNodeName) {
|
String deviceInsertSql = "INSERT OR REPLACE INTO device_info " +
|
"(father_node, uuid, node_id, device_id, cluster_id, update_time, create_by, del_flag) " +
|
"VALUES ('" + existNodeName + "', '" + DeviceManager.getDeviceId() + "', '" + Constants.erlangLocalNode + "', '" +
|
DeviceManager.getDeviceId() + "', '" + clusterId + "', '" +
|
DateUtil.getDateTimeStrFull() + "', '" + Constants.erlangLocalNode + "','" + Constants.delFlag + "');";
|
SqliteManager.executeInSync(ErlangConnection.getLocalConnection(), deviceInsertSql);
|
SqliteManager.executeInCache(ErlangConnection.getLocalConnection(), deviceInsertSql);
|
if (existNodeName.length() > 5) {
|
com.basic.security.manager.erlang.DeviceManager.downloadDeviceInfoFromOtherDevices(existNodeName);
|
SqliteManager.executeInSync(ErlangConnection.getRemoteConnection(existNodeName), deviceInsertSql);
|
SqliteManager.executeInCache(ErlangConnection.getRemoteConnection(existNodeName), deviceInsertSql);
|
}
|
}
|
|
void insertIntoClusterInfo(String clusterId, String clusterName) {
|
String clusterInsertSql = "INSERT OR REPLACE INTO cluster_info " +
|
"(cluster_id, cluster_name, update_time, create_by, del_flag)" +
|
"VALUES ('" + clusterId + "','" + clusterName + "','" + DateUtil.getDateTimeStrFull() + "','" + Constants.erlangLocalNode + "','" + Constants.delFlag + "');";
|
SqliteManager.executeInSync(ErlangConnection.getLocalConnection(), clusterInsertSql);
|
SqliteManager.executeInCache(ErlangConnection.getLocalConnection(), clusterInsertSql);
|
}
|
|
public void execute() {
|
if ("create".equals(type)) {
|
insertIntoClusterInfo(clusterId, clusterName);
|
}
|
insertIntoDeviceInfo(clusterId, fatherNode);
|
}
|
}
|
new RunHeler().execute();
|
});
|
}
|
|
/**
|
* 退出集群
|
* 逻辑删除:0:未删除 1:已删除
|
*/
|
public static void exitCluster(String deviceId) {
|
BaseApplication.getApplication().executorService.execute(() -> {
|
class RunHandle {
|
void updateDeviceInfo(String deviceId) {
|
String sql = "update device_info SET del_flag=1 WHERE 1=1 AND device_id='" + deviceId + "'";
|
SqliteManager.executeInSync(ErlangConnection.getLocalConnection(), sql);
|
}
|
|
public void execute() {
|
updateDeviceInfo(deviceId);
|
}
|
}
|
new RunHandle().execute();
|
});
|
}
|
|
/**
|
* 退出集群
|
* 物理删除:delete
|
*/
|
public static void phyDelDeviceInfo(String deviceId) {
|
BaseApplication.getApplication().executorService.execute(() -> {
|
class RunHandle {
|
void delete(String deviceId) {
|
// String sql = "delete from device_info where device_id='"+deviceId+"'";
|
String sql = "delete from device_info";
|
SqliteManager.executeInSync(ErlangConnection.getLocalConnection(), sql);
|
}
|
|
public void execute() {
|
delete(deviceId);
|
}
|
}
|
new RunHandle().execute();
|
});
|
}
|
|
/**
|
* 查询集群信息
|
*/
|
public static List<Map<String, String>> queryCluster(String cluster_name, String cluster_cookie) {
|
|
class ThreadCallable implements Callable<List<Map<String, String>>> {
|
@Override
|
public List<Map<String, String>> call() throws Exception {
|
List<Map<String, String>> list = new ArrayList<>();
|
String sql = "SELECT * FROM cluster_info where cluster_name='" + cluster_name + "' and cluster_id='" + cluster_cookie + "'";
|
list = SqliteManager.queryFromSync(ErlangConnection.getLocalConnection(), sql);
|
return list;
|
}
|
}
|
// 创建一个大小为10的线程池
|
ExecutorService executor = Executors.newFixedThreadPool(10);
|
Future<List<Map<String, String>>> future = executor.submit(new ThreadCallable());
|
List<Map<String, String>> list = new ArrayList<>();
|
try {
|
//调用gei方法获取线程执行结果,在线程执行完成前该方法会一直阻塞
|
list = future.get();
|
} catch (ExecutionException e) {
|
e.printStackTrace();
|
} catch (InterruptedException e) {
|
e.printStackTrace();
|
}
|
return list;
|
}
|
|
/**
|
* 查询当前设备是否已创建集群,一个设备只能创建一个集群
|
*/
|
public static List<Map<String, String>> queryClusterByDevice() {
|
|
class ThreadCallable implements Callable<List<Map<String, String>>> {
|
|
@Override
|
public List<Map<String, String>> call() throws Exception {
|
List<Map<String, String>> list = new ArrayList<>();
|
String sql = "select * from cluster_info where create_by='" + Constants.erlangLocalNode + "'";
|
list = SqliteManager.queryFromSync(ErlangConnection.getLocalConnection(), sql);
|
return list;
|
}
|
}
|
// 创建一个大小为10的线程池
|
ExecutorService executor = Executors.newFixedThreadPool(10);
|
Future<List<Map<String, String>>> future = executor.submit(new ThreadCallable());
|
List<Map<String, String>> list = new ArrayList<>();
|
try {
|
list = future.get();
|
} catch (ExecutionException e) {
|
e.printStackTrace();
|
} catch (InterruptedException e) {
|
e.printStackTrace();
|
}
|
return list;
|
}
|
|
|
}
|