zhangzengfei
2023-08-14 8f750b461a4f442825e516016bf78d05ed66afcb
添加bom查询
1个文件已删除
2个文件已添加
1 文件已重命名
7个文件已修改
479 ■■■■■ 已修改文件
config.json 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config/config.go 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gui/gui.go 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
kingdee/bom.go 63 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
kingdee/cst.go 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
kingdee/icInventory.go 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
kingdee/query.go 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
report/send.go 185 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
report/task.go 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
report/tasks.go 177 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config.json
@@ -7,12 +7,14 @@
    "nsq_server": "121.31.232.83:4150",
    "nsq_webapi": "http://121.31.232.83:9080/api/nsq/pub?topic=your_topic",
    "order_topic": "aps.wangpengfei.erp.seorder",
    "bom_topic": "aps.wangpengfei.erp.icBom",
    "bom_query_topic": "",
    "bom_child_topic": "",
    "inventory_topic": "aps.wangpengfei.erp.inventory",
    "query_topic": "aps.wangpengfei.erp.k3resource",
    "reply_topic": "aps.wangpengfei.erp.k3reply",
    "cst_webapi": "http://192.168.20.249/cst/local_post.ashx",
    "cst_query_topic": "aps.wangpengfei.erp.cstApply",
    "cst_reply_topic": "aps.wangpengfei.erp.cstReply",
    "interval": 60,
    "debug": false
    "interval": 60
}
config/config.go
@@ -17,6 +17,9 @@
    NsqServer      string `json:"nsq_server"`      // nsq TCP服务端地址
    NsqWebApi      string `json:"nsq_webapi"`      // nsq HTTP接口地址
    OrderTopic     string `json:"order_topic"`     // 订单上报的topic
    BomTopic       string `json:"bom_topic"`       // bom上报的topic
    BomQueryTopic  string `json:"bom_query_topic"` // bom上报的topic
    BomChildTopic  string `json:"bom_child_topic"` // bom子项上报的topic
    InventoryTopic string `json:"inventory_topic"` // 库存上报的topic
    SqlQueryTopic  string `json:"query_topic"`     // 金蝶查询接口的topic
    SqlReplyTopic  string `json:"reply_topic"`     // 金蝶响应查询接口的topic
