New file |
| | |
| | | plc-recorder.exe |
New file |
| | |
| | | <?xml version="1.0" encoding="UTF-8"?> |
| | | <project version="4"> |
| | | <component name="VcsDirectoryMappings"> |
| | | <mapping directory="$PROJECT_DIR$" vcs="Git" /> |
| | | </component> |
| | | </project> |
| | |
| | | |
| | | "plc-recorder/logger" |
| | | "plc-recorder/msg" |
| | | |
| | | plc4go "github.com/apache/plc4x/plc4go/pkg/api" |
| | | ) |
| | | |
| | | var mapTask sync.Map |
| | |
| | | func InitTask() { |
| | | device := msg.PLCDevice{ |
| | | Id: "0", |
| | | Ip: "192.168.20.188", |
| | | Ip: "192.168.1.188", |
| | | Address: []int{17021}, |
| | | Interval: 1, |
| | | } |
| | |
| | | |
| | | mapTask.Store(device.Id, &proc) |
| | | |
| | | go runCollectionTask(ctx, &device) |
| | | go connectingDevice(ctx, &device) |
| | | } |
| | | |
| | | func runCollectionTask(ctx context.Context, device *msg.PLCDevice) { |
| | | // 创建modbusTCP连接, 循环查询数据并上报 |
| | | func connectingDevice(ctx context.Context, dev *msg.PLCDevice) { |
| | | plcResponse := msg.PLCResponse{ |
| | | Id: dev.Id, |
| | | Name: dev.Name, |
| | | Ip: dev.Ip, |
| | | Online: false, |
| | | } |
| | | |
| | | for { |
| | | err, plcConnection := NewModbusConnection(device.Ip) |
| | | if err != nil { |
| | | logger.Warn("Error connecting to PLC: %s, ip:", device.Name, device.Ip) |
| | | |
| | | time.Sleep(30 * time.Second) |
| | | continue |
| | | } |
| | | |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | logger.Warn("Plc device %s, ip: %s, end of collection.", device.Name, device.Ip) |
| | | plcConnection.Close() |
| | | return |
| | | default: |
| | | |
| | | // 根据设置的地址查询数据,上报 |
| | | // 暂停 |
| | | time.Sleep(time.Duration(device.Interval) * time.Second) |
| | | select { |
| | | case <-ctx.Done(): |
| | | logger.Warn("plc device %s, ip: %s, end of connecting.", dev.Name, dev.Ip) |
| | | return |
| | | default: |
| | | plcConnection, err := NewModbusConnection(dev.Ip) |
| | | if err != nil { |
| | | logger.Warn("error connecting to PLC: %s, ip: %s", dev.Name, dev.Ip) |
| | | plcResponse.Online = false |
| | | msg.SendDeviceLiveData(&plcResponse) |
| | | time.Sleep(30 * time.Second) |
| | | } else { |
| | | // 连接成功后, 开始采集数据, 会判断连接是否有效, 断开后会采集任务会退出, 继续重新尝试连接设备 |
| | | runCollectionTask(ctx, dev, plcConnection) |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | func runCollectionTask(ctx context.Context, dev *msg.PLCDevice, conn plc4go.PlcConnection) { |
| | | // 创建modbusTCP连接, 循环查询数据并上报 |
| | | plcResponse := msg.PLCResponse{ |
| | | Id: dev.Id, |
| | | Name: dev.Name, |
| | | Ip: dev.Ip, |
| | | Online: true, |
| | | Data: nil, |
| | | } |
| | | |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | logger.Warn("plc device %s, ip: %s, end of collection.", dev.Name, dev.Ip) |
| | | conn.Close() |
| | | return |
| | | default: |
| | | if !conn.IsConnected() { |
| | | logger.Warn("plc device %s, ip: %s, disconnected.", dev.Name, dev.Ip) |
| | | return |
| | | } |
| | | |
| | | // 根据设置的地址查询数据,上报 |
| | | plcResponse.Data = make(map[int][]byte, 0) |
| | | for _, addr := range dev.Address { |
| | | result, err := ReadHoldingRegister(conn, addr) |
| | | if err != nil { |
| | | logger.Warn("plc device Read Holding Register error, %s", err.Error()) |
| | | } else { |
| | | plcResponse.Data[addr] = result |
| | | } |
| | | } |
| | | |
| | | msg.SendDeviceLiveData(&plcResponse) |
| | | // 间隔时间 |
| | | time.Sleep(time.Duration(dev.Interval) * time.Second) |
| | | } |
| | | } |
| | | } |
| | |
| | | "github.com/apache/plc4x/plc4go/pkg/api/transports" |
| | | ) |
| | | |
| | | func NewModbusConnection(ipAddr string) (error, plc4go.PlcConnection) { |
| | | func NewModbusConnection(ipAddr string) (plc4go.PlcConnection, error) { |
| | | // 创建驱动管理器 |
| | | driverManager := plc4go.NewPlcDriverManager() |
| | | |
| | |
| | | |
| | | // 判断是否连接成功 |
| | | if err := connectionResult.GetErr(); err != nil { |
| | | return err, nil |
| | | return nil, err |
| | | } |
| | | |
| | | return nil, connectionResult.GetConnection() |
| | | return connectionResult.GetConnection(), nil |
| | | } |
| | | |
| | | func ReadHoldingRegister(connection plc4go.PlcConnection, address int) (error, []byte) { |
| | | func ReadHoldingRegister(connection plc4go.PlcConnection, address int) ([]byte, error) { |
| | | tagAddress := fmt.Sprintf("holding-register:%d:DINT", address) |
| | | |
| | | // 读模式 |
| | | readRequest, err := connection.ReadRequestBuilder().AddTagAddress("tag", tagAddress).Build() |
| | | if err != nil { |
| | | fmt.Printf("Error preparing read-request:%s\n", err.Error()) |
| | | return err, nil |
| | | return nil, err |
| | | } |
| | | |
| | | // 执行 |
| | | readResult := <-readRequest.Execute() |
| | | if err := readResult.GetErr(); err != nil { |
| | | fmt.Printf("Error execting read-request:%s\n", err.Error()) |
| | | return err, nil |
| | | return nil, err |
| | | } |
| | | |
| | | // 判断响应码是否正确 |
| | | if readResult.GetResponse().GetResponseCode("tag") != apiModel.PlcResponseCode_OK { |
| | | fmt.Printf("error an non-ok return code: %s", readResult.GetResponse().GetResponseCode("tag").GetName()) |
| | | return err, nil |
| | | return nil, err |
| | | } |
| | | |
| | | value := readResult.GetResponse().GetValue("tag") |
| | | |
| | | return nil, value.GetRaw() |
| | | return value.GetRaw(), err |
| | | |
| | | } |
New file |
| | |
| | | { |
| | | "nsq_server": "fai365.com:4150", |
| | | "plc_data_topic": "aps.factory.plc.livedata", |
| | | "plc_set_topic": "" |
| | | } |
| | |
| | | package main |
| | | |
| | | import ( |
| | | "runtime" |
| | | |
| | | "plc-recorder/collector" |
| | | "plc-recorder/config" |
| | | "plc-recorder/logger" |
| | | "plc-recorder/nsqclient" |
| | | |
| | | "github.com/rs/zerolog" |
| | | ) |
| | | |
| | | var logFilePath = "./log/plc-recorder.log" |
| | | |
| | | func main() { |
| | | // 初始化配置 |
| | |
| | | zerolog.SetGlobalLevel(zerolog.FatalLevel) |
| | | |
| | | // 初始化日志 |
| | | logger.InitLog("./log/plc-recorder.log", "debug", 15, false) |
| | | if runtime.GOOS == "windows" { |
| | | logFilePath = "log/plc-recorder.log" |
| | | } |
| | | logger.InitLog(logFilePath, "debug", 15, false) |
| | | logger.Info("plc-recorde start!") |
| | | |
| | | // 初始化nsq |
| | | nsqclient.InitNsqProducer() |
| | | |
| | | collector.InitTask() |
| | | |
| | | select {} |
| | | } |
| | |
| | | Address []int // 数据地址 |
| | | Interval int // 采集的时间间隔. 秒 |
| | | } |
| | | |
// PLCResponse is the status/live-data report published for one PLC device.
type PLCResponse struct {
	// Id, Name and Ip identify the device the report belongs to.
	Id   string
	Name string
	Ip   string

	// Online tells whether the device was reachable when the report was built.
	Online bool

	// Data holds the raw register bytes read in one cycle, keyed by the
	// register address.
	Data map[int][]byte
}
New file |
| | |
| | | package msg |
| | | |
| | | import ( |
| | | "encoding/json" |
| | | "plc-recorder/config" |
| | | "plc-recorder/logger" |
| | | "plc-recorder/nsqclient" |
| | | ) |
| | | |
| | | func SendDeviceLiveData(response *PLCResponse) { |
| | | logger.Debug("plc live data: %+v", response) |
| | | b, _ := json.Marshal(response) |
| | | |
| | | nsqclient.Produce(config.Options.PLCDataTopic, b) |
| | | } |
| | |
| | | import ( |
| | | "context" |
| | | |
| | | "kingdee-dbapi/config" |
| | | "kingdee-dbapi/logger" |
| | | "plc-recorder/config" |
| | | "plc-recorder/logger" |
| | | ) |
| | | |
| | | var producerCli Producer |