zhangzengfei
2023-09-11 1873461012c99801f364bd07fae2c218d245048e
report/task.go
@@ -1,70 +1,57 @@
package report
import (
   "encoding/json"
   "context"
   "time"
   "kingdee-dbapi/cache"
   "kingdee-dbapi/config"
   "kingdee-dbapi/kingdee"
   "kingdee-dbapi/models"
   "kingdee-dbapi/nsqclient"
   "kingdee-dbapi/logger"
)
func SendOrder() {
   var completedOrderNo = make(map[string]struct{})
   list := kingdee.SeOrderList()
var ctx context.Context
var cancel context.CancelFunc
   for i := 0; i < len(list); i++ {
      if cache.Exists(list[i].FBillNo) {
         list = append(list[:i], list[i+1:]...)
      } else {
         completedOrderNo[list[i].FBillNo] = struct{}{}
      }
   }
   b, _ := json.Marshal(list)
   ok := nsqclient.HttpPost(config.Options.OrderTopic, b)
   if ok {
      // 写入数据库, 标记已经上报过了,避免重复上报
      for orderNo, _ := range completedOrderNo {
         cursor := models.Order{
            OrderNo: orderNo,
         }
         cursor.Insert()
         cache.WriteCache(orderNo)
      }
   }
   // 逐条发送
   //for idx, _ := range list {
   //   // 已经推送过的订单
   //   if cache.Exists(list[idx].FBillNo) {
   //      continue
   //   }
   //
   //   b, _ := json.Marshal(list[idx])
   //
   //   ok := nsqclient.HttpPost(config.Options.OrderTopic, b)
   //   if ok {
   //      completedOrderNo[list[idx].FBillNo] = struct{}{}
   //   }
   //}
func Start() {
   ctx, cancel = context.WithCancel(context.Background())
   go queryTasks(ctx)
}
func SendInventory() {
   list := kingdee.ICInventory()
func RestartReport() {
   cancel()
   // 每次发 300 条
   for i := 0; i < len(list); i += 300 {
      end := i + 300
      if end > len(list) {
         end = len(list)
   Start()
}
func queryTasks(c context.Context) {
   logger.Debug("启动数据上报任务")
   for {
      select {
      case <-c.Done():
         logger.Debug("停止上报")
         return
      default:
         // 上报订单
         if config.Options.OrderTopic != "" {
            SendOrder()
         }
         // 上报即时库存
         if config.Options.InventoryTopic != "" {
            SendInventory()
         }
         // 上报bom
         if config.Options.BomTopic != "" {
            SendBom(false)
         }
         time.Sleep(time.Duration(config.Options.SyncInterval) * time.Second)
      }
      b, _ := json.Marshal(list[i:end])
      nsqclient.HttpPost(config.Options.InventoryTopic, b)
   }
}
func HandleBomQuery(msg []byte) error {
   SendBom(true)
   return nil
}