From ff3cadba4a63cd1b63cd0e36358f49ccedb88bef Mon Sep 17 00:00:00 2001 From: gigibox <gigibox@163.com> Date: 星期四, 15 六月 2023 10:02:20 +0800 Subject: [PATCH] 完成基本功能 --- config.json | 7 report/report.go | 42 ++ .gitignore | 5 kingdee/icInventory.go | 12 go.mod | 1 kingdee/db.go | 39 ++ nsqclient/conn.go | 45 ++ nsqclient/channel.go | 134 ++++++++ nsqclient/consumer.go | 99 ++++++ config/config.go | 35 + nsqclient/httpClient.go | 35 ++ go.sum | 5 webserver/controller.go | 7 report/task.go | 70 ++++ nsqclient/pointer.go | 57 +++ kingdee/seOrder.go | 16 gui/gui.go | 11 nsqclient/producer.go | 139 ++++++++ cache/cache.go | 28 + models/db.go | 24 - nsqclient/pool.go | 25 + main.go | 17 models/order.go | 31 + nsqclient/client.go | 35 ++ 24 files changed, 862 insertions(+), 57 deletions(-) diff --git a/.gitignore b/.gitignore index 45256fa..aae17bd 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ .idea .vscode -kingdee-dbapi.exe \ No newline at end of file +kingdee-dbapi.exe +invert.txt +kingdee-api.db +order.txt \ No newline at end of file diff --git a/cache/cache.go b/cache/cache.go new file mode 100644 index 0000000..8fa4fb9 --- /dev/null +++ b/cache/cache.go @@ -0,0 +1,28 @@ +package cache + +import ( + "kingdee-dbapi/models" + "sync" +) + +var orderKeyCache sync.Map + +func InitCache() { + var db models.Order + + cacheData := db.FindAll() + + for _, order := range cacheData { + orderKeyCache.Store(order.OrderNo, struct{}{}) + } +} + +func Exists(key string) (ok bool) { + _, ok = orderKeyCache.Load(key) + + return +} + +func WriteCache(key string) { + orderKeyCache.Store(key, struct{}{}) +} diff --git a/config.json b/config.json index 09e83df..651c0c1 100644 --- a/config.json +++ b/config.json @@ -1,7 +1,10 @@ { - "web_port": "808", + "web_port": "10210", "sql_addr": "10.6.201.7", "sql_db_name": "LZGS", "sql_username": "webapi", - "sql_password": "api2023" + "sql_password": "api2023", + "order_topic": "aps.wangpengfei.erp.seorder", + "inventory_topic": "aps.wangpengfei.erp.inventory", + "interval": 60 } \ No newline at end of file diff --git a/config/config.go b/config/config.go index 80b5711..b50fcf5 100644 --- a/config/config.go +++ b/config/config.go @@ -9,24 +9,38 @@ ) type Config struct { - WebPort string `json:"web_port"` - SqlAddr string `json:"sql_addr"` - SqlDBName string `json:"sql_db_name"` - SqlUsername string `json:"sql_username"` - SqlPassword string `json:"sql_password"` + WebPort string `json:"web_port"` // 鏈湴web鏈嶅姟缁戝畾鎺ュ彛 + SqlAddr string `json:"sql_addr"` // 鏁版嵁搴搃p鍦板潃, 涓嶅姞绔彛鍙�, 榛樿浣跨敤1433绔彛 + SqlDBName string `json:"sql_db_name"` // 鏁版嵁搴撳悕绉� + SqlUsername string `json:"sql_username"` // 鏁版嵁搴撶敤鎴� + SqlPassword string `json:"sql_password"` // 鏁版嵁搴撳瘑鐮� + NsqServer string `json:"nsq_server"` // nsq TCP鏈嶅姟绔湴鍧� + NsqWebApi string `json:"nsq_server"` // nsq HTTP鎺ュ彛鍦板潃 + OrderTopic string `json:"order_topic"` // 璁㈠崟涓婃姤鐨則opic + InventoryTopic string `json:"inventory_topic"` // 搴撳瓨涓婃姤鐨則opic + SyncInterval int `json:"interval"` // 鍚屾鐨勬椂闂撮棿闅�, 鍗曚綅/绉� } const configPath = "config.json" var Options Config -// Init is an exported method that takes the environment starts the viper -// (external lib) and returns the configuration struct. -func init() { +func DefaultConfig() { + Options.WebPort = "10210" + Options.SqlAddr = "127.0.0.1" + Options.SqlDBName = "dbname" + Options.SqlUsername = "sa" + Options.SqlPassword = "123456" + Options.NsqServer = "fai365.com:4150" + Options.NsqWebApi = "http://121.31.232.83:9080/api/nsq/pub?topic=your_topic" + Options.OrderTopic = "/province/city/factoryNo/kingdee_seOrder" + Options.InventoryTopic = "/province/city/factoryNo/kingdee_inventory" + Options.SyncInterval = 60 } func Load() { if !pathExists(configPath) { + DefaultConfig() SaveConfig() } else { file, _ := os.Open(configPath) @@ -44,8 +58,11 @@ b, err := json.Marshal(Options) if err == nil { var out bytes.Buffer + err = json.Indent(&out, b, "", " ") - ioutil.WriteFile(configPath, out.Bytes(), 0644) + if err == nil { + err = ioutil.WriteFile(configPath, out.Bytes(), 0644) + } } return err diff --git a/go.mod b/go.mod index 6b07c62..5123a17 100644 --- a/go.mod +++ b/go.mod @@ -8,4 +8,5 @@ github.com/flopp/go-findfont v0.1.0 // indirect github.com/gin-gonic/gin v1.7.0 // indirect github.com/jinzhu/gorm v1.9.16 // indirect + github.com/nsqio/go-nsq v1.1.0 // indirect ) diff --git a/go.sum b/go.sum index af446b3..afda856 100644 --- a/go.sum +++ b/go.sum @@ -168,6 +168,8 @@ github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM= github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= @@ -268,6 +270,7 @@ github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/mattn/go-sqlite3 v1.14.0 h1:mLyGNKR8+Vv9CAU7PphKa2hkEqxxhn8i32J6FPj1/QA= github.com/mattn/go-sqlite3 v1.14.0/go.mod h1:JIl7NbARA7phWnGvh0LKTyg7S9BA+6gx71ShQilpsus= github.com/mcuadros/go-version v0.0.0-20190830083331-035f6764e8d2/go.mod h1:76rfSfYPWj01Z85hUf/ituArm797mNKcvINh1OlsZKo= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= @@ -290,6 +293,8 @@ github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE= +github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY= github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/gui/gui.go b/gui/gui.go index 1046e8c..abbe701 100644 --- a/gui/gui.go +++ b/gui/gui.go @@ -2,12 +2,13 @@ import ( "fmt" - "kingdee-dbapi/config" - "kingdee-dbapi/models" - "kingdee-dbapi/webserver" "strings" + "kingdee-dbapi/config" + "kingdee-dbapi/kingdee" + "kingdee-dbapi/report" "kingdee-dbapi/static" + "kingdee-dbapi/webserver" "fyne.io/fyne/v2" "fyne.io/fyne/v2/app" @@ -76,7 +77,7 @@ config.SaveConfig() // 杩炴帴鏁版嵁搴� - err := models.Init(config.Options.SqlUsername, config.Options.SqlPassword, config.Options.SqlAddr, config.Options.SqlDBName) + err := kingdee.Init(config.Options.SqlUsername, config.Options.SqlPassword, config.Options.SqlAddr, config.Options.SqlDBName) if err != nil { fmt.Println("db init error:", err.Error()) dialog.ShowError(err, w) @@ -87,6 +88,8 @@ submitBtn.Text = "宸插惎鍔�" submitBtn.Disable() + report.StartReport() + go webserver.Serve(serverPort.Text) }) diff --git a/kingdee/db.go b/kingdee/db.go new file mode 100644 index 0000000..e504eca --- /dev/null +++ b/kingdee/db.go @@ -0,0 +1,39 @@ +package kingdee + +import ( + "fmt" + "github.com/jinzhu/gorm" + + _ "github.com/jinzhu/gorm/dialects/mssql" +) + +var db *gorm.DB +var err error + +// Init . +func Init(username, password, addr, dbName string) error { + var err error + + sqlServer := fmt.Sprintf("sqlserver://%s:%s@%s:1433?database=%s", + username, password, addr, dbName) + fmt.Println(sqlServer) + // 鎵撳紑鏁版嵁搴撹繛鎺� + //db, err = gorm.Open("mssql", "sqlserver://sa:LZdba@)@)@10.6.201.7:1433?database=LZGS") + db, err = gorm.Open("mssql", sqlServer) + if err != nil { + return err + } + + return nil +} + +func GetDB() *gorm.DB { + return db +} + +// CloseDB . +func CloseDB() { + if db != nil { + db.Close() + } +} diff --git a/models/icInventory.go b/kingdee/icInventory.go similarity index 90% rename from models/icInventory.go rename to kingdee/icInventory.go index 68e06eb..bd824a5 100644 --- a/models/icInventory.go +++ b/kingdee/icInventory.go @@ -1,4 +1,4 @@ -package models +package kingdee type Inventory struct { FNumber string `gorm:"column:FNumber" json:"FNumber"` // 鐗╂枡浠g爜 @@ -8,12 +8,12 @@ FStockNo string `gorm:"column:FStockNo" json:"FStockNo"` // 浠撳簱浠g爜 FStockName string `gorm:"column:FStockName" json:"FStockName"` // 浠撳簱鍚嶇О FUnit string `gorm:"column:FUnit" json:"FUnit"` // 鍩烘湰璁¢噺鍗曚綅 - FUnitQty float64 `gorm:"column:FUnitQty" json:"FUnitQty"` //鍩烘湰璁¢噺鍗曚綅鏁伴噺 + FUnitQty float64 `gorm:"column:FUnitQty" json:"FUnitQty"` // 鍩烘湰璁¢噺鍗曚綅鏁伴噺 FSec string `gorm:"column:FSec" json:"FSec"` // 甯哥敤璁¢噺鍗曚綅 FQty float64 `gorm:"column:FQty" json:"FQty"` // 甯哥敤璁¢噺鍗曚綅鏁伴噺 } -func FindICInventory() []Inventory { +func ICInventory() []Inventory { sql := ` SELECT TOP (100) PERCENT item.FNumber AS FNumber, @@ -40,9 +40,11 @@ item.FNumber ` var result []Inventory - //var result []map[string]interface{} - db.Raw(sql).Debug().Scan(&result) + db.Raw(sql).Scan(&result) + //db.Raw(sql).Debug().Scan(&result) + + // 鎵撳嵃鍒楀悕绉版祴璇� //rows, _ := db.Raw(sql).Debug().Rows() //fmt.Println(rows.Columns()) diff --git a/models/seOrder.go b/kingdee/seOrder.go similarity index 86% rename from models/seOrder.go rename to kingdee/seOrder.go index a1c2f27..297555d 100644 --- a/models/seOrder.go +++ b/kingdee/seOrder.go @@ -1,4 +1,4 @@ -package models +package kingdee // 閿�鍞鍗曡〃 type SEOrder struct { @@ -16,6 +16,7 @@ FCess float64 `gorm:"column:FCess" json:"FCess"` // 绋庣巼 FNote string `gorm:"column:FNote" json:"FNote"` // 澶囨敞 FAdviceConsignDate string `gorm:"column:FAdviceConsignDate" json:"FAdviceConsignDate"` // 浜よ揣鏃ユ湡 + Fdate string `gorm:"column:Fdate" json:"Fdate"` // 璁㈠崟鏃ユ湡 FCustID string `gorm:"column:FCustID" json:"FCustID"` // 璐揣鍗曚綅 FInventory float64 `gorm:"column:FInventory" json:"FInventory"` // 鍗虫椂搴撳瓨 // 鍚◣鍗曚环 @@ -40,11 +41,15 @@ i.FCess, i.FNote, i.FAdviceConsignDate, + i.Fdate, i.FCustID, SUM(ICInventory.FQty) AS FInventory - FROM vwICBill_32 AS i + FROM + vwICBill_32 AS i JOIN t_ICitem ON i.FShortNumber = t_ICitem.FShortNumber JOIN ICInventory ON ICInventory.FItemID = t_ICitem.FItemID + WHERE + DateDiff(dd,fDate,getdate())=0 GROUP BY i.FBillNo, i.FNumber, @@ -60,11 +65,16 @@ i.FCess, i.FNote, i.FAdviceConsignDate, + i.Fdate, i.FCustID; ` + + //鏌ヨ褰撳ぉ鏃ユ湡 + //select * from vwICBill_32 where DateDiff(dd,fDate,getdate())=0 var result []SEOrder - db.Raw(sql).Debug().Scan(&result) + db.Raw(sql).Scan(&result) + //db.Raw(sql).Debug().Scan(&result) return result } diff --git a/main.go b/main.go index a027b57..fa399aa 100644 --- a/main.go +++ b/main.go @@ -1,12 +1,13 @@ package main import ( + "kingdee-dbapi/kingdee" + "kingdee-dbapi/models" "os" "strings" "kingdee-dbapi/config" "kingdee-dbapi/gui" - "kingdee-dbapi/models" "github.com/flopp/go-findfont" ) @@ -14,21 +15,13 @@ func main() { config.Load() - // 杩炴帴鏁版嵁搴� - //models.Init() - // - //var order models.SEOrder - //err := order.FindByFBillNo("SEORD006060") - //if err != nil { - // fmt.Println(err.Error()) - //} else { - // fmt.Printf("%+v\n", order) - //} + // sqlite3鏁版嵁搴� + models.Init() // 璁剧疆涓枃瀛椾綋 setFont() defer os.Unsetenv("FYNE_FONT") - defer models.CloseDB() + defer kingdee.CloseDB() // 鍒涘缓绐楀彛骞惰繍琛� window := gui.NewDisplay() diff --git a/models/db.go b/models/db.go index d40e5b5..6e18fa4 100644 --- a/models/db.go +++ b/models/db.go @@ -1,39 +1,27 @@ package models import ( - "fmt" "github.com/jinzhu/gorm" - - _ "github.com/jinzhu/gorm/dialects/mssql" + _ "github.com/jinzhu/gorm/dialects/sqlite" ) var db *gorm.DB var err error // Init . -func Init(username, password, addr, dbName string) error { - var err error - - sqlServer := fmt.Sprintf("sqlserver://%s:%s@%s:1433?database=%s", - username, password, addr, dbName) - fmt.Println(sqlServer) +func Init() error { // 鎵撳紑鏁版嵁搴撹繛鎺� - //db, err = gorm.Open("mssql", "sqlserver://sa:LZdba@)@)@10.6.201.7:1433?database=LZGS") - db, err = gorm.Open("mssql", sqlServer) + db, err = gorm.Open("sqlite3", "kingdee-api.db") if err != nil { return err } - return nil -} + db.AutoMigrate(&Order{}) -func GetDB() *gorm.DB { - return db + return nil } // CloseDB . func CloseDB() { - if db != nil { - db.Close() - } + db.Close() } diff --git a/models/order.go b/models/order.go new file mode 100644 index 0000000..e34efde --- /dev/null +++ b/models/order.go @@ -0,0 +1,31 @@ +package models + +type Order struct { + ID uint `gorm:"primary_key"` + CreateTime string + OrderNo string +} + +//鎸囧畾琛ㄥ悕. +func (o *Order) TableName() string { + return "t_order" +} + +//Insert 鎻掑叆璁板綍. +func (o *Order) Insert() bool { + result := db.Table(o.TableName()).Create(&o) + if result.Error != nil { + return false + } + + return result.RowsAffected > 0 +} + +//FindAll 鏌ユ壘鎵�鏈夎褰�. +func (o *Order) FindAll() (list []Order) { + if err := db.Table(o.TableName()).Where("1 = 1").Scan(&list).Error; err != nil { + return nil + } + + return list +} diff --git a/nsqclient/channel.go b/nsqclient/channel.go new file mode 100644 index 0000000..1594dcc --- /dev/null +++ b/nsqclient/channel.go @@ -0,0 +1,134 @@ +package nsqclient + +import ( + "errors" + "fmt" + "sync" + + nsq "github.com/nsqio/go-nsq" +) + +// channelPool implements the Pool interface based on buffered channels. +type channelPool struct { + // storage for our net.Conn connections + mu sync.Mutex + conns chan *nsq.Producer + + // net.Conn generator + factory Factory +} + +// Factory is a function to create new connections. +type Factory func() (*nsq.Producer, error) + +// NewChannelPool returns a new pool based on buffered channels with an initial +// capacity and maximum capacity. Factory is used when initial capacity is +// greater than zero to fill the pool. A zero initialCap doesn't fill the Pool +// until a new Get() is called. During a Get(), If there is no new connection +// available in the pool, a new connection will be created via the Factory() +// method. +func NewChannelPool(initialCap, maxCap int, factory Factory) (Pool, error) { + if initialCap < 0 || maxCap <= 0 || initialCap > maxCap { + return nil, errors.New("invalid capacity settings") + } + + c := &channelPool{ + conns: make(chan *nsq.Producer, maxCap), + factory: factory, + } + + // create initial connections, if something goes wrong, + // just close the pool error out. + for i := 0; i < initialCap; i++ { + conn, err := factory() + if err != nil { + c.Close() + return nil, fmt.Errorf("factory is not able to fill the pool: %s", err) + } + c.conns <- conn + } + + return c, nil +} + +func (c *channelPool) getConns() chan *nsq.Producer { + c.mu.Lock() + conns := c.conns + c.mu.Unlock() + return conns +} + +// Get implements the Pool interfaces Get() method. If there is no new +// connection available in the pool, a new connection will be created via the +// Factory() method. +func (c *channelPool) Get() (*PoolConn, error) { + conns := c.getConns() + if conns == nil { + return nil, ErrClosed + } + + // wrap our connections with out custom net.Conn implementation (wrapConn + // method) that puts the connection back to the pool if it's closed. + select { + case conn := <-conns: + if conn == nil { + return nil, ErrClosed + } + + return c.wrapConn(conn), nil + default: + conn, err := c.factory() + if err != nil { + return nil, err + } + + return c.wrapConn(conn), nil + } +} + +// put puts the connection back to the pool. If the pool is full or closed, +// conn is simply closed. A nil conn will be rejected. +func (c *channelPool) put(conn *nsq.Producer) error { + if conn == nil { + return errors.New("connection is nil. rejecting") + } + + c.mu.Lock() + defer c.mu.Unlock() + + if c.conns == nil { + // pool is closed, close passed connection + conn.Stop() + return nil + } + + // put the resource back into the pool. If the pool is full, this will + // block and the default case will be executed. + select { + case c.conns <- conn: + return nil + default: + // pool is full, close passed connection + conn.Stop() + return nil + } +} + +func (c *channelPool) Close() { + c.mu.Lock() + conns := c.conns + c.conns = nil + c.factory = nil + c.mu.Unlock() + + if conns == nil { + return + } + + close(conns) + for conn := range conns { + conn.Stop() + } +} + +func (c *channelPool) Len() int { return len(c.getConns()) } diff --git a/nsqclient/client.go b/nsqclient/client.go new file mode 100644 index 0000000..1b27d18 --- /dev/null +++ b/nsqclient/client.go @@ -0,0 +1,35 @@ +package nsqclient + +import ( + "fmt" + "kingdee-dbapi/config" +) + +var nsqClient Producer + +const plcTopic = "plcTopic" + +func InitNsqClient() error { + var err error + nsqClient, err = NewProducer(config.Options.NsqServer) + if err != nil { + fmt.Println(err.Error()) + } + + return err +} + +func Produce(msg []byte) (err error) { + if nsqClient == nil { + err = InitNsqClient() + if err != nil { + return err + } + } + + if err = nsqClient.Publish(plcTopic, msg); err != nil { + fmt.Println("Publish error:" + err.Error()) + } + + return +} diff --git a/nsqclient/conn.go b/nsqclient/conn.go new file mode 100644 index 0000000..5c680b5 --- /dev/null +++ b/nsqclient/conn.go @@ -0,0 +1,45 @@ +package nsqclient + +import ( + "sync" + + nsq "github.com/nsqio/go-nsq" +) + +// PoolConn is a wrapper around net.Conn to modify the the behavior of +// net.Conn's Close() method. +type PoolConn struct { + *nsq.Producer + mu sync.RWMutex + c *channelPool + unusable bool +} + +// Close puts the given connects back to the pool instead of closing it. +func (p *PoolConn) Close() error { + p.mu.RLock() + defer p.mu.RUnlock() + + if p.unusable { + if p.Producer != nil { + p.Producer.Stop() + return nil + } + return nil + } + return p.c.put(p.Producer) +} + +// MarkUnusable marks the connection not usable any more, to let the pool close it instead of returning it to pool. +func (p *PoolConn) MarkUnusable() { + p.mu.Lock() + p.unusable = true + p.mu.Unlock() +} + +// newConn wraps a standard net.Conn to a poolConn net.Conn. +func (c *channelPool) wrapConn(conn *nsq.Producer) *PoolConn { + p := &PoolConn{c: c} + p.Producer = conn + return p +} diff --git a/nsqclient/consumer.go b/nsqclient/consumer.go new file mode 100644 index 0000000..a0df0b0 --- /dev/null +++ b/nsqclient/consumer.go @@ -0,0 +1,99 @@ +package nsqclient + +import ( + "context" + "fmt" + "time" + + nsq "github.com/nsqio/go-nsq" +) + +type NsqConsumer struct { + consumer *nsq.Consumer + // handler nsq.Handler + handler func([]byte) error + ctx context.Context + ctxCancel context.CancelFunc + topic string + channel string +} + +func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) { + conf := nsq.NewConfig() + conf.MaxAttempts = 0 + conf.MsgTimeout = 10 * time.Minute // 榛樿涓�涓秷鎭渶澶氳兘澶勭悊鍗佸垎閽燂紝鍚﹀垯灏变細閲嶆柊涓㈠叆闃熷垪 + conf.LookupdPollInterval = 3 * time.Second // 璋冩暣consumer鐨勯噸杩為棿闅旀椂闂翠负3绉� + for _, option := range options { + option(conf) + } + + consumer, err := nsq.NewConsumer(topic, channel, conf) + if err != nil { + return nil, err + } + return &NsqConsumer{ + consumer: consumer, + ctx: ctx, + topic: topic, + channel: channel, + }, nil +} + +func DestroyNsqConsumer(c *NsqConsumer) { + if c != nil { + if c.ctxCancel != nil { + c.ctxCancel() + } + } +} + +// func (n *NsqConsumer) AddHandler(handler nsq.Handler) { +// n.handler = handler +// } + +func (n *NsqConsumer) AddHandler(handler func([]byte) error) { + n.handler = handler +} + +func (n *NsqConsumer) Run(qaddr string, concurrency int) error { + return n.RunDistributed([]string{qaddr}, nil, concurrency) +} + +func (n *NsqConsumer) RunLookupd(lookupAddr string, concurrency int) error { + return n.RunDistributed(nil, []string{lookupAddr}, concurrency) +} + +func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error { + n.consumer.ChangeMaxInFlight(concurrency) + // n.consumer.AddConcurrentHandlers(n.handler, concurrency) + n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(msg *nsq.Message) error { + return n.handler(msg.Body) + // return nil + }), concurrency) + + var err error + if len(qAddr) > 0 { + err = n.consumer.ConnectToNSQDs(qAddr) + } else if len(lAddr) > 0 { + err = n.consumer.ConnectToNSQLookupds(lAddr) + } else { + err = fmt.Errorf("Addr Must NOT Empty") + } + if err != nil { + return err + } + + if n.ctx == nil { + n.ctx, n.ctxCancel = context.WithCancel(context.Background()) + } + + for { + select { + case <-n.ctx.Done(): + fmt.Println("[%s] %s,%s", "stop consumer", n.topic, n.channel) + n.consumer.Stop() + fmt.Println("[%s] %s,%s", "stop consumer success", n.topic, n.channel) + return nil + } + } +} diff --git a/nsqclient/httpClient.go b/nsqclient/httpClient.go new file mode 100644 index 0000000..d400a58 --- /dev/null +++ b/nsqclient/httpClient.go @@ -0,0 +1,35 @@ +package nsqclient + +import ( + "bytes" + "fmt" + "io/ioutil" + "net/http" +) + +const nsqWebApi = "http://121.31.232.83:9080/api/nsq/pub" + +// http鎺ュ彛 http://121.31.232.83:9080/api/nsq/pub?topic=your_topic +func HttpPost(topic string, data []byte) bool { + uri := nsqWebApi + "?topic=" + topic + + request, err := http.NewRequest(http.MethodPost, uri, bytes.NewReader(data)) + if err != nil { + return false + } + + request.Header.Set("Content-Type", "application/json;charset=UTF-8") + + response, err := http.DefaultClient.Do(request) + if err != nil { + fmt.Printf(err.Error()) + return false + } + defer response.Body.Close() + + body, _ := ioutil.ReadAll(response.Body) + + fmt.Println("response:", string(body)) + + return true +} diff --git a/nsqclient/pointer.go b/nsqclient/pointer.go new file mode 100644 index 0000000..1cba795 --- /dev/null +++ b/nsqclient/pointer.go @@ -0,0 +1,57 @@ +package nsqclient + +// #include <stdlib.h> +import "C" +import ( + "sync" + "unsafe" +) + +var ( + mutex sync.RWMutex + store = map[unsafe.Pointer]interface{}{} +) + +func Save(v interface{}) unsafe.Pointer { + if v == nil { + return nil + } + + // Generate real fake C pointer. + // This pointer will not store any data, but will bi used for indexing purposes. + // Since Go doest allow to cast dangling pointer to unsafe.Pointer, we do rally allocate one byte. + // Why we need indexing, because Go doest allow C code to store pointers to Go data. + var ptr unsafe.Pointer = C.malloc(C.size_t(1)) + if ptr == nil { + panic("can't allocate 'cgo-pointer hack index pointer': ptr == nil") + } + + mutex.Lock() + store[ptr] = v + mutex.Unlock() + + return ptr +} + +func Restore(ptr unsafe.Pointer) (v interface{}) { + if ptr == nil { + return nil + } + + mutex.RLock() + v = store[ptr] + mutex.RUnlock() + return +} + +func Unref(ptr unsafe.Pointer) { + if ptr == nil { + return + } + + mutex.Lock() + delete(store, ptr) + mutex.Unlock() + + C.free(ptr) +} diff --git a/nsqclient/pool.go b/nsqclient/pool.go new file mode 100644 index 0000000..b773490 --- /dev/null +++ b/nsqclient/pool.go @@ -0,0 +1,25 @@ +// Package pool implements a pool of net.Conn interfaces to manage and reuse them. +package nsqclient + +import "errors" + +var ( + // ErrClosed is the error resulting if the pool is closed via pool.Close(). + ErrClosed = errors.New("pool is closed") +) + +// Pool interface describes a pool implementation. A pool should have maximum +// capacity. An ideal pool is threadsafe and easy to use. +type Pool interface { + // Get returns a new connection from the pool. Closing the connections puts + // it back to the Pool. Closing it when the pool is destroyed or full will + // be counted as an error. + Get() (*PoolConn, error) + + // Close closes the pool and all its connections. After Close() the pool is + // no longer usable. + Close() + + // Len returns the current number of connections of the pool. + Len() int +} diff --git a/nsqclient/producer.go b/nsqclient/producer.go new file mode 100644 index 0000000..717c7a1 --- /dev/null +++ b/nsqclient/producer.go @@ -0,0 +1,139 @@ +package nsqclient + +import ( + "fmt" + "time" + + nsq "github.com/nsqio/go-nsq" +) + +type Producer interface { + Publish(topic string, body []byte) error + MultiPublish(topic string, body [][]byte) error + DeferredPublish(topic string, delay time.Duration, body []byte) error +} + +var _ Producer = (*producer)(nil) + +type producer struct { + pool Pool +} + +var ( + // name pool producer + nsqList = make(map[string]Pool) +) + +type Config struct { + Addr string `toml:"addr" json:"addr"` + InitSize int `toml:"init_size" json:"init_size"` + MaxSize int `toml:"max_size" json:"max_size"` +} + +func CreateProducerPool(configs map[string]Config) { + for name, conf := range configs { + n, err := newProducerPool(conf.Addr, conf.InitSize, conf.MaxSize) + if err == nil { + nsqList[name] = n + // 鏀寔ip:port瀵诲潃 + nsqList[conf.Addr] = n + } + } +} + +func DestroyProducerPool() { + for _, p := range nsqList { + p.Close() + } +} + +func GetProducer(key ...string) (*producer, error) { + k := "default" + if len(key) > 0 { + k = key[0] + } + if n, ok := nsqList[k]; ok { + return &producer{n}, nil + } + return nil, fmt.Errorf("GetProducer can't get producer") +} + +// CreateNSQProducer create nsq producer +func newProducer(addr string, options ...func(*nsq.Config)) (*nsq.Producer, error) { + cfg := nsq.NewConfig() + for _, option := range options { + option(cfg) + } + + producer, err := nsq.NewProducer(addr, cfg) + if err != nil { + return nil, err + } + // producer.SetLogger(log.New(os.Stderr, "", log.Flags()), nsq.LogLevelError) + return producer, nil +} + +// CreateNSQProducerPool create a nwq producer pool +func newProducerPool(addr string, initSize, maxSize int, options ...func(*nsq.Config)) (Pool, error) { + factory := func() (*nsq.Producer, error) { + // TODO 杩欓噷搴旇鎵цping鏂规硶鏉ョ‘瀹氳繛鎺ユ槸姝e父鐨勫惁鍒欎笉搴旇鍒涘缓conn + return newProducer(addr, options...) + } + nsqPool, err := NewChannelPool(initSize, maxSize, factory) + if err != nil { + return nil, err + } + return nsqPool, nil +} + +func NewProducer(addr string) (*producer, error) { + CreateProducerPool(map[string]Config{"default": {addr, 1, 1}}) + return GetProducer() +} + +func retry(num int, fn func() error) error { + var err error + for i := 0; i < num; i++ { + err = fn() + if err == nil { + break + } + } + return err +} + +func (p *producer) Publish(topic string, body []byte) error { + nsq, err := p.pool.Get() + if err != nil { + return err + } + defer nsq.Close() + + return retry(2, func() error { + return nsq.Publish(topic, body) + }) +} + +func (p *producer) MultiPublish(topic string, body [][]byte) error { + nsq, err := p.pool.Get() + if err != nil { + return err + } + defer nsq.Close() + + return retry(2, func() error { + return nsq.MultiPublish(topic, body) + }) +} + +func (p *producer) DeferredPublish(topic string, delay time.Duration, body []byte) error { + nsq, err := p.pool.Get() + if err != nil { + return err + } + defer nsq.Close() + + return retry(2, func() error { + return nsq.DeferredPublish(topic, delay, body) + }) +} diff --git a/report/report.go b/report/report.go new file mode 100644 index 0000000..9d7e2b1 --- /dev/null +++ b/report/report.go @@ -0,0 +1,42 @@ +package report + +import ( + "context" + "fmt" + "time" + + "kingdee-dbapi/config" +) + +var ctx context.Context +var cancel context.CancelFunc + +func StartReport() { + ctx, cancel = context.WithCancel(context.Background()) + go Loop(ctx) +} + +func RestartReport() { + cancel() + + StartReport() +} + +func Loop(c context.Context) { + fmt.Println("start report") + for { + select { + case <-c.Done(): + fmt.Println("loop break") + return + default: + // 涓婃姤璁㈠崟 + SendOrder() + + // 涓婃姤鍗虫椂搴撳瓨 + SendInventory() + + time.Sleep(time.Duration(config.Options.SyncInterval) * time.Second) + } + } +} diff --git a/report/task.go b/report/task.go new file mode 100644 index 0000000..40f0ac4 --- /dev/null +++ b/report/task.go @@ -0,0 +1,70 @@ +package report + +import ( + "encoding/json" + + "kingdee-dbapi/cache" + "kingdee-dbapi/config" + "kingdee-dbapi/kingdee" + "kingdee-dbapi/models" + "kingdee-dbapi/nsqclient" +) + +func SendOrder() { + var completedOrderNo = make(map[string]struct{}) + list := kingdee.SeOrderList() + + for i := 0; i < len(list); i++ { + if cache.Exists(list[i].FBillNo) { + list = append(list[:i], list[i+1:]...) + } else { + completedOrderNo[list[i].FBillNo] = struct{}{} + } + } + + b, _ := json.Marshal(list) + + ok := nsqclient.HttpPost(config.Options.OrderTopic, b) + if ok { + // 鍐欏叆鏁版嵁搴�, 鏍囪宸茬粡涓婃姤杩囦簡,閬垮厤閲嶅涓婃姤 + for orderNo, _ := range completedOrderNo { + cursor := models.Order{ + OrderNo: orderNo, + } + + cursor.Insert() + cache.WriteCache(orderNo) + } + } + + // 閫愭潯鍙戦�� + //for idx, _ := range list { + // // 宸茬粡鎺ㄩ�佽繃鐨勮鍗� + // if cache.Exists(list[idx].FBillNo) { + // continue + // } + // + // b, _ := json.Marshal(list[idx]) + // + // ok := nsqclient.HttpPost(config.Options.OrderTopic, b) + // if ok { + // completedOrderNo[list[idx].FBillNo] = struct{}{} + // } + //} +} + +func SendInventory() { + list := kingdee.ICInventory() + + // 姣忔鍙� 300 鏉� + for i := 0; i < len(list); i += 300 { + end := i + 300 + if end > len(list) { + end = len(list) + } + + b, _ := json.Marshal(list[i:end]) + + nsqclient.HttpPost(config.Options.InventoryTopic, b) + } +} diff --git a/webserver/controller.go b/webserver/controller.go index 70641bd..503e96f 100644 --- a/webserver/controller.go +++ b/webserver/controller.go @@ -1,12 +1,13 @@ package webserver import ( + "kingdee-dbapi/kingdee" + "github.com/gin-gonic/gin" - "kingdee-dbapi/models" ) func OrderList(c *gin.Context) { - rspData := models.SeOrderList() + rspData := kingdee.SeOrderList() c.JSON(200, gin.H{ "success": true, "message": "", @@ -16,7 +17,7 @@ } func InventoryList(c *gin.Context) { - rspData := models.FindICInventory() + rspData := kingdee.ICInventory() c.JSON(200, gin.H{ "success": true, "message": "", -- Gitblit v1.8.0