From 8a5f5dc40e82bf98e307082726ebe860002e019f Mon Sep 17 00:00:00 2001 From: gigibox <gigibox@163.com> Date: 星期三, 21 六月 2023 15:45:31 +0800 Subject: [PATCH] 修改库存数据为增量上报 --- config.json | 8 ++-- fyne.syso | 0 report/loop.go | 6 ++- kingdee/query.go | 71 ++++++++++++++++++++++++----------- report/tasks.go | 30 +++++++++++++++ 5 files changed, 86 insertions(+), 29 deletions(-) diff --git a/config.json b/config.json index 4a7c8af..401eb49 100644 --- a/config.json +++ b/config.json @@ -1,11 +1,11 @@ { "web_port": "10210", - "sql_addr": "os.smartai.com", + "sql_addr": "10.6.201.7", "sql_db_name": "LZGS", - "sql_username": "sa", - "sql_password": "basic@2023", + "sql_username": "webapi", + "sql_password": "api2023", "nsq_server": "fai365.com:4150", - "nsq_webapi": "http://121.31.232.83:9080/api/nsq/pub?topic=your_topic", + "nsq_webapi": "http://121.31.232.83:9080/api/nsq/pub", "order_topic": "aps.wangpengfei.erp.seorder", "inventory_topic": "aps.wangpengfei.erp.inventory", "query_topic": "aps.wangpengfei.erp.k3resource", diff --git a/fyne.syso b/fyne.syso index c9009ac..d57dab3 100644 --- a/fyne.syso +++ b/fyne.syso Binary files differ diff --git a/kingdee/query.go b/kingdee/query.go index 65fd8d4..2da57a5 100644 --- a/kingdee/query.go +++ b/kingdee/query.go @@ -2,6 +2,7 @@ import ( "encoding/json" + "errors" "strings" "kingdee-dbapi/config" @@ -9,29 +10,57 @@ "kingdee-dbapi/nsqclient" ) -func QueryMsgHandle(data []byte) error { - var result []interface{} +type QueryMsg struct { + Key string // 璇锋眰 + Command string + Success bool + Message string + Result []byte +} - var sql = string(data) +func QueryMsgHandle(msg []byte) error { + var query QueryMsg + if err := json.Unmarshal(msg, &query); err != nil { + logger.Warn("瑙f瀽璇锋眰澶辫触, %s", err.Error()) + return err + } + + var sql = query.Command logger.Debug("鎺ユ敹鍒版煡璇㈣姹�,%s", sql) if !sqlCheck(sql) { - logger.Warn("璇嗗埆鍒板嵄闄╃殑sql璇彞, 鎷掔粷鎵ц. %s", sql) - - return nil + query.Message = "鍗遍櫓鐨剆ql璇彞, 鎷掔粷鎵ц" + logger.Warn(query.Message) + } else { + result, err := execSqlCommand(sql) + if err != nil { + query.Message = err.Error() + logger.Warn("sql鎵ц澶辫触:%s", query.Message) + } else { + query.Result = result + query.Success = true + logger.Warn("sql鎵ц瀹屾垚.") + } } - if db == nil { - logger.Debug("鏁版嵁搴撴湭杩炴帴") + replyData, _ := json.Marshal(query) + ok := nsqclient.Produce(config.Options.ReplyTopic, replyData) + logger.Warn("搴旂瓟鏌ヨ璇锋眰缁撴灉:%t, key:%s", ok, query.Key) - return nil + return nil +} + +func execSqlCommand(sql string) ([]byte, error) { + var result []interface{} + + if db == nil { + return nil, errors.New("鏁版嵁搴撴湭杩炴帴") } rows, err := db.Raw(sql).Rows() if err != nil { - result = append(result, err.Error()) - return err + return nil, err } var cols []string @@ -44,13 +73,17 @@ //寤虹珛淇╀釜interface鏁扮粍锛宑olumnPointers涓瓨鍦╟olumns鐨勫湴鍧� columns := make([]interface{}, len(cols)) columnPointers := make([]interface{}, len(cols)) - for i, _ := range columns { + for i := 0; i < len(columns); i++ { //璧嬪�煎湴鍧� columnPointers[i] = &columns[i] } //鎵弿缁撴灉 - rows.Scan(columnPointers...) + err = rows.Scan(columnPointers...) + if err != nil { + return nil, err + } + m := make(map[string]interface{}) for i, colName := range cols { val := columnPointers[i].(*interface{}) @@ -60,17 +93,9 @@ result = append(result, m) } - logger.Debug("鏁版嵁搴撹繑鍥炴暟鎹�%+v", result) - b, _ := json.Marshal(result) + rb, _ := json.Marshal(result) - ok := nsqclient.Produce(config.Options.ReplyTopic, b) - if !ok { - logger.Warn("搴旂瓟鏌ヨ璇锋眰澶辫触.") - } else { - logger.Debug("搴旂瓟鏌ヨ璇锋眰鎴愬姛. 鏁版嵁:%s", string(b)) - } - - return nil + return rb, nil } // 绠�鍗曡繃婊や笅sql璇彞,鎷掔粷澧炲垹鏀规搷浣� diff --git a/report/loop.go b/report/loop.go index b2dbff3..93c362b 100644 --- a/report/loop.go +++ b/report/loop.go @@ -2,7 +2,9 @@ import ( "context" + "kingdee-dbapi/config" "kingdee-dbapi/logger" + "time" ) var ctx context.Context @@ -37,8 +39,8 @@ //sql := []byte("select * from t_icitem where FItemID=3316") //ok := nsqclient.Produce(config.Options.QueryTopic, sql) //logger.Debug("娴嬭瘯璇锋眰鎺ュ彛, %v", ok) - // - //time.Sleep(time.Duration(config.Options.SyncInterval) * time.Second) + + time.Sleep(time.Duration(config.Options.SyncInterval) * time.Second) } } } diff --git a/report/tasks.go b/report/tasks.go index de930dc..7d23605 100644 --- a/report/tasks.go +++ b/report/tasks.go @@ -4,6 +4,7 @@ "encoding/json" "io/ioutil" "kingdee-dbapi/logger" + "time" "kingdee-dbapi/cache" "kingdee-dbapi/config" @@ -78,8 +79,22 @@ } } +var invReportedCache = make(map[string]float64, 0) +var fullLoad bool + func SendInventory() { var list []kingdee.Inventory + + // 璁剧疆姣忓ぉ鍑屾櫒1鐐逛笂鎶ヤ竴娆″叏閲忔暟鎹� + hour := time.Now().Hour() + if hour == 1 { + if fullLoad == false { + invReportedCache = make(map[string]float64, 0) + fullLoad = true + } + } else { + fullLoad = false + } if config.Options.Debug { data, err := ioutil.ReadFile(inventoryLocalStore) @@ -95,7 +110,18 @@ } } else { list = kingdee.ICInventory() + logger.Debug("鏌ヨ鍒�%d鏉″簱瀛樻暟鎹�", len(list)) + + for i := 0; i < len(list); { + cacheKey := list[i].FNumber + list[i].FBatchNo + list[i].FStockNo + if qty, ok := invReportedCache[cacheKey]; ok && qty == list[i].FUnitQty { + list = append(list[:i], list[i+1:]...) + } else { + invReportedCache[cacheKey] = list[i].FUnitQty + i++ + } + } } // 姣忔鍙� 100 鏉� @@ -119,9 +145,13 @@ ok := nsqclient.Produce(config.Options.InventoryTopic, b) if !ok { logger.Warn("搴撳瓨鏁版嵁涓婃姤澶辫触") + + //涓婃姤澶辫触, 缂撳瓨娓呯┖ + invReportedCache = make(map[string]float64, 0) } else { successCnt = end } } + logger.Debug("宸蹭笂鎶�%d鏉″簱瀛樻暟鎹�", successCnt) } -- Gitblit v1.8.0