13个文件已添加
2 文件已重命名
9个文件已修改
| | |
| | | .idea |
| | | .vscode |
| | | kingdee-dbapi.exe |
| | | kingdee-dbapi.exe |
| | | invert.txt |
| | | kingdee-api.db |
| | | order.txt |
New file |
| | |
| | | package cache |
| | | |
| | | import ( |
| | | "kingdee-dbapi/models" |
| | | "sync" |
| | | ) |
| | | |
| | | var orderKeyCache sync.Map |
| | | |
| | | func InitCache() { |
| | | var db models.Order |
| | | |
| | | cacheData := db.FindAll() |
| | | |
| | | for _, order := range cacheData { |
| | | orderKeyCache.Store(order.OrderNo, struct{}{}) |
| | | } |
| | | } |
| | | |
| | | func Exists(key string) (ok bool) { |
| | | _, ok = orderKeyCache.Load(key) |
| | | |
| | | return |
| | | } |
| | | |
| | | func WriteCache(key string) { |
| | | orderKeyCache.Store(key, struct{}{}) |
| | | } |
| | |
| | | { |
| | | "web_port": "808", |
| | | "web_port": "10210", |
| | | "sql_addr": "10.6.201.7", |
| | | "sql_db_name": "LZGS", |
| | | "sql_username": "webapi", |
| | | "sql_password": "api2023" |
| | | "sql_password": "api2023", |
| | | "order_topic": "aps.wangpengfei.erp.seorder", |
| | | "inventory_topic": "aps.wangpengfei.erp.inventory", |
| | | "interval": 60 |
| | | } |
| | |
| | | ) |
| | | |
| | | type Config struct { |
| | | WebPort string `json:"web_port"` |
| | | SqlAddr string `json:"sql_addr"` |
| | | SqlDBName string `json:"sql_db_name"` |
| | | SqlUsername string `json:"sql_username"` |
| | | SqlPassword string `json:"sql_password"` |
| | | WebPort string `json:"web_port"` // 本地web服务绑定接口 |
| | | SqlAddr string `json:"sql_addr"` // 数据库ip地址, 不加端口号, 默认使用1433端口 |
| | | SqlDBName string `json:"sql_db_name"` // 数据库名称 |
| | | SqlUsername string `json:"sql_username"` // 数据库用户 |
| | | SqlPassword string `json:"sql_password"` // 数据库密码 |
| | | NsqServer string `json:"nsq_server"` // nsq TCP服务端地址 |
| | | NsqWebApi string `json:"nsq_server"` // nsq HTTP接口地址 |
| | | OrderTopic string `json:"order_topic"` // 订单上报的topic |
| | | InventoryTopic string `json:"inventory_topic"` // 库存上报的topic |
| | | SyncInterval int `json:"interval"` // 同步的时间间隔, 单位/秒 |
| | | } |
| | | |
| | | const configPath = "config.json" |
| | | |
| | | var Options Config |
| | | |
| | | // Init is an exported method that takes the environment starts the viper |
| | | // (external lib) and returns the configuration struct. |
| | | func init() { |
| | | func DefaultConfig() { |
| | | Options.WebPort = "10210" |
| | | Options.SqlAddr = "127.0.0.1" |
| | | Options.SqlDBName = "dbname" |
| | | Options.SqlUsername = "sa" |
| | | Options.SqlPassword = "123456" |
| | | Options.NsqServer = "fai365.com:4150" |
| | | Options.NsqWebApi = "http://121.31.232.83:9080/api/nsq/pub?topic=your_topic" |
| | | Options.OrderTopic = "/province/city/factoryNo/kingdee_seOrder" |
| | | Options.InventoryTopic = "/province/city/factoryNo/kingdee_inventory" |
| | | Options.SyncInterval = 60 |
| | | } |
| | | |
| | | func Load() { |
| | | if !pathExists(configPath) { |
| | | DefaultConfig() |
| | | SaveConfig() |
| | | } else { |
| | | file, _ := os.Open(configPath) |
| | |
| | | b, err := json.Marshal(Options) |
| | | if err == nil { |
| | | var out bytes.Buffer |
| | | |
| | | err = json.Indent(&out, b, "", " ") |
| | | ioutil.WriteFile(configPath, out.Bytes(), 0644) |
| | | if err == nil { |
| | | err = ioutil.WriteFile(configPath, out.Bytes(), 0644) |
| | | } |
| | | } |
| | | |
| | | return err |
| | |
| | | github.com/flopp/go-findfont v0.1.0 // indirect |
| | | github.com/gin-gonic/gin v1.7.0 // indirect |
| | | github.com/jinzhu/gorm v1.9.16 // indirect |
| | | github.com/nsqio/go-nsq v1.1.0 // indirect |
| | | ) |
| | |
| | | github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM= |
| | | github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= |
| | | github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= |
| | | github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= |
| | | github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= |
| | | github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= |
| | | github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= |
| | | github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= |
| | |
| | | github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= |
| | | github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= |
| | | github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= |
| | | github.com/mattn/go-sqlite3 v1.14.0 h1:mLyGNKR8+Vv9CAU7PphKa2hkEqxxhn8i32J6FPj1/QA= |
| | | github.com/mattn/go-sqlite3 v1.14.0/go.mod h1:JIl7NbARA7phWnGvh0LKTyg7S9BA+6gx71ShQilpsus= |
| | | github.com/mcuadros/go-version v0.0.0-20190830083331-035f6764e8d2/go.mod h1:76rfSfYPWj01Z85hUf/ituArm797mNKcvINh1OlsZKo= |
| | | github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= |
| | |
| | | github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8= |
| | | github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= |
| | | github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= |
| | | github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE= |
| | | github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY= |
| | | github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= |
| | | github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= |
| | | github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= |
| | |
| | | |
| | | import ( |
| | | "fmt" |
| | | "kingdee-dbapi/config" |
| | | "kingdee-dbapi/models" |
| | | "kingdee-dbapi/webserver" |
| | | "strings" |
| | | |
| | | "kingdee-dbapi/config" |
| | | "kingdee-dbapi/kingdee" |
| | | "kingdee-dbapi/report" |
| | | "kingdee-dbapi/static" |
| | | "kingdee-dbapi/webserver" |
| | | |
| | | "fyne.io/fyne/v2" |
| | | "fyne.io/fyne/v2/app" |
| | |
| | | config.SaveConfig() |
| | | |
| | | // 连接数据库 |
| | | err := models.Init(config.Options.SqlUsername, config.Options.SqlPassword, config.Options.SqlAddr, config.Options.SqlDBName) |
| | | err := kingdee.Init(config.Options.SqlUsername, config.Options.SqlPassword, config.Options.SqlAddr, config.Options.SqlDBName) |
| | | if err != nil { |
| | | fmt.Println("db init error:", err.Error()) |
| | | dialog.ShowError(err, w) |
| | |
| | | submitBtn.Text = "已启动" |
| | | submitBtn.Disable() |
| | | |
| | | report.StartReport() |
| | | |
| | | go webserver.Serve(serverPort.Text) |
| | | }) |
| | | |
New file |
| | |
| | | package kingdee |
| | | |
| | | import ( |
| | | "fmt" |
| | | "github.com/jinzhu/gorm" |
| | | |
| | | _ "github.com/jinzhu/gorm/dialects/mssql" |
| | | ) |
| | | |
| | | var db *gorm.DB |
| | | var err error |
| | | |
| | | // Init . |
| | | func Init(username, password, addr, dbName string) error { |
| | | var err error |
| | | |
| | | sqlServer := fmt.Sprintf("sqlserver://%s:%s@%s:1433?database=%s", |
| | | username, password, addr, dbName) |
| | | fmt.Println(sqlServer) |
| | | // 打开数据库连接 |
| | | //db, err = gorm.Open("mssql", "sqlserver://sa:LZdba@)@)@10.6.201.7:1433?database=LZGS") |
| | | db, err = gorm.Open("mssql", sqlServer) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | |
| | | return nil |
| | | } |
| | | |
| | | func GetDB() *gorm.DB { |
| | | return db |
| | | } |
| | | |
| | | // CloseDB . |
| | | func CloseDB() { |
| | | if db != nil { |
| | | db.Close() |
| | | } |
| | | } |
File was renamed from models/icInventory.go |
| | |
| | | package models |
| | | package kingdee |
| | | |
| | | type Inventory struct { |
| | | FNumber string `gorm:"column:FNumber" json:"FNumber"` // 物料代码 |
| | |
| | | FStockNo string `gorm:"column:FStockNo" json:"FStockNo"` // 仓库代码 |
| | | FStockName string `gorm:"column:FStockName" json:"FStockName"` // 仓库名称 |
| | | FUnit string `gorm:"column:FUnit" json:"FUnit"` // 基本计量单位 |
| | | FUnitQty float64 `gorm:"column:FUnitQty" json:"FUnitQty"` //基本计量单位数量 |
| | | FUnitQty float64 `gorm:"column:FUnitQty" json:"FUnitQty"` // 基本计量单位数量 |
| | | FSec string `gorm:"column:FSec" json:"FSec"` // 常用计量单位 |
| | | FQty float64 `gorm:"column:FQty" json:"FQty"` // 常用计量单位数量 |
| | | } |
| | | |
| | | func FindICInventory() []Inventory { |
| | | func ICInventory() []Inventory { |
| | | sql := ` |
| | | SELECT |
| | | TOP (100) PERCENT item.FNumber AS FNumber, |
| | |
| | | item.FNumber |
| | | ` |
| | | var result []Inventory |
| | | //var result []map[string]interface{} |
| | | |
| | | db.Raw(sql).Debug().Scan(&result) |
| | | db.Raw(sql).Scan(&result) |
| | | //db.Raw(sql).Debug().Scan(&result) |
| | | |
| | | // 打印列名称测试 |
| | | //rows, _ := db.Raw(sql).Debug().Rows() |
| | | //fmt.Println(rows.Columns()) |
| | | |
File was renamed from models/seOrder.go |
| | |
| | | package models |
| | | package kingdee |
| | | |
| | | // 销售订单表 |
| | | type SEOrder struct { |
| | |
| | | FCess float64 `gorm:"column:FCess" json:"FCess"` // 税率 |
| | | FNote string `gorm:"column:FNote" json:"FNote"` // 备注 |
| | | FAdviceConsignDate string `gorm:"column:FAdviceConsignDate" json:"FAdviceConsignDate"` // 交货日期 |
| | | Fdate string `gorm:"column:Fdate" json:"Fdate"` // 订单日期 |
| | | FCustID string `gorm:"column:FCustID" json:"FCustID"` // 购货单位 |
| | | FInventory float64 `gorm:"column:FInventory" json:"FInventory"` // 即时库存 |
| | | // 含税单价 |
| | |
| | | i.FCess, |
| | | i.FNote, |
| | | i.FAdviceConsignDate, |
| | | i.Fdate, |
| | | i.FCustID, |
| | | SUM(ICInventory.FQty) AS FInventory |
| | | FROM vwICBill_32 AS i |
| | | FROM |
| | | vwICBill_32 AS i |
| | | JOIN t_ICitem ON i.FShortNumber = t_ICitem.FShortNumber |
| | | JOIN ICInventory ON ICInventory.FItemID = t_ICitem.FItemID |
| | | WHERE |
| | | DateDiff(dd,fDate,getdate())=0 |
| | | GROUP BY |
| | | i.FBillNo, |
| | | i.FNumber, |
| | |
| | | i.FCess, |
| | | i.FNote, |
| | | i.FAdviceConsignDate, |
| | | i.Fdate, |
| | | i.FCustID; |
| | | ` |
| | | |
| | | //查询当天日期 |
| | | //select * from vwICBill_32 where DateDiff(dd,fDate,getdate())=0 |
| | | var result []SEOrder |
| | | |
| | | db.Raw(sql).Debug().Scan(&result) |
| | | db.Raw(sql).Scan(&result) |
| | | //db.Raw(sql).Debug().Scan(&result) |
| | | |
| | | return result |
| | | } |
| | |
| | | package main |
| | | |
| | | import ( |
| | | "kingdee-dbapi/kingdee" |
| | | "kingdee-dbapi/models" |
| | | "os" |
| | | "strings" |
| | | |
| | | "kingdee-dbapi/config" |
| | | "kingdee-dbapi/gui" |
| | | "kingdee-dbapi/models" |
| | | |
| | | "github.com/flopp/go-findfont" |
| | | ) |
| | |
| | | func main() { |
| | | config.Load() |
| | | |
| | | // 连接数据库 |
| | | //models.Init() |
| | | // |
| | | //var order models.SEOrder |
| | | //err := order.FindByFBillNo("SEORD006060") |
| | | //if err != nil { |
| | | // fmt.Println(err.Error()) |
| | | //} else { |
| | | // fmt.Printf("%+v\n", order) |
| | | //} |
| | | // sqlite3数据库 |
| | | models.Init() |
| | | |
| | | // 设置中文字体 |
| | | setFont() |
| | | defer os.Unsetenv("FYNE_FONT") |
| | | defer models.CloseDB() |
| | | defer kingdee.CloseDB() |
| | | |
| | | // 创建窗口并运行 |
| | | window := gui.NewDisplay() |
| | |
| | | package models |
| | | |
| | | import ( |
| | | "fmt" |
| | | "github.com/jinzhu/gorm" |
| | | |
| | | _ "github.com/jinzhu/gorm/dialects/mssql" |
| | | _ "github.com/jinzhu/gorm/dialects/sqlite" |
| | | ) |
| | | |
| | | var db *gorm.DB |
| | | var err error |
| | | |
| | | // Init . |
| | | func Init(username, password, addr, dbName string) error { |
| | | var err error |
| | | |
| | | sqlServer := fmt.Sprintf("sqlserver://%s:%s@%s:1433?database=%s", |
| | | username, password, addr, dbName) |
| | | fmt.Println(sqlServer) |
| | | func Init() error { |
| | | // 打开数据库连接 |
| | | //db, err = gorm.Open("mssql", "sqlserver://sa:LZdba@)@)@10.6.201.7:1433?database=LZGS") |
| | | db, err = gorm.Open("mssql", sqlServer) |
| | | db, err = gorm.Open("sqlite3", "kingdee-api.db") |
| | | if err != nil { |
| | | return err |
| | | } |
| | | |
| | | return nil |
| | | } |
| | | db.AutoMigrate(&Order{}) |
| | | |
| | | func GetDB() *gorm.DB { |
| | | return db |
| | | return nil |
| | | } |
| | | |
| | | // CloseDB . |
| | | func CloseDB() { |
| | | if db != nil { |
| | | db.Close() |
| | | } |
| | | db.Close() |
| | | } |
New file |
| | |
| | | package models |
| | | |
| | | type Order struct { |
| | | ID uint `gorm:"primary_key"` |
| | | CreateTime string |
| | | OrderNo string |
| | | } |
| | | |
| | | //指定表名. |
| | | func (o *Order) TableName() string { |
| | | return "t_order" |
| | | } |
| | | |
| | | //Insert 插入记录. |
| | | func (o *Order) Insert() bool { |
| | | result := db.Table(o.TableName()).Create(&o) |
| | | if result.Error != nil { |
| | | return false |
| | | } |
| | | |
| | | return result.RowsAffected > 0 |
| | | } |
| | | |
| | | //FindAll 查找所有记录. |
| | | func (o *Order) FindAll() (list []Order) { |
| | | if err := db.Table(o.TableName()).Where("1 = 1").Scan(&list).Error; err != nil { |
| | | return nil |
| | | } |
| | | |
| | | return list |
| | | } |
New file |
| | |
| | | package nsqclient |
| | | |
| | | import ( |
| | | "errors" |
| | | "fmt" |
| | | "sync" |
| | | |
| | | nsq "github.com/nsqio/go-nsq" |
| | | ) |
| | | |
| | | // channelPool implements the Pool interface based on buffered channels. |
| | | type channelPool struct { |
| | | // storage for our net.Conn connections |
| | | mu sync.Mutex |
| | | conns chan *nsq.Producer |
| | | |
| | | // net.Conn generator |
| | | factory Factory |
| | | } |
| | | |
| | | // Factory is a function to create new connections. |
| | | type Factory func() (*nsq.Producer, error) |
| | | |
| | | // NewChannelPool returns a new pool based on buffered channels with an initial |
| | | // capacity and maximum capacity. Factory is used when initial capacity is |
| | | // greater than zero to fill the pool. A zero initialCap doesn't fill the Pool |
| | | // until a new Get() is called. During a Get(), If there is no new connection |
| | | // available in the pool, a new connection will be created via the Factory() |
| | | // method. |
| | | func NewChannelPool(initialCap, maxCap int, factory Factory) (Pool, error) { |
| | | if initialCap < 0 || maxCap <= 0 || initialCap > maxCap { |
| | | return nil, errors.New("invalid capacity settings") |
| | | } |
| | | |
| | | c := &channelPool{ |
| | | conns: make(chan *nsq.Producer, maxCap), |
| | | factory: factory, |
| | | } |
| | | |
| | | // create initial connections, if something goes wrong, |
| | | // just close the pool error out. |
| | | for i := 0; i < initialCap; i++ { |
| | | conn, err := factory() |
| | | if err != nil { |
| | | c.Close() |
| | | return nil, fmt.Errorf("factory is not able to fill the pool: %s", err) |
| | | } |
| | | c.conns <- conn |
| | | } |
| | | |
| | | return c, nil |
| | | } |
| | | |
| | | func (c *channelPool) getConns() chan *nsq.Producer { |
| | | c.mu.Lock() |
| | | conns := c.conns |
| | | c.mu.Unlock() |
| | | return conns |
| | | } |
| | | |
| | | // Get implements the Pool interfaces Get() method. If there is no new |
| | | // connection available in the pool, a new connection will be created via the |
| | | // Factory() method. |
| | | func (c *channelPool) Get() (*PoolConn, error) { |
| | | conns := c.getConns() |
| | | if conns == nil { |
| | | return nil, ErrClosed |
| | | } |
| | | |
| | | // wrap our connections with out custom net.Conn implementation (wrapConn |
| | | // method) that puts the connection back to the pool if it's closed. |
| | | select { |
| | | case conn := <-conns: |
| | | if conn == nil { |
| | | return nil, ErrClosed |
| | | } |
| | | |
| | | return c.wrapConn(conn), nil |
| | | default: |
| | | conn, err := c.factory() |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | |
| | | return c.wrapConn(conn), nil |
| | | } |
| | | } |
| | | |
| | | // put puts the connection back to the pool. If the pool is full or closed, |
| | | // conn is simply closed. A nil conn will be rejected. |
| | | func (c *channelPool) put(conn *nsq.Producer) error { |
| | | if conn == nil { |
| | | return errors.New("connection is nil. rejecting") |
| | | } |
| | | |
| | | c.mu.Lock() |
| | | defer c.mu.Unlock() |
| | | |
| | | if c.conns == nil { |
| | | // pool is closed, close passed connection |
| | | conn.Stop() |
| | | return nil |
| | | } |
| | | |
| | | // put the resource back into the pool. If the pool is full, this will |
| | | // block and the default case will be executed. |
| | | select { |
| | | case c.conns <- conn: |
| | | return nil |
| | | default: |
| | | // pool is full, close passed connection |
| | | conn.Stop() |
| | | return nil |
| | | } |
| | | } |
| | | |
| | | func (c *channelPool) Close() { |
| | | c.mu.Lock() |
| | | conns := c.conns |
| | | c.conns = nil |
| | | c.factory = nil |
| | | c.mu.Unlock() |
| | | |
| | | if conns == nil { |
| | | return |
| | | } |
| | | |
| | | close(conns) |
| | | for conn := range conns { |
| | | conn.Stop() |
| | | } |
| | | } |
| | | |
| | | func (c *channelPool) Len() int { return len(c.getConns()) } |
New file |
| | |
| | | package nsqclient |
| | | |
| | | import ( |
| | | "fmt" |
| | | "kingdee-dbapi/config" |
| | | ) |
| | | |
| | | var nsqClient Producer |
| | | |
| | | const plcTopic = "plcTopic" |
| | | |
| | | func InitNsqClient() error { |
| | | var err error |
| | | nsqClient, err = NewProducer(config.Options.NsqServer) |
| | | if err != nil { |
| | | fmt.Println(err.Error()) |
| | | } |
| | | |
| | | return err |
| | | } |
| | | |
| | | func Produce(msg []byte) (err error) { |
| | | if nsqClient == nil { |
| | | err = InitNsqClient() |
| | | if err != nil { |
| | | return err |
| | | } |
| | | } |
| | | |
| | | if err = nsqClient.Publish(plcTopic, msg); err != nil { |
| | | fmt.Println("Publish error:" + err.Error()) |
| | | } |
| | | |
| | | return |
| | | } |
New file |
| | |
| | | package nsqclient |
| | | |
| | | import ( |
| | | "sync" |
| | | |
| | | nsq "github.com/nsqio/go-nsq" |
| | | ) |
| | | |
| | | // PoolConn is a wrapper around net.Conn to modify the the behavior of |
| | | // net.Conn's Close() method. |
| | | type PoolConn struct { |
| | | *nsq.Producer |
| | | mu sync.RWMutex |
| | | c *channelPool |
| | | unusable bool |
| | | } |
| | | |
| | | // Close puts the given connects back to the pool instead of closing it. |
| | | func (p *PoolConn) Close() error { |
| | | p.mu.RLock() |
| | | defer p.mu.RUnlock() |
| | | |
| | | if p.unusable { |
| | | if p.Producer != nil { |
| | | p.Producer.Stop() |
| | | return nil |
| | | } |
| | | return nil |
| | | } |
| | | return p.c.put(p.Producer) |
| | | } |
| | | |
| | | // MarkUnusable marks the connection not usable any more, to let the pool close it instead of returning it to pool. |
| | | func (p *PoolConn) MarkUnusable() { |
| | | p.mu.Lock() |
| | | p.unusable = true |
| | | p.mu.Unlock() |
| | | } |
| | | |
| | | // newConn wraps a standard net.Conn to a poolConn net.Conn. |
| | | func (c *channelPool) wrapConn(conn *nsq.Producer) *PoolConn { |
| | | p := &PoolConn{c: c} |
| | | p.Producer = conn |
| | | return p |
| | | } |
New file |
| | |
| | | package nsqclient |
| | | |
| | | import ( |
| | | "context" |
| | | "fmt" |
| | | "time" |
| | | |
| | | nsq "github.com/nsqio/go-nsq" |
| | | ) |
| | | |
| | | type NsqConsumer struct { |
| | | consumer *nsq.Consumer |
| | | // handler nsq.Handler |
| | | handler func([]byte) error |
| | | ctx context.Context |
| | | ctxCancel context.CancelFunc |
| | | topic string |
| | | channel string |
| | | } |
| | | |
| | | func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) { |
| | | conf := nsq.NewConfig() |
| | | conf.MaxAttempts = 0 |
| | | conf.MsgTimeout = 10 * time.Minute // 默认一个消息最多能处理十分钟,否则就会重新丢入队列 |
| | | conf.LookupdPollInterval = 3 * time.Second // 调整consumer的重连间隔时间为3秒 |
| | | for _, option := range options { |
| | | option(conf) |
| | | } |
| | | |
| | | consumer, err := nsq.NewConsumer(topic, channel, conf) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | return &NsqConsumer{ |
| | | consumer: consumer, |
| | | ctx: ctx, |
| | | topic: topic, |
| | | channel: channel, |
| | | }, nil |
| | | } |
| | | |
| | | func DestroyNsqConsumer(c *NsqConsumer) { |
| | | if c != nil { |
| | | if c.ctxCancel != nil { |
| | | c.ctxCancel() |
| | | } |
| | | } |
| | | } |
| | | |
| | | // func (n *NsqConsumer) AddHandler(handler nsq.Handler) { |
| | | // n.handler = handler |
| | | // } |
| | | |
| | | func (n *NsqConsumer) AddHandler(handler func([]byte) error) { |
| | | n.handler = handler |
| | | } |
| | | |
| | | func (n *NsqConsumer) Run(qaddr string, concurrency int) error { |
| | | return n.RunDistributed([]string{qaddr}, nil, concurrency) |
| | | } |
| | | |
| | | func (n *NsqConsumer) RunLookupd(lookupAddr string, concurrency int) error { |
| | | return n.RunDistributed(nil, []string{lookupAddr}, concurrency) |
| | | } |
| | | |
| | | func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error { |
| | | n.consumer.ChangeMaxInFlight(concurrency) |
| | | // n.consumer.AddConcurrentHandlers(n.handler, concurrency) |
| | | n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(msg *nsq.Message) error { |
| | | return n.handler(msg.Body) |
| | | // return nil |
| | | }), concurrency) |
| | | |
| | | var err error |
| | | if len(qAddr) > 0 { |
| | | err = n.consumer.ConnectToNSQDs(qAddr) |
| | | } else if len(lAddr) > 0 { |
| | | err = n.consumer.ConnectToNSQLookupds(lAddr) |
| | | } else { |
| | | err = fmt.Errorf("Addr Must NOT Empty") |
| | | } |
| | | if err != nil { |
| | | return err |
| | | } |
| | | |
| | | if n.ctx == nil { |
| | | n.ctx, n.ctxCancel = context.WithCancel(context.Background()) |
| | | } |
| | | |
| | | for { |
| | | select { |
| | | case <-n.ctx.Done(): |
| | | fmt.Println("[%s] %s,%s", "stop consumer", n.topic, n.channel) |
| | | n.consumer.Stop() |
| | | fmt.Println("[%s] %s,%s", "stop consumer success", n.topic, n.channel) |
| | | return nil |
| | | } |
| | | } |
| | | } |
New file |
| | |
| | | package nsqclient |
| | | |
| | | import ( |
| | | "bytes" |
| | | "fmt" |
| | | "io/ioutil" |
| | | "net/http" |
| | | ) |
| | | |
| | | const nsqWebApi = "http://121.31.232.83:9080/api/nsq/pub" |
| | | |
| | | // http接口 http://121.31.232.83:9080/api/nsq/pub?topic=your_topic |
| | | func HttpPost(topic string, data []byte) bool { |
| | | uri := nsqWebApi + "?topic=" + topic |
| | | |
| | | request, err := http.NewRequest(http.MethodPost, uri, bytes.NewReader(data)) |
| | | if err != nil { |
| | | return false |
| | | } |
| | | |
| | | request.Header.Set("Content-Type", "application/json;charset=UTF-8") |
| | | |
| | | response, err := http.DefaultClient.Do(request) |
| | | if err != nil { |
| | | fmt.Printf(err.Error()) |
| | | return false |
| | | } |
| | | defer response.Body.Close() |
| | | |
| | | body, _ := ioutil.ReadAll(response.Body) |
| | | |
| | | fmt.Println("response:", string(body)) |
| | | |
| | | return true |
| | | } |
New file |
| | |
| | | package nsqclient |
| | | |
| | | // #include <stdlib.h> |
| | | import "C" |
| | | import ( |
| | | "sync" |
| | | "unsafe" |
| | | ) |
| | | |
| | | var ( |
| | | mutex sync.RWMutex |
| | | store = map[unsafe.Pointer]interface{}{} |
| | | ) |
| | | |
| | | func Save(v interface{}) unsafe.Pointer { |
| | | if v == nil { |
| | | return nil |
| | | } |
| | | |
| | | // Generate real fake C pointer. |
| | | // This pointer will not store any data, but will bi used for indexing purposes. |
| | | // Since Go doest allow to cast dangling pointer to unsafe.Pointer, we do rally allocate one byte. |
| | | // Why we need indexing, because Go doest allow C code to store pointers to Go data. |
| | | var ptr unsafe.Pointer = C.malloc(C.size_t(1)) |
| | | if ptr == nil { |
| | | panic("can't allocate 'cgo-pointer hack index pointer': ptr == nil") |
| | | } |
| | | |
| | | mutex.Lock() |
| | | store[ptr] = v |
| | | mutex.Unlock() |
| | | |
| | | return ptr |
| | | } |
| | | |
| | | func Restore(ptr unsafe.Pointer) (v interface{}) { |
| | | if ptr == nil { |
| | | return nil |
| | | } |
| | | |
| | | mutex.RLock() |
| | | v = store[ptr] |
| | | mutex.RUnlock() |
| | | return |
| | | } |
| | | |
| | | func Unref(ptr unsafe.Pointer) { |
| | | if ptr == nil { |
| | | return |
| | | } |
| | | |
| | | mutex.Lock() |
| | | delete(store, ptr) |
| | | mutex.Unlock() |
| | | |
| | | C.free(ptr) |
| | | } |
New file |
| | |
| | | // Package pool implements a pool of net.Conn interfaces to manage and reuse them. |
| | | package nsqclient |
| | | |
| | | import "errors" |
| | | |
| | | var ( |
| | | // ErrClosed is the error resulting if the pool is closed via pool.Close(). |
| | | ErrClosed = errors.New("pool is closed") |
| | | ) |
| | | |
| | | // Pool interface describes a pool implementation. A pool should have maximum |
| | | // capacity. An ideal pool is threadsafe and easy to use. |
| | | type Pool interface { |
| | | // Get returns a new connection from the pool. Closing the connections puts |
| | | // it back to the Pool. Closing it when the pool is destroyed or full will |
| | | // be counted as an error. |
| | | Get() (*PoolConn, error) |
| | | |
| | | // Close closes the pool and all its connections. After Close() the pool is |
| | | // no longer usable. |
| | | Close() |
| | | |
| | | // Len returns the current number of connections of the pool. |
| | | Len() int |
| | | } |
New file |
| | |
| | | package nsqclient |
| | | |
| | | import ( |
| | | "fmt" |
| | | "time" |
| | | |
| | | nsq "github.com/nsqio/go-nsq" |
| | | ) |
| | | |
| | | type Producer interface { |
| | | Publish(topic string, body []byte) error |
| | | MultiPublish(topic string, body [][]byte) error |
| | | DeferredPublish(topic string, delay time.Duration, body []byte) error |
| | | } |
| | | |
| | | var _ Producer = (*producer)(nil) |
| | | |
| | | type producer struct { |
| | | pool Pool |
| | | } |
| | | |
| | | var ( |
| | | // name pool producer |
| | | nsqList = make(map[string]Pool) |
| | | ) |
| | | |
| | | type Config struct { |
| | | Addr string `toml:"addr" json:"addr"` |
| | | InitSize int `toml:"init_size" json:"init_size"` |
| | | MaxSize int `toml:"max_size" json:"max_size"` |
| | | } |
| | | |
| | | func CreateProducerPool(configs map[string]Config) { |
| | | for name, conf := range configs { |
| | | n, err := newProducerPool(conf.Addr, conf.InitSize, conf.MaxSize) |
| | | if err == nil { |
| | | nsqList[name] = n |
| | | // 支持ip:port寻址 |
| | | nsqList[conf.Addr] = n |
| | | } |
| | | } |
| | | } |
| | | |
| | | func DestroyProducerPool() { |
| | | for _, p := range nsqList { |
| | | p.Close() |
| | | } |
| | | } |
| | | |
| | | func GetProducer(key ...string) (*producer, error) { |
| | | k := "default" |
| | | if len(key) > 0 { |
| | | k = key[0] |
| | | } |
| | | if n, ok := nsqList[k]; ok { |
| | | return &producer{n}, nil |
| | | } |
| | | return nil, fmt.Errorf("GetProducer can't get producer") |
| | | } |
| | | |
| | | // CreateNSQProducer create nsq producer |
| | | func newProducer(addr string, options ...func(*nsq.Config)) (*nsq.Producer, error) { |
| | | cfg := nsq.NewConfig() |
| | | for _, option := range options { |
| | | option(cfg) |
| | | } |
| | | |
| | | producer, err := nsq.NewProducer(addr, cfg) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | // producer.SetLogger(log.New(os.Stderr, "", log.Flags()), nsq.LogLevelError) |
| | | return producer, nil |
| | | } |
| | | |
| | | // CreateNSQProducerPool create a nwq producer pool |
| | | func newProducerPool(addr string, initSize, maxSize int, options ...func(*nsq.Config)) (Pool, error) { |
| | | factory := func() (*nsq.Producer, error) { |
| | | // TODO 这里应该执行ping方法来确定连接是正常的否则不应该创建conn |
| | | return newProducer(addr, options...) |
| | | } |
| | | nsqPool, err := NewChannelPool(initSize, maxSize, factory) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | return nsqPool, nil |
| | | } |
| | | |
| | | func NewProducer(addr string) (*producer, error) { |
| | | CreateProducerPool(map[string]Config{"default": {addr, 1, 1}}) |
| | | return GetProducer() |
| | | } |
| | | |
| | | func retry(num int, fn func() error) error { |
| | | var err error |
| | | for i := 0; i < num; i++ { |
| | | err = fn() |
| | | if err == nil { |
| | | break |
| | | } |
| | | } |
| | | return err |
| | | } |
| | | |
| | | func (p *producer) Publish(topic string, body []byte) error { |
| | | nsq, err := p.pool.Get() |
| | | if err != nil { |
| | | return err |
| | | } |
| | | defer nsq.Close() |
| | | |
| | | return retry(2, func() error { |
| | | return nsq.Publish(topic, body) |
| | | }) |
| | | } |
| | | |
| | | func (p *producer) MultiPublish(topic string, body [][]byte) error { |
| | | nsq, err := p.pool.Get() |
| | | if err != nil { |
| | | return err |
| | | } |
| | | defer nsq.Close() |
| | | |
| | | return retry(2, func() error { |
| | | return nsq.MultiPublish(topic, body) |
| | | }) |
| | | } |
| | | |
| | | func (p *producer) DeferredPublish(topic string, delay time.Duration, body []byte) error { |
| | | nsq, err := p.pool.Get() |
| | | if err != nil { |
| | | return err |
| | | } |
| | | defer nsq.Close() |
| | | |
| | | return retry(2, func() error { |
| | | return nsq.DeferredPublish(topic, delay, body) |
| | | }) |
| | | } |
New file |
| | |
| | | package report |
| | | |
| | | import ( |
| | | "context" |
| | | "fmt" |
| | | "time" |
| | | |
| | | "kingdee-dbapi/config" |
| | | ) |
| | | |
| | | var ctx context.Context |
| | | var cancel context.CancelFunc |
| | | |
| | | func StartReport() { |
| | | ctx, cancel = context.WithCancel(context.Background()) |
| | | go Loop(ctx) |
| | | } |
| | | |
| | | func RestartReport() { |
| | | cancel() |
| | | |
| | | StartReport() |
| | | } |
| | | |
| | | func Loop(c context.Context) { |
| | | fmt.Println("start report") |
| | | for { |
| | | select { |
| | | case <-c.Done(): |
| | | fmt.Println("loop break") |
| | | return |
| | | default: |
| | | // 上报订单 |
| | | SendOrder() |
| | | |
| | | // 上报即时库存 |
| | | SendInventory() |
| | | |
| | | time.Sleep(time.Duration(config.Options.SyncInterval) * time.Second) |
| | | } |
| | | } |
| | | } |
New file |
| | |
| | | package report |
| | | |
| | | import ( |
| | | "encoding/json" |
| | | |
| | | "kingdee-dbapi/cache" |
| | | "kingdee-dbapi/config" |
| | | "kingdee-dbapi/kingdee" |
| | | "kingdee-dbapi/models" |
| | | "kingdee-dbapi/nsqclient" |
| | | ) |
| | | |
| | | func SendOrder() { |
| | | var completedOrderNo = make(map[string]struct{}) |
| | | list := kingdee.SeOrderList() |
| | | |
| | | for i := 0; i < len(list); i++ { |
| | | if cache.Exists(list[i].FBillNo) { |
| | | list = append(list[:i], list[i+1:]...) |
| | | } else { |
| | | completedOrderNo[list[i].FBillNo] = struct{}{} |
| | | } |
| | | } |
| | | |
| | | b, _ := json.Marshal(list) |
| | | |
| | | ok := nsqclient.HttpPost(config.Options.OrderTopic, b) |
| | | if ok { |
| | | // 写入数据库, 标记已经上报过了,避免重复上报 |
| | | for orderNo, _ := range completedOrderNo { |
| | | cursor := models.Order{ |
| | | OrderNo: orderNo, |
| | | } |
| | | |
| | | cursor.Insert() |
| | | cache.WriteCache(orderNo) |
| | | } |
| | | } |
| | | |
| | | // 逐条发送 |
| | | //for idx, _ := range list { |
| | | // // 已经推送过的订单 |
| | | // if cache.Exists(list[idx].FBillNo) { |
| | | // continue |
| | | // } |
| | | // |
| | | // b, _ := json.Marshal(list[idx]) |
| | | // |
| | | // ok := nsqclient.HttpPost(config.Options.OrderTopic, b) |
| | | // if ok { |
| | | // completedOrderNo[list[idx].FBillNo] = struct{}{} |
| | | // } |
| | | //} |
| | | } |
| | | |
| | | func SendInventory() { |
| | | list := kingdee.ICInventory() |
| | | |
| | | // 每次发 300 条 |
| | | for i := 0; i < len(list); i += 300 { |
| | | end := i + 300 |
| | | if end > len(list) { |
| | | end = len(list) |
| | | } |
| | | |
| | | b, _ := json.Marshal(list[i:end]) |
| | | |
| | | nsqclient.HttpPost(config.Options.InventoryTopic, b) |
| | | } |
| | | } |
| | |
| | | package webserver |
| | | |
| | | import ( |
| | | "kingdee-dbapi/kingdee" |
| | | |
| | | "github.com/gin-gonic/gin" |
| | | "kingdee-dbapi/models" |
| | | ) |
| | | |
| | | func OrderList(c *gin.Context) { |
| | | rspData := models.SeOrderList() |
| | | rspData := kingdee.SeOrderList() |
| | | c.JSON(200, gin.H{ |
| | | "success": true, |
| | | "message": "", |
| | |
| | | } |
| | | |
| | | func InventoryList(c *gin.Context) { |
| | | rspData := models.FindICInventory() |
| | | rspData := kingdee.ICInventory() |
| | | c.JSON(200, gin.H{ |
| | | "success": true, |
| | | "message": "", |