zhangzengfei
2023-09-05 1b34d7bacad94933ad63fc0e199bd32ac49d9fa5
修复编译
15个文件已添加
1个文件已修改
1601 ■■■■■ 已修改文件
devicemanage-service/Makefile 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
devicemanage-service/broadcast/client.go 79 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
devicemanage-service/controllers/device.go 428 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
devicemanage-service/main.go 134 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
devicemanage-service/models/db.go 32 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
devicemanage-service/models/device.go 137 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
devicemanage-service/models/deviceApp.go 69 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
devicemanage-service/models/deviceApply.go 69 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
devicemanage-service/models/deviceSdk.go 85 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
devicemanage-service/service/ctrl.go 61 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
devicemanage-service/service/device.go 189 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
devicemanage-service/vo/ctrl.go 47 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
devicemanage-service/vo/device.go 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/controllers/syssetconf.go 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/service/busRequest.go 141 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/service/toNode.go 80 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
devicemanage-service/Makefile
New file
@@ -0,0 +1,7 @@
BUILD_VERSION := 1.1.1
#APP_NAME      := myversion
include ../module.dep
devicemanage-service/broadcast/client.go
New file
@@ -0,0 +1,79 @@
package broadcast
import (
    "fmt"
    "github.com/gogf/greuse"
    "strconv"
    "strings"
    "sync"
    "time"
)
var (
    port = 31995
    lport = 31996
    lock sync.Mutex    //同一时刻只能有一个线程在执行搜索
)
//广播收集其他节点信息
func BroadCast() ([]string, error) {
    lock.Lock()
    defer lock.Unlock()
    retCh := make(chan []string)
    go startRecv(retCh)
    conn, err := greuse.Dial("udp", "0.0.0.0:"+strconv.Itoa(lport), "255.255.255.255:"+strconv.Itoa(port))
    if err != nil {
        fmt.Println("err:", err)
    } else {
        defer conn.Close()
        n,err := conn.Write([]byte("who are you?"))
        if err != nil || n == 0 {
            fmt.Println("conn.Write err:", err, " n:", n)
        }
    }
    nodes := <-retCh
    return nodes, nil
}
func startRecv(rCh chan []string) {
    conn, err := greuse.ListenPacket("udp", "0.0.0.0:"+strconv.Itoa(lport))
    if err != nil {
        fmt.Println("startRecv ListenPacket err:", err)
        rCh <- []string{}
        return
    }
    //10秒钟之内收到的返回,即认为是在线的
    conn.SetReadDeadline(time.Now().Add(time.Second * 1))
    ch := time.After(time.Second * 4)
    var nodes []string
Loop:
    for {
        select {
        case <-ch:
            fmt.Println("<-ch")
            break Loop
        default:
            ret := make([]byte, 1024)
            n, from, e := conn.ReadFrom(ret)
            if e == nil && n >0 {
                arr := strings.Split(from.String(), ":")
                if len(arr) == 2 {
                    nodes = append(nodes, arr[0])
                }
                fmt.Println("read message from udp:", string(ret), " from:", from)
            } else {
                time.Sleep(time.Millisecond * 100)
            }
        }
    }
    rCh <- nodes
}
devicemanage-service/controllers/device.go
New file
@@ -0,0 +1,428 @@
package controllers
import (
    "basic.com/valib/bhomeclient.git"
    "basic.com/valib/bhomedbapi.git"
    "basic.com/valib/logger.git"
    "context"
    "encoding/json"
    "github.com/satori/go.uuid"
    "nanomsg.org/go-mangos"
    "nanomsg.org/go-mangos/protocol/rep"
    "nanomsg.org/go-mangos/transport/ipc"
    "nanomsg.org/go-mangos/transport/tcp"
    "strconv"
    "time"
    "vamicro/devicemanage-service/broadcast"
    "vamicro/devicemanage-service/models"
    "vamicro/devicemanage-service/service"
    "vamicro/devicemanage-service/vo"
)
type DeviceController struct {
}
//搜索设备
func (dc *DeviceController) Search(h *bhomeclient.WrapperHandler, c *bhomeclient.Request) *bhomeclient.Reply{
    nodes, err := broadcast.BroadCast()
    if err != nil {
        return &bhomeclient.Reply{ Msg: err.Error()}
    }
    if nodes == nil {
        nodes = make([]string, 0)
    }
    return &bhomeclient.Reply{ Success: true, Data: nodes}
}
//申请添加设备
func (dc *DeviceController) AddApply(h *bhomeclient.WrapperHandler, c *bhomeclient.Request) *bhomeclient.Reply{
    type addApplyArg struct {
        Key     string     `json:"key"`
        DevId     string     `json:"devId"`
        Ip         string     `json:"ip"`
    }
    var reqBody addApplyArg
    err := c.BindJSON(&reqBody)
    if err != nil || reqBody.Key == "" || reqBody.DevId == "" || reqBody.Ip == "" {
        return &bhomeclient.Reply{ Msg: "参数有误"}
    }
    var da models.DeviceApply
    i,e := da.FindByDevId(reqBody.DevId)
    if e != nil || i == 0 { //未申请添加过此设备
        da.Id = uuid.NewV4().String()
        da.DevId = reqBody.DevId
        da.CreateTime = time.Now().Format("2006-01-02 15:04:05")
        da.ApplyKey = reqBody.Key
        da.Ip = reqBody.Ip
        da.Status = models.ApplyStatus_Sending
        if da.Insert() {
            return &bhomeclient.Reply{ Success: true, Msg: "添加成功"}
        } else {
            return &bhomeclient.Reply{ Success: true, Msg: "添加失败"}
        }
    } else if da.Status == models.ApplyStatus_Reject { //已被拒绝的请求,可以重新发起申请
        if da.UpdateStatus(models.ApplyStatus_Sending, da.Id) {
            return &bhomeclient.Reply{ Success: true, Msg: "操作成功"}
        } else {
            return &bhomeclient.Reply{ Msg: "重新发送请求失败"}
        }
    }
    return &bhomeclient.Reply{ Msg: "已添加对此设备的申请"}
}
func RecvApprove(ctx context.Context) {
    url := "0.0.0.0:"+strconv.Itoa(4012)
    var sock mangos.Socket
    var err error
    var msg []byte
    if sock, err = rep.NewSocket(); err != nil {
        logger.Debug("new applyApprove socket err:", err)
    }
    sock.AddTransport(ipc.NewTransport())
    sock.AddTransport(tcp.NewTransport())
    if err = sock.Listen(url); err != nil {
        logger.Debug("listen on applyApprove socket err:", err)
    }
    for {
        select {
        case <-ctx.Done():
            logger.Debug("ctx done")
            return
        default:
            msg, err = sock.Recv()
            if err != nil {
                continue
            }
            if len(msg) > 0 {
                apply := dealApprove(msg)
                ret, _ := json.Marshal(*apply)
                retErr := sock.Send(ret)
                if retErr != nil {
                    logger.Debug("retErr:", retErr)
                }
            } else {
                time.Sleep(time.Millisecond*100)
            }
        }
    }
}
func dealApprove(msg []byte) *bhomeclient.Reply {
    type approveArg struct {
        DevId     string     `json:"devId"`  //设备id
        Status     int     `json:"status"` //-1表示拒绝,1:表示通过
    }
    var reqBody approveArg
    err := json.Unmarshal(msg, &reqBody)
    if err != nil || reqBody.Status < models.ApplyStatus_Reject || reqBody.Status > models.ApplyStatus_Agreed {
        return &bhomeclient.Reply{ Msg: "请求参数有误"}
    }
    var da models.DeviceApply
    i, _ := da.FindByDevId(reqBody.DevId)
    if i == 0 {
        return &bhomeclient.Reply{ Msg: reqBody.DevId+"的申请请求已不存在,请检查" }
    }
    if da.Status != models.ApplyStatus_Waiting {
        return &bhomeclient.Reply{ Msg: "当前此设备id不是申请状态,请检查" }
    }
    if da.UpdateStatus(reqBody.Status, da.Id) {
        return &bhomeclient.Reply{ Success: true, Msg: "操作成功"}
    }
    return &bhomeclient.Reply{ Msg: "操作失败"}
}
//其他节点通过申请或者拒绝申请
func (dc *DeviceController) ApplyApprove(h *bhomeclient.WrapperHandler, c *bhomeclient.Request) *bhomeclient.Reply{
    return &bhomeclient.Reply{ Msg: "building..."}
}
//func (dc *DeviceController) SyncDevToManager(h *bhomeclient.WrapperHandler, c *bhomeclient.Request) *bhomeclient.Reply {
//    var reqBody sysVo.SyncDevInfo
//    err := c.BindJSON(&reqBody)
//    if err != nil {
//        return &bhomeclient.Reply{ Msg: err.Error() }
//    }
//
//    return &bhomeclient.Reply{ Data: reqBody }
//}
//设备归类统计,分全部设备、集群xx设备、未加入集群
func (dc *DeviceController) Types(h *bhomeclient.WrapperHandler, c *bhomeclient.Request) *bhomeclient.Reply{
    type devStatistic struct {
        Name           string         `json:"name"`
        DevCount     int         `json:"devCount"`
        SdkCount     int         `json:"sdkCount"`
        Percent     string         `json:"percent"`
        ClusterId     string         `json:"clusterId"`
        Type         int         `json:"type"`
    }
    list := make([]devStatistic, 0)
    //1.统计全部设备
    total := devStatistic{
        Name: "全部设备",
        DevCount: 100,
        SdkCount: 45,
        Percent: "80%",
        Type: 0,
    }
    list = append(list, total)
    //2.按集群统计
    var d models.Device
    devCts := d.GroupByCluster()
    if devCts != nil {
        for _,g := range devCts {
            ds := devStatistic{
                Name: g.ClusterName,
                ClusterId: g.ClusterId,
                DevCount: g.Count,
                SdkCount: 0,
                Percent: "70%",
                Type: 1,
            }
            list = append(list, ds)
        }
    }
    //3.未加入集群设备
    ns := devStatistic{
        Name: "未加入集群",
        DevCount: 0,
        SdkCount: 0,
        Percent: "10%",
        Type: 2,
    }
    list = append(list, ns)
    return &bhomeclient.Reply{ Success: true, Data: list}
}
//通过不同类别获取设备列表,分页
func (dc *DeviceController) PageByType(h *bhomeclient.WrapperHandler, c *bhomeclient.Request) *bhomeclient.Reply{
    type PageDevArg struct {
        Type         int     `json:"type"`  //类型
        ClusterId     string     `json:"clusterId"`   //集群id
        Status         int     `json:"status"`   //状态 0:全部,1:在线,-1:离线
        Page         int     `json:"page"`
        Size         int     `json:"size"`
        InputTxt     string     `json:"inputTxt"`
    }
    var reqBody PageDevArg
    err := c.BindJSON(&reqBody)
    if err != nil {
        return &bhomeclient.Reply{ Msg:err.Error()}
    }
    if reqBody.Page <= 0 {
        reqBody.Page = 1
    }
    if reqBody.Size <= 0 {
        reqBody.Size = 8
    }
    var d models.Device
    if reqBody.Type == 0 { //全部设备
        list, err := d.PageAllDev(reqBody.Page, reqBody.Size, reqBody.Status, reqBody.InputTxt)
        if err != nil {
            return &bhomeclient.Reply{ Msg: err.Error()}
        }
        return &bhomeclient.Reply{ Success: true, Data: list}
    } else if reqBody.Type == 1 || reqBody.Type == 2 {  //按集群统计
        list, err := d.PageDevByCluster(reqBody.Page, reqBody.Size, reqBody.ClusterId, reqBody.Status, reqBody.InputTxt)
        if err != nil {
            return &bhomeclient.Reply{ Msg: err.Error()}
        }
        return &bhomeclient.Reply{ Success: true, Data: list}
    }
    return nil
}
//获取设备详情
func (dc *DeviceController) Detail(h *bhomeclient.WrapperHandler, c *bhomeclient.Request) *bhomeclient.Reply{
    devId := c.Query("devId")
    logger.Debug("Detail devId:", devId)
    var d models.Device
    i, e := d.FindByDevId(devId)
    if e == nil && i > 0 {
        detail := vo.DevDetail{}
        pd,_ := vo.CopyDeviceFromModel(d)
        detail.Device = pd
        detail.Sdks = getSdkDetailsByDevId(d.DevId)
        detail.Apps = getAppDetailsByDevId(d.DevId)
        return &bhomeclient.Reply{ Success:true, Data: detail }
    } else {
        return &bhomeclient.Reply{ Msg:"未找到此设备的相关信息"}
    }
}
func getSdkDetailsByDevId(devId string) []vo.SdkDetail {
    arr := make([]vo.SdkDetail, 0)
    var sdkApi bhomedbapi.SdkApi
    sdkMap := sdkApi.FindAllMap()
    var ds models.DeviceSdk
    list, _ := ds.FindByMachineCode("", devId)
    if list != nil {
        for _,s :=range list {
            if v,ok := sdkMap[s.SdkId]; ok {
                sd := vo.SdkDetail{}
                sd.Sdk = v
                sd.InstallTime = s.InstallTime
                sd.ExpireTime = s.ExpireTime
                sd.ActivateCode = s.ActivateCode
                arr = append(arr, sd)
            }
        }
    }
    return arr
}
func getAppDetailsByDevId(devId string) []vo.AppDetail {
    arr := make([]vo.AppDetail, 0)
    var appApi bhomedbapi.AppApi
    appMap := appApi.FindAppMap()
    var da models.DeviceApp
    list, _ := da.FindByMachineCode("", devId)
    if list != nil {
        for _,s :=range list {
            if v,ok := appMap[s.AppId]; ok {
                ad := vo.AppDetail{}
                ad.App = v
                ad.InstallTime = s.InstallTime
                ad.ExpireTime = s.ExpireTime
                ad.ActivateCode = s.ActivateCode
                arr = append(arr, ad)
            }
        }
    }
    return arr
}
//控制远程设备创建集群
func (dc *DeviceController) RemoteCreateCluster(h *bhomeclient.WrapperHandler, c *bhomeclient.Request) *bhomeclient.Reply {
    var reqBody vo.CreateClusterArg
    err := c.BindJSON(&reqBody)
    if err != nil {
        return &bhomeclient.Reply{ Msg: "参数有误"}
    }
    r,err := service.RemoteCreateCluster(reqBody)
    if err == nil {
        return r
    }
    return &bhomeclient.Reply{ Msg: err.Error() }
}
//控制远程设备搜索集群
func (dc *DeviceController) RemoteSearchCluster(h *bhomeclient.WrapperHandler, c *bhomeclient.Request) *bhomeclient.Reply {
    var reqBody vo.SearchClusterArg
    err := c.BindJSON(&reqBody)
    if err != nil {
        return &bhomeclient.Reply{ Msg: "参数有误"}
    }
    r, err := service.RemoteSearchCluster(reqBody)
    if err == nil {
        return r
    }
    return &bhomeclient.Reply{ Msg: err.Error() }
}
//获取远程设备搜索到的设备列表
func (dc *DeviceController) RemoteGetSearchNodes(h *bhomeclient.WrapperHandler, c *bhomeclient.Request) *bhomeclient.Reply {
    var reqBody vo.GetSearchNodesArg
    err := c.BindJSON(&reqBody)
    if err != nil {
        return &bhomeclient.Reply{ Msg: "参数有误"}
    }
    r,err := service.RemoteGetSearchNodes(reqBody)
    if err == nil {
        return r
    }
    return &bhomeclient.Reply{Msg: err.Error()}
}
//加入集群
func (dc *DeviceController) RemoteJoinCluster(h *bhomeclient.WrapperHandler, c *bhomeclient.Request) *bhomeclient.Reply{
    var reqBody vo.JoinClusterArg
    err := c.BindJSON(&reqBody)
    if err != nil {
        return &bhomeclient.Reply{ Msg: "参数有误"}
    }
    r,err := service.RemoteJoinCluster(reqBody)
    if err == nil {
        return r
    }
    return &bhomeclient.Reply{ Msg: err.Error()}
}
//设备重启
func (dc *DeviceController) RemoteReboot(h *bhomeclient.WrapperHandler, c *bhomeclient.Request) *bhomeclient.Reply{
    var reqBody vo.RebootArg
    err := c.BindJSON(&reqBody)
    if err != nil {
        return &bhomeclient.Reply{ Msg: "参数有误"}
    }
    r,err := service.RemoteReboot(reqBody)
    if err == nil {
        return r
    }
    return &bhomeclient.Reply{ Msg: err.Error()}
}
//移除设备
func (dc *DeviceController) RemoteRemove(h *bhomeclient.WrapperHandler, c *bhomeclient.Request) *bhomeclient.Reply{
    return nil
}
//卸载算法或者应用
func (dc *DeviceController) RemoteUninstall(h *bhomeclient.WrapperHandler, c *bhomeclient.Request) *bhomeclient.Reply{
    var reqBody vo.UninstallArg
    err := c.BindJSON(&reqBody)
    if err != nil {
        return &bhomeclient.Reply{ Msg: "参数有误"}
    }
    r,err := service.RemoteUninstall(reqBody)
    if err == nil {
        return r
    }
    return &bhomeclient.Reply{ Msg: err.Error()}
}
//升级算法或者应用
func (dc *DeviceController) RemoteUpgrade(h *bhomeclient.WrapperHandler, c *bhomeclient.Request) *bhomeclient.Reply{
    var reqBody vo.UpgradeArg
    err := c.BindJSON(&reqBody)
    if err != nil {
        return &bhomeclient.Reply{ Msg: "参数有误"}
    }
    r,err := service.RemoteUpgrade(reqBody)
    if err == nil {
        return r
    }
    return &bhomeclient.Reply{ Msg: err.Error()}
}
//系统更新
func (dc *DeviceController) RemoteSysUpdate(h *bhomeclient.WrapperHandler, c *bhomeclient.Request) *bhomeclient.Reply{
    var reqBody vo.SysUpdateArg
    err := c.BindJSON(&reqBody)
    if err != nil {
        return &bhomeclient.Reply{ Msg: "参数有误"}
    }
    r,err := service.RemoteSysUpdate(reqBody)
    if err == nil {
        return r
    }
    return &bhomeclient.Reply{ Msg: err.Error()}
}
devicemanage-service/main.go
New file
@@ -0,0 +1,134 @@
package main
import (
    "basic.com/valib/bhomeclient.git"
    "basic.com/valib/bhomedbapi.git"
    "basic.com/valib/logger.git"
    "basic.com/valib/version.git"
    "context"
    "flag"
    "os"
    "os/signal"
    "syscall"
    "time"
    "vamicro/config"
    "vamicro/devicemanage-service/controllers"
    "vamicro/devicemanage-service/models"
    "vamicro/devicemanage-service/service"
)
var (
    procName = "devicemanage-service"
    proc = &bhomeclient.ProcInfo{
        Name: procName, //进程名称
        ID: procName, //进程id
        Info: "", //进程的描述信息,用于区分同一进程名称下多个进程
    }
    env = flag.String("e", "pro", "")
)
func init() {
    flag.Parse()
    vaversion.Usage()
    config.Init(*env)
    // 日志初始化
    var logFile = config.LogConf.Path + "vamicro-"+procName+".log"
    logger.InitLogger(logFile, config.LogConf.Level, config.LogConf.MaxSize, config.LogConf.MaxBackups, config.LogConf.MaxAge)
    logger.Info("log init success !")
}
func main(){
    flag.Parse()
    models.Init()
    defer models.CloseDB()
    ctx, cancel := context.WithCancel(context.Background())
    fm,pubTopics := initFuncMap()
    var reg = &bhomeclient.RegisterInfo {
        Proc: *proc,
        Channel: nil,
        PubTopic: pubTopics,
        SubTopic: []string{ service.CollectDeviceTopic },
    }
    q := make(chan os.Signal, 1)
    signal.Notify(q, os.Interrupt, os.Kill, syscall.SIGTERM)
    ms, err := bhomeclient.NewMicroNode(ctx, q, config.Server.AnalyServerId, reg, logger.Debug)
    if err !=nil {
        return
    }
    bhomedbapi.InitLog(logger.Debug)
    bhomedbapi.InitGetNetNode(ms.GetLocalNetNodeByTopic)
    bhomedbapi.InitDoReq(ms.RequestOnly)
    go dealSubMsg(ctx, ms)
    go testPubNetMsg(ms)
    go ms.StartServer(fm)
    service.DealApply()
    go controllers.RecvApprove(ctx)
    <-q
    ms.DeRegister()
    cancel()
    ms.Free()
}
//测试发布全网消息
func testPubNetMsg(ms *bhomeclient.MicroNode) {
    netTopic := "globalPublishTopic"
    tk := time.NewTicker(3 * time.Second)
    for {
        select {
        case <-tk.C:
            pmsg := []byte("hello world"+time.Now().Format("2006-01-02 15:04:05"))
            n := ms.PublishNetTimeout(nil, netTopic, pmsg, 1000)
            logger.Debug("PublishNetTimeOut n:", n)
        default:
            time.Sleep(1* time.Second)
        }
    }
}
//处理订阅消息
func dealSubMsg(ctx context.Context, ms *bhomeclient.MicroNode) {
    for {
        select {
        case <-ctx.Done():
            return
        case msg := <-ms.SubCh:
            logger.Debug("recv sub msg topic:", string(msg.Topic), " data:", string(msg.Data))
            if string(msg.Topic) == service.CollectDeviceTopic {
                service.CollectManageDeviceInfo(msg.Data)
            }
        }
    }
}
const urlPrefix= "/data/api-v"
func initFuncMap() (map[string]bhomeclient.MicroFunc,[]string) {
    m := make(map[string]bhomeclient.MicroFunc)
    dc := new(controllers.DeviceController)
    m[urlPrefix+"/device/search"] = dc.Search
    m[urlPrefix+"/device/addApply"] = dc.AddApply
    m[urlPrefix+"/device/applyApprove"] = dc.ApplyApprove
    m[urlPrefix+"/device/types"] = dc.Types
    m[urlPrefix+"/device/pageByType"] = dc.PageByType
    m[urlPrefix+"/device/detail"] = dc.Detail
    m[urlPrefix+"/device/remoteCreateCluster"] = dc.RemoteCreateCluster
    m[urlPrefix+"/device/remoteSearchCluster"] = dc.RemoteSearchCluster
    m[urlPrefix+"/device/remoteJoinCluster"] = dc.RemoteJoinCluster
    m[urlPrefix+"/device/remoteReboot"] = dc.RemoteReboot
    m[urlPrefix+"/device/remoteRemove"] = dc.RemoteRemove
    m[urlPrefix+"/device/remoteUninstall"] = dc.RemoteUninstall
    m[urlPrefix+"/device/remoteUpgrade"] = dc.RemoteUpgrade
    m[urlPrefix+"/device/remoteSysUpdate"] = dc.RemoteSysUpdate
    var pubTopics []string
    for key,_ := range m {
        pubTopics = append(pubTopics, key)
    }
    return m, pubTopics
}
devicemanage-service/models/db.go
New file
@@ -0,0 +1,32 @@
package models
import (
    "basic.com/valib/logger.git"
    "github.com/jinzhu/gorm"
    _ "github.com/jinzhu/gorm/dialects/sqlite"
    "vamicro/config"
)
var db *gorm.DB
var err error
// Init creates a connection to mysql database and
// migrates any new models
func Init() {
    db, err = gorm.Open(config.DBconf.Name, "../config/devicemanage-service.db")
    if err != nil {
        logger.Debug("db open error ", err)
    }
    db.LogMode(true)
    //db.SetLogger(&DbLogger{})
    db.AutoMigrate(&DeviceApply{})
}
//GetDB ...
func GetDB() *gorm.DB {
    return db
}
func CloseDB() {
    db.Close()
}
devicemanage-service/models/device.go
New file
@@ -0,0 +1,137 @@
package models
import "strconv"
type Device struct {
    Id                 string             `gorm:"primary_key;column:id" json:"id"`
    DevId             string             `gorm:"column:devId;unique;not null;" json:"devId"`         //设备id
    DevType         string             `gorm:"column:devType" json:"devType"`                         //设备类型,如:存储、分析(盒子或amd服务器)、分析存储一体
    DevMode         string             `gorm:"column:devMode" json:"devMode"`                         //设备型号,如:Bsk-JS1000X
    DevName         string             `gorm:"column:devName" json:"devName"`                        //设备名称
    MachineCode     string             `gorm:"column:machineCode" json:"machineCode"`                //机器码
    ActivateCode     string             `gorm:"column:activateCode" json:"activateCode"`            //激活码
    ProductId         string             `gorm:"column:productId" json:"productId"`                    //产品id
    UserId             string             `gorm:"column:userId" json:"userId"`                        //用户id
    Address         string             `gorm:"column:address" json:"address"`                        //地址
    DevIp              string             `gorm:"column:devIp" json:"devIp"`                            //ip
    DevCpu            string             `gorm:"column:devCpu" json:"devCpu"`                         //cpu
    DevGpu            string             `gorm:"column:devGpu" json:"devGpu"`                        //gpu
    Mem             int             `gorm:"column:mem" json:"mem"`                              //内存
    Disk             string             `gorm:"column:disk" json:"disk"`                              //硬盘
    ChannelCount     int             `gorm:"column:channelCount;default:16" json:"channelCount"` //算力数量
    MasterVersion     string             `gorm:"column:masterVersion" json:"masterVersion"`           //主控版本
    WebVersion         string             `gorm:"column:webVersion" json:"webVersion"`                   //web版本
    ServerPort         string             `gorm:"column:serverPort" json:"serverPort"`                   //端口:默认7003
    SubMask         string             `gorm:"column:subMask" json:"subMask"`                           //子网掩码
    Gateway         string             `gorm:"column:gateway" json:"gateway"`                            //网关
    Dns             string             `gorm:"column:dns" json:"dns"`                                    //dns
    Runtime         string             `gorm:"column:runtime" json:"runtime"`                         //运行时长
    InstallTime     string             `gorm:"column:installTime" json:"installTime"`               //安装时间
    FirstUseTime     string             `gorm:"column:firstUseTime" json:"firstUseTime"`             //首次使用时间
    //集群相关信息
    ClusterId         string             `gorm:"column:clusterId" json:"clusterId"`                   //集群id
    ClusterName     string             `gorm:"column:clusterName" json:"clusterName"`                //集群名称
    Status             int             `gorm:"column:status;default:0;" json:"status"`             //状态 -1:离线, 1:在线
    CreateTime         string             `gorm:"column:createTime" json:"createTime"`                //记录时间
    UpdateTime         string             `gorm:"column:updateTime" json:"updateTime"`                //更新时间
}
const (
    DevStatus_OnLine = 1   //在线
    DevStatus_OffLine = -1   //离线
)
func (Device) TableName() string {
    return "t_device"
}
func (d *Device) Insert() bool {
    result := db.Table(d.TableName()).Create(&d)
    if result.Error !=nil {
        return false
    }
    return result.RowsAffected>0
}
func (d *Device) Update() bool {
    result := db.Table(d.TableName()).Update(&d)
    if result.Error !=nil {
        return false
    }
    return result.RowsAffected>0
}
func (d *Device) SelectById(id string) (int64, error){
    result := db.Table(d.TableName()).Where("id=?",id).First(&d)
    if result.Error != nil || result.RowsAffected == 0 {
        return 0, err
    }
    return result.RowsAffected, nil
}
func (d *Device) FindByDevId(devId string) (int64,error) {
    result := db.Table(d.TableName()).Where("devId=?", devId).First(&d)
    if result.Error != nil || result.RowsAffected == 0 {
        return 0, result.Error
    }
    return result.RowsAffected, nil
}
func (d *Device) FindAll() (list []Device){
    if err:=db.Table(d.TableName()).Scan(&list).Error;err !=nil{
        return nil
    }
    return list
}
type DevClusterGroup struct {
    ClusterId         string         `json:"clusterId"`   //集群id
    ClusterName     string         `json:"clusterName"` //集群名称
    Count             int         `json:"count"`       //数量
}
func (d *Device) GroupByCluster() (list []DevClusterGroup) {
    err := db.Raw("select clusterId,clusterName,count(1) as count from "+d.TableName()+" where clusterId!='' group by clusterId,clusterName order by clusterId asc").Scan(&list).Error
    if err != nil {
        return nil
    }
    return list
}
func (d *Device) PageAllDev(page int, size int, status int, inputTxt string) (list []Device, err error) {
    sql := "select * from "+d.TableName()+" where 1=1"
    from := (page-1)*size
    if inputTxt != "" {
        sql += " and devName like '%"+inputTxt+"%'"
    }
    if status == DevStatus_OnLine || status == DevStatus_OffLine {
        sql += " and status="+strconv.Itoa(status)
    }
    sql += " order by createTime limit "+strconv.Itoa(from)+","+strconv.Itoa(size)+""
    err = db.Raw(sql).Scan(&list).Error
    if err != nil {
        return nil, err
    }
    return
}
func (d *Device) PageDevByCluster(page int, size int, clusterId string, status int, inputTxt string) (list []Device, err error) {
    sql := "select * from "+d.TableName()+" where 1=1 and clusterId='"+clusterId+"'"
    from := (page-1)*size
    if inputTxt != "" {
        sql += " and devName like '%"+inputTxt+"%'"
    }
    if status == DevStatus_OnLine || status == DevStatus_OffLine {
        sql += " and status="+strconv.Itoa(status)
    }
    sql += " order by createTime desc limit "+strconv.Itoa(from)+","+strconv.Itoa(size)+""
    err = db.Raw(sql).Scan(&list).Error
    if err != nil {
        return nil, err
    }
    return
}
devicemanage-service/models/deviceApp.go
New file
@@ -0,0 +1,69 @@
package models
type DeviceApp struct {
    Id                 string         `gorm:"column:id;primary_key;type:varchar(50);unique;not null;" json:"id"`
    DevId             string         `gorm:"column:devId" json:"devId"`
    MachineCode     string         `gorm:"column:machineCode" json:"machineCode"`
    ActivateCode     string         `gorm:"column:activateCode" json:"activateCode"`
    AppId             string         `gorm:"column:appId" json:"appId"`
    ExpireTime         string         `gorm:"column:expireTime" json:"expireTime"`      //激活时间
    InstallTime     string         `gorm:"column:installTime" json:"installTime"`   //安装时间
    AppName         string         `gorm:"column:appName" json:"appName"`   //汇总展现到页面上,此进程无法获取app和sdks表信息
    IconBlob         string         `gorm:"column:iconBlob" json:"iconBlob"`
    Version         string         `gorm:"column:version" json:"version"`
    IsDefault         bool         `gorm:"column:isDefault" json:"isDefault"`
    Upgrade         bool         `gorm:"column:upgrade" json:"upgrade"`
}
func (DeviceApp) TableName() string {
    return "t_device_app"
}
func (da *DeviceApp) Insert() bool {
    result := db.Table(da.TableName()).Create(&da)
    if result.Error == nil && result.RowsAffected > 0 {
        return true
    }
    return false
}
func (da *DeviceApp) SelectById(id string) (int64, error){
    result := db.Table(da.TableName()).Where("id=?",id).First(&da)
    if result.Error != nil || result.RowsAffected == 0 {
        return 0, err
    }
    return result.RowsAffected, nil
}
func (da *DeviceApp) Update() bool {
    result := db.Table(da.TableName()).Where("id=?", da.Id).Update(&da)
    if result.Error ==nil && result.RowsAffected > 0 {
        return true
    }
    return false
}
func (da *DeviceApp) DeleteById(id string) bool {
    result := db.Exec("delete from "+da.TableName()+" where id='"+id+"'")
    if result.Error == nil && result.RowsAffected > 0 {
        return true
    }
    return false
}
func (da *DeviceApp) FindByActivateCode(activateCode string) (list []DeviceApp, err error) {
    result := db.Table(da.TableName()).Where("activateCode=?", activateCode).Find(&list)
    if result.Error != nil {
        return []DeviceApp{},result.Error
    }
    return list, nil
}
func (da *DeviceApp) FindByMachineCode(machineCode string, serverId string) (list []DeviceApp, err error) {
    result := db.Table(da.TableName()).Where("machineCode=? or devId=?", machineCode, serverId).Order("installTime desc").Find(&list)
    if result.Error != nil {
        return []DeviceApp{},result.Error
    }
    return list, nil
}
devicemanage-service/models/deviceApply.go
New file
@@ -0,0 +1,69 @@
package models
//设备添加申请记录
type DeviceApply struct {
    Id                 string         `gorm:"column:id;primary_key;type:varchar(50);unique;not null;" json:"id"`
    DevId             string         `gorm:"column:devId;unique;not null;" json:"devId"`
    DevName         string         `gorm:"column:devName" json:"devName"`
    ApplyKey         string         `gorm:"column:applyKey" json:"applyKey"`
    Ip                 string         `gorm:"column:ip" json:"ip"`
    CreateTime         string         `gorm:"column:createTime" json:"createTime"`
    Status             int         `gorm:"column:status;default:0;" json:"status"`  //状态 0:已添加,待发送给对方 1:已成功发送请求到对方,等待反馈。 2:对方已同意 -2:对方已拒绝
}
const (
    ApplyStatus_KeyErr = -2   //密钥错误
    ApplyStatus_Reject = -1  //已拒绝
    ApplyStatus_Sending = 0  //等待发送给对方
    ApplyStatus_Waiting = 1  //等待响应
    ApplyStatus_Agreed = 2   //已同意
    ApplyStatus_Managed = 3  //已处于管理状态
)
func (DeviceApply) TableName() string {
    return "t_device_apply"
}
func (da *DeviceApply) Insert() bool {
    result := db.Table(da.TableName()).Create(&da)
    if result.Error == nil && result.RowsAffected > 0 {
        return true
    }
    return false
}
func (da *DeviceApply) FindByDevId(devId string) (int64,error) {
    result := db.Table(da.TableName()).Where("devId=?", devId).First(&da)
    if result.Error != nil || result.RowsAffected == 0 {
        return 0, result.Error
    }
    return result.RowsAffected, nil
}
func (da *DeviceApply) Update() bool {
    result := db.Table(da.TableName()).Where("id=?", da.Id).Update(&da)
    if result.Error ==nil && result.RowsAffected > 0 {
        return true
    }
    return false
}
func (da *DeviceApply) DeleteById(id string) bool {
    result := db.Exec("delete from "+da.TableName()+" where id='"+id+"'")
    if result.Error == nil && result.RowsAffected > 0 {
        return true
    }
    return false
}
func (da *DeviceApply) FindByStatus(status int) (list []DeviceApply) {
    err := db.Table(da.TableName()).Where("status=?", status).Find(&list).Error
    if err != nil {
        return nil
    }
    return list
}
func (da *DeviceApply) UpdateStatus(status int, id string) bool {
    return db.Exec("update "+da.TableName()+" set status=? where id=?", status, id).RowsAffected >0
}
devicemanage-service/models/deviceSdk.go
New file
@@ -0,0 +1,85 @@
package models
type DeviceSdk struct {
    Id                 string         `gorm:"column:id;primary_key;type:varchar(50);unique;not null;" json:"id"`
    DevId             string         `gorm:"column:devId" json:"devId"`
    MachineCode     string         `gorm:"column:machineCode" json:"machineCode"`
    ActivateCode     string         `gorm:"column:activateCode" json:"activateCode"`
    SdkId             string         `gorm:"column:sdkId" json:"sdkId"`
    ExpireTime         string         `gorm:"column:expireTime" json:"expireTime"`
    InstallTime     string         `gorm:"column:installTime" json:"installTime"`   //安装时间
    SdkName         string         `gorm:"column:sdkName" json:"sdkName"`
    IconBlob         string         `gorm:"column:iconBlob" json:"iconBlob"`
    Version         string         `gorm:"column:version" json:"version"`
    Upgrade         bool         `gorm:"column:upgrade" json:"upgrade"`
}
func (DeviceSdk) TableName() string {
    return "t_device_sdk"
}
func (ds *DeviceSdk) Insert() bool {
    result := db.Table(ds.TableName()).Create(&ds)
    if result.Error == nil && result.RowsAffected > 0 {
        return true
    }
    return false
}
func (ds *DeviceSdk) SelectById(id string) (int64, error){
    result := db.Table(ds.TableName()).Where("id=?",id).First(&ds)
    if result.Error != nil || result.RowsAffected == 0 {
        return 0, err
    }
    return result.RowsAffected, nil
}
func (ds *DeviceSdk) Update() bool {
    result := db.Table(ds.TableName()).Where("id=?", ds.Id).Update(&ds)
    if result.Error ==nil && result.RowsAffected > 0 {
        return true
    }
    return false
}
func (ds *DeviceSdk) DeleteById(id string) bool {
    result := db.Exec("delete from "+ds.TableName()+" where id='"+id+"'")
    if result.Error == nil && result.RowsAffected > 0 {
        return true
    }
    return false
}
func (ds *DeviceSdk) FindByActivateCode(activateCode string) (list []DeviceSdk, err error) {
    result := db.Table(ds.TableName()).Where("activateCode=?", activateCode).Find(&list)
    if result.Error != nil {
        return []DeviceSdk{},result.Error
    }
    return list, nil
}
func (ds *DeviceSdk) FindByMachineCode(machineCode string, serverId string) (list []DeviceSdk, err error) {
    result := db.Table(ds.TableName()).Where("machineCode=? or devId=?", machineCode, serverId).Order("installTime desc").Find(&list)
    if result.Error != nil {
        return []DeviceSdk{},result.Error
    }
    return list, nil
}
func (ds *DeviceSdk) FindByMacAndSdk(macCode, serverId, sdkId string) (list []DeviceSdk, err error) {
    result := db.Table(ds.TableName()).Where("(machineCode=? or devId=?) and sdkId=?", macCode, serverId, sdkId).Find(&list)
    if result.Error != nil {
        return []DeviceSdk{},result.Error
    }
    return list, nil
}
//按设备id统计算法数量
func (ds *DeviceSdk) CountByDevId(devId string) (int,error) {
    var count int
    if err := db.Raw("select count(1) as count from "+ds.TableName()+" where devId=?",devId).Count(&count).Error; err != nil {
        return 0, err
    }
    return count, nil
}
devicemanage-service/service/ctrl.go
New file
@@ -0,0 +1,61 @@
package service
import (
    "basic.com/valib/bhomeclient.git"
    "encoding/json"
    "errors"
    "vamicro/devicemanage-service/vo"
)
func remoteCall(arg interface{}, rTopic string, targetDevId string, ip string) (*bhomeclient.Reply, error) {
    body, err := json.Marshal(arg)
    if err != nil {
        return nil, err
    }
    r, err := WrapQueryRpc(rTopic, body, targetDevId, ip)
    if err != nil {
        return nil, err
    }
    if len(r) != 1 {
        return nil, errors.New("no response")
    }
    var ret bhomeclient.Reply
    err = json.Unmarshal(r[0].Payload, &ret)
    if err != nil {
        return nil, err
    }
    return &ret, nil
}
//控制其它节点创建集群
func RemoteCreateCluster(arg vo.CreateClusterArg) (*bhomeclient.Reply, error){
    return remoteCall(arg, "RemoteCreateCluster", "", arg.Ip)
}
func RemoteSearchCluster(arg vo.SearchClusterArg) (*bhomeclient.Reply, error) {
    return remoteCall(arg, "RemoteSearchCluster", arg.DevId, arg.Ip)
}
func RemoteGetSearchNodes(arg vo.GetSearchNodesArg) (*bhomeclient.Reply, error) {
    return remoteCall(arg, "RemoteGetSearchNodes", arg.DevId, arg.Ip)
}
func RemoteJoinCluster(arg vo.JoinClusterArg) (*bhomeclient.Reply, error) {
    return remoteCall(arg, "RemoteJoinCluster", arg.DevId, arg.Ip)
}
func RemoteReboot(arg vo.RebootArg) (*bhomeclient.Reply, error) {
    return remoteCall(arg, "RemoteReboot", arg.DevId, arg.Ip)
}
func RemoteUninstall(arg vo.UninstallArg) (*bhomeclient.Reply, error) {
    return remoteCall(arg, "RemoteUninstall", "", "")
}
func RemoteUpgrade(arg vo.UpgradeArg) (*bhomeclient.Reply, error) {
    return remoteCall(arg, "RemoteUpgrade", "", "")
}
func RemoteSysUpdate(arg vo.SysUpdateArg) (*bhomeclient.Reply, error) {
    return remoteCall(arg, "RemoteSysUpdate", "", "")
}
devicemanage-service/service/device.go
New file
@@ -0,0 +1,189 @@
package service
import (
    "basic.com/valib/logger.git"
    "basic.com/valib/serf.git/client"
    "encoding/json"
    "errors"
    "time"
    "vamicro/config"
    "vamicro/devicemanage-service/models"
    "vamicro/system-service/serf"
)
//处理申请信息,维护状态
func DealApply() {
    //1.处理待发送请求的数据,将请求发送到目标设备,并将status设置为已申请
    go applying()
    //2.处理已通过的数据,将此设备写入到device表中,表示可以控制此设备了
    go applied()
}
func applying() {
    var da models.DeviceApply
    for {
        list := da.FindByStatus(models.ApplyStatus_Sending)
        if list != nil && len(list) > 0 {
            for _,d := range list {
                if sendApply(d.ApplyKey, config.Server.AnalyServerId, d.Ip, d.DevId ) {
                    da.UpdateStatus(models.ApplyStatus_Waiting, d.Id)
                }
            }
        }
        time.Sleep(time.Second * 3)
    }
}
func applied() {
    var da models.DeviceApply
    for {
        list := da.FindByStatus(models.ApplyStatus_Agreed)
        if list != nil && len(list) > 0 {
            for _,d :=range list {
                //1.将信息写入到device表中
                //2.将此条申请信息置为Managed状态
                doAgreedByTx(d)
            }
        }
        time.Sleep(time.Second * 3)
    }
}
func doAgreedByTx(da models.DeviceApply)  {
    tx := models.GetDB().Begin()
    var err error
    defer func() {
        if err != nil || tx != nil {
            tx.Rollback()
        }
    }()
    var tmp models.DeviceApply
    i, _ := tmp.FindByDevId(da.DevId)
    if i >0 {
        err = errors.New("device表中已存在此设备")
        return
    }
    //1.获取设备信息
    //d := getRemoteDevInfo(da.DevId, da.Ip)
    //d.Id = uuid.NewV4().String()
    //if b := d.Insert(); !b {
    //    err = errors.New("新增device失败")
    //    return
    //}
    if !da.UpdateStatus(models.ApplyStatus_Managed, da.Id) {
        err = errors.New("修改managed状态失败")
        return
    }
    tx.Commit()
}
type devCollectNew struct {
    DeviceList     []models.Device     `json:"deviceList"`
    Sdks         []models.DeviceSdk     `json:"sdks"`
    Apps         []models.DeviceApp     `json:"apps"`
}
const CollectDeviceTopic = "collect-manage-device-info"
func CollectManageDeviceInfo(data []byte) error {
    applyM := make(map[string]struct{})
    var dApply models.DeviceApply
    list := dApply.FindByStatus(models.ApplyStatus_Managed)
    for _, d := range list {
        applyM[d.DevId] = struct{}{}
    }
    var dc devCollectNew
    e := json.Unmarshal(data, &dc)
    if e == nil {
        tx := models.GetDB().Begin()
        var err error
        var devModel models.Device
        var dsModel models.DeviceSdk
        var daModel models.DeviceApp
        defer func() {
            if err != nil && tx != nil {
                tx.Rollback()
            }
        }()
        for _, d := range dc.DeviceList {
            if _,ok := applyM[d.DevId]; ok {
                if rows,_ := devModel.SelectById(d.Id); rows >0 {
                    if err = tx.Table(d.TableName()).Save(&d).Error;err != nil {
                        return err
                    }
                } else {
                    if err = tx.Table(d.TableName()).Create(&d).Error;err != nil {
                        return err
                    }
                }
            }
        }
        for _,ds := range dc.Sdks {
            if rows,_ := dsModel.SelectById(ds.Id); rows >0 {
                if err = tx.Table(ds.TableName()).Save(&ds).Error;err != nil {
                    return err
                }
            } else {
                if err = tx.Table(ds.TableName()).Create(&ds).Error;err != nil {
                    return err
                }
            }
        }
        for _,da := range dc.Apps {
            if rows,_ := daModel.SelectById(da.Id); rows >0 {
                if err = tx.Table(da.TableName()).Save(&da).Error;err != nil {
                    return err
                }
            } else {
                if err = tx.Table(da.TableName()).Create(&da).Error;err != nil {
                    return err
                }
            }
        }
        tx.Commit()
        return nil
    } else {
        logger.Error("unmarshal err:", e)
        return e
    }
}
func getRemoteDevInfo(devId, ip string)  {
}
//将请求发送到指定节点上
func sendApply(key, devId, ip, targetDevId string) bool {
    //ipv4, _, _ := util.GetLocalIP(config.Server.NetworkAdapter)
    body := map[string]interface{} {
        "key":       key,
        "fromDevId": devId,
        "fromIp":    ip,
    }
    bts, _ := json.Marshal(body)
    resp, err := WrapQueryRpc("DevAuthApply", bts, targetDevId, ip)
    return err ==nil && len(resp) >0
}
func WrapQueryRpc(topic string, data []byte, targetDevId string, targetIp string) ([]client.NodeResponse, error) {
    arg := serf.RpcParamTopic{
        Topic: topic,
        Data: data,
    }
    d, _ := json.Marshal(arg)
    param := serf.RpcParam{
        Name: serf.QueryRpc,
        Timeout: time.Second * 5,
        FilterNodes: []string { targetDevId },
        Data: d,
    }
    return serf.RpcQuery(targetIp, &param)
}
devicemanage-service/vo/ctrl.go
New file
@@ -0,0 +1,47 @@
package vo
type CreateClusterArg struct {
    DevId         string     `json:"devId"`
    Ip             string     `json:"ip"`
    Password     string     `json:"password"`
    ClusterName string     `json:"clusterName"`
    ClusterId     string     `json:"clusterId"`
    VirtualIp     string     `json:"virtualIp"`
}
type SearchClusterArg struct {
    DevId         string     `json:"devId"`
    Ip             string     `json:"ip"`
    Password     string     `json:"password"`
}
type GetSearchNodesArg struct {
    DevId string `json:"devId"`
    Ip    string `json:"ip"`
}
type JoinClusterArg struct {
    DevId         string         `json:"devId"`
    Ip            string         `json:"ip"`
    ClusterId     string         `json:"clusterId"`
    Password     string         `json:"password"`
    NodeIps     []string     `json:"nodeIps"`
}
type RebootArg struct {
    DevId         string         `json:"devId"`
    Ip            string         `json:"ip"`
}
type UninstallArg struct {
    Id         string         `json:"id"`       //卸载的算法或应用id
    Type     int         `json:"type"`  //1.算法,2.应用
}
type UpgradeArg struct {
    Id         string         `json:"id"`     //升级算法或应用id
}
type SysUpdateArg struct {
}
devicemanage-service/vo/device.go
New file
@@ -0,0 +1,40 @@
package vo
import (
    "basic.com/pubsub/protomsg.git"
    "encoding/json"
    "vamicro/devicemanage-service/models"
    "vamicro/proto"
)
type DevDetail struct {
    proto.Device
    Sdks         []SdkDetail        `json:"sdks"`
    Apps         []AppDetail        `json:"apps"`
}
func CopyDeviceFromModel(d models.Device) (proto.Device, error) {
    var pd proto.Device
    b, err := json.Marshal(d)
    if err != nil {
        return pd, err
    }
    err = json.Unmarshal(b, &pd)
    return pd, err
}
type SdkDetail struct {
    protomsg.Sdk
    InstallInfo
}
type AppDetail struct {
    protomsg.App
    InstallInfo
}
type InstallInfo struct {
    InstallTime         string     `json:"installTime"`
    ExpireTime             string     `json:"expireTime"`
    ActivateCode         string     `json:"activateCode"`
}
system-service/controllers/syssetconf.go
@@ -10,7 +10,6 @@
    "time"
    "vamicro/config"
    "vamicro/extend/util"
    service2 "vamicro/saas-service/service"
    "vamicro/system-service/models"
    "vamicro/system-service/service"
    "vamicro/system-service/sys"
