liuxiaolong
2019-08-02 24e2d92df7cad99e1e514fa581fe59765dbc4a12
rm discovery server
3个文件已修改
255 ■■■■ 已修改文件
controllers/dbtableperson.go 109 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
controllers/dbtablesCon.go 36 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
discovery/server.go 110 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
controllers/dbtableperson.go
@@ -1,12 +1,10 @@
package controllers
import (
    "basic.com/pubsub/protomsg.git"
    "encoding/json"
    "log"
    "strconv"
    "time"
    "webserver/discovery"
    "webserver/extend/logger"
    "github.com/gin-gonic/gin"
@@ -16,7 +14,6 @@
    "webserver/extend/esutil"
    "webserver/extend/util"
    "webserver/models"
    esApi "basic.com/pubsub/esutil.git"
)
type DbPersonController struct {
@@ -63,16 +60,16 @@
    params := string(personbytes)
    logger.Debug("请求url:%s;\n 请求参数params:%s", url, params)
    data, _ := esutil.PutEsDataReq(url, params)
    if data["_id"] !=""{
        //通知比对进程缓存更新
        discovery.AddDbMessage(&protomsg.EsPersonCacheChange{
            Type: protomsg.EsCacheChanged_T_DbTablePerson,
            PersonId: personId,
            TableId: []string{ dbperson.TableId },
            Feature: dbperson.FaceFeature,
            Action: protomsg.DbAction_Insert,
        })
    }
    //if data["_id"] !=""{
    //    //通知比对进程缓存更新
    //    discovery.AddDbMessage(&protomsg.EsPersonCacheChange{
    //        Type: protomsg.EsCacheChanged_T_DbTablePerson,
    //        PersonId: personId,
    //        TableId: []string{ dbperson.TableId },
    //        Feature: dbperson.FaceFeature,
    //        Action: protomsg.DbAction_Insert,
    //    })
    //}
    //c.JSON(200, changeEsRespData(data, "添加人员成功"))
    result = changeEsRespData(data, "添加成功")
@@ -111,23 +108,23 @@
    result := changeEsRespData(data, "修改成功")
    if result["success"].(bool) {
        //code.Success.Message = "修改底库人员成功"
        if dbperson.Enable == 1 {
            discovery.AddDbMessage(&protomsg.EsPersonCacheChange{
                Type: protomsg.EsCacheChanged_T_DbTablePerson,
                PersonId: dbperson.Id,
                TableId: []string{ dbperson.TableId },
                Feature: "",
                Action: protomsg.DbAction_Insert,
            })
        } else {
            discovery.AddDbMessage(&protomsg.EsPersonCacheChange{
                Type: protomsg.EsCacheChanged_T_DbTablePerson,
                PersonId: dbperson.Id,
                TableId: []string{ dbperson.TableId },
                Feature: "",
                Action: protomsg.DbAction_Delete,
            })
        }
        //if dbperson.Enable == 1 {
        //    discovery.AddDbMessage(&protomsg.EsPersonCacheChange{
        //        Type: protomsg.EsCacheChanged_T_DbTablePerson,
        //        PersonId: dbperson.Id,
        //        TableId: []string{ dbperson.TableId },
        //        Feature: "",
        //        Action: protomsg.DbAction_Insert,
        //    })
        //} else {
        //    discovery.AddDbMessage(&protomsg.EsPersonCacheChange{
        //        Type: protomsg.EsCacheChanged_T_DbTablePerson,
        //        PersonId: dbperson.Id,
        //        TableId: []string{ dbperson.TableId },
        //        Feature: "",
        //        Action: protomsg.DbAction_Delete,
        //    })
        //}
        util.ResponseFormat(c, code.Success, result["data"])
    } else {
        //code.ServiceInsideError.Message += result["msg"].(string)
@@ -159,18 +156,18 @@
    if result["success"].(bool) {
        //code.Success.Message = "删除底库人员成功"
        //通知比对进程,此人已删除
        dbperArr, e := esApi.Dbpersoninfosbyid([]string{uuid}, config.EsInfo.EsIndex.Dbtablepersons.IndexName, config.EsInfo.Masterip, config.EsInfo.Httpport)
        if e ==nil && len(dbperArr) > 0{
            if dbperArr[0].TableId !=""{
                discovery.AddDbMessage(&protomsg.EsPersonCacheChange{
                    Type: protomsg.EsCacheChanged_T_DbTablePerson,
                    PersonId: uuid,
                    TableId: []string{ dbperArr[0].TableId },
                    Feature: "",
                    Action: protomsg.DbAction_Delete,
                })
            }
        }
        //dbperArr, e := esApi.Dbpersoninfosbyid([]string{uuid}, config.EsInfo.EsIndex.Dbtablepersons.IndexName, config.EsInfo.Masterip, config.EsInfo.Httpport)
        //if e ==nil && len(dbperArr) > 0{
        //    if dbperArr[0].TableId !=""{
        //        discovery.AddDbMessage(&protomsg.EsPersonCacheChange{
        //            Type: protomsg.EsCacheChanged_T_DbTablePerson,
        //            PersonId: uuid,
        //            TableId: []string{ dbperArr[0].TableId },
        //            Feature: "",
        //            Action: protomsg.DbAction_Delete,
        //        })
        //    }
        //}
        util.ResponseFormat(c, code.Success, result["data"])
    } else {
@@ -203,20 +200,20 @@
    //result := changeEsRespData(data, "删除成功")
    if data["error"] == nil {
        //code.Success.Message = "删除底库人员成功"
        dbperArr, e := esApi.Dbpersoninfosbyid(uuids, config.EsInfo.EsIndex.Dbtablepersons.IndexName, config.EsInfo.Masterip, config.EsInfo.Httpport)
        if e ==nil && len(dbperArr) > 0{
            for _,esPer :=range dbperArr {
                if esPer.TableId !=""{
                    discovery.AddDbMessage(&protomsg.EsPersonCacheChange{
                        Type: protomsg.EsCacheChanged_T_DbTablePerson,
                        PersonId: esPer.Id,
                        TableId: []string{ esPer.TableId },
                        Feature: "",
                        Action: protomsg.DbAction_Delete,
                    })
                }
            }
        }
        //dbperArr, e := esApi.Dbpersoninfosbyid(uuids, config.EsInfo.EsIndex.Dbtablepersons.IndexName, config.EsInfo.Masterip, config.EsInfo.Httpport)
        //if e ==nil && len(dbperArr) > 0{
        //    for _,esPer :=range dbperArr {
        //        if esPer.TableId !=""{
        //            discovery.AddDbMessage(&protomsg.EsPersonCacheChange{
        //                Type: protomsg.EsCacheChanged_T_DbTablePerson,
        //                PersonId: esPer.Id,
        //                TableId: []string{ esPer.TableId },
        //                Feature: "",
        //                Action: protomsg.DbAction_Delete,
        //            })
        //        }
        //    }
        //}
        util.ResponseFormat(c, code.Success, "删除底库人员成功")
    } else {
        //code.ServiceInsideError.Message += result["msg"].(string)
controllers/dbtablesCon.go
@@ -2,14 +2,12 @@
import (
    "basic.com/dbapi.git"
    "basic.com/pubsub/protomsg.git"
    "encoding/json"
    "fmt"
    "github.com/gin-gonic/gin"
    "github.com/satori/go.uuid"
    "log"
    "time"
    "webserver/discovery"
    "webserver/extend/code"
    "webserver/extend/config"
    "webserver/extend/esutil"
@@ -102,23 +100,23 @@
    result := changeEsRespData(data, "修改成功")
    if result["success"].(bool) {
        //code.Success.Message = "修改底库成功"
        if dbtable.Enable ==1 {
            discovery.AddDbMessage(&protomsg.EsPersonCacheChange{
                Type: protomsg.EsCacheChanged_T_DbTable,
                PersonId: "",
                TableId: []string{ dbtable.Id },
                Feature: "",
                Action: protomsg.DbAction_Insert,
            })
        } else {
            discovery.AddDbMessage(&protomsg.EsPersonCacheChange{
                Type: protomsg.EsCacheChanged_T_DbTable,
                PersonId: "",
                TableId: []string{ dbtable.Id },
                Feature: "",
                Action: protomsg.DbAction_Delete,
            })
        }
        //if dbtable.Enable ==1 {
        //    discovery.AddDbMessage(&protomsg.EsPersonCacheChange{
        //        Type: protomsg.EsCacheChanged_T_DbTable,
        //        PersonId: "",
        //        TableId: []string{ dbtable.Id },
        //        Feature: "",
        //        Action: protomsg.DbAction_Insert,
        //    })
        //} else {
        //    discovery.AddDbMessage(&protomsg.EsPersonCacheChange{
        //        Type: protomsg.EsCacheChanged_T_DbTable,
        //        PersonId: "",
        //        TableId: []string{ dbtable.Id },
        //        Feature: "",
        //        Action: protomsg.DbAction_Delete,
        //    })
        //}
        util.ResponseFormat(c, code.Success, result["data"])
    } else {
        //code.ServiceInsideError.Message += result["msg"].(string)
discovery/server.go
@@ -1,57 +1,57 @@
package discovery
import (
    "basic.com/pubsub/protomsg.git"
    "basic.com/valib/gopherdiscovery.git"
    "github.com/gogo/protobuf/proto"
    "time"
    "webserver/extend/logger"
)
const (
    Discovery_Server = "tcp://0.0.0.0:40008"
    Discovery_UrlPubSub = "tcp://0.0.0.0:50008"
)
//启动discovery的server
var discoveryServer *gopherdiscovery.DiscoveryServer
var dbChangeChan chan *protomsg.EsPersonCacheChange
func StartServer() {
    var clients []string
    var err error
    var (
        defaultOpts = gopherdiscovery.Options{
            SurveyTime:   3 * time.Second,
            //RecvDeadline: 3 * time.Second,
            PollTime:     5 * time.Second,
        }
    )
    discoveryServer, err = gopherdiscovery.Server(Discovery_Server, Discovery_UrlPubSub, defaultOpts)
    logger.Debug("server: ", discoveryServer)
    logger.Debug("err:",err)
    logger.Debug("clients: ",clients)
    dbChangeChan = make(chan *protomsg.EsPersonCacheChange)
    for {
        select {
        case dbMsg := <-dbChangeChan:
            publishMessage(dbMsg)
        }
    }
}
//广播数据库改变的msg
func publishMessage(msg *protomsg.EsPersonCacheChange) {
    sendBytes,err := proto.Marshal(msg)
    if err ==nil{
        discoveryServer.PublishMsg(string(sendBytes))
    }
}
func AddDbMessage(msg *protomsg.EsPersonCacheChange) {
    logger.Debug("MSG EsPersonCacheChange In")
    dbChangeChan<-msg
}
//import (
//    "basic.com/pubsub/protomsg.git"
//    "basic.com/valib/gopherdiscovery.git"
//    "github.com/gogo/protobuf/proto"
//    "time"
//    "webserver/extend/logger"
//)
//
//const (
//    Discovery_Server = "tcp://0.0.0.0:40008"
//    Discovery_UrlPubSub = "tcp://0.0.0.0:50008"
//)
//
////启动discovery的server
//var discoveryServer *gopherdiscovery.DiscoveryServer
//var dbChangeChan chan *protomsg.EsPersonCacheChange
//func StartServer() {
//    var clients []string
//    var err error
//    var (
//        defaultOpts = gopherdiscovery.Options{
//            SurveyTime:   3 * time.Second,
//            //RecvDeadline: 3 * time.Second,
//            PollTime:     5 * time.Second,
//        }
//    )
//
//    discoveryServer, err = gopherdiscovery.Server(Discovery_Server, Discovery_UrlPubSub, defaultOpts)
//
//    logger.Debug("server: ", discoveryServer)
//    logger.Debug("err:",err)
//    logger.Debug("clients: ",clients)
//
//    dbChangeChan = make(chan *protomsg.EsPersonCacheChange)
//
//    for {
//        select {
//        case dbMsg := <-dbChangeChan:
//            publishMessage(dbMsg)
//        }
//    }
//}
//
////广播数据库改变的msg
//func publishMessage(msg *protomsg.EsPersonCacheChange) {
//    sendBytes,err := proto.Marshal(msg)
//    if err ==nil{
//        discoveryServer.PublishMsg(string(sendBytes))
//    }
//}
//
//func AddDbMessage(msg *protomsg.EsPersonCacheChange) {
//    logger.Debug("MSG EsPersonCacheChange In")
//    dbChangeChan<-msg
//}