package com.cloud.control.service.impl; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.cloud.common.utils.JsonUtil; import com.cloud.common.utils.RestTemplateUtil; import com.cloud.control.dao.TaskMapper; import com.cloud.control.model.Task; import com.cloud.control.service.ClusterService; import com.cloud.control.service.DeviceService; import com.cloud.control.service.TaskService; import com.cloud.control.utils.DevInfoColumnMap; import com.cloud.control.utils.EnumStr; import com.cloud.control.vo.MenuTreeVo; import com.cloud.control.vo.TreeVo; import com.cloud.control.vo.cDbVo; import com.cloud.model.common.Result; import com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.dao.DuplicateKeyException; import org.springframework.http.MediaType; import org.springframework.stereotype.Service; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; import org.springframework.web.servlet.mvc.method.annotation.MatrixVariableMethodArgumentResolver; import java.lang.reflect.Array; import java.text.SimpleDateFormat; import java.util.*; import java.util.stream.Collectors; import java.util.stream.Stream; import static com.cloud.control.utils.DevInfoColumnMap.cluName; import static com.cloud.control.utils.DevInfoColumnMap.devName; import static com.cloud.control.utils.DevInfoColumnMap.nodes; @Service @Slf4j public class TaskServiceImpl implements TaskService { @Autowired private RestTemplateUtil restTemplateUtil; @Autowired private EnumStr enumStr; @Autowired private ClusterService clusterService; @Autowired private DeviceService deviceService; @Autowired private TaskMapper taskMapper; private static final SimpleDateFormat sdfTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); @Override public JSONObject queryTaskListByPage(Date startTime, Date endTime, String likeName, Integer status, String source, Integer from, Integer size, String sortName, String sortType){ JSONObject tasks = new JSONObject(); long total = taskMapper.selectAllTaskCount(startTime, endTime, likeName, status, source); tasks.put("total",total); List taskList = taskMapper.selectAllTaskByPage(startTime, endTime, likeName, status, source, from, size,sortName,sortType); JSONObject soureList = querySoureList(); JSONArray sources = soureList.getJSONArray("selectOpts2");// 平台 布控 源 // sources.stream().map(obj - >{}); Map map = sources.stream().collect(Collectors.toMap(obj -> ((JSONObject)obj).getString("value"), obj -> ((JSONObject)obj).getString("name"))); //数据处理 taskList.forEach( task ->{ String cluName = map.get(task.getSource()); if (cluName != null) { task.setSource(cluName); } String scope = (String) task.getScope(); JSONArray jsonScope = JSONArray.parseArray(scope); String collect = jsonScope.stream() .map(obj -> ((JSONObject) obj).getString(DevInfoColumnMap.cluId)) .map(str -> map.get(str)) .collect(Collectors.joining(",")); task.setScope(collect); // 转换 }); tasks.put("taskList",taskList); return tasks; } /** * 添加 报警本地底库 到 c服务 * @param taskName * @param startTime * @param endTime * 前端 直接添加 报警 任务 到分析服务 * @return */ @Override public JSONObject addTaskForIndevice(String uuid,String taskName,Integer threshold,Date startTime,Date endTime, List devIds,boolean isUpdate,String enabled){ String reqIp = null; JSONObject isExist = isExist = isExistDataBase(taskName, devIds ,false); if (isExist.getBoolean("result") && !isUpdate){ // true 为 存在 isExist.put("code",40010); // 底库存在 重新添加 return isExist; } if ( !isUpdate ){enabled = "1";} if (StringUtils.isBlank(enabled) || !enabled.equals("1") || !enabled.equals("0")){ enabled = "1"; } List devIp = (List) isExist.get("devIp"); JSONObject jsonObj = new JSONObject(); String dbId = uuid; if (dbId == null || dbId.isEmpty()){ dbId = UUID.randomUUID().toString().replaceAll("-","");} Task task = new Task(); task.setName(taskName); jsonObj.put("TableType","person");jsonObj.put("TableName",taskName); task.setStartTime(startTime); jsonObj.put("StartTime",sdfTime.format(startTime)); task.setEndTime(endTime); jsonObj.put("EndTime",sdfTime.format(endTime)); boolean enable = startTime.before(new Date())&& endTime.after(new Date()); task.setEnabled(enabled); jsonObj.put("IsSync","0");jsonObj.put("enabled",enabled); task.setStatus(startTime.after(new Date())?0:endTime.before(new Date())?-1:1); if (!isUpdate){ task.setSource(DevInfoColumnMap.PlatformCont); // "平台布控" } task.setId(dbId); jsonObj.put("uuid",dbId); task.setThreshold(threshold); jsonObj.put("threshold",threshold); task.setScope(isExist.get("cluList").toString()); task.setSyncType(0); jsonObj.put("SyncType","0"); jsonObj.put("BwType","1"); JSONObject cResult = new JSONObject();JSONObject addDbResult = new JSONObject(); String addMsg = null; for (String ip : devIp) { String bsUrl = "http://"+ip+":"+enumStr.getCServerPort(); String esBaseUrl =enumStr.addDataBaseUrl; if ( isUpdate){ jsonObj.remove("IsSync"); esBaseUrl = enumStr.updateDataBaseUrl; task.setUpdateBy("wp"); task.setUpdateTime(new Date()); }else { task.setUpdateTime(new Date()); task.setUpdateBy("wp"); task.setCreateTime(new Date()); task.setCreateBy("wp"); } jsonObj.put("createBy",enumStr.conCenter); // c 判定 来源 String reqJson = jsonObj.toJSONString(); JSONObject dataBaseParams = JSONObject.parseObject(reqJson); log.info("添加报警本地库到分析节点:reqJson:"+dataBaseParams+",请求地址:"+bsUrl+esBaseUrl); String respData = restTemplateUtil.post(bsUrl+esBaseUrl, dataBaseParams, MediaType.APPLICATION_JSON_UTF8, String.class, false); if (respData != null){ log.info("添加报警本地库 c返回:"+respData); cResult = JSONObject.parseObject(respData); String result = cResult.getString("result"); if (result == null || !result.equals("1")){ addMsg = addMsg !=null?addMsg+","+ip:ip; } }else { addMsg = addMsg !=null?addMsg+","+ip:ip; } } String msgType = "创建"; if (isUpdate)msgType = "修改"; if (addMsg != null){ addDbResult.put("result",false); addDbResult.put("msg",addMsg+"以上节点"+msgType+"布控任务失败。"); }else{ // 加入 数据库 int i = 0; if (!isUpdate){ i = taskMapper.insertSelective(task); }else { i = taskMapper.updateByPrimaryKeySelective(task); } log.info(msgType+(i>0?"布控任务成功":"布控任务失败")); if (i > 0){ addDbResult.put("result",true); addDbResult.put("msg",msgType+"布控任务成功。"); }else{ addDbResult.put("result",false); addDbResult.put("msg","分析节点"+msgType+"布控任务成功,"+msgType+"总布控库失败"); } } return addDbResult; } /** * 修改 报警同步 底库 到 c服务 * @param taskName * @param startTime * @param endTime * 前端 直接添加 报警 任务 到分析服务 * @return */ @Override public JSONObject updateSyncTaskForIndevice(String uuid,String taskName,Integer threshold,Date startTime,Date endTime, List cluIps,boolean isUpdate,JSONArray scope,String enabled){ if (StringUtils.isBlank(enabled) || !enabled.equals("1") || !enabled.equals("0")){ enabled = "1"; } List devIp = cluIps; // 集群 信息 ip JSONObject jsonObj = new JSONObject(); String dbId = uuid; if (dbId == null || dbId.isEmpty()){ dbId = UUID.randomUUID().toString().replaceAll("-","");} Task task = new Task(); task.setName(taskName); jsonObj.put("TableType","person");jsonObj.put("TableName",taskName); task.setStartTime(startTime); jsonObj.put("StartTime",sdfTime.format(startTime)); task.setEndTime(endTime); jsonObj.put("EndTime",sdfTime.format(endTime)); boolean enable = startTime.before(new Date())&& endTime.after(new Date()); task.setEnabled(enabled); jsonObj.put("enabled",enabled); task.setStatus(1); // startTime.after(new Date())?0:endTime.before(new Date())?-1: task.setSource(DevInfoColumnMap.PlatformCont); // 平台布控 task.setId(dbId); jsonObj.put("uuid",dbId); task.setThreshold(threshold); jsonObj.put("threshold",threshold); task.setScope(scope.toJSONString()); task.setSyncType(1); jsonObj.put("SyncType","1"); jsonObj.put("BwType","1"); JSONObject cResult = new JSONObject();JSONObject addDbResult = new JSONObject(); String addMsg = null; for (String ip : devIp) { String bsUrl = "http://"+ip+":"+enumStr.getCServerPort(); String esBaseUrl = enumStr.updateDataBaseUrl; String reqJson = jsonObj.toJSONString(); JSONObject dataBaseParams = JSONObject.parseObject(reqJson); log.info("修改报警同步库到分析节点:reqJson:"+dataBaseParams+",请求地址:"+bsUrl+esBaseUrl); String respData = restTemplateUtil.post(bsUrl+esBaseUrl, dataBaseParams, MediaType.APPLICATION_JSON_UTF8, String.class, false); if (respData != null){ log.info("修改报警同步库 c返回:"+respData); cResult = JSONObject.parseObject(respData); String result = cResult.getString("result"); if (result == null || !result.equals("1")){ addMsg = addMsg !=null?addMsg+","+ip:ip; } }else { addMsg = addMsg !=null?addMsg+","+ip:ip; } } String msgType = "修改"; if (addMsg != null){ addDbResult.put("result",false); addDbResult.put("msg",addMsg+"以上节点"+msgType+"布控任务失败。"); }else{ // 加入 数据库 int i = 1; if (isUpdate){ // 修改 更新 同步 数据库 的 范围 就行 i = taskMapper.updateByPrimaryKeySelective(task); }else { i = taskMapper.insertSelective(task); } log.info(msgType+(i>0?"布控任务成功":"布控任务失败")); if (i > 0){ addDbResult.put("result",true); addDbResult.put("msg",msgType+"布控任务成功。"); }else{ addDbResult.put("result",false); addDbResult.put("msg","分析节点"+msgType+"布控任务成功,"+msgType+"总布控库失败"); } } return addDbResult; } /** * 添加 全部报警底库 到 本后台 * 启动调用 * @return */ @Override public void addTaskForIndevice(){ // 获取 服务ip String reqIp = null; Result allNode = clusterService.findAllNode(); if (!allNode.isSuccess()){ log.error(allNode.getMsg()); throw new RuntimeException(allNode.getMsg()); } JSONArray clus = (JSONArray) allNode.getData(); for (Object obj : clus){ JSONObject clu = (JSONObject)obj; JSONArray syncDbs = clu.getJSONArray("syncDbs"); // 同步库 列表 JSONObject cluInfo = new JSONObject(); String cluName = clu.getString("name"); // 集群名 String id = clu.getString("id"); // 集群 设备存储id cluInfo.put(DevInfoColumnMap.cluName,cluName); cluInfo.put(DevInfoColumnMap.cluId,id); JSONArray cluList = new JSONArray(); cluList.add(cluInfo); // 处理同步库数据 if(syncDbs !=null && syncDbs.size()>0) { for (Object dbObj : syncDbs) { JSONObject syncDb = (JSONObject) dbObj; if ("1".equals(syncDb.getString("bwType"))) { Task task = new Task(); task.setId(syncDb.getString("uuid")); String tableName = syncDb.getString("tableName"); task.setName(tableName); java.sql.Date start_time = syncDb.getSqlDate("start_time"); task.setStartTime(start_time); java.sql.Date end_time = syncDb.getSqlDate("end_time"); task.setEndTime(end_time); boolean enabled = start_time != null && start_time.before(new Date()) && end_time != null && end_time.after(new Date()); String enable = syncDb.getString("enabled"); task.setStatus(enable == null ? enabled ? 1 : 0 : enable.equals("1") ? 1 : 0); Integer threshold = syncDb.getInteger("threshold"); task.setThreshold(threshold); task.setSource(id != null ? id : "未知集群布控"); // 集群名 task.setScope(cluList.toJSONString()); // 布控范围 task.setSyncType(1); // task.setCreateTime(new Date()); task.setCreateBy("listenter-wp"); task.setUpdateBy("listenter-wp"); task.setUpdateTime(new Date()); try { int i = taskMapper.insertSelective(task); log.info(tableName + (i == 1 ? "同步底库被添加成功" : "同步底库添加失败")); } catch (DuplicateKeyException de) { task.setCreateBy(null); task.setCreateTime(null); int i = taskMapper.updateByPrimaryKeySelective(task); log.info(tableName + "同步库已被添加过,本次进行修改操作," + (i == 1 ? "同步库被修改成功" : "同步库修改失败")); } } log.info("同步库处理完毕"); } } // 处理 各节点 本地库 JSONArray nodes = clu.getJSONArray("nodes"); // 同步库 列表 for (Object nodeObj : nodes){ JSONObject node = (JSONObject)nodeObj; JSONArray localDates = node.getJSONArray("localDates"); for (Object localDate : localDates) { JSONObject localDb = (JSONObject)localDate; if ("1".equals(localDb.getString("bwType"))){ Task task = new Task(); task.setId(localDb.getString("uuid")); String tableName = localDb.getString("tableName"); task.setName(tableName); java.sql.Date start_time = localDb.getSqlDate("start_time"); task.setStartTime(start_time); java.sql.Date end_time = localDb.getSqlDate("end_time"); task.setEndTime(end_time); boolean enabled = start_time!=null&&start_time.before(new Date())&&end_time!=null&&end_time.after(new Date()); String enable = localDb.getString("enabled"); task.setStatus(enable==null?enabled?1:0:enable.equals("1")?1:0); Integer threshold = localDb.getInteger("threshold"); task.setThreshold(threshold); task.setSource(id); // 来源 集群id JSONArray jsonArray = new JSONArray(); JSONObject nodeInfo = new JSONObject(); nodeInfo.put(DevInfoColumnMap.devId,node.getString("id")); nodeInfo.put(DevInfoColumnMap.devName,node.getString("devName")); jsonArray.add(nodeInfo); cluInfo.put(DevInfoColumnMap.nodes,jsonArray); // cluList.add(cluInfo); // 不能重复添加 task.setScope(cluList.toJSONString()); // 集群信息 task.setSyncType(0); task.setCreateTime(new Date()); task.setCreateBy("listenter-wp"); task.setUpdateBy("listenter-wp"); task.setUpdateTime(new Date()); try { int i = taskMapper.insertSelective(task); log.info(tableName+(i==1?"本地库被添加成功":"本地库添加失败")); }catch (DuplicateKeyException de){ task.setCreateBy(null); task.setCreateTime(null); int i = taskMapper.updateByPrimaryKeySelective(task); log.info(tableName+"本地库已被添加过,本次进行修改操作,"+(i==1?"本地库被修改成功":"本地库修改失败")); } } log.info("本地库处理完毕"); } } log.info("全部集群节点底库信息已加入数据库"); } } @Override public JSONObject updateTask(String uuid, String taskName, Integer threshold, Date startTime, Date endTime, List devIds){ Task task = taskMapper.selectByPrimaryKey(uuid); JSONObject respObj = new JSONObject(); if (task != null) { JSONArray scope = JSONArray.parseArray(task.getScope().toString()); // 布控范围 Integer syncType = task.getSyncType(); if (syncType==1){ JSONObject existDataBase = isExistDataBase(null, devIds,true); JSONArray cluList = existDataBase.getJSONArray("cluList"); List nowcluIds = getCluOrDevId(cluList, DevInfoColumnMap.CluType); // 现有的集群id List cluIds = getCluOrDevId(scope, DevInfoColumnMap.CluType); // 旧有的集群id if (nowcluIds==null || cluIds == null){ respObj.put("result",false); respObj.put("msg","布控任务不存在,请验证。"); return respObj; } // List // 添加的集群id List nowIdsCopy = new ArrayList<>(); nowIdsCopy.addAll(nowcluIds); List cIdsCopy = new ArrayList<>(); cIdsCopy.addAll(cluIds); boolean b = nowIdsCopy.removeAll(cluIds); if (nowIdsCopy != null ){ // 说明有新增的 updateSyncTaskForIndevice( uuid, taskName, threshold, startTime, endTime, nowIdsCopy,true, cluList, "1"); // 现有 全部集群 信息 } boolean b2 = cIdsCopy.removeAll(nowcluIds); if (cIdsCopy.size() > 0 ){ // 说明有需要删除的 delSyncTaskByIp( uuid, cIdsCopy); } // 剩下 需要更新的 List nowIdsCopy2 = new ArrayList<>(); nowIdsCopy2.addAll(cluIds); nowIdsCopy2.removeAll(nowIdsCopy); if (nowIdsCopy2.size() > 0){ for (int i=0;i< nowIdsCopy2.size();i++){ String cluIp = getCluOrDevAndNameOrIpById(nowIdsCopy2.get(i), DevInfoColumnMap.CluType, DevInfoColumnMap.ipType); nowIdsCopy2.set(i,cluIp); } JSONObject jsonObject = updateSyncTaskForIndevice(uuid, taskName, threshold, startTime, endTime, nowIdsCopy2, true, cluList, "1");// 现有 全部集群 信息 return jsonObject; } }else{ // 本地库 需要 节点信息 去更新 List oldDevIds = getCluOrDevId(scope, DevInfoColumnMap.DevType); // 节点信息 // devIds 比较 List oldIdsCopy = new ArrayList<>(); oldIdsCopy.addAll(oldDevIds); oldIdsCopy.removeAll(devIds); // 多余的 if (oldIdsCopy != null){ addTaskForIndevice( uuid, taskName,threshold, startTime,endTime, devIds,true,null); } // 减少的 devIds List nowIdsCopy = new ArrayList<>(); nowIdsCopy.addAll(devIds); nowIdsCopy.removeAll(oldDevIds); // 多余的 if (nowIdsCopy .size() > 0){ delSyncTaskByIp(uuid,nowIdsCopy); } respObj.put("result",true); respObj.put("msg","布控任务修改成功。"); } }else { respObj.put("result",false); respObj.put("msg","布控任务不存在,请验证。"); } return respObj; } // scope 解析 cluId devId public List getCluOrDevId(JSONArray scope,String nameType){ List idList = new ArrayList<>(); if (DevInfoColumnMap.CluType.equals(nameType)){ scope.forEach(clu->{ idList.add(((JSONObject)clu).getString(DevInfoColumnMap.cluId)); }); return idList; }else if (DevInfoColumnMap.DevType.equals(nameType)){ scope.forEach(clu->{ JSONObject sclu = (JSONObject)clu; JSONArray nodes = sclu.getJSONArray(DevInfoColumnMap.nodes); if (nodes != null) { nodes.forEach(node ->{ idList.add(((JSONObject)node).getString(DevInfoColumnMap.devId)); }); } }); return idList; } return null; } @Override public JSONObject stopTask(String uuid) { Task task = taskMapper.selectByPrimaryKey(uuid); JSONObject respObj = new JSONObject(); if (task != null){ JSONArray scope = JSONArray.parseArray(task.getScope().toString()); Integer syncType = task.getSyncType(); Result allNode = clusterService.findAllNode(); if (!allNode.isSuccess()){ respObj.put("result",false); respObj.put("msg",allNode.getMsg()); return respObj; } String addMsg = null; if (syncType.equals(1)){ // 同步库处理方式 for (Object obj : scope){ JSONObject scop = (JSONObject)obj; String cluId = (String) scop.get(DevInfoColumnMap.cluId); // 该集群 处理 停止布控 String cluip = getCluOrDevAndNameOrIpById( cluId,DevInfoColumnMap.CluType,DevInfoColumnMap.ipType); if (StringUtils.isBlank(cluip)){ respObj.put("result",false); respObj.put("msg","集群ip信息查询失败,导致无法调用停止布控"); return respObj; } JSONObject reqData = new JSONObject(); reqData.put("uuid",uuid); reqData.put("enabled","0"); reqData.put("SyncType",task.getSyncType().toString()); reqData.put("TableName",task.getName()); reqData.put("TableType","person"); JSONObject dataBaseParams = reqData; String reqUrl = "http://"+cluip+":"+enumStr.getCServerPort()+enumStr.updateDataBaseUrl; log.info("添加报警本地库到分析节点:reqJson:"+dataBaseParams+",请求地址:"+reqUrl); String respData = restTemplateUtil.post(reqUrl, dataBaseParams, MediaType.APPLICATION_JSON_UTF8, false); if (respData != null){ log.info("添加报警本地库 c返回:"+respData); JSONObject cResult = JSONObject.parseObject(respData); String result = cResult.getString("result"); if (result == null || !result.equals("1")){ log.info("本次停止布控调用c 返回信息"+respData); addMsg = "停止布控失败"; break; } } } if (addMsg != null){ respObj.put("result",false); respObj.put("msg","集群停止布控失败"); return respObj; }else{ task.setEnabled("0"); int i = taskMapper.updateByPrimaryKeySelective(task); if (i >0 ){ respObj.put("result",true); respObj.put("msg","各集群节点停止布控任务成功"); }else { respObj.put("result",false); respObj.put("msg","后台停止布控失败"); } return respObj; } }else { // 本地库 停止布控 for (Object obj : scope){ JSONObject scop = (JSONObject)obj; JSONArray nodes = scop.getJSONArray(DevInfoColumnMap.nodes); for (Object nodeObj : nodes){ // 遍历 所有节点信息 JSONObject node = (JSONObject)nodeObj; String devId = (String) node.get(DevInfoColumnMap.devId); if (StringUtils.isBlank(devId)){ respObj.put("result",false); respObj.put("msg","布控任务对应节点id 存储为空"); return respObj; } String devip = getCluOrDevAndNameOrIpById(devId,DevInfoColumnMap.DevType,DevInfoColumnMap.ipType); if (StringUtils.isBlank(devip)){ respObj.put("result",false); respObj.put("msg","集群ip信息查询失败,导致无法调用停止布控"); return respObj; } JSONObject reqData = new JSONObject(); reqData.put("uuid",uuid); reqData.put("enabled","0"); reqData.put("SyncType","0"); reqData.put("TableName",task.getName()); reqData.put("TableType","person"); JSONObject dataBaseParams = reqData; String reqUrl = "http://"+devip+":"+enumStr.getCServerPort()+enumStr.updateDataBaseUrl; log.info("添加报警本地库到分析节点:reqJson:"+dataBaseParams+",请求地址:"+reqUrl); String respData = restTemplateUtil.post(reqUrl, dataBaseParams, MediaType.APPLICATION_JSON_UTF8, false); if (respData != null){ log.info("添加报警本地库 c返回:"+respData); JSONObject cResult = JSONObject.parseObject(respData); String result = cResult.getString("result"); if (result == null || !result.equals("1")){ log.info("本次停止布控调用c 返回信息"+respData); addMsg = "停止布控失败"; break; } } } if (addMsg != null) break; // 停止布控调用c 失败 直接停止 } if (addMsg != null){ respObj.put("result",false); respObj.put("msg","停止布控失败"); return respObj; }else { // 本地停止 task.setEnabled("0"); int i = taskMapper.updateByPrimaryKeySelective(task); if (i >0 ){ respObj.put("result",true); respObj.put("msg","集群各节点停止布控成功"); }else { respObj.put("result",false); respObj.put("msg","后台停止布控失败"); } return respObj; } } }else { respObj.put("result",false); respObj.put("msg","布控任务不存在,请验证。"); } return respObj; } // 依据 集群或设备id 获取名称 或 ip 地址 public String getCluOrDevAndNameOrIpById(String id,String type,String valueType){ Result result = clusterService.getClusterDeviceTree(); if (result.isSuccess()){ JSONArray respJson = (JSONArray) result.getData(); List data = respJson.toJavaList(MenuTreeVo.class); if (DevInfoColumnMap.CluType.equals(type)){ for (MenuTreeVo datum : data) { if (datum.getId().equals(id) && datum.getType().equals("1")){ if (DevInfoColumnMap.nameType.equals(valueType)){ return datum.getName(); }else if (DevInfoColumnMap.ipType.equals(valueType)){ return datum.getIp(); } } } }else if (DevInfoColumnMap.DevType.equals(type)){ for (MenuTreeVo datum : data){ List child = datum.getChild(); for (MenuTreeVo devInfo : child) { if (devInfo.getId().equals(id) && devInfo.getType().equals("2")){ if (DevInfoColumnMap.nameType.equals(valueType)){ return datum.getName(); }else if (DevInfoColumnMap.ipType.equals(valueType)){ return datum.getIp(); } } } } } } return null; } // 依据 集群 id 获取 设备信息 public List getDevIdsByCluId(String cluId){ Result result = clusterService.getClusterDeviceTree(); List devList = new ArrayList<>(); if (result.isSuccess()) { JSONArray jsonData = (JSONArray) result.getData(); List data = JSONArray.parseArray(jsonData.toJSONString(),MenuTreeVo.class); data.stream().filter(men -> men.getType().equals("1")&&men.getId().equals(cluId)) .map(MenuTreeVo::getChild).forEach(child -> child.forEach(node->{ devList.add(node.getId()); })); } return devList; } // 依据 ip 进行 删除 节点 底库信息 public JSONObject delSyncTaskByIp(String uuid,List ips) { Task task = taskMapper.selectByPrimaryKey(uuid); JSONObject respObj = new JSONObject(); if (task != null){ JSONArray scope = (JSONArray) task.getScope(); Integer syncType = task.getSyncType(); String addMsg = null; JSONObject creqParam = new JSONObject(); creqParam.put("uuid",uuid); creqParam.put("TableType","person"); creqParam.put("TableName",task.getName()); creqParam.put("SyncType",task.getSyncType().toString()); if (syncType.equals(1)){ // 同步库处理方式 for (String cluip : ips){ // 该集群 处理 停止布控 if (StringUtils.isBlank(cluip)){ respObj.put("result",false); respObj.put("msg","集群ip信息未知,导致无法调用停止布控"); return respObj; } String reqUrl = "http://"+cluip+":"+enumStr.getCServerPort()+enumStr.deleteDataBaseUrl; log.info("添加报警本地库到分析节点:reqJson:"+creqParam+",请求地址:"+reqUrl); String respData = restTemplateUtil.post(reqUrl, creqParam, MediaType.APPLICATION_JSON_UTF8, false); if (respData != null){ log.info("添加报警本地库 c返回:"+respData); JSONObject cResult = JSONObject.parseObject(respData); String result = cResult.getString("result"); if (result == null || !result.equals("1")){ log.info("本次停止布控调用c 返回信息"+respData); addMsg = "停止布控失败"; break; } } } if (addMsg != null){ respObj.put("result",false); respObj.put("msg","集群停止布控失败"); return respObj; }else{ // 本地停止 task.setDelFlag("1"); int i = taskMapper.updateByPrimaryKeySelective(task); if (i >0 ){ respObj.put("result",true); respObj.put("msg","集群各节点删除布控成功"); }else { respObj.put("result",false); respObj.put("msg","后台删除布控失败"); } return respObj; } }else { // 本地库 停止布控 for (String devip : ips){ if (StringUtils.isBlank(devip)){ respObj.put("result",false); respObj.put("msg","集群ip信息查询失败,导致无法调用停止布控"); return respObj; } String reqUrl = "http://"+devip+":"+enumStr.getCServerPort()+enumStr.deleteDataBaseUrl; log.info("添加报警本地库到分析节点:reqJson:"+creqParam+",请求地址:"+reqUrl); String respData = restTemplateUtil.post(reqUrl, creqParam, MediaType.APPLICATION_JSON_UTF8, false); if (respData != null){ log.info("添加报警本地库 c返回:"+respData); JSONObject cResult = JSONObject.parseObject(respData); String result = cResult.getString("result"); if (result == null || !result.equals("1")){ log.info("本次停止布控调用c 返回信息"+respData); addMsg = "停止布控失败"; break; } } } if (addMsg != null){ respObj.put("result",false); respObj.put("msg","停止布控失败"); return respObj; }else { // 本地停止 respObj.put("result",true); respObj.put("msg",ips+"节点删除布控成功"); return respObj; } } }else { respObj.put("result",false); respObj.put("msg","布控任务不存在,请验证。"); } return respObj; } @Override public JSONObject delTask(String uuid) { Task task = taskMapper.selectByPrimaryKey(uuid); JSONObject respObj = new JSONObject(); if (task != null){ JSONArray scope = JSONArray.parseArray(task.getScope().toString()) ; Integer syncType = task.getSyncType(); Result allNode = clusterService.findAllNode(); if (!allNode.isSuccess()){ respObj.put("result",false); respObj.put("msg",allNode.getMsg()); return respObj; } String addMsg = null; JSONObject creqParam = new JSONObject(); creqParam.put("uuid",uuid); creqParam.put("TableType","person"); creqParam.put("TableName",task.getName()); creqParam.put("SyncType",task.getSyncType().toString()); if (syncType.equals(1)){ // 同步库处理方式 for (Object obj : scope){ JSONObject scop = (JSONObject)obj; String cluId = (String) scop.get(DevInfoColumnMap.cluId); // 该集群 处理 停止布控 String cluip = getCluOrDevAndNameOrIpById( cluId,DevInfoColumnMap.CluType,DevInfoColumnMap.ipType); if (StringUtils.isBlank(cluip)){ respObj.put("result",false); respObj.put("msg","集群ip信息查询失败,导致无法调用停止布控"); return respObj; } String reqUrl = "http://"+cluip+":"+enumStr.getCServerPort()+enumStr.deleteDataBaseUrl; log.info("添加报警本地库到分析节点:reqJson:"+creqParam+",请求地址:"+reqUrl); String respData = restTemplateUtil.post(reqUrl, creqParam, MediaType.APPLICATION_JSON_UTF8, false); if (respData != null){ log.info("添加报警本地库 c返回:"+respData); JSONObject cResult = JSONObject.parseObject(respData); String result = cResult.getString("result"); if (result == null || !result.equals("1")){ log.info("本次停止布控调用c 返回信息"+respData); addMsg = "停止布控失败"; break; } } } if (addMsg != null){ respObj.put("result",false); respObj.put("msg","集群停止布控失败"); return respObj; }else{ // 本地停止 task.setDelFlag("1"); int i = taskMapper.updateByPrimaryKeySelective(task); if (i >0 ){ respObj.put("result",true); respObj.put("msg","集群各节点删除布控成功"); }else { respObj.put("result",false); respObj.put("msg","后台删除布控失败"); } return respObj; } }else { // 本地库 停止布控 for (Object obj : scope){ JSONObject scop = (JSONObject)obj; JSONArray nodes = scop.getJSONArray(DevInfoColumnMap.nodes); for (Object nodeObj : nodes){ // 遍历 所有节点信息 JSONObject node = (JSONObject)nodeObj; String devId = (String) node.get(DevInfoColumnMap.devId); if (StringUtils.isBlank(devId)){ respObj.put("result",false); respObj.put("msg","布控任务对应节点id 存储为空"); return respObj; } String devip = getCluOrDevAndNameOrIpById(devId,DevInfoColumnMap.DevType,DevInfoColumnMap.ipType); if (StringUtils.isBlank(devip)){ respObj.put("result",false); respObj.put("msg","集群ip信息查询失败,导致无法调用停止布控"); return respObj; } String reqUrl = "http://"+devip+":"+enumStr.getCServerPort()+enumStr.deleteDataBaseUrl; log.info("添加报警本地库到分析节点:reqJson:"+creqParam+",请求地址:"+reqUrl); String respData = restTemplateUtil.post(reqUrl, creqParam, MediaType.APPLICATION_JSON_UTF8, false); if (respData != null){ log.info("添加报警本地库 c返回:"+respData); JSONObject cResult = JSONObject.parseObject(respData); String result = cResult.getString("result"); if (result == null || !result.equals("1")){ log.info("本次停止布控调用c 返回信息"+respData); addMsg = "停止布控失败"; break; } } } if (addMsg != null) break; // 停止布控调用c 失败 直接停止 } if (addMsg != null){ respObj.put("result",false); respObj.put("msg","停止布控失败"); return respObj; }else { // 本地停止 task.setDelFlag("1"); int i = taskMapper.updateByPrimaryKeySelective(task); if (i >0 ){ respObj.put("result",true); respObj.put("msg","集群各节点删除布控成功"); }else { respObj.put("result",false); respObj.put("msg","后台删除布控失败"); } return respObj; } } }else { respObj.put("result",false); respObj.put("msg","布控任务不存在,请验证。"); } return respObj; } @Override public JSONObject addTaskByNode(String uuid, String taskName, Integer threshold, Date startTime, Date endTime, String syncType, String enabled, String cluId, String devId) { Task task = new Task(); task.setId(uuid); task.setName(taskName); task.setStartTime(startTime); task.setEndTime(endTime); boolean enable = startTime.before(new Date())&& endTime.after(new Date()); task.setEnabled(enabled); task.setStatus(startTime.after(new Date())?0:endTime.before(new Date())?-1:1); task.setSource(cluId); // 平台布控 task.setThreshold(threshold); task.setSyncType(syncType == null ?0:syncType.equals("1")?1:0); String cluName = "未知集群"; String devName ="未知节点设备名"; // 查询集群名 cluName = getCluOrDevAndNameOrIpById(cluId, DevInfoColumnMap.CluType, DevInfoColumnMap.nameType); // 依据 设备id 查询 设备节点名 和 id JSONObject nodeInfo = deviceService.getNodeByDevId(devId); String id = nodeInfo.getString("id"); devName = nodeInfo.getString("devName"); String scopeJson = "[{\""+ DevInfoColumnMap.cluName +"\":\""+cluName+"\"," + "\""+DevInfoColumnMap.cluId+"\": \"f33942a0-2876-4332-911c-fb011cf5d166\",\""+DevInfoColumnMap.nodes+"\":[" + "{\""+DevInfoColumnMap.devId+"\":\""+id+"\",\""+ DevInfoColumnMap.devName+"\":\""+devName+"\"}]}]"; JSONArray scope = JSONArray.parseArray(scopeJson); task.setScope(scope.toJSONString()); task.setCreateBy(devName); task.setCreateTime(new Date()); JSONObject cResult = new JSONObject();JSONObject addDbResult = new JSONObject(); // 加入 数据库 int i = 1; try { i = taskMapper.insertSelective(task); }catch (DuplicateKeyException e){ task.setCreateBy(null);task.setCreateTime(null); task.setUpdateBy(devName);task.setUpdateBy("集群节点修改"); i = taskMapper.updateByPrimaryKeySelective(task); } log.info(i>0?"保存布控任务成功":"保存布控任务失败"); if (i > 0){ addDbResult.put("result",true); addDbResult.put("msg","保存布控任务成功。"); }else{ addDbResult.put("result",false); addDbResult.put("msg","保存布控任务失败"); } return addDbResult; } @Override public JSONObject querySoureList() { Result cluDevResult = clusterService.getClusterDeviceTree(); if (cluDevResult.isSuccess()){ JSONObject sourceList = new JSONObject(); JSONArray sourceArray = new JSONArray(); JSONArray data = (JSONArray) cluDevResult.getData(); List menus = JSONArray.parseArray(data.toJSONString(),MenuTreeVo.class); menus.forEach(menTre ->{ if (menTre.getType().equals("1")){ JSONObject source = new JSONObject(); source.put("name",menTre.getName()); source.put("value",menTre.getId()); sourceArray.add(source); } }); JSONObject source = new JSONObject(); source.put("name","平台布控"); source.put("value",DevInfoColumnMap.PlatformCont); sourceArray.add(source); sourceList.put("selectOpts2",sourceArray); return sourceList; }else { throw new RuntimeException(cluDevResult.getMsg()); } } /** * 单表任务详情 * @param taskId * @return */ @Override public JSONObject queryTaskById(String taskId) { Task task = taskMapper.selectByPrimaryKey(taskId); String s = JSONObject.toJSONStringWithDateFormat(task, "yyyy-MM-dd HH:mm:ss"); JSONObject taskJson = JSONObject.parseObject(s); // Object sqlMap = taskJson.remove("sqlMap"); JSONArray scope = JSONArray.parseArray(task.getScope().toString()); ArrayList devList = new ArrayList<>(); // 范围解析 scope.forEach(jsonObj ->{ JSONObject cluInfo = (JSONObject)jsonObj; JSONArray jsonNode = cluInfo.getJSONArray(DevInfoColumnMap.nodes); if (jsonNode == null){ String cluId = cluInfo.getString(DevInfoColumnMap.cluId); // 获取所有 节点信息 List devIds = getDevIdsByCluId(cluId); devList.addAll(devIds); }else if (jsonNode.size()>0){ jsonNode.forEach(dev ->{ JSONObject jsonDev = (JSONObject)dev; devList.add(jsonDev.getString(DevInfoColumnMap.devId)); }); } }); taskJson.put("devIds",devList); return taskJson; } @Override public JSONArray queryTaskScopeDetailById(String taskId) { Task task = taskMapper.selectByPrimaryKey(taskId); String scopeJson = task.getScope().toString(); JSONArray jsonScope = JSONArray.parseArray(scopeJson); List cluIds = getCluOrDevId(jsonScope, DevInfoColumnMap.CluType); List devIds = getCluOrDevId(jsonScope, DevInfoColumnMap.DevType); Result result = clusterService.getClusterDeviceTree(); JSONArray data = (JSONArray) result.getData(); if (result.isSuccess()){ Iterator iterator = data.iterator(); if (iterator.hasNext()){ JSONObject clu = (JSONObject) iterator.next(); if (cluIds.contains(clu.getString("id"))){ JSONArray nodes = clu.getJSONArray(DevInfoColumnMap.nodes); if (task.getSyncType() != 1 && nodes != null && nodes.size() > 0){ Iterator nodeIter = nodes.iterator(); if (nodeIter.hasNext()){ JSONObject nodeInfo = (JSONObject) nodeIter.next(); if (devIds.contains(nodeInfo.getString("id"))){ // 不做操作 }else { nodeIter.remove(); } } } }else { iterator.remove(); } } return data; }else { return null; } } /** * 任务所在 节点列表是否存在 * @param taskName * @param devIds * @return */ public JSONObject isExistDataBase(String taskName,List devIds,boolean isClu){ Result allNode = clusterService.findAllNode(); if (!allNode.isSuccess()){ log.error(allNode.getMsg()); throw new RuntimeException(allNode.getMsg()); } JSONArray clus = (JSONArray)allNode.getData(); // 获取 服务ip JSONObject cResult = new JSONObject(); boolean isExist = false; // 是否存在 的判断 String existMsg = null; List devIp = new ArrayList<>(); JSONArray clulist = new JSONArray(); for (Object cluObj : clus) { JSONObject clu = (JSONObject)cluObj; JSONArray nodes = clu.getJSONArray("nodes"); JSONObject cluInfo = new JSONObject(); cluInfo.put(DevInfoColumnMap.cluId,clu.getString("id")); cluInfo.put(cluName,clu.getString("name")); JSONArray nodeInfos = new JSONArray(); for (Object nodeObj : nodes) { JSONObject node = (JSONObject)nodeObj; if (devIds.contains(node.getString("id"))){ // 包含本 节点 JSONArray localDates = node.getJSONArray("localDates"); // 判断 是否存在 底库 for (Object localDate : localDates){ JSONObject localDb = (JSONObject)localDate; String tableName = localDb.getString("tableName"); tableName = tableName.substring(tableName.indexOf("lt_")+3); if (taskName != null && taskName.equals(tableName)){ isExist = true; String existDev = node.getString("devName"); String name = clu.getString("name"); existMsg += (name==null?"":name)+"集群下"+ (existDev==null?"":existDev)+"节点存在"+taskName+"本地库,"; } } // 将本节点 信息 存储 JSONObject nodeInfo = new JSONObject(); nodeInfo.put(DevInfoColumnMap.devId,node.getString("id")); nodeInfo.put(devName,node.getString("devName")); nodeInfos.add(nodeInfo); // 获取 ip String nodeIp = node.getString("nodeIp"); if (nodeIp != null && !node.isEmpty()){ devIp.add(node.getString("nodeIp")); }else{ existMsg = "节点Ip 未获取到,请设备模块程序员和管理员检查集群节点列表ip字段数据."; cResult.put("result",false); cResult.put("msg",existMsg); return cResult; } } } // 库 信息 存储 if (nodeInfos.size()>0){ if (!isClu) cluInfo.put(DevInfoColumnMap.nodes,nodeInfos); // 集群 信息 ,不加分析节点信息 clulist.add(cluInfo); } // log.info("判断是否存在本地库结果:"+existMsg); } if (isExist){ cResult.put("result",true); }else { cResult.put("result",false); } if (clulist.size()>0){ cResult.put("devIp",devIp); cResult.put("cluList",clulist); } cResult.put("msg",existMsg); return cResult; } /** * 获取加入布控任务树 * @return */ @Override public JSONObject getAddToCtlTree() { JSONObject jsonResult = new JSONObject(); List platDbList = new ArrayList(); JSONArray jsonArrCluDb = new JSONArray(); List cluDbList = new ArrayList(); List allTask = taskMapper.findAll(null); if(allTask !=null && allTask.size()>0){ Result allNode = clusterService.findAllNode(); Map dbMap = null; if(allNode !=null && allNode.isSuccess()){ dbMap = getTaskClusterMap(JSONArray.parseArray(allNode.getData().toString())); log.info("dbMap:"+dbMap); } for(Task task : allTask){ if(StringUtils.isNotEmpty(task.getSource()) && enumStr.conCenter.equals(task.getSource())){//平台布控任务 TreeVo pDb = new TreeVo(); pDb.setId(task.getId()); pDb.setName(task.getName()); platDbList.add(pDb); }else{//集群内创建的布控任务 if(dbMap !=null && dbMap.containsKey(task.getId())){ cDbVo localDb = dbMap.get(task.getId());//本地库信息 cluDbList.add(localDb); } } } if(cluDbList !=null && cluDbList.size()>0){ for(cDbVo cdb : cluDbList){ platDbList.forEach(db->{ if(db.getId().equals(cdb.getId())){ cluDbList.remove(cdb);//集群内的库如果在平台列表中已存在,则不再显示 } }); } //转化为前端需要的集群-本地库列表的格式 MultiValueMap mvMap = new LinkedMultiValueMap<>(); for(cDbVo cdb : cluDbList){ if(!mvMap.containsKey(cdb.getClusterId())){ mvMap.add(cdb.getClusterId(), cdb); }else{ mvMap.get(cdb.getClusterId()).add(cdb); } } Set>> entries = mvMap.entrySet(); for(Map.Entry entry : entries){ JSONObject jsonDb = new JSONObject(); jsonDb.put("id", entry.getKey()); jsonDb.put("name", ((List)entry.getValue()).get(0).getClusterName()); jsonDb.put("child", entry.getValue()); jsonArrCluDb.add(jsonDb); } } } jsonResult.put("platDbList", platDbList); jsonResult.put("cluDbList", jsonArrCluDb); return jsonResult; } /** * 获取每个库所属的集群和节点信息 * @param clusterArr * @return */ private Map getTaskClusterMap(JSONArray clusterArr){ Map map = new HashMap<>(); if(clusterArr !=null && clusterArr.size()>0){ for(Object cluster: clusterArr){ JSONObject jsonCluster = (JSONObject)cluster; JSONArray nodeArr = jsonCluster.getJSONArray("nodes"); if(nodeArr !=null && nodeArr.size()>0){ for(Object node:nodeArr){ JSONObject jsonNode = (JSONObject)node; JSONArray localDbArr = jsonNode.getJSONArray("localDates"); if(localDbArr !=null && localDbArr.size()>0){ for(Object localDb:localDbArr){ JSONObject jsonLocalDb = (JSONObject)localDb; cDbVo vo = new cDbVo(); vo.setId(jsonLocalDb.getString("uuid"));//本地库id vo.setName(jsonLocalDb.getString("tableName"));//本地库名称 vo.setClusterId(jsonCluster.getString("id"));//集群id vo.setClusterName(jsonCluster.getString("name"));//集群名称 vo.setNodeId(jsonNode.getString("id"));//节点id vo.setNodeName(jsonNode.getString("devName"));//节点名称 map.put(jsonLocalDb.getString("uuid"), vo);//本地库id } } } } } } return map; } }