From 8a5f5dc40e82bf98e307082726ebe860002e019f Mon Sep 17 00:00:00 2001
From: gigibox <gigibox@163.com>
Date: 星期三, 21 六月 2023 15:45:31 +0800
Subject: [PATCH] 修改库存数据为增量上报
---
config.json | 8 ++--
fyne.syso | 0
report/loop.go | 6 ++-
kingdee/query.go | 71 ++++++++++++++++++++++++-----------
report/tasks.go | 30 +++++++++++++++
5 files changed, 86 insertions(+), 29 deletions(-)
diff --git a/config.json b/config.json
index 4a7c8af..401eb49 100644
--- a/config.json
+++ b/config.json
@@ -1,11 +1,11 @@
{
"web_port": "10210",
- "sql_addr": "os.smartai.com",
+ "sql_addr": "10.6.201.7",
"sql_db_name": "LZGS",
- "sql_username": "sa",
- "sql_password": "basic@2023",
+ "sql_username": "webapi",
+ "sql_password": "api2023",
"nsq_server": "fai365.com:4150",
- "nsq_webapi": "http://121.31.232.83:9080/api/nsq/pub?topic=your_topic",
+ "nsq_webapi": "http://121.31.232.83:9080/api/nsq/pub",
"order_topic": "aps.wangpengfei.erp.seorder",
"inventory_topic": "aps.wangpengfei.erp.inventory",
"query_topic": "aps.wangpengfei.erp.k3resource",
diff --git a/fyne.syso b/fyne.syso
index c9009ac..d57dab3 100644
--- a/fyne.syso
+++ b/fyne.syso
Binary files differ
diff --git a/kingdee/query.go b/kingdee/query.go
index 65fd8d4..2da57a5 100644
--- a/kingdee/query.go
+++ b/kingdee/query.go
@@ -2,6 +2,7 @@
import (
"encoding/json"
+ "errors"
"strings"
"kingdee-dbapi/config"
@@ -9,29 +10,57 @@
"kingdee-dbapi/nsqclient"
)
-func QueryMsgHandle(data []byte) error {
- var result []interface{}
+type QueryMsg struct {
+ Key string // 璇锋眰
+ Command string
+ Success bool
+ Message string
+ Result []byte
+}
- var sql = string(data)
+func QueryMsgHandle(msg []byte) error {
+ var query QueryMsg
+ if err := json.Unmarshal(msg, &query); err != nil {
+ logger.Warn("瑙f瀽璇锋眰澶辫触, %s", err.Error())
+ return err
+ }
+
+ var sql = query.Command
logger.Debug("鎺ユ敹鍒版煡璇㈣姹�,%s", sql)
if !sqlCheck(sql) {
- logger.Warn("璇嗗埆鍒板嵄闄╃殑sql璇彞, 鎷掔粷鎵ц. %s", sql)
-
- return nil
+ query.Message = "鍗遍櫓鐨剆ql璇彞, 鎷掔粷鎵ц"
+ logger.Warn(query.Message)
+ } else {
+ result, err := execSqlCommand(sql)
+ if err != nil {
+ query.Message = err.Error()
+ logger.Warn("sql鎵ц澶辫触:%s", query.Message)
+ } else {
+ query.Result = result
+ query.Success = true
+ logger.Warn("sql鎵ц瀹屾垚.")
+ }
}
- if db == nil {
- logger.Debug("鏁版嵁搴撴湭杩炴帴")
+ replyData, _ := json.Marshal(query)
+ ok := nsqclient.Produce(config.Options.ReplyTopic, replyData)
+ logger.Warn("搴旂瓟鏌ヨ璇锋眰缁撴灉:%t, key:%s", ok, query.Key)
- return nil
+ return nil
+}
+
+func execSqlCommand(sql string) ([]byte, error) {
+ var result []interface{}
+
+ if db == nil {
+ return nil, errors.New("鏁版嵁搴撴湭杩炴帴")
}
rows, err := db.Raw(sql).Rows()
if err != nil {
- result = append(result, err.Error())
- return err
+ return nil, err
}
var cols []string
@@ -44,13 +73,17 @@
//寤虹珛淇╀釜interface鏁扮粍锛宑olumnPointers涓瓨鍦╟olumns鐨勫湴鍧�
columns := make([]interface{}, len(cols))
columnPointers := make([]interface{}, len(cols))
- for i, _ := range columns {
+ for i := 0; i < len(columns); i++ {
//璧嬪�煎湴鍧�
columnPointers[i] = &columns[i]
}
//鎵弿缁撴灉
- rows.Scan(columnPointers...)
+ err = rows.Scan(columnPointers...)
+ if err != nil {
+ return nil, err
+ }
+
m := make(map[string]interface{})
for i, colName := range cols {
val := columnPointers[i].(*interface{})
@@ -60,17 +93,9 @@
result = append(result, m)
}
- logger.Debug("鏁版嵁搴撹繑鍥炴暟鎹�%+v", result)
- b, _ := json.Marshal(result)
+ rb, _ := json.Marshal(result)
- ok := nsqclient.Produce(config.Options.ReplyTopic, b)
- if !ok {
- logger.Warn("搴旂瓟鏌ヨ璇锋眰澶辫触.")
- } else {
- logger.Debug("搴旂瓟鏌ヨ璇锋眰鎴愬姛. 鏁版嵁:%s", string(b))
- }
-
- return nil
+ return rb, nil
}
// 绠�鍗曡繃婊や笅sql璇彞,鎷掔粷澧炲垹鏀规搷浣�
diff --git a/report/loop.go b/report/loop.go
index b2dbff3..93c362b 100644
--- a/report/loop.go
+++ b/report/loop.go
@@ -2,7 +2,9 @@
import (
"context"
+ "kingdee-dbapi/config"
"kingdee-dbapi/logger"
+ "time"
)
var ctx context.Context
@@ -37,8 +39,8 @@
//sql := []byte("select * from t_icitem where FItemID=3316")
//ok := nsqclient.Produce(config.Options.QueryTopic, sql)
//logger.Debug("娴嬭瘯璇锋眰鎺ュ彛, %v", ok)
- //
- //time.Sleep(time.Duration(config.Options.SyncInterval) * time.Second)
+
+ time.Sleep(time.Duration(config.Options.SyncInterval) * time.Second)
}
}
}
diff --git a/report/tasks.go b/report/tasks.go
index de930dc..7d23605 100644
--- a/report/tasks.go
+++ b/report/tasks.go
@@ -4,6 +4,7 @@
"encoding/json"
"io/ioutil"
"kingdee-dbapi/logger"
+ "time"
"kingdee-dbapi/cache"
"kingdee-dbapi/config"
@@ -78,8 +79,22 @@
}
}
+var invReportedCache = make(map[string]float64, 0)
+var fullLoad bool
+
func SendInventory() {
var list []kingdee.Inventory
+
+ // 璁剧疆姣忓ぉ鍑屾櫒1鐐逛笂鎶ヤ竴娆″叏閲忔暟鎹�
+ hour := time.Now().Hour()
+ if hour == 1 {
+ if fullLoad == false {
+ invReportedCache = make(map[string]float64, 0)
+ fullLoad = true
+ }
+ } else {
+ fullLoad = false
+ }
if config.Options.Debug {
data, err := ioutil.ReadFile(inventoryLocalStore)
@@ -95,7 +110,18 @@
}
} else {
list = kingdee.ICInventory()
+
logger.Debug("鏌ヨ鍒�%d鏉″簱瀛樻暟鎹�", len(list))
+
+ for i := 0; i < len(list); {
+ cacheKey := list[i].FNumber + list[i].FBatchNo + list[i].FStockNo
+ if qty, ok := invReportedCache[cacheKey]; ok && qty == list[i].FUnitQty {
+ list = append(list[:i], list[i+1:]...)
+ } else {
+ invReportedCache[cacheKey] = list[i].FUnitQty
+ i++
+ }
+ }
}
// 姣忔鍙� 100 鏉�
@@ -119,9 +145,13 @@
ok := nsqclient.Produce(config.Options.InventoryTopic, b)
if !ok {
logger.Warn("搴撳瓨鏁版嵁涓婃姤澶辫触")
+
+ //涓婃姤澶辫触, 缂撳瓨娓呯┖
+ invReportedCache = make(map[string]float64, 0)
} else {
successCnt = end
}
}
+
logger.Debug("宸蹭笂鎶�%d鏉″簱瀛樻暟鎹�", successCnt)
}
--
Gitblit v1.8.0