package service

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"time"

	"vamicro/config"
	"vamicro/saas-service/service/nodeService"

	"basic.com/pubsub/protomsg.git"
	"basic.com/valib/bhomeclient.git"
	"basic.com/valib/logger.git"
)

// StartSubTopic consumes messages from ms.SubCh and forwards time-rule changes
// published on the "scene-service" topic to f as (action, info).
//
// The call blocks. It returns when a nil message is received from ms.SubCh
// (which is also what a receive on a closed channel yields) or when a panic
// raised while handling a message (including inside f) is recovered and
// logged. Messages on other topics, changes to other tables, and payloads
// that fail to decode are skipped.
func StartSubTopic(ms *bhomeclient.MicroNode, f func(protomsg.DbAction, string)) {
	logger.Info("StartSubTopic...")
	defer func() {
		if r := recover(); r != nil {
			logger.Errorf("StartSubTopic recover r=%v", r)
		}
	}()

	for {
		msg := <-ms.SubCh
		if msg == nil {
			return
		}

		// Subscribe to timerule messages only.
		if string(msg.Topic) != "scene-service" {
			continue
		}
		logger.Infof("StartSubTopic, topic=scene-service, data=%v, len(ms.SubCh)=%v", string(msg.Data), len(ms.SubCh))

		var data protomsg.DbChangeMessage
		if err := json.Unmarshal(msg.Data, &data); err != nil {
			// One malformed payload must not end the subscription for every
			// later message, so log it and keep consuming.
			logger.Errorf("StartSubTopic Unmarshal DbChangeMessage, err=%v", err)
			continue
		}
		if data.Table == protomsg.TableChanged_T_TimeRule {
			f(data.Action, data.Info)
		}
	}
}

// SubTopicCallback decodes info as a JSON object with "id", "name" and
// "time_rule" string fields and passes them, together with dType, to
// notifyTimeRule. A payload that cannot be decoded is logged and dropped.
func SubTopicCallback(dType protomsg.DbAction, info string) {
	type CameraTimerule struct {
		Id       string `json:"id"`
		Name     string `json:"name"`
		TimeRule string `json:"time_rule"`
	}

	var timerule CameraTimerule
	if err := json.Unmarshal([]byte(info), &timerule); err != nil {
		logger.Errorf("notifyTimeRule to saas SubTopicCallback Unmarshal, err=%v", err)
		return
	}
	notifyTimeRule(dType, timerule.Id, timerule.Name, timerule.TimeRule)
}

// notifyTimeRule notifies the SaaS platform of a timerule change.
//
// A delete action posts {"id", "from"} to the cameraTimerule/delete endpoint.
// Insert and update actions post the full rule (tr decoded from its JSON
// string form, plus the local cluster and node ids) to cameraTimerule/save.
// Any other action is logged and ignored. Nothing is sent when the local node
// has no cluster id. Failures are logged; nothing is returned to the caller.
func notifyTimeRule(dType protomsg.DbAction, id, name, tr string) {
	// Not part of a cluster: do not notify SaaS.
	if nodeService.Node.ClusterId == "" {
		logger.Info("notifyTimeRule to saas, is not cluster, no notify saas")
		return
	}

	var (
		url     string
		srcData []byte
		err     error
	)

	switch dType {
	case protomsg.DbAction_Delete:
		url = fmt.Sprintf("http://%v/saas/api-s/cameraTimerule/delete", config.SaasConf.Url)

		// Marshal instead of formatting by hand so that an id containing
		// quotes or backslashes still yields valid JSON.
		type deleteReq struct {
			Id   string `json:"id"`
			From string `json:"from"`
		}
		srcData, err = json.Marshal(deleteReq{Id: id, From: "vamicro"})
		if err != nil {
			logger.Errorf("notifyTimeRule to saas Marshal, err=%v", err)
			return
		}

	case protomsg.DbAction_Insert, protomsg.DbAction_Update:
		url = fmt.Sprintf("http://%v/saas/api-s/cameraTimerule/save", config.SaasConf.Url)

		type TimeRange struct {
			Start string `json:"start"`
			End   string `json:"end"`
		}
		type DayCtl struct {
			Day       int32        `json:"day"`
			TimeRange []*TimeRange `json:"time_range"`
		}
		type CameraTimerule struct {
			Id        string    `json:"id"`
			Name      string    `json:"name"`
			TimeRule  []*DayCtl `json:"time_rule"`
			ClusterId string    `json:"clusterId"`
			DevId     string    `json:"devId"`
			From      string    `json:"from"`
		}

		// tr arrives as a JSON-encoded string; decode it so it is sent as a
		// nested array rather than as a string.
		var dayC []*DayCtl
		if err = json.Unmarshal([]byte(tr), &dayC); err != nil {
			logger.Errorf("notifyTimeRule to saas Unmarshal, err=%v", err)
			return
		}

		srcData, err = json.Marshal(CameraTimerule{
			Id:        id,
			Name:      name,
			TimeRule:  dayC,
			ClusterId: nodeService.Node.ClusterId,
			DevId:     nodeService.Node.NodeId,
			From:      "vamicro",
		})
		if err != nil {
			logger.Errorf("notifyTimeRule to saas Marshal, err=%v", err)
			return
		}

	default:
		logger.Errorf("notifyTimeRule to saas dType=%v undefine", dType)
		return
	}

	retBuf, err := HttpRCT("POST", url, srcData, 30*time.Second)
	logger.Debugf("notifyTimeRule to saas HttpRCT url=%v, request=%v, response=%v, err=%v", url, string(srcData), string(retBuf), err)
	if err != nil {
		logger.Errorf("notifyTimeRule to saas HttpRCT err=%v", err)
		return
	}
}

// HttpRCT sends an HTTP request with a JSON content type and a fixed bearer
// token, and returns the raw response body.
//
// method and url are passed to http.NewRequest, parama is the request body
// (may be nil), and timeout bounds the whole exchange via http.Client.Timeout.
// An error is returned when the request cannot be built, the round trip
// fails, or the body cannot be read. The HTTP status code is not inspected,
// so the body of a non-2xx response is returned with a nil error.
func HttpRCT(method string, url string, parama []byte, timeout time.Duration) (buf []byte, err error) {
	// NOTE(review): this credential is hard-coded in source. It should be
	// loaded from configuration or a secret store and rotated.
	const authorization = "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE5NTIxNTcwNjEsInVzZXIiOiJ7XCJpZFwiOlwiNjI0ZjY4ZGYtNTA1NS00OTMzLTkyNTktZGQ1YTZkNzY4YWE0XCIsXCJwYXJlbnRJZFwiOlwiXCIsXCJwZXJtaXNzaW9uc1wiOm51bGwsXCJ1c2VybmFtZVwiOlwiQWRtaW5pc3RyYXRvclwifSJ9.9hCLlblpvxd6R21uxeNF0ppURnv1Pbu9Mhi4yRiK8_s"

	request, err := http.NewRequest(method, url, bytes.NewReader(parama))
	if err != nil {
		// Check before touching request: it is nil when NewRequest fails.
		logger.Debug("build request err:", err)
		return nil, err
	}
	request.Header.Set("Content-type", "application/json")
	request.Header.Add("Authorization", authorization)

	client := http.Client{Timeout: timeout}
	resp, err := client.Do(request)
	if err != nil {
		logger.Debug("request error: ", err)
		return nil, err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		logger.Debug("read body err:", err)
		return nil, err
	}
	return body, nil
}