zhangzengfei
2023-08-04 88da1e13a073e8b5656387a246d827593fbd6163
添加设备查询,修改, 采集数据上报
1个文件已删除
2个文件已添加
7个文件已修改
311 ■■■■ 已修改文件
collector/collector.go 79 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collector/device.go 52 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collector/plc4x.go 41 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config/config.go 12 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
msg/msg.go 50 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
msg/send.go 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/client.go 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/httpClient.go 35 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
util/httpClient.go 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collector/collector.go
@@ -19,41 +19,66 @@
}
func InitTask() {
    device := msg.PLCDevice{
        Id:       "0",
        Ip:       "192.168.1.188",
        Address:  []int{17021},
        Interval: 1,
    //device := msg.PLCDevice{
    //    DeviceID:   "0",
    //    DeviceName: "test",
    //    DeviceIP:   "192.168.1.188",
    //    Brand:      "sim",
    //    Method:     "modbus",
    //    PortName:   "",
    //    Frequency:  1,
    //    Details: []*msg.PLCAddress{&msg.PLCAddress{
    //        StartAddress: 17021,
    //        Length:       1,
    //        Type:         "int",
    //        FieldName:    "count",
    //    }},
    //}
    devices, err := getDeviceList()
    if err != nil {
        return
    }
    for idx, dev := range devices {
        // 判断设备状态, 如果配置未开启采集数据或其他方式, 现在假设字段status为0时代表采集
        if dev.Status != 0 {
            continue
        }
        addTask(&devices[idx])
    }
}
func addTask(device *msg.PLCDevice) {
    ctx, cancel := context.WithCancel(context.Background())
    proc := collectorProc{
        device: &device,
        device: device,
        cancel: cancel,
    }
    mapTask.Store(device.Id, &proc)
    mapTask.Store(device.DeviceID, &proc)
    go connectingDevice(ctx, &device)
    go connectingDevice(ctx, device)
}
func connectingDevice(ctx context.Context, dev *msg.PLCDevice) {
    plcResponse := msg.PLCResponse{
        Id:     dev.Id,
        Name:   dev.Name,
        Ip:     dev.Ip,
        DeviceID:   dev.DeviceID,
        DeviceName: dev.DeviceName,
        DeviceIP:   dev.DeviceIP,
        Online: false,
    }
    for {
        select {
        case <-ctx.Done():
            logger.Warn("plc device %s, ip: %s, end of connecting.", dev.Name, dev.Ip)
            logger.Warn("plc device %s, ip: %s, end of connecting.", dev.DeviceName, dev.DeviceIP)
            return
        default:
            plcConnection, err := NewModbusConnection(dev.Ip)
            plcConnection, err := NewModbusConnection(dev.DeviceIP)
            if err != nil {
                logger.Warn("error connecting to PLC: %s, ip: %s", dev.Name, dev.Ip)
                logger.Warn("error connecting to PLC: %s, ip: %s", dev.DeviceName, dev.DeviceIP)
                plcResponse.Online = false
                msg.SendDeviceLiveData(&plcResponse)
                time.Sleep(30 * time.Second)
@@ -68,35 +93,39 @@
func runCollectionTask(ctx context.Context, dev *msg.PLCDevice, conn plc4go.PlcConnection) {
    // 创建modbusTCP连接, 循环查询数据并上报
    plcResponse := msg.PLCResponse{
        Id:     dev.Id,
        Name:   dev.Name,
        Ip:     dev.Ip,
        DeviceID:   dev.DeviceID,
        DeviceName: dev.DeviceName,
        DeviceIP:   dev.DeviceIP,
        Online: true,
        Data:   nil,
    }
    for {
        select {
        case <-ctx.Done():
            logger.Warn("plc device %s, ip: %s, end of collection.", dev.Name, dev.Ip)
            logger.Warn("plc device %s, ip: %s, end of collection.", dev.DeviceName, dev.DeviceIP)
            conn.Close()
            return
        default:
            if !conn.IsConnected() {
                logger.Warn("plc device %s, ip: %s, disconnected.", dev.Name, dev.Ip)
                logger.Warn("plc device %s, ip: %s, disconnected.", dev.DeviceName, dev.DeviceIP)
                return
            }
            // 根据设置的地址查询数据,上报
            plcResponse.Data = make(map[int][]byte, 0)
            plcResponse.Message = ""
            for _, addr := range dev.Address {
                result, err := ReadHoldingRegister(conn, addr)
            for _, addr := range dev.Details {
                result, err := ReadHoldingRegister(conn, addr.StartAddress, addr.Length)
                if err != nil {
                    logger.Warn("plc device Read Holding Register error, %s", err.Error())
                    plcResponse.Message = err.Error()
                } else {
                    plcResponse.Data[addr] = result
                    plcResponse.PLCData = append(plcResponse.PLCData, msg.PLCData{
                        StartAddress: addr.StartAddress,
                        Length:       addr.Length,
                        Type:         addr.Type,
                        FieldName:    addr.FieldName,
                        Data:         result,
                    })
                }
            }
@@ -106,7 +135,7 @@
            }
            // 间隔时间
            time.Sleep(time.Duration(dev.Interval) * time.Second)
            time.Sleep(time.Duration(dev.Frequency) * time.Second)
        }
    }
}
collector/device.go
New file
@@ -0,0 +1,52 @@
package collector
import (
    "encoding/json"
    "plc-recorder/config"
    "plc-recorder/logger"
    "plc-recorder/msg"
    "plc-recorder/util"
)
func getDeviceList() ([]msg.PLCDevice, error) {
    responseBody, err := util.HttpPost(config.Options.ApsDeviceWebApi, nil)
    if err != nil {
        logger.Warn("get device list from aps error:%s", err.Error())
        return nil, err
    }
    var response msg.ApsDeviceApiResponse
    err = json.Unmarshal(responseBody, &response)
    if err != nil {
        logger.Warn("unmarshal aps response error:%s", err.Error())
        return nil, err
    }
    logger.Debug("get device list total:%d", response.Total)
    return response.Data, nil
}
// 设备信息发生改变时更新采集的任务, 删除, 修改, 或者重新启动
func HandleDeviceUpdate(message []byte) error {
    var device msg.PLCDevice
    err := json.Unmarshal(message, &device)
    if err != nil {
        logger.Error("unmarshal device update msg error:%s", err.Error())
        return err
    }
    if task, ok := mapTask.Load(device.DeviceID); ok {
        // 存在的任务, 先停止掉, 然后重新开启一个
        task.(collectorProc).cancel()
    }
    // 判断是否是重新启动的状态, 启动一个新的任务
    if device.Status == 0 {
        addTask(&device)
    }
    return nil
}
collector/plc4x.go
@@ -35,8 +35,45 @@
    return connectionResult.GetConnection(), nil
}
func ReadHoldingRegister(connection plc4go.PlcConnection, address int) ([]byte, error) {
    tagAddress := fmt.Sprintf("holding-register:%d:DINT", address)
func ReadHoldingRegister(connection plc4go.PlcConnection, address, length int) ([]byte, error) {
    if length > 1 {
        return ReadHoldingRegisterList(connection, address, length)
    }
    return ReadHoldingRegisterSingle(connection, address)
}
func ReadHoldingRegisterSingle(connection plc4go.PlcConnection, address int) ([]byte, error) {
    tagAddress := fmt.Sprintf("holding-register:%d:UINT", address)
    // 读模式
    readRequest, err := connection.ReadRequestBuilder().AddTagAddress("tag", tagAddress).Build()
    if err != nil {
        fmt.Printf("preparing read-request:%s\n", err.Error())
        return nil, err
    }
    // 执行
    readResult := <-readRequest.Execute()
    if err := readResult.GetErr(); err != nil {
        fmt.Printf("execting read-request:%s\n", err.Error())
        return nil, err
    }
    // 判断响应码是否正确
    if readResult.GetResponse().GetResponseCode("tag") != apiModel.PlcResponseCode_OK {
        fmt.Printf("error an non-ok return code: %s", readResult.GetResponse().GetResponseCode("tag").GetName())
        return nil, nil
    }
    value := readResult.GetResponse().GetValue("tag")
    return value.GetRaw(), err
}
func ReadHoldingRegisterList(connection plc4go.PlcConnection, address, length int) ([]byte, error) {
    tagAddress := fmt.Sprintf("holding-register:%d:UINT[%d]", address, length)
    // 读模式
    readRequest, err := connection.ReadRequestBuilder().AddTagAddress("tag", tagAddress).Build()
config/config.go
@@ -10,8 +10,11 @@
type Config struct {
    NsqServer    string `json:"nsq_server"`     // nsq TCP服务端地址
    PLCDataTopic string `json:"plc_data_topic"` // 订单上报的topic
    PLCSetTopic  string `json:"plc_set_topic"`  // 订单上报的topic
    PubPLCDataTopic  string `json:"plc_data_topic"`      // 发布plc数据的topic
    PLCSetTopic      string `json:"plc_set_topic"`       // 接收plc配置数据的topic
    SubDeviceTopic   string `json:"sub_device_topic"`    // 接收设备变更通知的topic
    ApsDeviceWebApi  string `json:"aps_device_webapi"`   // 获取设备列表的接口, aps 提供, http接口
    ApsPLCDataWebApi string `json:"aps_plc_data_webapi"` // 上传给aps plc数据的接口地址. aps 提供, http接口
}
const configPath = "config.json"
@@ -20,7 +23,10 @@
func DefaultConfig() {
    Options.NsqServer = "fai365.com:4150"
    Options.PLCDataTopic = "aps.factory.plc.livedata"
    Options.PubPLCDataTopic = "aps.factory.plc.livedata"
    Options.PLCSetTopic = ""
    Options.ApsDeviceWebApi = ""
    Options.ApsPLCDataWebApi = ""
}
func Load() {
main.go
@@ -30,6 +30,9 @@
    // 初始化nsq
    nsqclient.InitNsqProducer()
    // 订阅设备变更
    nsqclient.InitNsqConsumer(config.Options.SubDeviceTopic, "sensor01", collector.HandleDeviceUpdate)
    collector.InitTask()
    select {}
msg/msg.go
@@ -1,18 +1,46 @@
package msg
type PLCDevice struct {
    Id       string
    Name     string
    Ip       string
    Address  []int // 数据地址
    Interval int   // 采集的时间间隔. 秒
    DeviceID   string        `json:"deviceId"`
    DeviceName string        `json:"deviceName"`
    DeviceIP   string        `json:"deviceIp"`
    Brand      string        `json:"brand"`
    Method     string        `json:"method"`
    PortName   string        `json:"portName"`
    Frequency  int           `json:"frequency"` // 数据更新频率 0-实时更新 1-1次/秒"
    Status     int           `json:"status"`
    Details    []*PLCAddress `gorm:"-" json:"Details"`
}
type PLCResponse struct {
    Id      string
    Name    string
    Ip      string
    Online  bool
    Message string
    Data    map[int][]byte
    DeviceID   string    `json:"deviceId"`
    DeviceName string    `json:"deviceName"`
    DeviceIP   string    `json:"deviceIp"`
    Online     bool      `json:"online"`
    Message    string    `json:"message"`
    PLCData    []PLCData `json:"plcData"`
}
type PLCAddress struct {
    StartAddress int    `json:"startAddress"` // 数据起始地址
    Length       int    `json:"length"`       // 数据长度
    Type         string `json:"type"`         // 数据类型
    FieldName    string `json:"fieldName"`    // 对应系统字段
}
type PLCData struct {
    StartAddress int    `json:"startAddress"` // 数据起始地址
    Length       int    `json:"length"`       // 数据长度
    Type         string `json:"type"`         // 数据类型
    FieldName    string `json:"fieldName"`    // 对应系统字段
    Data         []byte // 从plc读取的原始数据
}
type ApsDeviceApiResponse struct {
    Code     int         `json:"code"`
    Data     []PLCDevice `json:"data"`
    Msg      string      `json:"msg"`
    Page     int         `json:"page"`
    PageSize int         `json:"pageSize"`
    Total    int         `json:"total"`
}
msg/send.go
@@ -2,6 +2,8 @@
import (
    "encoding/json"
    "plc-recorder/util"
    "plc-recorder/config"
    "plc-recorder/logger"
    "plc-recorder/nsqclient"
@@ -11,5 +13,14 @@
    logger.Debug("plc live data: %+v", response)
    b, _ := json.Marshal(response)
    nsqclient.Produce(config.Options.PLCDataTopic, b)
    // nsq 发布
    nsqclient.Produce(config.Options.PubPLCDataTopic, b)
    // aps 发布
    if config.Options.ApsPLCDataWebApi != "" {
        _, err := util.HttpPost(config.Options.ApsPLCDataWebApi, b)
        if err != nil {
            logger.Warn(err.Error())
        }
    }
}
nsqclient/client.go
@@ -45,6 +45,5 @@
        if err := c.Run(config.Options.NsqServer, 1); err != nil {
            logger.Error("运行nsq消费客户端失败, %s", err.Error())
        }
    }
}
nsqclient/httpClient.go
File was deleted
util/httpClient.go
New file
@@ -0,0 +1,25 @@
package util
import (
    "bytes"
    "io/ioutil"
    "net/http"
)
func HttpPost(uri string, param []byte) ([]byte, error) {
    request, err := http.NewRequest(http.MethodPost, uri, bytes.NewReader(param))
    if err != nil {
        return nil, err
    }
    request.Header.Set("Content-Type", "application/json;charset=UTF-8")
    response, err := http.DefaultClient.Do(request)
    if err != nil {
        return nil, err
    }
    defer response.Body.Close()
    return ioutil.ReadAll(response.Body)
}