| | |
| | | package report |
| | | |
| | | import ( |
| | | "encoding/json" |
| | | "io/ioutil" |
| | | "kingdee-dbapi/logger" |
| | | "context" |
| | | "time" |
| | | |
| | | "kingdee-dbapi/cache" |
| | | "kingdee-dbapi/config" |
| | | "kingdee-dbapi/kingdee" |
| | | "kingdee-dbapi/models" |
| | | "kingdee-dbapi/nsqclient" |
| | | "kingdee-dbapi/logger" |
| | | ) |
| | | |
| | | const orderLocalStore = "order.tmp" |
| | | const inventoryLocalStore = "inventory.tmp" |
| | | var ctx context.Context |
| | | var cancel context.CancelFunc |
| | | |
| | | func SendOrder() { |
| | | var list []kingdee.SEOrder |
| | | func Start() { |
| | | ctx, cancel = context.WithCancel(context.Background()) |
| | | go queryTasks(ctx) |
| | | } |
| | | |
| | | if config.Options.Debug { |
| | | data, err := ioutil.ReadFile(orderLocalStore) |
| | | if err != nil { |
| | | logger.Error("文件读取失败, %s", err.Error()) |
| | | func RestartReport() { |
| | | cancel() |
| | | |
| | | Start() |
| | | } |
| | | |
| | | func queryTasks(c context.Context) { |
| | | logger.Debug("启动数据上报任务") |
| | | for { |
| | | select { |
| | | case <-c.Done(): |
| | | logger.Debug("停止上报") |
| | | return |
| | | } |
| | | err = json.Unmarshal(data, &list) |
| | | if err != nil { |
| | | logger.Error("文件内容解析失败, %s", err.Error()) |
| | | return |
| | | } |
| | | } else { |
| | | list = kingdee.SeOrderList() |
| | | logger.Debug("查询到%d条订单信息", len(list)) |
| | | } |
| | | |
| | | var completedOrderNo = make(map[string]struct{}) |
| | | |
| | | for i := 0; i < len(list); { |
| | | if cache.Exists(list[i].FBillNo) { |
| | | list = append(list[:i], list[i+1:]...) |
| | | } else { |
| | | completedOrderNo[list[i].FBillNo] = struct{}{} |
| | | i++ |
| | | } |
| | | } |
| | | |
| | | b, _ := json.Marshal(list) |
| | | |
| | | if !config.Options.Debug { |
| | | ioutil.WriteFile(orderLocalStore, b, 0644) |
| | | } |
| | | |
| | | // http协议上报, 已修改为TCP |
| | | //ok := nsqclient.HttpPost(config.Options.OrderTopic, b) |
| | | |
| | | if len(list) == 0 { |
| | | logger.Debug("没有新的订单需要上报") |
| | | return |
| | | } |
| | | |
| | | // TCP协议上报 |
| | | ok := nsqclient.Produce(config.Options.OrderTopic, b) |
| | | if ok { |
| | | // 写入数据库, 标记已经上报过了,避免重复上报 |
| | | for orderNo, _ := range completedOrderNo { |
| | | cursor := models.Order{ |
| | | OrderNo: orderNo, |
| | | default: |
| | | // 上报订单 |
| | | if config.Options.OrderTopic != "" { |
| | | SendOrder() |
| | | } |
| | | |
| | | cursor.Insert() |
| | | cache.WriteCache(orderNo) |
| | | } |
| | | // 上报即时库存 |
| | | if config.Options.InventoryTopic != "" { |
| | | SendInventory() |
| | | } |
| | | |
| | | logger.Debug("已上报%d个订单信息", len(list)) |
| | | } else { |
| | | logger.Warn("订单数据上报失败") |
| | | // 上报bom |
| | | if config.Options.BomTopic != "" { |
| | | SendBom(false) |
| | | } |
| | | |
| | | time.Sleep(time.Duration(config.Options.SyncInterval) * time.Second) |
| | | } |
| | | } |
| | | } |
| | | |
| | | func SendInventory() { |
| | | var list []kingdee.Inventory |
| | | func HandleBomQuery(msg []byte) error { |
| | | SendBom(true) |
| | | |
| | | if config.Options.Debug { |
| | | data, err := ioutil.ReadFile(inventoryLocalStore) |
| | | if err != nil { |
| | | logger.Error("文件读取失败, %s", err.Error()) |
| | | return |
| | | } |
| | | |
| | | err = json.Unmarshal(data, &list) |
| | | if err != nil { |
| | | logger.Error("文件内容解析失败, %s", err.Error()) |
| | | return |
| | | } |
| | | } else { |
| | | list = kingdee.ICInventory() |
| | | logger.Debug("查询到%d条库存数据", len(list)) |
| | | } |
| | | |
| | | // 每次发 100 条 |
| | | successCnt := 0 |
| | | for i := 0; i < len(list); i += 1000 { |
| | | end := i + 1000 |
| | | if end > len(list) { |
| | | end = len(list) |
| | | } |
| | | |
| | | b, _ := json.Marshal(list[i:end]) |
| | | |
| | | if !config.Options.Debug { |
| | | ioutil.WriteFile(inventoryLocalStore, b, 0644) |
| | | } |
| | | |
| | | // HTTP协议上报,已修改为TCP |
| | | //nsqclient.HttpPost(config.Options.InventoryTopic, b) |
| | | |
| | | // TCP协议上报 |
| | | ok := nsqclient.Produce(config.Options.InventoryTopic, b) |
| | | if !ok { |
| | | logger.Warn("库存数据上报失败") |
| | | } else { |
| | | successCnt += end |
| | | } |
| | | } |
| | | logger.Debug("已上报%d条库存数据", successCnt) |
| | | return nil |
| | | } |