controllers/dbtableperson.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
controllers/dbtablesCon.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 | |
discovery/server.go | ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史 |
controllers/dbtableperson.go
@@ -1,12 +1,10 @@ package controllers import ( "basic.com/pubsub/protomsg.git" "encoding/json" "log" "strconv" "time" "webserver/discovery" "webserver/extend/logger" "github.com/gin-gonic/gin" @@ -16,7 +14,6 @@ "webserver/extend/esutil" "webserver/extend/util" "webserver/models" esApi "basic.com/pubsub/esutil.git" ) type DbPersonController struct { @@ -63,16 +60,16 @@ params := string(personbytes) logger.Debug("请求url:%s;\n 请求参数params:%s", url, params) data, _ := esutil.PutEsDataReq(url, params) if data["_id"] !=""{ //通知比对进程缓存更新 discovery.AddDbMessage(&protomsg.EsPersonCacheChange{ Type: protomsg.EsCacheChanged_T_DbTablePerson, PersonId: personId, TableId: []string{ dbperson.TableId }, Feature: dbperson.FaceFeature, Action: protomsg.DbAction_Insert, }) } //if data["_id"] !=""{ // //通知比对进程缓存更新 // discovery.AddDbMessage(&protomsg.EsPersonCacheChange{ // Type: protomsg.EsCacheChanged_T_DbTablePerson, // PersonId: personId, // TableId: []string{ dbperson.TableId }, // Feature: dbperson.FaceFeature, // Action: protomsg.DbAction_Insert, // }) //} //c.JSON(200, changeEsRespData(data, "添加人员成功")) result = changeEsRespData(data, "添加成功") @@ -111,23 +108,23 @@ result := changeEsRespData(data, "修改成功") if result["success"].(bool) { //code.Success.Message = "修改底库人员成功" if dbperson.Enable == 1 { discovery.AddDbMessage(&protomsg.EsPersonCacheChange{ Type: protomsg.EsCacheChanged_T_DbTablePerson, PersonId: dbperson.Id, TableId: []string{ dbperson.TableId }, Feature: "", Action: protomsg.DbAction_Insert, }) } else { discovery.AddDbMessage(&protomsg.EsPersonCacheChange{ Type: protomsg.EsCacheChanged_T_DbTablePerson, PersonId: dbperson.Id, TableId: []string{ dbperson.TableId }, Feature: "", Action: protomsg.DbAction_Delete, }) } //if dbperson.Enable == 1 { // discovery.AddDbMessage(&protomsg.EsPersonCacheChange{ // Type: protomsg.EsCacheChanged_T_DbTablePerson, // PersonId: dbperson.Id, // TableId: []string{ dbperson.TableId }, // Feature: "", // Action: protomsg.DbAction_Insert, // }) //} else { // discovery.AddDbMessage(&protomsg.EsPersonCacheChange{ // Type: protomsg.EsCacheChanged_T_DbTablePerson, // PersonId: dbperson.Id, // TableId: []string{ dbperson.TableId }, // Feature: "", // Action: protomsg.DbAction_Delete, // }) //} util.ResponseFormat(c, code.Success, result["data"]) } else { //code.ServiceInsideError.Message += result["msg"].(string) @@ -159,18 +156,18 @@ if result["success"].(bool) { //code.Success.Message = "删除底库人员成功" //通知比对进程,此人已删除 dbperArr, e := esApi.Dbpersoninfosbyid([]string{uuid}, config.EsInfo.EsIndex.Dbtablepersons.IndexName, config.EsInfo.Masterip, config.EsInfo.Httpport) if e ==nil && len(dbperArr) > 0{ if dbperArr[0].TableId !=""{ discovery.AddDbMessage(&protomsg.EsPersonCacheChange{ Type: protomsg.EsCacheChanged_T_DbTablePerson, PersonId: uuid, TableId: []string{ dbperArr[0].TableId }, Feature: "", Action: protomsg.DbAction_Delete, }) } } //dbperArr, e := esApi.Dbpersoninfosbyid([]string{uuid}, config.EsInfo.EsIndex.Dbtablepersons.IndexName, config.EsInfo.Masterip, config.EsInfo.Httpport) //if e ==nil && len(dbperArr) > 0{ // if dbperArr[0].TableId !=""{ // discovery.AddDbMessage(&protomsg.EsPersonCacheChange{ // Type: protomsg.EsCacheChanged_T_DbTablePerson, // PersonId: uuid, // TableId: []string{ dbperArr[0].TableId }, // Feature: "", // Action: protomsg.DbAction_Delete, // }) // } //} util.ResponseFormat(c, code.Success, result["data"]) } else { @@ -203,20 +200,20 @@ //result := changeEsRespData(data, "删除成功") if data["error"] == nil { //code.Success.Message = "删除底库人员成功" dbperArr, e := esApi.Dbpersoninfosbyid(uuids, config.EsInfo.EsIndex.Dbtablepersons.IndexName, config.EsInfo.Masterip, config.EsInfo.Httpport) if e ==nil && len(dbperArr) > 0{ for _,esPer :=range dbperArr { if esPer.TableId !=""{ discovery.AddDbMessage(&protomsg.EsPersonCacheChange{ Type: protomsg.EsCacheChanged_T_DbTablePerson, PersonId: esPer.Id, TableId: []string{ esPer.TableId }, Feature: "", Action: protomsg.DbAction_Delete, }) } } } //dbperArr, e := esApi.Dbpersoninfosbyid(uuids, config.EsInfo.EsIndex.Dbtablepersons.IndexName, config.EsInfo.Masterip, config.EsInfo.Httpport) //if e ==nil && len(dbperArr) > 0{ // for _,esPer :=range dbperArr { // if esPer.TableId !=""{ // discovery.AddDbMessage(&protomsg.EsPersonCacheChange{ // Type: protomsg.EsCacheChanged_T_DbTablePerson, // PersonId: esPer.Id, // TableId: []string{ esPer.TableId }, // Feature: "", // Action: protomsg.DbAction_Delete, // }) // } // } //} util.ResponseFormat(c, code.Success, "删除底库人员成功") } else { //code.ServiceInsideError.Message += result["msg"].(string) controllers/dbtablesCon.go
@@ -2,14 +2,12 @@ import ( "basic.com/dbapi.git" "basic.com/pubsub/protomsg.git" "encoding/json" "fmt" "github.com/gin-gonic/gin" "github.com/satori/go.uuid" "log" "time" "webserver/discovery" "webserver/extend/code" "webserver/extend/config" "webserver/extend/esutil" @@ -102,23 +100,23 @@ result := changeEsRespData(data, "修改成功") if result["success"].(bool) { //code.Success.Message = "修改底库成功" if dbtable.Enable ==1 { discovery.AddDbMessage(&protomsg.EsPersonCacheChange{ Type: protomsg.EsCacheChanged_T_DbTable, PersonId: "", TableId: []string{ dbtable.Id }, Feature: "", Action: protomsg.DbAction_Insert, }) } else { discovery.AddDbMessage(&protomsg.EsPersonCacheChange{ Type: protomsg.EsCacheChanged_T_DbTable, PersonId: "", TableId: []string{ dbtable.Id }, Feature: "", Action: protomsg.DbAction_Delete, }) } //if dbtable.Enable ==1 { // discovery.AddDbMessage(&protomsg.EsPersonCacheChange{ // Type: protomsg.EsCacheChanged_T_DbTable, // PersonId: "", // TableId: []string{ dbtable.Id }, // Feature: "", // Action: protomsg.DbAction_Insert, // }) //} else { // discovery.AddDbMessage(&protomsg.EsPersonCacheChange{ // Type: protomsg.EsCacheChanged_T_DbTable, // PersonId: "", // TableId: []string{ dbtable.Id }, // Feature: "", // Action: protomsg.DbAction_Delete, // }) //} util.ResponseFormat(c, code.Success, result["data"]) } else { //code.ServiceInsideError.Message += result["msg"].(string) discovery/server.go
@@ -1,57 +1,57 @@ package discovery import ( "basic.com/pubsub/protomsg.git" "basic.com/valib/gopherdiscovery.git" "github.com/gogo/protobuf/proto" "time" "webserver/extend/logger" ) const ( Discovery_Server = "tcp://0.0.0.0:40008" Discovery_UrlPubSub = "tcp://0.0.0.0:50008" ) //启动discovery的server var discoveryServer *gopherdiscovery.DiscoveryServer var dbChangeChan chan *protomsg.EsPersonCacheChange func StartServer() { var clients []string var err error var ( defaultOpts = gopherdiscovery.Options{ SurveyTime: 3 * time.Second, //RecvDeadline: 3 * time.Second, PollTime: 5 * time.Second, } ) discoveryServer, err = gopherdiscovery.Server(Discovery_Server, Discovery_UrlPubSub, defaultOpts) logger.Debug("server: ", discoveryServer) logger.Debug("err:",err) logger.Debug("clients: ",clients) dbChangeChan = make(chan *protomsg.EsPersonCacheChange) for { select { case dbMsg := <-dbChangeChan: publishMessage(dbMsg) } } } //广播数据库改变的msg func publishMessage(msg *protomsg.EsPersonCacheChange) { sendBytes,err := proto.Marshal(msg) if err ==nil{ discoveryServer.PublishMsg(string(sendBytes)) } } func AddDbMessage(msg *protomsg.EsPersonCacheChange) { logger.Debug("MSG EsPersonCacheChange In") dbChangeChan<-msg } //import ( // "basic.com/pubsub/protomsg.git" // "basic.com/valib/gopherdiscovery.git" // "github.com/gogo/protobuf/proto" // "time" // "webserver/extend/logger" //) // //const ( // Discovery_Server = "tcp://0.0.0.0:40008" // Discovery_UrlPubSub = "tcp://0.0.0.0:50008" //) // ////启动discovery的server //var discoveryServer *gopherdiscovery.DiscoveryServer //var dbChangeChan chan *protomsg.EsPersonCacheChange //func StartServer() { // var clients []string // var err error // var ( // defaultOpts = gopherdiscovery.Options{ // SurveyTime: 3 * time.Second, // //RecvDeadline: 3 * time.Second, // PollTime: 5 * time.Second, // } // ) // // discoveryServer, err = gopherdiscovery.Server(Discovery_Server, Discovery_UrlPubSub, defaultOpts) // // logger.Debug("server: ", discoveryServer) // logger.Debug("err:",err) // logger.Debug("clients: ",clients) // // dbChangeChan = make(chan *protomsg.EsPersonCacheChange) // // for { // select { // case dbMsg := <-dbChangeChan: // publishMessage(dbMsg) // } // } //} // ////广播数据库改变的msg //func publishMessage(msg *protomsg.EsPersonCacheChange) { // sendBytes,err := proto.Marshal(msg) // if err ==nil{ // discoveryServer.PublishMsg(string(sendBytes)) // } //} // //func AddDbMessage(msg *protomsg.EsPersonCacheChange) { // logger.Debug("MSG EsPersonCacheChange In") // dbChangeChan<-msg //}