zhangzengfei
2023-09-05 1b34d7bacad94933ad63fc0e199bd32ac49d9fa5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
package service
 
import (
    "basic.com/valib/logger.git"
    "basic.com/valib/serf.git/client"
    "encoding/json"
    "errors"
    "time"
    "vamicro/config"
    "vamicro/devicemanage-service/models"
    "vamicro/system-service/serf"
)
 
//处理申请信息,维护状态
func DealApply() {
    //1.处理待发送请求的数据,将请求发送到目标设备,并将status设置为已申请
    go applying()
    //2.处理已通过的数据,将此设备写入到device表中,表示可以控制此设备了
    go applied()
}
 
func applying() {
    var da models.DeviceApply
    for {
        list := da.FindByStatus(models.ApplyStatus_Sending)
        if list != nil && len(list) > 0 {
            for _,d := range list {
                if sendApply(d.ApplyKey, config.Server.AnalyServerId, d.Ip, d.DevId ) {
                    da.UpdateStatus(models.ApplyStatus_Waiting, d.Id)
                }
            }
        }
        time.Sleep(time.Second * 3)
    }
}
 
func applied() {
    var da models.DeviceApply
    for {
        list := da.FindByStatus(models.ApplyStatus_Agreed)
        if list != nil && len(list) > 0 {
            for _,d :=range list {
                //1.将信息写入到device表中
                //2.将此条申请信息置为Managed状态
                doAgreedByTx(d)
            }
        }
        time.Sleep(time.Second * 3)
    }
}
 
func doAgreedByTx(da models.DeviceApply)  {
    tx := models.GetDB().Begin()
    var err error
    defer func() {
        if err != nil || tx != nil {
            tx.Rollback()
        }
    }()
    var tmp models.DeviceApply
    i, _ := tmp.FindByDevId(da.DevId)
    if i >0 {
        err = errors.New("device表中已存在此设备")
        return
    }
    //1.获取设备信息
    //d := getRemoteDevInfo(da.DevId, da.Ip)
    //d.Id = uuid.NewV4().String()
    //if b := d.Insert(); !b {
    //    err = errors.New("新增device失败")
    //    return
    //}
    if !da.UpdateStatus(models.ApplyStatus_Managed, da.Id) {
        err = errors.New("修改managed状态失败")
        return
    }
    tx.Commit()
}
 
type devCollectNew struct {
    DeviceList     []models.Device     `json:"deviceList"`
    Sdks         []models.DeviceSdk     `json:"sdks"`
    Apps         []models.DeviceApp     `json:"apps"`
}
const CollectDeviceTopic = "collect-manage-device-info"
func CollectManageDeviceInfo(data []byte) error {
    applyM := make(map[string]struct{})
    var dApply models.DeviceApply
    list := dApply.FindByStatus(models.ApplyStatus_Managed)
    for _, d := range list {
        applyM[d.DevId] = struct{}{}
    }
    var dc devCollectNew
    e := json.Unmarshal(data, &dc)
    if e == nil {
        tx := models.GetDB().Begin()
        var err error
        var devModel models.Device
        var dsModel models.DeviceSdk
        var daModel models.DeviceApp
        defer func() {
            if err != nil && tx != nil {
                tx.Rollback()
            }
        }()
        for _, d := range dc.DeviceList {
            if _,ok := applyM[d.DevId]; ok {
                if rows,_ := devModel.SelectById(d.Id); rows >0 {
                    if err = tx.Table(d.TableName()).Save(&d).Error;err != nil {
                        return err
                    }
                } else {
                    if err = tx.Table(d.TableName()).Create(&d).Error;err != nil {
                        return err
                    }
                }
            }
 
        }
        for _,ds := range dc.Sdks {
            if rows,_ := dsModel.SelectById(ds.Id); rows >0 {
                if err = tx.Table(ds.TableName()).Save(&ds).Error;err != nil {
                    return err
                }
            } else {
                if err = tx.Table(ds.TableName()).Create(&ds).Error;err != nil {
                    return err
                }
            }
        }
 
        for _,da := range dc.Apps {
            if rows,_ := daModel.SelectById(da.Id); rows >0 {
                if err = tx.Table(da.TableName()).Save(&da).Error;err != nil {
                    return err
                }
            } else {
                if err = tx.Table(da.TableName()).Create(&da).Error;err != nil {
                    return err
                }
            }
        }
 
 
        tx.Commit()
        return nil
    } else {
        logger.Error("unmarshal err:", e)
        return e
    }
}
 
func getRemoteDevInfo(devId, ip string)  {
 
 
}
 
//将请求发送到指定节点上
func sendApply(key, devId, ip, targetDevId string) bool {
    //ipv4, _, _ := util.GetLocalIP(config.Server.NetworkAdapter)
 
    body := map[string]interface{} {
        "key":       key,
        "fromDevId": devId,
        "fromIp":    ip,
    }
    bts, _ := json.Marshal(body)
 
    resp, err := WrapQueryRpc("DevAuthApply", bts, targetDevId, ip)
 
    return err ==nil && len(resp) >0
}
 
func WrapQueryRpc(topic string, data []byte, targetDevId string, targetIp string) ([]client.NodeResponse, error) {
    arg := serf.RpcParamTopic{
        Topic: topic,
        Data: data,
    }
    d, _ := json.Marshal(arg)
 
    param := serf.RpcParam{
        Name: serf.QueryRpc,
        Timeout: time.Second * 5,
        FilterNodes: []string { targetDevId },
        Data: d,
    }
 
    return serf.RpcQuery(targetIp, &param)
}