package report
|
|
import (
|
"encoding/json"
|
"io/ioutil"
|
"time"
|
|
"kingdee-dbapi/cache"
|
"kingdee-dbapi/config"
|
"kingdee-dbapi/kingdee"
|
"kingdee-dbapi/logger"
|
"kingdee-dbapi/models"
|
"kingdee-dbapi/nsqclient"
|
)
|
|
// 上报销售订单, 增量, 本地会存储已经上报过的. 订单不存在修改
|
func SendOrder() {
|
list := kingdee.SeOrderList()
|
logger.Debug("查询到%d条订单信息", len(list))
|
|
var completedOrderNo = make(map[string]struct{})
|
|
for i := 0; i < len(list); {
|
if cache.Exists(list[i].FBillNo) {
|
list = append(list[:i], list[i+1:]...)
|
} else {
|
completedOrderNo[list[i].FBillNo] = struct{}{}
|
i++
|
}
|
}
|
|
b, _ := json.Marshal(list)
|
|
// http协议上报, 已修改为TCP
|
//ok := nsqclient.HttpPost(config.Options.OrderTopic, b)
|
|
if len(list) == 0 {
|
logger.Debug("没有新的订单数据")
|
return
|
}
|
|
// TCP协议上报
|
ok := nsqclient.Produce(config.Options.OrderTopic, b)
|
if ok {
|
// 写入数据库, 标记已经上报过了,避免重复上报
|
for orderNo, _ := range completedOrderNo {
|
cursor := models.Order{
|
OrderNo: orderNo,
|
}
|
|
cursor.Insert()
|
cache.WriteCache(orderNo)
|
}
|
|
logger.Debug("已上报%d个订单信息", len(list))
|
} else {
|
logger.Warn("订单数据上报失败")
|
}
|
}
|
|
var invReportedCache = make(map[string]float64, 0)
|
var bomReportedCache = make(map[string]struct{}, 0)
|
var fullInvData bool
|
|
// 上报库存
|
func SendInventory() {
|
// 设置每天凌晨1点上报一次全量数据
|
hour := time.Now().Hour()
|
if hour == 1 {
|
if fullInvData == false {
|
invReportedCache = make(map[string]float64, 0)
|
// 顺便清理下bom的当天缓存
|
bomReportedCache = make(map[string]struct{})
|
fullInvData = true
|
}
|
} else {
|
fullInvData = false
|
}
|
|
list := kingdee.ICInventory()
|
|
logger.Debug("查询到%d条库存数据", len(list))
|
|
// 先过滤一遍数据, 格瑞米发现有同一个产品同批号同仓库, 有多条库存记录的情况
|
// 将类似的数据库存数累计到一起
|
var filterMap = make(map[string]float64, 0)
|
for i := 0; i < len(list); {
|
cacheKey := list[i].FNumber + list[i].FBatchNo + list[i].FStockNo
|
if qty, ok := filterMap[cacheKey]; ok {
|
filterMap[cacheKey] = list[i].FUnitQty + qty
|
list = append(list[:i], list[i+1:]...)
|
} else {
|
filterMap[cacheKey] = list[i].FUnitQty
|
i++
|
}
|
}
|
|
// 过滤数据, 判断是否已经上报过
|
for i := 0; i < len(list); {
|
cacheKey := list[i].FNumber + list[i].FBatchNo + list[i].FStockNo
|
if qty, ok := invReportedCache[cacheKey]; ok && qty == list[i].FUnitQty {
|
list = append(list[:i], list[i+1:]...)
|
} else {
|
invReportedCache[cacheKey] = list[i].FUnitQty
|
i++
|
}
|
}
|
|
if len(list) == 0 {
|
logger.Debug("没有要更新的库存数据.")
|
return
|
}
|
|
// 每次发 1000 条
|
successCnt := 0
|
for i := 0; i < len(list); i += 1000 {
|
end := i + 1000
|
if end > len(list) {
|
end = len(list)
|
}
|
|
b, _ := json.Marshal(list[i:end])
|
|
// HTTP协议上报,已修改为TCP
|
//nsqclient.HttpPost(config.Options.InventoryTopic, b)
|
|
// TCP协议上报
|
ok := nsqclient.Produce(config.Options.InventoryTopic, b)
|
if !ok {
|
logger.Warn("库存数据上报失败")
|
|
//上报失败, 缓存清空
|
invReportedCache = make(map[string]float64, 0)
|
} else {
|
successCnt = end
|
}
|
}
|
|
logger.Debug("已上报%d条库存数据", successCnt)
|
}
|
|
func SendBom(fData bool) {
|
// 上报bom
|
bomList := kingdee.BomList(fData)
|
logger.Debug("查询到%d条Bom数据", len(bomList))
|
|
b, _ := json.Marshal(bomList)
|
|
ioutil.WriteFile("bomList.tmp", b, 0644)
|
|
// 过滤数据, 判断是否已经上报过, 请求全量数据不过滤, 直接上报
|
if fData {
|
for i := 0; i < len(bomList); {
|
cacheKey := bomList[i].FBOMNumber + bomList[i].FAudDate
|
if _, ok := bomReportedCache[cacheKey]; ok {
|
bomList = append(bomList[:i], bomList[i+1:]...)
|
} else {
|
bomReportedCache[cacheKey] = struct{}{}
|
i++
|
}
|
}
|
}
|
|
if len(bomList) == 0 {
|
logger.Debug("没有要更新的Bom数据.")
|
} else {
|
// 每次发 1000 条
|
successCnt := 0
|
for i := 0; i < len(bomList); i += 1000 {
|
end := i + 1000
|
if end > len(bomList) {
|
end = len(bomList)
|
}
|
|
b, _ := json.Marshal(bomList[i:end])
|
|
// TCP协议上报
|
ok := nsqclient.Produce(config.Options.BomTopic, b)
|
if !ok {
|
logger.Warn("BOM数据上报失败")
|
|
//上报失败, 缓存清空
|
bomReportedCache = make(map[string]struct{}, 0)
|
} else {
|
successCnt = end
|
}
|
}
|
logger.Debug("已上报%d条BOM数据", successCnt)
|
}
|
|
// 上报bom子项
|
}
|