From ff3cadba4a63cd1b63cd0e36358f49ccedb88bef Mon Sep 17 00:00:00 2001
From: gigibox <gigibox@163.com>
Date: 星期四, 15 六月 2023 10:02:20 +0800
Subject: [PATCH] 完成基本功能
---
config.json | 7
report/report.go | 42 ++
.gitignore | 5
kingdee/icInventory.go | 12
go.mod | 1
kingdee/db.go | 39 ++
nsqclient/conn.go | 45 ++
nsqclient/channel.go | 134 ++++++++
nsqclient/consumer.go | 99 ++++++
config/config.go | 35 +
nsqclient/httpClient.go | 35 ++
go.sum | 5
webserver/controller.go | 7
report/task.go | 70 ++++
nsqclient/pointer.go | 57 +++
kingdee/seOrder.go | 16
gui/gui.go | 11
nsqclient/producer.go | 139 ++++++++
cache/cache.go | 28 +
models/db.go | 24 -
nsqclient/pool.go | 25 +
main.go | 17
models/order.go | 31 +
nsqclient/client.go | 35 ++
24 files changed, 862 insertions(+), 57 deletions(-)
diff --git a/.gitignore b/.gitignore
index 45256fa..aae17bd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,6 @@
.idea
.vscode
-kingdee-dbapi.exe
\ No newline at end of file
+kingdee-dbapi.exe
+invert.txt
+kingdee-api.db
+order.txt
\ No newline at end of file
diff --git a/cache/cache.go b/cache/cache.go
new file mode 100644
index 0000000..8fa4fb9
--- /dev/null
+++ b/cache/cache.go
@@ -0,0 +1,28 @@
+package cache
+
+import (
+ "kingdee-dbapi/models"
+ "sync"
+)
+
+var orderKeyCache sync.Map
+
+func InitCache() {
+ var db models.Order
+
+ cacheData := db.FindAll()
+
+ for _, order := range cacheData {
+ orderKeyCache.Store(order.OrderNo, struct{}{})
+ }
+}
+
+func Exists(key string) (ok bool) {
+ _, ok = orderKeyCache.Load(key)
+
+ return
+}
+
+func WriteCache(key string) {
+ orderKeyCache.Store(key, struct{}{})
+}
diff --git a/config.json b/config.json
index 09e83df..651c0c1 100644
--- a/config.json
+++ b/config.json
@@ -1,7 +1,10 @@
{
- "web_port": "808",
+ "web_port": "10210",
"sql_addr": "10.6.201.7",
"sql_db_name": "LZGS",
"sql_username": "webapi",
- "sql_password": "api2023"
+ "sql_password": "api2023",
+ "order_topic": "aps.wangpengfei.erp.seorder",
+ "inventory_topic": "aps.wangpengfei.erp.inventory",
+ "interval": 60
}
\ No newline at end of file
diff --git a/config/config.go b/config/config.go
index 80b5711..b50fcf5 100644
--- a/config/config.go
+++ b/config/config.go
@@ -9,24 +9,38 @@
)
type Config struct {
- WebPort string `json:"web_port"`
- SqlAddr string `json:"sql_addr"`
- SqlDBName string `json:"sql_db_name"`
- SqlUsername string `json:"sql_username"`
- SqlPassword string `json:"sql_password"`
+ WebPort string `json:"web_port"` // 鏈湴web鏈嶅姟缁戝畾鎺ュ彛
+ SqlAddr string `json:"sql_addr"` // 鏁版嵁搴搃p鍦板潃, 涓嶅姞绔彛鍙�, 榛樿浣跨敤1433绔彛
+ SqlDBName string `json:"sql_db_name"` // 鏁版嵁搴撳悕绉�
+ SqlUsername string `json:"sql_username"` // 鏁版嵁搴撶敤鎴�
+ SqlPassword string `json:"sql_password"` // 鏁版嵁搴撳瘑鐮�
+ NsqServer string `json:"nsq_server"` // nsq TCP鏈嶅姟绔湴鍧�
+ NsqWebApi string `json:"nsq_server"` // nsq HTTP鎺ュ彛鍦板潃
+ OrderTopic string `json:"order_topic"` // 璁㈠崟涓婃姤鐨則opic
+ InventoryTopic string `json:"inventory_topic"` // 搴撳瓨涓婃姤鐨則opic
+ SyncInterval int `json:"interval"` // 鍚屾鐨勬椂闂撮棿闅�, 鍗曚綅/绉�
}
const configPath = "config.json"
var Options Config
-// Init is an exported method that takes the environment starts the viper
-// (external lib) and returns the configuration struct.
-func init() {
+func DefaultConfig() {
+ Options.WebPort = "10210"
+ Options.SqlAddr = "127.0.0.1"
+ Options.SqlDBName = "dbname"
+ Options.SqlUsername = "sa"
+ Options.SqlPassword = "123456"
+ Options.NsqServer = "fai365.com:4150"
+ Options.NsqWebApi = "http://121.31.232.83:9080/api/nsq/pub?topic=your_topic"
+ Options.OrderTopic = "/province/city/factoryNo/kingdee_seOrder"
+ Options.InventoryTopic = "/province/city/factoryNo/kingdee_inventory"
+ Options.SyncInterval = 60
}
func Load() {
if !pathExists(configPath) {
+ DefaultConfig()
SaveConfig()
} else {
file, _ := os.Open(configPath)
@@ -44,8 +58,11 @@
b, err := json.Marshal(Options)
if err == nil {
var out bytes.Buffer
+
err = json.Indent(&out, b, "", " ")
- ioutil.WriteFile(configPath, out.Bytes(), 0644)
+ if err == nil {
+ err = ioutil.WriteFile(configPath, out.Bytes(), 0644)
+ }
}
return err
diff --git a/go.mod b/go.mod
index 6b07c62..5123a17 100644
--- a/go.mod
+++ b/go.mod
@@ -8,4 +8,5 @@
github.com/flopp/go-findfont v0.1.0 // indirect
github.com/gin-gonic/gin v1.7.0 // indirect
github.com/jinzhu/gorm v1.9.16 // indirect
+ github.com/nsqio/go-nsq v1.1.0 // indirect
)
diff --git a/go.sum b/go.sum
index af446b3..afda856 100644
--- a/go.sum
+++ b/go.sum
@@ -168,6 +168,8 @@
github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
+github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
+github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
@@ -268,6 +270,7 @@
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
+github.com/mattn/go-sqlite3 v1.14.0 h1:mLyGNKR8+Vv9CAU7PphKa2hkEqxxhn8i32J6FPj1/QA=
github.com/mattn/go-sqlite3 v1.14.0/go.mod h1:JIl7NbARA7phWnGvh0LKTyg7S9BA+6gx71ShQilpsus=
github.com/mcuadros/go-version v0.0.0-20190830083331-035f6764e8d2/go.mod h1:76rfSfYPWj01Z85hUf/ituArm797mNKcvINh1OlsZKo=
github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
@@ -290,6 +293,8 @@
github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
+github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE=
+github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
diff --git a/gui/gui.go b/gui/gui.go
index 1046e8c..abbe701 100644
--- a/gui/gui.go
+++ b/gui/gui.go
@@ -2,12 +2,13 @@
import (
"fmt"
- "kingdee-dbapi/config"
- "kingdee-dbapi/models"
- "kingdee-dbapi/webserver"
"strings"
+ "kingdee-dbapi/config"
+ "kingdee-dbapi/kingdee"
+ "kingdee-dbapi/report"
"kingdee-dbapi/static"
+ "kingdee-dbapi/webserver"
"fyne.io/fyne/v2"
"fyne.io/fyne/v2/app"
@@ -76,7 +77,7 @@
config.SaveConfig()
// 杩炴帴鏁版嵁搴�
- err := models.Init(config.Options.SqlUsername, config.Options.SqlPassword, config.Options.SqlAddr, config.Options.SqlDBName)
+ err := kingdee.Init(config.Options.SqlUsername, config.Options.SqlPassword, config.Options.SqlAddr, config.Options.SqlDBName)
if err != nil {
fmt.Println("db init error:", err.Error())
dialog.ShowError(err, w)
@@ -87,6 +88,8 @@
submitBtn.Text = "宸插惎鍔�"
submitBtn.Disable()
+ report.StartReport()
+
go webserver.Serve(serverPort.Text)
})
diff --git a/kingdee/db.go b/kingdee/db.go
new file mode 100644
index 0000000..e504eca
--- /dev/null
+++ b/kingdee/db.go
@@ -0,0 +1,39 @@
+package kingdee
+
+import (
+ "fmt"
+ "github.com/jinzhu/gorm"
+
+ _ "github.com/jinzhu/gorm/dialects/mssql"
+)
+
+var db *gorm.DB
+var err error
+
+// Init .
+func Init(username, password, addr, dbName string) error {
+ var err error
+
+ sqlServer := fmt.Sprintf("sqlserver://%s:%s@%s:1433?database=%s",
+ username, password, addr, dbName)
+ fmt.Println(sqlServer)
+ // 鎵撳紑鏁版嵁搴撹繛鎺�
+ //db, err = gorm.Open("mssql", "sqlserver://sa:LZdba@)@)@10.6.201.7:1433?database=LZGS")
+ db, err = gorm.Open("mssql", sqlServer)
+ if err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func GetDB() *gorm.DB {
+ return db
+}
+
+// CloseDB .
+func CloseDB() {
+ if db != nil {
+ db.Close()
+ }
+}
diff --git a/models/icInventory.go b/kingdee/icInventory.go
similarity index 90%
rename from models/icInventory.go
rename to kingdee/icInventory.go
index 68e06eb..bd824a5 100644
--- a/models/icInventory.go
+++ b/kingdee/icInventory.go
@@ -1,4 +1,4 @@
-package models
+package kingdee
type Inventory struct {
FNumber string `gorm:"column:FNumber" json:"FNumber"` // 鐗╂枡浠g爜
@@ -8,12 +8,12 @@
FStockNo string `gorm:"column:FStockNo" json:"FStockNo"` // 浠撳簱浠g爜
FStockName string `gorm:"column:FStockName" json:"FStockName"` // 浠撳簱鍚嶇О
FUnit string `gorm:"column:FUnit" json:"FUnit"` // 鍩烘湰璁¢噺鍗曚綅
- FUnitQty float64 `gorm:"column:FUnitQty" json:"FUnitQty"` //鍩烘湰璁¢噺鍗曚綅鏁伴噺
+ FUnitQty float64 `gorm:"column:FUnitQty" json:"FUnitQty"` // 鍩烘湰璁¢噺鍗曚綅鏁伴噺
FSec string `gorm:"column:FSec" json:"FSec"` // 甯哥敤璁¢噺鍗曚綅
FQty float64 `gorm:"column:FQty" json:"FQty"` // 甯哥敤璁¢噺鍗曚綅鏁伴噺
}
-func FindICInventory() []Inventory {
+func ICInventory() []Inventory {
sql := `
SELECT
TOP (100) PERCENT item.FNumber AS FNumber,
@@ -40,9 +40,11 @@
item.FNumber
`
var result []Inventory
- //var result []map[string]interface{}
- db.Raw(sql).Debug().Scan(&result)
+ db.Raw(sql).Scan(&result)
+ //db.Raw(sql).Debug().Scan(&result)
+
+ // 鎵撳嵃鍒楀悕绉版祴璇�
//rows, _ := db.Raw(sql).Debug().Rows()
//fmt.Println(rows.Columns())
diff --git a/models/seOrder.go b/kingdee/seOrder.go
similarity index 86%
rename from models/seOrder.go
rename to kingdee/seOrder.go
index a1c2f27..297555d 100644
--- a/models/seOrder.go
+++ b/kingdee/seOrder.go
@@ -1,4 +1,4 @@
-package models
+package kingdee
// 閿�鍞鍗曡〃
type SEOrder struct {
@@ -16,6 +16,7 @@
FCess float64 `gorm:"column:FCess" json:"FCess"` // 绋庣巼
FNote string `gorm:"column:FNote" json:"FNote"` // 澶囨敞
FAdviceConsignDate string `gorm:"column:FAdviceConsignDate" json:"FAdviceConsignDate"` // 浜よ揣鏃ユ湡
+ Fdate string `gorm:"column:Fdate" json:"Fdate"` // 璁㈠崟鏃ユ湡
FCustID string `gorm:"column:FCustID" json:"FCustID"` // 璐揣鍗曚綅
FInventory float64 `gorm:"column:FInventory" json:"FInventory"` // 鍗虫椂搴撳瓨
// 鍚◣鍗曚环
@@ -40,11 +41,15 @@
i.FCess,
i.FNote,
i.FAdviceConsignDate,
+ i.Fdate,
i.FCustID,
SUM(ICInventory.FQty) AS FInventory
- FROM vwICBill_32 AS i
+ FROM
+ vwICBill_32 AS i
JOIN t_ICitem ON i.FShortNumber = t_ICitem.FShortNumber
JOIN ICInventory ON ICInventory.FItemID = t_ICitem.FItemID
+ WHERE
+ DateDiff(dd,fDate,getdate())=0
GROUP BY
i.FBillNo,
i.FNumber,
@@ -60,11 +65,16 @@
i.FCess,
i.FNote,
i.FAdviceConsignDate,
+ i.Fdate,
i.FCustID;
`
+
+ //鏌ヨ褰撳ぉ鏃ユ湡
+ //select * from vwICBill_32 where DateDiff(dd,fDate,getdate())=0
var result []SEOrder
- db.Raw(sql).Debug().Scan(&result)
+ db.Raw(sql).Scan(&result)
+ //db.Raw(sql).Debug().Scan(&result)
return result
}
diff --git a/main.go b/main.go
index a027b57..fa399aa 100644
--- a/main.go
+++ b/main.go
@@ -1,12 +1,13 @@
package main
import (
+ "kingdee-dbapi/kingdee"
+ "kingdee-dbapi/models"
"os"
"strings"
"kingdee-dbapi/config"
"kingdee-dbapi/gui"
- "kingdee-dbapi/models"
"github.com/flopp/go-findfont"
)
@@ -14,21 +15,13 @@
func main() {
config.Load()
- // 杩炴帴鏁版嵁搴�
- //models.Init()
- //
- //var order models.SEOrder
- //err := order.FindByFBillNo("SEORD006060")
- //if err != nil {
- // fmt.Println(err.Error())
- //} else {
- // fmt.Printf("%+v\n", order)
- //}
+ // sqlite3鏁版嵁搴�
+ models.Init()
// 璁剧疆涓枃瀛椾綋
setFont()
defer os.Unsetenv("FYNE_FONT")
- defer models.CloseDB()
+ defer kingdee.CloseDB()
// 鍒涘缓绐楀彛骞惰繍琛�
window := gui.NewDisplay()
diff --git a/models/db.go b/models/db.go
index d40e5b5..6e18fa4 100644
--- a/models/db.go
+++ b/models/db.go
@@ -1,39 +1,27 @@
package models
import (
- "fmt"
"github.com/jinzhu/gorm"
-
- _ "github.com/jinzhu/gorm/dialects/mssql"
+ _ "github.com/jinzhu/gorm/dialects/sqlite"
)
var db *gorm.DB
var err error
// Init .
-func Init(username, password, addr, dbName string) error {
- var err error
-
- sqlServer := fmt.Sprintf("sqlserver://%s:%s@%s:1433?database=%s",
- username, password, addr, dbName)
- fmt.Println(sqlServer)
+func Init() error {
// 鎵撳紑鏁版嵁搴撹繛鎺�
- //db, err = gorm.Open("mssql", "sqlserver://sa:LZdba@)@)@10.6.201.7:1433?database=LZGS")
- db, err = gorm.Open("mssql", sqlServer)
+ db, err = gorm.Open("sqlite3", "kingdee-api.db")
if err != nil {
return err
}
- return nil
-}
+ db.AutoMigrate(&Order{})
-func GetDB() *gorm.DB {
- return db
+ return nil
}
// CloseDB .
func CloseDB() {
- if db != nil {
- db.Close()
- }
+ db.Close()
}
diff --git a/models/order.go b/models/order.go
new file mode 100644
index 0000000..e34efde
--- /dev/null
+++ b/models/order.go
@@ -0,0 +1,31 @@
+package models
+
+type Order struct {
+ ID uint `gorm:"primary_key"`
+ CreateTime string
+ OrderNo string
+}
+
+//鎸囧畾琛ㄥ悕.
+func (o *Order) TableName() string {
+ return "t_order"
+}
+
+//Insert 鎻掑叆璁板綍.
+func (o *Order) Insert() bool {
+ result := db.Table(o.TableName()).Create(&o)
+ if result.Error != nil {
+ return false
+ }
+
+ return result.RowsAffected > 0
+}
+
+//FindAll 鏌ユ壘鎵�鏈夎褰�.
+func (o *Order) FindAll() (list []Order) {
+ if err := db.Table(o.TableName()).Where("1 = 1").Scan(&list).Error; err != nil {
+ return nil
+ }
+
+ return list
+}
diff --git a/nsqclient/channel.go b/nsqclient/channel.go
new file mode 100644
index 0000000..1594dcc
--- /dev/null
+++ b/nsqclient/channel.go
@@ -0,0 +1,134 @@
+package nsqclient
+
+import (
+ "errors"
+ "fmt"
+ "sync"
+
+ nsq "github.com/nsqio/go-nsq"
+)
+
+// channelPool implements the Pool interface based on buffered channels.
+type channelPool struct {
+ // storage for our net.Conn connections
+ mu sync.Mutex
+ conns chan *nsq.Producer
+
+ // net.Conn generator
+ factory Factory
+}
+
+// Factory is a function to create new connections.
+type Factory func() (*nsq.Producer, error)
+
+// NewChannelPool returns a new pool based on buffered channels with an initial
+// capacity and maximum capacity. Factory is used when initial capacity is
+// greater than zero to fill the pool. A zero initialCap doesn't fill the Pool
+// until a new Get() is called. During a Get(), If there is no new connection
+// available in the pool, a new connection will be created via the Factory()
+// method.
+func NewChannelPool(initialCap, maxCap int, factory Factory) (Pool, error) {
+ if initialCap < 0 || maxCap <= 0 || initialCap > maxCap {
+ return nil, errors.New("invalid capacity settings")
+ }
+
+ c := &channelPool{
+ conns: make(chan *nsq.Producer, maxCap),
+ factory: factory,
+ }
+
+ // create initial connections, if something goes wrong,
+ // just close the pool error out.
+ for i := 0; i < initialCap; i++ {
+ conn, err := factory()
+ if err != nil {
+ c.Close()
+ return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
+ }
+ c.conns <- conn
+ }
+
+ return c, nil
+}
+
+func (c *channelPool) getConns() chan *nsq.Producer {
+ c.mu.Lock()
+ conns := c.conns
+ c.mu.Unlock()
+ return conns
+}
+
+// Get implements the Pool interfaces Get() method. If there is no new
+// connection available in the pool, a new connection will be created via the
+// Factory() method.
+func (c *channelPool) Get() (*PoolConn, error) {
+ conns := c.getConns()
+ if conns == nil {
+ return nil, ErrClosed
+ }
+
+ // wrap our connections with out custom net.Conn implementation (wrapConn
+ // method) that puts the connection back to the pool if it's closed.
+ select {
+ case conn := <-conns:
+ if conn == nil {
+ return nil, ErrClosed
+ }
+
+ return c.wrapConn(conn), nil
+ default:
+ conn, err := c.factory()
+ if err != nil {
+ return nil, err
+ }
+
+ return c.wrapConn(conn), nil
+ }
+}
+
+// put puts the connection back to the pool. If the pool is full or closed,
+// conn is simply closed. A nil conn will be rejected.
+func (c *channelPool) put(conn *nsq.Producer) error {
+ if conn == nil {
+ return errors.New("connection is nil. rejecting")
+ }
+
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ if c.conns == nil {
+ // pool is closed, close passed connection
+ conn.Stop()
+ return nil
+ }
+
+ // put the resource back into the pool. If the pool is full, this will
+ // block and the default case will be executed.
+ select {
+ case c.conns <- conn:
+ return nil
+ default:
+ // pool is full, close passed connection
+ conn.Stop()
+ return nil
+ }
+}
+
+func (c *channelPool) Close() {
+ c.mu.Lock()
+ conns := c.conns
+ c.conns = nil
+ c.factory = nil
+ c.mu.Unlock()
+
+ if conns == nil {
+ return
+ }
+
+ close(conns)
+ for conn := range conns {
+ conn.Stop()
+ }
+}
+
+func (c *channelPool) Len() int { return len(c.getConns()) }
diff --git a/nsqclient/client.go b/nsqclient/client.go
new file mode 100644
index 0000000..1b27d18
--- /dev/null
+++ b/nsqclient/client.go
@@ -0,0 +1,35 @@
+package nsqclient
+
+import (
+ "fmt"
+ "kingdee-dbapi/config"
+)
+
+var nsqClient Producer
+
+const plcTopic = "plcTopic"
+
+func InitNsqClient() error {
+ var err error
+ nsqClient, err = NewProducer(config.Options.NsqServer)
+ if err != nil {
+ fmt.Println(err.Error())
+ }
+
+ return err
+}
+
+func Produce(msg []byte) (err error) {
+ if nsqClient == nil {
+ err = InitNsqClient()
+ if err != nil {
+ return err
+ }
+ }
+
+ if err = nsqClient.Publish(plcTopic, msg); err != nil {
+ fmt.Println("Publish error:" + err.Error())
+ }
+
+ return
+}
diff --git a/nsqclient/conn.go b/nsqclient/conn.go
new file mode 100644
index 0000000..5c680b5
--- /dev/null
+++ b/nsqclient/conn.go
@@ -0,0 +1,45 @@
+package nsqclient
+
+import (
+ "sync"
+
+ nsq "github.com/nsqio/go-nsq"
+)
+
+// PoolConn is a wrapper around net.Conn to modify the the behavior of
+// net.Conn's Close() method.
+type PoolConn struct {
+ *nsq.Producer
+ mu sync.RWMutex
+ c *channelPool
+ unusable bool
+}
+
+// Close puts the given connects back to the pool instead of closing it.
+func (p *PoolConn) Close() error {
+ p.mu.RLock()
+ defer p.mu.RUnlock()
+
+ if p.unusable {
+ if p.Producer != nil {
+ p.Producer.Stop()
+ return nil
+ }
+ return nil
+ }
+ return p.c.put(p.Producer)
+}
+
+// MarkUnusable marks the connection not usable any more, to let the pool close it instead of returning it to pool.
+func (p *PoolConn) MarkUnusable() {
+ p.mu.Lock()
+ p.unusable = true
+ p.mu.Unlock()
+}
+
+// newConn wraps a standard net.Conn to a poolConn net.Conn.
+func (c *channelPool) wrapConn(conn *nsq.Producer) *PoolConn {
+ p := &PoolConn{c: c}
+ p.Producer = conn
+ return p
+}
diff --git a/nsqclient/consumer.go b/nsqclient/consumer.go
new file mode 100644
index 0000000..a0df0b0
--- /dev/null
+++ b/nsqclient/consumer.go
@@ -0,0 +1,99 @@
+package nsqclient
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ nsq "github.com/nsqio/go-nsq"
+)
+
+type NsqConsumer struct {
+ consumer *nsq.Consumer
+ // handler nsq.Handler
+ handler func([]byte) error
+ ctx context.Context
+ ctxCancel context.CancelFunc
+ topic string
+ channel string
+}
+
+func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) {
+ conf := nsq.NewConfig()
+ conf.MaxAttempts = 0
+ conf.MsgTimeout = 10 * time.Minute // 榛樿涓�涓秷鎭渶澶氳兘澶勭悊鍗佸垎閽燂紝鍚﹀垯灏变細閲嶆柊涓㈠叆闃熷垪
+ conf.LookupdPollInterval = 3 * time.Second // 璋冩暣consumer鐨勯噸杩為棿闅旀椂闂翠负3绉�
+ for _, option := range options {
+ option(conf)
+ }
+
+ consumer, err := nsq.NewConsumer(topic, channel, conf)
+ if err != nil {
+ return nil, err
+ }
+ return &NsqConsumer{
+ consumer: consumer,
+ ctx: ctx,
+ topic: topic,
+ channel: channel,
+ }, nil
+}
+
+func DestroyNsqConsumer(c *NsqConsumer) {
+ if c != nil {
+ if c.ctxCancel != nil {
+ c.ctxCancel()
+ }
+ }
+}
+
+// func (n *NsqConsumer) AddHandler(handler nsq.Handler) {
+// n.handler = handler
+// }
+
+func (n *NsqConsumer) AddHandler(handler func([]byte) error) {
+ n.handler = handler
+}
+
+func (n *NsqConsumer) Run(qaddr string, concurrency int) error {
+ return n.RunDistributed([]string{qaddr}, nil, concurrency)
+}
+
+func (n *NsqConsumer) RunLookupd(lookupAddr string, concurrency int) error {
+ return n.RunDistributed(nil, []string{lookupAddr}, concurrency)
+}
+
+func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error {
+ n.consumer.ChangeMaxInFlight(concurrency)
+ // n.consumer.AddConcurrentHandlers(n.handler, concurrency)
+ n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(msg *nsq.Message) error {
+ return n.handler(msg.Body)
+ // return nil
+ }), concurrency)
+
+ var err error
+ if len(qAddr) > 0 {
+ err = n.consumer.ConnectToNSQDs(qAddr)
+ } else if len(lAddr) > 0 {
+ err = n.consumer.ConnectToNSQLookupds(lAddr)
+ } else {
+ err = fmt.Errorf("Addr Must NOT Empty")
+ }
+ if err != nil {
+ return err
+ }
+
+ if n.ctx == nil {
+ n.ctx, n.ctxCancel = context.WithCancel(context.Background())
+ }
+
+ for {
+ select {
+ case <-n.ctx.Done():
+ fmt.Println("[%s] %s,%s", "stop consumer", n.topic, n.channel)
+ n.consumer.Stop()
+ fmt.Println("[%s] %s,%s", "stop consumer success", n.topic, n.channel)
+ return nil
+ }
+ }
+}
diff --git a/nsqclient/httpClient.go b/nsqclient/httpClient.go
new file mode 100644
index 0000000..d400a58
--- /dev/null
+++ b/nsqclient/httpClient.go
@@ -0,0 +1,35 @@
+package nsqclient
+
+import (
+ "bytes"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+)
+
+const nsqWebApi = "http://121.31.232.83:9080/api/nsq/pub"
+
+// http鎺ュ彛 http://121.31.232.83:9080/api/nsq/pub?topic=your_topic
+func HttpPost(topic string, data []byte) bool {
+ uri := nsqWebApi + "?topic=" + topic
+
+ request, err := http.NewRequest(http.MethodPost, uri, bytes.NewReader(data))
+ if err != nil {
+ return false
+ }
+
+ request.Header.Set("Content-Type", "application/json;charset=UTF-8")
+
+ response, err := http.DefaultClient.Do(request)
+ if err != nil {
+ fmt.Printf(err.Error())
+ return false
+ }
+ defer response.Body.Close()
+
+ body, _ := ioutil.ReadAll(response.Body)
+
+ fmt.Println("response:", string(body))
+
+ return true
+}
diff --git a/nsqclient/pointer.go b/nsqclient/pointer.go
new file mode 100644
index 0000000..1cba795
--- /dev/null
+++ b/nsqclient/pointer.go
@@ -0,0 +1,57 @@
+package nsqclient
+
+// #include <stdlib.h>
+import "C"
+import (
+ "sync"
+ "unsafe"
+)
+
+var (
+ mutex sync.RWMutex
+ store = map[unsafe.Pointer]interface{}{}
+)
+
+func Save(v interface{}) unsafe.Pointer {
+ if v == nil {
+ return nil
+ }
+
+ // Generate real fake C pointer.
+ // This pointer will not store any data, but will bi used for indexing purposes.
+ // Since Go doest allow to cast dangling pointer to unsafe.Pointer, we do rally allocate one byte.
+ // Why we need indexing, because Go doest allow C code to store pointers to Go data.
+ var ptr unsafe.Pointer = C.malloc(C.size_t(1))
+ if ptr == nil {
+ panic("can't allocate 'cgo-pointer hack index pointer': ptr == nil")
+ }
+
+ mutex.Lock()
+ store[ptr] = v
+ mutex.Unlock()
+
+ return ptr
+}
+
+func Restore(ptr unsafe.Pointer) (v interface{}) {
+ if ptr == nil {
+ return nil
+ }
+
+ mutex.RLock()
+ v = store[ptr]
+ mutex.RUnlock()
+ return
+}
+
+func Unref(ptr unsafe.Pointer) {
+ if ptr == nil {
+ return
+ }
+
+ mutex.Lock()
+ delete(store, ptr)
+ mutex.Unlock()
+
+ C.free(ptr)
+}
diff --git a/nsqclient/pool.go b/nsqclient/pool.go
new file mode 100644
index 0000000..b773490
--- /dev/null
+++ b/nsqclient/pool.go
@@ -0,0 +1,25 @@
+// Package pool implements a pool of net.Conn interfaces to manage and reuse them.
+package nsqclient
+
+import "errors"
+
+var (
+ // ErrClosed is the error resulting if the pool is closed via pool.Close().
+ ErrClosed = errors.New("pool is closed")
+)
+
+// Pool interface describes a pool implementation. A pool should have maximum
+// capacity. An ideal pool is threadsafe and easy to use.
+type Pool interface {
+ // Get returns a new connection from the pool. Closing the connections puts
+ // it back to the Pool. Closing it when the pool is destroyed or full will
+ // be counted as an error.
+ Get() (*PoolConn, error)
+
+ // Close closes the pool and all its connections. After Close() the pool is
+ // no longer usable.
+ Close()
+
+ // Len returns the current number of connections of the pool.
+ Len() int
+}
diff --git a/nsqclient/producer.go b/nsqclient/producer.go
new file mode 100644
index 0000000..717c7a1
--- /dev/null
+++ b/nsqclient/producer.go
@@ -0,0 +1,139 @@
+package nsqclient
+
+import (
+ "fmt"
+ "time"
+
+ nsq "github.com/nsqio/go-nsq"
+)
+
+type Producer interface {
+ Publish(topic string, body []byte) error
+ MultiPublish(topic string, body [][]byte) error
+ DeferredPublish(topic string, delay time.Duration, body []byte) error
+}
+
+var _ Producer = (*producer)(nil)
+
+type producer struct {
+ pool Pool
+}
+
+var (
+ // name pool producer
+ nsqList = make(map[string]Pool)
+)
+
+type Config struct {
+ Addr string `toml:"addr" json:"addr"`
+ InitSize int `toml:"init_size" json:"init_size"`
+ MaxSize int `toml:"max_size" json:"max_size"`
+}
+
+func CreateProducerPool(configs map[string]Config) {
+ for name, conf := range configs {
+ n, err := newProducerPool(conf.Addr, conf.InitSize, conf.MaxSize)
+ if err == nil {
+ nsqList[name] = n
+ // 鏀寔ip:port瀵诲潃
+ nsqList[conf.Addr] = n
+ }
+ }
+}
+
+func DestroyProducerPool() {
+ for _, p := range nsqList {
+ p.Close()
+ }
+}
+
+func GetProducer(key ...string) (*producer, error) {
+ k := "default"
+ if len(key) > 0 {
+ k = key[0]
+ }
+ if n, ok := nsqList[k]; ok {
+ return &producer{n}, nil
+ }
+ return nil, fmt.Errorf("GetProducer can't get producer")
+}
+
+// CreateNSQProducer create nsq producer
+func newProducer(addr string, options ...func(*nsq.Config)) (*nsq.Producer, error) {
+ cfg := nsq.NewConfig()
+ for _, option := range options {
+ option(cfg)
+ }
+
+ producer, err := nsq.NewProducer(addr, cfg)
+ if err != nil {
+ return nil, err
+ }
+ // producer.SetLogger(log.New(os.Stderr, "", log.Flags()), nsq.LogLevelError)
+ return producer, nil
+}
+
+// CreateNSQProducerPool create a nwq producer pool
+func newProducerPool(addr string, initSize, maxSize int, options ...func(*nsq.Config)) (Pool, error) {
+ factory := func() (*nsq.Producer, error) {
+ // TODO 杩欓噷搴旇鎵цping鏂规硶鏉ョ‘瀹氳繛鎺ユ槸姝e父鐨勫惁鍒欎笉搴旇鍒涘缓conn
+ return newProducer(addr, options...)
+ }
+ nsqPool, err := NewChannelPool(initSize, maxSize, factory)
+ if err != nil {
+ return nil, err
+ }
+ return nsqPool, nil
+}
+
+func NewProducer(addr string) (*producer, error) {
+ CreateProducerPool(map[string]Config{"default": {addr, 1, 1}})
+ return GetProducer()
+}
+
+func retry(num int, fn func() error) error {
+ var err error
+ for i := 0; i < num; i++ {
+ err = fn()
+ if err == nil {
+ break
+ }
+ }
+ return err
+}
+
+func (p *producer) Publish(topic string, body []byte) error {
+ nsq, err := p.pool.Get()
+ if err != nil {
+ return err
+ }
+ defer nsq.Close()
+
+ return retry(2, func() error {
+ return nsq.Publish(topic, body)
+ })
+}
+
+func (p *producer) MultiPublish(topic string, body [][]byte) error {
+ nsq, err := p.pool.Get()
+ if err != nil {
+ return err
+ }
+ defer nsq.Close()
+
+ return retry(2, func() error {
+ return nsq.MultiPublish(topic, body)
+ })
+}
+
+func (p *producer) DeferredPublish(topic string, delay time.Duration, body []byte) error {
+ nsq, err := p.pool.Get()
+ if err != nil {
+ return err
+ }
+ defer nsq.Close()
+
+ return retry(2, func() error {
+ return nsq.DeferredPublish(topic, delay, body)
+ })
+}
diff --git a/report/report.go b/report/report.go
new file mode 100644
index 0000000..9d7e2b1
--- /dev/null
+++ b/report/report.go
@@ -0,0 +1,42 @@
+package report
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "kingdee-dbapi/config"
+)
+
+var ctx context.Context
+var cancel context.CancelFunc
+
+func StartReport() {
+ ctx, cancel = context.WithCancel(context.Background())
+ go Loop(ctx)
+}
+
+func RestartReport() {
+ cancel()
+
+ StartReport()
+}
+
+func Loop(c context.Context) {
+ fmt.Println("start report")
+ for {
+ select {
+ case <-c.Done():
+ fmt.Println("loop break")
+ return
+ default:
+ // 涓婃姤璁㈠崟
+ SendOrder()
+
+ // 涓婃姤鍗虫椂搴撳瓨
+ SendInventory()
+
+ time.Sleep(time.Duration(config.Options.SyncInterval) * time.Second)
+ }
+ }
+}
diff --git a/report/task.go b/report/task.go
new file mode 100644
index 0000000..40f0ac4
--- /dev/null
+++ b/report/task.go
@@ -0,0 +1,70 @@
+package report
+
+import (
+ "encoding/json"
+
+ "kingdee-dbapi/cache"
+ "kingdee-dbapi/config"
+ "kingdee-dbapi/kingdee"
+ "kingdee-dbapi/models"
+ "kingdee-dbapi/nsqclient"
+)
+
+func SendOrder() {
+ var completedOrderNo = make(map[string]struct{})
+ list := kingdee.SeOrderList()
+
+ for i := 0; i < len(list); i++ {
+ if cache.Exists(list[i].FBillNo) {
+ list = append(list[:i], list[i+1:]...)
+ } else {
+ completedOrderNo[list[i].FBillNo] = struct{}{}
+ }
+ }
+
+ b, _ := json.Marshal(list)
+
+ ok := nsqclient.HttpPost(config.Options.OrderTopic, b)
+ if ok {
+ // 鍐欏叆鏁版嵁搴�, 鏍囪宸茬粡涓婃姤杩囦簡,閬垮厤閲嶅涓婃姤
+ for orderNo, _ := range completedOrderNo {
+ cursor := models.Order{
+ OrderNo: orderNo,
+ }
+
+ cursor.Insert()
+ cache.WriteCache(orderNo)
+ }
+ }
+
+ // 閫愭潯鍙戦��
+ //for idx, _ := range list {
+ // // 宸茬粡鎺ㄩ�佽繃鐨勮鍗�
+ // if cache.Exists(list[idx].FBillNo) {
+ // continue
+ // }
+ //
+ // b, _ := json.Marshal(list[idx])
+ //
+ // ok := nsqclient.HttpPost(config.Options.OrderTopic, b)
+ // if ok {
+ // completedOrderNo[list[idx].FBillNo] = struct{}{}
+ // }
+ //}
+}
+
+func SendInventory() {
+ list := kingdee.ICInventory()
+
+ // 姣忔鍙� 300 鏉�
+ for i := 0; i < len(list); i += 300 {
+ end := i + 300
+ if end > len(list) {
+ end = len(list)
+ }
+
+ b, _ := json.Marshal(list[i:end])
+
+ nsqclient.HttpPost(config.Options.InventoryTopic, b)
+ }
+}
diff --git a/webserver/controller.go b/webserver/controller.go
index 70641bd..503e96f 100644
--- a/webserver/controller.go
+++ b/webserver/controller.go
@@ -1,12 +1,13 @@
package webserver
import (
+ "kingdee-dbapi/kingdee"
+
"github.com/gin-gonic/gin"
- "kingdee-dbapi/models"
)
func OrderList(c *gin.Context) {
- rspData := models.SeOrderList()
+ rspData := kingdee.SeOrderList()
c.JSON(200, gin.H{
"success": true,
"message": "",
@@ -16,7 +17,7 @@
}
func InventoryList(c *gin.Context) {
- rspData := models.FindICInventory()
+ rspData := kingdee.ICInventory()
c.JSON(200, gin.H{
"success": true,
"message": "",
--
Gitblit v1.8.0