gigibox
2023-06-19 942f3416b333304bde50f0dca5581595f397eafa
report/task.go
@@ -2,6 +2,8 @@
import (
   "encoding/json"
   "io/ioutil"
   "kingdee-dbapi/logger"
   "kingdee-dbapi/cache"
   "kingdee-dbapi/config"
@@ -10,21 +12,55 @@
   "kingdee-dbapi/nsqclient"
)
func SendOrder() {
   var completedOrderNo = make(map[string]struct{})
   list := kingdee.SeOrderList()
const orderLocalStore = "order.tmp"
const inventoryLocalStore = "inventory.tmp"
   for i := 0; i < len(list); i++ {
func SendOrder() {
   var list []kingdee.SEOrder
   if config.Options.Debug {
      data, err := ioutil.ReadFile(orderLocalStore)
      if err != nil {
         logger.Error("文件读取失败, %s", err.Error())
         return
      }
      err = json.Unmarshal(data, &list)
      if err != nil {
         logger.Error("文件内容解析失败, %s", err.Error())
         return
      }
   } else {
      list = kingdee.SeOrderList()
      logger.Debug("查询到%d条订单信息", len(list))
   }
   var completedOrderNo = make(map[string]struct{})
   for i := 0; i < len(list); {
      if cache.Exists(list[i].FBillNo) {
         list = append(list[:i], list[i+1:]...)
      } else {
         completedOrderNo[list[i].FBillNo] = struct{}{}
         i++
      }
   }
   b, _ := json.Marshal(list)
   ok := nsqclient.HttpPost(config.Options.OrderTopic, b)
   if !config.Options.Debug {
      ioutil.WriteFile(orderLocalStore, b, 0644)
   }
   // http协议上报, 已修改为TCP
   //ok := nsqclient.HttpPost(config.Options.OrderTopic, b)
   if len(list) == 0 {
      logger.Debug("没有新的订单需要上报")
      return
   }
   // TCP协议上报
   ok := nsqclient.Produce(config.Options.OrderTopic, b)
   if ok {
      // 写入数据库, 标记已经上报过了,避免重复上报
      for orderNo, _ := range completedOrderNo {
@@ -35,36 +71,57 @@
         cursor.Insert()
         cache.WriteCache(orderNo)
      }
   }
   // 逐条发送
   //for idx, _ := range list {
   //   // 已经推送过的订单
   //   if cache.Exists(list[idx].FBillNo) {
   //      continue
   //   }
   //
   //   b, _ := json.Marshal(list[idx])
   //
   //   ok := nsqclient.HttpPost(config.Options.OrderTopic, b)
   //   if ok {
   //      completedOrderNo[list[idx].FBillNo] = struct{}{}
   //   }
   //}
      logger.Debug("已上报%d个订单信息", len(list))
   } else {
      logger.Warn("订单数据上报失败")
   }
}
func SendInventory() {
   list := kingdee.ICInventory()
   var list []kingdee.Inventory
   // 每次发 300 条
   for i := 0; i < len(list); i += 300 {
      end := i + 300
   if config.Options.Debug {
      data, err := ioutil.ReadFile(inventoryLocalStore)
      if err != nil {
         logger.Error("文件读取失败, %s", err.Error())
         return
      }
      err = json.Unmarshal(data, &list)
      if err != nil {
         logger.Error("文件内容解析失败, %s", err.Error())
         return
      }
   } else {
      list = kingdee.ICInventory()
      logger.Debug("查询到%d条库存数据", len(list))
   }
   // 每次发 100 条
   successCnt := 0
   for i := 0; i < len(list); i += 1000 {
      end := i + 1000
      if end > len(list) {
         end = len(list)
      }
      b, _ := json.Marshal(list[i:end])
      nsqclient.HttpPost(config.Options.InventoryTopic, b)
      if !config.Options.Debug {
         ioutil.WriteFile(inventoryLocalStore, b, 0644)
      }
      // HTTP协议上报,已修改为TCP
      //nsqclient.HttpPost(config.Options.InventoryTopic, b)
      // TCP协议上报
      ok := nsqclient.Produce(config.Options.InventoryTopic, b)
      if !ok {
         logger.Warn("库存数据上报失败")
      } else {
         successCnt += end
      }
   }
   logger.Debug("已上报%d条库存数据", successCnt)
}