From 88da1e13a073e8b5656387a246d827593fbd6163 Mon Sep 17 00:00:00 2001 From: zhangzengfei <zhangzengfei@smartai.com> Date: 星期五, 04 八月 2023 18:25:59 +0800 Subject: [PATCH] 添加设备查询,修改, 采集数据上报 --- collector/plc4x.go | 41 ++++++ msg/send.go | 13 ++ /dev/null | 35 ----- config/config.go | 14 + msg/msg.go | 50 ++++++- util/httpClient.go | 25 ++++ collector/device.go | 52 ++++++++ main.go | 3 collector/collector.go | 83 +++++++++---- nsqclient/client.go | 1 10 files changed, 236 insertions(+), 81 deletions(-) diff --git a/collector/collector.go b/collector/collector.go index 15c6325..6a3f3f9 100644 --- a/collector/collector.go +++ b/collector/collector.go @@ -19,41 +19,66 @@ } func InitTask() { - device := msg.PLCDevice{ - Id: "0", - Ip: "192.168.1.188", - Address: []int{17021}, - Interval: 1, + //device := msg.PLCDevice{ + // DeviceID: "0", + // DeviceName: "test", + // DeviceIP: "192.168.1.188", + // Brand: "sim", + // Method: "modbus", + // PortName: "", + // Frequency: 1, + // Details: []*msg.PLCAddress{&msg.PLCAddress{ + // StartAddress: 17021, + // Length: 1, + // Type: "int", + // FieldName: "count", + // }}, + //} + + devices, err := getDeviceList() + if err != nil { + return } + for idx, dev := range devices { + // 鍒ゆ柇璁惧鐘舵��, 濡傛灉閰嶇疆鏈紑鍚噰闆嗘暟鎹垨鍏朵粬鏂瑰紡, 鐜板湪鍋囪瀛楁status涓�0鏃朵唬琛ㄩ噰闆� + if dev.Status != 0 { + continue + } + + addTask(&devices[idx]) + } +} + +func addTask(device *msg.PLCDevice) { ctx, cancel := context.WithCancel(context.Background()) proc := collectorProc{ - device: &device, + device: device, cancel: cancel, } - mapTask.Store(device.Id, &proc) + mapTask.Store(device.DeviceID, &proc) - go connectingDevice(ctx, &device) + go connectingDevice(ctx, device) } func connectingDevice(ctx context.Context, dev *msg.PLCDevice) { plcResponse := msg.PLCResponse{ - Id: dev.Id, - Name: dev.Name, - Ip: dev.Ip, - Online: false, + DeviceID: dev.DeviceID, + DeviceName: dev.DeviceName, + DeviceIP: dev.DeviceIP, + Online: false, } for { select { case <-ctx.Done(): - logger.Warn("plc device %s, ip: %s, end of connecting.", dev.Name, dev.Ip) + logger.Warn("plc device %s, ip: %s, end of connecting.", dev.DeviceName, dev.DeviceIP) return default: - plcConnection, err := NewModbusConnection(dev.Ip) + plcConnection, err := NewModbusConnection(dev.DeviceIP) if err != nil { - logger.Warn("error connecting to PLC: %s, ip: %s", dev.Name, dev.Ip) + logger.Warn("error connecting to PLC: %s, ip: %s", dev.DeviceName, dev.DeviceIP) plcResponse.Online = false msg.SendDeviceLiveData(&plcResponse) time.Sleep(30 * time.Second) @@ -68,35 +93,39 @@ func runCollectionTask(ctx context.Context, dev *msg.PLCDevice, conn plc4go.PlcConnection) { // 鍒涘缓modbusTCP杩炴帴, 寰幆鏌ヨ鏁版嵁骞朵笂鎶� plcResponse := msg.PLCResponse{ - Id: dev.Id, - Name: dev.Name, - Ip: dev.Ip, - Online: true, - Data: nil, + DeviceID: dev.DeviceID, + DeviceName: dev.DeviceName, + DeviceIP: dev.DeviceIP, + Online: true, } for { select { case <-ctx.Done(): - logger.Warn("plc device %s, ip: %s, end of collection.", dev.Name, dev.Ip) + logger.Warn("plc device %s, ip: %s, end of collection.", dev.DeviceName, dev.DeviceIP) conn.Close() return default: if !conn.IsConnected() { - logger.Warn("plc device %s, ip: %s, disconnected.", dev.Name, dev.Ip) + logger.Warn("plc device %s, ip: %s, disconnected.", dev.DeviceName, dev.DeviceIP) return } // 鏍规嵁璁剧疆鐨勫湴鍧�鏌ヨ鏁版嵁锛屼笂鎶� - plcResponse.Data = make(map[int][]byte, 0) plcResponse.Message = "" - for _, addr := range dev.Address { - result, err := ReadHoldingRegister(conn, addr) + for _, addr := range dev.Details { + result, err := ReadHoldingRegister(conn, addr.StartAddress, addr.Length) if err != nil { logger.Warn("plc device Read Holding Register error, %s", err.Error()) plcResponse.Message = err.Error() } else { - plcResponse.Data[addr] = result + plcResponse.PLCData = append(plcResponse.PLCData, msg.PLCData{ + StartAddress: addr.StartAddress, + Length: addr.Length, + Type: addr.Type, + FieldName: addr.FieldName, + Data: result, + }) } } @@ -106,7 +135,7 @@ } // 闂撮殧鏃堕棿 - time.Sleep(time.Duration(dev.Interval) * time.Second) + time.Sleep(time.Duration(dev.Frequency) * time.Second) } } } diff --git a/collector/device.go b/collector/device.go new file mode 100644 index 0000000..9812adc --- /dev/null +++ b/collector/device.go @@ -0,0 +1,52 @@ +package collector + +import ( + "encoding/json" + + "plc-recorder/config" + "plc-recorder/logger" + "plc-recorder/msg" + "plc-recorder/util" +) + +func getDeviceList() ([]msg.PLCDevice, error) { + responseBody, err := util.HttpPost(config.Options.ApsDeviceWebApi, nil) + if err != nil { + logger.Warn("get device list from aps error:%s", err.Error()) + return nil, err + } + + var response msg.ApsDeviceApiResponse + err = json.Unmarshal(responseBody, &response) + if err != nil { + logger.Warn("unmarshal aps response error:%s", err.Error()) + return nil, err + } + + logger.Debug("get device list total:%d", response.Total) + + return response.Data, nil +} + +// 璁惧淇℃伅鍙戠敓鏀瑰彉鏃舵洿鏂伴噰闆嗙殑浠诲姟, 鍒犻櫎, 淇敼, 鎴栬�呴噸鏂板惎鍔� +func HandleDeviceUpdate(message []byte) error { + var device msg.PLCDevice + + err := json.Unmarshal(message, &device) + if err != nil { + logger.Error("unmarshal device update msg error:%s", err.Error()) + return err + } + + if task, ok := mapTask.Load(device.DeviceID); ok { + // 瀛樺湪鐨勪换鍔�, 鍏堝仠姝㈡帀, 鐒跺悗閲嶆柊寮�鍚竴涓� + task.(collectorProc).cancel() + } + + // 鍒ゆ柇鏄惁鏄噸鏂板惎鍔ㄧ殑鐘舵��, 鍚姩涓�涓柊鐨勪换鍔� + if device.Status == 0 { + addTask(&device) + } + + return nil +} diff --git a/collector/plc4x.go b/collector/plc4x.go index 5e7966d..8d5cb1b 100644 --- a/collector/plc4x.go +++ b/collector/plc4x.go @@ -35,8 +35,45 @@ return connectionResult.GetConnection(), nil } -func ReadHoldingRegister(connection plc4go.PlcConnection, address int) ([]byte, error) { - tagAddress := fmt.Sprintf("holding-register:%d:DINT", address) +func ReadHoldingRegister(connection plc4go.PlcConnection, address, length int) ([]byte, error) { + if length > 1 { + return ReadHoldingRegisterList(connection, address, length) + } + + return ReadHoldingRegisterSingle(connection, address) +} + +func ReadHoldingRegisterSingle(connection plc4go.PlcConnection, address int) ([]byte, error) { + tagAddress := fmt.Sprintf("holding-register:%d:UINT", address) + + // 璇绘ā寮� + readRequest, err := connection.ReadRequestBuilder().AddTagAddress("tag", tagAddress).Build() + if err != nil { + fmt.Printf("preparing read-request:%s\n", err.Error()) + return nil, err + } + + // 鎵ц + readResult := <-readRequest.Execute() + if err := readResult.GetErr(); err != nil { + fmt.Printf("execting read-request:%s\n", err.Error()) + return nil, err + } + + // 鍒ゆ柇鍝嶅簲鐮佹槸鍚︽纭� + if readResult.GetResponse().GetResponseCode("tag") != apiModel.PlcResponseCode_OK { + fmt.Printf("error an non-ok return code: %s", readResult.GetResponse().GetResponseCode("tag").GetName()) + return nil, nil + } + + value := readResult.GetResponse().GetValue("tag") + + return value.GetRaw(), err + +} + +func ReadHoldingRegisterList(connection plc4go.PlcConnection, address, length int) ([]byte, error) { + tagAddress := fmt.Sprintf("holding-register:%d:UINT[%d]", address, length) // 璇绘ā寮� readRequest, err := connection.ReadRequestBuilder().AddTagAddress("tag", tagAddress).Build() diff --git a/config/config.go b/config/config.go index a2d0073..091e4f8 100644 --- a/config/config.go +++ b/config/config.go @@ -9,9 +9,12 @@ ) type Config struct { - NsqServer string `json:"nsq_server"` // nsq TCP鏈嶅姟绔湴鍧� - PLCDataTopic string `json:"plc_data_topic"` // 璁㈠崟涓婃姤鐨則opic - PLCSetTopic string `json:"plc_set_topic"` // 璁㈠崟涓婃姤鐨則opic + NsqServer string `json:"nsq_server"` // nsq TCP鏈嶅姟绔湴鍧� + PubPLCDataTopic string `json:"plc_data_topic"` // 鍙戝竷plc鏁版嵁鐨則opic + PLCSetTopic string `json:"plc_set_topic"` // 鎺ユ敹plc閰嶇疆鏁版嵁鐨則opic + SubDeviceTopic string `json:"sub_device_topic"` // 鎺ユ敹璁惧鍙樻洿閫氱煡鐨則opic + ApsDeviceWebApi string `json:"aps_device_webapi"` // 鑾峰彇璁惧鍒楄〃鐨勬帴鍙�, aps 鎻愪緵, http鎺ュ彛 + ApsPLCDataWebApi string `json:"aps_plc_data_webapi"` // 涓婁紶缁檃ps plc鏁版嵁鐨勬帴鍙e湴鍧�. aps 鎻愪緵, http鎺ュ彛 } const configPath = "config.json" @@ -20,7 +23,10 @@ func DefaultConfig() { Options.NsqServer = "fai365.com:4150" - Options.PLCDataTopic = "aps.factory.plc.livedata" + Options.PubPLCDataTopic = "aps.factory.plc.livedata" + Options.PLCSetTopic = "" + Options.ApsDeviceWebApi = "" + Options.ApsPLCDataWebApi = "" } func Load() { diff --git a/main.go b/main.go index f77fc7a..fd296d0 100644 --- a/main.go +++ b/main.go @@ -30,6 +30,9 @@ // 鍒濆鍖杗sq nsqclient.InitNsqProducer() + // 璁㈤槄璁惧鍙樻洿 + nsqclient.InitNsqConsumer(config.Options.SubDeviceTopic, "sensor01", collector.HandleDeviceUpdate) + collector.InitTask() select {} diff --git a/msg/msg.go b/msg/msg.go index f1488b9..dce1c24 100644 --- a/msg/msg.go +++ b/msg/msg.go @@ -1,18 +1,46 @@ package msg type PLCDevice struct { - Id string - Name string - Ip string - Address []int // 鏁版嵁鍦板潃 - Interval int // 閲囬泦鐨勬椂闂撮棿闅�. 绉� + DeviceID string `json:"deviceId"` + DeviceName string `json:"deviceName"` + DeviceIP string `json:"deviceIp"` + Brand string `json:"brand"` + Method string `json:"method"` + PortName string `json:"portName"` + Frequency int `json:"frequency"` // 鏁版嵁鏇存柊棰戠巼 0-瀹炴椂鏇存柊 1-1娆�/绉�" + Status int `json:"status"` + Details []*PLCAddress `gorm:"-" json:"Details"` } type PLCResponse struct { - Id string - Name string - Ip string - Online bool - Message string - Data map[int][]byte + DeviceID string `json:"deviceId"` + DeviceName string `json:"deviceName"` + DeviceIP string `json:"deviceIp"` + Online bool `json:"online"` + Message string `json:"message"` + PLCData []PLCData `json:"plcData"` +} + +type PLCAddress struct { + StartAddress int `json:"startAddress"` // 鏁版嵁璧峰鍦板潃 + Length int `json:"length"` // 鏁版嵁闀垮害 + Type string `json:"type"` // 鏁版嵁绫诲瀷 + FieldName string `json:"fieldName"` // 瀵瑰簲绯荤粺瀛楁 +} + +type PLCData struct { + StartAddress int `json:"startAddress"` // 鏁版嵁璧峰鍦板潃 + Length int `json:"length"` // 鏁版嵁闀垮害 + Type string `json:"type"` // 鏁版嵁绫诲瀷 + FieldName string `json:"fieldName"` // 瀵瑰簲绯荤粺瀛楁 + Data []byte // 浠巔lc璇诲彇鐨勫師濮嬫暟鎹� +} + +type ApsDeviceApiResponse struct { + Code int `json:"code"` + Data []PLCDevice `json:"data"` + Msg string `json:"msg"` + Page int `json:"page"` + PageSize int `json:"pageSize"` + Total int `json:"total"` } diff --git a/msg/send.go b/msg/send.go index e7036f1..334d112 100644 --- a/msg/send.go +++ b/msg/send.go @@ -2,6 +2,8 @@ import ( "encoding/json" + "plc-recorder/util" + "plc-recorder/config" "plc-recorder/logger" "plc-recorder/nsqclient" @@ -11,5 +13,14 @@ logger.Debug("plc live data: %+v", response) b, _ := json.Marshal(response) - nsqclient.Produce(config.Options.PLCDataTopic, b) + // nsq 鍙戝竷 + nsqclient.Produce(config.Options.PubPLCDataTopic, b) + + // aps 鍙戝竷 + if config.Options.ApsPLCDataWebApi != "" { + _, err := util.HttpPost(config.Options.ApsPLCDataWebApi, b) + if err != nil { + logger.Warn(err.Error()) + } + } } diff --git a/nsqclient/client.go b/nsqclient/client.go index 52eb3b3..e4f6905 100644 --- a/nsqclient/client.go +++ b/nsqclient/client.go @@ -45,6 +45,5 @@ if err := c.Run(config.Options.NsqServer, 1); err != nil { logger.Error("杩愯nsq娑堣垂瀹㈡埛绔け璐�, %s", err.Error()) } - } } diff --git a/nsqclient/httpClient.go b/nsqclient/httpClient.go deleted file mode 100644 index d400a58..0000000 --- a/nsqclient/httpClient.go +++ /dev/null @@ -1,35 +0,0 @@ -package nsqclient - -import ( - "bytes" - "fmt" - "io/ioutil" - "net/http" -) - -const nsqWebApi = "http://121.31.232.83:9080/api/nsq/pub" - -// http鎺ュ彛 http://121.31.232.83:9080/api/nsq/pub?topic=your_topic -func HttpPost(topic string, data []byte) bool { - uri := nsqWebApi + "?topic=" + topic - - request, err := http.NewRequest(http.MethodPost, uri, bytes.NewReader(data)) - if err != nil { - return false - } - - request.Header.Set("Content-Type", "application/json;charset=UTF-8") - - response, err := http.DefaultClient.Do(request) - if err != nil { - fmt.Printf(err.Error()) - return false - } - defer response.Body.Close() - - body, _ := ioutil.ReadAll(response.Body) - - fmt.Println("response:", string(body)) - - return true -} diff --git a/util/httpClient.go b/util/httpClient.go new file mode 100644 index 0000000..5fd2739 --- /dev/null +++ b/util/httpClient.go @@ -0,0 +1,25 @@ +package util + +import ( + "bytes" + "io/ioutil" + "net/http" +) + +func HttpPost(uri string, param []byte) ([]byte, error) { + request, err := http.NewRequest(http.MethodPost, uri, bytes.NewReader(param)) + if err != nil { + return nil, err + } + + request.Header.Set("Content-Type", "application/json;charset=UTF-8") + + response, err := http.DefaultClient.Do(request) + if err != nil { + return nil, err + } + + defer response.Body.Close() + + return ioutil.ReadAll(response.Body) +} -- Gitblit v1.8.0