sunty
2019-11-01 b01bab835d32a0c51076f81269df8c9cb77bf757
add es cluster manger
3个文件已修改
295 ■■■■ 已修改文件
controllers/es.go 256 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
extend/code/code.go 33 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
router/router.go 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
controllers/es.go
@@ -21,6 +21,12 @@
type EsController struct{}
type EsManagementController struct{}
type EsClusterInfo struct {
    Ip string `json:"ip"`
}
// @Summary 比对数据查询
// @Description  比对数据查询
// @Accept  json
@@ -37,28 +43,28 @@
        util.ResponseFormat(c, code.RequestParamError, "参数有误")
        return
    }
    if searchBody.CompareNum !="" {
    if searchBody.CompareNum != "" {
        //二次搜索,不需要再比对了
        co := service.GetCompResultByNum(searchBody.CompareNum)
        if co != nil {
            //二次搜索和排序
            twiceM := GetCompareDataTwice(co,searchBody)
            util.ResponseFormat(c,code.Success,twiceM)
            twiceM := GetCompareDataTwice(co, searchBody)
            util.ResponseFormat(c, code.Success, twiceM)
            return
        } else {
            m :=make(map[string]interface{},0)
            m := make(map[string]interface{}, 0)
            m["compareNum"] = searchBody.CompareNum
            m["total"] = 0
            m["totalList"] = []CompareResult{}
            util.ResponseFormat(c,code.CompareResultGone,m)
            util.ResponseFormat(c, code.CompareResultGone, m)
            return
        }
    }
    m :=make(map[string]interface{},0)
    m := make(map[string]interface{}, 0)
    m["compareNum"] = searchBody.CompareNum
    m["total"] = 0
    m["totalList"] = []CompareResult{}
    util.ResponseFormat(c,code.CompareResultGone,m)
    util.ResponseFormat(c, code.CompareResultGone, m)
}
func searchEsData(searchBody models.EsSearch) map[string]interface{} {
@@ -108,7 +114,7 @@
    //使用es底层机制处理分页
    //请求头
    localConf, err2 := cache.GetServerInfo()
    if err2 !=nil || localConf.AlarmIp == "" || localConf.ServerId == "" {
    if err2 != nil || localConf.AlarmIp == "" || localConf.ServerId == "" {
        logger.Debug("localConfig is wrong!!!")
        return nil
    }
@@ -157,47 +163,62 @@
    return dataSource
}
func GetEsClusterInfo(ip string) ([]map[string]interface{}, error){
//查询ES集群信息-入口
func (em *EsManagementController) GetEsClusterInfo(c *gin.Context) {
    var body EsClusterInfo
    c.BindJSON(&body)
    ip := body.Ip
    serverIp := ""
    if ip != "" {
        serverIp = ip
    } else {
        localConf, err2 := cache.GetServerInfo()
        if err2 !=nil || localConf.ServerIp == "" {
        if err2 != nil || localConf.ServerIp == "" {
            logger.Debug("localConfig is wrong!!!")
            return nil,err2
            util.ResponseFormat(c, code.QueryClusterInfoErr, err2)
            return
        }
        serverIp = localConf.ServerIp
    }
    str := "curl "+serverIp+":9200/_cat/nodes?v"
    cmd := exec.Command("sh","-c",str)
    nodeInfos, err := getEsClusterInfors(serverIp)
    if err != nil {
        util.ResponseFormat(c, code.QueryClusterInfoErr, err)
        return
    }
    util.ResponseFormat(c, code.QueryClusterInfoErr, nodeInfos)
}
//查询ES集群信息-业务逻辑
func getEsClusterInfors(ip string) ([]map[string]interface{}, error) {
    str := "curl " + ip + ":9200/_cat/nodes?v"
    cmd := exec.Command("sh", "-c", str)
    var out bytes.Buffer
    cmd.Stdout = &out
    err := cmd.Run()
    if err != nil {
        return nil,err
        return nil, err
    }
    infos := strings.Split(string(out.String()),"\n")
    totalNodes := len(infos)-1
    infos := strings.Split(string(out.String()), "\n")
    totalNodes := len(infos) - 1
    var nodeInfos []map[string]interface{}
    for i:=1;i<totalNodes ;i++  {
        nodeInfo :=  make(map[string]interface{})
    for i := 1; i < totalNodes; i++ {
        nodeInfo := make(map[string]interface{})
        context := strings.Fields(infos[i])
        nodeIp := context[0]
        Type := context[8]
        var nodeType string
        if Type == "*"{
        if Type == "*" {
            nodeType = "主节点"
        }
        if Type == "-"{
        if Type == "-" {
            nodeType = "从节点"
        }
        nodeInfo["ip"] = nodeIp
        nodeInfo["nodeType"] = nodeType
        url := "http://"+nodeIp+":9200"
        url := "http://" + nodeIp + ":9200"
        buf := esutil.HttpGet(url)
        var info interface{}
        json.Unmarshal(buf,&info)
        json.Unmarshal(buf, &info)
        tmpInfo := info.(map[string]interface{})
        tmpName := tmpInfo["name"].(string)
        versinInfo := tmpInfo["version"].(map[string]interface{})
@@ -206,61 +227,188 @@
        nodeInfo["buildDate"] = buildDate
        nodeInfos = append(nodeInfos, nodeInfo)
    }
    return nodeInfos,nil
    return nodeInfos, err
}
func AddEsCluster(hosts []string) (string){
    msg := "加入失败"
    for i,val := range hosts{
        val =val+":9300"
//创建节点
func (em *EsManagementController) CreateNode(c *gin.Context) {
    msg := "创建节点失败,请联系管理员"
    str := "sh /opt/script/create_first_node.sh"
    cmd := exec.Command("sh", "-c", str)
    var out bytes.Buffer
    cmd.Stdout = &out
    err := cmd.Run()
    if err != nil {
    }
    infos := strings.Split(string(out.String()), "\n")
    len := len(infos)
    res := infos[len-1]
    if res == "服务启动成功" {
        msg = "创建节点成功"
        util.ResponseFormat(c, code.Success, msg)
        return
    }
    util.ResponseFormat(c, code.CreateFirstNodeErr, msg)
}
//加入集群
func (em *EsManagementController) AddCluster(c *gin.Context) {
    var ac AddCluster
    err := c.BindJSON(&ac)
    if err != nil {
        util.ResponseFormat(c, code.RequestParamError, "参数有误")
        return
    }
    str := "sh /opt/script/create_node.sh " + ac.Option + ""
    if ac.Option == "1" {
        info, err := updateUnicastHosts(ac.Ip)
        if err != nil || info == false {
            util.ResponseFormat(c, code.QueryClusterInfoErr, err)
            return
        }
        if info == true {
            info := runScript(str)
            if info == "运行失败" {
                util.ResponseFormat(c, code.AddClusterInfoErr, info)
                return
            }
        }
    }
    if ac.Option == "2" {
        info, err := updateUnicastHosts(ac.Ip)
        if err != nil || info == false {
            util.ResponseFormat(c, code.QueryClusterInfoErr, err)
            return
        }
        if info == true {
            info := runScript(str)
            if info == "运行失败" {
                util.ResponseFormat(c, code.AddClusterInfoErr, info)
                return
            }
        }
    } else {
        util.ResponseFormat(c, code.RequestParamError, "参数有误")
        return
    }
    util.ResponseFormat(c, code.Success, "加入成功")
}
//脚本封装
func runScript(str string) string {
    cmd := exec.Command("sh", "-c", str)
    var out bytes.Buffer
    cmd.Stdout = &out
    err1 := cmd.Run()
    if err1 != nil {
        return "运行失败"
    }
    return out.String()
}
type AddCluster struct {
    Ip     string `json:"ip"`   //集群IP
    Option string `json:option` //节点类型
}
//更新组播列表
func updateUnicastHosts(host string) (bool, error) {
    nodeInfos, err := getEsClusterInfors(host)
    if err != nil {
        return false, err
    }
    var hosts []string
    for _, val := range nodeInfos {
        nodeType := val["nodeType"].(string)
        if nodeType == "主节点" {
            ip := val["ip"].(string)
            hosts = append(hosts, ip)
        }
    }
    msg := false
    for i, val := range hosts {
        val = val + ":9300"
        hosts[i] = val
    }
    verificationHosts := "[\""+strings.Replace(strings.Trim(fmt.Sprint(hosts), "[]"), " ", "\",\"", -1)+"\"]"
    for i,val := range hosts{
        val ="\\\""+val+"\\\""
    verificationHosts := "[\"" + strings.Replace(strings.Trim(fmt.Sprint(hosts), "[]"), " ", "\",\"", -1) + "\"]"
    for i, val := range hosts {
        val = "\\\"" + val + "\\\""
        hosts[i] = val
    }
    oldUnicastHost := "\\[\\\"0.0.0.0:9300\\\"\\]"
    newUnicastHost := strings.Replace(strings.Trim(fmt.Sprint(hosts), ""), " ", ",", -1)
    str := "sed -ie 's/discovery.zen.ping.unicast.hosts: "+oldUnicastHost+"/discovery.zen.ping.unicast.hosts: "+newUnicastHost+"/g' /opt/elasticsearch/config/elasticsearch.yml"
    str := "sed -ie 's/discovery.zen.ping.unicast.hosts: " + oldUnicastHost + "/discovery.zen.ping.unicast.hosts: " + newUnicastHost + "/g' /opt/elasticsearch/config/elasticsearch.yml"
    fmt.Println(str)
    cmd := exec.Command("sh","-c",str)
    cmd := exec.Command("sh", "-c", str)
    var out bytes.Buffer
    cmd.Stdout = &out
    err := cmd.Run()
    if err != nil {
    err1 := cmd.Run()
    if err1 != nil {
        return false, err
    }
    res := getUnicastHosts()
    fmt.Println("res:          ",res)
    res1 := "discovery.zen.ping.unicast.hosts: "+verificationHosts+""
    fmt.Println("res1:         ",res1)
    if res == res1{
        msg = "加入成功"
    fmt.Println("res:          ", res)
    res1 := "discovery.zen.ping.unicast.hosts: " + verificationHosts + ""
    fmt.Println("res1:         ", res1)
    if res == res1 {
        msg = true
    }
    str2 := "echo \"node.master: true\" >> /opt/elasticsearch/config/elasticsearch.yml"
    cmd2 := exec.Command("sh","-c",str2)
    var out2 bytes.Buffer
    cmd2.Stdout = &out2
    err2 := cmd2.Run()
    if err2 != nil {
        msg = "加入失败"
    }
    return msg
    return msg, err
}
func getUnicastHosts() (string){
//func AddEsCluster(hosts []string) (string){
//    msg := "加入失败"
//    for i,val := range hosts{
//        val =val+":9300"
//        hosts[i] = val
//    }
//    verificationHosts := "[\""+strings.Replace(strings.Trim(fmt.Sprint(hosts), "[]"), " ", "\",\"", -1)+"\"]"
//    for i,val := range hosts{
//        val ="\\\""+val+"\\\""
//        hosts[i] = val
//    }
//    oldUnicastHost := "\\[\\\"0.0.0.0:9300\\\"\\]"
//    newUnicastHost := strings.Replace(strings.Trim(fmt.Sprint(hosts), ""), " ", ",", -1)
//    str := "sed -ie 's/discovery.zen.ping.unicast.hosts: "+oldUnicastHost+"/discovery.zen.ping.unicast.hosts: "+newUnicastHost+"/g' /opt/elasticsearch/config/elasticsearch.yml"
//    fmt.Println(str)
//    cmd := exec.Command("sh","-c",str)
//    var out bytes.Buffer
//    cmd.Stdout = &out
//    err := cmd.Run()
//    if err != nil {
//
//    }
//    res := getUnicastHosts()
//    fmt.Println("res:          ",res)
//    res1 := "discovery.zen.ping.unicast.hosts: "+verificationHosts+""
//    fmt.Println("res1:         ",res1)
//    if res == res1{
//        msg = "加入成功"
//    }
//    str2 := "echo \"node.master: true\" >> /opt/elasticsearch/config/elasticsearch.yml"
//    cmd2 := exec.Command("sh","-c",str2)
//    var out2 bytes.Buffer
//    cmd2.Stdout = &out2
//    err2 := cmd2.Run()
//    if err2 != nil {
//        msg = "加入失败"
//    }
//    return msg
//
//}
func getUnicastHosts() string {
    str := "cat /opt/elasticsearch/config/elasticsearch.yml | grep discovery.zen.ping.unicast.hosts:"
    cmd := exec.Command("sh","-c",str)
    cmd := exec.Command("sh", "-c", str)
    var out bytes.Buffer
    cmd.Stdout = &out
    err := cmd.Run()
    if err != nil {
    }
    infos := strings.Split(string(out.String()),"\n")[0]
    infos := strings.Split(string(out.String()), "\n")[0]
    return infos
}
extend/code/code.go
@@ -11,11 +11,11 @@
var (
    // Success 请求处理成功
    Success = &Code{http.StatusOK, true, "请求处理成功"}
    AddSuccess = &Code{http.StatusOK,true,"添加成功"}
    UpdateSuccess = &Code{http.StatusOK,true,"更新成功"}
    UpdateFail = &Code{http.StatusBadRequest,false,"更新失败"}
    DelSuccess = &Code{http.StatusOK,true,"删除成功"}
    Success       = &Code{http.StatusOK, true, "请求处理成功"}
    AddSuccess    = &Code{http.StatusOK, true, "添加成功"}
    UpdateSuccess = &Code{http.StatusOK, true, "更新成功"}
    UpdateFail    = &Code{http.StatusBadRequest, false, "更新失败"}
    DelSuccess    = &Code{http.StatusOK, true, "删除成功"}
    // RequestParamError 请求参数错误
    RequestParamError = &Code{http.StatusBadRequest, false, "请求参数有误"}
    // AccountPassUnmatch 该账号原密码不匹配
@@ -28,10 +28,10 @@
    UploadSuffixError = &Code{http.StatusBadRequest, false, "该上传文件格式目前暂不支持"}
    // UploadSizeLimit 目前上传仅支持小于5M的文件内容
    UploadSizeLimit = &Code{http.StatusBadRequest, false, "目前上传仅支持小于5M的文件内容"}
    LoginSuccess = &Code{http.StatusOK,true,"登录成功"}
    LoginSuccess    = &Code{http.StatusOK, true, "登录成功"}
    // SigninInfoError 账户名或密码有误
    LoginInfoError = &Code{http.StatusUnauthorized, false, "用户名或密码错误"}
    LogoutSuccess = &Code{http.StatusOK,true,"退出成功"}
    LogoutSuccess  = &Code{http.StatusOK, true, "退出成功"}
    // TokenNotFound 请求未携带Token, 无权访问
    TokenNotFound = &Code{http.StatusUnauthorized, false, "请求未携带Token, 无权访问"}
    // TokenInvalid 无效的Token信息
@@ -49,16 +49,19 @@
    //删除节点失败
    HasChildNodeError = &Code{http.StatusInternalServerError, false, "数据节点存在子节点 无法删除"}
    NotLogin = &Code{ http.StatusUnauthorized, false, "登录失效,请重新登录"}
    DbPersonUploadSuccess = &Code{ http.StatusOK, true, "人员上传成功"}
    DbPersonUploadFail = &Code{ http.StatusInternalServerError, false, "人员上传失败"}
    DbPersonUpdateSuccess = &Code{ http.StatusOK, true, "人员更新成功"}
    NotLogin              = &Code{http.StatusUnauthorized, false, "登录失效,请重新登录"}
    DbPersonUploadSuccess = &Code{http.StatusOK, true, "人员上传成功"}
    DbPersonUploadFail    = &Code{http.StatusInternalServerError, false, "人员上传失败"}
    DbPersonUpdateSuccess = &Code{http.StatusOK, true, "人员更新成功"}
    CompareResultGone = &Code{ http.StatusOK,true,"上次比对已失效,请从新比对"}
    CompareResultGone = &Code{http.StatusOK, true, "上次比对已失效,请从新比对"}
    TaskStoped = &Code{http.StatusOK, false, "此任务为停用状态,请在任务管理中开启!"}
    ComError = &Code{http.StatusInternalServerError, false, ""}
    ClusterNodesEmpty = &Code{http.StatusOK, true, "集群节点为空"}
    AddTaskErr = &Code{http.StatusInternalServerError, false, "此国标摄像机已在其它服务器配置任务!"}
    ComError            = &Code{http.StatusInternalServerError, false, ""}
    ClusterNodesEmpty   = &Code{http.StatusOK, true, "集群节点为空"}
    AddTaskErr          = &Code{http.StatusInternalServerError, false, "此国标摄像机已在其它服务器配置任务!"}
    CreateFirstNodeErr  = &Code{http.StatusInternalServerError, false, "创建节点失败!"}
    QueryClusterInfoErr = &Code{http.StatusInternalServerError, false, "查询失败,请确认您的ip是正确的!"}
    AddClusterInfoErr   = &Code{http.StatusInternalServerError, false, "加入节点失败!"}
)
router/router.go
@@ -29,6 +29,7 @@
    ssController := new(controllers.SysSetController)
    sdkController := new(controllers.SdkController)
    esSearchController := new(controllers.EsSearchController)
    esManagementController := new(controllers.EsManagementController)
    realTimeController := new(controllers.RealTimeController)
    cameraTimeRuleController := new(controllers.CameraTimeruleController)
    polygonController := new(controllers.CameraPolygonController)
@@ -145,13 +146,16 @@
        task.POST("/saveTaskSdkRule", taskController.SaveTaskSdkRule)
    }
    // 检索 查询
    // 检索 查询 节点操作
    es := r.Group(urlPrefix + "/es")
    {
        es.POST("/tagList", esSearchController.PostEsTagList)
        es.POST("/taskList", esSearchController.PostEsTaskList)
        es.POST("/esSearch", esSearchController.PostEsSearch)
        es.POST("/queryEsCompareData", controllers.PostEsCompareData)
        es.POST("/getEsClusterInfo", esManagementController.GetEsClusterInfo)
        es.POST("/addCluster", esManagementController.AddCluster)
        es.POST("/createNode", esManagementController.CreateNode)
    }
    //实时被调数据