From 8a5f5dc40e82bf98e307082726ebe860002e019f Mon Sep 17 00:00:00 2001
From: gigibox <gigibox@163.com>
Date: 星期三, 21 六月 2023 15:45:31 +0800
Subject: [PATCH] 修改库存数据为增量上报

---
 config.json      |    8 ++--
 fyne.syso        |    0 
 report/loop.go   |    6 ++-
 kingdee/query.go |   71 ++++++++++++++++++++++++-----------
 report/tasks.go  |   30 +++++++++++++++
 5 files changed, 86 insertions(+), 29 deletions(-)

diff --git a/config.json b/config.json
index 4a7c8af..401eb49 100644
--- a/config.json
+++ b/config.json
@@ -1,11 +1,11 @@
 {
     "web_port": "10210",
-    "sql_addr": "os.smartai.com",
+    "sql_addr": "10.6.201.7",
     "sql_db_name": "LZGS",
-    "sql_username": "sa",
-    "sql_password": "basic@2023",
+    "sql_username": "webapi",
+    "sql_password": "api2023",
     "nsq_server": "fai365.com:4150",
-    "nsq_webapi": "http://121.31.232.83:9080/api/nsq/pub?topic=your_topic",
+    "nsq_webapi": "http://121.31.232.83:9080/api/nsq/pub",
     "order_topic": "aps.wangpengfei.erp.seorder",
     "inventory_topic": "aps.wangpengfei.erp.inventory",
     "query_topic": "aps.wangpengfei.erp.k3resource",
diff --git a/fyne.syso b/fyne.syso
index c9009ac..d57dab3 100644
--- a/fyne.syso
+++ b/fyne.syso
Binary files differ
diff --git a/kingdee/query.go b/kingdee/query.go
index 65fd8d4..2da57a5 100644
--- a/kingdee/query.go
+++ b/kingdee/query.go
@@ -2,6 +2,7 @@
 
 import (
 	"encoding/json"
+	"errors"
 	"strings"
 
 	"kingdee-dbapi/config"
@@ -9,29 +10,57 @@
 	"kingdee-dbapi/nsqclient"
 )
 
-func QueryMsgHandle(data []byte) error {
-	var result []interface{}
+type QueryMsg struct {
+	Key     string // 璇锋眰
+	Command string
+	Success bool
+	Message string
+	Result  []byte
+}
 
-	var sql = string(data)
+func QueryMsgHandle(msg []byte) error {
+	var query QueryMsg
 
+	if err := json.Unmarshal(msg, &query); err != nil {
+		logger.Warn("瑙f瀽璇锋眰澶辫触, %s", err.Error())
+		return err
+	}
+
+	var sql = query.Command
 	logger.Debug("鎺ユ敹鍒版煡璇㈣姹�,%s", sql)
 
 	if !sqlCheck(sql) {
-		logger.Warn("璇嗗埆鍒板嵄闄╃殑sql璇彞, 鎷掔粷鎵ц. %s", sql)
-
-		return nil
+		query.Message = "鍗遍櫓鐨剆ql璇彞, 鎷掔粷鎵ц"
+		logger.Warn(query.Message)
+	} else {
+		result, err := execSqlCommand(sql)
+		if err != nil {
+			query.Message = err.Error()
+			logger.Warn("sql鎵ц澶辫触:%s", query.Message)
+		} else {
+			query.Result = result
+			query.Success = true
+			logger.Warn("sql鎵ц瀹屾垚.")
+		}
 	}
 
-	if db == nil {
-		logger.Debug("鏁版嵁搴撴湭杩炴帴")
+	replyData, _ := json.Marshal(query)
+	ok := nsqclient.Produce(config.Options.ReplyTopic, replyData)
+	logger.Warn("搴旂瓟鏌ヨ璇锋眰缁撴灉:%t, key:%s", ok, query.Key)
 
-		return nil
+	return nil
+}
+
+func execSqlCommand(sql string) ([]byte, error) {
+	var result []interface{}
+
+	if db == nil {
+		return nil, errors.New("鏁版嵁搴撴湭杩炴帴")
 	}
 
 	rows, err := db.Raw(sql).Rows()
 	if err != nil {
-		result = append(result, err.Error())
-		return err
+		return nil, err
 	}
 
 	var cols []string
@@ -44,13 +73,17 @@
 		//寤虹珛淇╀釜interface鏁扮粍锛宑olumnPointers涓瓨鍦╟olumns鐨勫湴鍧�
 		columns := make([]interface{}, len(cols))
 		columnPointers := make([]interface{}, len(cols))
-		for i, _ := range columns {
+		for i := 0; i < len(columns); i++ {
 			//璧嬪�煎湴鍧�
 			columnPointers[i] = &columns[i]
 		}
 
 		//鎵弿缁撴灉
-		rows.Scan(columnPointers...)
+		err = rows.Scan(columnPointers...)
+		if err != nil {
+			return nil, err
+		}
+
 		m := make(map[string]interface{})
 		for i, colName := range cols {
 			val := columnPointers[i].(*interface{})
@@ -60,17 +93,9 @@
 		result = append(result, m)
 	}
 
-	logger.Debug("鏁版嵁搴撹繑鍥炴暟鎹�%+v", result)
-	b, _ := json.Marshal(result)
+	rb, _ := json.Marshal(result)
 
-	ok := nsqclient.Produce(config.Options.ReplyTopic, b)
-	if !ok {
-		logger.Warn("搴旂瓟鏌ヨ璇锋眰澶辫触.")
-	} else {
-		logger.Debug("搴旂瓟鏌ヨ璇锋眰鎴愬姛. 鏁版嵁:%s", string(b))
-	}
-
-	return nil
+	return rb, nil
 }
 
 // 绠�鍗曡繃婊や笅sql璇彞,鎷掔粷澧炲垹鏀规搷浣�
diff --git a/report/loop.go b/report/loop.go
index b2dbff3..93c362b 100644
--- a/report/loop.go
+++ b/report/loop.go
@@ -2,7 +2,9 @@
 
 import (
 	"context"
+	"kingdee-dbapi/config"
 	"kingdee-dbapi/logger"
+	"time"
 )
 
 var ctx context.Context
@@ -37,8 +39,8 @@
 			//sql := []byte("select * from t_icitem where FItemID=3316")
 			//ok := nsqclient.Produce(config.Options.QueryTopic, sql)
 			//logger.Debug("娴嬭瘯璇锋眰鎺ュ彛, %v", ok)
-			//
-			//time.Sleep(time.Duration(config.Options.SyncInterval) * time.Second)
+
+			time.Sleep(time.Duration(config.Options.SyncInterval) * time.Second)
 		}
 	}
 }
diff --git a/report/tasks.go b/report/tasks.go
index de930dc..7d23605 100644
--- a/report/tasks.go
+++ b/report/tasks.go
@@ -4,6 +4,7 @@
 	"encoding/json"
 	"io/ioutil"
 	"kingdee-dbapi/logger"
+	"time"
 
 	"kingdee-dbapi/cache"
 	"kingdee-dbapi/config"
@@ -78,8 +79,22 @@
 	}
 }
 
