zhangzengfei
2023-08-15 60a0e6ef7d9b6237bf414ef0aee9a39d4f1892e9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
package report
 
import (
    "encoding/json"
    "time"
 
    "kingdee-dbapi/cache"
    "kingdee-dbapi/config"
    "kingdee-dbapi/kingdee"
    "kingdee-dbapi/logger"
    "kingdee-dbapi/models"
    "kingdee-dbapi/nsqclient"
)
 
// 上报销售订单, 增量, 本地会存储已经上报过的. 订单不存在修改
func SendOrder() {
    list := kingdee.SeOrderList()
    logger.Debug("查询到%d条订单信息", len(list))
 
    var completedOrderNo = make(map[string]struct{})
 
    for i := 0; i < len(list); {
        if cache.Exists(list[i].FBillNo) {
            list = append(list[:i], list[i+1:]...)
        } else {
            completedOrderNo[list[i].FBillNo] = struct{}{}
            i++
        }
    }
 
    b, _ := json.Marshal(list)
 
    // http协议上报, 已修改为TCP
    //ok := nsqclient.HttpPost(config.Options.OrderTopic, b)
 
    if len(list) == 0 {
        logger.Debug("没有新的订单数据")
        return
    }
 
    // TCP协议上报
    ok := nsqclient.Produce(config.Options.OrderTopic, b)
    if ok {
        // 写入数据库, 标记已经上报过了,避免重复上报
        for orderNo, _ := range completedOrderNo {
            cursor := models.Order{
                OrderNo: orderNo,
            }
 
            cursor.Insert()
            cache.WriteCache(orderNo)
        }
 
        logger.Debug("已上报%d个订单信息", len(list))
    } else {
        logger.Warn("订单数据上报失败")
    }
}
 
var invReportedCache = make(map[string]float64, 0)
var bomReportedCache = make(map[string]struct{}, 0)
var fullInvData bool
 
// 上报库存
func SendInventory() {
    // 设置每天凌晨1点上报一次全量数据
    hour := time.Now().Hour()
    if hour == 1 {
        if fullInvData == false {
            invReportedCache = make(map[string]float64, 0)
            // 顺便清理下bom的当天缓存
            bomReportedCache = make(map[string]struct{})
            fullInvData = true
        }
    } else {
        fullInvData = false
    }
 
    list := kingdee.ICInventory()
 
    logger.Debug("查询到%d条库存数据", len(list))
 
    // 先过滤一遍数据, 格瑞米发现有同一个产品同批号同仓库, 有多条库存记录的情况
    // 将类似的数据库存数累计到一起
    var filterMap = make(map[string]float64, 0)
    for i := 0; i < len(list); {
        cacheKey := list[i].FNumber + list[i].FBatchNo + list[i].FStockNo
        if qty, ok := filterMap[cacheKey]; ok {
            filterMap[cacheKey] = list[i].FUnitQty + qty
            list = append(list[:i], list[i+1:]...)
        } else {
            filterMap[cacheKey] = list[i].FUnitQty
            i++
        }
    }
 
    // 过滤数据, 判断是否已经上报过
    for i := 0; i < len(list); {
        cacheKey := list[i].FNumber + list[i].FBatchNo + list[i].FStockNo
        if qty, ok := invReportedCache[cacheKey]; ok && qty == list[i].FUnitQty {
            list = append(list[:i], list[i+1:]...)
        } else {
            invReportedCache[cacheKey] = list[i].FUnitQty
            i++
        }
    }
 
    if len(list) == 0 {
        logger.Debug("没有要更新的库存数据.")
        return
    }
 
    // 每次发 1000 条
    successCnt := 0
    for i := 0; i < len(list); i += 1000 {
        end := i + 1000
        if end > len(list) {
            end = len(list)
        }
 
        b, _ := json.Marshal(list[i:end])
 
        // HTTP协议上报,已修改为TCP
        //nsqclient.HttpPost(config.Options.InventoryTopic, b)
 
        // TCP协议上报
        ok := nsqclient.Produce(config.Options.InventoryTopic, b)
        if !ok {
            logger.Warn("库存数据上报失败")
 
            //上报失败, 缓存清空
            invReportedCache = make(map[string]float64, 0)
        } else {
            successCnt = end
        }
    }
 
    logger.Debug("已上报%d条库存数据", successCnt)
}
 
func SendBom(fData bool) {
    // 上报bom
    bomList := kingdee.BomList(fData)
    logger.Debug("查询到%d条Bom数据", len(bomList))
 
    // 过滤数据, 判断是否已经上报过, 请求全量数据不过滤, 直接上报
    if fData {
        for i := 0; i < len(bomList); {
            cacheKey := bomList[i].FBOMNumber + bomList[i].FAudDate
            if _, ok := bomReportedCache[cacheKey]; ok {
                bomList = append(bomList[:i], bomList[i+1:]...)
            } else {
                bomReportedCache[cacheKey] = struct{}{}
                i++
            }
        }
    }
 
    if len(bomList) == 0 {
        logger.Debug("没有要更新的Bom数据.")
    } else {
        // 每次发 1000 条
        successCnt := 0
        for i := 0; i < len(bomList); i += 1000 {
            end := i + 1000
            if end > len(bomList) {
                end = len(bomList)
            }
 
            b, _ := json.Marshal(bomList[i:end])
 
            // TCP协议上报
            ok := nsqclient.Produce(config.Options.BomTopic, b)
            if !ok {
                logger.Warn("BOM数据上报失败")
 
                //上报失败, 缓存清空
                bomReportedCache = make(map[string]struct{}, 0)
            } else {
                successCnt = end
            }
        }
        logger.Debug("已上报%d条BOM数据", successCnt)
    }
 
    // 上报bom子项
}