From 88da1e13a073e8b5656387a246d827593fbd6163 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期五, 04 八月 2023 18:25:59 +0800
Subject: [PATCH] 添加设备查询,修改, 采集数据上报
---
collector/plc4x.go | 41 ++++++
msg/send.go | 13 ++
/dev/null | 35 -----
config/config.go | 14 +
msg/msg.go | 50 ++++++-
util/httpClient.go | 25 ++++
collector/device.go | 52 ++++++++
main.go | 3
collector/collector.go | 83 +++++++++----
nsqclient/client.go | 1
10 files changed, 236 insertions(+), 81 deletions(-)
diff --git a/collector/collector.go b/collector/collector.go
index 15c6325..6a3f3f9 100644
--- a/collector/collector.go
+++ b/collector/collector.go
@@ -19,41 +19,66 @@
}
func InitTask() {
- device := msg.PLCDevice{
- Id: "0",
- Ip: "192.168.1.188",
- Address: []int{17021},
- Interval: 1,
+ //device := msg.PLCDevice{
+ // DeviceID: "0",
+ // DeviceName: "test",
+ // DeviceIP: "192.168.1.188",
+ // Brand: "sim",
+ // Method: "modbus",
+ // PortName: "",
+ // Frequency: 1,
+ // Details: []*msg.PLCAddress{&msg.PLCAddress{
+ // StartAddress: 17021,
+ // Length: 1,
+ // Type: "int",
+ // FieldName: "count",
+ // }},
+ //}
+
+ devices, err := getDeviceList()
+ if err != nil {
+ return
}
+ for idx, dev := range devices {
+ // 鍒ゆ柇璁惧鐘舵��, 濡傛灉閰嶇疆鏈紑鍚噰闆嗘暟鎹垨鍏朵粬鏂瑰紡, 鐜板湪鍋囪瀛楁status涓�0鏃朵唬琛ㄩ噰闆�
+ if dev.Status != 0 {
+ continue
+ }
+
+ addTask(&devices[idx])
+ }
+}
+
+func addTask(device *msg.PLCDevice) {
ctx, cancel := context.WithCancel(context.Background())
proc := collectorProc{
- device: &device,
+ device: device,
cancel: cancel,
}
- mapTask.Store(device.Id, &proc)
+ mapTask.Store(device.DeviceID, &proc)
- go connectingDevice(ctx, &device)
+ go connectingDevice(ctx, device)
}
func connectingDevice(ctx context.Context, dev *msg.PLCDevice) {
plcResponse := msg.PLCResponse{
- Id: dev.Id,
- Name: dev.Name,
- Ip: dev.Ip,
- Online: false,
+ DeviceID: dev.DeviceID,
+ DeviceName: dev.DeviceName,
+ DeviceIP: dev.DeviceIP,
+ Online: false,
}
for {
select {
case <-ctx.Done():
- logger.Warn("plc device %s, ip: %s, end of connecting.", dev.Name, dev.Ip)
+ logger.Warn("plc device %s, ip: %s, end of connecting.", dev.DeviceName, dev.DeviceIP)
return
default:
- plcConnection, err := NewModbusConnection(dev.Ip)
+ plcConnection, err := NewModbusConnection(dev.DeviceIP)
if err != nil {
- logger.Warn("error connecting to PLC: %s, ip: %s", dev.Name, dev.Ip)
+ logger.Warn("error connecting to PLC: %s, ip: %s", dev.DeviceName, dev.DeviceIP)
plcResponse.Online = false
msg.SendDeviceLiveData(&plcResponse)
time.Sleep(30 * time.Second)
@@ -68,35 +93,39 @@
func runCollectionTask(ctx context.Context, dev *msg.PLCDevice, conn plc4go.PlcConnection) {
// 鍒涘缓modbusTCP杩炴帴, 寰幆鏌ヨ鏁版嵁骞朵笂鎶�
plcResponse := msg.PLCResponse{
- Id: dev.Id,
- Name: dev.Name,
- Ip: dev.Ip,
- Online: true,
- Data: nil,
+ DeviceID: dev.DeviceID,
+ DeviceName: dev.DeviceName,
+ DeviceIP: dev.DeviceIP,
+ Online: true,
}
for {
select {
case <-ctx.Done():
- logger.Warn("plc device %s, ip: %s, end of collection.", dev.Name, dev.Ip)
+ logger.Warn("plc device %s, ip: %s, end of collection.", dev.DeviceName, dev.DeviceIP)
conn.Close()
return
default:
if !conn.IsConnected() {
- logger.Warn("plc device %s, ip: %s, disconnected.", dev.Name, dev.Ip)
+ logger.Warn("plc device %s, ip: %s, disconnected.", dev.DeviceName, dev.DeviceIP)
return
}
// 鏍规嵁璁剧疆鐨勫湴鍧�鏌ヨ鏁版嵁锛屼笂鎶�
- plcResponse.Data = make(map[int][]byte, 0)
plcResponse.Message = ""
- for _, addr := range dev.Address {
- result, err := ReadHoldingRegister(conn, addr)
+ for _, addr := range dev.Details {
+ result, err := ReadHoldingRegister(conn, addr.StartAddress, addr.Length)
if err != nil {
logger.Warn("plc device Read Holding Register error, %s", err.Error())
plcResponse.Message = err.Error()
} else {
- plcResponse.Data[addr] = result
+ plcResponse.PLCData = append(plcResponse.PLCData, msg.PLCData{
+ StartAddress: addr.StartAddress,
+ Length: addr.Length,
+ Type: addr.Type,
+ FieldName: addr.FieldName,
+ Data: result,
+ })
}
}
@@ -106,7 +135,7 @@
}
// 闂撮殧鏃堕棿
- time.Sleep(time.Duration(dev.Interval) * time.Second)
+ time.Sleep(time.Duration(dev.Frequency) * time.Second)
}
}
}
diff --git a/collector/device.go b/collector/device.go
new file mode 100644
index 0000000..9812adc
--- /dev/null
+++ b/collector/device.go
@@ -0,0 +1,52 @@
+package collector
+
+import (
+ "encoding/json"
+
+ "plc-recorder/config"
+ "plc-recorder/logger"
+ "plc-recorder/msg"
+ "plc-recorder/util"
+)
+
+func getDeviceList() ([]msg.PLCDevice, error) {
+ responseBody, err := util.HttpPost(config.Options.ApsDeviceWebApi, nil)
+ if err != nil {
+ logger.Warn("get device list from aps error:%s", err.Error())
+ return nil, err
+ }
+
+ var response msg.ApsDeviceApiResponse
+ err = json.Unmarshal(responseBody, &response)
+ if err != nil {
+ logger.Warn("unmarshal aps response error:%s", err.Error())
+ return nil, err
+ }
+
+ logger.Debug("get device list total:%d", response.Total)
+
+ return response.Data, nil
+}
+
+// 璁惧淇℃伅鍙戠敓鏀瑰彉鏃舵洿鏂伴噰闆嗙殑浠诲姟, 鍒犻櫎, 淇敼, 鎴栬�呴噸鏂板惎鍔�
+func HandleDeviceUpdate(message []byte) error {
+ var device msg.PLCDevice
+
+ err := json.Unmarshal(message, &device)
+ if err != nil {
+ logger.Error("unmarshal device update msg error:%s", err.Error())
+ return err
+ }
+
+ if task, ok := mapTask.Load(device.DeviceID); ok {
+ // 瀛樺湪鐨勪换鍔�, 鍏堝仠姝㈡帀, 鐒跺悗閲嶆柊寮�鍚竴涓�
+ task.(collectorProc).cancel()
+ }
+
+ // 鍒ゆ柇鏄惁鏄噸鏂板惎鍔ㄧ殑鐘舵��, 鍚姩涓�涓柊鐨勪换鍔�
+ if device.Status == 0 {
+ addTask(&device)
+ }
+
+ return nil
+}
diff --git a/collector/plc4x.go b/collector/plc4x.go
index 5e7966d..8d5cb1b 100644
--- a/collector/plc4x.go
+++ b/collector/plc4x.go
@@ -35,8 +35,45 @@
return connectionResult.GetConnection(), nil
}
-func ReadHoldingRegister(connection plc4go.PlcConnection, address int) ([]byte, error) {
- tagAddress := fmt.Sprintf("holding-register:%d:DINT", address)
+func ReadHoldingRegister(connection plc4go.PlcConnection, address, length int) ([]byte, error) {
+ if length > 1 {
+ return ReadHoldingRegisterList(connection, address, length)
+ }
+
+ return ReadHoldingRegisterSingle(connection, address)
+}
+
+func ReadHoldingRegisterSingle(connection plc4go.PlcConnection, address int) ([]byte, error) {
+ tagAddress := fmt.Sprintf("holding-register:%d:UINT", address)
+
+ // 璇绘ā寮�
+ readRequest, err := connection.ReadRequestBuilder().AddTagAddress("tag", tagAddress).Build()
+ if err != nil {
+ fmt.Printf("preparing read-request:%s\n", err.Error())
+ return nil, err
+ }
+
+ // 鎵ц
+ readResult := <-readRequest.Execute()
+ if err := readResult.GetErr(); err != nil {
+ fmt.Printf("execting read-request:%s\n", err.Error())
+ return nil, err
+ }
+
+ // 鍒ゆ柇鍝嶅簲鐮佹槸鍚︽纭�
+ if readResult.GetResponse().GetResponseCode("tag") != apiModel.PlcResponseCode_OK {
+ fmt.Printf("error an non-ok return code: %s", readResult.GetResponse().GetResponseCode("tag").GetName())
+ return nil, nil
+ }
+
+ value := readResult.GetResponse().GetValue("tag")
+
+ return value.GetRaw(), err
+
+}
+
+func ReadHoldingRegisterList(connection plc4go.PlcConnection, address, length int) ([]byte, error) {
+ tagAddress := fmt.Sprintf("holding-register:%d:UINT[%d]", address, length)
// 璇绘ā寮�
readRequest, err := connection.ReadRequestBuilder().AddTagAddress("tag", tagAddress).Build()
diff --git a/config/config.go b/config/config.go
index a2d0073..091e4f8 100644
--- a/config/config.go
+++ b/config/config.go
@@ -9,9 +9,12 @@
)
type Config struct {
- NsqServer string `json:"nsq_server"` // nsq TCP鏈嶅姟绔湴鍧�
- PLCDataTopic string `json:"plc_data_topic"` // 璁㈠崟涓婃姤鐨則opic
- PLCSetTopic string `json:"plc_set_topic"` // 璁㈠崟涓婃姤鐨則opic
+ NsqServer string `json:"nsq_server"` // nsq TCP鏈嶅姟绔湴鍧�
+ PubPLCDataTopic string `json:"plc_data_topic"` // 鍙戝竷plc鏁版嵁鐨則opic
+ PLCSetTopic string `json:"plc_set_topic"` // 鎺ユ敹plc閰嶇疆鏁版嵁鐨則opic
+ SubDeviceTopic string `json:"sub_device_topic"` // 鎺ユ敹璁惧鍙樻洿閫氱煡鐨則opic
+ ApsDeviceWebApi string `json:"aps_device_webapi"` // 鑾峰彇璁惧鍒楄〃鐨勬帴鍙�, aps 鎻愪緵, http鎺ュ彛
+ ApsPLCDataWebApi string `json:"aps_plc_data_webapi"` // 涓婁紶缁檃ps plc鏁版嵁鐨勬帴鍙e湴鍧�. aps 鎻愪緵, http鎺ュ彛
}
const configPath = "config.json"
@@ -20,7 +23,10 @@
func DefaultConfig() {
Options.NsqServer = "fai365.com:4150"
- Options.PLCDataTopic = "aps.factory.plc.livedata"
+ Options.PubPLCDataTopic = "aps.factory.plc.livedata"
+ Options.PLCSetTopic = ""
+ Options.ApsDeviceWebApi = ""
+ Options.ApsPLCDataWebApi = ""
}
func Load() {
diff --git a/main.go b/main.go
index f77fc7a..fd296d0 100644
--- a/main.go
+++ b/main.go
@@ -30,6 +30,9 @@
// 鍒濆鍖杗sq
nsqclient.InitNsqProducer()
+ // 璁㈤槄璁惧鍙樻洿
+ nsqclient.InitNsqConsumer(config.Options.SubDeviceTopic, "sensor01", collector.HandleDeviceUpdate)
+
collector.InitTask()
select {}
diff --git a/msg/msg.go b/msg/msg.go
index f1488b9..dce1c24 100644
--- a/msg/msg.go
+++ b/msg/msg.go
@@ -1,18 +1,46 @@
package msg
type PLCDevice struct {
- Id string
- Name string
- Ip string
- Address []int // 鏁版嵁鍦板潃
- Interval int // 閲囬泦鐨勬椂闂撮棿闅�. 绉�
+ DeviceID string `json:"deviceId"`
+ DeviceName string `json:"deviceName"`
+ DeviceIP string `json:"deviceIp"`
+ Brand string `json:"brand"`
+ Method string `json:"method"`
+ PortName string `json:"portName"`
+ Frequency int `json:"frequency"` // 鏁版嵁鏇存柊棰戠巼 0-瀹炴椂鏇存柊 1-1娆�/绉�"
+ Status int `json:"status"`
+ Details []*PLCAddress `gorm:"-" json:"Details"`
}
type PLCResponse struct {
- Id string
- Name string
- Ip string
- Online bool
- Message string
- Data map[int][]byte
+ DeviceID string `json:"deviceId"`
+ DeviceName string `json:"deviceName"`
+ DeviceIP string `json:"deviceIp"`
+ Online bool `json:"online"`
+ Message string `json:"message"`
+ PLCData []PLCData `json:"plcData"`
+}
+
+type PLCAddress struct {
+ StartAddress int `json:"startAddress"` // 鏁版嵁璧峰鍦板潃
+ Length int `json:"length"` // 鏁版嵁闀垮害
+ Type string `json:"type"` // 鏁版嵁绫诲瀷
+ FieldName string `json:"fieldName"` // 瀵瑰簲绯荤粺瀛楁
+}
+
+type PLCData struct {
+ StartAddress int `json:"startAddress"` // 鏁版嵁璧峰鍦板潃
+ Length int `json:"length"` // 鏁版嵁闀垮害
+ Type string `json:"type"` // 鏁版嵁绫诲瀷
+ FieldName string `json:"fieldName"` // 瀵瑰簲绯荤粺瀛楁
+ Data []byte // 浠巔lc璇诲彇鐨勫師濮嬫暟鎹�
+}
+
+type ApsDeviceApiResponse struct {
+ Code int `json:"code"`
+ Data []PLCDevice `json:"data"`
+ Msg string `json:"msg"`
+ Page int `json:"page"`
+ PageSize int `json:"pageSize"`
+ Total int `json:"total"`
}
diff --git a/msg/send.go b/msg/send.go
index e7036f1..334d112 100644
--- a/msg/send.go
+++ b/msg/send.go
@@ -2,6 +2,8 @@
import (
"encoding/json"
+ "plc-recorder/util"
+
"plc-recorder/config"
"plc-recorder/logger"
"plc-recorder/nsqclient"
@@ -11,5 +13,14 @@
logger.Debug("plc live data: %+v", response)
b, _ := json.Marshal(response)
- nsqclient.Produce(config.Options.PLCDataTopic, b)
+ // nsq 鍙戝竷
+ nsqclient.Produce(config.Options.PubPLCDataTopic, b)
+
+ // aps 鍙戝竷
+ if config.Options.ApsPLCDataWebApi != "" {
+ _, err := util.HttpPost(config.Options.ApsPLCDataWebApi, b)
+ if err != nil {
+ logger.Warn(err.Error())
+ }
+ }
}
diff --git a/nsqclient/client.go b/nsqclient/client.go
index 52eb3b3..e4f6905 100644
--- a/nsqclient/client.go
+++ b/nsqclient/client.go
@@ -45,6 +45,5 @@
if err := c.Run(config.Options.NsqServer, 1); err != nil {
logger.Error("杩愯nsq娑堣垂瀹㈡埛绔け璐�, %s", err.Error())
}
-
}
}
diff --git a/nsqclient/httpClient.go b/nsqclient/httpClient.go
deleted file mode 100644
index d400a58..0000000
--- a/nsqclient/httpClient.go
+++ /dev/null
@@ -1,35 +0,0 @@
-package nsqclient
-
-import (
- "bytes"
- "fmt"
- "io/ioutil"
- "net/http"
-)
-
-const nsqWebApi = "http://121.31.232.83:9080/api/nsq/pub"
-
-// http鎺ュ彛 http://121.31.232.83:9080/api/nsq/pub?topic=your_topic
-func HttpPost(topic string, data []byte) bool {
- uri := nsqWebApi + "?topic=" + topic
-
- request, err := http.NewRequest(http.MethodPost, uri, bytes.NewReader(data))
- if err != nil {
- return false
- }
-
- request.Header.Set("Content-Type", "application/json;charset=UTF-8")
-
- response, err := http.DefaultClient.Do(request)
- if err != nil {
- fmt.Printf(err.Error())
- return false
- }
- defer response.Body.Close()
-
- body, _ := ioutil.ReadAll(response.Body)
-
- fmt.Println("response:", string(body))
-
- return true
-}
diff --git a/util/httpClient.go b/util/httpClient.go
new file mode 100644
index 0000000..5fd2739
--- /dev/null
+++ b/util/httpClient.go
@@ -0,0 +1,25 @@
+package util
+
+import (
+ "bytes"
+ "io/ioutil"
+ "net/http"
+)
+
+func HttpPost(uri string, param []byte) ([]byte, error) {
+ request, err := http.NewRequest(http.MethodPost, uri, bytes.NewReader(param))
+ if err != nil {
+ return nil, err
+ }
+
+ request.Header.Set("Content-Type", "application/json;charset=UTF-8")
+
+ response, err := http.DefaultClient.Do(request)
+ if err != nil {
+ return nil, err
+ }
+
+ defer response.Body.Close()
+
+ return ioutil.ReadAll(response.Body)
+}
--
Gitblit v1.8.0