@@ -1180,7 +1179,7 @@
    machineCode := licence.GetMachineCode()
    // 获取设备激活时间
    res, err := service2.DoBusReq("/data/api-v/version/snBus", config.Server.AnalyServerId, aiot.RequestMethod_Post, aiot.RequestContentType_ApplicationJson, map[string]interface{}{})
    res, err := service.DoBusReq("/data/api-v/version/snBus", config.Server.AnalyServerId, aiot.RequestMethod_Post, aiot.RequestContentType_ApplicationJson, map[string]interface{}{})
    logger.Warn("snBus", string(res))
    installTime := ""
    if err == nil {
system-service/service/busRequest.go
New file
@@ -0,0 +1,141 @@
package service
import (
    "encoding/json"
    "strings"
    "vamicro/config"
    "basic.com/valib/bhomeclient.git"
    "basic.com/valib/bhomedbapi.git"
    "basic.com/valib/go-aiot.git/aiotProto/aiot"
    "basic.com/valib/logger.git"
    "gitee.com/LearingIt/goTypeTrans/trans"
)
const (
    Token = "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjQ3NDUwMjU5MjMsInVzZXIiOiJ7XCJpZFwiOlwiZTZjY2QzNmQtNGYxNi00NmZjLTg4ZDUtMDczNjU4NjZkMjA1XCIsXCJwZXJtaXNzaW9uc1wiOltcInByb2R1Y3RNYW5nZTpwdWJsaXNoXCIsXCJjb2RlTWFuZ2U6dmlld1wiLFwiZGV2aWNlTWFuYWdlOmFkZFwiLFwiYWRtaW5NYW5hZ2VcIixcIm9yZGVyTWFuZ2VcIixcImRldmljZU1hbmFnZTp2aWV3XCIsXCJwcm9kdWN0TWFuZ2U6YWRkXCIsXCJhZG1pbk1hbmFnZTp2aWV3XCIsXCJjb2RlTWFuZ2U6YWRkXCIsXCJwcm9kdWN0TWFuZ2U6b2ZmU2FsZVwiLFwib3JkZXJNYW5nZTpjYW5jZWxcIixcInByb2R1Y3RDZW50ZXI6ZG93bmxvYWRcIixcInByb2R1Y3RDZW50ZXI6YnV5XCIsXCJwcm9kdWN0TWFuZ2U6dmlld1wiLFwiYXBpXCIsXCJob21lXCIsXCJvcmRlck1hbmdlOnBheVwiLFwiYWRtaW5NYW5hZ2U6YWRkXCIsXCJvcmRlck1hbmdlOmRvd25sb2FkXCIsXCJwcm9kdWN0Q2VudGVyXCIsXCJkZXZpY2VNYW5hZ2U6dW5iaW5kXCIsXCJvcmRlck1hbmdlOnZpZXdcIixcImFkbWluTWFuYWdlOmVkaXRcIixcImRldmljZU1hbmFnZVwiLFwidmlwTWFuYWdlOmFkZFwiLFwidmlwTWFuYWdlOnZpZXdcIixcInByb2R1Y3RDZW50ZXI6dmlld1wiLFwidmlwTWFuYWdlOmVkaXRcIixcInZpcE1hbmFnZVwiLFwicHJvZHVjdE1hbmdlOmVkaXRcIixcImNvZGVNYW5nZVwiLFwicHJvZHVjdE1hbmdlXCJdLFwidXNlcm5hbWVcIjpcImJhc2ljXCJ9In0.vwjAFkWuEyadRLvIOGK8LFE3MjpY3SQ7j6AlTXnQDG8"
)
// string转interface
func stringParams2paramsBody(params map[string]string) map[string]interface{} {
    paramsBody := make(map[string]interface{})
    for k, v := range params {
        paramsBody[k] = v
    }
    return paramsBody
}
// interface转string
func interfaceParams2paramsBody(params map[string]interface{}) map[string]string {
    paramsBody := make(map[string]string)
    for k, v := range params {
        paramsBody[k] = trans.Any2String(v)
    }
    return paramsBody
}
// 封装统一请求方法
func DoBusReq(topic string, nodeId string, method aiot.RequestMethod, contentType aiot.RequestContentType, params map[string]interface{}) ([]byte, error) {
    var err error
    var data []byte
    logger.Debug("DoBusReq...", topic, nodeId, method, contentType, GetContentTypeString(contentType), params)
    switch method {
    case aiot.RequestMethod_Get:
        // 发送get请求
        return DoGetToBus(nodeId, topic, interfaceParams2paramsBody(params))
    case aiot.RequestMethod_Post:
        // post请求
        return DoPostToBus(nodeId, topic, GetContentTypeString(contentType), params)
    case aiot.RequestMethod_Put:
        // put
        return DoPutToBus(nodeId, topic, GetContentTypeString(contentType), params)
    case aiot.RequestMethod_Delete:
        // delete
        return DoDeleteToBus(nodeId, topic, GetContentTypeString(contentType), params)
    }
    return data, err
}
// 获取节点IP
func GetNodeIp(nodeId string) (string, error) {
    client := bhomedbapi.NewClient(bhomedbapi.WithNodes(nil))
    params := map[string]interface{}{
        "node_id": nodeId,
    }
    node, err := client.DoPostRequest("/data/api-v/cluster/findIpByNode", bhomedbapi.CONTENT_TYPE_JSON, params, nil, nil)
    if err != nil {
        logger.Error("Fail to execute GetNodeIp", err)
        return "", err
    }
    var ret bhomeclient.Reply
    err = json.Unmarshal(node, &ret)
    if err != nil {
        return "", err
    }
    ip := trans.Any2String(ret.Data)
    if ip != "" && strings.Index(ip, ":") > 0 {
        return ip[0:strings.Index(ip, ":")], nil
    }
    return ip, nil
}
// bus-get请求
func DoGetToBus(nodeId string, url string, params map[string]string) ([]byte, error) {
    ip, _ := GetNodeIp(nodeId)
    header := map[string]string{
        "Authorization": Token,
    }
    client := bhomedbapi.NewClient(bhomedbapi.WithIp(ip), bhomedbapi.WithDevId(nodeId), bhomedbapi.WithTopic(url))
    if client == nil || nodeId != config.Server.AnalyServerId {
        logger.Warn("client is nil. return from bus to http")
        client = bhomedbapi.HttpClient{}
        url = "http://" + ip + ":8888/" + strings.Trim(url, "/")
    }
    return client.DoGetRequest(url, params, header)
}
// bus-post请求
func DoPostToBus(nodeId string, url string, contentType string, params map[string]interface{}) ([]byte, error) {
    ip, _ := GetNodeIp(nodeId)
    header := map[string]string{
        "Authorization": Token,
    }
    client := bhomedbapi.NewClient(bhomedbapi.WithIp(ip), bhomedbapi.WithDevId(nodeId), bhomedbapi.WithTopic(url))
    if client == nil || nodeId != config.Server.AnalyServerId {
        logger.Warn("client is nil. return from bus to http")
        client = bhomedbapi.HttpClient{}
        url = "http://" + ip + ":8888/" + strings.Trim(url, "/")
    }
    return client.DoPostRequest(url, contentType, params, nil, header)
}
// bus-put请求
func DoPutToBus(nodeId string, url string, contentType string, params map[string]interface{}) ([]byte, error) {
    ip, _ := GetNodeIp(nodeId)
    header := map[string]string{
        "Authorization": Token,
    }
    client := bhomedbapi.NewClient(bhomedbapi.WithIp(ip), bhomedbapi.WithDevId(nodeId), bhomedbapi.WithTopic(url))
    if client == nil || nodeId != config.Server.AnalyServerId {
        logger.Warn("client is nil. return from bus to http")
        client = bhomedbapi.HttpClient{}
        url = "http://" + ip + ":8888/" + strings.Trim(url, "/")
    }
    return client.DoPutRequest(url, contentType, params, header)
}
// bus-delete请求
func DoDeleteToBus(nodeId string, url string, contentType string, params map[string]interface{}) ([]byte, error) {
    ip, _ := GetNodeIp(nodeId)
    header := map[string]string{
        "Authorization": Token,
    }
    client := bhomedbapi.NewClient(bhomedbapi.WithIp(ip), bhomedbapi.WithDevId(nodeId), bhomedbapi.WithTopic(url))
    if client == nil || nodeId != config.Server.AnalyServerId {
        logger.Warn("client is nil. return from bus to http")
        client = bhomedbapi.HttpClient{}
        url = "http://" + ip + ":8888/" + strings.Trim(url, "/")
    }
    return client.DoDeleteRequest(url, contentType, params, header)
}
system-service/service/toNode.go
New file
@@ -0,0 +1,80 @@
package service
import (
    "encoding/json"
    "errors"
    "basic.com/valib/bhomedbapi.git"
    "basic.com/valib/go-aiot.git/aiotProto/aiot"
    "basic.com/valib/logger.git"
)
// 返回结果
type Reply struct {
    Success bool        `json:"success"`
    Msg     string      `json:"msg"`
    Data    interface{} `json:"data"`
}
// 获取content-type字符串
func GetContentTypeString(contentType aiot.RequestContentType) string {
    switch contentType {
    case aiot.RequestContentType_ApplicationJson:
        // application/json 格式
        return bhomedbapi.CONTENT_TYPE_JSON
    case aiot.RequestContentType_ApplicationXWwwFormUrlencoded:
        // application/x-www-form-urlencoded 格式
        return bhomedbapi.CONTENT_TYPE_FORM
    case aiot.RequestContentType_MultipartFormData:
        // multipart/form-data 格式
        return bhomedbapi.CONTENT_TYPE_MULFORM
    case aiot.RequestContentType_ApplicationXml:
        // application/xml
        return "application/xml"
    default:
        // application/json 格式
        return bhomedbapi.CONTENT_TYPE_JSON
    }
}
// 单节点请求
func NodeReq(msg *aiot.Protocol, req *aiot.NodeReq, reply *aiot.BusinessReply) error {
    // 根据参数调用服务
    var data []byte
    param := make(map[string]interface{})
    err := json.Unmarshal(req.Req, &param)
    if err != nil {
        logger.Error("Fail to Unmarshal req.Req", req.Req, err)
        return err
    }
    var nodeId string
    if len(msg.DeviceProto.DeviceIds) == 1 {
        nodeId = msg.DeviceProto.DeviceIds[0]
    } else if msg.DeviceProto.MasterDeviceId != "" {
        // debug_test
        nodeId = msg.DeviceProto.MasterDeviceId
    } else {
        err = errors.New("the node can not be null for NodeReq")
        logger.Error("The node is null", err, msg)
        return err
    }
    logger.Debugf("bus return result nodeId=%v, topic=%v, Method=%v, DeviceProto=%v", nodeId, req.Topic, req.Method, msg.DeviceProto)
    data, err = DoBusReq(req.Topic, nodeId, req.Method, req.ContentType, param)
    dataReply := Reply{}
    _ = json.Unmarshal(data, &dataReply)
    reply.Code = 200
    if !dataReply.Success {
        reply.Code = 500
    }
    reply.Success = dataReply.Success
    reply.Msg = dataReply.Msg
    reply.Data, _ = json.Marshal(dataReply.Data)
    if err != nil {
        return err
    }
    return nil
}