From ff3cadba4a63cd1b63cd0e36358f49ccedb88bef Mon Sep 17 00:00:00 2001
From: gigibox <gigibox@163.com>
Date: 星期四, 15 六月 2023 10:02:20 +0800
Subject: [PATCH] 完成基本功能

---
 config.json             |    7 
 report/report.go        |   42 ++
 .gitignore              |    5 
 kingdee/icInventory.go  |   12 
 go.mod                  |    1 
 kingdee/db.go           |   39 ++
 nsqclient/conn.go       |   45 ++
 nsqclient/channel.go    |  134 ++++++++
 nsqclient/consumer.go   |   99 ++++++
 config/config.go        |   35 +
 nsqclient/httpClient.go |   35 ++
 go.sum                  |    5 
 webserver/controller.go |    7 
 report/task.go          |   70 ++++
 nsqclient/pointer.go    |   57 +++
 kingdee/seOrder.go      |   16 
 gui/gui.go              |   11 
 nsqclient/producer.go   |  139 ++++++++
 cache/cache.go          |   28 +
 models/db.go            |   24 -
 nsqclient/pool.go       |   25 +
 main.go                 |   17 
 models/order.go         |   31 +
 nsqclient/client.go     |   35 ++
 24 files changed, 862 insertions(+), 57 deletions(-)

diff --git a/.gitignore b/.gitignore
index 45256fa..aae17bd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,6 @@
 .idea
 .vscode
-kingdee-dbapi.exe
\ No newline at end of file
+kingdee-dbapi.exe
+invert.txt
+kingdee-api.db
+order.txt
\ No newline at end of file
diff --git a/cache/cache.go b/cache/cache.go
new file mode 100644
index 0000000..8fa4fb9
--- /dev/null
+++ b/cache/cache.go
@@ -0,0 +1,28 @@
+package cache
+
+import (
+	"kingdee-dbapi/models"
+	"sync"
+)
+
+var orderKeyCache sync.Map
+
+func InitCache() {
+	var db models.Order
+
+	cacheData := db.FindAll()
+
+	for _, order := range cacheData {
+		orderKeyCache.Store(order.OrderNo, struct{}{})
+	}
+}
+
+func Exists(key string) (ok bool) {
+	_, ok = orderKeyCache.Load(key)
+
+	return
+}
+
+func WriteCache(key string) {
+	orderKeyCache.Store(key, struct{}{})
+}
diff --git a/config.json b/config.json
index 09e83df..651c0c1 100644
--- a/config.json
+++ b/config.json
@@ -1,7 +1,10 @@
 {
-    "web_port": "808",
+    "web_port": "10210",
     "sql_addr": "10.6.201.7",
     "sql_db_name": "LZGS",
     "sql_username": "webapi",
-    "sql_password": "api2023"
+    "sql_password": "api2023",
+    "order_topic": "aps.wangpengfei.erp.seorder",
+    "inventory_topic": "aps.wangpengfei.erp.inventory",
+    "interval": 60
 }
\ No newline at end of file
diff --git a/config/config.go b/config/config.go
index 80b5711..b50fcf5 100644
--- a/config/config.go
+++ b/config/config.go
@@ -9,24 +9,38 @@
 )
 
 type Config struct {
-	WebPort     string `json:"web_port"`
-	SqlAddr     string `json:"sql_addr"`
-	SqlDBName   string `json:"sql_db_name"`
-	SqlUsername string `json:"sql_username"`
-	SqlPassword string `json:"sql_password"`
+	WebPort        string `json:"web_port"`        // 鏈湴web鏈嶅姟缁戝畾鎺ュ彛
+	SqlAddr        string `json:"sql_addr"`        // 鏁版嵁搴搃p鍦板潃, 涓嶅姞绔彛鍙�, 榛樿浣跨敤1433绔彛
+	SqlDBName      string `json:"sql_db_name"`     // 鏁版嵁搴撳悕绉�
+	SqlUsername    string `json:"sql_username"`    // 鏁版嵁搴撶敤鎴�
+	SqlPassword    string `json:"sql_password"`    // 鏁版嵁搴撳瘑鐮�
+	NsqServer      string `json:"nsq_server"`      // nsq TCP鏈嶅姟绔湴鍧�
+	NsqWebApi      string `json:"nsq_server"`      // nsq HTTP鎺ュ彛鍦板潃
+	OrderTopic     string `json:"order_topic"`     // 璁㈠崟涓婃姤鐨則opic
+	InventoryTopic string `json:"inventory_topic"` // 搴撳瓨涓婃姤鐨則opic
+	SyncInterval   int    `json:"interval"`        // 鍚屾鐨勬椂闂撮棿闅�, 鍗曚綅/绉�
 }
 
 const configPath = "config.json"
 
 var Options Config
 
