gigibox
2023-06-21 8a5f5dc40e82bf98e307082726ebe860002e019f
修改库存数据为增量上报
5个文件已修改
115 ■■■■ 已修改文件
config.json 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
fyne.syso 补丁 | 查看 | 原始文档 | blame | 历史
kingdee/query.go 71 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
report/loop.go 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
report/tasks.go 30 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config.json
@@ -1,11 +1,11 @@
{
    "web_port": "10210",
    "sql_addr": "os.smartai.com",
    "sql_addr": "10.6.201.7",
    "sql_db_name": "LZGS",
    "sql_username": "sa",
    "sql_password": "basic@2023",
    "sql_username": "webapi",
    "sql_password": "api2023",
    "nsq_server": "fai365.com:4150",
    "nsq_webapi": "http://121.31.232.83:9080/api/nsq/pub?topic=your_topic",
    "nsq_webapi": "http://121.31.232.83:9080/api/nsq/pub",
    "order_topic": "aps.wangpengfei.erp.seorder",
    "inventory_topic": "aps.wangpengfei.erp.inventory",
    "query_topic": "aps.wangpengfei.erp.k3resource",
fyne.syso
Binary files differ
kingdee/query.go
@@ -2,6 +2,7 @@
import (
    "encoding/json"
    "errors"
    "strings"
    "kingdee-dbapi/config"
@@ -9,29 +10,57 @@
    "kingdee-dbapi/nsqclient"
)
func QueryMsgHandle(data []byte) error {
    var result []interface{}
type QueryMsg struct {
    Key     string // 请求
    Command string
    Success bool
    Message string
    Result  []byte
}
    var sql = string(data)
func QueryMsgHandle(msg []byte) error {
    var query QueryMsg
    if err := json.Unmarshal(msg, &query); err != nil {
        logger.Warn("解析请求失败, %s", err.Error())
        return err
    }
    var sql = query.Command
    logger.Debug("接收到查询请求,%s", sql)
    if !sqlCheck(sql) {
        logger.Warn("识别到危险的sql语句, 拒绝执行. %s", sql)
        return nil
        query.Message = "危险的sql语句, 拒绝执行"
        logger.Warn(query.Message)
    } else {
        result, err := execSqlCommand(sql)
        if err != nil {
            query.Message = err.Error()
            logger.Warn("sql执行失败:%s", query.Message)
        } else {
            query.Result = result
            query.Success = true
            logger.Warn("sql执行完成.")
        }
    }
    if db == nil {
        logger.Debug("数据库未连接")
    replyData, _ := json.Marshal(query)
    ok := nsqclient.Produce(config.Options.ReplyTopic, replyData)
    logger.Warn("应答查询请求结果:%t, key:%s", ok, query.Key)
        return nil
    return nil
}
func execSqlCommand(sql string) ([]byte, error) {
    var result []interface{}
    if db == nil {
        return nil, errors.New("数据库未连接")
    }
    rows, err := db.Raw(sql).Rows()
    if err != nil {
        result = append(result, err.Error())
        return err
        return nil, err
    }
    var cols []string
@@ -44,13 +73,17 @@
        //建立俩个interface数组,columnPointers中存在columns的地址
        columns := make([]interface{}, len(cols))
        columnPointers := make([]interface{}, len(cols))
        for i, _ := range columns {
        for i := 0; i < len(columns); i++ {
            //赋值地址
            columnPointers[i] = &columns[i]
        }
        //扫描结果
        rows.Scan(columnPointers...)
        err = rows.Scan(columnPointers...)
        if err != nil {
            return nil, err
        }
        m := make(map[string]interface{})
        for i, colName := range cols {
            val := columnPointers[i].(*interface{})
@@ -60,17 +93,9 @@
        result = append(result, m)
    }
    logger.Debug("数据库返回数据%+v", result)
    b, _ := json.Marshal(result)
    rb, _ := json.Marshal(result)
    ok := nsqclient.Produce(config.Options.ReplyTopic, b)
    if !ok {
        logger.Warn("应答查询请求失败.")
    } else {
        logger.Debug("应答查询请求成功. 数据:%s", string(b))
    }
    return nil
    return rb, nil
}
// 简单过滤下sql语句,拒绝增删改操作
report/loop.go
@@ -2,7 +2,9 @@
import (
    "context"
    "kingdee-dbapi/config"
    "kingdee-dbapi/logger"
    "time"
)
var ctx context.Context
@@ -37,8 +39,8 @@
            //sql := []byte("select * from t_icitem where FItemID=3316")
            //ok := nsqclient.Produce(config.Options.QueryTopic, sql)
            //logger.Debug("测试请求接口, %v", ok)
            //
            //time.Sleep(time.Duration(config.Options.SyncInterval) * time.Second)
            time.Sleep(time.Duration(config.Options.SyncInterval) * time.Second)
        }
    }
}
report/tasks.go
@@ -4,6 +4,7 @@
    "encoding/json"
    "io/ioutil"
    "kingdee-dbapi/logger"
    "time"
    "kingdee-dbapi/cache"
    "kingdee-dbapi/config"
@@ -78,8 +79,22 @@
    }
}
var invReportedCache = make(map[string]float64, 0)
var fullLoad bool
func SendInventory() {
    var list []kingdee.Inventory
    // 设置每天凌晨1点上报一次全量数据
    hour := time.Now().Hour()
    if hour == 1 {
        if fullLoad == false {
            invReportedCache = make(map[string]float64, 0)
            fullLoad = true
        }
    } else {
        fullLoad = false
    }
    if config.Options.Debug {
        data, err := ioutil.ReadFile(inventoryLocalStore)
@@ -95,7 +110,18 @@
        }
    } else {
        list = kingdee.ICInventory()
        logger.Debug("查询到%d条库存数据", len(list))
        for i := 0; i < len(list); {
            cacheKey := list[i].FNumber + list[i].FBatchNo + list[i].FStockNo
            if qty, ok := invReportedCache[cacheKey]; ok && qty == list[i].FUnitQty {
                list = append(list[:i], list[i+1:]...)
            } else {
                invReportedCache[cacheKey] = list[i].FUnitQty
                i++
            }
        }
    }
    // 每次发 100 条
@@ -119,9 +145,13 @@
        ok := nsqclient.Produce(config.Options.InventoryTopic, b)
        if !ok {
            logger.Warn("库存数据上报失败")
            //上报失败, 缓存清空
            invReportedCache = make(map[string]float64, 0)
        } else {
            successCnt = end
        }
    }
    logger.Debug("已上报%d条库存数据", successCnt)
}