From 88da1e13a073e8b5656387a246d827593fbd6163 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期五, 04 八月 2023 18:25:59 +0800
Subject: [PATCH] 添加设备查询,修改, 采集数据上报

---
 collector/plc4x.go     |   41 ++++++
 msg/send.go            |   13 ++
 /dev/null              |   35 -----
 config/config.go       |   14 +
 msg/msg.go             |   50 ++++++-
 util/httpClient.go     |   25 ++++
 collector/device.go    |   52 ++++++++
 main.go                |    3 
 collector/collector.go |   83 +++++++++----
 nsqclient/client.go    |    1 
 10 files changed, 236 insertions(+), 81 deletions(-)

diff --git a/collector/collector.go b/collector/collector.go
index 15c6325..6a3f3f9 100644
--- a/collector/collector.go
+++ b/collector/collector.go
@@ -19,41 +19,66 @@
 }
 
 func InitTask() {
-	device := msg.PLCDevice{
-		Id:       "0",
-		Ip:       "192.168.1.188",
-		Address:  []int{17021},
-		Interval: 1,
+	//device := msg.PLCDevice{
+	//	DeviceID:   "0",
+	//	DeviceName: "test",
+	//	DeviceIP:   "192.168.1.188",
+	//	Brand:      "sim",
+	//	Method:     "modbus",
+	//	PortName:   "",
+	//	Frequency:  1,
+	//	Details: []*msg.PLCAddress{&msg.PLCAddress{
+	//		StartAddress: 17021,
+	//		Length:       1,
+	//		Type:         "int",
+	//		FieldName:    "count",
+	//	}},
+	//}
+
+	devices, err := getDeviceList()
+	if err != nil {
+		return
 	}
 
+	for idx, dev := range devices {
+		// 鍒ゆ柇璁惧鐘舵��, 濡傛灉閰嶇疆鏈紑鍚噰闆嗘暟鎹垨鍏朵粬鏂瑰紡, 鐜板湪鍋囪瀛楁status涓�0鏃朵唬琛ㄩ噰闆�
+		if dev.Status != 0 {
+			continue
+		}
+
+		addTask(&devices[idx])
+	}
+}
+
+func addTask(device *msg.PLCDevice) {
 	ctx, cancel := context.WithCancel(context.Background())
 	proc := collectorProc{
-		device: &device,
+		device: device,
 		cancel: cancel,
 	}
 
-	mapTask.Store(device.Id, &proc)
+	mapTask.Store(device.DeviceID, &proc)
 
-	go connectingDevice(ctx, &device)
+	go connectingDevice(ctx, device)
 }
 
 func connectingDevice(ctx context.Context, dev *msg.PLCDevice) {
 	plcResponse := msg.PLCResponse{
-		Id:     dev.Id,
-		Name:   dev.Name,
-		Ip:     dev.Ip,
-		Online: false,
+		DeviceID:   dev.DeviceID,
+		DeviceName: dev.DeviceName,
+		DeviceIP:   dev.DeviceIP,
+		Online:     false,
 	}
 
 	for {
 		select {
 		case <-ctx.Done():
-			logger.Warn("plc device %s, ip: %s, end of connecting.", dev.Name, dev.Ip)
+			logger.Warn("plc device %s, ip: %s, end of connecting.", dev.DeviceName, dev.DeviceIP)
 			return
 		default:
-			plcConnection, err := NewModbusConnection(dev.Ip)
+			plcConnection, err := NewModbusConnection(dev.DeviceIP)
 			if err != nil {
-				logger.Warn("error connecting to PLC: %s, ip: %s", dev.Name, dev.Ip)
+				logger.Warn("error connecting to PLC: %s, ip: %s", dev.DeviceName, dev.DeviceIP)
 				plcResponse.Online = false
 				msg.SendDeviceLiveData(&plcResponse)
 				time.Sleep(30 * time.Second)
@@ -68,35 +93,39 @@
 func runCollectionTask(ctx context.Context, dev *msg.PLCDevice, conn plc4go.PlcConnection) {
 	// 鍒涘缓modbusTCP杩炴帴, 寰幆鏌ヨ鏁版嵁骞朵笂鎶�
 	plcResponse := msg.PLCResponse{
-		Id:     dev.Id,
-		Name:   dev.Name,
-		Ip:     dev.Ip,
-		Online: true,
-		Data:   nil,
+		DeviceID:   dev.DeviceID,
+		DeviceName: dev.DeviceName,
+		DeviceIP:   dev.DeviceIP,
+		Online:     true,
 	}
 
 	for {
 		select {
 		case <-ctx.Done():
-			logger.Warn("plc device %s, ip: %s, end of collection.", dev.Name, dev.Ip)
+			logger.Warn("plc device %s, ip: %s, end of collection.", dev.DeviceName, dev.DeviceIP)
 			conn.Close()
 			return
 		default:
 			if !conn.IsConnected() {
-				logger.Warn("plc device %s, ip: %s, disconnected.", dev.Name, dev.Ip)
+				logger.Warn("plc device %s, ip: %s, disconnected.", dev.DeviceName, dev.DeviceIP)
 				return
 			}
 
 			// 鏍规嵁璁剧疆鐨勫湴鍧�鏌ヨ鏁版嵁锛屼笂鎶�
-			plcResponse.Data = make(map[int][]byte, 0)
 			plcResponse.Message = ""
-			for _, addr := range dev.Address {
-				result, err := ReadHoldingRegister(conn, addr)
+			for _, addr := range dev.Details {
+				result, err := ReadHoldingRegister(conn, addr.StartAddress, addr.Length)
 				if err != nil {
 					logger.Warn("plc device Read Holding Register error, %s", err.Error())
 					plcResponse.Message = err.Error()
 				} else {
-					plcResponse.Data[addr] = result
+					plcResponse.PLCData = append(plcResponse.PLCData, msg.PLCData{
+						StartAddress: addr.StartAddress,
+						Length:       addr.Length,
+						Type:         addr.Type,
+						FieldName:    addr.FieldName,
+						Data:         result,
+					})
 				}
 			}
 
@@ -106,7 +135,7 @@
 			}
 
 			// 闂撮殧鏃堕棿
-			time.Sleep(time.Duration(dev.Interval) * time.Second)
+			time.Sleep(time.Duration(dev.Frequency) * time.Second)
 		}
 	}
 }
diff --git a/collector/device.go b/collector/device.go
new file mode 100644
index 0000000..9812adc
--- /dev/null
+++ b/collector/device.go
@@ -0,0 +1,52 @@
+package collector
+
+import (
+	"encoding/json"
+
+	"plc-recorder/config"
+	"plc-recorder/logger"
+	"plc-recorder/msg"
+	"plc-recorder/util"
+)
+
+func getDeviceList() ([]msg.PLCDevice, error) {
+	responseBody, err := util.HttpPost(config.Options.ApsDeviceWebApi, nil)
+	if err != nil {
+		logger.Warn("get device list from aps error:%s", err.Error())
+		return nil, err
+	}
+
+	var response msg.ApsDeviceApiResponse
+	err = json.Unmarshal(responseBody, &response)
+	if err != nil {
+		logger.Warn("unmarshal aps response error:%s", err.Error())
+		return nil, err
+	}
+
+	logger.Debug("get device list total:%d", response.Total)
+
+	return response.Data, nil
+}
+
+// 璁惧淇℃伅鍙戠敓鏀瑰彉鏃舵洿鏂伴噰闆嗙殑浠诲姟, 鍒犻櫎, 淇敼, 鎴栬�呴噸鏂板惎鍔�
+func HandleDeviceUpdate(message []byte) error {
+	var device msg.PLCDevice
+
+	err := json.Unmarshal(message, &device)
+	if err != nil {
+		logger.Error("unmarshal device update msg error:%s", err.Error())
+		return err
+	}
+
+	if task, ok := mapTask.Load(device.DeviceID); ok {
+		// 瀛樺湪鐨勪换鍔�, 鍏堝仠姝㈡帀, 鐒跺悗閲嶆柊寮�鍚竴涓�
+		task.(collectorProc).cancel()
+	}
+
+	// 鍒ゆ柇鏄惁鏄噸鏂板惎鍔ㄧ殑鐘舵��, 鍚姩涓�涓柊鐨勪换鍔�
+	if device.Status == 0 {
+		addTask(&device)
+	}
+
+	return nil
+}
diff --git a/collector/plc4x.go b/collector/plc4x.go
index 5e7966d..8d5cb1b 100644
--- a/collector/plc4x.go
+++ b/collector/plc4x.go
@@ -35,8 +35,45 @@
 	return connectionResult.GetConnection(), nil
 }
 
-func ReadHoldingRegister(connection plc4go.PlcConnection, address int) ([]byte, error) {
-	tagAddress := fmt.Sprintf("holding-register:%d:DINT", address)
+func ReadHoldingRegister(connection plc4go.PlcConnection, address, length int) ([]byte, error) {
+	if length > 1 {
+		return ReadHoldingRegisterList(connection, address, length)
+	}
+
+	return ReadHoldingRegisterSingle(connection, address)
+}
+
+func ReadHoldingRegisterSingle(connection plc4go.PlcConnection, address int) ([]byte, error) {
+	tagAddress := fmt.Sprintf("holding-register:%d:UINT", address)
+
+	// 璇绘ā寮�
+	readRequest, err := connection.ReadRequestBuilder().AddTagAddress("tag", tagAddress).Build()
+	if err != nil {
+		fmt.Printf("preparing read-request:%s\n", err.Error())
+		return nil, err
+	}
+
+	// 鎵ц
+	readResult := <-readRequest.Execute()
+	if err := readResult.GetErr(); err != nil {
+		fmt.Printf("execting read-request:%s\n", err.Error())
+		return nil, err
+	}
+
+	// 鍒ゆ柇鍝嶅簲鐮佹槸鍚︽纭�
+	if readResult.GetResponse().GetResponseCode("tag") != apiModel.PlcResponseCode_OK {
+		fmt.Printf("error an non-ok return code: %s", readResult.GetResponse().GetResponseCode("tag").GetName())
+		return nil, nil
+	}
+
+	value := readResult.GetResponse().GetValue("tag")
+
+	return value.GetRaw(), err
+
+}
+
+func ReadHoldingRegisterList(connection plc4go.PlcConnection, address, length int) ([]byte, error) {
+	tagAddress := fmt.Sprintf("holding-register:%d:UINT[%d]", address, length)
 
 	// 璇绘ā寮�
 	readRequest, err := connection.ReadRequestBuilder().AddTagAddress("tag", tagAddress).Build()
diff --git a/config/config.go b/config/config.go
index a2d0073..091e4f8 100644
--- a/config/config.go
+++ b/config/config.go
@@ -9,9 +9,12 @@
 )
 
 type Config struct {
-	NsqServer    string `json:"nsq_server"`     // nsq TCP鏈嶅姟绔湴鍧�
-	PLCDataTopic string `json:"plc_data_topic"` // 璁㈠崟涓婃姤鐨則opic
-	PLCSetTopic  string `json:"plc_set_topic"`  // 璁㈠崟涓婃姤鐨則opic
+	NsqServer        string `json:"nsq_server"`          // nsq TCP鏈嶅姟绔湴鍧�
+	PubPLCDataTopic  string `json:"plc_data_topic"`      // 鍙戝竷plc鏁版嵁鐨則opic
+	PLCSetTopic      string `json:"plc_set_topic"`       // 鎺ユ敹plc閰嶇疆鏁版嵁鐨則opic
+	SubDeviceTopic   string `json:"sub_device_topic"`    // 鎺ユ敹璁惧鍙樻洿閫氱煡鐨則opic
+	ApsDeviceWebApi  string `json:"aps_device_webapi"`   // 鑾峰彇璁惧鍒楄〃鐨勬帴鍙�, aps 鎻愪緵, http鎺ュ彛
+	ApsPLCDataWebApi string `json:"aps_plc_data_webapi"` // 涓婁紶缁檃ps plc鏁版嵁鐨勬帴鍙e湴鍧�. aps 鎻愪緵, http鎺ュ彛
 }
 
 const configPath = "config.json"
@@ -20,7 +23,10 @@
 
 func DefaultConfig() {
 	Options.NsqServer = "fai365.com:4150"
-	Options.PLCDataTopic = "aps.factory.plc.livedata"
+	Options.PubPLCDataTopic = "aps.factory.plc.livedata"
+	Options.PLCSetTopic = ""
+	Options.ApsDeviceWebApi = ""
+	Options.ApsPLCDataWebApi = ""
 }
 
 func Load() {
diff --git a/main.go b/main.go
index f77fc7a..fd296d0 100644
--- a/main.go
+++ b/main.go
@@ -30,6 +30,9 @@
 	// 鍒濆鍖杗sq
 	nsqclient.InitNsqProducer()
 
+	// 璁㈤槄璁惧鍙樻洿
+	nsqclient.InitNsqConsumer(config.Options.SubDeviceTopic, "sensor01", collector.HandleDeviceUpdate)
+
 	collector.InitTask()
 
 	select {}
diff --git a/msg/msg.go b/msg/msg.go
index f1488b9..dce1c24 100644
--- a/msg/msg.go
+++ b/msg/msg.go
@@ -1,18 +1,46 @@
 package msg
 
 type PLCDevice struct {
-	Id       string
-	Name     string
-	Ip       string
-	Address  []int // 鏁版嵁鍦板潃
-	Interval int   // 閲囬泦鐨勬椂闂撮棿闅�. 绉�
+	DeviceID   string        `json:"deviceId"`
+	DeviceName string        `json:"deviceName"`
+	DeviceIP   string        `json:"deviceIp"`
+	Brand      string        `json:"brand"`
+	Method     string        `json:"method"`
+	PortName   string        `json:"portName"`
+	Frequency  int           `json:"frequency"` // 鏁版嵁鏇存柊棰戠巼 0-瀹炴椂鏇存柊 1-1娆�/绉�"
+	Status     int           `json:"status"`
+	Details    []*PLCAddress `gorm:"-" json:"Details"`
 }
 
 type PLCResponse struct {
-	Id      string
-	Name    string
-	Ip      string
-	Online  bool
-	Message string
-	Data    map[int][]byte
+	DeviceID   string    `json:"deviceId"`
+	DeviceName string    `json:"deviceName"`
+	DeviceIP   string    `json:"deviceIp"`
+	Online     bool      `json:"online"`
+	Message    string    `json:"message"`
+	PLCData    []PLCData `json:"plcData"`
+}
+
+type PLCAddress struct {
+	StartAddress int    `json:"startAddress"` // 鏁版嵁璧峰鍦板潃
+	Length       int    `json:"length"`       // 鏁版嵁闀垮害
+	Type         string `json:"type"`         // 鏁版嵁绫诲瀷
+	FieldName    string `json:"fieldName"`    // 瀵瑰簲绯荤粺瀛楁
+}
+
+type PLCData struct {
+	StartAddress int    `json:"startAddress"` // 鏁版嵁璧峰鍦板潃
+	Length       int    `json:"length"`       // 鏁版嵁闀垮害
+	Type         string `json:"type"`         // 鏁版嵁绫诲瀷
+	FieldName    string `json:"fieldName"`    // 瀵瑰簲绯荤粺瀛楁
+	Data         []byte // 浠巔lc璇诲彇鐨勫師濮嬫暟鎹�
+}
+
+type ApsDeviceApiResponse struct {
+	Code     int         `json:"code"`
+	Data     []PLCDevice `json:"data"`
+	Msg      string      `json:"msg"`
+	Page     int         `json:"page"`
+	PageSize int         `json:"pageSize"`
+	Total    int         `json:"total"`
 }
diff --git a/msg/send.go b/msg/send.go
index e7036f1..334d112 100644
--- a/msg/send.go
+++ b/msg/send.go
@@ -2,6 +2,8 @@
 
 import (
 	"encoding/json"
+	"plc-recorder/util"
+
 	"plc-recorder/config"
 	"plc-recorder/logger"
 	"plc-recorder/nsqclient"
@@ -11,5 +13,14 @@
 	logger.Debug("plc live data: %+v", response)
 	b, _ := json.Marshal(response)
 
-	nsqclient.Produce(config.Options.PLCDataTopic, b)
+	// nsq 鍙戝竷
+	nsqclient.Produce(config.Options.PubPLCDataTopic, b)
+
+	// aps 鍙戝竷
+	if config.Options.ApsPLCDataWebApi != "" {
+		_, err := util.HttpPost(config.Options.ApsPLCDataWebApi, b)
+		if err != nil {
+			logger.Warn(err.Error())
+		}
+	}
 }
diff --git a/nsqclient/client.go b/nsqclient/client.go
index 52eb3b3..e4f6905 100644
--- a/nsqclient/client.go
+++ b/nsqclient/client.go
@@ -45,6 +45,5 @@
 		if err := c.Run(config.Options.NsqServer, 1); err != nil {
 			logger.Error("杩愯nsq娑堣垂瀹㈡埛绔け璐�, %s", err.Error())
 		}
-
 	}
 }
diff --git a/nsqclient/httpClient.go b/nsqclient/httpClient.go
deleted file mode 100644
index d400a58..0000000
--- a/nsqclient/httpClient.go
+++ /dev/null
@@ -1,35 +0,0 @@
-package nsqclient
-
-import (
-	"bytes"
-	"fmt"
-	"io/ioutil"
-	"net/http"
-)
-
-const nsqWebApi = "http://121.31.232.83:9080/api/nsq/pub"
-
-// http鎺ュ彛 http://121.31.232.83:9080/api/nsq/pub?topic=your_topic
-func HttpPost(topic string, data []byte) bool {
-	uri := nsqWebApi + "?topic=" + topic
-
-	request, err := http.NewRequest(http.MethodPost, uri, bytes.NewReader(data))
-	if err != nil {
-		return false
-	}
-
-	request.Header.Set("Content-Type", "application/json;charset=UTF-8")
-
-	response, err := http.DefaultClient.Do(request)
-	if err != nil {
-		fmt.Printf(err.Error())
-		return false
-	}
-	defer response.Body.Close()
-
-	body, _ := ioutil.ReadAll(response.Body)
-
-	fmt.Println("response:", string(body))
-
-	return true
-}
diff --git a/util/httpClient.go b/util/httpClient.go
new file mode 100644
index 0000000..5fd2739
--- /dev/null
+++ b/util/httpClient.go
@@ -0,0 +1,25 @@
+package util
+
+import (
+	"bytes"
+	"io/ioutil"
+	"net/http"
+)
+
+func HttpPost(uri string, param []byte) ([]byte, error) {
+	request, err := http.NewRequest(http.MethodPost, uri, bytes.NewReader(param))
+	if err != nil {
+		return nil, err
+	}
+
+	request.Header.Set("Content-Type", "application/json;charset=UTF-8")
+
+	response, err := http.DefaultClient.Do(request)
+	if err != nil {
+		return nil, err
+	}
+
+	defer response.Body.Close()
+
+	return ioutil.ReadAll(response.Body)
+}

--
Gitblit v1.8.0