package service

import (
	"encoding/json"
	"errors"
	"time"

	"basic.com/valib/logger.git"
	"basic.com/valib/serf.git/client"
	"vamicro/config"
	"vamicro/devicemanage-service/models"
	"vamicro/system-service/serf"
)

// DealApply starts the two background loops that maintain the state of
// device apply records:
//  1. applying sends pending requests to their target device and marks them
//     as waiting;
//  2. applied finalizes requests that have been agreed.
//
// Both loops run forever; there is no way to stop them from here.
func DealApply() {
	go applying()
	go applied()
}

// applying polls, every three seconds, for apply records in the Sending
// status, sends each one to its target device and, when the send succeeds,
// moves the record to the Waiting status.
func applying() {
	var da models.DeviceApply
	for {
		// Ranging over a nil or empty list is a no-op, so no explicit guard
		// is needed.
		for _, d := range da.FindByStatus(models.ApplyStatus_Sending) {
			if !sendApply(d.ApplyKey, config.Server.AnalyServerId, d.Ip, d.DevId) {
				continue
			}
			// If the status update fails the record stays in Sending and the
			// request is sent again on the next pass.
			if !da.UpdateStatus(models.ApplyStatus_Waiting, d.Id) {
				logger.Error("update apply status to waiting failed, devId:", d.DevId)
			}
		}
		time.Sleep(time.Second * 3)
	}
}

// applied polls, every three seconds, for apply records in the Agreed status
// and hands each one to doAgreedByTx.
func applied() {
	var da models.DeviceApply
	for {
		for _, d := range da.FindByStatus(models.ApplyStatus_Agreed) {
			doAgreedByTx(d)
		}
		time.Sleep(time.Second * 3)
	}
}

// doAgreedByTx moves an agreed apply record to the Managed status inside a
// transaction. The transaction is rolled back when any step fails and
// committed otherwise.
//
// NOTE(review): FindByDevId and UpdateStatus are called on the model, not on
// tx, so it is not visible from here whether they take part in the
// transaction - confirm against the models package.
func doAgreedByTx(da models.DeviceApply) {
	tx := models.GetDB().Begin()
	if tx.Error != nil {
		logger.Error("doAgreedByTx begin tx err:", tx.Error)
		return
	}

	var err error
	defer func() {
		// Roll back only on failure. The previous condition
		// (err != nil || tx != nil) was always true and therefore also
		// rolled back after a successful commit.
		if err != nil {
			tx.Rollback()
		}
	}()

	var tmp models.DeviceApply
	// The second return value of FindByDevId is ignored, as before.
	if i, _ := tmp.FindByDevId(da.DevId); i > 0 {
		err = errors.New("device表中已存在此设备")
		return
	}

	// Step 1 (currently disabled): fetch the remote device info and insert it
	// into the device table.
	//d := getRemoteDevInfo(da.DevId, da.Ip)
	//d.Id = uuid.NewV4().String()
	//if b := d.Insert(); !b {
	//	err = errors.New("新增device失败")
	//	return
	//}

	// Step 2: mark the apply record as managed.
	if !da.UpdateStatus(models.ApplyStatus_Managed, da.Id) {
		err = errors.New("修改managed状态失败")
		return
	}

	// A failed commit is only logged: the function has no return value, and
	// err is left nil so that the deferred rollback is not attempted on a
	// transaction that has already been finished.
	if cerr := tx.Commit().Error; cerr != nil {
		logger.Error("doAgreedByTx commit err:", cerr)
	}
}

// devCollectNew is the JSON payload decoded by CollectManageDeviceInfo.
type devCollectNew struct {
	DeviceList []models.Device    `json:"deviceList"`
	Sdks       []models.DeviceSdk `json:"sdks"`
	Apps       []models.DeviceApp `json:"apps"`
}

// CollectDeviceTopic is the topic name for collected managed-device info.
const CollectDeviceTopic = "collect-manage-device-info"

