gigibox
2023-06-21 12836b886ef153b4cbe4930bcfedffe33414b8ae
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package report
 
import (
    "encoding/json"
    "io/ioutil"
    "time"
 
    "kingdee-dbapi/cache"
    "kingdee-dbapi/config"
    "kingdee-dbapi/kingdee"
    "kingdee-dbapi/logger"
    "kingdee-dbapi/models"
    "kingdee-dbapi/nsqclient"
)
 
const orderLocalStore = "order.tmp"
const inventoryLocalStore = "inventory.tmp"
 
func SendOrder() {
    var list []kingdee.SEOrder
 
    if config.Options.Debug {
        data, err := ioutil.ReadFile(orderLocalStore)
        if err != nil {
            logger.Error("文件读取失败, %s", err.Error())
            return
        }
        err = json.Unmarshal(data, &list)
        if err != nil {
            logger.Error("文件内容解析失败, %s", err.Error())
            return
        }
    } else {
        list = kingdee.SeOrderList()
        logger.Debug("查询到%d条订单信息", len(list))
    }
 
    var completedOrderNo = make(map[string]struct{})
 
    for i := 0; i < len(list); {
        if cache.Exists(list[i].FBillNo) {
            list = append(list[:i], list[i+1:]...)
        } else {
            completedOrderNo[list[i].FBillNo] = struct{}{}
            i++
        }
    }
 
    b, _ := json.Marshal(list)
 
    if !config.Options.Debug {
        ioutil.WriteFile(orderLocalStore, b, 0644)
    }
 
    // http协议上报, 已修改为TCP
    //ok := nsqclient.HttpPost(config.Options.OrderTopic, b)
 
    if len(list) == 0 {
        logger.Debug("没有新的订单数据")
        return
    }
 
    // TCP协议上报
    ok := nsqclient.Produce(config.Options.OrderTopic, b)
    if ok {
        // 写入数据库, 标记已经上报过了,避免重复上报
        for orderNo, _ := range completedOrderNo {
            cursor := models.Order{
                OrderNo: orderNo,
            }
 
            cursor.Insert()
            cache.WriteCache(orderNo)
        }
 
        logger.Debug("已上报%d个订单信息", len(list))
    } else {
        logger.Warn("订单数据上报失败")
    }
}
 
var invReportedCache = make(map[string]float64, 0)
var fullLoad bool
 
func SendInventory() {
    var list []kingdee.Inventory
 
    // 设置每天凌晨1点上报一次全量数据
    hour := time.Now().Hour()
    if hour == 1 {
        if fullLoad == false {
            invReportedCache = make(map[string]float64, 0)
            fullLoad = true
        }
    } else {
        fullLoad = false
    }
 
    if config.Options.Debug {
        data, err := ioutil.ReadFile(inventoryLocalStore)
        if err != nil {
            logger.Error("文件读取失败, %s", err.Error())
            return
        }
 
        err = json.Unmarshal(data, &list)
        if err != nil {
            logger.Error("文件内容解析失败, %s", err.Error())
            return
        }
    } else {
        list = kingdee.ICInventory()
 
        logger.Debug("查询到%d条库存数据", len(list))
 
        // 先过滤一遍数据, 格瑞米发现有同一个产品同批号同仓库, 有多条库存记录的情况
        // 将类似的数据库存数累计到一起
        var filterMap = make(map[string]float64, 0)
        for i := 0; i < len(list); {
            cacheKey := list[i].FNumber + list[i].FBatchNo + list[i].FStockNo
            if qty, ok := filterMap[cacheKey]; ok {
                filterMap[cacheKey] = list[i].FUnitQty + qty
                list = append(list[:i], list[i+1:]...)
            } else {
                filterMap[cacheKey] = list[i].FUnitQty
                i++
            }
        }
 
        // 过滤数据, 判断是否已经上报过
        for i := 0; i < len(list); {
            cacheKey := list[i].FNumber + list[i].FBatchNo + list[i].FStockNo
            if qty, ok := invReportedCache[cacheKey]; ok && qty == list[i].FUnitQty {
                list = append(list[:i], list[i+1:]...)
            } else {
                invReportedCache[cacheKey] = list[i].FUnitQty
                i++
            }
        }
 
        if len(list) == 0 {
            logger.Debug("没有要更新的库存数据.")
            return
        }
    }
 
    // 每次发 1000 条
    successCnt := 0
    for i := 0; i < len(list); i += 1000 {
        end := i + 1000
        if end > len(list) {
            end = len(list)
        }
 
        b, _ := json.Marshal(list[i:end])
 
        if !config.Options.Debug {
            ioutil.WriteFile(inventoryLocalStore, b, 0644)
        }
 
        // HTTP协议上报,已修改为TCP
        //nsqclient.HttpPost(config.Options.InventoryTopic, b)
 
        // TCP协议上报
        ok := nsqclient.Produce(config.Options.InventoryTopic, b)
        if !ok {
            logger.Warn("库存数据上报失败")
 
            //上报失败, 缓存清空
            invReportedCache = make(map[string]float64, 0)
        } else {
            successCnt = end
        }
    }
 
    logger.Debug("已上报%d条库存数据", successCnt)
}