@@ -24,7 +27,6 @@
    CSTQueryTopic  string `json:"cst_query_topic"` // 提交生产任务单主题
    CSTReplyTopic  string `json:"cst_reply_topic"` // 响应生产任务单主题
    SyncInterval   int    `json:"interval"`        // 同步的时间间隔, 单位/秒
    Debug          bool   `json:"debug"`           // 本地调试, 取本地数据
}
const configPath = "config.json"
@@ -40,13 +42,15 @@
    Options.NsqServer = "fai365.com:4150"
    Options.NsqWebApi = "http://121.31.232.83:9080/api/nsq/pub?topic=your_topic"
    Options.OrderTopic = "aps.factory.erp.seorder"
    Options.BomTopic = "aps.factory.erp.icBom"
    Options.BomChildTopic = "aps.factory.erp.icBomChild"
    Options.BomQueryTopic = "aps.factory.erp.icBomQuery"
    Options.InventoryTopic = "aps.factory.erp.inventory"
    Options.SqlQueryTopic = "aps.factory.erp.k3resource"
    Options.SqlReplyTopic = "aps.factory.erp.k3reply"
    Options.CSTQueryTopic = "aps.factory.erp.cstApply"
    Options.CSTReplyTopic = "aps.factory.erp.cstReply"
    Options.SyncInterval = 60
    Options.Debug = false
}
func Load() {
gui/gui.go
@@ -81,17 +81,13 @@
        if err != nil {
            logger.Error("db init error:%s", err.Error())
            dialog.ShowError(err, w)
            if !config.Options.Debug {
                return
            }
        }
        form.Disable()
        submitBtn.Text = "已启动"
        submitBtn.Disable()
        report.StartReport()
        report.Start()
        // 开启订阅SQL查询
        go nsqclient.InitNsqConsumer(config.Options.SqlQueryTopic, "sensor01", kingdee.SqlQueryHandle)
kingdee/bom.go
New file
@@ -0,0 +1,63 @@
package kingdee
type ICBom struct {
    FInterID         int     // 内码
    FBOMNumber       string  // BOM单编号
    FUseStatus       int     // 使用状态码
    FUseStatusName   string  // 使用状态
    FItemIDNumber    string  // 物料代码
    FItemIDName      string  // 物料名称
    FModel           string  // 规格型号
    FErpClsID        string  // 物料属性
    FQty             float64 // 数量
    FUnitName        string  // 单位
    FYield           float64 // 成品率
    FRoutingIDNumber string  // 工艺路线代码
    FRoutingIDName   string  // 工艺路线名称
    FNote            string  // 备注
    FBomType         int     // BOM类型
    FAudDate         string  // 更新时间
    FPDMImportDate   string  // 导入时间
    FStatus          int     // 状态
}
func BomList(fData bool) []ICBom {
    sql := `
    SELECT
        ICBom.FInterID,
        ICBom.FBOMNumber,
        ICBom.FUseStatus,
        t_SubMessage.FName as FUseStatusName,
        t_ICItem.FNumber as FItemIDNumber,
        t_ICItem.FName as FItemIDName,
        t_ICItem.FModel as FModel,
        (SELECT FName FROM t_SubMessage WHERE t_ICItem.FErpClsID=FInterID) as FErpClsID,
        ICBom.FQty,
        t_MeasureUnit.FName as FUnitName,
        ICBom.FYield,
        t_Routing.FBillNO as FRoutingIDNumber,
        t_Routing.FRoutingName as FRoutingIDName,
        ICBom.FNote,
        ICBom.FBomType,
        ICBom.FAudDate,
        ICBom.FPDMImportDate,
        ICBom.FStatus
     FROM ICBom
        left join t_SubMessage on ICBom.FUseStatus = t_SubMessage.FInterID AND t_SubMessage.FInterID <> 0
        join t_ICItem on ICBom.FItemID= t_ICItem.FItemID  AND t_ICItem.FItemID <> 0
        left join t_MeasureUnit on ICBom.FUnitID = t_MeasureUnit.FItemID AND t_MeasureUnit.FItemID <> 0
        left join t_Routing on ICBom.FRoutingID = t_Routing.FInterID AND t_Routing.FInterID <> 0
    `
    // 如果不是请求全部数据, 仅查询当天更新的, 默认查当天
    if !fData {
        sql = sql + " Where DateDiff(dd,FAudDate,getdate())=1"
    }
    var result []ICBom
    db.Raw(sql).Scan(&result)
    //db.Raw(sql).Debug().Scan(&result)
    return result
}
kingdee/cst.go
@@ -1,5 +1,9 @@
package kingdee
/*
请求写入生产任务单的接口
接收aps的请求, 调用本地的生产任务单服务, 并响应结果
*/
import (
    "bytes"
    "encoding/json"
kingdee/icInventory.go
@@ -1,5 +1,7 @@
package kingdee
// 库存查询
type Inventory struct {
    FNumber    string  `gorm:"column:FNumber" json:"FNumber"`       // 物料代码
    FName      string  `gorm:"column:FName" json:"FName"`           // 物料名称
kingdee/query.go
@@ -10,6 +10,8 @@
    "kingdee-dbapi/nsqclient"
)
// 通用sql查询接口
type SqlQueryMsg struct {
    Key     string // 请求
    Command string
main.go
@@ -16,7 +16,7 @@
)
func main() {
    logger.InitLog("kingdee-dbapi.log", "debug", 15, false)
    logger.InitLog("log/kingdee-dbapi.log", "debug", 15, false)
    logger.Info("kingdee-dbapi start!")
    config.Load()
report/send.go
New file
@@ -0,0 +1,185 @@
package report
import (
    "encoding/json"
    "time"
    "kingdee-dbapi/cache"
    "kingdee-dbapi/config"
    "kingdee-dbapi/kingdee"
    "kingdee-dbapi/logger"
    "kingdee-dbapi/models"
    "kingdee-dbapi/nsqclient"
)
// 上报销售订单, 增量, 本地会存储已经上报过的. 订单不存在修改
func SendOrder() {
    list := kingdee.SeOrderList()
    logger.Debug("查询到%d条订单信息", len(list))
    var completedOrderNo = make(map[string]struct{})
    for i := 0; i < len(list); {
        if cache.Exists(list[i].FBillNo) {
            list = append(list[:i], list[i+1:]...)
        } else {
            completedOrderNo[list[i].FBillNo] = struct{}{}
            i++
        }
    }
    b, _ := json.Marshal(list)
    // http协议上报, 已修改为TCP
    //ok := nsqclient.HttpPost(config.Options.OrderTopic, b)
    if len(list) == 0 {
        logger.Debug("没有新的订单数据")
        return
    }
    // TCP协议上报
    ok := nsqclient.Produce(config.Options.OrderTopic, b)
    if ok {
        // 写入数据库, 标记已经上报过了,避免重复上报
        for orderNo, _ := range completedOrderNo {
            cursor := models.Order{
                OrderNo: orderNo,
            }
            cursor.Insert()
            cache.WriteCache(orderNo)
        }
        logger.Debug("已上报%d个订单信息", len(list))
    } else {
        logger.Warn("订单数据上报失败")
    }
}
var invReportedCache = make(map[string]float64, 0)
var bomReportedCache = make(map[string]struct{}, 0)
var fullInvData bool
// 上报库存
func SendInventory() {
    // 设置每天凌晨1点上报一次全量数据
    hour := time.Now().Hour()
    if hour == 1 {
        if fullInvData == false {
            invReportedCache = make(map[string]float64, 0)
            // 顺便清理下bom的当天缓存
            bomReportedCache = make(map[string]struct{})
            fullInvData = true
        }
    } else {
        fullInvData = false
    }
    list := kingdee.ICInventory()
    logger.Debug("查询到%d条库存数据", len(list))
    // 先过滤一遍数据, 格瑞米发现有同一个产品同批号同仓库, 有多条库存记录的情况
    // 将类似的数据库存数累计到一起
    var filterMap = make(map[string]float64, 0)
    for i := 0; i < len(list); {
        cacheKey := list[i].FNumber + list[i].FBatchNo + list[i].FStockNo
        if qty, ok := filterMap[cacheKey]; ok {
            filterMap[cacheKey] = list[i].FUnitQty + qty
            list = append(list[:i], list[i+1:]...)
        } else {
            filterMap[cacheKey] = list[i].FUnitQty
            i++
        }
    }
    // 过滤数据, 判断是否已经上报过
    for i := 0; i < len(list); {
        cacheKey := list[i].FNumber + list[i].FBatchNo + list[i].FStockNo
        if qty, ok := invReportedCache[cacheKey]; ok && qty == list[i].FUnitQty {
            list = append(list[:i], list[i+1:]...)
        } else {
            invReportedCache[cacheKey] = list[i].FUnitQty
            i++
        }
    }
    if len(list) == 0 {
        logger.Debug("没有要更新的库存数据.")
        return
    }
    // 每次发 1000 条
    successCnt := 0
    for i := 0; i < len(list); i += 1000 {
        end := i + 1000
        if end > len(list) {
            end = len(list)
        }
        b, _ := json.Marshal(list[i:end])
        // HTTP协议上报,已修改为TCP
        //nsqclient.HttpPost(config.Options.InventoryTopic, b)
        // TCP协议上报
        ok := nsqclient.Produce(config.Options.InventoryTopic, b)
        if !ok {
            logger.Warn("库存数据上报失败")
            //上报失败, 缓存清空
            invReportedCache = make(map[string]float64, 0)
        } else {
            successCnt = end
        }
    }
    logger.Debug("已上报%d条库存数据", successCnt)
}
func SendBom(fData bool) {
    // 上报bom
    bomList := kingdee.BomList(fData)
    // 过滤数据, 判断是否已经上报过, 请求全量数据不过滤, 直接上报
    if fData {
        for i := 0; i < len(bomList); {
            cacheKey := bomList[i].FBOMNumber + bomList[i].FAudDate
            if _, ok := bomReportedCache[cacheKey]; ok {
                bomList = append(bomList[:i], bomList[i+1:]...)
            } else {
                bomReportedCache[cacheKey] = struct{}{}
                i++
            }
        }
    }
    if len(bomList) == 0 {
        logger.Debug("没有要更新的Bom数据.")
    } else {
        // 每次发 1000 条
        successCnt := 0
        for i := 0; i < len(bomList); i += 1000 {
            end := i + 1000
            if end > len(bomList) {
                end = len(bomList)
            }
            b, _ := json.Marshal(bomList[i:end])
            // TCP协议上报
            ok := nsqclient.Produce(config.Options.BomTopic, b)
            if !ok {
                logger.Warn("BOM数据上报失败")
                //上报失败, 缓存清空
                bomReportedCache = make(map[string]struct{}, 0)
            } else {
                successCnt = end
            }
        }
        logger.Debug("已上报%d条BOM数据", successCnt)
    }
    // 上报bom子项
}
report/task.go
File was renamed from report/loop.go
@@ -11,18 +11,18 @@
var ctx context.Context
var cancel context.CancelFunc
func StartReport() {
func Start() {
    ctx, cancel = context.WithCancel(context.Background())
    go Loop(ctx)
    go queryTasks(ctx)
}
func RestartReport() {
    cancel()
    StartReport()
    Start()
}
func Loop(c context.Context) {
func queryTasks(c context.Context) {
    logger.Debug("启动数据上报任务")
    for {
        select {
@@ -31,15 +31,19 @@
            return
        default:
            // 上报订单
            SendOrder()
            if config.Options.OrderTopic != "" {
                SendOrder()
            }
            // 上报即时库存
            SendInventory()
            if config.Options.InventoryTopic != "" {
                SendInventory()
            }
            // 测试查询请求
            //sql := []byte("select * from t_icitem where FItemID=3316")
            //ok := nsqclient.Produce(config.Options.SqlQueryTopic, sql)
            //logger.Debug("测试请求接口, %v", ok)
            // 上报bom
            if config.Options.BomTopic != "" {
                SendBom(true)
            }
            time.Sleep(time.Duration(config.Options.SyncInterval) * time.Second)
        }
report/tasks.go
File was deleted