package service
|
|
import (
|
"basic.com/valib/logger.git"
|
"basic.com/valib/serf.git/client"
|
"encoding/json"
|
"errors"
|
"time"
|
"vamicro/config"
|
"vamicro/devicemanage-service/models"
|
"vamicro/system-service/serf"
|
)
|
|
//处理申请信息,维护状态
|
func DealApply() {
|
//1.处理待发送请求的数据,将请求发送到目标设备,并将status设置为已申请
|
go applying()
|
//2.处理已通过的数据,将此设备写入到device表中,表示可以控制此设备了
|
go applied()
|
}
|
|
func applying() {
|
var da models.DeviceApply
|
for {
|
list := da.FindByStatus(models.ApplyStatus_Sending)
|
if list != nil && len(list) > 0 {
|
for _,d := range list {
|
if sendApply(d.ApplyKey, config.Server.AnalyServerId, d.Ip, d.DevId ) {
|
da.UpdateStatus(models.ApplyStatus_Waiting, d.Id)
|
}
|
}
|
}
|
time.Sleep(time.Second * 3)
|
}
|
}
|
|
func applied() {
|
var da models.DeviceApply
|
for {
|
list := da.FindByStatus(models.ApplyStatus_Agreed)
|
if list != nil && len(list) > 0 {
|
for _,d :=range list {
|
//1.将信息写入到device表中
|
//2.将此条申请信息置为Managed状态
|
doAgreedByTx(d)
|
}
|
}
|
time.Sleep(time.Second * 3)
|
}
|
}
|
|
func doAgreedByTx(da models.DeviceApply) {
|
tx := models.GetDB().Begin()
|
var err error
|
defer func() {
|
if err != nil || tx != nil {
|
tx.Rollback()
|
}
|
}()
|
var tmp models.DeviceApply
|
i, _ := tmp.FindByDevId(da.DevId)
|
if i >0 {
|
err = errors.New("device表中已存在此设备")
|
return
|
}
|
//1.获取设备信息
|
//d := getRemoteDevInfo(da.DevId, da.Ip)
|
//d.Id = uuid.NewV4().String()
|
//if b := d.Insert(); !b {
|
// err = errors.New("新增device失败")
|
// return
|
//}
|
if !da.UpdateStatus(models.ApplyStatus_Managed, da.Id) {
|
err = errors.New("修改managed状态失败")
|
return
|
}
|
tx.Commit()
|
}
|
|
type devCollectNew struct {
|
DeviceList []models.Device `json:"deviceList"`
|
Sdks []models.DeviceSdk `json:"sdks"`
|
Apps []models.DeviceApp `json:"apps"`
|
}
|
const (
	// CollectDeviceTopic is the topic name for collected managed-device info.
	// NOTE(review): presumably the topic whose payloads are handled by
	// CollectManageDeviceInfo — confirm against the subscriber.
	CollectDeviceTopic = "collect-manage-device-info"
)
|
func CollectManageDeviceInfo(data []byte) error {
|
applyM := make(map[string]struct{})
|
var dApply models.DeviceApply
|
list := dApply.FindByStatus(models.ApplyStatus_Managed)
|
for _, d := range list {
|
applyM[d.DevId] = struct{}{}
|
}
|
var dc devCollectNew
|
e := json.Unmarshal(data, &dc)
|
if e == nil {
|
tx := models.GetDB().Begin()
|
var err error
|
var devModel models.Device
|
var dsModel models.DeviceSdk
|
var daModel models.DeviceApp
|
defer func() {
|
if err != nil && tx != nil {
|
tx.Rollback()
|
}
|
}()
|
for _, d := range dc.DeviceList {
|
if _,ok := applyM[d.DevId]; ok {
|
if rows,_ := devModel.SelectById(d.Id); rows >0 {
|
if err = tx.Table(d.TableName()).Save(&d).Error;err != nil {
|
return err
|
}
|
} else {
|
if err = tx.Table(d.TableName()).Create(&d).Error;err != nil {
|
return err
|
}
|
}
|
}
|
|
}
|
for _,ds := range dc.Sdks {
|
if rows,_ := dsModel.SelectById(ds.Id); rows >0 {
|
if err = tx.Table(ds.TableName()).Save(&ds).Error;err != nil {
|
return err
|
}
|
} else {
|
if err = tx.Table(ds.TableName()).Create(&ds).Error;err != nil {
|
return err
|
}
|
}
|
}
|
|
for _,da := range dc.Apps {
|
if rows,_ := daModel.SelectById(da.Id); rows >0 {
|
if err = tx.Table(da.TableName()).Save(&da).Error;err != nil {
|
return err
|
}
|
} else {
|
if err = tx.Table(da.TableName()).Create(&da).Error;err != nil {
|
return err
|
}
|
}
|
}
|
|
|
tx.Commit()
|
return nil
|
} else {
|
logger.Error("unmarshal err:", e)
|
return e
|
}
|
}
|
|
// getRemoteDevInfo is an unimplemented placeholder: it ignores devId and ip,
// does nothing and returns nothing.
func getRemoteDevInfo(devId, ip string) {}
|
|
//将请求发送到指定节点上
|
func sendApply(key, devId, ip, targetDevId string) bool {
|
//ipv4, _, _ := util.GetLocalIP(config.Server.NetworkAdapter)
|
|
body := map[string]interface{} {
|
"key": key,
|
"fromDevId": devId,
|
"fromIp": ip,
|
}
|
bts, _ := json.Marshal(body)
|
|
resp, err := WrapQueryRpc("DevAuthApply", bts, targetDevId, ip)
|
|
return err ==nil && len(resp) >0
|
}
|
|
func WrapQueryRpc(topic string, data []byte, targetDevId string, targetIp string) ([]client.NodeResponse, error) {
|
arg := serf.RpcParamTopic{
|
Topic: topic,
|
Data: data,
|
}
|
d, _ := json.Marshal(arg)
|
|
param := serf.RpcParam{
|
Name: serf.QueryRpc,
|
Timeout: time.Second * 5,
|
FilterNodes: []string { targetDevId },
|
Data: d,
|
}
|
|
return serf.RpcQuery(targetIp, ¶m)
|
}
|