liuxiaolong
2020-06-05 7c811247ecf143e08c576986a884bedadc57dd66
controllers/es.go
@@ -1,215 +1,70 @@
package controllers
import (
   "basic.com/valib/logger.git"
   "bytes"
   "encoding/json"
   "fmt"
   "github.com/gin-gonic/gin"
   "math/rand"
   "os/exec"
   "strings"
   "webserver/cache"
   "webserver/extend/code"
   "webserver/extend/config"
   "webserver/extend/esutil"
   "webserver/extend/util"
   "webserver/models"
   "webserver/service"
   "math/rand"
   "strconv"
   "strings"
)
type EsController struct{}
type EsManagementController struct{}
type EsClusterInfo struct {
   Ip string `json:"ip"`
}
// @Security ApiKeyAuth
// @Summary 比对数据查询
// @Description  比对数据查询
// @Accept  json
// @Produce json
// @Tags es
// @Param reqMap body models.EsSearch true "collection 为空"
// @Success 200 {string} json "{"code":200, msg:"目录结构数据", success:true}"
// @Failure 500 {string} json "{"code":500,  msg:"返回错误信息", success:false}"
// @Success 200 {string} json "{"code":200, msg:"", success:true}"
// @Failure 500 {string} json "{"code":500, msg:"", success:false}"
// @Router /data/api-v/es/queryEsCompareData [POST]
func PostEsCompareData(c *gin.Context) {
   searchBody := new(models.EsSearch)
   c.BindJSON(&searchBody)
   isEsSource := true
   databases := searchBody.DataBases
   page := searchBody.Page;if page <= 0 {page = 1}
   size := searchBody.Size;if size <= 0 {size = 8}
   from := (page - 1) * size
   to := page * size
   threshold := searchBody.Threshold;if threshold < 0 || threshold > 100 {threshold = 60}
   picUrl := searchBody.PicUrl
   /*feature := searchBody.Feature*/
   var featByte []byte;
 if len(picUrl) > 3 { //   linux
      /*fileName := picUrl   // picIp 定义在 fileController weedfs 文件服务器 访问 路径 前缀
      detect := gorun.GetSimpleFaceDetect(fileName)
      if len(detect) != 1 {
         util.ResponseFormat(c,code.TooManyFeatureFindError,"图片特征值过多")
         return
      }else {
         featByte = detect[0]["feature"].([]byte)
      }*/ //   linux
   }
   data := make(map[string]interface{})
   searchBody.Page = 1
   searchBody.Size = 15000
   //searchMap := Struct2Map(*searchBody)
   if len(databases) == 1 && databases[0] == "esData" {
      searchBody.IsAggs = false
      data = searchEsData(*searchBody)
   } else {
      isEsSource = false
      data = service.QueryDbPersonsForCompare(*searchBody)
   }
   sources := data["datalist"].([]interface{})
   if len(sources) > 0 {
      //进行比对
      sources = sourceCompare(sources, isEsSource, featByte, threshold)
   } else {
      fmt.Println("查询条件下无数据 source 数据为空:" + string(len(sources)))
   }
   dataLen := len(sources)
   if from > dataLen-1 {
      from = 0
      to = 0
   }
   if to > dataLen {
      to = dataLen
   }
   sources = sources[from:to]   // 数据 部分获取
   if !isEsSource {  //  人员数据要加底库名称
      for _, sou := range sources {
         tableId := sou.(map[string]interface{})["tableId"]
         if tableId != nil {
            info := QueryDbTableInfo(tableId.(string))
            if info["tableName"] != nil {sou.(map[string]interface{})["tableName"] = info["tableName"]
            }else {sou.(map[string]interface{})["tableName"] = "其他"}
            if info["bwType"] != nil {sou.(map[string]interface{})["bwType"] = info["bwType"]
            }else {sou.(map[string]interface{})["bwType"] = "4"}
         }
      }
   }
   dmap := make(map[string]interface{},2)
   dmap["datalist"] = sources
   dmap["total"] = dataLen
   util.ResponseFormat(c, code.Success, dmap)
}
// @Summary 查询摄像机以及启算法
// @Description 关联查询摄像机以及启算法
// @Produce json
// @Tags camera
// @Success 200 {string} json "{"code":200, success:true,  msg:"请求处理成功", data:"摄像机信息"}"
// @Failure 500 {string} json "{"code":500, success:false   msg:"",data:"错误信息内容"}"
// @Router /data/api-v/camera/queryCameraAndTaskInfo [get]
func (ac *CameraController) QueryCameraAndTaskInfo(c *gin.Context) {
   var cam models.Camera
   rows := cam.FindAllCamTask()
   if len(rows) == 0 {
      util.ResponseFormat(c, code.ComError, "没有记录")
   err := c.BindJSON(&searchBody)
   if err != nil || searchBody.PicUrl == "" || len(searchBody.DataBases) == 0 {
      util.ResponseFormat(c, code.RequestParamError, "参数有误")
      return
   }
   camList := make([]map[string]interface{},0,5)
   for _, cam := range rows {
      newCam := make(map[string]interface{})
      newCam["cameraid"] = cam.Id
      newCam["rtspUrl"] = cam.Rtsp
      tasks := cam.CamTask
         taskList := make([]map[string]interface{},0)
      if tasks != nil {
          taskmap:= make(map[string]interface{})
         for _,task := range tasks {
            taskId := taskmap[task.Taskid]
            if taskId != nil {
               cameraTasks := taskmap[task.Taskid].([]models.CameraTask)
               cameraTasks = append(cameraTasks,task)
               taskmap[task.Taskid] = cameraTasks
            }else {
               cameraTasks := make([]models.CameraTask,0)
               taskmap[task.Taskid] = append(cameraTasks,task)
            }
         }
         for key, value := range taskmap {
            i := make(map[string]interface{})
            i["taskid"] = key
            i["sdklist"] = value
            taskList = append(taskList,i)
         }
   if searchBody.CompareNum != "" {
      //二次搜索,不需要再比对了
      co := service.GetCompResultByNum(searchBody.CompareNum)
      if co != nil {
         //二次搜索和排序
         twiceM := GetCompareDataTwice(co, searchBody)
         util.ResponseFormat(c, code.Success, twiceM)
         return
      } else {
         m := make(map[string]interface{}, 0)
         m["compareNum"] = searchBody.CompareNum
         m["total"] = 0
         m["totalList"] = []CompareResult{}
         util.ResponseFormat(c, code.CompareResultGone, m)
         return
      }
      newCam["tasklist"] = taskList
      camList = append(camList,newCam)
   }
   c.JSON(200,camList)
   m := make(map[string]interface{}, 0)
   m["compareNum"] = searchBody.CompareNum
   m["total"] = 0
   m["totalList"] = []CompareResult{}
   util.ResponseFormat(c, code.CompareResultGone, m)
}
func searchEsData(searchBody models.EsSearch) map[string]interface{} {
   //请求索引
   index := config.EsInfo.EsIndex.VideoPersons.IndexName  //  wp只查人脸数据
   queryStr := "";
   queryBody := searchBody.InputValue
   //检索框
   if queryBody != "" {
      queryStr = "\"must\":[{\"multi_match\":{\"query\":\"" + queryBody + "\",\"fields\":[\"tableName^1.5\",\"gender^2.0\",\"race^2.0\",\"content^1.0\",\"idcard^1.8\",\"picAddress^1.0\"]," +
         "\"type\":\"cross_fields\",\"operator\":\"OR\",\"slop\":0,\"prefix_length\":0,\"max_expansions\":50,\"zero_terms_query\":\"NONE\",\"auto_generate_synonyms_phrase_query\":true,\"fuzzy_transpositions\":true,\"boost\":1}}],"
   }
   gteDate := searchBody.SearchTime[0];lteDate := searchBody.SearchTime[1]
   //判断任务ID
   taskIdStr := ""
   taskId := searchBody.Tasks
   if taskId != nil && len(taskId) > 0 {
      esTaskId := strings.Replace(strings.Trim(fmt.Sprint(taskId), "[]"), " ", "\",\"", -1)
      taskIdStr = "{\"terms\":{\"taskId\":[\"" + esTaskId + "\"]}},"
   }
   //判断摄像机ID
   videoReqNumStr := ""
   videoReqNum := searchBody.TreeNodes
   if videoReqNum != nil && len(videoReqNum) > 0 {
      esVideoReqNum := strings.Replace(strings.Trim(fmt.Sprint(videoReqNum), "[]"), " ", "\",\"", -1)
      videoReqNumStr = "{\"terms\":{\"videoReqNum\":[\"" + esVideoReqNum + "\"]}},"
   }
   //判断库表ID
   tableId := searchBody.Tabs
   esTableId := ""
   esTableIdStr := ""
   if tableId != nil && len(tableId) > 0 {
      esTableId = strings.Replace(strings.Trim(fmt.Sprint(tableId), "[]"), " ", "\",\"", -1)
      esTableIdStr = "{\"terms\":{\"tableId\":[\"" + esTableId + "\"]}},"
   }
   collectionStr := ""
   collection := searchBody.Collection
   if collection != "" {
      collectionStr = "{\"term\":{\"collection\":\"" + collection + "\"}},"
   }
   webPage := searchBody.Page
   webSize := searchBody.Size
   from := (webPage - 1) * webSize
   esFrom := strconv.Itoa(from)
   esSize := strconv.Itoa(webSize)
   //使用es底层机制处理分页
   //请求头
   url := "http://" + config.EsInfo.Masterip + ":" + config.EsInfo.Httpport +
      "/" + index + "/_search?search_type=dfs_query_then_fetch"
   //请求体
   prama := "{\"from\":\"" + esFrom + "\",\"size\":\"" + esSize + "\"," +
//   prama := "{\"size\":\"0\"," +
      "\"query\":{\"bool\":{" + queryStr +
      "\"filter\":[" +
      videoReqNumStr +
      taskIdStr +
      collectionStr +
      esTableIdStr +
      "{\"range\":{\"picDate\":{\"from\":\"" + gteDate + "\",\"to\":\"" + lteDate + "\",\"include_lower\":true,\"include_upper\":true,\"boost\":1}}}]}}," +
      "\"sort\":[{\"_score\":{\"order\":\"desc\"}},{\"picDate\":{\"order\":\"desc\"}}]," +
      "\"_source\":[\"tableName\",\"gender\",\"race\",\"content\",\"IDCard\",\"picAddress\",\"picDate\",\"sdkType\",\"age\",\"personId\",\"personIsHub\",\"personPicUrl\",\"picLocalUrl\",\"picSmUrl\",\"videoIp\",\"videoNum\",\"videoReqNum\",\"baseInfo\"]}";
   fmt.Println(prama)
   //数据解析
   tokenRes := esutil.GetEsDataReq(url, prama, true)
   return tokenRes
}
func sourceCompare(sources []interface{}, isEsSource bool, campareByte []byte, threshold float32) []interface{} {
   var filterName = "feature"
@@ -232,15 +87,367 @@
   return dataSource
}
// type  转 map
func Struct2Map(obj interface{}) map[string]interface{} {
   var data = make(map[string]interface{})
   bytes, _ := json.Marshal(obj)
   json.Unmarshal(bytes, &data)
   /*t := reflect.TypeOf(obj)
   v := reflect.ValueOf(obj)
   for i := 0; i < t.NumField(); i++ {
      data[t.Field(i).Name] = v.Field(i).Interface()
   }*/
   return data
// @Security ApiKeyAuth
// @Summary 查询ES集群信息-入口
// @Description  查询ES集群信息-入口
// @Accept  json
// @Produce json
// @Tags es
// @Param obj body controllers.EsClusterInfo true "查询集群参数"
// @Success 200 {string} json "{"code":200, msg:"", success:true}"
// @Failure 500 {string} json "{"code":500, msg:"", success:false}"
// @Router /data/api-v/es/getEsClusterInfo [POST]
func (em *EsManagementController) GetEsClusterInfo(c *gin.Context) {
   var body EsClusterInfo
   c.BindJSON(&body)
   ip := body.Ip
   serverIp := ""
   if ip != "" {
      serverIp = ip
   } else {
      localConf, err2 := cache.GetServerInfo()
      if err2 != nil || localConf.ServerIp == "" {
         logger.Debug("localConfig is wrong!!!")
         util.ResponseFormat(c, code.QueryClusterInfoErr, err2)
         return
      }
      serverIp = localConf.ServerIp
   }
   nodeInfos, err := getEsClusterInfors(serverIp)
   if err != nil {
      util.ResponseFormat(c, code.QueryClusterInfoErr, err)
      return
   }
   util.ResponseFormat(c, code.Success, nodeInfos)
}
//查询ES集群信息-业务逻辑
func getEsClusterInfors(ip string) ([]map[string]interface{}, error) {
   str := "curl " + ip + ":9200/_cat/nodes?v"
   cmd := exec.Command("sh", "-c", str)
   var out bytes.Buffer
   cmd.Stdout = &out
   err := cmd.Run()
   if err != nil {
      return nil, err
   }
   infos := strings.Split(string(out.String()), "\n")
   totalNodes := len(infos) - 1
   var nodeInfos []map[string]interface{}
   for i := 1; i < totalNodes; i++ {
      nodeInfo := make(map[string]interface{})
      context := strings.Fields(infos[i])
      nodeIp := context[0]
      Type := context[7]
      var nodeType string
      if Type == "mdi" {
         nodeType = "主节点"
      }
      if Type == "di" {
         nodeType = "从节点"
      }
      nodeInfo["ip"] = nodeIp
      nodeInfo["nodeType"] = nodeType
      url := "http://" + nodeIp + ":9200"
      buf := esutil.HttpGet(url)
      var info interface{}
      json.Unmarshal(buf, &info)
      tmpInfo := info.(map[string]interface{})
      tmpName := tmpInfo["name"].(string)
      versinInfo := tmpInfo["version"].(map[string]interface{})
      buildDate := versinInfo["build_date"].(string)
      nodeInfo["name"] = tmpName
      nodeInfo["buildDate"] = buildDate
      nodeInfos = append(nodeInfos, nodeInfo)
   }
   return nodeInfos, err
}
// @Security ApiKeyAuth
// @Summary 创建节点
// @Description  创建节点
// @Accept  json
// @Produce json
// @Tags es
// @Success 200 {string} json "{"code":200, msg:"", success:true}"
// @Failure 500 {string} json "{"code":500, msg:"", success:false}"
// @Router /data/api-v/es/createNode [POST]
func (em *EsManagementController) CreateNode(c *gin.Context) {
   msg := "创建节点失败,请联系管理员"
   str := "sh /opt/script/create_first_node.sh"
   cmd := exec.Command("sh", "-c", str)
   var out bytes.Buffer
   cmd.Stdout = &out
   err := cmd.Run()
   if err != nil {
   }
   infos := strings.Split(string(out.String()), "\n")[0]
   if infos == "服务启动成功" {
      msg = "创建节点成功"
      util.ResponseFormat(c, code.Success, msg)
      return
   }
   util.ResponseFormat(c, code.CreateFirstNodeErr, msg)
}
// @Security ApiKeyAuth
// @Summary 加入集群
// @Description  加入集群
// @Accept  json
// @Produce json
// @Tags es
// @Param obj body controllers.AddCluster true "加入集群参数"
// @Success 200 {string} json "{"code":200, msg:"", success:true}"
// @Failure 500 {string} json "{"code":500, msg:"", success:false}"
// @Router /data/api-v/es/addCluster [POST]
func (em *EsManagementController) AddCluster(c *gin.Context) {
   var ac AddCluster
   err := c.BindJSON(&ac)
   if err != nil {
      util.ResponseFormat(c, code.RequestParamError, "参数有误")
      return
   }
   str := "sh /opt/script/add_cluster.sh " + ac.Option + ""
   if ac.Option == "1" {
      info, err := updateUnicastHosts(ac.Ip)
      if err != nil || info == false {
         util.ResponseFormat(c, code.QueryClusterInfoErr, err)
         return
      }
      if info == true {
         info := runScript(str)
         if info == "运行失败" {
            util.ResponseFormat(c, code.AddClusterInfoErr, info)
            return
         }
      }
   }
   if ac.Option == "2" {
      info, err := updateUnicastHosts(ac.Ip)
      if err != nil || info == false {
         util.ResponseFormat(c, code.QueryClusterInfoErr, err)
         return
      }
      if info == true {
         info := runScript(str)
         if info == "运行失败" {
            util.ResponseFormat(c, code.AddClusterInfoErr, info)
            return
         }
      }
   } else {
      util.ResponseFormat(c, code.RequestParamError, "参数有误")
      return
   }
   util.ResponseFormat(c, code.Success, "加入成功")
}
//脚本封装
func runScript(str string) string {
   cmd := exec.Command("sh", "-c", str)
   var out bytes.Buffer
   cmd.Stdout = &out
   err := cmd.Run()
   if err != nil {
      return "运行失败"
   }
   return out.String()
}
type AddCluster struct {
   Ip     string `json:"ip"`   //集群IP
   Option string `json:option` //节点类型
}
//更新组播列表
func updateUnicastHosts(host string) (bool, error) {
   nodeInfos, err := getEsClusterInfors(host)
   if err != nil {
      return false, err
   }
   var hosts []string
   for _, val := range nodeInfos {
      nodeType := val["nodeType"].(string)
      if nodeType == "主节点" {
         ip := val["ip"].(string)
         hosts = append(hosts, ip)
      }
   }
   msg := false
   for i, val := range hosts {
      val = val + ":9300"
      hosts[i] = val
   }
   verificationHosts := "[\"" + strings.Replace(strings.Trim(fmt.Sprint(hosts), "[]"), " ", "\",\"", -1) + "\"]"
   for i, val := range hosts {
      val = "\\\"" + val + "\\\""
      hosts[i] = val
   }
   oldUnicastHost := "\\[\\\"0.0.0.0:9300\\\"\\]"
   newUnicastHost := strings.Replace(strings.Trim(fmt.Sprint(hosts), ""), " ", ",", -1)
   str := "sed -ie 's/discovery.zen.ping.unicast.hosts: " + oldUnicastHost + "/discovery.zen.ping.unicast.hosts: " + newUnicastHost + "/g' /opt/elasticsearch/config/elasticsearch.yml"
   fmt.Println(str)
   cmd := exec.Command("sh", "-c", str)
   var out bytes.Buffer
   cmd.Stdout = &out
   err1 := cmd.Run()
   if err1 != nil {
      return false, err
   }
   res := getUnicastHosts()
   fmt.Println("res:          ", res)
   res1 := "discovery.zen.ping.unicast.hosts: " + verificationHosts + ""
   fmt.Println("res1:         ", res1)
   if res == res1 {
      msg = true
   }
   return msg, err
}
//修改elasticsearch.yml映射文件
// @Security ApiKeyAuth
// @Summary 修改es集群Ip
// @Description  修改es集群Ip
// @Accept  json
// @Produce json
// @Tags es
// @Param obj body controllers.Hosts true "修改es集群Ip参数"
// @Success 200 {string} json "{"code":200, msg:"", success:true}"
// @Failure 500 {string} json "{"code":500, msg:"", success:false}"
// @Router /data/api-v/es/updateEsHosts [POST]
func (em *EsManagementController) UpdateEsHosts(c *gin.Context) {
   flag := "修改成功"
   var hosts Hosts
   c.BindJSON(&hosts)
   nodeInfos, err := getEsClusterInfors(hosts.OldIp)
   var nodeType string
   if err != nil {
      logger.Fatal(err)
      util.ResponseFormat(c, code.QueryClusterInfoErr, err)
   }
   newMasterHosts := make([]string, 0)
   allHosts := make([]string, 0)
   for _, val := range nodeInfos {
      ip := val["ip"].(string)
      tmpType := val["nodeType"].(string)
      if ip == hosts.OldIp {
         nodeType = tmpType
      }
      if tmpType == "主节点" {
         newMasterHosts = append(newMasterHosts, ip)
      }
      if ip != hosts.OldIp {
         allHosts = append(allHosts, ip)
      }
   }
   if nodeType == "主节点" {
      str1 := "sed -ie 's/network.host: " + hosts.OldIp + "/network.host: " + hosts.NewIp + "/g' /opt/elasticsearch/config/elasticsearch.yml"
      msg1 := runScript(str1)
      if msg1 == "运行失败" {
         flag = "修改配置文件失败"
      }
      for i, host := range newMasterHosts {
         if host == hosts.OldIp {
            newMasterHosts[i] = hosts.NewIp
         }
      }
      for i, val := range newMasterHosts {
         val = "\\\"" + val + ":9300\\\""
         newMasterHosts[i] = val
      }
      newUnicastHost := strings.Replace(strings.Trim(fmt.Sprint(newMasterHosts), ""), " ", ",", -1)
      tmpStr := "cat /opt/elasticsearch/config/elasticsearch.yml | grep discovery.zen.ping.unicast.hosts:"
      rs := runScript(tmpStr)
      ts := strings.Split(rs, "\n")[0]
      ots := strings.Split(ts, " ")[1]
      outs := strings.Replace(ots, "\"", "\\\"", -1)
      oldUnicastHost := "\\" + strings.Replace(outs, "]", "\\]", -1)
      str2 := "sed -ie 's/discovery.zen.ping.unicast.hosts: " + oldUnicastHost + "/discovery.zen.ping.unicast.hosts: " + newUnicastHost + "/g' /opt/elasticsearch/config/elasticsearch.yml"
      msg2 := runScript(str2)
      if msg2 == "运行失败" {
         flag = "修改配置文件失败"
      }
      for _, host := range allHosts {
         str3 := "sshpass -p \"123\" ssh basic@" + host + " \"cd /opt/elasticsearch/config  ; " + str2 + " ; cat elasticsearch.yml\""
         msg := runScript(str3)
         if msg == "运行失败" {
            flag = "修改配置文件失败"
         }
      }
   } else {
      str1 := "sed -ie 's/network.host: " + hosts.OldIp + "/network.host: " + hosts.NewIp + "/g' /opt/elasticsearch/config/elasticsearch.yml"
      msg1 := runScript(str1)
      if msg1 == "运行失败" {
         flag = "修改配置文件失败"
      }
   }
   if flag == "修改配置文件失败" {
      util.ResponseFormat(c, code.UpdateFail, flag)
   }
   util.ResponseFormat(c, code.Success, flag)
}
type Hosts struct {
   NewIp string `json:newIp`
   OldIp string `json:oldIp`
}
//func AddEsCluster(hosts []string) (string){
//   msg := "加入失败"
//   for i,val := range hosts{
//      val =val+":9300"
//      hosts[i] = val
//   }
//   verificationHosts := "[\""+strings.Replace(strings.Trim(fmt.Sprint(hosts), "[]"), " ", "\",\"", -1)+"\"]"
//   for i,val := range hosts{
//      val ="\\\""+val+"\\\""
//      hosts[i] = val
//   }
//   oldUnicastHost := "\\[\\\"0.0.0.0:9300\\\"\\]"
//   newUnicastHost := strings.Replace(strings.Trim(fmt.Sprint(hosts), ""), " ", ",", -1)
//   str := "sed -ie 's/discovery.zen.ping.unicast.hosts: "+oldUnicastHost+"/discovery.zen.ping.unicast.hosts: "+newUnicastHost+"/g' /opt/elasticsearch/config/elasticsearch.yml"
//   fmt.Println(str)
//   cmd := exec.Command("sh","-c",str)
//   var out bytes.Buffer
//   cmd.Stdout = &out
//   err := cmd.Run()
//   if err != nil {
//
//   }
//   res := getUnicastHosts()
//   fmt.Println("res:          ",res)
//   res1 := "discovery.zen.ping.unicast.hosts: "+verificationHosts+""
//   fmt.Println("res1:         ",res1)
//   if res == res1{
//      msg = "加入成功"
//   }
//   str2 := "echo \"node.master: true\" >> /opt/elasticsearch/config/elasticsearch.yml"
//   cmd2 := exec.Command("sh","-c",str2)
//   var out2 bytes.Buffer
//   cmd2.Stdout = &out2
//   err2 := cmd2.Run()
//   if err2 != nil {
//      msg = "加入失败"
//   }
//   return msg
//
//}
func getUnicastHosts() string {
   str := "cat /opt/elasticsearch/config/elasticsearch.yml | grep discovery.zen.ping.unicast.hosts:"
   cmd := exec.Command("sh", "-c", str)
   var out bytes.Buffer
   cmd.Stdout = &out
   err := cmd.Run()
   if err != nil {
   }
   infos := strings.Split(string(out.String()), "\n")[0]
   return infos
}