1个文件已删除
2个文件已添加
1个文件已重命名
7个文件已修改
| | |
| | | "nsq_server": "121.31.232.83:4150", |
| | | "nsq_webapi": "http://121.31.232.83:9080/api/nsq/pub?topic=your_topic", |
| | | "order_topic": "aps.wangpengfei.erp.seorder", |
| | | "bom_topic": "aps.wangpengfei.erp.icBom", |
| | | "bom_query_topic": "", |
| | | "bom_child_topic": "", |
| | | "inventory_topic": "aps.wangpengfei.erp.inventory", |
| | | "query_topic": "aps.wangpengfei.erp.k3resource", |
| | | "reply_topic": "aps.wangpengfei.erp.k3reply", |
| | | "cst_webapi": "http://192.168.20.249/cst/local_post.ashx", |
| | | "cst_query_topic": "aps.wangpengfei.erp.cstApply", |
| | | "cst_reply_topic": "aps.wangpengfei.erp.cstReply", |
| | | "interval": 60, |
| | | "debug": false |
| | | "interval": 60 |
| | | } |
| | |
| | | NsqServer string `json:"nsq_server"` // nsq TCP服务端地址 |
| | | NsqWebApi string `json:"nsq_webapi"` // nsq HTTP接口地址 |
| | | OrderTopic string `json:"order_topic"` // 订单上报的topic |
| | | BomTopic string `json:"bom_topic"` // bom上报的topic |
| | | BomQueryTopic string `json:"bom_query_topic"` // bom上报的topic |
| | | BomChildTopic string `json:"bom_child_topic"` // bom子项上报的topic |
| | | InventoryTopic string `json:"inventory_topic"` // 库存上报的topic |
| | | SqlQueryTopic string `json:"query_topic"` // 金蝶查询接口的topic |
| | | SqlReplyTopic string `json:"reply_topic"` // 金蝶响应查询接口的topic |
| | |
| | | CSTQueryTopic string `json:"cst_query_topic"` // 提交生产任务单主题 |
| | | CSTReplyTopic string `json:"cst_reply_topic"` // 响应生产任务单主题 |
| | | SyncInterval int `json:"interval"` // 同步的时间间隔, 单位/秒 |
| | | Debug bool `json:"debug"` // 本地调试, 取本地数据 |
| | | } |
| | | |
// Configuration file location.
const (
	// configPath is the file name of the JSON configuration file.
	// NOTE(review): presumably read by Load relative to the working
	// directory — confirm against the loader.
	configPath = "config.json"
)
| | |
| | | Options.NsqServer = "fai365.com:4150" |
| | | Options.NsqWebApi = "http://121.31.232.83:9080/api/nsq/pub?topic=your_topic" |
| | | Options.OrderTopic = "aps.factory.erp.seorder" |
| | | Options.BomTopic = "aps.factory.erp.icBom" |
| | | Options.BomChildTopic = "aps.factory.erp.icBomChild" |
| | | Options.BomQueryTopic = "aps.factory.erp.icBomQuery" |
| | | Options.InventoryTopic = "aps.factory.erp.inventory" |
| | | Options.SqlQueryTopic = "aps.factory.erp.k3resource" |
| | | Options.SqlReplyTopic = "aps.factory.erp.k3reply" |
| | | Options.CSTQueryTopic = "aps.factory.erp.cstApply" |
| | | Options.CSTReplyTopic = "aps.factory.erp.cstReply" |
| | | Options.SyncInterval = 60 |
| | | Options.Debug = false |
| | | } |
| | | |
| | | func Load() { |
| | |
| | | if err != nil { |
| | | logger.Error("db init error:%s", err.Error()) |
| | | dialog.ShowError(err, w) |
| | | |
| | | if !config.Options.Debug { |
| | | return |
| | | } |
| | | } |
| | | |
| | | form.Disable() |
| | | submitBtn.Text = "已启动" |
| | | submitBtn.Disable() |
| | | |
| | | report.StartReport() |
| | | report.Start() |
| | | |
| | | // 开启订阅SQL查询 |
| | | go nsqclient.InitNsqConsumer(config.Options.SqlQueryTopic, "sensor01", kingdee.SqlQueryHandle) |
New file |
| | |
| | | package kingdee |
| | | |
// ICBom is one BOM header row as produced by the query in BomList. The field
// names match the column names and aliases selected by that query.
type ICBom struct {
	FInterID         int     // internal ID
	FBOMNumber       string  // BOM document number
	FUseStatus       int     // usage status code
	FUseStatusName   string  // usage status name
	FItemIDNumber    string  // material code
	FItemIDName      string  // material name
	FModel           string  // specification / model
	FErpClsID        string  // material attribute
	FQty             float64 // quantity
	FUnitName        string  // unit of measure
	FYield           float64 // yield rate
	FRoutingIDNumber string  // routing code
	FRoutingIDName   string  // routing name
	FNote            string  // remarks
	FBomType         int     // BOM type
	FAudDate         string  // update time
	FPDMImportDate   string  // import time
	FStatus          int     // status
}
| | | |
| | | func BomList(fData bool) []ICBom { |
| | | sql := ` |
| | | SELECT |
| | | ICBom.FInterID, |
| | | ICBom.FBOMNumber, |
| | | ICBom.FUseStatus, |
| | | t_SubMessage.FName as FUseStatusName, |
| | | t_ICItem.FNumber as FItemIDNumber, |
| | | t_ICItem.FName as FItemIDName, |
| | | t_ICItem.FModel as FModel, |
| | | (SELECT FName FROM t_SubMessage WHERE t_ICItem.FErpClsID=FInterID) as FErpClsID, |
| | | ICBom.FQty, |
| | | t_MeasureUnit.FName as FUnitName, |
| | | ICBom.FYield, |
| | | t_Routing.FBillNO as FRoutingIDNumber, |
| | | t_Routing.FRoutingName as FRoutingIDName, |
| | | ICBom.FNote, |
| | | ICBom.FBomType, |
| | | ICBom.FAudDate, |
| | | ICBom.FPDMImportDate, |
| | | ICBom.FStatus |
| | | FROM ICBom |
| | | left join t_SubMessage on ICBom.FUseStatus = t_SubMessage.FInterID AND t_SubMessage.FInterID <> 0 |
| | | join t_ICItem on ICBom.FItemID= t_ICItem.FItemID AND t_ICItem.FItemID <> 0 |
| | | left join t_MeasureUnit on ICBom.FUnitID = t_MeasureUnit.FItemID AND t_MeasureUnit.FItemID <> 0 |
| | | left join t_Routing on ICBom.FRoutingID = t_Routing.FInterID AND t_Routing.FInterID <> 0 |
| | | ` |
| | | |
| | | // 如果不是请求全部数据, 仅查询当天更新的, 默认查当天 |
| | | if !fData { |
| | | sql = sql + " Where DateDiff(dd,FAudDate,getdate())=1" |
| | | } |
| | | |
| | | var result []ICBom |
| | | |
| | | db.Raw(sql).Scan(&result) |
| | | //db.Raw(sql).Debug().Scan(&result) |
| | | |
| | | return result |
| | | } |
| | |
| | | package kingdee |
| | | |
| | | /* |
| | | 请求写入生产任务单的接口 |
| | | 接收aps的请求, 调用本地的生产任务单服务, 并响应结果 |
| | | */ |
| | | import ( |
| | | "bytes" |
| | | "encoding/json" |
| | |
| | | package kingdee |
| | | |
| | | // 库存查询 |
| | | |
| | | type Inventory struct { |
| | | FNumber string `gorm:"column:FNumber" json:"FNumber"` // 物料代码 |
| | | FName string `gorm:"column:FName" json:"FName"` // 物料名称 |
| | |
| | | "kingdee-dbapi/nsqclient" |
| | | ) |
| | | |
| | | // 通用sql查询接口 |
| | | |
| | | type SqlQueryMsg struct { |
| | | Key string // 请求 |
| | | Command string |
| | |
| | | ) |
| | | |
| | | func main() { |
| | | logger.InitLog("kingdee-dbapi.log", "debug", 15, false) |
| | | logger.InitLog("log/kingdee-dbapi.log", "debug", 15, false) |
| | | logger.Info("kingdee-dbapi start!") |
| | | |
| | | config.Load() |
New file |
| | |
| | | package report |
| | | |
| | | import ( |
| | | "encoding/json" |
| | | "time" |
| | | |
| | | "kingdee-dbapi/cache" |
| | | "kingdee-dbapi/config" |
| | | "kingdee-dbapi/kingdee" |
| | | "kingdee-dbapi/logger" |
| | | "kingdee-dbapi/models" |
| | | "kingdee-dbapi/nsqclient" |
| | | ) |
| | | |
| | | // 上报销售订单, 增量, 本地会存储已经上报过的. 订单不存在修改 |
| | | func SendOrder() { |
| | | list := kingdee.SeOrderList() |
| | | logger.Debug("查询到%d条订单信息", len(list)) |
| | | |
| | | var completedOrderNo = make(map[string]struct{}) |
| | | |
| | | for i := 0; i < len(list); { |
| | | if cache.Exists(list[i].FBillNo) { |
| | | list = append(list[:i], list[i+1:]...) |
| | | } else { |
| | | completedOrderNo[list[i].FBillNo] = struct{}{} |
| | | i++ |
| | | } |
| | | } |
| | | |
| | | b, _ := json.Marshal(list) |
| | | |
| | | // http协议上报, 已修改为TCP |
| | | //ok := nsqclient.HttpPost(config.Options.OrderTopic, b) |
| | | |
| | | if len(list) == 0 { |
| | | logger.Debug("没有新的订单数据") |
| | | return |
| | | } |
| | | |
| | | // TCP协议上报 |
| | | ok := nsqclient.Produce(config.Options.OrderTopic, b) |
| | | if ok { |
| | | // 写入数据库, 标记已经上报过了,避免重复上报 |
| | | for orderNo, _ := range completedOrderNo { |
| | | cursor := models.Order{ |
| | | OrderNo: orderNo, |
| | | } |
| | | |
| | | cursor.Insert() |
| | | cache.WriteCache(orderNo) |
| | | } |
| | | |
| | | logger.Debug("已上报%d个订单信息", len(list)) |
| | | } else { |
| | | logger.Warn("订单数据上报失败") |
| | | } |
| | | } |
| | | |
// In-memory state used by SendInventory and SendBom to skip data that has
// already been reported.
var (
	// invReportedCache maps FNumber+FBatchNo+FStockNo to the quantity that
	// was last recorded for that key by SendInventory.
	invReportedCache = map[string]float64{}

	// bomReportedCache holds the FBOMNumber+FAudDate keys already handled by
	// SendBom.
	bomReportedCache = map[string]struct{}{}

	// fullInvData is true once the caches have been reset during hour 1 and
	// goes back to false at any other hour.
	fullInvData bool
)
| | | |
| | | // 上报库存 |
| | | func SendInventory() { |
| | | // 设置每天凌晨1点上报一次全量数据 |
| | | hour := time.Now().Hour() |
| | | if hour == 1 { |
| | | if fullInvData == false { |
| | | invReportedCache = make(map[string]float64, 0) |
| | | // 顺便清理下bom的当天缓存 |
| | | bomReportedCache = make(map[string]struct{}) |
| | | fullInvData = true |
| | | } |
| | | } else { |
| | | fullInvData = false |
| | | } |
| | | |
| | | list := kingdee.ICInventory() |
| | | |
| | | logger.Debug("查询到%d条库存数据", len(list)) |
| | | |
| | | // 先过滤一遍数据, 格瑞米发现有同一个产品同批号同仓库, 有多条库存记录的情况 |
| | | // 将类似的数据库存数累计到一起 |
| | | var filterMap = make(map[string]float64, 0) |
| | | for i := 0; i < len(list); { |
| | | cacheKey := list[i].FNumber + list[i].FBatchNo + list[i].FStockNo |
| | | if qty, ok := filterMap[cacheKey]; ok { |
| | | filterMap[cacheKey] = list[i].FUnitQty + qty |
| | | list = append(list[:i], list[i+1:]...) |
| | | } else { |
| | | filterMap[cacheKey] = list[i].FUnitQty |
| | | i++ |
| | | } |
| | | } |
| | | |
| | | // 过滤数据, 判断是否已经上报过 |
| | | for i := 0; i < len(list); { |
| | | cacheKey := list[i].FNumber + list[i].FBatchNo + list[i].FStockNo |
| | | if qty, ok := invReportedCache[cacheKey]; ok && qty == list[i].FUnitQty { |
| | | list = append(list[:i], list[i+1:]...) |
| | | } else { |
| | | invReportedCache[cacheKey] = list[i].FUnitQty |
| | | i++ |
| | | } |
| | | } |
| | | |
| | | if len(list) == 0 { |
| | | logger.Debug("没有要更新的库存数据.") |
| | | return |
| | | } |
| | | |
| | | // 每次发 1000 条 |
| | | successCnt := 0 |
| | | for i := 0; i < len(list); i += 1000 { |
| | | end := i + 1000 |
| | | if end > len(list) { |
| | | end = len(list) |
| | | } |
| | | |
| | | b, _ := json.Marshal(list[i:end]) |
| | | |
| | | // HTTP协议上报,已修改为TCP |
| | | //nsqclient.HttpPost(config.Options.InventoryTopic, b) |
| | | |
| | | // TCP协议上报 |
| | | ok := nsqclient.Produce(config.Options.InventoryTopic, b) |
| | | if !ok { |
| | | logger.Warn("库存数据上报失败") |
| | | |
| | | //上报失败, 缓存清空 |
| | | invReportedCache = make(map[string]float64, 0) |
| | | } else { |
| | | successCnt = end |
| | | } |
| | | } |
| | | |
| | | logger.Debug("已上报%d条库存数据", successCnt) |
| | | } |
| | | |
| | | func SendBom(fData bool) { |
| | | // 上报bom |
| | | bomList := kingdee.BomList(fData) |
| | | // 过滤数据, 判断是否已经上报过, 请求全量数据不过滤, 直接上报 |
| | | if fData { |
| | | for i := 0; i < len(bomList); { |
| | | cacheKey := bomList[i].FBOMNumber + bomList[i].FAudDate |
| | | if _, ok := bomReportedCache[cacheKey]; ok { |
| | | bomList = append(bomList[:i], bomList[i+1:]...) |
| | | } else { |
| | | bomReportedCache[cacheKey] = struct{}{} |
| | | i++ |
| | | } |
| | | } |
| | | } |
| | | |
| | | if len(bomList) == 0 { |
| | | logger.Debug("没有要更新的Bom数据.") |
| | | } else { |
| | | // 每次发 1000 条 |
| | | successCnt := 0 |
| | | for i := 0; i < len(bomList); i += 1000 { |
| | | end := i + 1000 |
| | | if end > len(bomList) { |
| | | end = len(bomList) |
| | | } |
| | | |
| | | b, _ := json.Marshal(bomList[i:end]) |
| | | |
| | | // TCP协议上报 |
| | | ok := nsqclient.Produce(config.Options.BomTopic, b) |
| | | if !ok { |
| | | logger.Warn("BOM数据上报失败") |
| | | |
| | | //上报失败, 缓存清空 |
| | | bomReportedCache = make(map[string]struct{}, 0) |
| | | } else { |
| | | successCnt = end |
| | | } |
| | | } |
| | | logger.Debug("已上报%d条BOM数据", successCnt) |
| | | } |
| | | |
| | | // 上报bom子项 |
| | | } |
File was renamed from report/loop.go |
| | |
// Lifecycle state of the background reporting goroutine.
var (
	// ctx is the context handed to the reporting goroutine; it is nil until
	// the reporting task has been started.
	ctx context.Context

	// cancel cancels ctx; it is nil until the reporting task has been
	// started.
	cancel context.CancelFunc
)
| | | |
| | | func StartReport() { |
| | | func Start() { |
| | | ctx, cancel = context.WithCancel(context.Background()) |
| | | go Loop(ctx) |
| | | go queryTasks(ctx) |
| | | } |
| | | |
| | | func RestartReport() { |
| | | cancel() |
| | | |
| | | StartReport() |
| | | Start() |
| | | } |
| | | |
| | | func Loop(c context.Context) { |
| | | func queryTasks(c context.Context) { |
| | | logger.Debug("启动数据上报任务") |
| | | for { |
| | | select { |
| | |
| | | return |
| | | default: |
| | | // 上报订单 |
| | | SendOrder() |
| | | if config.Options.OrderTopic != "" { |
| | | SendOrder() |
| | | } |
| | | |
| | | // 上报即时库存 |
| | | SendInventory() |
| | | if config.Options.InventoryTopic != "" { |
| | | SendInventory() |
| | | } |
| | | |
| | | // 测试查询请求 |
| | | //sql := []byte("select * from t_icitem where FItemID=3316") |
| | | //ok := nsqclient.Produce(config.Options.SqlQueryTopic, sql) |
| | | //logger.Debug("测试请求接口, %v", ok) |
| | | // 上报bom |
| | | if config.Options.BomTopic != "" { |
| | | SendBom(true) |
| | | } |
| | | |
| | | time.Sleep(time.Duration(config.Options.SyncInterval) * time.Second) |
| | | } |