package controllers import ( "basic.com/valib/logger.git" "bytes" "encoding/json" "fmt" "github.com/gin-gonic/gin" "math/rand" "os/exec" "strings" "webserver/cache" "webserver/extend/code" "webserver/extend/esutil" "webserver/extend/util" "webserver/models" "webserver/service" ) type EsController struct{} type EsManagementController struct{} type EsClusterInfo struct { Ip string `json:"ip"` } // @Security ApiKeyAuth // @Summary 比对数据查询 // @Description 比对数据查询 // @Accept json // @Produce json // @Tags es // @Param reqMap body models.EsSearch true "collection 为空" // @Success 200 {string} json "{"code":200, msg:"", success:true}" // @Failure 500 {string} json "{"code":500, msg:"", success:false}" // @Router /data/api-v/es/queryEsCompareData [POST] func PostEsCompareData(c *gin.Context) { searchBody := new(models.EsSearch) err := c.BindJSON(&searchBody) if err != nil || searchBody.PicUrl == "" || len(searchBody.DataBases) == 0 { util.ResponseFormat(c, code.RequestParamError, "参数有误") return } if searchBody.CompareNum != "" { //二次搜索,不需要再比对了 co := service.GetCompResultByNum(searchBody.CompareNum) if co != nil { //二次搜索和排序 twiceM := GetCompareDataTwice(co, searchBody) util.ResponseFormat(c, code.Success, twiceM) return } else { m := make(map[string]interface{}, 0) m["compareNum"] = searchBody.CompareNum m["total"] = 0 m["totalList"] = []CompareResult{} util.ResponseFormat(c, code.CompareResultGone, m) return } } m := make(map[string]interface{}, 0) m["compareNum"] = searchBody.CompareNum m["total"] = 0 m["totalList"] = []CompareResult{} util.ResponseFormat(c, code.CompareResultGone, m) } func sourceCompare(sources []interface{}, isEsSource bool, campareByte []byte, threshold float32) []interface{} { var filterName = "feature" if isEsSource { filterName = "FaceFeature" } fmt.Println("查询" + filterName) dataSource := make([]interface{}, 0, 20) for _, obj := range sources { source := obj.(map[string]interface{}) //feature := source[filterName].(string) // linux //featByte, _ := base64.StdEncoding.DecodeString(feature) // linux //score := gosdk.FaceCompare(featByte, campareByte) // linux score := rand.Float32() // windows if score >= threshold*0.01 { source["score"] = int(score * 100) dataSource = append(dataSource, source) } } return dataSource } // @Security ApiKeyAuth // @Summary 查询ES集群信息-入口 // @Description 查询ES集群信息-入口 // @Accept json // @Produce json // @Tags es // @Param obj body controllers.EsClusterInfo true "查询集群参数" // @Success 200 {string} json "{"code":200, msg:"", success:true}" // @Failure 500 {string} json "{"code":500, msg:"", success:false}" // @Router /data/api-v/es/getEsClusterInfo [POST] func (em *EsManagementController) GetEsClusterInfo(c *gin.Context) { var body EsClusterInfo c.BindJSON(&body) ip := body.Ip serverIp := "" if ip != "" { serverIp = ip } else { localConf, err2 := cache.GetServerInfo() if err2 != nil || localConf.ServerIp == "" { logger.Debug("localConfig is wrong!!!") util.ResponseFormat(c, code.QueryClusterInfoErr, err2) return } serverIp = localConf.ServerIp } nodeInfos, err := getEsClusterInfors(serverIp) if err != nil { util.ResponseFormat(c, code.QueryClusterInfoErr, err) return } util.ResponseFormat(c, code.Success, nodeInfos) } //查询ES集群信息-业务逻辑 func getEsClusterInfors(ip string) ([]map[string]interface{}, error) { str := "curl " + ip + ":9200/_cat/nodes?v" cmd := exec.Command("sh", "-c", str) var out bytes.Buffer cmd.Stdout = &out err := cmd.Run() if err != nil { return nil, err } infos := strings.Split(string(out.String()), "\n") totalNodes := len(infos) - 1 var nodeInfos []map[string]interface{} for i := 1; i < totalNodes; i++ { nodeInfo := make(map[string]interface{}) context := strings.Fields(infos[i]) nodeIp := context[0] Type := context[7] var nodeType string if Type == "mdi" { nodeType = "主节点" } if Type == "di" { nodeType = "从节点" } nodeInfo["ip"] = nodeIp nodeInfo["nodeType"] = nodeType url := "http://" + nodeIp + ":9200" buf := esutil.HttpGet(url) var info interface{} json.Unmarshal(buf, &info) tmpInfo := info.(map[string]interface{}) tmpName := tmpInfo["name"].(string) versinInfo := tmpInfo["version"].(map[string]interface{}) buildDate := versinInfo["build_date"].(string) nodeInfo["name"] = tmpName nodeInfo["buildDate"] = buildDate nodeInfos = append(nodeInfos, nodeInfo) } return nodeInfos, err } // @Security ApiKeyAuth // @Summary 创建节点 // @Description 创建节点 // @Accept json // @Produce json // @Tags es // @Success 200 {string} json "{"code":200, msg:"", success:true}" // @Failure 500 {string} json "{"code":500, msg:"", success:false}" // @Router /data/api-v/es/createNode [POST] func (em *EsManagementController) CreateNode(c *gin.Context) { msg := "创建节点失败,请联系管理员" str := "sh /opt/script/create_first_node.sh" cmd := exec.Command("sh", "-c", str) var out bytes.Buffer cmd.Stdout = &out err := cmd.Run() if err != nil { } infos := strings.Split(string(out.String()), "\n")[0] if infos == "服务启动成功" { msg = "创建节点成功" util.ResponseFormat(c, code.Success, msg) return } util.ResponseFormat(c, code.CreateFirstNodeErr, msg) } // @Security ApiKeyAuth // @Summary 加入集群 // @Description 加入集群 // @Accept json // @Produce json // @Tags es // @Param obj body controllers.AddCluster true "加入集群参数" // @Success 200 {string} json "{"code":200, msg:"", success:true}" // @Failure 500 {string} json "{"code":500, msg:"", success:false}" // @Router /data/api-v/es/addCluster [POST] func (em *EsManagementController) AddCluster(c *gin.Context) { var ac AddCluster err := c.BindJSON(&ac) if err != nil { util.ResponseFormat(c, code.RequestParamError, "参数有误") return } str := "sh /opt/script/add_cluster.sh " + ac.Option + "" if ac.Option == "1" { info, err := updateUnicastHosts(ac.Ip) if err != nil || info == false { util.ResponseFormat(c, code.QueryClusterInfoErr, err) return } if info == true { info := runScript(str) if info == "运行失败" { util.ResponseFormat(c, code.AddClusterInfoErr, info) return } } } if ac.Option == "2" { info, err := updateUnicastHosts(ac.Ip) if err != nil || info == false { util.ResponseFormat(c, code.QueryClusterInfoErr, err) return } if info == true { info := runScript(str) if info == "运行失败" { util.ResponseFormat(c, code.AddClusterInfoErr, info) return } } } else { util.ResponseFormat(c, code.RequestParamError, "参数有误") return } util.ResponseFormat(c, code.Success, "加入成功") } //脚本封装 func runScript(str string) string { cmd := exec.Command("sh", "-c", str) var out bytes.Buffer cmd.Stdout = &out err := cmd.Run() if err != nil { return "运行失败" } return out.String() } type AddCluster struct { Ip string `json:"ip"` //集群IP Option string `json:option` //节点类型 } //更新组播列表 func updateUnicastHosts(host string) (bool, error) { nodeInfos, err := getEsClusterInfors(host) if err != nil { return false, err } var hosts []string for _, val := range nodeInfos { nodeType := val["nodeType"].(string) if nodeType == "主节点" { ip := val["ip"].(string) hosts = append(hosts, ip) } } msg := false for i, val := range hosts { val = val + ":9300" hosts[i] = val } verificationHosts := "[\"" + strings.Replace(strings.Trim(fmt.Sprint(hosts), "[]"), " ", "\",\"", -1) + "\"]" for i, val := range hosts { val = "\\\"" + val + "\\\"" hosts[i] = val } oldUnicastHost := "\\[\\\"0.0.0.0:9300\\\"\\]" newUnicastHost := strings.Replace(strings.Trim(fmt.Sprint(hosts), ""), " ", ",", -1) str := "sed -ie 's/discovery.zen.ping.unicast.hosts: " + oldUnicastHost + "/discovery.zen.ping.unicast.hosts: " + newUnicastHost + "/g' /opt/elasticsearch/config/elasticsearch.yml" fmt.Println(str) cmd := exec.Command("sh", "-c", str) var out bytes.Buffer cmd.Stdout = &out err1 := cmd.Run() if err1 != nil { return false, err } res := getUnicastHosts() fmt.Println("res: ", res) res1 := "discovery.zen.ping.unicast.hosts: " + verificationHosts + "" fmt.Println("res1: ", res1) if res == res1 { msg = true } return msg, err } //修改elasticsearch.yml映射文件 // @Security ApiKeyAuth // @Summary 修改es集群Ip // @Description 修改es集群Ip // @Accept json // @Produce json // @Tags es // @Param obj body controllers.Hosts true "修改es集群Ip参数" // @Success 200 {string} json "{"code":200, msg:"", success:true}" // @Failure 500 {string} json "{"code":500, msg:"", success:false}" // @Router /data/api-v/es/updateEsHosts [POST] func (em *EsManagementController) UpdateEsHosts(c *gin.Context) { flag := "修改成功" var hosts Hosts c.BindJSON(&hosts) nodeInfos, err := getEsClusterInfors(hosts.OldIp) var nodeType string if err != nil { logger.Fatal(err) util.ResponseFormat(c, code.QueryClusterInfoErr, err) } newMasterHosts := make([]string, 0) allHosts := make([]string, 0) for _, val := range nodeInfos { ip := val["ip"].(string) tmpType := val["nodeType"].(string) if ip == hosts.OldIp { nodeType = tmpType } if tmpType == "主节点" { newMasterHosts = append(newMasterHosts, ip) } if ip != hosts.OldIp { allHosts = append(allHosts, ip) } } if nodeType == "主节点" { str1 := "sed -ie 's/network.host: " + hosts.OldIp + "/network.host: " + hosts.NewIp + "/g' /opt/elasticsearch/config/elasticsearch.yml" msg1 := runScript(str1) if msg1 == "运行失败" { flag = "修改配置文件失败" } for i, host := range newMasterHosts { if host == hosts.OldIp { newMasterHosts[i] = hosts.NewIp } } for i, val := range newMasterHosts { val = "\\\"" + val + ":9300\\\"" newMasterHosts[i] = val } newUnicastHost := strings.Replace(strings.Trim(fmt.Sprint(newMasterHosts), ""), " ", ",", -1) tmpStr := "cat /opt/elasticsearch/config/elasticsearch.yml | grep discovery.zen.ping.unicast.hosts:" rs := runScript(tmpStr) ts := strings.Split(rs, "\n")[0] ots := strings.Split(ts, " ")[1] outs := strings.Replace(ots, "\"", "\\\"", -1) oldUnicastHost := "\\" + strings.Replace(outs, "]", "\\]", -1) str2 := "sed -ie 's/discovery.zen.ping.unicast.hosts: " + oldUnicastHost + "/discovery.zen.ping.unicast.hosts: " + newUnicastHost + "/g' /opt/elasticsearch/config/elasticsearch.yml" msg2 := runScript(str2) if msg2 == "运行失败" { flag = "修改配置文件失败" } for _, host := range allHosts { str3 := "sshpass -p \"123\" ssh basic@" + host + " \"cd /opt/elasticsearch/config ; " + str2 + " ; cat elasticsearch.yml\"" msg := runScript(str3) if msg == "运行失败" { flag = "修改配置文件失败" } } } else { str1 := "sed -ie 's/network.host: " + hosts.OldIp + "/network.host: " + hosts.NewIp + "/g' /opt/elasticsearch/config/elasticsearch.yml" msg1 := runScript(str1) if msg1 == "运行失败" { flag = "修改配置文件失败" } } if flag == "修改配置文件失败" { util.ResponseFormat(c, code.UpdateFail, flag) } util.ResponseFormat(c, code.Success, flag) } type Hosts struct { NewIp string `json:newIp` OldIp string `json:oldIp` } //func AddEsCluster(hosts []string) (string){ // msg := "加入失败" // for i,val := range hosts{ // val =val+":9300" // hosts[i] = val // } // verificationHosts := "[\""+strings.Replace(strings.Trim(fmt.Sprint(hosts), "[]"), " ", "\",\"", -1)+"\"]" // for i,val := range hosts{ // val ="\\\""+val+"\\\"" // hosts[i] = val // } // oldUnicastHost := "\\[\\\"0.0.0.0:9300\\\"\\]" // newUnicastHost := strings.Replace(strings.Trim(fmt.Sprint(hosts), ""), " ", ",", -1) // str := "sed -ie 's/discovery.zen.ping.unicast.hosts: "+oldUnicastHost+"/discovery.zen.ping.unicast.hosts: "+newUnicastHost+"/g' /opt/elasticsearch/config/elasticsearch.yml" // fmt.Println(str) // cmd := exec.Command("sh","-c",str) // var out bytes.Buffer // cmd.Stdout = &out // err := cmd.Run() // if err != nil { // // } // res := getUnicastHosts() // fmt.Println("res: ",res) // res1 := "discovery.zen.ping.unicast.hosts: "+verificationHosts+"" // fmt.Println("res1: ",res1) // if res == res1{ // msg = "加入成功" // } // str2 := "echo \"node.master: true\" >> /opt/elasticsearch/config/elasticsearch.yml" // cmd2 := exec.Command("sh","-c",str2) // var out2 bytes.Buffer // cmd2.Stdout = &out2 // err2 := cmd2.Run() // if err2 != nil { // msg = "加入失败" // } // return msg // //} func getUnicastHosts() string { str := "cat /opt/elasticsearch/config/elasticsearch.yml | grep discovery.zen.ping.unicast.hosts:" cmd := exec.Command("sh", "-c", str) var out bytes.Buffer cmd.Stdout = &out err := cmd.Run() if err != nil { } infos := strings.Split(string(out.String()), "\n")[0] return infos }