From 1873461012c99801f364bd07fae2c218d245048e Mon Sep 17 00:00:00 2001 From: zhangzengfei <zhangzengfei@smartai.com> Date: 星期一, 11 九月 2023 19:42:39 +0800 Subject: [PATCH] 修复bom第一启动时的全量上报 --- report/task.go | 148 +++++++++++++------------------------------------ 1 files changed, 39 insertions(+), 109 deletions(-) diff --git a/report/task.go b/report/task.go index 96380e2..50c2727 100644 --- a/report/task.go +++ b/report/task.go @@ -1,127 +1,57 @@ package report import ( - "encoding/json" - "io/ioutil" - "kingdee-dbapi/logger" + "context" + "time" - "kingdee-dbapi/cache" "kingdee-dbapi/config" - "kingdee-dbapi/kingdee" - "kingdee-dbapi/models" - "kingdee-dbapi/nsqclient" + "kingdee-dbapi/logger" ) -const orderLocalStore = "order.tmp" -const inventoryLocalStore = "inventory.tmp" +var ctx context.Context +var cancel context.CancelFunc -func SendOrder() { - var list []kingdee.SEOrder +func Start() { + ctx, cancel = context.WithCancel(context.Background()) + go queryTasks(ctx) +} - if config.Options.Debug { - data, err := ioutil.ReadFile(orderLocalStore) - if err != nil { - logger.Error("鏂囦欢璇诲彇澶辫触, %s", err.Error()) +func RestartReport() { + cancel() + + Start() +} + +func queryTasks(c context.Context) { + logger.Debug("鍚姩鏁版嵁涓婃姤浠诲姟") + for { + select { + case <-c.Done(): + logger.Debug("鍋滄涓婃姤") return - } - err = json.Unmarshal(data, &list) - if err != nil { - logger.Error("鏂囦欢鍐呭瑙f瀽澶辫触, %s", err.Error()) - return - } - } else { - list = kingdee.SeOrderList() - logger.Debug("鏌ヨ鍒�%d鏉¤鍗曚俊鎭�", len(list)) - } - - var completedOrderNo = make(map[string]struct{}) - - for i := 0; i < len(list); { - if cache.Exists(list[i].FBillNo) { - list = append(list[:i], list[i+1:]...) - } else { - completedOrderNo[list[i].FBillNo] = struct{}{} - i++ - } - } - - b, _ := json.Marshal(list) - - if !config.Options.Debug { - ioutil.WriteFile(orderLocalStore, b, 0644) - } - - // http鍗忚涓婃姤, 宸蹭慨鏀逛负TCP - //ok := nsqclient.HttpPost(config.Options.OrderTopic, b) - - if len(list) == 0 { - logger.Debug("娌℃湁鏂扮殑璁㈠崟闇�瑕佷笂鎶�") - return - } - - // TCP鍗忚涓婃姤 - ok := nsqclient.Produce(config.Options.OrderTopic, b) - if ok { - // 鍐欏叆鏁版嵁搴�, 鏍囪宸茬粡涓婃姤杩囦簡,閬垮厤閲嶅涓婃姤 - for orderNo, _ := range completedOrderNo { - cursor := models.Order{ - OrderNo: orderNo, + default: + // 涓婃姤璁㈠崟 + if config.Options.OrderTopic != "" { + SendOrder() } - cursor.Insert() - cache.WriteCache(orderNo) - } + // 涓婃姤鍗虫椂搴撳瓨 + if config.Options.InventoryTopic != "" { + SendInventory() + } - logger.Debug("宸蹭笂鎶�%d涓鍗曚俊鎭�", len(list)) - } else { - logger.Warn("璁㈠崟鏁版嵁涓婃姤澶辫触") + // 涓婃姤bom + if config.Options.BomTopic != "" { + SendBom(false) + } + + time.Sleep(time.Duration(config.Options.SyncInterval) * time.Second) + } } } -func SendInventory() { - var list []kingdee.Inventory +func HandleBomQuery(msg []byte) error { + SendBom(true) - if config.Options.Debug { - data, err := ioutil.ReadFile(inventoryLocalStore) - if err != nil { - logger.Error("鏂囦欢璇诲彇澶辫触, %s", err.Error()) - return - } - - err = json.Unmarshal(data, &list) - if err != nil { - logger.Error("鏂囦欢鍐呭瑙f瀽澶辫触, %s", err.Error()) - return - } - } else { - list = kingdee.ICInventory() - logger.Debug("鏌ヨ鍒�%d鏉″簱瀛樻暟鎹�", len(list)) - } - - // 姣忔鍙� 100 鏉� - successCnt := 0 - for i := 0; i < len(list); i += 1000 { - end := i + 1000 - if end > len(list) { - end = len(list) - } - - b, _ := json.Marshal(list[i:end]) - - if !config.Options.Debug { - ioutil.WriteFile(inventoryLocalStore, b, 0644) - } - - // HTTP鍗忚涓婃姤,宸蹭慨鏀逛负TCP - //nsqclient.HttpPost(config.Options.InventoryTopic, b) - - // TCP鍗忚涓婃姤 - ok := nsqclient.Produce(config.Options.InventoryTopic, b) - if !ok { - logger.Warn("搴撳瓨鏁版嵁涓婃姤澶辫触") - } else { - successCnt += end - } - } - logger.Debug("宸蹭笂鎶�%d鏉″簱瀛樻暟鎹�", successCnt) + return nil } -- Gitblit v1.8.0