package report import ( "encoding/json" "io/ioutil" "time" "kingdee-dbapi/cache" "kingdee-dbapi/config" "kingdee-dbapi/kingdee" "kingdee-dbapi/logger" "kingdee-dbapi/models" "kingdee-dbapi/nsqclient" ) const orderLocalStore = "order.tmp" const inventoryLocalStore = "inventory.tmp" func SendOrder() { var list []kingdee.SEOrder if config.Options.Debug { data, err := ioutil.ReadFile(orderLocalStore) if err != nil { logger.Error("文件读取失败, %s", err.Error()) return } err = json.Unmarshal(data, &list) if err != nil { logger.Error("文件内容解析失败, %s", err.Error()) return } } else { list = kingdee.SeOrderList() logger.Debug("查询到%d条订单信息", len(list)) } var completedOrderNo = make(map[string]struct{}) for i := 0; i < len(list); { if cache.Exists(list[i].FBillNo) { list = append(list[:i], list[i+1:]...) } else { completedOrderNo[list[i].FBillNo] = struct{}{} i++ } } b, _ := json.Marshal(list) if !config.Options.Debug { ioutil.WriteFile(orderLocalStore, b, 0644) } // http协议上报, 已修改为TCP //ok := nsqclient.HttpPost(config.Options.OrderTopic, b) if len(list) == 0 { logger.Debug("没有新的订单数据") return } // TCP协议上报 ok := nsqclient.Produce(config.Options.OrderTopic, b) if ok { // 写入数据库, 标记已经上报过了,避免重复上报 for orderNo, _ := range completedOrderNo { cursor := models.Order{ OrderNo: orderNo, } cursor.Insert() cache.WriteCache(orderNo) } logger.Debug("已上报%d个订单信息", len(list)) } else { logger.Warn("订单数据上报失败") } } var invReportedCache = make(map[string]float64, 0) var fullLoad bool func SendInventory() { var list []kingdee.Inventory // 设置每天凌晨1点上报一次全量数据 hour := time.Now().Hour() if hour == 1 { if fullLoad == false { invReportedCache = make(map[string]float64, 0) fullLoad = true } } else { fullLoad = false } if config.Options.Debug { data, err := ioutil.ReadFile(inventoryLocalStore) if err != nil { logger.Error("文件读取失败, %s", err.Error()) return } err = json.Unmarshal(data, &list) if err != nil { logger.Error("文件内容解析失败, %s", err.Error()) return } } else { list = kingdee.ICInventory() logger.Debug("查询到%d条库存数据", len(list)) // 先过滤一遍数据, 格瑞米发现有同一个产品同批号同仓库, 有多条库存记录的情况 // 将类似的数据库存数累计到一起 var filterMap = make(map[string]float64, 0) for i := 0; i < len(list); { cacheKey := list[i].FNumber + list[i].FBatchNo + list[i].FStockNo if qty, ok := filterMap[cacheKey]; ok { filterMap[cacheKey] = list[i].FUnitQty + qty list = append(list[:i], list[i+1:]...) } else { filterMap[cacheKey] = list[i].FUnitQty i++ } } // 过滤数据, 判断是否已经上报过 for i := 0; i < len(list); { cacheKey := list[i].FNumber + list[i].FBatchNo + list[i].FStockNo if qty, ok := invReportedCache[cacheKey]; ok && qty == list[i].FUnitQty { list = append(list[:i], list[i+1:]...) } else { invReportedCache[cacheKey] = list[i].FUnitQty i++ } } if len(list) == 0 { logger.Debug("没有要更新的库存数据.") return } } // 每次发 1000 条 successCnt := 0 for i := 0; i < len(list); i += 1000 { end := i + 1000 if end > len(list) { end = len(list) } b, _ := json.Marshal(list[i:end]) if !config.Options.Debug { ioutil.WriteFile(inventoryLocalStore, b, 0644) } // HTTP协议上报,已修改为TCP //nsqclient.HttpPost(config.Options.InventoryTopic, b) // TCP协议上报 ok := nsqclient.Produce(config.Options.InventoryTopic, b) if !ok { logger.Warn("库存数据上报失败") //上报失败, 缓存清空 invReportedCache = make(map[string]float64, 0) } else { successCnt = end } } logger.Debug("已上报%d条库存数据", successCnt) }