gigibox
2023-06-20 476dc06f15ff4b9197bfbc51801899bf51f11f84
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package report
 
import (
    "encoding/json"
    "io/ioutil"
    "kingdee-dbapi/logger"
 
    "kingdee-dbapi/cache"
    "kingdee-dbapi/config"
    "kingdee-dbapi/kingdee"
    "kingdee-dbapi/models"
    "kingdee-dbapi/nsqclient"
)
 
const orderLocalStore = "order.tmp"
const inventoryLocalStore = "inventory.tmp"
 
func SendOrder() {
    var list []kingdee.SEOrder
 
    if config.Options.Debug {
        data, err := ioutil.ReadFile(orderLocalStore)
        if err != nil {
            logger.Error("文件读取失败, %s", err.Error())
            return
        }
        err = json.Unmarshal(data, &list)
        if err != nil {
            logger.Error("文件内容解析失败, %s", err.Error())
            return
        }
    } else {
        list = kingdee.SeOrderList()
        logger.Debug("查询到%d条订单信息", len(list))
    }
 
    var completedOrderNo = make(map[string]struct{})
 
    for i := 0; i < len(list); {
        if cache.Exists(list[i].FBillNo) {
            list = append(list[:i], list[i+1:]...)
        } else {
            completedOrderNo[list[i].FBillNo] = struct{}{}
            i++
        }
    }
 
    b, _ := json.Marshal(list)
 
    if !config.Options.Debug {
        ioutil.WriteFile(orderLocalStore, b, 0644)
    }
 
    // http协议上报, 已修改为TCP
    //ok := nsqclient.HttpPost(config.Options.OrderTopic, b)
 
    if len(list) == 0 {
        logger.Debug("没有新的订单需要上报")
        return
    }
 
    // TCP协议上报
    ok := nsqclient.Produce(config.Options.OrderTopic, b)
    if ok {
        // 写入数据库, 标记已经上报过了,避免重复上报
        for orderNo, _ := range completedOrderNo {
            cursor := models.Order{
                OrderNo: orderNo,
            }
 
            cursor.Insert()
            cache.WriteCache(orderNo)
        }
 
        logger.Debug("已上报%d个订单信息", len(list))
    } else {
        logger.Warn("订单数据上报失败")
    }
}
 
func SendInventory() {
    var list []kingdee.Inventory
 
    if config.Options.Debug {
        data, err := ioutil.ReadFile(inventoryLocalStore)
        if err != nil {
            logger.Error("文件读取失败, %s", err.Error())
            return
        }
 
        err = json.Unmarshal(data, &list)
        if err != nil {
            logger.Error("文件内容解析失败, %s", err.Error())
            return
        }
    } else {
        list = kingdee.ICInventory()
        logger.Debug("查询到%d条库存数据", len(list))
    }
 
    // 每次发 100 条
    successCnt := 0
    for i := 0; i < len(list); i += 1000 {
        end := i + 1000
        if end > len(list) {
            end = len(list)
        }
 
        b, _ := json.Marshal(list[i:end])
 
        if !config.Options.Debug {
            ioutil.WriteFile(inventoryLocalStore, b, 0644)
        }
 
        // HTTP协议上报,已修改为TCP
        //nsqclient.HttpPost(config.Options.InventoryTopic, b)
 
        // TCP协议上报
        ok := nsqclient.Produce(config.Options.InventoryTopic, b)
        if !ok {
            logger.Warn("库存数据上报失败")
        } else {
            successCnt = end
        }
    }
    logger.Debug("已上报%d条库存数据", successCnt)
}