// CollectManageDeviceInfo decodes data as a devCollectNew payload and stores
// its content in one transaction:
//   - devices are stored only when their DevId belongs to an apply record in
//     the Managed status;
//   - sdks and apps are always stored.
//
// A record whose Id already exists (SelectById reports rows > 0) is saved,
// otherwise it is created. It returns the unmarshal error, the first database
// error, or the commit error; on any database error the transaction is rolled
// back.
func CollectManageDeviceInfo(data []byte) error {
	// Set of device ids that this node is allowed to manage.
	var dApply models.DeviceApply
	applyM := make(map[string]struct{})
	for _, d := range dApply.FindByStatus(models.ApplyStatus_Managed) {
		applyM[d.DevId] = struct{}{}
	}

	var dc devCollectNew
	if e := json.Unmarshal(data, &dc); e != nil {
		logger.Error("unmarshal err:", e)
		return e
	}

	var err error
	tx := models.GetDB().Begin()
	if err = tx.Error; err != nil {
		// The transaction never started, so there is nothing to roll back.
		return err
	}
	defer func() {
		if err != nil {
			tx.Rollback()
		}
	}()

	// upsert saves rec when it already exists and creates it otherwise.
	upsert := func(table string, exists bool, rec interface{}) error {
		if exists {
			return tx.Table(table).Save(rec).Error
		}
		return tx.Table(table).Create(rec).Error
	}

	var (
		devModel models.Device
		dsModel  models.DeviceSdk
		daModel  models.DeviceApp
	)

	for _, d := range dc.DeviceList {
		if _, ok := applyM[d.DevId]; !ok {
			continue
		}
		// The second return value of SelectById is ignored, as before.
		rows, _ := devModel.SelectById(d.Id)
		if err = upsert(d.TableName(), rows > 0, &d); err != nil {
			return err
		}
	}

	for _, ds := range dc.Sdks {
		rows, _ := dsModel.SelectById(ds.Id)
		if err = upsert(ds.TableName(), rows > 0, &ds); err != nil {
			return err
		}
	}

	for _, da := range dc.Apps {
		rows, _ := daModel.SelectById(da.Id)
		if err = upsert(da.TableName(), rows > 0, &da); err != nil {
			return err
		}
	}

	// Report a failed commit to the caller instead of silently returning nil.
	// err stays nil so the deferred rollback is not run on a finished
	// transaction.
	if cerr := tx.Commit().Error; cerr != nil {
		return cerr
	}
	return nil
}

// getRemoteDevInfo is an empty placeholder; it currently does nothing.
func getRemoteDevInfo(devId, ip string) {
}

// sendApply sends a "DevAuthApply" request carrying key, devId and ip to the
// node targetDevId, addressed at ip. It reports whether the query succeeded
// and produced at least one response.
//
// NOTE(review): ip is used both as the "fromIp" field of the payload and as
// the address the query is sent to. The disabled line below suggests the
// local IP may have been intended for "fromIp" - confirm.
func sendApply(key, devId, ip, targetDevId string) bool {
	//ipv4, _, _ := util.GetLocalIP(config.Server.NetworkAdapter)
	body := map[string]interface{}{
		"key":       key,
		"fromDevId": devId,
		"fromIp":    ip,
	}
	bts, err := json.Marshal(body)
	if err != nil {
		logger.Error("marshal apply body err:", err)
		return false
	}
	resp, err := WrapQueryRpc("DevAuthApply", bts, targetDevId, ip)
	return err == nil && len(resp) > 0
}

// WrapQueryRpc wraps topic and data into a serf.RpcParamTopic, sends it as a
// serf.QueryRpc query with a five second timeout, filtered to the node
// targetDevId, through targetIp, and returns the result of serf.RpcQuery.
// It returns an error without sending anything when the payload cannot be
// marshalled.
func WrapQueryRpc(topic string, data []byte, targetDevId string, targetIp string) ([]client.NodeResponse, error) {
	arg := serf.RpcParamTopic{
		Topic: topic,
		Data:  data,
	}
	d, err := json.Marshal(arg)
	if err != nil {
		return nil, err
	}
	param := serf.RpcParam{
		Name:        serf.QueryRpc,
		Timeout:     time.Second * 5,
		FilterNodes: []string{targetDevId},
		Data:        d,
	}
	return serf.RpcQuery(targetIp, &param)
}