sunty
2019-11-01 b01bab835d32a0c51076f81269df8c9cb77bf757
add es cluster manger
3个文件已修改
201 ■■■■ 已修改文件
controllers/es.go 192 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
extend/code/code.go 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
router/router.go 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
controllers/es.go
@@ -21,6 +21,12 @@
type EsController struct{}
type EsManagementController struct{}
type EsClusterInfo struct {
    Ip string `json:"ip"`
}
// @Summary 比对数据查询
// @Description  比对数据查询
// @Accept  json
@@ -157,7 +163,11 @@
    return dataSource
}
func GetEsClusterInfo(ip string) ([]map[string]interface{}, error){
//查询ES集群信息-入口
func (em *EsManagementController) GetEsClusterInfo(c *gin.Context) {
    var body EsClusterInfo
    c.BindJSON(&body)
    ip := body.Ip
    serverIp := ""
    if ip != "" {
        serverIp = ip
@@ -165,11 +175,22 @@
        localConf, err2 := cache.GetServerInfo()
        if err2 !=nil || localConf.ServerIp == "" {
            logger.Debug("localConfig is wrong!!!")
            return nil,err2
            util.ResponseFormat(c, code.QueryClusterInfoErr, err2)
            return
        }
        serverIp = localConf.ServerIp
    }
    str := "curl "+serverIp+":9200/_cat/nodes?v"
    nodeInfos, err := getEsClusterInfors(serverIp)
    if err != nil {
        util.ResponseFormat(c, code.QueryClusterInfoErr, err)
        return
    }
    util.ResponseFormat(c, code.QueryClusterInfoErr, nodeInfos)
}
//查询ES集群信息-业务逻辑
func getEsClusterInfors(ip string) ([]map[string]interface{}, error) {
    str := "curl " + ip + ":9200/_cat/nodes?v"
    cmd := exec.Command("sh","-c",str)
    var out bytes.Buffer
    cmd.Stdout = &out
@@ -206,12 +227,107 @@
        nodeInfo["buildDate"] = buildDate
        nodeInfos = append(nodeInfos, nodeInfo)
    }
    return nodeInfos,nil
    return nodeInfos, err
}
func AddEsCluster(hosts []string) (string){
    msg := "加入失败"
//创建节点
func (em *EsManagementController) CreateNode(c *gin.Context) {
    msg := "创建节点失败,请联系管理员"
    str := "sh /opt/script/create_first_node.sh"
    cmd := exec.Command("sh", "-c", str)
    var out bytes.Buffer
    cmd.Stdout = &out
    err := cmd.Run()
    if err != nil {
    }
    infos := strings.Split(string(out.String()), "\n")
    len := len(infos)
    res := infos[len-1]
    if res == "服务启动成功" {
        msg = "创建节点成功"
        util.ResponseFormat(c, code.Success, msg)
        return
    }
    util.ResponseFormat(c, code.CreateFirstNodeErr, msg)
}
//加入集群
func (em *EsManagementController) AddCluster(c *gin.Context) {
    var ac AddCluster
    err := c.BindJSON(&ac)
    if err != nil {
        util.ResponseFormat(c, code.RequestParamError, "参数有误")
        return
    }
    str := "sh /opt/script/create_node.sh " + ac.Option + ""
    if ac.Option == "1" {
        info, err := updateUnicastHosts(ac.Ip)
        if err != nil || info == false {
            util.ResponseFormat(c, code.QueryClusterInfoErr, err)
            return
        }
        if info == true {
            info := runScript(str)
            if info == "运行失败" {
                util.ResponseFormat(c, code.AddClusterInfoErr, info)
                return
            }
        }
    }
    if ac.Option == "2" {
        info, err := updateUnicastHosts(ac.Ip)
        if err != nil || info == false {
            util.ResponseFormat(c, code.QueryClusterInfoErr, err)
            return
        }
        if info == true {
            info := runScript(str)
            if info == "运行失败" {
                util.ResponseFormat(c, code.AddClusterInfoErr, info)
                return
            }
        }
    } else {
        util.ResponseFormat(c, code.RequestParamError, "参数有误")
        return
    }
    util.ResponseFormat(c, code.Success, "加入成功")
}
//脚本封装
func runScript(str string) string {
    cmd := exec.Command("sh", "-c", str)
    var out bytes.Buffer
    cmd.Stdout = &out
    err1 := cmd.Run()
    if err1 != nil {
        return "运行失败"
    }
    return out.String()
}
type AddCluster struct {
    Ip     string `json:"ip"`   //集群IP
    Option string `json:option` //节点类型
}
//更新组播列表
func updateUnicastHosts(host string) (bool, error) {
    nodeInfos, err := getEsClusterInfors(host)
    if err != nil {
        return false, err
    }
    var hosts []string
    for _, val := range nodeInfos {
        nodeType := val["nodeType"].(string)
        if nodeType == "主节点" {
            ip := val["ip"].(string)
            hosts = append(hosts, ip)
        }
    }
    msg := false
    for i,val := range hosts{
        val =val+":9300"
        hosts[i] = val
@@ -228,30 +344,62 @@
    cmd := exec.Command("sh","-c",str)
    var out bytes.Buffer
    cmd.Stdout = &out
    err := cmd.Run()
    if err != nil {
    err1 := cmd.Run()
    if err1 != nil {
        return false, err
    }
    res := getUnicastHosts()
    fmt.Println("res:          ",res)
    res1 := "discovery.zen.ping.unicast.hosts: "+verificationHosts+""
    fmt.Println("res1:         ",res1)
    if res == res1{
        msg = "加入成功"
        msg = true
    }
    str2 := "echo \"node.master: true\" >> /opt/elasticsearch/config/elasticsearch.yml"
    cmd2 := exec.Command("sh","-c",str2)
    var out2 bytes.Buffer
    cmd2.Stdout = &out2
    err2 := cmd2.Run()
    if err2 != nil {
        msg = "加入失败"
    }
    return msg
    return msg, err
}
func getUnicastHosts() (string){
//func AddEsCluster(hosts []string) (string){
//    msg := "加入失败"
//    for i,val := range hosts{
//        val =val+":9300"
//        hosts[i] = val
//    }
//    verificationHosts := "[\""+strings.Replace(strings.Trim(fmt.Sprint(hosts), "[]"), " ", "\",\"", -1)+"\"]"
//    for i,val := range hosts{
//        val ="\\\""+val+"\\\""
//        hosts[i] = val
//    }
//    oldUnicastHost := "\\[\\\"0.0.0.0:9300\\\"\\]"
//    newUnicastHost := strings.Replace(strings.Trim(fmt.Sprint(hosts), ""), " ", ",", -1)
//    str := "sed -ie 's/discovery.zen.ping.unicast.hosts: "+oldUnicastHost+"/discovery.zen.ping.unicast.hosts: "+newUnicastHost+"/g' /opt/elasticsearch/config/elasticsearch.yml"
//    fmt.Println(str)
//    cmd := exec.Command("sh","-c",str)
//    var out bytes.Buffer
//    cmd.Stdout = &out
//    err := cmd.Run()
//    if err != nil {
//
//    }
//    res := getUnicastHosts()
//    fmt.Println("res:          ",res)
//    res1 := "discovery.zen.ping.unicast.hosts: "+verificationHosts+""
//    fmt.Println("res1:         ",res1)
//    if res == res1{
//        msg = "加入成功"
//    }
//    str2 := "echo \"node.master: true\" >> /opt/elasticsearch/config/elasticsearch.yml"
//    cmd2 := exec.Command("sh","-c",str2)
//    var out2 bytes.Buffer
//    cmd2.Stdout = &out2
//    err2 := cmd2.Run()
//    if err2 != nil {
//        msg = "加入失败"
//    }
//    return msg
//
//}
func getUnicastHosts() string {
    str := "cat /opt/elasticsearch/config/elasticsearch.yml | grep discovery.zen.ping.unicast.hosts:"
    cmd := exec.Command("sh","-c",str)
    var out bytes.Buffer
extend/code/code.go
@@ -61,4 +61,7 @@
    ComError = &Code{http.StatusInternalServerError, false, ""}
    ClusterNodesEmpty = &Code{http.StatusOK, true, "集群节点为空"}
    AddTaskErr = &Code{http.StatusInternalServerError, false, "此国标摄像机已在其它服务器配置任务!"}
    CreateFirstNodeErr  = &Code{http.StatusInternalServerError, false, "创建节点失败!"}
    QueryClusterInfoErr = &Code{http.StatusInternalServerError, false, "查询失败,请确认您的ip是正确的!"}
    AddClusterInfoErr   = &Code{http.StatusInternalServerError, false, "加入节点失败!"}
)
router/router.go
@@ -29,6 +29,7 @@
    ssController := new(controllers.SysSetController)
    sdkController := new(controllers.SdkController)
    esSearchController := new(controllers.EsSearchController)
    esManagementController := new(controllers.EsManagementController)
    realTimeController := new(controllers.RealTimeController)
    cameraTimeRuleController := new(controllers.CameraTimeruleController)
    polygonController := new(controllers.CameraPolygonController)
@@ -145,13 +146,16 @@
        task.POST("/saveTaskSdkRule", taskController.SaveTaskSdkRule)
    }
    // 检索 查询
    // 检索 查询 节点操作
    es := r.Group(urlPrefix + "/es")
    {
        es.POST("/tagList", esSearchController.PostEsTagList)
        es.POST("/taskList", esSearchController.PostEsTaskList)
        es.POST("/esSearch", esSearchController.PostEsSearch)
        es.POST("/queryEsCompareData", controllers.PostEsCompareData)
        es.POST("/getEsClusterInfo", esManagementController.GetEsClusterInfo)
        es.POST("/addCluster", esManagementController.AddCluster)
        es.POST("/createNode", esManagementController.CreateNode)
    }
    //实时被调数据