-// Init is an exported method that takes the environment starts the viper
-// (external lib) and returns the configuration struct.
-func init() {
+func DefaultConfig() {
+	Options.WebPort = "10210"
+	Options.SqlAddr = "127.0.0.1"
+	Options.SqlDBName = "dbname"
+	Options.SqlUsername = "sa"
+	Options.SqlPassword = "123456"
+	Options.NsqServer = "fai365.com:4150"
+	Options.NsqWebApi = "http://121.31.232.83:9080/api/nsq/pub?topic=your_topic"
+	Options.OrderTopic = "/province/city/factoryNo/kingdee_seOrder"
+	Options.InventoryTopic = "/province/city/factoryNo/kingdee_inventory"
+	Options.SyncInterval = 60
 }
 
 func Load() {
 	if !pathExists(configPath) {
+		DefaultConfig()
 		SaveConfig()
 	} else {
 		file, _ := os.Open(configPath)
@@ -44,8 +58,11 @@
 	b, err := json.Marshal(Options)
 	if err == nil {
 		var out bytes.Buffer
+
 		err = json.Indent(&out, b, "", "    ")
-		ioutil.WriteFile(configPath, out.Bytes(), 0644)
+		if err == nil {
+			err = ioutil.WriteFile(configPath, out.Bytes(), 0644)
+		}
 	}
 
 	return err
diff --git a/go.mod b/go.mod
index 6b07c62..5123a17 100644
--- a/go.mod
+++ b/go.mod
@@ -8,4 +8,5 @@
 	github.com/flopp/go-findfont v0.1.0 // indirect
 	github.com/gin-gonic/gin v1.7.0 // indirect
 	github.com/jinzhu/gorm v1.9.16 // indirect
+	github.com/nsqio/go-nsq v1.1.0 // indirect
 )
diff --git a/go.sum b/go.sum
index af446b3..afda856 100644
--- a/go.sum
+++ b/go.sum
@@ -168,6 +168,8 @@
 github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM=
 github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
 github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
+github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
+github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
 github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
 github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
@@ -268,6 +270,7 @@
 github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
 github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
 github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
+github.com/mattn/go-sqlite3 v1.14.0 h1:mLyGNKR8+Vv9CAU7PphKa2hkEqxxhn8i32J6FPj1/QA=
 github.com/mattn/go-sqlite3 v1.14.0/go.mod h1:JIl7NbARA7phWnGvh0LKTyg7S9BA+6gx71ShQilpsus=
 github.com/mcuadros/go-version v0.0.0-20190830083331-035f6764e8d2/go.mod h1:76rfSfYPWj01Z85hUf/ituArm797mNKcvINh1OlsZKo=
 github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg=
@@ -290,6 +293,8 @@
 github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646/go.mod h1:jpp1/29i3P1S/RLdc7JQKbRpFeM1dOBd8T9ki5s+AY8=
 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e h1:fD57ERR4JtEqsWbfPhv4DMiApHyliiK5xCTNVSPiaAs=
 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
+github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE=
+github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
 github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
 github.com/pelletier/go-toml v1.9.3/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
 github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
diff --git a/gui/gui.go b/gui/gui.go
index 1046e8c..abbe701 100644
--- a/gui/gui.go
+++ b/gui/gui.go
@@ -2,12 +2,13 @@
 
 import (
 	"fmt"
-	"kingdee-dbapi/config"
-	"kingdee-dbapi/models"
-	"kingdee-dbapi/webserver"
 	"strings"
 
+	"kingdee-dbapi/config"
+	"kingdee-dbapi/kingdee"
+	"kingdee-dbapi/report"
 	"kingdee-dbapi/static"
+	"kingdee-dbapi/webserver"
 
 	"fyne.io/fyne/v2"
 	"fyne.io/fyne/v2/app"
@@ -76,7 +77,7 @@
 		config.SaveConfig()
 
 		// 杩炴帴鏁版嵁搴�
-		err := models.Init(config.Options.SqlUsername, config.Options.SqlPassword, config.Options.SqlAddr, config.Options.SqlDBName)
+		err := kingdee.Init(config.Options.SqlUsername, config.Options.SqlPassword, config.Options.SqlAddr, config.Options.SqlDBName)
 		if err != nil {
 			fmt.Println("db init error:", err.Error())
 			dialog.ShowError(err, w)
@@ -87,6 +88,8 @@
 		submitBtn.Text = "宸插惎鍔�"
 		submitBtn.Disable()
 
+		report.StartReport()
+
 		go webserver.Serve(serverPort.Text)
 	})
 
diff --git a/kingdee/db.go b/kingdee/db.go
new file mode 100644
index 0000000..e504eca
--- /dev/null
+++ b/kingdee/db.go
@@ -0,0 +1,39 @@
+package kingdee
+
+import (
+	"fmt"
+	"github.com/jinzhu/gorm"
+
+	_ "github.com/jinzhu/gorm/dialects/mssql"
+)
+
+var db *gorm.DB
+var err error
+
+// Init .
+func Init(username, password, addr, dbName string) error {
+	var err error
+
+	sqlServer := fmt.Sprintf("sqlserver://%s:%s@%s:1433?database=%s",
+		username, password, addr, dbName)
+	fmt.Println(sqlServer)
+	// 鎵撳紑鏁版嵁搴撹繛鎺�
+	//db, err = gorm.Open("mssql", "sqlserver://sa:LZdba@)@)@10.6.201.7:1433?database=LZGS")
+	db, err = gorm.Open("mssql", sqlServer)
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func GetDB() *gorm.DB {
+	return db
+}
+
+// CloseDB .
+func CloseDB() {
+	if db != nil {
+		db.Close()
+	}
+}
diff --git a/models/icInventory.go b/kingdee/icInventory.go
similarity index 90%
rename from models/icInventory.go
rename to kingdee/icInventory.go
index 68e06eb..bd824a5 100644
--- a/models/icInventory.go
+++ b/kingdee/icInventory.go
@@ -1,4 +1,4 @@
-package models
+package kingdee
 
 type Inventory struct {
 	FNumber    string  `gorm:"column:FNumber" json:"FNumber"`       // 鐗╂枡浠g爜
@@ -8,12 +8,12 @@
 	FStockNo   string  `gorm:"column:FStockNo" json:"FStockNo"`     // 浠撳簱浠g爜
 	FStockName string  `gorm:"column:FStockName" json:"FStockName"` // 浠撳簱鍚嶇О
 	FUnit      string  `gorm:"column:FUnit" json:"FUnit"`           // 鍩烘湰璁¢噺鍗曚綅
-	FUnitQty   float64 `gorm:"column:FUnitQty" json:"FUnitQty"`     //鍩烘湰璁¢噺鍗曚綅鏁伴噺
+	FUnitQty   float64 `gorm:"column:FUnitQty" json:"FUnitQty"`     // 鍩烘湰璁¢噺鍗曚綅鏁伴噺
 	FSec       string  `gorm:"column:FSec" json:"FSec"`             // 甯哥敤璁¢噺鍗曚綅
 	FQty       float64 `gorm:"column:FQty" json:"FQty"`             // 甯哥敤璁¢噺鍗曚綅鏁伴噺
 }
 
-func FindICInventory() []Inventory {
+func ICInventory() []Inventory {
 	sql := `
 	SELECT
 		TOP (100) PERCENT item.FNumber AS FNumber,
@@ -40,9 +40,11 @@
 		item.FNumber	
 	`
 	var result []Inventory
-	//var result []map[string]interface{}
 
-	db.Raw(sql).Debug().Scan(&result)
+	db.Raw(sql).Scan(&result)
+	//db.Raw(sql).Debug().Scan(&result)
+
+	// 鎵撳嵃鍒楀悕绉版祴璇�
 	//rows, _ := db.Raw(sql).Debug().Rows()
 	//fmt.Println(rows.Columns())
 
diff --git a/models/seOrder.go b/kingdee/seOrder.go
similarity index 86%
rename from models/seOrder.go
rename to kingdee/seOrder.go
index a1c2f27..297555d 100644
--- a/models/seOrder.go
+++ b/kingdee/seOrder.go
@@ -1,4 +1,4 @@
-package models
+package kingdee
 
 // 閿�鍞鍗曡〃
 type SEOrder struct {
@@ -16,6 +16,7 @@
 	FCess              float64 `gorm:"column:FCess" json:"FCess"`                           // 绋庣巼
 	FNote              string  `gorm:"column:FNote" json:"FNote"`                           // 澶囨敞
 	FAdviceConsignDate string  `gorm:"column:FAdviceConsignDate" json:"FAdviceConsignDate"` // 浜よ揣鏃ユ湡
+	Fdate              string  `gorm:"column:Fdate" json:"Fdate"`                           // 璁㈠崟鏃ユ湡
 	FCustID            string  `gorm:"column:FCustID" json:"FCustID"`                       // 璐揣鍗曚綅
 	FInventory         float64 `gorm:"column:FInventory" json:"FInventory"`                 // 鍗虫椂搴撳瓨
 	// 鍚◣鍗曚环
@@ -40,11 +41,15 @@
 		i.FCess,
 		i.FNote,
 		i.FAdviceConsignDate,
+		i.Fdate,
 		i.FCustID,
 		SUM(ICInventory.FQty) AS FInventory
-	FROM vwICBill_32 AS i
+	FROM
+		vwICBill_32 AS i
 	JOIN t_ICitem ON i.FShortNumber = t_ICitem.FShortNumber
 	JOIN ICInventory ON ICInventory.FItemID = t_ICitem.FItemID
+	WHERE
+		DateDiff(dd,fDate,getdate())=0
 	GROUP BY
 		i.FBillNo,
 		i.FNumber,
@@ -60,11 +65,16 @@
 		i.FCess,
 		i.FNote,
 		i.FAdviceConsignDate,
+		i.Fdate,
 		i.FCustID;
 	`
+
+	//鏌ヨ褰撳ぉ鏃ユ湡
+	//select * from vwICBill_32 where DateDiff(dd,fDate,getdate())=0
 	var result []SEOrder
 
-	db.Raw(sql).Debug().Scan(&result)
+	db.Raw(sql).Scan(&result)
+	//db.Raw(sql).Debug().Scan(&result)
 
 	return result
 }
diff --git a/main.go b/main.go
index a027b57..fa399aa 100644
--- a/main.go
+++ b/main.go
@@ -1,12 +1,13 @@
 package main
 
 import (
+	"kingdee-dbapi/kingdee"
+	"kingdee-dbapi/models"
 	"os"
 	"strings"
 
 	"kingdee-dbapi/config"
 	"kingdee-dbapi/gui"
-	"kingdee-dbapi/models"
 
 	"github.com/flopp/go-findfont"
 )
@@ -14,21 +15,13 @@
 func main() {
 	config.Load()
 
-	// 杩炴帴鏁版嵁搴�
-	//models.Init()
-	//
-	//var order models.SEOrder
-	//err := order.FindByFBillNo("SEORD006060")
-	//if err != nil {
-	//	fmt.Println(err.Error())
-	//} else {
-	//	fmt.Printf("%+v\n", order)
-	//}
+	// sqlite3鏁版嵁搴�
+	models.Init()
 
 	// 璁剧疆涓枃瀛椾綋
 	setFont()
 	defer os.Unsetenv("FYNE_FONT")
-	defer models.CloseDB()
+	defer kingdee.CloseDB()
 
 	// 鍒涘缓绐楀彛骞惰繍琛�
 	window := gui.NewDisplay()
diff --git a/models/db.go b/models/db.go
index d40e5b5..6e18fa4 100644
--- a/models/db.go
+++ b/models/db.go
@@ -1,39 +1,27 @@
 package models
 
 import (
-	"fmt"
 	"github.com/jinzhu/gorm"
-
-	_ "github.com/jinzhu/gorm/dialects/mssql"
+	_ "github.com/jinzhu/gorm/dialects/sqlite"
 )
 
 var db *gorm.DB
 var err error
 
 // Init .
-func Init(username, password, addr, dbName string) error {
-	var err error
-
-	sqlServer := fmt.Sprintf("sqlserver://%s:%s@%s:1433?database=%s",
-		username, password, addr, dbName)
-	fmt.Println(sqlServer)
+func Init() error {
 	// 鎵撳紑鏁版嵁搴撹繛鎺�
-	//db, err = gorm.Open("mssql", "sqlserver://sa:LZdba@)@)@10.6.201.7:1433?database=LZGS")
-	db, err = gorm.Open("mssql", sqlServer)
+	db, err = gorm.Open("sqlite3", "kingdee-api.db")
 	if err != nil {
 		return err
 	}
 
-	return nil
-}
+	db.AutoMigrate(&Order{})
 
-func GetDB() *gorm.DB {
-	return db
+	return nil
 }
 
 // CloseDB .
 func CloseDB() {
-	if db != nil {
-		db.Close()
-	}
+	db.Close()
 }
diff --git a/models/order.go b/models/order.go
new file mode 100644
index 0000000..e34efde
--- /dev/null
+++ b/models/order.go
@@ -0,0 +1,31 @@
+package models
+
+type Order struct {
+	ID         uint `gorm:"primary_key"`
+	CreateTime string
+	OrderNo    string
+}
+
+//鎸囧畾琛ㄥ悕.
+func (o *Order) TableName() string {
+	return "t_order"
+}
+
+//Insert 鎻掑叆璁板綍.
+func (o *Order) Insert() bool {
+	result := db.Table(o.TableName()).Create(&o)
+	if result.Error != nil {
+		return false
+	}
+
+	return result.RowsAffected > 0
+}
+
+//FindAll 鏌ユ壘鎵�鏈夎褰�.
+func (o *Order) FindAll() (list []Order) {
+	if err := db.Table(o.TableName()).Where("1 = 1").Scan(&list).Error; err != nil {
+		return nil
+	}
+
+	return list
+}
diff --git a/nsqclient/channel.go b/nsqclient/channel.go
new file mode 100644
index 0000000..1594dcc
--- /dev/null
+++ b/nsqclient/channel.go
@@ -0,0 +1,134 @@
+package nsqclient
+
+import (
+	"errors"
+	"fmt"
+	"sync"
+
+	nsq "github.com/nsqio/go-nsq"
+)
+
+// channelPool implements the Pool interface based on buffered channels.
+type channelPool struct {
+	// storage for our net.Conn connections
+	mu    sync.Mutex
+	conns chan *nsq.Producer
+
+	// net.Conn generator
+	factory Factory
+}
+
+// Factory is a function to create new connections.
+type Factory func() (*nsq.Producer, error)
+
+// NewChannelPool returns a new pool based on buffered channels with an initial
+// capacity and maximum capacity. Factory is used when initial capacity is
+// greater than zero to fill the pool. A zero initialCap doesn't fill the Pool
+// until a new Get() is called. During a Get(), If there is no new connection
+// available in the pool, a new connection will be created via the Factory()
+// method.
+func NewChannelPool(initialCap, maxCap int, factory Factory) (Pool, error) {
+	if initialCap < 0 || maxCap <= 0 || initialCap > maxCap {
+		return nil, errors.New("invalid capacity settings")
+	}
+
+	c := &channelPool{
+		conns:   make(chan *nsq.Producer, maxCap),
+		factory: factory,
+	}
+
+	// create initial connections, if something goes wrong,
+	// just close the pool error out.
+	for i := 0; i < initialCap; i++ {
+		conn, err := factory()
+		if err != nil {
+			c.Close()
+			return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
+		}
+		c.conns <- conn
+	}
+
+	return c, nil
+}
+
+func (c *channelPool) getConns() chan *nsq.Producer {
+	c.mu.Lock()
+	conns := c.conns
+	c.mu.Unlock()
+	return conns
+}
+
+// Get implements the Pool interfaces Get() method. If there is no new
+// connection available in the pool, a new connection will be created via the
+// Factory() method.
+func (c *channelPool) Get() (*PoolConn, error) {
+	conns := c.getConns()
+	if conns == nil {
+		return nil, ErrClosed
+	}
+
+	// wrap our connections with out custom net.Conn implementation (wrapConn
+	// method) that puts the connection back to the pool if it's closed.
+	select {
+	case conn := <-conns:
+		if conn == nil {
+			return nil, ErrClosed
+		}
+
+		return c.wrapConn(conn), nil
+	default:
+		conn, err := c.factory()
+		if err != nil {
+			return nil, err
+		}
+
+		return c.wrapConn(conn), nil
+	}
+}
+
+// put puts the connection back to the pool. If the pool is full or closed,
+// conn is simply closed. A nil conn will be rejected.
+func (c *channelPool) put(conn *nsq.Producer) error {
+	if conn == nil {
+		return errors.New("connection is nil. rejecting")
+	}
+
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	if c.conns == nil {
+		// pool is closed, close passed connection
+		conn.Stop()
+		return nil
+	}
+
+	// put the resource back into the pool. If the pool is full, this will
+	// block and the default case will be executed.
+	select {
+	case c.conns <- conn:
+		return nil
+	default:
+		// pool is full, close passed connection
+		conn.Stop()
+		return nil
+	}
+}
+
+func (c *channelPool) Close() {
+	c.mu.Lock()
+	conns := c.conns
+	c.conns = nil
+	c.factory = nil
+	c.mu.Unlock()
+
+	if conns == nil {
+		return
+	}
+
+	close(conns)
+	for conn := range conns {
+		conn.Stop()
+	}
+}
+
+func (c *channelPool) Len() int { return len(c.getConns()) }
diff --git a/nsqclient/client.go b/nsqclient/client.go
new file mode 100644
index 0000000..1b27d18
--- /dev/null
+++ b/nsqclient/client.go
@@ -0,0 +1,35 @@
+package nsqclient
+
+import (
+	"fmt"
+	"kingdee-dbapi/config"
+)
+
+var nsqClient Producer
+
+const plcTopic = "plcTopic"
+
+func InitNsqClient() error {
+	var err error
+	nsqClient, err = NewProducer(config.Options.NsqServer)
+	if err != nil {
+		fmt.Println(err.Error())
+	}
+
+	return err
+}
+
+func Produce(msg []byte) (err error) {
+	if nsqClient == nil {
+		err = InitNsqClient()
+		if err != nil {
+			return err
+		}
+	}
+
+	if err = nsqClient.Publish(plcTopic, msg); err != nil {
+		fmt.Println("Publish error:" + err.Error())
+	}
+
+	return
+}
diff --git a/nsqclient/conn.go b/nsqclient/conn.go
new file mode 100644
index 0000000..5c680b5
--- /dev/null
+++ b/nsqclient/conn.go
@@ -0,0 +1,45 @@
+package nsqclient
+
+import (
+	"sync"
+
+	nsq "github.com/nsqio/go-nsq"
+)
+
+// PoolConn is a wrapper around net.Conn to modify the the behavior of
+// net.Conn's Close() method.
+type PoolConn struct {
+	*nsq.Producer
+	mu       sync.RWMutex
+	c        *channelPool
+	unusable bool
+}
+
+// Close puts the given connects back to the pool instead of closing it.
+func (p *PoolConn) Close() error {
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+
+	if p.unusable {
+		if p.Producer != nil {
+			p.Producer.Stop()
+			return nil
+		}
+		return nil
+	}
+	return p.c.put(p.Producer)
+}
+
+// MarkUnusable marks the connection not usable any more, to let the pool close it instead of returning it to pool.
+func (p *PoolConn) MarkUnusable() {
+	p.mu.Lock()
+	p.unusable = true
+	p.mu.Unlock()
+}
+
+// newConn wraps a standard net.Conn to a poolConn net.Conn.
+func (c *channelPool) wrapConn(conn *nsq.Producer) *PoolConn {
+	p := &PoolConn{c: c}
+	p.Producer = conn
+	return p
+}
diff --git a/nsqclient/consumer.go b/nsqclient/consumer.go
new file mode 100644
index 0000000..a0df0b0
--- /dev/null
+++ b/nsqclient/consumer.go
@@ -0,0 +1,99 @@
+package nsqclient
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	nsq "github.com/nsqio/go-nsq"
+)
+
+type NsqConsumer struct {
+	consumer *nsq.Consumer
+	// handler   nsq.Handler
+	handler   func([]byte) error
+	ctx       context.Context
+	ctxCancel context.CancelFunc
+	topic     string
+	channel   string
+}
+
+func NewNsqConsumer(ctx context.Context, topic, channel string, options ...func(*nsq.Config)) (*NsqConsumer, error) {
+	conf := nsq.NewConfig()
+	conf.MaxAttempts = 0
+	conf.MsgTimeout = 10 * time.Minute         // 榛樿涓�涓秷鎭渶澶氳兘澶勭悊鍗佸垎閽燂紝鍚﹀垯灏变細閲嶆柊涓㈠叆闃熷垪
+	conf.LookupdPollInterval = 3 * time.Second // 璋冩暣consumer鐨勯噸杩為棿闅旀椂闂翠负3绉�
+	for _, option := range options {
+		option(conf)
+	}
+
+	consumer, err := nsq.NewConsumer(topic, channel, conf)
+	if err != nil {
+		return nil, err
+	}
+	return &NsqConsumer{
+		consumer: consumer,
+		ctx:      ctx,
+		topic:    topic,
+		channel:  channel,
+	}, nil
+}
+
+func DestroyNsqConsumer(c *NsqConsumer) {
+	if c != nil {
+		if c.ctxCancel != nil {
+			c.ctxCancel()
+		}
+	}
+}
+
+// func (n *NsqConsumer) AddHandler(handler nsq.Handler) {
+// 	n.handler = handler
+// }
+
+func (n *NsqConsumer) AddHandler(handler func([]byte) error) {
+	n.handler = handler
+}
+
+func (n *NsqConsumer) Run(qaddr string, concurrency int) error {
+	return n.RunDistributed([]string{qaddr}, nil, concurrency)
+}
+
+func (n *NsqConsumer) RunLookupd(lookupAddr string, concurrency int) error {
+	return n.RunDistributed(nil, []string{lookupAddr}, concurrency)
+}
+
+func (n *NsqConsumer) RunDistributed(qAddr, lAddr []string, concurrency int) error {
+	n.consumer.ChangeMaxInFlight(concurrency)
+	// n.consumer.AddConcurrentHandlers(n.handler, concurrency)
+	n.consumer.AddConcurrentHandlers(nsq.HandlerFunc(func(msg *nsq.Message) error {
+		return n.handler(msg.Body)
+		// return nil
+	}), concurrency)
+
+	var err error
+	if len(qAddr) > 0 {
+		err = n.consumer.ConnectToNSQDs(qAddr)
+	} else if len(lAddr) > 0 {
+		err = n.consumer.ConnectToNSQLookupds(lAddr)
+	} else {
+		err = fmt.Errorf("Addr Must NOT Empty")
+	}
+	if err != nil {
+		return err
+	}
+
+	if n.ctx == nil {
+		n.ctx, n.ctxCancel = context.WithCancel(context.Background())
+	}
+
+	for {
+		select {
+		case <-n.ctx.Done():
+			fmt.Println("[%s] %s,%s", "stop consumer", n.topic, n.channel)
+			n.consumer.Stop()
+			fmt.Println("[%s] %s,%s", "stop consumer success", n.topic, n.channel)
+			return nil
+		}
+	}
+}
diff --git a/nsqclient/httpClient.go b/nsqclient/httpClient.go
new file mode 100644
index 0000000..d400a58
--- /dev/null
+++ b/nsqclient/httpClient.go
@@ -0,0 +1,35 @@
+package nsqclient
+
+import (
+	"bytes"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+)
+
+const nsqWebApi = "http://121.31.232.83:9080/api/nsq/pub"
+
+// http鎺ュ彛 http://121.31.232.83:9080/api/nsq/pub?topic=your_topic
+func HttpPost(topic string, data []byte) bool {
+	uri := nsqWebApi + "?topic=" + topic
+
+	request, err := http.NewRequest(http.MethodPost, uri, bytes.NewReader(data))
+	if err != nil {
+		return false
+	}
+
+	request.Header.Set("Content-Type", "application/json;charset=UTF-8")
+
+	response, err := http.DefaultClient.Do(request)
+	if err != nil {
+		fmt.Printf(err.Error())
+		return false
+	}
+	defer response.Body.Close()
+
+	body, _ := ioutil.ReadAll(response.Body)
+
+	fmt.Println("response:", string(body))
+
+	return true
+}
diff --git a/nsqclient/pointer.go b/nsqclient/pointer.go
new file mode 100644
index 0000000..1cba795
--- /dev/null
+++ b/nsqclient/pointer.go
@@ -0,0 +1,57 @@
+package nsqclient
+
+// #include <stdlib.h>
+import "C"
+import (
+	"sync"
+	"unsafe"
+)
+
+var (
+	mutex sync.RWMutex
+	store = map[unsafe.Pointer]interface{}{}
+)
+
+func Save(v interface{}) unsafe.Pointer {
+	if v == nil {
+		return nil
+	}
+
+	// Generate real fake C pointer.
+	// This pointer will not store any data, but will bi used for indexing purposes.
+	// Since Go doest allow to cast dangling pointer to unsafe.Pointer, we do rally allocate one byte.
+	// Why we need indexing, because Go doest allow C code to store pointers to Go data.
+	var ptr unsafe.Pointer = C.malloc(C.size_t(1))
+	if ptr == nil {
+		panic("can't allocate 'cgo-pointer hack index pointer': ptr == nil")
+	}
+
+	mutex.Lock()
+	store[ptr] = v
+	mutex.Unlock()
+
+	return ptr
+}
+
+func Restore(ptr unsafe.Pointer) (v interface{}) {
+	if ptr == nil {
+		return nil
+	}
+
+	mutex.RLock()
+	v = store[ptr]
+	mutex.RUnlock()
+	return
+}
+
+func Unref(ptr unsafe.Pointer) {
+	if ptr == nil {
+		return
+	}
+
+	mutex.Lock()
+	delete(store, ptr)
+	mutex.Unlock()
+
+	C.free(ptr)
+}
diff --git a/nsqclient/pool.go b/nsqclient/pool.go
new file mode 100644
index 0000000..b773490
--- /dev/null
+++ b/nsqclient/pool.go
@@ -0,0 +1,25 @@
+// Package pool implements a pool of net.Conn interfaces to manage and reuse them.
+package nsqclient
+
+import "errors"
+
+var (
+	// ErrClosed is the error resulting if the pool is closed via pool.Close().
+	ErrClosed = errors.New("pool is closed")
+)
+
+// Pool interface describes a pool implementation. A pool should have maximum
+// capacity. An ideal pool is threadsafe and easy to use.
+type Pool interface {
+	// Get returns a new connection from the pool. Closing the connections puts
+	// it back to the Pool. Closing it when the pool is destroyed or full will
+	// be counted as an error.
+	Get() (*PoolConn, error)
+
+	// Close closes the pool and all its connections. After Close() the pool is
+	// no longer usable.
+	Close()
+
+	// Len returns the current number of connections of the pool.
+	Len() int
+}
diff --git a/nsqclient/producer.go b/nsqclient/producer.go
new file mode 100644
index 0000000..717c7a1
--- /dev/null
+++ b/nsqclient/producer.go
@@ -0,0 +1,139 @@
+package nsqclient
+
+import (
+	"fmt"
+	"time"
+
+	nsq "github.com/nsqio/go-nsq"
+)
+
+type Producer interface {
+	Publish(topic string, body []byte) error
+	MultiPublish(topic string, body [][]byte) error
+	DeferredPublish(topic string, delay time.Duration, body []byte) error
+}
+
+var _ Producer = (*producer)(nil)
+
+type producer struct {
+	pool Pool
+}
+
+var (
+	// 				   name   pool producer
+	nsqList = make(map[string]Pool)
+)
+
+type Config struct {
+	Addr     string `toml:"addr" json:"addr"`
+	InitSize int    `toml:"init_size" json:"init_size"`
+	MaxSize  int    `toml:"max_size" json:"max_size"`
+}
+
+func CreateProducerPool(configs map[string]Config) {
+	for name, conf := range configs {
+		n, err := newProducerPool(conf.Addr, conf.InitSize, conf.MaxSize)
+		if err == nil {
+			nsqList[name] = n
+			// 鏀寔ip:port瀵诲潃
+			nsqList[conf.Addr] = n
+		}
+	}
+}
+
+func DestroyProducerPool() {
+	for _, p := range nsqList {
+		p.Close()
+	}
+}
+
+func GetProducer(key ...string) (*producer, error) {
+	k := "default"
+	if len(key) > 0 {
+		k = key[0]
+	}
+	if n, ok := nsqList[k]; ok {
+		return &producer{n}, nil
+	}
+	return nil, fmt.Errorf("GetProducer can't get producer")
+}
+
+// CreateNSQProducer create nsq producer
+func newProducer(addr string, options ...func(*nsq.Config)) (*nsq.Producer, error) {
+	cfg := nsq.NewConfig()
+	for _, option := range options {
+		option(cfg)
+	}
+
+	producer, err := nsq.NewProducer(addr, cfg)
+	if err != nil {
+		return nil, err
+	}
+	// producer.SetLogger(log.New(os.Stderr, "", log.Flags()), nsq.LogLevelError)
+	return producer, nil
+}
+
+// CreateNSQProducerPool create a nwq producer pool
+func newProducerPool(addr string, initSize, maxSize int, options ...func(*nsq.Config)) (Pool, error) {
+	factory := func() (*nsq.Producer, error) {
+		// TODO 杩欓噷搴旇鎵цping鏂规硶鏉ョ‘瀹氳繛鎺ユ槸姝e父鐨勫惁鍒欎笉搴旇鍒涘缓conn
+		return newProducer(addr, options...)
+	}
+	nsqPool, err := NewChannelPool(initSize, maxSize, factory)
+	if err != nil {
+		return nil, err
+	}
+	return nsqPool, nil
+}
+
+func NewProducer(addr string) (*producer, error) {
+	CreateProducerPool(map[string]Config{"default": {addr, 1, 1}})
+	return GetProducer()
+}
+
+func retry(num int, fn func() error) error {
+	var err error
+	for i := 0; i < num; i++ {
+		err = fn()
+		if err == nil {
+			break
+		}
+	}
+	return err
+}
+
+func (p *producer) Publish(topic string, body []byte) error {
+	nsq, err := p.pool.Get()
+	if err != nil {
+		return err
+	}
+	defer nsq.Close()
+
+	return retry(2, func() error {
+		return nsq.Publish(topic, body)
+	})
+}
+
+func (p *producer) MultiPublish(topic string, body [][]byte) error {
+	nsq, err := p.pool.Get()
+	if err != nil {
+		return err
+	}
+	defer nsq.Close()
+
+	return retry(2, func() error {
+		return nsq.MultiPublish(topic, body)
+	})
+}
+
+func (p *producer) DeferredPublish(topic string, delay time.Duration, body []byte) error {
+	nsq, err := p.pool.Get()
+	if err != nil {
+		return err
+	}
+	defer nsq.Close()
+
+	return retry(2, func() error {
+		return nsq.DeferredPublish(topic, delay, body)
+	})
+}
diff --git a/report/report.go b/report/report.go
new file mode 100644
index 0000000..9d7e2b1
--- /dev/null
+++ b/report/report.go
@@ -0,0 +1,42 @@
+package report
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"kingdee-dbapi/config"
+)
+
+var ctx context.Context
+var cancel context.CancelFunc
+
+func StartReport() {
+	ctx, cancel = context.WithCancel(context.Background())
+	go Loop(ctx)
+}
+
+func RestartReport() {
+	cancel()
+
+	StartReport()
+}
+
+func Loop(c context.Context) {
+	fmt.Println("start report")
+	for {
+		select {
+		case <-c.Done():
+			fmt.Println("loop break")
+			return
+		default:
+			// 涓婃姤璁㈠崟
+			SendOrder()
+
+			// 涓婃姤鍗虫椂搴撳瓨
+			SendInventory()
+
+			time.Sleep(time.Duration(config.Options.SyncInterval) * time.Second)
+		}
+	}
+}
diff --git a/report/task.go b/report/task.go
new file mode 100644
index 0000000..40f0ac4
--- /dev/null
+++ b/report/task.go
@@ -0,0 +1,70 @@
+package report
+
+import (
+	"encoding/json"
+
+	"kingdee-dbapi/cache"
+	"kingdee-dbapi/config"
+	"kingdee-dbapi/kingdee"
+	"kingdee-dbapi/models"
+	"kingdee-dbapi/nsqclient"
+)
+
+func SendOrder() {
+	var completedOrderNo = make(map[string]struct{})
+	list := kingdee.SeOrderList()
+
+	for i := 0; i < len(list); i++ {
+		if cache.Exists(list[i].FBillNo) {
+			list = append(list[:i], list[i+1:]...)
+		} else {
+			completedOrderNo[list[i].FBillNo] = struct{}{}
+		}
+	}
+
+	b, _ := json.Marshal(list)
+
+	ok := nsqclient.HttpPost(config.Options.OrderTopic, b)
+	if ok {
+		// 鍐欏叆鏁版嵁搴�, 鏍囪宸茬粡涓婃姤杩囦簡,閬垮厤閲嶅涓婃姤
+		for orderNo, _ := range completedOrderNo {
+			cursor := models.Order{
+				OrderNo: orderNo,
+			}
+
+			cursor.Insert()
+			cache.WriteCache(orderNo)
+		}
+	}
+
+	// 閫愭潯鍙戦��
+	//for idx, _ := range list {
+	//	// 宸茬粡鎺ㄩ�佽繃鐨勮鍗�
+	//	if cache.Exists(list[idx].FBillNo) {
+	//		continue
+	//	}
+	//
+	//	b, _ := json.Marshal(list[idx])
+	//
+	//	ok := nsqclient.HttpPost(config.Options.OrderTopic, b)
+	//	if ok {
+	//		completedOrderNo[list[idx].FBillNo] = struct{}{}
+	//	}
+	//}
+}
+
+func SendInventory() {
+	list := kingdee.ICInventory()
+
+	// 姣忔鍙� 300 鏉�
+	for i := 0; i < len(list); i += 300 {
+		end := i + 300
+		if end > len(list) {
+			end = len(list)
+		}
+
+		b, _ := json.Marshal(list[i:end])
+
+		nsqclient.HttpPost(config.Options.InventoryTopic, b)
+	}
+}
diff --git a/webserver/controller.go b/webserver/controller.go
index 70641bd..503e96f 100644
--- a/webserver/controller.go
+++ b/webserver/controller.go
@@ -1,12 +1,13 @@
 package webserver
 
 import (
+	"kingdee-dbapi/kingdee"
+
 	"github.com/gin-gonic/gin"
-	"kingdee-dbapi/models"
 )
 
 func OrderList(c *gin.Context) {
-	rspData := models.SeOrderList()
+	rspData := kingdee.SeOrderList()
 	c.JSON(200, gin.H{
 		"success": true,
 		"message": "",
@@ -16,7 +17,7 @@
 }
 
 func InventoryList(c *gin.Context) {
-	rspData := models.FindICInventory()
+	rspData := kingdee.ICInventory()
 	c.JSON(200, gin.H{
 		"success": true,
 		"message": "",

--
Gitblit v1.8.0