zhangzengfei
2023-08-01 e8c97695dd6930465e66b8fac819301f03624512
完善连接过程. 添加重连
4个文件已添加
5个文件已修改
156 ■■■■ 已修改文件
.gitignore 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.idea/vcs.xml 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collector/collector.go 89 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
collector/plc4x.go 16 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config.json 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
msg/msg.go 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
msg/send.go 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/client.go 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.gitignore
New file
@@ -0,0 +1 @@
plc-recorder.exe
.idea/vcs.xml
New file
@@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="VcsDirectoryMappings">
    <mapping directory="$PROJECT_DIR$" vcs="Git" />
  </component>
</project>
collector/collector.go
@@ -7,6 +7,8 @@
    "plc-recorder/logger"
    "plc-recorder/msg"
    plc4go "github.com/apache/plc4x/plc4go/pkg/api"
)
var mapTask sync.Map
@@ -19,7 +21,7 @@
func InitTask() {
    device := msg.PLCDevice{
        Id:       "0",
        Ip:       "192.168.20.188",
        Ip:       "192.168.1.188",
        Address:  []int{17021},
        Interval: 1,
    }
@@ -32,32 +34,73 @@
    mapTask.Store(device.Id, &proc)
    go runCollectionTask(ctx, &device)
    go connectingDevice(ctx, &device)
}
func runCollectionTask(ctx context.Context, device *msg.PLCDevice) {
    // 创建modbusTCP连接, 循环查询数据并上报
func connectingDevice(ctx context.Context, dev *msg.PLCDevice) {
    plcResponse := msg.PLCResponse{
        Id:     dev.Id,
        Name:   dev.Name,
        Ip:     dev.Ip,
        Online: false,
    }
    for {
        err, plcConnection := NewModbusConnection(device.Ip)
        if err != nil {
            logger.Warn("Error connecting to PLC: %s, ip:", device.Name, device.Ip)
            time.Sleep(30 * time.Second)
            continue
        }
        for {
            select {
            case <-ctx.Done():
                logger.Warn("Plc device %s, ip: %s, end of collection.", device.Name, device.Ip)
                plcConnection.Close()
                return
            default:
                // 根据设置的地址查询数据,上报
                // 暂停
                time.Sleep(time.Duration(device.Interval) * time.Second)
        select {
        case <-ctx.Done():
            logger.Warn("plc device %s, ip: %s, end of connecting.", dev.Name, dev.Ip)
            return
        default:
            plcConnection, err := NewModbusConnection(dev.Ip)
            if err != nil {
                logger.Warn("error connecting to PLC: %s, ip: %s", dev.Name, dev.Ip)
                plcResponse.Online = false
                msg.SendDeviceLiveData(&plcResponse)
                time.Sleep(30 * time.Second)
            } else {
                // 连接成功后, 开始采集数据, 会判断连接是否有效, 断开后会采集任务会退出, 继续重新尝试连接设备
                runCollectionTask(ctx, dev, plcConnection)
            }
        }
    }
}
func runCollectionTask(ctx context.Context, dev *msg.PLCDevice, conn plc4go.PlcConnection) {
    // 创建modbusTCP连接, 循环查询数据并上报
    plcResponse := msg.PLCResponse{
        Id:     dev.Id,
        Name:   dev.Name,
        Ip:     dev.Ip,
        Online: true,
        Data:   nil,
    }
    for {
        select {
        case <-ctx.Done():
            logger.Warn("plc device %s, ip: %s, end of collection.", dev.Name, dev.Ip)
            conn.Close()
            return
        default:
            if !conn.IsConnected() {
                logger.Warn("plc device %s, ip: %s, disconnected.", dev.Name, dev.Ip)
                return
            }
            // 根据设置的地址查询数据,上报
            plcResponse.Data = make(map[int][]byte, 0)
            for _, addr := range dev.Address {
                result, err := ReadHoldingRegister(conn, addr)
                if err != nil {
                    logger.Warn("plc device Read Holding Register error, %s", err.Error())
                } else {
                    plcResponse.Data[addr] = result
                }
            }
            msg.SendDeviceLiveData(&plcResponse)
            // 间隔时间
            time.Sleep(time.Duration(dev.Interval) * time.Second)
        }
    }
}
collector/plc4x.go
@@ -9,7 +9,7 @@
    "github.com/apache/plc4x/plc4go/pkg/api/transports"
)
func NewModbusConnection(ipAddr string) (error, plc4go.PlcConnection) {
func NewModbusConnection(ipAddr string) (plc4go.PlcConnection, error) {
    // 创建驱动管理器
    driverManager := plc4go.NewPlcDriverManager()
@@ -29,37 +29,37 @@
    // 判断是否连接成功
    if err := connectionResult.GetErr(); err != nil {
        return err, nil
        return nil, err
    }
    return nil, connectionResult.GetConnection()
    return connectionResult.GetConnection(), nil
}
func ReadHoldingRegister(connection plc4go.PlcConnection, address int) (error, []byte) {
func ReadHoldingRegister(connection plc4go.PlcConnection, address int) ([]byte, error) {
    tagAddress := fmt.Sprintf("holding-register:%d:DINT", address)
    // 读模式
    readRequest, err := connection.ReadRequestBuilder().AddTagAddress("tag", tagAddress).Build()
    if err != nil {
        fmt.Printf("Error preparing read-request:%s\n", err.Error())
        return err, nil
        return nil, err
    }
    // 执行
    readResult := <-readRequest.Execute()
    if err := readResult.GetErr(); err != nil {
        fmt.Printf("Error execting read-request:%s\n", err.Error())
        return err, nil
        return nil, err
    }
    // 判断响应码是否正确
    if readResult.GetResponse().GetResponseCode("tag") != apiModel.PlcResponseCode_OK {
        fmt.Printf("error an non-ok return code: %s", readResult.GetResponse().GetResponseCode("tag").GetName())
        return err, nil
        return nil, err
    }
    value := readResult.GetResponse().GetValue("tag")
    return nil, value.GetRaw()
    return value.GetRaw(), err
}
config.json
New file
@@ -0,0 +1,5 @@
{
    "nsq_server": "fai365.com:4150",
    "plc_data_topic": "aps.factory.plc.livedata",
    "plc_set_topic": ""
}
main.go
@@ -1,12 +1,17 @@
package main
import (
    "runtime"
    "plc-recorder/collector"
    "plc-recorder/config"
    "plc-recorder/logger"
    "plc-recorder/nsqclient"
    "github.com/rs/zerolog"
)
var logFilePath = "./log/plc-recorder.log"
func main() {
    // 初始化配置
@@ -16,11 +21,16 @@
    zerolog.SetGlobalLevel(zerolog.FatalLevel)
    // 初始化日志
    logger.InitLog("./log/plc-recorder.log", "debug", 15, false)
    if runtime.GOOS == "windows" {
        logFilePath = "log/plc-recorder.log"
    }
    logger.InitLog(logFilePath, "debug", 15, false)
    logger.Info("plc-recorde start!")
    // 初始化nsq
    nsqclient.InitNsqProducer()
    collector.InitTask()
    select {}
}
msg/msg.go
@@ -7,3 +7,11 @@
    Address  []int // 数据地址
    Interval int   // 采集的时间间隔. 秒
}
type PLCResponse struct {
    Id     string
    Name   string
    Ip     string
    Online bool
    Data   map[int][]byte
}
msg/send.go
New file
@@ -0,0 +1,15 @@
package msg
import (
    "encoding/json"
    "plc-recorder/config"
    "plc-recorder/logger"
    "plc-recorder/nsqclient"
)
func SendDeviceLiveData(response *PLCResponse) {
    logger.Debug("plc live data: %+v", response)
    b, _ := json.Marshal(response)
    nsqclient.Produce(config.Options.PLCDataTopic, b)
}
nsqclient/client.go
@@ -3,8 +3,8 @@
import (
    "context"
    "kingdee-dbapi/config"
    "kingdee-dbapi/logger"
    "plc-recorder/config"
    "plc-recorder/logger"
)
var producerCli Producer