gigibox
2023-06-15 ff3cadba4a63cd1b63cd0e36358f49ccedb88bef
完成基本功能
13个文件已添加
2 文件已重命名
9个文件已修改
919 ■■■■■ 已修改文件
.gitignore 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
cache/cache.go 28 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config.json 7 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config/config.go 35 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.mod 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.sum 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
gui/gui.go 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
kingdee/db.go 39 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
kingdee/icInventory.go 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
kingdee/seOrder.go 16 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 17 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
models/db.go 24 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
models/order.go 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/channel.go 134 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/client.go 35 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/conn.go 45 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/consumer.go 99 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/httpClient.go 35 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/pointer.go 57 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/pool.go 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nsqclient/producer.go 139 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
report/report.go 42 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
report/task.go 70 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
webserver/controller.go 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.gitignore
@@ -1,3 +1,6 @@
.idea
.vscode
kingdee-dbapi.exe
kingdee-dbapi.exe
invert.txt
kingdee-api.db
order.txt
cache/cache.go
New file
@@ -0,0 +1,28 @@
package cache
import (
    "kingdee-dbapi/models"
    "sync"
)
var orderKeyCache sync.Map
func InitCache() {
    var db models.Order
    cacheData := db.FindAll()
    for _, order := range cacheData {
        orderKeyCache.Store(order.OrderNo, struct{}{})
    }
}
func Exists(key string) (ok bool) {
    _, ok = orderKeyCache.Load(key)
    return
}
func WriteCache(key string) {
    orderKeyCache.Store(key, struct{}{})
}
config.json
@@ -1,7 +1,10 @@
{
    "web_port": "808",
    "web_port": "10210",
    "sql_addr": "10.6.201.7",
    "sql_db_name": "LZGS",
    "sql_username": "webapi",
    "sql_password": "api2023"
    "sql_password": "api2023",
    "order_topic": "aps.wangpengfei.erp.seorder",
    "inventory_topic": "aps.wangpengfei.erp.inventory",
    "interval": 60
}
config/config.go
@@ -9,24 +9,38 @@
)
type Config struct {
    WebPort     string `json:"web_port"`
    SqlAddr     string `json:"sql_addr"`
    SqlDBName   string `json:"sql_db_name"`
    SqlUsername string `json:"sql_username"`
    SqlPassword string `json:"sql_password"`
    WebPort        string `json:"web_port"`        // 本地web服务绑定接口
    SqlAddr        string `json:"sql_addr"`        // 数据库ip地址, 不加端口号, 默认使用1433端口
    SqlDBName      string `json:"sql_db_name"`     // 数据库名称
    SqlUsername    string `json:"sql_username"`    // 数据库用户
    SqlPassword    string `json:"sql_password"`    // 数据库密码
    NsqServer      string `json:"nsq_server"`      // nsq TCP服务端地址
    NsqWebApi      string `json:"nsq_server"`      // nsq HTTP接口地址
    OrderTopic     string `json:"order_topic"`     // 订单上报的topic
    InventoryTopic string `json:"inventory_topic"` // 库存上报的topic
    SyncInterval   int    `json:"interval"`        // 同步的时间间隔, 单位/秒
}
const configPath = "config.json"
var Options Config
// Init is an exported method that takes the environment starts the viper
// (external lib) and returns the configuration struct.
func init() {
func DefaultConfig() {
    Options.WebPort = "10210"
    Options.SqlAddr = "127.0.0.1"
    Options.SqlDBName = "dbname"
    Options.SqlUsername = "sa"
    Options.SqlPassword = "123456"
    Options.NsqServer = "fai365.com:4150"
    Options.NsqWebApi = "http://121.31.232.83:9080/api/nsq/pub?topic=your_topic"
    Options.OrderTopic = "/province/city/factoryNo/kingdee_seOrder"
    Options.InventoryTopic = "/province/city/factoryNo/kingdee_inventory"
    Options.SyncInterval = 60
}
func Load() {
    if !pathExists(configPath) {
        DefaultConfig()
        SaveConfig()
    } else {
        file, _ := os.Open(configPath)
@@ -44,8 +58,11 @@
    b, err := json.Marshal(Options)
    if err == nil {
        var out bytes.Buffer
        err = json.Indent(&out, b, "", "    ")
        ioutil.WriteFile(configPath, out.Bytes(), 0644)
        if err == nil {
            err = ioutil.WriteFile(configPath, out.Bytes(), 0644)
        }
    }
    return err
go.mod
@@ -8,4 +8,5 @@
    github.com/flopp/go-findfont v0.1.0 // indirect
    github.com/gin-gonic/gin v1.7.0 // indirect
    github.com/jinzhu/gorm v1.9.16 // indirect
    github.com/nsqio/go-nsq v1.1.0 // indirect
)
go.sum
@@ -168,6 +168,8 @@
github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
@@ -268,6 +270,7 @@
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-sqlite3 v1.14.0 h1:mLyGNKR8+Vv9CAU7PphKa2hkEqxxhn8i32J6FPj1/QA=
github.com/mattn/go-sqlite3 v1.14.0/go.mod h1:JIl7NbARA7phWnGvh0LKTyg7S9BA+6gx71ShQilpsus=
github.com/mcuadros/go-version v0.0.0-20190830083331-035f6764e8d2/go.mod h1:76rfSfYPWj01Z85hUf/ituArm797mNKcvINh1OlsZKo=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
@@ -290,6 +293,8 @@
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE=
github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
gui/gui.go
@@ -2,12 +2,13 @@
import (
    "fmt"
    "kingdee-dbapi/config"
    "kingdee-dbapi/models"
    "kingdee-dbapi/webserver"
    "strings"
    "kingdee-dbapi/config"
    "kingdee-dbapi/kingdee"
    "kingdee-dbapi/report"
    "kingdee-dbapi/static"
    "kingdee-dbapi/webserver"
    "fyne.io/fyne/v2"
    "fyne.io/fyne/v2/app"
@@ -76,7 +77,7 @@
        config.SaveConfig()
        // 连接数据库
        err := models.Init(config.Options.SqlUsername, config.Options.SqlPassword, config.Options.SqlAddr, config.Options.SqlDBName)
        err := kingdee.Init(config.Options.SqlUsername, config.Options.SqlPassword, config.Options.SqlAddr, config.Options.SqlDBName)
        if err != nil {
            fmt.Println("db init error:", err.Error())
            dialog.ShowError(err, w)
@@ -87,6 +88,8 @@
        submitBtn.Text = "已启动"
        submitBtn.Disable()
        report.StartReport()
        go webserver.Serve(serverPort.Text)
    })
kingdee/db.go
New file
@@ -0,0 +1,39 @@
package kingdee
import (
    "fmt"
    "github.com/jinzhu/gorm"
    _ "github.com/jinzhu/gorm/dialects/mssql"
)
var db *gorm.DB
var err error
// Init .
func Init(username, password, addr, dbName string) error {
    var err error
    sqlServer := fmt.Sprintf("sqlserver://%s:%s@%s:1433?database=%s",
        username, password, addr, dbName)
    fmt.Println(sqlServer)
    // 打开数据库连接
    //db, err = gorm.Open("mssql", "sqlserver://sa:LZdba@)@)@10.6.201.7:1433?database=LZGS")
    db, err = gorm.Open("mssql", sqlServer)
    if err != nil {
        return err
    }
    return nil
}
func GetDB() *gorm.DB {
    return db
}
// CloseDB .
func CloseDB() {
    if db != nil {
        db.Close()
    }
}
kingdee/icInventory.go
File was renamed from models/icInventory.go
@@ -1,4 +1,4 @@
package models
package kingdee
type Inventory struct {
    FNumber    string  `gorm:"column:FNumber" json:"FNumber"`       // 物料代码
@@ -8,12 +8,12 @@
    FStockNo   string  `gorm:"column:FStockNo" json:"FStockNo"`     // 仓库代码
    FStockName string  `gorm:"column:FStockName" json:"FStockName"` // 仓库名称
    FUnit      string  `gorm:"column:FUnit" json:"FUnit"`           // 基本计量单位
    FUnitQty   float64 `gorm:"column:FUnitQty" json:"FUnitQty"`     //基本计量单位数量
    FUnitQty   float64 `gorm:"column:FUnitQty" json:"FUnitQty"`     // 基本计量单位数量
    FSec       string  `gorm:"column:FSec" json:"FSec"`             // 常用计量单位
    FQty       float64 `gorm:"column:FQty" json:"FQty"`             // 常用计量单位数量
}
func FindICInventory() []Inventory {
func ICInventory() []Inventory {
    sql := `
    SELECT
        TOP (100) PERCENT item.FNumber AS FNumber,
@@ -40,9 +40,11 @@
        item.FNumber    
    `
    var result []Inventory
    //var result []map[string]interface{}
    db.Raw(sql).Debug().Scan(&result)
    db.Raw(sql).Scan(&result)
    //db.Raw(sql).Debug().Scan(&result)
    // 打印列名称测试
    //rows, _ := db.Raw(sql).Debug().Rows()
    //fmt.Println(rows.Columns())
kingdee/seOrder.go
File was renamed from models/seOrder.go
@@ -1,4 +1,4 @@
package models
package kingdee
// 销售订单表
type SEOrder struct {
@@ -16,6 +16,7 @@
    FCess              float64 `gorm:"column:FCess" json:"FCess"`                           // 税率
    FNote              string  `gorm:"column:FNote" json:"FNote"`                           // 备注
    FAdviceConsignDate string  `gorm:"column:FAdviceConsignDate" json:"FAdviceConsignDate"` // 交货日期
    Fdate              string  `gorm:"column:Fdate" json:"Fdate"`                           // 订单日期
    FCustID            string  `gorm:"column:FCustID" json:"FCustID"`                       // 购货单位
    FInventory         float64 `gorm:"column:FInventory" json:"FInventory"`                 // 即时库存
    // 含税单价
@@ -40,11 +41,15 @@
        i.FCess,
        i.FNote,
        i.FAdviceConsignDate,
        i.Fdate,
        i.FCustID,
        SUM(ICInventory.FQty) AS FInventory
    FROM vwICBill_32 AS i
    FROM
        vwICBill_32 AS i
    JOIN t_ICitem ON i.FShortNumber = t_ICitem.FShortNumber
    JOIN ICInventory ON ICInventory.FItemID = t_ICitem.FItemID
    WHERE
        DateDiff(dd,fDate,getdate())=0
    GROUP BY
        i.FBillNo,
        i.FNumber,
@@ -60,11 +65,16 @@
        i.FCess,
        i.FNote,
        i.FAdviceConsignDate,
        i.Fdate,
        i.FCustID;
    `
    //查询当天日期
    //select * from vwICBill_32 where DateDiff(dd,fDate,getdate())=0
    var result []SEOrder
    db.Raw(sql).Debug().Scan(&result)
    db.Raw(sql).Scan(&result)
    //db.Raw(sql).Debug().Scan(&result)
    return result
}
main.go
@@ -1,12 +1,13 @@
package main
import (
    "kingdee-dbapi/kingdee"
    "kingdee-dbapi/models"
    "os"
    "strings"
    "kingdee-dbapi/config"
    "kingdee-dbapi/gui"
    "kingdee-dbapi/models"
    "github.com/flopp/go-findfont"
)
@@ -14,21 +15,13 @@
func main() {
    config.Load()
    // 连接数据库
    //models.Init()
    //
    //var order models.SEOrder
    //err := order.FindByFBillNo("SEORD006060")
    //if err != nil {
    //    fmt.Println(err.Error())
    //} else {
    //    fmt.Printf("%+v\n", order)
    //}
    // sqlite3数据库
    models.Init()
    // 设置中文字体
    setFont()
    defer os.Unsetenv("FYNE_FONT")
    defer models.CloseDB()
    defer kingdee.CloseDB()
    // 创建窗口并运行
    window := gui.NewDisplay()
models/db.go
@@ -1,39 +1,27 @@
package models
import (
    "fmt"
    "github.com/jinzhu/gorm"
    _ "github.com/jinzhu/gorm/dialects/mssql"
    _ "github.com/jinzhu/gorm/dialects/sqlite"
)
var db *gorm.DB
var err error
// Init .
func Init(username, password, addr, dbName string) error {
    var err error
    sqlServer := fmt.Sprintf("sqlserver://%s:%s@%s:1433?database=%s",
        username, password, addr, dbName)
    fmt.Println(sqlServer)
func Init() error {
    // 打开数据库连接
    //db, err = gorm.Open("mssql", "sqlserver://sa:LZdba@)@)@10.6.201.7:1433?database=LZGS")
    db, err = gorm.Open("mssql", sqlServer)
    db, err = gorm.Open("sqlite3", "kingdee-api.db")
    if err != nil {
        return err
    }
    return nil
}
    db.AutoMigrate(&Order{})
func GetDB() *gorm.DB {
    return db
    return nil
}
// CloseDB .
func CloseDB() {
    if db != nil {
        db.Close()
    }
    db.Close()
}
models/order.go
New file
@@ -0,0 +1,31 @@
package models
type Order struct {
    ID         uint `gorm:"primary_key"`
    CreateTime string
    OrderNo    string
}
//指定表名.
func (o *Order) TableName() string {
    return "t_order"
}
//Insert 插入记录.
func (o *Order) Insert() bool {
    result := db.Table(o.TableName()).Create(&o)
    if result.Error != nil {
        return false
    }
    return result.RowsAffected > 0
}
//FindAll 查找所有记录.
func (o *Order) FindAll() (list []Order) {
    if err := db.Table(o.TableName()).Where("1 = 1").Scan(&list).Error; err != nil {
        return nil
    }
    return list
}
nsqclient/channel.go
New file
@@ -0,0 +1,134 @@
package nsqclient
import (
    "errors"
    "fmt"
    "sync"
    nsq "github.com/nsqio/go-nsq"
)
// channelPool implements the Pool interface based on buffered channels.
type channelPool struct {
    // storage for our net.Conn connections
    mu    sync.Mutex
    conns chan *nsq.Producer
    // net.Conn generator
    factory Factory
}
// Factory is a function to create new connections.
type Factory func() (*nsq.Producer, error)
// NewChannelPool returns a new pool based on buffered channels with an initial
// capacity and maximum capacity. Factory is used when initial capacity is
// greater than zero to fill the pool. A zero initialCap doesn't fill the Pool
// until a new Get() is called. During a Get(), If there is no new connection
// available in the pool, a new connection will be created via the Factory()
// method.
func NewChannelPool(initialCap, maxCap int, factory Factory) (Pool, error) {
    if initialCap < 0 || maxCap <= 0 || initialCap > maxCap {
        return nil, errors.New("invalid capacity settings")
    }
    c := &channelPool{
        conns:   make(chan *nsq.Producer, maxCap),
        factory: factory,
    }
    // create initial connections, if something goes wrong,
    // just close the pool error out.
    for i := 0; i < initialCap; i++ {
        conn, err := factory()
        if err != nil {
            c.Close()
            return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
        }
        c.conns <- conn
    }
    return c, nil
}
func (c *channelPool) getConns() chan *nsq.Producer {
    c.mu.Lock()
    conns := c.conns
    c.mu.Unlock()
    return conns
}
// Get implements the Pool interfaces Get() method. If there is no new
// connection available in the pool, a new connection will be created via the
// Factory() method.
func (c *channelPool) Get() (*PoolConn, error) {
    conns := c.getConns()
    if conns == nil {
        return nil, ErrClosed
    }
    // wrap our connections with out custom net.Conn implementation (wrapConn
    // method) that puts the connection back to the pool if it's closed.
    select {
    case conn := <-conns:
        if conn == nil {
            return nil, ErrClosed
        }
        return c.wrapConn(conn), nil
    default:
        conn, err := c.factory()
        if err != nil {
            return nil, err
        }
        return c.wrapConn(conn), nil
    }
}
// put puts the connection back to the pool. If the pool is full or closed,
// conn is simply closed. A nil conn will be rejected.
func (c *channelPool) put(conn *nsq.Producer) error {
    if conn == nil {
        return errors.New("connection is nil. rejecting")
    }
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.conns == nil {
        // pool is closed, close passed connection
        conn.Stop()
        return nil
    }
    // put the resource back into the pool. If the pool is full, this will
    // block and the default case will be executed.
    select {
    case c.conns <- conn:
        return nil
    default:
        // pool is full, close passed connection
        conn.Stop()
        return nil
    }
}
func (c *channelPool) Close() {
    c.mu.Lock()
    conns := c.conns
    c.conns = nil
    c.factory = nil
    c.mu.Unlock()
    if conns == nil {
        return
    }
    close(conns)
    for conn := range conns {
        conn.Stop()
    }
}
func (c *channelPool) Len() int { return len(c.getConns()) }
nsqclient/client.go
New file
@@ -0,0 +1,35 @@
package nsqclient
import (
    "fmt"
    "kingdee-dbapi/config"
)
var nsqClient Producer
const plcTopic = "plcTopic"
func InitNsqClient() error {
    var err error
    nsqClient, err = NewProducer(config.Options.NsqServer)
    if err != nil {
        fmt.Println(err.Error())
    }
    return err
}
func Produce(msg []byte) (err error) {
    if nsqClient == nil {
        err = InitNsqClient()
        if err != nil {
            return err
        }
    }
    if err = nsqClient.Publish(plcTopic, msg); err != nil {
        fmt.Println("Publish error:" + err.Error())
    }
    return
}
nsqclient/conn.go
New file
@@ -0,0 +1,45 @@
package nsqclient
import (
    "sync"
    nsq "github.com/nsqio/go-nsq"
)
// PoolConn is a wrapper around net.Conn to modify the the behavior of
// net.Conn's Close() method.
type PoolConn struct {
    *nsq.Producer
    mu       sync.RWMutex
    c        *channelPool
    unusable bool
}
// Close puts the given connects back to the pool instead of closing it.
func (p *PoolConn) Close() error {
    p.mu.RLock()
    defer p.mu.RUnlock()
    if p.unusable {
        if p.Producer != nil {
            p.Producer.Stop()
            return nil
        }
        return nil
    }
    return p.c.put(p.Producer)
}
// MarkUnusable marks the connection not usable any more, to let the pool close it instead of returning it to pool.
func (p *PoolConn) MarkUnusable() {
    p.mu.Lock()
    p.unusable = true
    p.mu.Unlock()
}
// newConn wraps a standard net.Conn to a poolConn net.Conn.
func (c *channelPool) wrapConn(conn *nsq.Producer) *PoolConn {
    p := &PoolConn{c: c}
    p.Producer = conn
    return p
}
nsqclient/consumer.go
New file
@@ -0,0 +1,99 @@
package nsqclient
import (
    "context"
    "fmt"
    "time"
    nsq "github.com/nsqio/go-nsq"
)
type NsqConsumer struct {
    consumer *nsq.Consumer
    // handler   nsq.Handler
    handler   func([]byte) error
    ctx       context.Context
    ctxCancel context.CancelFunc
    topic     string
    channel   string
}
func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) {
    conf := nsq.NewConfig()
    conf.MaxAttempts = 0
    conf.MsgTimeout = 10 * time.Minute         // 默认一个消息最多能处理十分钟,否则就会重新丢入队列
    conf.LookupdPollInterval = 3 * time.Second // 调整consumer的重连间隔时间为3秒
    for _, option := range options {
        option(conf)
    }
    consumer, err := nsq.NewConsumer(topic, channel, conf)
    if err != nil {
        return nil, err
    }
    return &NsqConsumer{
        consumer: consumer,
        ctx:      ctx,
        topic:    topic,
        channel:  channel,
    }, nil
}
func DestroyNsqConsumer(c *NsqConsumer) {
    if c != nil {
        if c.ctxCancel != nil {
            c.ctxCancel()
        }
    }
}
// func (n *NsqConsumer) AddHandler(handler nsq.Handler) {
//     n.handler = handler
// }
func (n *NsqConsumer) AddHandler(handler func([]byte) error) {
    n.handler = handler
}
func (n *NsqConsumer) Run(qaddr string, concurrency int) error {
    return n.RunDistributed([]string{qaddr}, nil, concurrency)
}
func (n *NsqConsumer) RunLookupd(lookupAddr string, concurrency int) error {
    return n.RunDistributed(nil, []string{lookupAddr}, concurrency)
}
func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error {
    n.consumer.ChangeMaxInFlight(concurrency)
    // n.consumer.AddConcurrentHandlers(n.handler, concurrency)
    n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(msg *nsq.Message) error {
        return n.handler(msg.Body)
        // return nil
    }), concurrency)
    var err error
    if len(qAddr) > 0 {
        err = n.consumer.ConnectToNSQDs(qAddr)
    } else if len(lAddr) > 0 {
        err = n.consumer.ConnectToNSQLookupds(lAddr)
    } else {
        err = fmt.Errorf("Addr Must NOT Empty")
    }
    if err != nil {
        return err
    }
    if n.ctx == nil {
        n.ctx, n.ctxCancel = context.WithCancel(context.Background())
    }
    for {
        select {
        case <-n.ctx.Done():
            fmt.Println("[%s] %s,%s", "stop consumer", n.topic, n.channel)
            n.consumer.Stop()
            fmt.Println("[%s] %s,%s", "stop consumer success", n.topic, n.channel)
            return nil
        }
    }
}
nsqclient/httpClient.go
New file
@@ -0,0 +1,35 @@
package nsqclient
import (
    "bytes"
    "fmt"
    "io/ioutil"
    "net/http"
)
const nsqWebApi = "http://121.31.232.83:9080/api/nsq/pub"
// http接口 http://121.31.232.83:9080/api/nsq/pub?topic=your_topic
func HttpPost(topic string, data []byte) bool {
    uri := nsqWebApi + "?topic=" + topic
    request, err := http.NewRequest(http.MethodPost, uri, bytes.NewReader(data))
    if err != nil {
        return false
    }
    request.Header.Set("Content-Type", "application/json;charset=UTF-8")
    response, err := http.DefaultClient.Do(request)
    if err != nil {
        fmt.Printf(err.Error())
        return false
    }
    defer response.Body.Close()
    body, _ := ioutil.ReadAll(response.Body)
    fmt.Println("response:", string(body))
    return true
}
nsqclient/pointer.go
New file
@@ -0,0 +1,57 @@
package nsqclient
// #include <stdlib.h>
import "C"
import (
    "sync"
    "unsafe"
)
var (
    mutex sync.RWMutex
    store = map[unsafe.Pointer]interface{}{}
)
func Save(v interface{}) unsafe.Pointer {
    if v == nil {
        return nil
    }
    // Generate real fake C pointer.
    // This pointer will not store any data, but will bi used for indexing purposes.
    // Since Go doest allow to cast dangling pointer to unsafe.Pointer, we do rally allocate one byte.
    // Why we need indexing, because Go doest allow C code to store pointers to Go data.
    var ptr unsafe.Pointer = C.malloc(C.size_t(1))
    if ptr == nil {
        panic("can't allocate 'cgo-pointer hack index pointer': ptr == nil")
    }
    mutex.Lock()
    store[ptr] = v
    mutex.Unlock()
    return ptr
}
func Restore(ptr unsafe.Pointer) (v interface{}) {
    if ptr == nil {
        return nil
    }
    mutex.RLock()
    v = store[ptr]
    mutex.RUnlock()
    return
}
func Unref(ptr unsafe.Pointer) {
    if ptr == nil {
        return
    }
    mutex.Lock()
    delete(store, ptr)
    mutex.Unlock()
    C.free(ptr)
}
nsqclient/pool.go
New file
@@ -0,0 +1,25 @@
// Package pool implements a pool of net.Conn interfaces to manage and reuse them.
package nsqclient
import "errors"
var (
    // ErrClosed is the error resulting if the pool is closed via pool.Close().
    ErrClosed = errors.New("pool is closed")
)
// Pool interface describes a pool implementation. A pool should have maximum
// capacity. An ideal pool is threadsafe and easy to use.
type Pool interface {
    // Get returns a new connection from the pool. Closing the connections puts
    // it back to the Pool. Closing it when the pool is destroyed or full will
    // be counted as an error.
    Get() (*PoolConn, error)
    // Close closes the pool and all its connections. After Close() the pool is
    // no longer usable.
    Close()
    // Len returns the current number of connections of the pool.
    Len() int
}
nsqclient/producer.go
New file
@@ -0,0 +1,139 @@
package nsqclient
import (
    "fmt"
    "time"
    nsq "github.com/nsqio/go-nsq"
)
type Producer interface {
    Publish(topic string, body []byte) error
    MultiPublish(topic string, body [][]byte) error
    DeferredPublish(topic string, delay time.Duration, body []byte) error
}
var _ Producer = (*producer)(nil)
type producer struct {
    pool Pool
}
var (
    //                    name   pool producer
    nsqList = make(map[string]Pool)
)
type Config struct {
    Addr     string `toml:"addr" json:"addr"`
    InitSize int    `toml:"init_size" json:"init_size"`
    MaxSize  int    `toml:"max_size" json:"max_size"`
}
func CreateProducerPool(configs map[string]Config) {
    for name, conf := range configs {
        n, err := newProducerPool(conf.Addr, conf.InitSize, conf.MaxSize)
        if err == nil {
            nsqList[name] = n
            // 支持ip:port寻址
            nsqList[conf.Addr] = n
        }
    }
}
func DestroyProducerPool() {
    for _, p := range nsqList {
        p.Close()
    }
}
func GetProducer(key ...string) (*producer, error) {
    k := "default"
    if len(key) > 0 {
        k = key[0]
    }
    if n, ok := nsqList[k]; ok {
        return &producer{n}, nil
    }
    return nil, fmt.Errorf("GetProducer can't get producer")
}
// CreateNSQProducer create nsq producer
func newProducer(addr string, options ...func(*nsq.Config)) (*nsq.Producer, error) {
    cfg := nsq.NewConfig()
    for _, option := range options {
        option(cfg)
    }
    producer, err := nsq.NewProducer(addr, cfg)
    if err != nil {
        return nil, err
    }
    // producer.SetLogger(log.New(os.Stderr, "", log.Flags()), nsq.LogLevelError)
    return producer, nil
}
// CreateNSQProducerPool create a nwq producer pool
func newProducerPool(addr string, initSize, maxSize int, options ...func(*nsq.Config)) (Pool, error) {
    factory := func() (*nsq.Producer, error) {
        // TODO 这里应该执行ping方法来确定连接是正常的否则不应该创建conn
        return newProducer(addr, options...)
    }
    nsqPool, err := NewChannelPool(initSize, maxSize, factory)
    if err != nil {
        return nil, err
    }
    return nsqPool, nil
}
func NewProducer(addr string) (*producer, error) {
    CreateProducerPool(map[string]Config{"default": {addr, 1, 1}})
    return GetProducer()
}
func retry(num int, fn func() error) error {
    var err error
    for i := 0; i < num; i++ {
        err = fn()
        if err == nil {
            break
        }
    }
    return err
}
func (p *producer) Publish(topic string, body []byte) error {
    nsq, err := p.pool.Get()
    if err != nil {
        return err
    }
    defer nsq.Close()
    return retry(2, func() error {
        return nsq.Publish(topic, body)
    })
}
func (p *producer) MultiPublish(topic string, body [][]byte) error {
    nsq, err := p.pool.Get()
    if err != nil {
        return err
    }
    defer nsq.Close()
    return retry(2, func() error {
        return nsq.MultiPublish(topic, body)
    })
}
func (p *producer) DeferredPublish(topic string, delay time.Duration, body []byte) error {
    nsq, err := p.pool.Get()
    if err != nil {
        return err
    }
    defer nsq.Close()
    return retry(2, func() error {
        return nsq.DeferredPublish(topic, delay, body)
    })
}
report/report.go
New file
@@ -0,0 +1,42 @@
package report
import (
    "context"
    "fmt"
    "time"
    "kingdee-dbapi/config"
)
var ctx context.Context
var cancel context.CancelFunc
func StartReport() {
    ctx, cancel = context.WithCancel(context.Background())
    go Loop(ctx)
}
func RestartReport() {
    cancel()
    StartReport()
}
func Loop(c context.Context) {
    fmt.Println("start report")
    for {
        select {
        case <-c.Done():
            fmt.Println("loop break")
            return
        default:
            // 上报订单
            SendOrder()
            // 上报即时库存
            SendInventory()
            time.Sleep(time.Duration(config.Options.SyncInterval) * time.Second)
        }
    }
}
report/task.go
New file
@@ -0,0 +1,70 @@
package report
import (
    "encoding/json"
    "kingdee-dbapi/cache"
    "kingdee-dbapi/config"
    "kingdee-dbapi/kingdee"
    "kingdee-dbapi/models"
    "kingdee-dbapi/nsqclient"
)
func SendOrder() {
    var completedOrderNo = make(map[string]struct{})
    list := kingdee.SeOrderList()
    for i := 0; i < len(list); i++ {
        if cache.Exists(list[i].FBillNo) {
            list = append(list[:i], list[i+1:]...)
        } else {
            completedOrderNo[list[i].FBillNo] = struct{}{}
        }
    }
    b, _ := json.Marshal(list)
    ok := nsqclient.HttpPost(config.Options.OrderTopic, b)
    if ok {
        // 写入数据库, 标记已经上报过了,避免重复上报
        for orderNo, _ := range completedOrderNo {
            cursor := models.Order{
                OrderNo: orderNo,
            }
            cursor.Insert()
            cache.WriteCache(orderNo)
        }
    }
    // 逐条发送
    //for idx, _ := range list {
    //    // 已经推送过的订单
    //    if cache.Exists(list[idx].FBillNo) {
    //        continue
    //    }
    //
    //    b, _ := json.Marshal(list[idx])
    //
    //    ok := nsqclient.HttpPost(config.Options.OrderTopic, b)
    //    if ok {
    //        completedOrderNo[list[idx].FBillNo] = struct{}{}
    //    }
    //}
}
func SendInventory() {
    list := kingdee.ICInventory()
    // 每次发 300 条
    for i := 0; i < len(list); i += 300 {
        end := i + 300
        if end > len(list) {
            end = len(list)
        }
        b, _ := json.Marshal(list[i:end])
        nsqclient.HttpPost(config.Options.InventoryTopic, b)
    }
}
webserver/controller.go
@@ -1,12 +1,13 @@
package webserver
import (
    "kingdee-dbapi/kingdee"
    "github.com/gin-gonic/gin"
    "kingdee-dbapi/models"
)
func OrderList(c *gin.Context) {
    rspData := models.SeOrderList()
    rspData := kingdee.SeOrderList()
    c.JSON(200, gin.H{
        "success": true,
        "message": "",
@@ -16,7 +17,7 @@
}
func InventoryList(c *gin.Context) {
    rspData := models.FindICInventory()
    rspData := kingdee.ICInventory()
    c.JSON(200, gin.H{
        "success": true,
        "message": "",