// Last change: zhangzengfei, 2022-01-10, commit 4496b59ab27d569df1da7ef634e02273b3a9618a
package com.basic.security.manager.erlang;
 
import com.basic.security.base.BaseApplication;
import com.basic.security.manager.BaseManager;
import com.basic.security.manager.DeviceManager;
import com.basic.security.utils.Constants;
import com.basic.security.utils.DateUtil;
 
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
 
public class ClusterManager extends BaseManager {
 
    /**
     * 创建集群
     */
    public static void createCluster(String clusterId, String clusterName, String cookie) {
        createClusterAndDeviceInfo(" ", clusterId, clusterName, cookie, "create");
    }
 
    /**
     * 加入集群
     */
    public static void joinCluster(String existNodeName, String clusterId, String clusterName) {
        createClusterAndDeviceInfo(existNodeName, clusterId, clusterName, "123", "join");
    }
 
    private static void createClusterAndDeviceInfo(String fatherNode, String clusterId, String clusterName, String cookie, String type) {
        BaseApplication.getApplication().executorService.execute(() -> {
            class RunHeler {
                void insertIntoDeviceInfo(String clusterId, String existNodeName) {
                    String deviceInsertSql = "INSERT OR REPLACE INTO device_info " +
                            "(father_node, uuid, node_id, device_id, cluster_id, update_time, create_by, del_flag) " +
                            "VALUES ('" + existNodeName + "', '" + DeviceManager.getDeviceId() + "', '" + Constants.erlangLocalNode + "', '" +
                            DeviceManager.getDeviceId() + "', '" + clusterId + "', '" +
                            DateUtil.getDateTimeStrFull() + "', '" + Constants.erlangLocalNode + "','" + Constants.delFlag + "');";
                    SqliteManager.executeInSync(ErlangConnection.getLocalConnection(), deviceInsertSql);
                    SqliteManager.executeInCache(ErlangConnection.getLocalConnection(), deviceInsertSql);
                    if (existNodeName.length() > 5) {
                        com.basic.security.manager.erlang.DeviceManager.downloadDeviceInfoFromOtherDevices(existNodeName);
                        SqliteManager.executeInSync(ErlangConnection.getRemoteConnection(existNodeName), deviceInsertSql);
                        SqliteManager.executeInCache(ErlangConnection.getRemoteConnection(existNodeName), deviceInsertSql);
                    }
                }
 
                void insertIntoClusterInfo(String clusterId, String clusterName) {
                    String clusterInsertSql = "INSERT OR REPLACE INTO cluster_info " +
                            "(cluster_id, cluster_name, update_time, create_by, del_flag)" +
                            "VALUES ('" + clusterId + "','" + clusterName + "','" + DateUtil.getDateTimeStrFull() + "','" + Constants.erlangLocalNode + "','" + Constants.delFlag + "');";
                    SqliteManager.executeInSync(ErlangConnection.getLocalConnection(), clusterInsertSql);
                    SqliteManager.executeInCache(ErlangConnection.getLocalConnection(), clusterInsertSql);
                }
 
                public void execute() {
                    if ("create".equals(type)) {
                        insertIntoClusterInfo(clusterId, clusterName);
                    }
                    insertIntoDeviceInfo(clusterId, fatherNode);
                }
            }
            new RunHeler().execute();
        });
    }
 
    /**
     * 退出集群
     * 逻辑删除:0:未删除   1:已删除
     */
    public static void exitCluster(String deviceId) {
        BaseApplication.getApplication().executorService.execute(() -> {
            class RunHandle {
                void updateDeviceInfo(String deviceId) {
                    String sql = "update device_info SET del_flag=1 WHERE 1=1 AND device_id='" + deviceId + "'";
                    SqliteManager.executeInSync(ErlangConnection.getLocalConnection(), sql);
                }
 
                public void execute() {
                    updateDeviceInfo(deviceId);
                }
            }
            new RunHandle().execute();
        });
    }
 
    /**
     * 退出集群
     * 物理删除:delete
     */
    public static void phyDelDeviceInfo(String deviceId) {
        BaseApplication.getApplication().executorService.execute(() -> {
            class RunHandle {
                void delete(String deviceId) {
//                        String sql = "delete from device_info where device_id='"+deviceId+"'";
                    String sql = "delete from device_info";
                    SqliteManager.executeInSync(ErlangConnection.getLocalConnection(), sql);
                }
 
                public void execute() {
                    delete(deviceId);
                }
            }
            new RunHandle().execute();
        });
    }
 
    /**
     * 查询集群信息
     */
    public static List<Map<String, String>> queryCluster(String cluster_name, String cluster_cookie) {
 
        class ThreadCallable implements Callable<List<Map<String, String>>> {
            @Override
            public List<Map<String, String>> call() throws Exception {
                List<Map<String, String>> list = new ArrayList<>();
                String sql = "SELECT * FROM cluster_info where cluster_name='" + cluster_name + "' and cluster_id='" + cluster_cookie + "'";
                list = SqliteManager.queryFromSync(ErlangConnection.getLocalConnection(), sql);
                return list;
            }
        }
        // 创建一个大小为10的线程池
        ExecutorService executor = Executors.newFixedThreadPool(10);
        Future<List<Map<String, String>>> future = executor.submit(new ThreadCallable());
        List<Map<String, String>> list = new ArrayList<>();
        try {
            //调用gei方法获取线程执行结果,在线程执行完成前该方法会一直阻塞
            list = future.get();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return list;
    }
 
    /**
     * 查询当前设备是否已创建集群,一个设备只能创建一个集群
     */
    public static List<Map<String, String>> queryClusterByDevice() {
 
        class ThreadCallable implements Callable<List<Map<String, String>>> {
 
            @Override
            public List<Map<String, String>> call() throws Exception {
                List<Map<String, String>> list = new ArrayList<>();
                String sql = "select * from cluster_info where create_by='" + Constants.erlangLocalNode + "'";
                list = SqliteManager.queryFromSync(ErlangConnection.getLocalConnection(), sql);
                return list;
            }
        }
        // 创建一个大小为10的线程池
        ExecutorService executor = Executors.newFixedThreadPool(10);
        Future<List<Map<String, String>>> future = executor.submit(new ThreadCallable());
        List<Map<String, String>> list = new ArrayList<>();
        try {
            list = future.get();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return list;
    }
 
 
}