package report import ( "encoding/json" "time" "kingdee-dbapi/cache" "kingdee-dbapi/config" "kingdee-dbapi/kingdee" "kingdee-dbapi/logger" "kingdee-dbapi/models" "kingdee-dbapi/nsqclient" ) // 上报销售订单, 增量, 本地会存储已经上报过的. 订单不存在修改 func SendOrder() { list := kingdee.SeOrderList() logger.Debug("查询到%d条订单信息", len(list)) var completedOrderNo = make(map[string]struct{}) for i := 0; i < len(list); { if cache.Exists(list[i].FBillNo) { list = append(list[:i], list[i+1:]...) } else { completedOrderNo[list[i].FBillNo] = struct{}{} i++ } } b, _ := json.Marshal(list) // http协议上报, 已修改为TCP //ok := nsqclient.HttpPost(config.Options.OrderTopic, b) if len(list) == 0 { logger.Debug("没有新的订单数据") return } // TCP协议上报 ok := nsqclient.Produce(config.Options.OrderTopic, b) if ok { // 写入数据库, 标记已经上报过了,避免重复上报 for orderNo, _ := range completedOrderNo { cursor := models.Order{ OrderNo: orderNo, } cursor.Insert() cache.WriteCache(orderNo) } logger.Debug("已上报%d个订单信息", len(list)) } else { logger.Warn("订单数据上报失败") } } var invReportedCache = make(map[string]float64, 0) var bomReportedCache = make(map[string]struct{}, 0) var fullInvData bool // 上报库存 func SendInventory() { // 设置每天凌晨1点上报一次全量数据 hour := time.Now().Hour() if hour == 1 { if fullInvData == false { invReportedCache = make(map[string]float64, 0) // 顺便清理下bom的当天缓存 bomReportedCache = make(map[string]struct{}) fullInvData = true } } else { fullInvData = false } list := kingdee.ICInventory() logger.Debug("查询到%d条库存数据", len(list)) // 先过滤一遍数据, 格瑞米发现有同一个产品同批号同仓库, 有多条库存记录的情况 // 将类似的数据库存数累计到一起 var filterMap = make(map[string]float64, 0) for i := 0; i < len(list); { cacheKey := list[i].FNumber + list[i].FBatchNo + list[i].FStockNo if qty, ok := filterMap[cacheKey]; ok { filterMap[cacheKey] = list[i].FUnitQty + qty list = append(list[:i], list[i+1:]...) } else { filterMap[cacheKey] = list[i].FUnitQty i++ } } // 过滤数据, 判断是否已经上报过 for i := 0; i < len(list); { cacheKey := list[i].FNumber + list[i].FBatchNo + list[i].FStockNo if qty, ok := invReportedCache[cacheKey]; ok && qty == list[i].FUnitQty { list = append(list[:i], list[i+1:]...) } else { invReportedCache[cacheKey] = list[i].FUnitQty i++ } } if len(list) == 0 { logger.Debug("没有要更新的库存数据.") return } // 每次发 1000 条 successCnt := 0 for i := 0; i < len(list); i += 1000 { end := i + 1000 if end > len(list) { end = len(list) } b, _ := json.Marshal(list[i:end]) // HTTP协议上报,已修改为TCP //nsqclient.HttpPost(config.Options.InventoryTopic, b) // TCP协议上报 ok := nsqclient.Produce(config.Options.InventoryTopic, b) if !ok { logger.Warn("库存数据上报失败") //上报失败, 缓存清空 invReportedCache = make(map[string]float64, 0) } else { successCnt = end } } logger.Debug("已上报%d条库存数据", successCnt) } func SendBom(fData bool) { // 上报bom bomList := kingdee.BomList(fData) logger.Debug("查询到%d条Bom数据", len(bomList)) // 过滤数据, 判断是否已经上报过, 请求全量数据不过滤, 直接上报 if fData { for i := 0; i < len(bomList); { cacheKey := bomList[i].FBOMNumber + bomList[i].FAudDate if _, ok := bomReportedCache[cacheKey]; ok { bomList = append(bomList[:i], bomList[i+1:]...) } else { bomReportedCache[cacheKey] = struct{}{} i++ } } } if len(bomList) == 0 { logger.Debug("没有要更新的Bom数据.") } else { // 每次发 1000 条 successCnt := 0 for i := 0; i < len(bomList); i += 1000 { end := i + 1000 if end > len(bomList) { end = len(bomList) } b, _ := json.Marshal(bomList[i:end]) // TCP协议上报 ok := nsqclient.Produce(config.Options.BomTopic, b) if !ok { logger.Warn("BOM数据上报失败") //上报失败, 缓存清空 bomReportedCache = make(map[string]struct{}, 0) } else { successCnt = end } } logger.Debug("已上报%d条BOM数据", successCnt) } // 上报bom子项 }