| | |
| | | } |
| | | |
| | | func InitTask() { |
| | | device := msg.PLCDevice{ |
| | | Id: "0", |
| | | Ip: "192.168.1.188", |
| | | Address: []int{17021}, |
| | | Interval: 1, |
| | | //device := msg.PLCDevice{ |
| | | // DeviceID: "0", |
| | | // DeviceName: "test", |
| | | // DeviceIP: "192.168.1.188", |
| | | // Brand: "sim", |
| | | // Method: "modbus", |
| | | // PortName: "", |
| | | // Frequency: 1, |
| | | // Details: []*msg.PLCAddress{&msg.PLCAddress{ |
| | | // StartAddress: 17021, |
| | | // Length: 1, |
| | | // Type: "int", |
| | | // FieldName: "count", |
| | | // }}, |
| | | //} |
| | | |
| | | devices, err := getDeviceList() |
| | | if err != nil { |
| | | return |
| | | } |
| | | |
| | | for idx, dev := range devices { |
| | | // 判断设备状态, 如果配置未开启采集数据或其他方式, 现在假设字段status为0时代表采集 |
| | | if dev.Status != 0 { |
| | | continue |
| | | } |
| | | |
| | | addTask(&devices[idx]) |
| | | } |
| | | } |
| | | |
| | | func addTask(device *msg.PLCDevice) { |
| | | ctx, cancel := context.WithCancel(context.Background()) |
| | | proc := collectorProc{ |
| | | device: &device, |
| | | device: device, |
| | | cancel: cancel, |
| | | } |
| | | |
| | | mapTask.Store(device.Id, &proc) |
| | | mapTask.Store(device.DeviceID, &proc) |
| | | |
| | | go connectingDevice(ctx, &device) |
| | | go connectingDevice(ctx, device) |
| | | } |
| | | |
| | | func connectingDevice(ctx context.Context, dev *msg.PLCDevice) { |
| | | plcResponse := msg.PLCResponse{ |
| | | Id: dev.Id, |
| | | Name: dev.Name, |
| | | Ip: dev.Ip, |
| | | Online: false, |
| | | DeviceID: dev.DeviceID, |
| | | DeviceName: dev.DeviceName, |
| | | DeviceIP: dev.DeviceIP, |
| | | Online: false, |
| | | } |
| | | |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | logger.Warn("plc device %s, ip: %s, end of connecting.", dev.Name, dev.Ip) |
| | | logger.Warn("plc device %s, ip: %s, end of connecting.", dev.DeviceName, dev.DeviceIP) |
| | | return |
| | | default: |
| | | plcConnection, err := NewModbusConnection(dev.Ip) |
| | | plcConnection, err := NewModbusConnection(dev.DeviceIP) |
| | | if err != nil { |
| | | logger.Warn("error connecting to PLC: %s, ip: %s", dev.Name, dev.Ip) |
| | | logger.Warn("error connecting to PLC: %s, ip: %s", dev.DeviceName, dev.DeviceIP) |
| | | plcResponse.Online = false |
| | | msg.SendDeviceLiveData(&plcResponse) |
| | | time.Sleep(30 * time.Second) |
| | |
| | | func runCollectionTask(ctx context.Context, dev *msg.PLCDevice, conn plc4go.PlcConnection) { |
| | | // 创建modbusTCP连接, 循环查询数据并上报 |
| | | plcResponse := msg.PLCResponse{ |
| | | Id: dev.Id, |
| | | Name: dev.Name, |
| | | Ip: dev.Ip, |
| | | Online: true, |
| | | Data: nil, |
| | | DeviceID: dev.DeviceID, |
| | | DeviceName: dev.DeviceName, |
| | | DeviceIP: dev.DeviceIP, |
| | | Online: true, |
| | | } |
| | | |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | logger.Warn("plc device %s, ip: %s, end of collection.", dev.Name, dev.Ip) |
| | | logger.Warn("plc device %s, ip: %s, end of collection.", dev.DeviceName, dev.DeviceIP) |
| | | conn.Close() |
| | | return |
| | | default: |
| | | if !conn.IsConnected() { |
| | | logger.Warn("plc device %s, ip: %s, disconnected.", dev.Name, dev.Ip) |
| | | logger.Warn("plc device %s, ip: %s, disconnected.", dev.DeviceName, dev.DeviceIP) |
| | | return |
| | | } |
| | | |
| | | // 根据设置的地址查询数据,上报 |
| | | plcResponse.Data = make(map[int][]byte, 0) |
| | | plcResponse.Message = "" |
| | | for _, addr := range dev.Address { |
| | | result, err := ReadHoldingRegister(conn, addr) |
| | | for _, addr := range dev.Details { |
| | | result, err := ReadHoldingRegister(conn, addr.StartAddress, addr.Length) |
| | | if err != nil { |
| | | logger.Warn("plc device Read Holding Register error, %s", err.Error()) |
| | | plcResponse.Message = err.Error() |
| | | } else { |
| | | plcResponse.Data[addr] = result |
| | | plcResponse.PLCData = append(plcResponse.PLCData, msg.PLCData{ |
| | | StartAddress: addr.StartAddress, |
| | | Length: addr.Length, |
| | | Type: addr.Type, |
| | | FieldName: addr.FieldName, |
| | | Data: result, |
| | | }) |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | // 间隔时间 |
| | | time.Sleep(time.Duration(dev.Interval) * time.Second) |
| | | time.Sleep(time.Duration(dev.Frequency) * time.Second) |
| | | } |
| | | } |
| | | } |
New file |
| | |
| | | package collector |
| | | |
| | | import ( |
| | | "encoding/json" |
| | | |
| | | "plc-recorder/config" |
| | | "plc-recorder/logger" |
| | | "plc-recorder/msg" |
| | | "plc-recorder/util" |
| | | ) |
| | | |
| | | func getDeviceList() ([]msg.PLCDevice, error) { |
| | | responseBody, err := util.HttpPost(config.Options.ApsDeviceWebApi, nil) |
| | | if err != nil { |
| | | logger.Warn("get device list from aps error:%s", err.Error()) |
| | | return nil, err |
| | | } |
| | | |
| | | var response msg.ApsDeviceApiResponse |
| | | err = json.Unmarshal(responseBody, &response) |
| | | if err != nil { |
| | | logger.Warn("unmarshal aps response error:%s", err.Error()) |
| | | return nil, err |
| | | } |
| | | |
| | | logger.Debug("get device list total:%d", response.Total) |
| | | |
| | | return response.Data, nil |
| | | } |
| | | |
| | | // 设备信息发生改变时更新采集的任务, 删除, 修改, 或者重新启动 |
| | | func HandleDeviceUpdate(message []byte) error { |
| | | var device msg.PLCDevice |
| | | |
| | | err := json.Unmarshal(message, &device) |
| | | if err != nil { |
| | | logger.Error("unmarshal device update msg error:%s", err.Error()) |
| | | return err |
| | | } |
| | | |
| | | if task, ok := mapTask.Load(device.DeviceID); ok { |
| | | // 存在的任务, 先停止掉, 然后重新开启一个 |
| | | task.(collectorProc).cancel() |
| | | } |
| | | |
| | | // 判断是否是重新启动的状态, 启动一个新的任务 |
| | | if device.Status == 0 { |
| | | addTask(&device) |
| | | } |
| | | |
| | | return nil |
| | | } |
| | |
| | | return connectionResult.GetConnection(), nil |
| | | } |
| | | |
| | | func ReadHoldingRegister(connection plc4go.PlcConnection, address int) ([]byte, error) { |
| | | tagAddress := fmt.Sprintf("holding-register:%d:DINT", address) |
| | | func ReadHoldingRegister(connection plc4go.PlcConnection, address, length int) ([]byte, error) { |
| | | if length > 1 { |
| | | return ReadHoldingRegisterList(connection, address, length) |
| | | } |
| | | |
| | | return ReadHoldingRegisterSingle(connection, address) |
| | | } |
| | | |
| | | func ReadHoldingRegisterSingle(connection plc4go.PlcConnection, address int) ([]byte, error) { |
| | | tagAddress := fmt.Sprintf("holding-register:%d:UINT", address) |
| | | |
| | | // 读模式 |
| | | readRequest, err := connection.ReadRequestBuilder().AddTagAddress("tag", tagAddress).Build() |
| | | if err != nil { |
| | | fmt.Printf("preparing read-request:%s\n", err.Error()) |
| | | return nil, err |
| | | } |
| | | |
| | | // 执行 |
| | | readResult := <-readRequest.Execute() |
| | | if err := readResult.GetErr(); err != nil { |
| | | fmt.Printf("execting read-request:%s\n", err.Error()) |
| | | return nil, err |
| | | } |
| | | |
| | | // 判断响应码是否正确 |
| | | if readResult.GetResponse().GetResponseCode("tag") != apiModel.PlcResponseCode_OK { |
| | | fmt.Printf("error an non-ok return code: %s", readResult.GetResponse().GetResponseCode("tag").GetName()) |
| | | return nil, nil |
| | | } |
| | | |
| | | value := readResult.GetResponse().GetValue("tag") |
| | | |
| | | return value.GetRaw(), err |
| | | |
| | | } |
| | | |
| | | func ReadHoldingRegisterList(connection plc4go.PlcConnection, address, length int) ([]byte, error) { |
| | | tagAddress := fmt.Sprintf("holding-register:%d:UINT[%d]", address, length) |
| | | |
| | | // 读模式 |
| | | readRequest, err := connection.ReadRequestBuilder().AddTagAddress("tag", tagAddress).Build() |
| | |
| | | ) |
| | | |
| | | type Config struct { |
| | | NsqServer string `json:"nsq_server"` // nsq TCP服务端地址 |
| | | PLCDataTopic string `json:"plc_data_topic"` // 订单上报的topic |
| | | PLCSetTopic string `json:"plc_set_topic"` // 订单上报的topic |
| | | NsqServer string `json:"nsq_server"` // nsq TCP服务端地址 |
| | | PubPLCDataTopic string `json:"plc_data_topic"` // 发布plc数据的topic |
| | | PLCSetTopic string `json:"plc_set_topic"` // 接收plc配置数据的topic |
| | | SubDeviceTopic string `json:"sub_device_topic"` // 接收设备变更通知的topic |
| | | ApsDeviceWebApi string `json:"aps_device_webapi"` // 获取设备列表的接口, aps 提供, http接口 |
| | | ApsPLCDataWebApi string `json:"aps_plc_data_webapi"` // 上传给aps plc数据的接口地址. aps 提供, http接口 |
| | | } |
| | | |
| | | const configPath = "config.json" |
| | |
| | | |
| | | func DefaultConfig() { |
| | | Options.NsqServer = "fai365.com:4150" |
| | | Options.PLCDataTopic = "aps.factory.plc.livedata" |
| | | Options.PubPLCDataTopic = "aps.factory.plc.livedata" |
| | | Options.PLCSetTopic = "" |
| | | Options.ApsDeviceWebApi = "" |
| | | Options.ApsPLCDataWebApi = "" |
| | | } |
| | | |
| | | func Load() { |
| | |
| | | // 初始化nsq |
| | | nsqclient.InitNsqProducer() |
| | | |
| | | // 订阅设备变更 |
| | | nsqclient.InitNsqConsumer(config.Options.SubDeviceTopic, "sensor01", collector.HandleDeviceUpdate) |
| | | |
| | | collector.InitTask() |
| | | |
| | | select {} |
| | |
| | | package msg |
| | | |
| | | type PLCDevice struct { |
| | | Id string |
| | | Name string |
| | | Ip string |
| | | Address []int // 数据地址 |
| | | Interval int // 采集的时间间隔. 秒 |
| | | DeviceID string `json:"deviceId"` |
| | | DeviceName string `json:"deviceName"` |
| | | DeviceIP string `json:"deviceIp"` |
| | | Brand string `json:"brand"` |
| | | Method string `json:"method"` |
| | | PortName string `json:"portName"` |
| | | Frequency int `json:"frequency"` // 数据更新频率 0-实时更新 1-1次/秒" |
| | | Status int `json:"status"` |
| | | Details []*PLCAddress `gorm:"-" json:"Details"` |
| | | } |
| | | |
| | | type PLCResponse struct { |
| | | Id string |
| | | Name string |
| | | Ip string |
| | | Online bool |
| | | Message string |
| | | Data map[int][]byte |
| | | DeviceID string `json:"deviceId"` |
| | | DeviceName string `json:"deviceName"` |
| | | DeviceIP string `json:"deviceIp"` |
| | | Online bool `json:"online"` |
| | | Message string `json:"message"` |
| | | PLCData []PLCData `json:"plcData"` |
| | | } |
| | | |
| | | type PLCAddress struct { |
| | | StartAddress int `json:"startAddress"` // 数据起始地址 |
| | | Length int `json:"length"` // 数据长度 |
| | | Type string `json:"type"` // 数据类型 |
| | | FieldName string `json:"fieldName"` // 对应系统字段 |
| | | } |
| | | |
| | | type PLCData struct { |
| | | StartAddress int `json:"startAddress"` // 数据起始地址 |
| | | Length int `json:"length"` // 数据长度 |
| | | Type string `json:"type"` // 数据类型 |
| | | FieldName string `json:"fieldName"` // 对应系统字段 |
| | | Data []byte // 从plc读取的原始数据 |
| | | } |
| | | |
| | | type ApsDeviceApiResponse struct { |
| | | Code int `json:"code"` |
| | | Data []PLCDevice `json:"data"` |
| | | Msg string `json:"msg"` |
| | | Page int `json:"page"` |
| | | PageSize int `json:"pageSize"` |
| | | Total int `json:"total"` |
| | | } |
| | |
| | | |
| | | import ( |
| | | "encoding/json" |
| | | "plc-recorder/util" |
| | | |
| | | "plc-recorder/config" |
| | | "plc-recorder/logger" |
| | | "plc-recorder/nsqclient" |
| | |
| | | logger.Debug("plc live data: %+v", response) |
| | | b, _ := json.Marshal(response) |
| | | |
| | | nsqclient.Produce(config.Options.PLCDataTopic, b) |
| | | // nsq 发布 |
| | | nsqclient.Produce(config.Options.PubPLCDataTopic, b) |
| | | |
| | | // aps 发布 |
| | | if config.Options.ApsPLCDataWebApi != "" { |
| | | _, err := util.HttpPost(config.Options.ApsPLCDataWebApi, b) |
| | | if err != nil { |
| | | logger.Warn(err.Error()) |
| | | } |
| | | } |
| | | } |
| | |
| | | if err := c.Run(config.Options.NsqServer, 1); err != nil { |
| | | logger.Error("运行nsq消费客户端失败, %s", err.Error()) |
| | | } |
| | | |
| | | } |
| | | } |
New file |
| | |
| | | package util |
| | | |
| | | import ( |
| | | "bytes" |
| | | "io/ioutil" |
| | | "net/http" |
| | | ) |
| | | |
| | | func HttpPost(uri string, param []byte) ([]byte, error) { |
| | | request, err := http.NewRequest(http.MethodPost, uri, bytes.NewReader(param)) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | |
| | | request.Header.Set("Content-Type", "application/json;charset=UTF-8") |
| | | |
| | | response, err := http.DefaultClient.Do(request) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | |
| | | defer response.Body.Close() |
| | | |
| | | return ioutil.ReadAll(response.Body) |
| | | } |