gigibox
2023-06-19 942f3416b333304bde50f0dca5581595f397eafa
完善功能,添加日志,添加nsq tcp上报
2个文件已添加
12个文件已修改
324 ■■■■ 已修改文件
.gitignore 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config.json 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config/config.go 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.mod 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.sum 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gui/gui.go 28 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
kingdee/db.go 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
kingdee/icInventory.go 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
kingdee/query.go 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
logger/logger.go 79 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 19 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/client.go 38 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
report/report.go 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
report/task.go 107 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.gitignore
@@ -1,6 +1,5 @@
.idea
.vscode
kingdee-dbapi.exe
invert.txt
kingdee-api.db
order.txt
*.tmp
kingdee-api.db
config.json
@@ -4,7 +4,11 @@
    "sql_db_name": "LZGS",
    "sql_username": "webapi",
    "sql_password": "api2023",
    "nsq_server": "fai365.com:4150",
    "nsq_webapi": "http://121.31.232.83:9080/api/nsq/pub?topic=your_topic",
    "order_topic": "aps.wangpengfei.erp.seorder",
    "inventory_topic": "aps.wangpengfei.erp.inventory",
    "interval": 60
    "query_topic": "aps.wangpengfei.erp.k3resource",
    "interval": 60,
    "debug": false
}
config/config.go
@@ -15,10 +15,12 @@
    SqlUsername    string `json:"sql_username"`    // 数据库用户
    SqlPassword    string `json:"sql_password"`    // 数据库密码
    NsqServer      string `json:"nsq_server"`      // nsq TCP服务端地址
    NsqWebApi      string `json:"nsq_server"`      // nsq HTTP接口地址
    NsqWebApi      string `json:"nsq_webapi"`      // nsq HTTP接口地址
    OrderTopic     string `json:"order_topic"`     // 订单上报的topic
    InventoryTopic string `json:"inventory_topic"` // 库存上报的topic
    QueryTopic     string `json:"query_topic"`     // 金蝶查询接口的topic
    SyncInterval   int    `json:"interval"`        // 同步的时间间隔, 单位/秒
    Debug          bool   `json:"debug"`           // 本地调试, 取本地数据
}
const configPath = "config.json"
@@ -33,9 +35,11 @@
    Options.SqlPassword = "123456"
    Options.NsqServer = "fai365.com:4150"
    Options.NsqWebApi = "http://121.31.232.83:9080/api/nsq/pub?topic=your_topic"
    Options.OrderTopic = "/province/city/factoryNo/kingdee_seOrder"
    Options.InventoryTopic = "/province/city/factoryNo/kingdee_inventory"
    Options.OrderTopic = "aps.factory.erp.seorder"
    Options.InventoryTopic = "aps.factory.erp.inventory"
    Options.QueryTopic = "aps.factory.erp.k3resource"
    Options.SyncInterval = 60
    Options.Debug = false
}
func Load() {
@@ -49,8 +53,6 @@
        fd := json.NewDecoder(file)
        fd.Decode(&Options)
        fmt.Printf("%v\n", Options)
    }
}
go.mod
@@ -5,6 +5,7 @@
require (
    fyne.io/fyne v1.4.3 // indirect
    fyne.io/fyne/v2 v2.3.4
    github.com/fatedier/beego v1.7.2 // indirect
    github.com/flopp/go-findfont v0.1.0 // indirect
    github.com/gin-gonic/gin v1.7.0 // indirect
    github.com/jinzhu/gorm v1.9.16 // indirect
go.sum
@@ -84,6 +84,8 @@
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/erikstmartin/go-testdb v0.0.0-20160219214506-8d10e4a1bae5/go.mod h1:a2zkGnVExMxdzMo3M0Hi/3sEU+cWnZpSni0O6/Yb/P0=
github.com/fatedier/beego v1.7.2 h1:kVw3oKiXccInqG+Z/7l8zyRQXrsCQEfcUxgzfGK+R8g=
github.com/fatedier/beego v1.7.2/go.mod h1:wx3gB6dbIfBRcucp94PI9Bt3I0F2c/MyNEWuhzpWiwk=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/flopp/go-findfont v0.1.0 h1:lPn0BymDUtJo+ZkV01VS3661HL6F4qFlkhcJN55u6mU=
github.com/flopp/go-findfont v0.1.0/go.mod h1:wKKxRDjD024Rh7VMwoU90i6ikQRCr+JTHB5n4Ejkqvw=
gui/gui.go
@@ -1,7 +1,7 @@
package gui
import (
    "fmt"
    "kingdee-dbapi/logger"
    "strings"
    "kingdee-dbapi/config"
@@ -12,11 +12,9 @@
    "fyne.io/fyne/v2"
    "fyne.io/fyne/v2/app"
    "fyne.io/fyne/v2/canvas"
    "fyne.io/fyne/v2/container"
    "fyne.io/fyne/v2/dialog"
    "fyne.io/fyne/v2/layout"
    "fyne.io/fyne/v2/storage"
    "fyne.io/fyne/v2/theme"
    "fyne.io/fyne/v2/widget"
)
@@ -79,9 +77,12 @@
        // 连接数据库
        err := kingdee.Init(config.Options.SqlUsername, config.Options.SqlPassword, config.Options.SqlAddr, config.Options.SqlDBName)
        if err != nil {
            fmt.Println("db init error:", err.Error())
            logger.Error("db init error:%s", err.Error())
            dialog.ShowError(err, w)
            return
            if !config.Options.Debug {
                return
            }
        }
        form.Disable()
@@ -114,21 +115,4 @@
    })
    d.Window.ShowAndRun()
}
func (d *Display) DrawImage(imgUri string) {
    uri, err := storage.ParseURI(imgUri)
    if err != nil {
        fmt.Println("parse uri error:", err)
    }
    image := canvas.NewImageFromURI(uri)
    //image := canvas.NewImageFromImage(src)
    // image := canvas.NewImageFromReader(reader, name)
    //image := canvas.NewImageFromFile("./a.png")
    //image.FillMode = canvas.ImageFillContain
    //image.FillMode = canvas.ImageFillOriginal
    image.FillMode = canvas.ImageFillStretch
    d.Window.SetContent(image)
}
kingdee/db.go
@@ -14,11 +14,10 @@
func Init(username, password, addr, dbName string) error {
    var err error
    sqlServer := fmt.Sprintf("sqlserver://%s:%s@%s:1433?database=%s",
    sqlServer := fmt.Sprintf("sqlserver://%s:%s@%s:1433?database=%s;encrypt=disable;",
        username, password, addr, dbName)
    fmt.Println(sqlServer)
    // 打开数据库连接
    //db, err = gorm.Open("mssql", "sqlserver://sa:LZdba@)@)@10.6.201.7:1433?database=LZGS")
    db, err = gorm.Open("mssql", sqlServer)
    if err != nil {
        return err
kingdee/icInventory.go
@@ -25,7 +25,7 @@
        ti.FName AS FUnit,
        CONVERT (FLOAT, i.FQty) AS FUnitQty,
        ti2.FName AS FSec,
        i.FQty AS FQty
        i.FQty AS FQty
    FROM
        ICInventory AS i
    LEFT OUTER JOIN t_ICItem AS item ON i.FItemID = item.FItemID
@@ -34,10 +34,10 @@
    LEFT OUTER JOIN t_Item AS ti2 ON item.FStoreUnitID = ti2.FItemID
    LEFT OUTER JOIN t_Item AS ti3 ON LEFT (item.FNumber, 5) = ti3.FNumber
    AND ti3.FItemClassID = 4
    WHERE
        (i.FQty <> 0)
    --WHERE
    --      (i.FQty <> 0)
    ORDER BY
        item.FNumber
        item.FNumber
    `
    var result []Inventory
kingdee/query.go
New file
@@ -0,0 +1,8 @@
package kingdee
import "fmt"
func QueryMsgHandle(data []byte) error {
    fmt.Println("recv msg ", string(data))
    return nil
}
logger/logger.go
New file
@@ -0,0 +1,79 @@
package logger
import (
    "fmt"
    "github.com/fatedier/beego/logs"
)
// Log is the under log object
var Log *logs.BeeLogger
func init() {
    Log = logs.NewLogger(10000)
    Log.EnableFuncCallDepth(true)
    Log.SetLogFuncCallDepth(Log.GetLogFuncCallDepth() + 1)
}
func InitLog(logFile string, logLevel string, maxdays int64, disableLogColor bool) {
    SetLogFile("", logFile, maxdays, disableLogColor)
    SetLogLevel(logLevel)
}
// SetLogFile to configure log params
// logWay: file or console
func SetLogFile(logWay string, logFile string, maxdays int64, disableLogColor bool) {
    if logWay == "console" {
        params := ""
        if disableLogColor {
            params = `{"color": false}`
        }
        _ = Log.SetLogger("console", params)
    } else {
        params := fmt.Sprintf(`{"filename": "%s", "maxdays": %d, "maxlines":0, "maxsize":0, "daily":true}`, logFile, maxdays)
        _ = Log.SetLogger("file", params)
    }
}
// SetLogLevel set log level, default is warning
// value: error, warning, info, debug, trace
func SetLogLevel(logLevel string) {
    var level int
    switch logLevel {
    case "error":
        level = 3
    case "warn":
        level = 4
    case "info":
        level = 6
    case "debug":
        level = 7
    case "trace":
        level = 8
    default:
        level = 4 // warning
    }
    Log.SetLevel(level)
}
// wrap log
func Error(format string, v ...interface{}) {
    Log.Error(format, v...)
}
func Warn(format string, v ...interface{}) {
    Log.Warn(format, v...)
}
func Info(format string, v ...interface{}) {
    Log.Info(format, v...)
}
func Debug(format string, v ...interface{}) {
    Log.Debug(format, v...)
}
func Trace(format string, v ...interface{}) {
    Log.Trace(format, v...)
}
main.go
@@ -1,23 +1,38 @@
package main
import (
    "kingdee-dbapi/kingdee"
    "kingdee-dbapi/models"
    "os"
    "strings"
    "kingdee-dbapi/cache"
    "kingdee-dbapi/config"
    "kingdee-dbapi/gui"
    "kingdee-dbapi/kingdee"
    "kingdee-dbapi/logger"
    "kingdee-dbapi/models"
    "kingdee-dbapi/nsqclient"
    "github.com/flopp/go-findfont"
)
func main() {
    logger.InitLog("kdingdee-dbapi.log", "debug", 15, false)
    logger.Info("playletServer start!")
    config.Load()
    // sqlite3数据库
    models.Init()
    // 初始化缓存, 记录了已经上报的订单号
    cache.InitCache()
    // 初始化nsq
    nsqclient.InitNsqProducer()
    // 开启订阅
    go nsqclient.InitNsqConsumer(config.Options.QueryTopic, "sensor01", kingdee.QueryMsgHandle)
    // 设置中文字体
    setFont()
    defer os.Unsetenv("FYNE_FONT")
nsqclient/client.go
@@ -1,17 +1,18 @@
package nsqclient
import (
    "context"
    "fmt"
    "kingdee-dbapi/config"
)
var nsqClient Producer
var producerCli Producer
var consumeCli NsqConsumer
const plcTopic = "plcTopic"
func InitNsqClient() error {
func InitNsqProducer() error {
    var err error
    nsqClient, err = NewProducer(config.Options.NsqServer)
    producerCli, err = NewProducer(config.Options.NsqServer)
    if err != nil {
        fmt.Println(err.Error())
    }
@@ -19,17 +20,32 @@
    return err
}
func Produce(msg []byte) (err error) {
    if nsqClient == nil {
        err = InitNsqClient()
func Produce(topic string, msg []byte) bool {
    if producerCli == nil {
        err := InitNsqProducer()
        if err != nil {
            return err
            fmt.Println("Init Nsq Client error:" + err.Error())
            return false
        }
    }
    if err = nsqClient.Publish(plcTopic, msg); err != nil {
    err := producerCli.Publish(topic, msg)
    if err != nil {
        fmt.Println("Publish error:" + err.Error())
    }
    return
    return err == nil
}
func InitNsqConsumer(topic, channel string, handle func(data []byte) error) {
    if c, err := NewNsqConsumer(context.Background(), topic, channel); err != nil {
        fmt.Println("NewNsqConsumer failed", err)
        return
    } else {
        c.AddHandler(handle)
        if err := c.Run(config.Options.NsqServer, 1); err != nil {
            fmt.Println("run consumer failed", err)
        }
    }
}
report/report.go
@@ -2,7 +2,7 @@
import (
    "context"
    "fmt"
    "kingdee-dbapi/logger"
    "time"
    "kingdee-dbapi/config"
@@ -23,11 +23,11 @@
}
func Loop(c context.Context) {
    fmt.Println("start report")
    logger.Debug("启动数据上报任务")
    for {
        select {
        case <-c.Done():
            fmt.Println("loop break")
            logger.Debug("停止上报")
            return
        default:
            // 上报订单
report/task.go
@@ -2,6 +2,8 @@
import (
    "encoding/json"
    "io/ioutil"
    "kingdee-dbapi/logger"
    "kingdee-dbapi/cache"
    "kingdee-dbapi/config"
@@ -10,21 +12,55 @@
    "kingdee-dbapi/nsqclient"
)
func SendOrder() {
    var completedOrderNo = make(map[string]struct{})
    list := kingdee.SeOrderList()
const orderLocalStore = "order.tmp"
const inventoryLocalStore = "inventory.tmp"
    for i := 0; i < len(list); i++ {
func SendOrder() {
    var list []kingdee.SEOrder
    if config.Options.Debug {
        data, err := ioutil.ReadFile(orderLocalStore)
        if err != nil {
            logger.Error("文件读取失败, %s", err.Error())
            return
        }
        err = json.Unmarshal(data, &list)
        if err != nil {
            logger.Error("文件内容解析失败, %s", err.Error())
            return
        }
    } else {
        list = kingdee.SeOrderList()
        logger.Debug("查询到%d条订单信息", len(list))
    }
    var completedOrderNo = make(map[string]struct{})
    for i := 0; i < len(list); {
        if cache.Exists(list[i].FBillNo) {
            list = append(list[:i], list[i+1:]...)
        } else {
            completedOrderNo[list[i].FBillNo] = struct{}{}
            i++
        }
    }
    b, _ := json.Marshal(list)
    ok := nsqclient.HttpPost(config.Options.OrderTopic, b)
    if !config.Options.Debug {
        ioutil.WriteFile(orderLocalStore, b, 0644)
    }
    // http协议上报, 已修改为TCP
    //ok := nsqclient.HttpPost(config.Options.OrderTopic, b)
    if len(list) == 0 {
        logger.Debug("没有新的订单需要上报")
        return
    }
    // TCP协议上报
    ok := nsqclient.Produce(config.Options.OrderTopic, b)
    if ok {
        // 写入数据库, 标记已经上报过了,避免重复上报
        for orderNo, _ := range completedOrderNo {
@@ -35,36 +71,57 @@
            cursor.Insert()
            cache.WriteCache(orderNo)
        }
    }
    // 逐条发送
    //for idx, _ := range list {
    //    // 已经推送过的订单
    //    if cache.Exists(list[idx].FBillNo) {
    //        continue
    //    }
    //
    //    b, _ := json.Marshal(list[idx])
    //
    //    ok := nsqclient.HttpPost(config.Options.OrderTopic, b)
    //    if ok {
    //        completedOrderNo[list[idx].FBillNo] = struct{}{}
    //    }
    //}
        logger.Debug("已上报%d个订单信息", len(list))
    } else {
        logger.Warn("订单数据上报失败")
    }
}
func SendInventory() {
    list := kingdee.ICInventory()
    var list []kingdee.Inventory
    // 每次发 300 条
    for i := 0; i < len(list); i += 300 {
        end := i + 300
    if config.Options.Debug {
        data, err := ioutil.ReadFile(inventoryLocalStore)
        if err != nil {
            logger.Error("文件读取失败, %s", err.Error())
            return
        }
        err = json.Unmarshal(data, &list)
        if err != nil {
            logger.Error("文件内容解析失败, %s", err.Error())
            return
        }
    } else {
        list = kingdee.ICInventory()
        logger.Debug("查询到%d条库存数据", len(list))
    }
    // 每次发 100 条
    successCnt := 0
    for i := 0; i < len(list); i += 1000 {
        end := i + 1000
        if end > len(list) {
            end = len(list)
        }
        b, _ := json.Marshal(list[i:end])
        nsqclient.HttpPost(config.Options.InventoryTopic, b)
        if !config.Options.Debug {
            ioutil.WriteFile(inventoryLocalStore, b, 0644)
        }
        // HTTP协议上报,已修改为TCP
        //nsqclient.HttpPost(config.Options.InventoryTopic, b)
        // TCP协议上报
        ok := nsqclient.Produce(config.Options.InventoryTopic, b)
        if !ok {
            logger.Warn("库存数据上报失败")
        } else {
            successCnt += end
        }
    }
    logger.Debug("已上报%d条库存数据", successCnt)
}