+var invReportedCache = make(map[string]float64, 0)
+var fullLoad bool
+
 func SendInventory() {
 	var list []kingdee.Inventory
+
+	// 璁剧疆姣忓ぉ鍑屾櫒1鐐逛笂鎶ヤ竴娆″叏閲忔暟鎹�
+	hour := time.Now().Hour()
+	if hour == 1 {
+		if fullLoad == false {
+			invReportedCache = make(map[string]float64, 0)
+			fullLoad = true
+		}
+	} else {
+		fullLoad = false
+	}
 
 	if config.Options.Debug {
 		data, err := ioutil.ReadFile(inventoryLocalStore)
@@ -95,7 +110,18 @@
 		}
 	} else {
 		list = kingdee.ICInventory()
+
 		logger.Debug("鏌ヨ鍒�%d鏉″簱瀛樻暟鎹�", len(list))
+
+		for i := 0; i < len(list); {
+			cacheKey := list[i].FNumber + list[i].FBatchNo + list[i].FStockNo
+			if qty, ok := invReportedCache[cacheKey]; ok && qty == list[i].FUnitQty {
+				list = append(list[:i], list[i+1:]...)
+			} else {
+				invReportedCache[cacheKey] = list[i].FUnitQty
+				i++
+			}
+		}
 	}
 
 	// 姣忔鍙� 100 鏉�
@@ -119,9 +145,13 @@
 		ok := nsqclient.Produce(config.Options.InventoryTopic, b)
 		if !ok {
 			logger.Warn("搴撳瓨鏁版嵁涓婃姤澶辫触")
+
+			//涓婃姤澶辫触, 缂撳瓨娓呯┖
+			invReportedCache = make(map[string]float64, 0)
 		} else {
 			successCnt = end
 		}
 	}
+
 	logger.Debug("宸蹭笂鎶�%d鏉″簱瀛樻暟鎹�", successCnt)
 }

--
Gitblit v1.8.0