package com.basic.security.manager.impl.erlang;
|
|
import com.basic.security.manager.impl.sqlite.SlBaseManager;
|
import com.basic.security.manager.impl.sqlite.SlDeviceManager;
|
import com.basic.security.utils.Constants;
|
import com.basic.security.utils.DateUtil;
|
|
import java.util.ArrayList;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.concurrent.Callable;
|
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
import java.util.concurrent.Future;
|
|
public class ClusterManager extends SlBaseManager {
|
|
/**
|
* 创建集群
|
*/
|
public static void createCluster(String clusterId, String clusterName, String cookie) {
|
createClusterAndDeviceInfo(" ", clusterId, clusterName, cookie,"create");
|
}
|
|
/**
|
* 加入集群
|
*/
|
public static void joinCluster(String existNodeName, String clusterId, String clusterName) {
|
createClusterAndDeviceInfo(existNodeName, clusterId, clusterName, "123","join");
|
}
|
|
private static void createClusterAndDeviceInfo(String fatherNode, String clusterId, String clusterName, String cookie,String type) {
|
new Thread(){
|
@Override
|
public void run() {
|
class RunHeler {
|
void insertIntoDeviceInfo(String clusterId, String existNodeName) {
|
String deviceInsertSql = "INSERT OR REPLACE INTO device_info " +
|
"(father_node, uuid, node_id, device_id, cluster_id, update_time, create_by, del_flag) "+
|
"VALUES ('"+existNodeName+"', '" + SlDeviceManager.getDeviceId() + "', '" + Constants.erlangLocalNode + "', '" +
|
SlDeviceManager.getDeviceId() + "', '" + clusterId + "', '" +
|
DateUtil.getDateTimeStrFull()+ "', '" + Constants.erlangLocalNode + "','"+ Constants.delFlag+"');";
|
SqliteManager.executeInSync(ErlangConnection.getLocalConnection(), deviceInsertSql);
|
SqliteManager.executeInCache(ErlangConnection.getLocalConnection(),deviceInsertSql);
|
if (existNodeName.length() > 5) {
|
DeviceManager.downloadDeviceInfoFromOtherDevices(existNodeName);
|
SqliteManager.executeInSync(ErlangConnection.getRemoteConnection(existNodeName), deviceInsertSql);
|
SqliteManager.executeInCache(ErlangConnection.getRemoteConnection(existNodeName), deviceInsertSql);
|
}
|
}
|
|
void insertIntoClusterInfo(String clusterId, String clusterName) {
|
String clusterInsertSql = "INSERT OR REPLACE INTO cluster_info " +
|
"(cluster_id, cluster_name, update_time, create_by, del_flag)"+
|
"VALUES ('"+ clusterId + "','"+ clusterName + "','"+ DateUtil.getDateTimeStrFull() + "','"+ Constants.erlangLocalNode + "','"+ Constants.delFlag+"');";
|
SqliteManager.executeInSync(ErlangConnection.getLocalConnection(), clusterInsertSql);
|
SqliteManager.executeInCache(ErlangConnection.getLocalConnection(),clusterInsertSql);
|
}
|
public void execute() {
|
if("create".equals(type)){
|
insertIntoClusterInfo(clusterId, clusterName);
|
}
|
insertIntoDeviceInfo(clusterId, fatherNode);
|
}
|
}
|
new RunHeler().execute();
|
}
|
}.start();
|
}
|
|
/**
|
* 退出集群
|
* 逻辑删除:0:未删除 1:已删除
|
*/
|
public static void exitCluster(String deviceId){
|
new Thread(){
|
@Override
|
public void run() {
|
class RunHandle{
|
void updateDeviceInfo(String deviceId){
|
String sql = "update device_info SET del_flag=1 WHERE 1=1 AND device_id='"+deviceId+"'";
|
SqliteManager.executeInSync(ErlangConnection.getLocalConnection(), sql);
|
}
|
public void execute() {
|
updateDeviceInfo(deviceId);
|
}
|
}
|
new RunHandle().execute();
|
}
|
}.start();
|
}
|
|
/**
|
* 退出集群
|
* 物理删除:delete
|
*/
|
public static void phyDelDeviceInfo(String deviceId){
|
new Thread(){
|
@Override
|
public void run() {
|
class RunHandle{
|
void delete(String deviceId){
|
// String sql = "delete from device_info where device_id='"+deviceId+"'";
|
String sql = "delete from device_info";
|
SqliteManager.executeInSync(ErlangConnection.getLocalConnection(), sql);
|
}
|
public void execute() {
|
delete(deviceId);
|
}
|
}
|
new RunHandle().execute();
|
}
|
}.start();
|
}
|
|
/**
|
* 查询集群信息
|
*/
|
public static List<Map<String,String>> queryCluster(String cluster_name,String cluster_cookie){
|
|
class ThreadCallable implements Callable<List<Map<String,String>>> {
|
@Override
|
public List<Map<String, String>> call() throws Exception {
|
List<Map<String,String>> list = new ArrayList<>();
|
String sql = "SELECT * FROM cluster_info where cluster_name='"+cluster_name+"' and cluster_id='"+cluster_cookie+"'";
|
list = SqliteManager.queryFromSync(ErlangConnection.getLocalConnection(),sql);
|
return list;
|
}
|
}
|
// 创建一个大小为10的线程池
|
ExecutorService executor = Executors.newFixedThreadPool(10);
|
Future<List<Map<String, String>>> future = executor.submit(new ThreadCallable());
|
List<Map<String,String>> list = new ArrayList<>();
|
try {
|
//调用gei方法获取线程执行结果,在线程执行完成前该方法会一直阻塞
|
list = future.get();
|
} catch (ExecutionException e) {
|
e.printStackTrace();
|
} catch (InterruptedException e) {
|
e.printStackTrace();
|
}
|
return list;
|
}
|
|
/**
|
* 查询当前设备是否已创建集群,一个设备只能创建一个集群
|
*/
|
public static List<Map<String,String>> queryClusterByDevice(){
|
|
class ThreadCallable implements Callable<List<Map<String,String>>> {
|
|
@Override
|
public List<Map<String, String>> call() throws Exception {
|
List<Map<String,String>> list = new ArrayList<>();
|
String sql = "select * from cluster_info where create_by='"+Constants.erlangLocalNode+"'";
|
list = SqliteManager.queryFromSync(ErlangConnection.getLocalConnection(),sql);
|
return list;
|
}
|
}
|
// 创建一个大小为10的线程池
|
ExecutorService executor = Executors.newFixedThreadPool(10);
|
Future<List<Map<String, String>>> future = executor.submit(new ThreadCallable());
|
List<Map<String,String>> list = new ArrayList<>();
|
try {
|
list = future.get();
|
} catch (ExecutionException e) {
|
e.printStackTrace();
|
} catch (InterruptedException e) {
|
e.printStackTrace();
|
}
|
return list;
|
}
|
|
|
}
|