package com.cloud.control.service.impl;
|
|
import com.alibaba.fastjson.JSONArray;
|
import com.alibaba.fastjson.JSONObject;
|
import com.cloud.common.utils.JsonUtil;
|
import com.cloud.common.utils.RestTemplateUtil;
|
import com.cloud.control.dao.TaskMapper;
|
import com.cloud.control.model.Task;
|
import com.cloud.control.service.ClusterService;
|
import com.cloud.control.service.DeviceService;
|
import com.cloud.control.service.TaskService;
|
import com.cloud.control.utils.DevInfoColumnMap;
|
import com.cloud.control.utils.EnumStr;
|
import com.cloud.control.vo.MenuTreeVo;
|
import com.cloud.control.vo.TreeVo;
|
import com.cloud.control.vo.cDbVo;
|
import com.cloud.model.common.Result;
|
import com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.commons.lang3.StringUtils;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.dao.DuplicateKeyException;
|
import org.springframework.http.MediaType;
|
import org.springframework.stereotype.Service;
|
import org.springframework.util.LinkedMultiValueMap;
|
import org.springframework.util.MultiValueMap;
|
import org.springframework.web.servlet.mvc.method.annotation.MatrixVariableMethodArgumentResolver;
|
|
import java.lang.reflect.Array;
|
import java.text.SimpleDateFormat;
|
import java.util.*;
|
import java.util.stream.Collectors;
|
import java.util.stream.Stream;
|
|
import static com.cloud.control.utils.DevInfoColumnMap.cluName;
|
import static com.cloud.control.utils.DevInfoColumnMap.devName;
|
import static com.cloud.control.utils.DevInfoColumnMap.nodes;
|
|
@Service
|
@Slf4j
|
public class TaskServiceImpl implements TaskService {
|
|
// Collaborators injected by Spring (field injection).
// Issues the HTTP POST requests to the analysis nodes.
@Autowired
|
private RestTemplateUtil restTemplateUtil;
|
// Supplies the analysis-service port and the add/update/delete URL paths.
@Autowired
|
private EnumStr enumStr;
|
// Source of the cluster list and the cluster/device tree.
@Autowired
|
private ClusterService clusterService;
|
// Used to look up node information by device id.
@Autowired
|
private DeviceService deviceService;
|
// Persistence access for Task rows.
@Autowired
|
private TaskMapper taskMapper;
|
|
// NOTE(review): SimpleDateFormat is not thread-safe, and this single instance is shared by
// every caller of this service; concurrent format() calls can produce corrupted output.
private static final SimpleDateFormat sdfTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
|
@Override
|
public JSONObject queryTaskListByPage(Date startTime, Date endTime, String likeName,
|
Integer status, String source, Integer from, Integer size,
|
String sortName, String sortType){
|
JSONObject tasks = new JSONObject();
|
long total = taskMapper.selectAllTaskCount(startTime, endTime, likeName, status, source);
|
tasks.put("total",total);
|
List<Task> taskList = taskMapper.selectAllTaskByPage(startTime, endTime, likeName, status, source,
|
from, size,sortName,sortType);
|
JSONObject soureList = querySoureList();
|
JSONArray sources = soureList.getJSONArray("selectOpts2");// 平台 布控 源
|
// sources.stream().map(obj - >{});
|
Map<String,String> map = sources.stream().collect(Collectors.toMap(obj -> ((JSONObject)obj).getString("value"),
|
obj -> ((JSONObject)obj).getString("name")));
|
//数据处理
|
taskList.forEach( task ->{
|
String cluName = map.get(task.getSource());
|
if (cluName != null) {
|
task.setSource(cluName);
|
}
|
String scope = (String) task.getScope();
|
JSONArray jsonScope = JSONArray.parseArray(scope);
|
String collect = jsonScope.stream()
|
.map(obj -> ((JSONObject) obj).getString(DevInfoColumnMap.cluId))
|
.map(str -> map.get(str))
|
.collect(Collectors.joining(","));
|
task.setScope(collect); // 转换
|
});
|
tasks.put("taskList",taskList);
|
return tasks;
|
}
|
|
/**
|
* 添加 报警本地底库 到 c服务
|
* @param taskName
|
* @param startTime
|
* @param endTime
|
* 前端 直接添加 报警 任务 到分析服务
|
* @return
|
*/
|
@Override
|
public JSONObject addTaskForIndevice(String uuid,String taskName,Integer threshold,Date startTime,Date endTime,
|
List<String> devIds,boolean isUpdate,String enabled){
|
String reqIp = null;
|
JSONObject isExist =
|
isExist = isExistDataBase(taskName, devIds ,false);
|
if (isExist.getBoolean("result") && !isUpdate){ // true 为 存在
|
isExist.put("code",40010); // 底库存在 重新添加
|
return isExist;
|
}
|
if ( !isUpdate ){enabled = "1";}
|
if (StringUtils.isBlank(enabled) || !enabled.equals("1") || !enabled.equals("0")){ enabled = "1"; }
|
List<String> devIp = (List<String>) isExist.get("devIp");
|
JSONObject jsonObj = new JSONObject();
|
String dbId = uuid;
|
if (dbId == null || dbId.isEmpty()){ dbId = UUID.randomUUID().toString().replaceAll("-","");}
|
Task task = new Task();
|
task.setName(taskName); jsonObj.put("TableType","person");jsonObj.put("TableName",taskName);
|
task.setStartTime(startTime); jsonObj.put("StartTime",sdfTime.format(startTime));
|
task.setEndTime(endTime); jsonObj.put("EndTime",sdfTime.format(endTime));
|
boolean enable = startTime.before(new Date())&& endTime.after(new Date());
|
task.setEnabled(enabled); jsonObj.put("IsSync","0");jsonObj.put("enabled",enabled);
|
task.setStatus(startTime.after(new Date())?0:endTime.before(new Date())?-1:1);
|
if (!isUpdate){
|
task.setSource(DevInfoColumnMap.PlatformCont); // "平台布控"
|
}
|
task.setId(dbId); jsonObj.put("uuid",dbId);
|
task.setThreshold(threshold); jsonObj.put("threshold",threshold);
|
task.setScope(isExist.get("cluList").toString()); task.setSyncType(0); jsonObj.put("SyncType","0"); jsonObj.put("BwType","1");
|
JSONObject cResult = new JSONObject();JSONObject addDbResult = new JSONObject();
|
String addMsg = null;
|
for (String ip : devIp) {
|
String bsUrl = "http://"+ip+":"+enumStr.getCServerPort();
|
String esBaseUrl =enumStr.addDataBaseUrl;
|
if ( isUpdate){
|
jsonObj.remove("IsSync");
|
esBaseUrl = enumStr.updateDataBaseUrl;
|
task.setUpdateBy("wp");
|
task.setUpdateTime(new Date());
|
}else {
|
task.setUpdateTime(new Date());
|
task.setUpdateBy("wp");
|
task.setCreateTime(new Date());
|
task.setCreateBy("wp");
|
}
|
jsonObj.put("createBy",enumStr.conCenter); // c 判定 来源
|
String reqJson = jsonObj.toJSONString();
|
JSONObject dataBaseParams = JSONObject.parseObject(reqJson);
|
log.info("添加报警本地库到分析节点:reqJson:"+dataBaseParams+",请求地址:"+bsUrl+esBaseUrl);
|
String respData = restTemplateUtil.post(bsUrl+esBaseUrl,
|
dataBaseParams, MediaType.APPLICATION_JSON_UTF8, String.class, false);
|
if (respData != null){
|
log.info("添加报警本地库 c返回:"+respData);
|
cResult = JSONObject.parseObject(respData);
|
String result = cResult.getString("result");
|
if (result == null || !result.equals("1")){
|
addMsg = addMsg !=null?addMsg+","+ip:ip;
|
}
|
}else {
|
addMsg = addMsg !=null?addMsg+","+ip:ip;
|
}
|
}
|
String msgType = "创建";
|
if (isUpdate)msgType = "修改";
|
if (addMsg != null){
|
addDbResult.put("result",false);
|
addDbResult.put("msg",addMsg+"以上节点"+msgType+"布控任务失败。");
|
}else{
|
// 加入 数据库
|
int i = 0;
|
if (!isUpdate){
|
i = taskMapper.insertSelective(task);
|
}else {
|
i = taskMapper.updateByPrimaryKeySelective(task);
|
}
|
log.info(msgType+(i>0?"布控任务成功":"布控任务失败"));
|
if (i > 0){
|
addDbResult.put("result",true);
|
addDbResult.put("msg",msgType+"布控任务成功。");
|
}else{
|
addDbResult.put("result",false);
|
addDbResult.put("msg","分析节点"+msgType+"布控任务成功,"+msgType+"总布控库失败");
|
}
|
}
|
return addDbResult;
|
}
|
|
/**
|
* 修改 报警同步 底库 到 c服务
|
* @param taskName
|
* @param startTime
|
* @param endTime
|
* 前端 直接添加 报警 任务 到分析服务
|
* @return
|
*/
|
@Override
|
public JSONObject updateSyncTaskForIndevice(String uuid,String taskName,Integer threshold,Date startTime,Date endTime,
|
List<String> cluIps,boolean isUpdate,JSONArray scope,String enabled){
|
if (StringUtils.isBlank(enabled) || !enabled.equals("1") || !enabled.equals("0")){ enabled = "1"; }
|
List<String> devIp = cluIps; // 集群 信息 ip
|
JSONObject jsonObj = new JSONObject();
|
String dbId = uuid;
|
if (dbId == null || dbId.isEmpty()){ dbId = UUID.randomUUID().toString().replaceAll("-","");}
|
Task task = new Task();
|
task.setName(taskName); jsonObj.put("TableType","person");jsonObj.put("TableName",taskName);
|
task.setStartTime(startTime); jsonObj.put("StartTime",sdfTime.format(startTime));
|
task.setEndTime(endTime); jsonObj.put("EndTime",sdfTime.format(endTime));
|
boolean enable = startTime.before(new Date())&& endTime.after(new Date());
|
task.setEnabled(enabled); jsonObj.put("enabled",enabled);
|
task.setStatus(1); // startTime.after(new Date())?0:endTime.before(new Date())?-1:
|
task.setSource(DevInfoColumnMap.PlatformCont); // 平台布控
|
task.setId(dbId); jsonObj.put("uuid",dbId);
|
task.setThreshold(threshold); jsonObj.put("threshold",threshold);
|
task.setScope(scope.toJSONString()); task.setSyncType(1); jsonObj.put("SyncType","1"); jsonObj.put("BwType","1");
|
JSONObject cResult = new JSONObject();JSONObject addDbResult = new JSONObject();
|
String addMsg = null;
|
for (String ip : devIp) {
|
String bsUrl = "http://"+ip+":"+enumStr.getCServerPort();
|
String esBaseUrl = enumStr.updateDataBaseUrl;
|
String reqJson = jsonObj.toJSONString();
|
JSONObject dataBaseParams = JSONObject.parseObject(reqJson);
|
log.info("修改报警同步库到分析节点:reqJson:"+dataBaseParams+",请求地址:"+bsUrl+esBaseUrl);
|
String respData = restTemplateUtil.post(bsUrl+esBaseUrl,
|
dataBaseParams, MediaType.APPLICATION_JSON_UTF8, String.class, false);
|
if (respData != null){
|
log.info("修改报警同步库 c返回:"+respData);
|
cResult = JSONObject.parseObject(respData);
|
String result = cResult.getString("result");
|
if (result == null || !result.equals("1")){
|
addMsg = addMsg !=null?addMsg+","+ip:ip;
|
}
|
}else {
|
addMsg = addMsg !=null?addMsg+","+ip:ip;
|
}
|
}
|
String msgType = "修改";
|
if (addMsg != null){
|
addDbResult.put("result",false);
|
addDbResult.put("msg",addMsg+"以上节点"+msgType+"布控任务失败。");
|
}else{
|
// 加入 数据库
|
int i = 1;
|
if (isUpdate){ // 修改 更新 同步 数据库 的 范围 就行
|
i = taskMapper.updateByPrimaryKeySelective(task);
|
}else {
|
i = taskMapper.insertSelective(task);
|
}
|
log.info(msgType+(i>0?"布控任务成功":"布控任务失败"));
|
if (i > 0){
|
addDbResult.put("result",true);
|
addDbResult.put("msg",msgType+"布控任务成功。");
|
}else{
|
addDbResult.put("result",false);
|
addDbResult.put("msg","分析节点"+msgType+"布控任务成功,"+msgType+"总布控库失败");
|
}
|
}
|
return addDbResult;
|
}
|
/**
|
* 添加 全部报警底库 到 本后台
|
* 启动调用
|
* @return
|
*/
|
@Override
|
public void addTaskForIndevice(){
|
// 获取 服务ip
|
String reqIp = null;
|
Result allNode = clusterService.findAllNode();
|
if (!allNode.isSuccess()){
|
log.error(allNode.getMsg());
|
throw new RuntimeException(allNode.getMsg());
|
}
|
JSONArray clus = (JSONArray) allNode.getData();
|
for (Object obj : clus){
|
JSONObject clu = (JSONObject)obj;
|
JSONArray syncDbs = clu.getJSONArray("syncDbs"); // 同步库 列表
|
JSONObject cluInfo = new JSONObject();
|
String cluName = clu.getString("name"); // 集群名
|
String id = clu.getString("id"); // 集群 设备存储id
|
cluInfo.put(DevInfoColumnMap.cluName,cluName);
|
cluInfo.put(DevInfoColumnMap.cluId,id);
|
JSONArray cluList = new JSONArray();
|
cluList.add(cluInfo);
|
// 处理同步库数据
|
if(syncDbs !=null && syncDbs.size()>0) {
|
for (Object dbObj : syncDbs) {
|
JSONObject syncDb = (JSONObject) dbObj;
|
if ("1".equals(syncDb.getString("bwType"))) {
|
Task task = new Task();
|
task.setId(syncDb.getString("uuid"));
|
String tableName = syncDb.getString("tableName");
|
task.setName(tableName);
|
java.sql.Date start_time = syncDb.getSqlDate("start_time");
|
task.setStartTime(start_time);
|
java.sql.Date end_time = syncDb.getSqlDate("end_time");
|
task.setEndTime(end_time);
|
boolean enabled = start_time != null && start_time.before(new Date()) && end_time != null && end_time.after(new Date());
|
String enable = syncDb.getString("enabled");
|
task.setStatus(enable == null ? enabled ? 1 : 0 : enable.equals("1") ? 1 : 0);
|
Integer threshold = syncDb.getInteger("threshold");
|
task.setThreshold(threshold);
|
task.setSource(id != null ? id : "未知集群布控"); // 集群名
|
task.setScope(cluList.toJSONString()); // 布控范围
|
task.setSyncType(1); //
|
task.setCreateTime(new Date());
|
task.setCreateBy("listenter-wp");
|
task.setUpdateBy("listenter-wp");
|
task.setUpdateTime(new Date());
|
try {
|
int i = taskMapper.insertSelective(task);
|
log.info(tableName + (i == 1 ? "同步底库被添加成功" : "同步底库添加失败"));
|
} catch (DuplicateKeyException de) {
|
task.setCreateBy(null);
|
task.setCreateTime(null);
|
int i = taskMapper.updateByPrimaryKeySelective(task);
|
log.info(tableName + "同步库已被添加过,本次进行修改操作," + (i == 1 ? "同步库被修改成功" : "同步库修改失败"));
|
}
|
}
|
log.info("同步库处理完毕");
|
}
|
}
|
// 处理 各节点 本地库
|
JSONArray nodes = clu.getJSONArray("nodes"); // 同步库 列表
|
for (Object nodeObj : nodes){
|
JSONObject node = (JSONObject)nodeObj;
|
JSONArray localDates = node.getJSONArray("localDates");
|
for (Object localDate : localDates) {
|
JSONObject localDb = (JSONObject)localDate;
|
if ("1".equals(localDb.getString("bwType"))){
|
Task task = new Task();
|
task.setId(localDb.getString("uuid"));
|
String tableName = localDb.getString("tableName");
|
task.setName(tableName);
|
java.sql.Date start_time = localDb.getSqlDate("start_time");
|
task.setStartTime(start_time);
|
java.sql.Date end_time = localDb.getSqlDate("end_time");
|
task.setEndTime(end_time);
|
boolean enabled = start_time!=null&&start_time.before(new Date())&&end_time!=null&&end_time.after(new Date());
|
String enable = localDb.getString("enabled");
|
task.setStatus(enable==null?enabled?1:0:enable.equals("1")?1:0);
|
Integer threshold = localDb.getInteger("threshold");
|
task.setThreshold(threshold);
|
task.setSource(id); // 来源 集群id
|
JSONArray jsonArray = new JSONArray();
|
JSONObject nodeInfo = new JSONObject();
|
nodeInfo.put(DevInfoColumnMap.devId,node.getString("id"));
|
nodeInfo.put(DevInfoColumnMap.devName,node.getString("devName"));
|
jsonArray.add(nodeInfo);
|
cluInfo.put(DevInfoColumnMap.nodes,jsonArray);
|
// cluList.add(cluInfo); // 不能重复添加
|
task.setScope(cluList.toJSONString()); // 集群信息
|
task.setSyncType(0);
|
task.setCreateTime(new Date());
|
task.setCreateBy("listenter-wp");
|
task.setUpdateBy("listenter-wp");
|
task.setUpdateTime(new Date());
|
try {
|
int i = taskMapper.insertSelective(task);
|
log.info(tableName+(i==1?"本地库被添加成功":"本地库添加失败"));
|
}catch (DuplicateKeyException de){
|
task.setCreateBy(null);
|
task.setCreateTime(null);
|
int i = taskMapper.updateByPrimaryKeySelective(task);
|
log.info(tableName+"本地库已被添加过,本次进行修改操作,"+(i==1?"本地库被修改成功":"本地库修改失败"));
|
}
|
}
|
log.info("本地库处理完毕");
|
}
|
}
|
log.info("全部集群节点底库信息已加入数据库");
|
}
|
}
|
|
@Override
|
public JSONObject updateTask(String uuid, String taskName, Integer threshold, Date startTime, Date endTime,
|
List<String> devIds){
|
Task task = taskMapper.selectByPrimaryKey(uuid);
|
JSONObject respObj = new JSONObject();
|
if (task != null) {
|
JSONArray scope = JSONArray.parseArray(task.getScope().toString()); // 布控范围
|
Integer syncType = task.getSyncType();
|
if (syncType==1){
|
JSONObject existDataBase = isExistDataBase(null, devIds,true);
|
JSONArray cluList = existDataBase.getJSONArray("cluList");
|
List<String> nowcluIds = getCluOrDevId(cluList, DevInfoColumnMap.CluType); // 现有的集群id
|
List<String> cluIds = getCluOrDevId(scope, DevInfoColumnMap.CluType); // 旧有的集群id
|
if (nowcluIds==null || cluIds == null){
|
respObj.put("result",false);
|
respObj.put("msg","布控任务不存在,请验证。");
|
return respObj;
|
}
|
// List<String>
|
// 添加的集群id
|
List<String> nowIdsCopy = new ArrayList<>();
|
nowIdsCopy.addAll(nowcluIds);
|
List<String> cIdsCopy = new ArrayList<>();
|
cIdsCopy.addAll(cluIds);
|
boolean b = nowIdsCopy.removeAll(cluIds);
|
if (nowIdsCopy != null ){ // 说明有新增的
|
updateSyncTaskForIndevice( uuid, taskName, threshold, startTime, endTime,
|
nowIdsCopy,true, cluList, "1"); // 现有 全部集群 信息
|
}
|
boolean b2 = cIdsCopy.removeAll(nowcluIds);
|
if (cIdsCopy.size() > 0 ){ // 说明有需要删除的
|
delSyncTaskByIp( uuid, cIdsCopy);
|
}
|
// 剩下 需要更新的
|
List<String> nowIdsCopy2 = new ArrayList<>();
|
nowIdsCopy2.addAll(cluIds);
|
nowIdsCopy2.removeAll(nowIdsCopy);
|
if (nowIdsCopy2.size() > 0){
|
for (int i=0;i< nowIdsCopy2.size();i++){
|
String cluIp = getCluOrDevAndNameOrIpById(nowIdsCopy2.get(i), DevInfoColumnMap.CluType, DevInfoColumnMap.ipType);
|
nowIdsCopy2.set(i,cluIp);
|
}
|
JSONObject jsonObject = updateSyncTaskForIndevice(uuid, taskName, threshold, startTime, endTime,
|
nowIdsCopy2, true, cluList, "1");// 现有 全部集群 信息
|
return jsonObject;
|
}
|
}else{ // 本地库 需要 节点信息 去更新
|
List<String> oldDevIds = getCluOrDevId(scope, DevInfoColumnMap.DevType); // 节点信息
|
// devIds 比较
|
List<String> oldIdsCopy = new ArrayList<>();
|
oldIdsCopy.addAll(oldDevIds);
|
oldIdsCopy.removeAll(devIds);
|
// 多余的
|
if (oldIdsCopy != null){
|
addTaskForIndevice( uuid, taskName,threshold, startTime,endTime,
|
devIds,true,null);
|
}
|
// 减少的 devIds
|
List<String> nowIdsCopy = new ArrayList<>();
|
nowIdsCopy.addAll(devIds);
|
nowIdsCopy.removeAll(oldDevIds);
|
// 多余的
|
if (nowIdsCopy .size() > 0){
|
delSyncTaskByIp(uuid,nowIdsCopy);
|
}
|
respObj.put("result",true);
|
respObj.put("msg","布控任务修改成功。");
|
}
|
}else {
|
respObj.put("result",false);
|
respObj.put("msg","布控任务不存在,请验证。");
|
}
|
return respObj;
|
}
|
|
// scope 解析 cluId devId
|
public List<String> getCluOrDevId(JSONArray scope,String nameType){
|
List<String> idList = new ArrayList<>();
|
if (DevInfoColumnMap.CluType.equals(nameType)){
|
scope.forEach(clu->{
|
idList.add(((JSONObject)clu).getString(DevInfoColumnMap.cluId));
|
});
|
return idList;
|
}else if (DevInfoColumnMap.DevType.equals(nameType)){
|
scope.forEach(clu->{
|
JSONObject sclu = (JSONObject)clu;
|
JSONArray nodes = sclu.getJSONArray(DevInfoColumnMap.nodes);
|
if (nodes != null) {
|
nodes.forEach(node ->{
|
idList.add(((JSONObject)node).getString(DevInfoColumnMap.devId));
|
});
|
}
|
});
|
return idList;
|
}
|
return null;
|
}
|
|
@Override
|
public JSONObject stopTask(String uuid) {
|
Task task = taskMapper.selectByPrimaryKey(uuid);
|
JSONObject respObj = new JSONObject();
|
if (task != null){
|
JSONArray scope = JSONArray.parseArray(task.getScope().toString());
|
Integer syncType = task.getSyncType();
|
Result allNode = clusterService.findAllNode();
|
if (!allNode.isSuccess()){
|
respObj.put("result",false);
|
respObj.put("msg",allNode.getMsg());
|
return respObj;
|
}
|
String addMsg = null;
|
if (syncType.equals(1)){ // 同步库处理方式
|
for (Object obj : scope){
|
JSONObject scop = (JSONObject)obj;
|
String cluId = (String) scop.get(DevInfoColumnMap.cluId);
|
|
// 该集群 处理 停止布控
|
String cluip = getCluOrDevAndNameOrIpById( cluId,DevInfoColumnMap.CluType,DevInfoColumnMap.ipType);
|
if (StringUtils.isBlank(cluip)){
|
respObj.put("result",false);
|
respObj.put("msg","集群ip信息查询失败,导致无法调用停止布控");
|
return respObj;
|
}
|
JSONObject reqData = new JSONObject();
|
reqData.put("uuid",uuid);
|
reqData.put("enabled","0");
|
reqData.put("SyncType",task.getSyncType().toString());
|
reqData.put("TableName",task.getName());
|
reqData.put("TableType","person");
|
JSONObject dataBaseParams = reqData;
|
String reqUrl = "http://"+cluip+":"+enumStr.getCServerPort()+enumStr.updateDataBaseUrl;
|
log.info("添加报警本地库到分析节点:reqJson:"+dataBaseParams+",请求地址:"+reqUrl);
|
String respData = restTemplateUtil.post(reqUrl,
|
dataBaseParams, MediaType.APPLICATION_JSON_UTF8, false);
|
if (respData != null){
|
log.info("添加报警本地库 c返回:"+respData);
|
JSONObject cResult = JSONObject.parseObject(respData);
|
String result = cResult.getString("result");
|
if (result == null || !result.equals("1")){
|
log.info("本次停止布控调用c 返回信息"+respData);
|
addMsg = "停止布控失败";
|
break;
|
}
|
}
|
}
|
if (addMsg != null){
|
respObj.put("result",false);
|
respObj.put("msg","集群停止布控失败");
|
return respObj;
|
}else{
|
task.setEnabled("0");
|
int i = taskMapper.updateByPrimaryKeySelective(task);
|
if (i >0 ){
|
respObj.put("result",true);
|
respObj.put("msg","各集群节点停止布控任务成功");
|
}else {
|
respObj.put("result",false);
|
respObj.put("msg","后台停止布控失败");
|
}
|
return respObj;
|
}
|
}else { // 本地库 停止布控
|
for (Object obj : scope){
|
JSONObject scop = (JSONObject)obj;
|
JSONArray nodes = scop.getJSONArray(DevInfoColumnMap.nodes);
|
for (Object nodeObj : nodes){ // 遍历 所有节点信息
|
JSONObject node = (JSONObject)nodeObj;
|
String devId = (String) node.get(DevInfoColumnMap.devId);
|
if (StringUtils.isBlank(devId)){
|
respObj.put("result",false);
|
respObj.put("msg","布控任务对应节点id 存储为空");
|
return respObj;
|
}
|
String devip = getCluOrDevAndNameOrIpById(devId,DevInfoColumnMap.DevType,DevInfoColumnMap.ipType);
|
if (StringUtils.isBlank(devip)){
|
respObj.put("result",false);
|
respObj.put("msg","集群ip信息查询失败,导致无法调用停止布控");
|
return respObj;
|
}
|
JSONObject reqData = new JSONObject();
|
reqData.put("uuid",uuid);
|
reqData.put("enabled","0");
|
reqData.put("SyncType","0");
|
reqData.put("TableName",task.getName());
|
reqData.put("TableType","person");
|
JSONObject dataBaseParams = reqData;
|
String reqUrl = "http://"+devip+":"+enumStr.getCServerPort()+enumStr.updateDataBaseUrl;
|
log.info("添加报警本地库到分析节点:reqJson:"+dataBaseParams+",请求地址:"+reqUrl);
|
String respData = restTemplateUtil.post(reqUrl,
|
dataBaseParams, MediaType.APPLICATION_JSON_UTF8, false);
|
if (respData != null){
|
log.info("添加报警本地库 c返回:"+respData);
|
JSONObject cResult = JSONObject.parseObject(respData);
|
String result = cResult.getString("result");
|
if (result == null || !result.equals("1")){
|
log.info("本次停止布控调用c 返回信息"+respData);
|
addMsg = "停止布控失败";
|
break;
|
}
|
}
|
}
|
if (addMsg != null) break; // 停止布控调用c 失败 直接停止
|
}
|
if (addMsg != null){
|
respObj.put("result",false);
|
respObj.put("msg","停止布控失败");
|
return respObj;
|
}else {
|
// 本地停止
|
task.setEnabled("0");
|
int i = taskMapper.updateByPrimaryKeySelective(task);
|
if (i >0 ){
|
respObj.put("result",true);
|
respObj.put("msg","集群各节点停止布控成功");
|
}else {
|
respObj.put("result",false);
|
respObj.put("msg","后台停止布控失败");
|
}
|
return respObj;
|
}
|
}
|
}else {
|
respObj.put("result",false);
|
respObj.put("msg","布控任务不存在,请验证。");
|
}
|
return respObj;
|
}
|
// 依据 集群或设备id 获取名称 或 ip 地址
|
public String getCluOrDevAndNameOrIpById(String id,String type,String valueType){
|
|
Result result = clusterService.getClusterDeviceTree();
|
if (result.isSuccess()){
|
JSONArray respJson = (JSONArray) result.getData();
|
List<MenuTreeVo> data = respJson.toJavaList(MenuTreeVo.class);
|
if (DevInfoColumnMap.CluType.equals(type)){
|
for (MenuTreeVo datum : data) {
|
if (datum.getId().equals(id) && datum.getType().equals("1")){
|
if (DevInfoColumnMap.nameType.equals(valueType)){
|
return datum.getName();
|
}else if (DevInfoColumnMap.ipType.equals(valueType)){
|
return datum.getIp();
|
}
|
}
|
}
|
}else if (DevInfoColumnMap.DevType.equals(type)){
|
for (MenuTreeVo datum : data){
|
List<MenuTreeVo> child = datum.getChild();
|
for (MenuTreeVo devInfo : child) {
|
if (devInfo.getId().equals(id) && devInfo.getType().equals("2")){
|
if (DevInfoColumnMap.nameType.equals(valueType)){
|
return datum.getName();
|
}else if (DevInfoColumnMap.ipType.equals(valueType)){
|
return datum.getIp();
|
}
|
}
|
}
|
}
|
}
|
}
|
return null;
|
}
|
|
// 依据 集群 id 获取 设备信息
|
public List<String> getDevIdsByCluId(String cluId){
|
Result result = clusterService.getClusterDeviceTree();
|
List<String> devList = new ArrayList<>();
|
if (result.isSuccess()) {
|
JSONArray jsonData = (JSONArray) result.getData();
|
List<MenuTreeVo> data = JSONArray.parseArray(jsonData.toJSONString(),MenuTreeVo.class);
|
data.stream().filter(men -> men.getType().equals("1")&&men.getId().equals(cluId))
|
.map(MenuTreeVo::getChild).forEach(child -> child.forEach(node->{
|
devList.add(node.getId());
|
}));
|
}
|
return devList;
|
}
|
|
|
// 依据 ip 进行 删除 节点 底库信息
|
public JSONObject delSyncTaskByIp(String uuid,List<String> ips) {
|
Task task = taskMapper.selectByPrimaryKey(uuid);
|
JSONObject respObj = new JSONObject();
|
if (task != null){
|
JSONArray scope = (JSONArray) task.getScope();
|
Integer syncType = task.getSyncType();
|
String addMsg = null;
|
|
JSONObject creqParam = new JSONObject();
|
creqParam.put("uuid",uuid);
|
creqParam.put("TableType","person");
|
creqParam.put("TableName",task.getName());
|
creqParam.put("SyncType",task.getSyncType().toString());
|
if (syncType.equals(1)){ // 同步库处理方式
|
for (String cluip : ips){
|
// 该集群 处理 停止布控
|
if (StringUtils.isBlank(cluip)){
|
respObj.put("result",false);
|
respObj.put("msg","集群ip信息未知,导致无法调用停止布控");
|
return respObj;
|
}
|
String reqUrl = "http://"+cluip+":"+enumStr.getCServerPort()+enumStr.deleteDataBaseUrl;
|
log.info("添加报警本地库到分析节点:reqJson:"+creqParam+",请求地址:"+reqUrl);
|
String respData = restTemplateUtil.post(reqUrl,
|
creqParam, MediaType.APPLICATION_JSON_UTF8, false);
|
if (respData != null){
|
log.info("添加报警本地库 c返回:"+respData);
|
JSONObject cResult = JSONObject.parseObject(respData);
|
String result = cResult.getString("result");
|
if (result == null || !result.equals("1")){
|
log.info("本次停止布控调用c 返回信息"+respData);
|
addMsg = "停止布控失败";
|
break;
|
}
|
}
|
}
|
if (addMsg != null){
|
respObj.put("result",false);
|
respObj.put("msg","集群停止布控失败");
|
return respObj;
|
}else{
|
// 本地停止
|
task.setDelFlag("1");
|
int i = taskMapper.updateByPrimaryKeySelective(task);
|
if (i >0 ){
|
respObj.put("result",true);
|
respObj.put("msg","集群各节点删除布控成功");
|
}else {
|
respObj.put("result",false);
|
respObj.put("msg","后台删除布控失败");
|
}
|
return respObj;
|
}
|
}else { // 本地库 停止布控
|
for (String devip : ips){
|
|
if (StringUtils.isBlank(devip)){
|
respObj.put("result",false);
|
respObj.put("msg","集群ip信息查询失败,导致无法调用停止布控");
|
return respObj;
|
}
|
String reqUrl = "http://"+devip+":"+enumStr.getCServerPort()+enumStr.deleteDataBaseUrl;
|
log.info("添加报警本地库到分析节点:reqJson:"+creqParam+",请求地址:"+reqUrl);
|
String respData = restTemplateUtil.post(reqUrl,
|
creqParam, MediaType.APPLICATION_JSON_UTF8, false);
|
if (respData != null){
|
log.info("添加报警本地库 c返回:"+respData);
|
JSONObject cResult = JSONObject.parseObject(respData);
|
String result = cResult.getString("result");
|
if (result == null || !result.equals("1")){
|
log.info("本次停止布控调用c 返回信息"+respData);
|
addMsg = "停止布控失败";
|
break;
|
}
|
}
|
}
|
if (addMsg != null){
|
respObj.put("result",false);
|
respObj.put("msg","停止布控失败");
|
return respObj;
|
}else {
|
// 本地停止
|
respObj.put("result",true);
|
respObj.put("msg",ips+"节点删除布控成功");
|
return respObj;
|
}
|
}
|
}else {
|
respObj.put("result",false);
|
respObj.put("msg","布控任务不存在,请验证。");
|
}
|
return respObj;
|
}
|
|
|
@Override
|
public JSONObject delTask(String uuid) {
|
Task task = taskMapper.selectByPrimaryKey(uuid);
|
JSONObject respObj = new JSONObject();
|
if (task != null){
|
JSONArray scope = JSONArray.parseArray(task.getScope().toString()) ;
|
Integer syncType = task.getSyncType();
|
Result allNode = clusterService.findAllNode();
|
if (!allNode.isSuccess()){
|
respObj.put("result",false);
|
respObj.put("msg",allNode.getMsg());
|
return respObj;
|
}
|
String addMsg = null;
|
|
JSONObject creqParam = new JSONObject();
|
creqParam.put("uuid",uuid);
|
creqParam.put("TableType","person");
|
creqParam.put("TableName",task.getName());
|
creqParam.put("SyncType",task.getSyncType().toString());
|
if (syncType.equals(1)){ // 同步库处理方式
|
for (Object obj : scope){
|
JSONObject scop = (JSONObject)obj;
|
String cluId = (String) scop.get(DevInfoColumnMap.cluId);
|
|
// 该集群 处理 停止布控
|
String cluip = getCluOrDevAndNameOrIpById( cluId,DevInfoColumnMap.CluType,DevInfoColumnMap.ipType);
|
if (StringUtils.isBlank(cluip)){
|
respObj.put("result",false);
|
respObj.put("msg","集群ip信息查询失败,导致无法调用停止布控");
|
return respObj;
|
}
|
String reqUrl = "http://"+cluip+":"+enumStr.getCServerPort()+enumStr.deleteDataBaseUrl;
|
log.info("添加报警本地库到分析节点:reqJson:"+creqParam+",请求地址:"+reqUrl);
|
String respData = restTemplateUtil.post(reqUrl,
|
creqParam, MediaType.APPLICATION_JSON_UTF8, false);
|
if (respData != null){
|
log.info("添加报警本地库 c返回:"+respData);
|
JSONObject cResult = JSONObject.parseObject(respData);
|
String result = cResult.getString("result");
|
if (result == null || !result.equals("1")){
|
log.info("本次停止布控调用c 返回信息"+respData);
|
addMsg = "停止布控失败";
|
break;
|
}
|
}
|
}
|
if (addMsg != null){
|
respObj.put("result",false);
|
respObj.put("msg","集群停止布控失败");
|
return respObj;
|
}else{
|
// 本地停止
|
task.setDelFlag("1");
|
int i = taskMapper.updateByPrimaryKeySelective(task);
|
if (i >0 ){
|
respObj.put("result",true);
|
respObj.put("msg","集群各节点删除布控成功");
|
}else {
|
respObj.put("result",false);
|
respObj.put("msg","后台删除布控失败");
|
}
|
return respObj;
|
}
|
}else { // 本地库 停止布控
|
for (Object obj : scope){
|
JSONObject scop = (JSONObject)obj;
|
JSONArray nodes = scop.getJSONArray(DevInfoColumnMap.nodes);
|
for (Object nodeObj : nodes){ // 遍历 所有节点信息
|
JSONObject node = (JSONObject)nodeObj;
|
String devId = (String) node.get(DevInfoColumnMap.devId);
|
if (StringUtils.isBlank(devId)){
|
respObj.put("result",false);
|
respObj.put("msg","布控任务对应节点id 存储为空");
|
return respObj;
|
}
|
String devip = getCluOrDevAndNameOrIpById(devId,DevInfoColumnMap.DevType,DevInfoColumnMap.ipType);
|
if (StringUtils.isBlank(devip)){
|
respObj.put("result",false);
|
respObj.put("msg","集群ip信息查询失败,导致无法调用停止布控");
|
return respObj;
|
}
|
String reqUrl = "http://"+devip+":"+enumStr.getCServerPort()+enumStr.deleteDataBaseUrl;
|
log.info("添加报警本地库到分析节点:reqJson:"+creqParam+",请求地址:"+reqUrl);
|
String respData = restTemplateUtil.post(reqUrl,
|
creqParam, MediaType.APPLICATION_JSON_UTF8, false);
|
if (respData != null){
|
log.info("添加报警本地库 c返回:"+respData);
|
JSONObject cResult = JSONObject.parseObject(respData);
|
String result = cResult.getString("result");
|
if (result == null || !result.equals("1")){
|
log.info("本次停止布控调用c 返回信息"+respData);
|
addMsg = "停止布控失败";
|
break;
|
}
|
}
|
}
|
if (addMsg != null) break; // 停止布控调用c 失败 直接停止
|
}
|
if (addMsg != null){
|
respObj.put("result",false);
|
respObj.put("msg","停止布控失败");
|
return respObj;
|
}else {
|
// 本地停止
|
task.setDelFlag("1");
|
int i = taskMapper.updateByPrimaryKeySelective(task);
|
if (i >0 ){
|
respObj.put("result",true);
|
respObj.put("msg","集群各节点删除布控成功");
|
}else {
|
respObj.put("result",false);
|
respObj.put("msg","后台删除布控失败");
|
}
|
return respObj;
|
}
|
}
|
}else {
|
respObj.put("result",false);
|
respObj.put("msg","布控任务不存在,请验证。");
|
}
|
return respObj;
|
}
|
|
@Override
|
public JSONObject addTaskByNode(String uuid, String taskName, Integer threshold, Date startTime, Date endTime,
|
String syncType, String enabled, String cluId, String devId) {
|
Task task = new Task();
|
task.setId(uuid);
|
task.setName(taskName);
|
task.setStartTime(startTime);
|
task.setEndTime(endTime);
|
boolean enable = startTime.before(new Date())&& endTime.after(new Date());
|
task.setEnabled(enabled);
|
task.setStatus(startTime.after(new Date())?0:endTime.before(new Date())?-1:1);
|
task.setSource(cluId); // 平台布控
|
task.setThreshold(threshold);
|
task.setSyncType(syncType == null ?0:syncType.equals("1")?1:0);
|
String cluName = "未知集群";
|
String devName ="未知节点设备名";
|
// 查询集群名
|
cluName = getCluOrDevAndNameOrIpById(cluId, DevInfoColumnMap.CluType, DevInfoColumnMap.nameType);
|
// 依据 设备id 查询 设备节点名 和 id
|
JSONObject nodeInfo = deviceService.getNodeByDevId(devId);
|
String id = nodeInfo.getString("id");
|
devName = nodeInfo.getString("devName");
|
String scopeJson = "[{\""+ DevInfoColumnMap.cluName +"\":\""+cluName+"\"," +
|
"\""+DevInfoColumnMap.cluId+"\": \"f33942a0-2876-4332-911c-fb011cf5d166\",\""+DevInfoColumnMap.nodes+"\":[" +
|
"{\""+DevInfoColumnMap.devId+"\":\""+id+"\",\""+ DevInfoColumnMap.devName+"\":\""+devName+"\"}]}]";
|
JSONArray scope = JSONArray.parseArray(scopeJson);
|
task.setScope(scope.toJSONString());
|
task.setCreateBy(devName); task.setCreateTime(new Date());
|
JSONObject cResult = new JSONObject();JSONObject addDbResult = new JSONObject();
|
// 加入 数据库
|
int i = 1;
|
try {
|
i = taskMapper.insertSelective(task);
|
}catch (DuplicateKeyException e){
|
task.setCreateBy(null);task.setCreateTime(null);
|
task.setUpdateBy(devName);task.setUpdateBy("集群节点修改");
|
i = taskMapper.updateByPrimaryKeySelective(task);
|
}
|
log.info(i>0?"保存布控任务成功":"保存布控任务失败");
|
if (i > 0){
|
addDbResult.put("result",true);
|
addDbResult.put("msg","保存布控任务成功。");
|
}else{
|
addDbResult.put("result",false);
|
addDbResult.put("msg","保存布控任务失败");
|
}
|
return addDbResult;
|
}
|
|
@Override
|
public JSONObject querySoureList() {
|
Result cluDevResult = clusterService.getClusterDeviceTree();
|
if (cluDevResult.isSuccess()){
|
JSONObject sourceList = new JSONObject();
|
JSONArray sourceArray = new JSONArray();
|
JSONArray data = (JSONArray) cluDevResult.getData();
|
List<MenuTreeVo> menus = JSONArray.parseArray(data.toJSONString(),MenuTreeVo.class);
|
menus.forEach(menTre ->{
|
if (menTre.getType().equals("1")){
|
JSONObject source = new JSONObject();
|
source.put("name",menTre.getName());
|
source.put("value",menTre.getId());
|
sourceArray.add(source);
|
}
|
});
|
JSONObject source = new JSONObject();
|
source.put("name","平台布控");
|
source.put("value",DevInfoColumnMap.PlatformCont);
|
sourceArray.add(source);
|
sourceList.put("selectOpts2",sourceArray);
|
return sourceList;
|
}else {
|
throw new RuntimeException(cluDevResult.getMsg());
|
}
|
}
|
|
/**
|
* 单表任务详情
|
* @param taskId
|
* @return
|
*/
|
@Override
|
public JSONObject queryTaskById(String taskId) {
|
Task task = taskMapper.selectByPrimaryKey(taskId);
|
String s = JSONObject.toJSONStringWithDateFormat(task, "yyyy-MM-dd HH:mm:ss");
|
JSONObject taskJson = JSONObject.parseObject(s);
|
// Object sqlMap = taskJson.remove("sqlMap");
|
JSONArray scope = JSONArray.parseArray(task.getScope().toString());
|
ArrayList<String> devList = new ArrayList<>();
|
// 范围解析
|
scope.forEach(jsonObj ->{
|
JSONObject cluInfo = (JSONObject)jsonObj;
|
JSONArray jsonNode = cluInfo.getJSONArray(DevInfoColumnMap.nodes);
|
if (jsonNode == null){
|
String cluId = cluInfo.getString(DevInfoColumnMap.cluId);
|
// 获取所有 节点信息
|
List<String> devIds = getDevIdsByCluId(cluId);
|
devList.addAll(devIds);
|
}else if (jsonNode.size()>0){
|
jsonNode.forEach(dev ->{
|
JSONObject jsonDev = (JSONObject)dev;
|
devList.add(jsonDev.getString(DevInfoColumnMap.devId));
|
});
|
}
|
});
|
taskJson.put("devIds",devList);
|
return taskJson;
|
}
|
|
@Override
|
public JSONArray queryTaskScopeDetailById(String taskId) {
|
|
Task task = taskMapper.selectByPrimaryKey(taskId);
|
String scopeJson = task.getScope().toString();
|
JSONArray jsonScope = JSONArray.parseArray(scopeJson);
|
List<String> cluIds = getCluOrDevId(jsonScope, DevInfoColumnMap.CluType);
|
List<String> devIds = getCluOrDevId(jsonScope, DevInfoColumnMap.DevType);
|
Result result = clusterService.getClusterDeviceTree();
|
JSONArray data = (JSONArray) result.getData();
|
if (result.isSuccess()){
|
Iterator<Object> iterator = data.iterator();
|
if (iterator.hasNext()){
|
JSONObject clu = (JSONObject) iterator.next();
|
if (cluIds.contains(clu.getString("id"))){
|
JSONArray nodes = clu.getJSONArray(DevInfoColumnMap.nodes);
|
if (task.getSyncType() != 1 && nodes != null && nodes.size() > 0){
|
Iterator<Object> nodeIter = nodes.iterator();
|
if (nodeIter.hasNext()){
|
JSONObject nodeInfo = (JSONObject) nodeIter.next();
|
if (devIds.contains(nodeInfo.getString("id"))){
|
// 不做操作
|
}else {
|
nodeIter.remove();
|
}
|
}
|
}
|
}else {
|
iterator.remove();
|
}
|
}
|
return data;
|
|
}else {
|
return null;
|
}
|
}
|
|
/**
|
* 任务所在 节点列表是否存在
|
* @param taskName
|
* @param devIds
|
* @return
|
*/
|
public JSONObject isExistDataBase(String taskName,List<String> devIds,boolean isClu){
|
Result allNode = clusterService.findAllNode();
|
if (!allNode.isSuccess()){
|
log.error(allNode.getMsg());
|
throw new RuntimeException(allNode.getMsg());
|
}
|
JSONArray clus = (JSONArray)allNode.getData();
|
// 获取 服务ip
|
JSONObject cResult = new JSONObject();
|
boolean isExist = false; // 是否存在 的判断
|
String existMsg = null;
|
List<String> devIp = new ArrayList<>();
|
JSONArray clulist = new JSONArray();
|
for (Object cluObj : clus) {
|
JSONObject clu = (JSONObject)cluObj;
|
JSONArray nodes = clu.getJSONArray("nodes");
|
JSONObject cluInfo = new JSONObject();
|
cluInfo.put(DevInfoColumnMap.cluId,clu.getString("id"));
|
cluInfo.put(cluName,clu.getString("name"));
|
JSONArray nodeInfos = new JSONArray();
|
for (Object nodeObj : nodes) {
|
JSONObject node = (JSONObject)nodeObj;
|
if (devIds.contains(node.getString("id"))){ // 包含本 节点
|
JSONArray localDates = node.getJSONArray("localDates");
|
// 判断 是否存在 底库
|
for (Object localDate : localDates){
|
JSONObject localDb = (JSONObject)localDate;
|
String tableName = localDb.getString("tableName");
|
tableName = tableName.substring(tableName.indexOf("lt_")+3);
|
if (taskName != null && taskName.equals(tableName)){
|
isExist = true;
|
String existDev = node.getString("devName");
|
String name = clu.getString("name");
|
existMsg += (name==null?"":name)+"集群下"+ (existDev==null?"":existDev)+"节点存在"+taskName+"本地库,";
|
}
|
}
|
// 将本节点 信息 存储
|
JSONObject nodeInfo = new JSONObject();
|
nodeInfo.put(DevInfoColumnMap.devId,node.getString("id"));
|
nodeInfo.put(devName,node.getString("devName"));
|
nodeInfos.add(nodeInfo);
|
// 获取 ip
|
String nodeIp = node.getString("nodeIp");
|
if (nodeIp != null && !node.isEmpty()){
|
devIp.add(node.getString("nodeIp"));
|
}else{
|
existMsg = "节点Ip 未获取到,请设备模块程序员和管理员检查集群节点列表ip字段数据.";
|
cResult.put("result",false);
|
cResult.put("msg",existMsg);
|
return cResult;
|
}
|
}
|
}
|
// 库 信息 存储
|
if (nodeInfos.size()>0){
|
if (!isClu) cluInfo.put(DevInfoColumnMap.nodes,nodeInfos); // 集群 信息 ,不加分析节点信息
|
clulist.add(cluInfo);
|
}
|
// log.info("判断是否存在本地库结果:"+existMsg);
|
}
|
if (isExist){
|
cResult.put("result",true);
|
}else {
|
cResult.put("result",false);
|
}
|
if (clulist.size()>0){
|
cResult.put("devIp",devIp);
|
cResult.put("cluList",clulist);
|
}
|
cResult.put("msg",existMsg);
|
return cResult;
|
}
|
|
/**
|
* 获取加入布控任务树
|
* @return
|
*/
|
@Override
|
public JSONObject getAddToCtlTree() {
|
JSONObject jsonResult = new JSONObject();
|
|
List<TreeVo> platDbList = new ArrayList();
|
JSONArray jsonArrCluDb = new JSONArray();
|
List<cDbVo> cluDbList = new ArrayList();
|
List<Task> allTask = taskMapper.findAll(null);
|
|
if(allTask !=null && allTask.size()>0){
|
|
Result allNode = clusterService.findAllNode();
|
Map<String, cDbVo> dbMap = null;
|
if(allNode !=null && allNode.isSuccess()){
|
dbMap = getTaskClusterMap(JSONArray.parseArray(allNode.getData().toString()));
|
log.info("dbMap:"+dbMap);
|
}
|
for(Task task : allTask){
|
if(StringUtils.isNotEmpty(task.getSource()) && enumStr.conCenter.equals(task.getSource())){//平台布控任务
|
TreeVo pDb = new TreeVo();
|
pDb.setId(task.getId());
|
pDb.setName(task.getName());
|
platDbList.add(pDb);
|
}else{//集群内创建的布控任务
|
if(dbMap !=null && dbMap.containsKey(task.getId())){
|
cDbVo localDb = dbMap.get(task.getId());//本地库信息
|
cluDbList.add(localDb);
|
}
|
}
|
}
|
if(cluDbList !=null && cluDbList.size()>0){
|
for(cDbVo cdb : cluDbList){
|
platDbList.forEach(db->{
|
if(db.getId().equals(cdb.getId())){
|
cluDbList.remove(cdb);//集群内的库如果在平台列表中已存在,则不再显示
|
}
|
});
|
}
|
//转化为前端需要的集群-本地库列表的格式
|
MultiValueMap<String, cDbVo> mvMap = new LinkedMultiValueMap<>();
|
for(cDbVo cdb : cluDbList){
|
if(!mvMap.containsKey(cdb.getClusterId())){
|
mvMap.add(cdb.getClusterId(), cdb);
|
}else{
|
mvMap.get(cdb.getClusterId()).add(cdb);
|
}
|
}
|
Set<Map.Entry<String, List<cDbVo>>> entries = mvMap.entrySet();
|
for(Map.Entry entry : entries){
|
JSONObject jsonDb = new JSONObject();
|
jsonDb.put("id", entry.getKey());
|
jsonDb.put("name", ((List<cDbVo>)entry.getValue()).get(0).getClusterName());
|
jsonDb.put("child", entry.getValue());
|
jsonArrCluDb.add(jsonDb);
|
}
|
}
|
}
|
jsonResult.put("platDbList", platDbList);
|
jsonResult.put("cluDbList", jsonArrCluDb);
|
return jsonResult;
|
}
|
|
/**
|
* 获取每个库所属的集群和节点信息
|
* @param clusterArr
|
* @return
|
*/
|
private Map<String, cDbVo> getTaskClusterMap(JSONArray clusterArr){
|
Map<String, cDbVo> map = new HashMap<>();
|
if(clusterArr !=null && clusterArr.size()>0){
|
|
for(Object cluster: clusterArr){
|
JSONObject jsonCluster = (JSONObject)cluster;
|
JSONArray nodeArr = jsonCluster.getJSONArray("nodes");
|
if(nodeArr !=null && nodeArr.size()>0){
|
for(Object node:nodeArr){
|
JSONObject jsonNode = (JSONObject)node;
|
JSONArray localDbArr = jsonNode.getJSONArray("localDates");
|
if(localDbArr !=null && localDbArr.size()>0){
|
for(Object localDb:localDbArr){
|
JSONObject jsonLocalDb = (JSONObject)localDb;
|
cDbVo vo = new cDbVo();
|
vo.setId(jsonLocalDb.getString("uuid"));//本地库id
|
vo.setName(jsonLocalDb.getString("tableName"));//本地库名称
|
vo.setClusterId(jsonCluster.getString("id"));//集群id
|
vo.setClusterName(jsonCluster.getString("name"));//集群名称
|
vo.setNodeId(jsonNode.getString("id"));//节点id
|
vo.setNodeName(jsonNode.getString("devName"));//节点名称
|
map.put(jsonLocalDb.getString("uuid"), vo);//本地库id
|
}
|
}
|
}
|
}
|
}
|
}
|
return map;
|
}
|
}
|