From 8a5f5dc40e82bf98e307082726ebe860002e019f Mon Sep 17 00:00:00 2001
From: gigibox <gigibox@163.com>
Date: 星期三, 21 六月 2023 15:45:31 +0800
Subject: [PATCH] 修改库存数据为增量上报

---
 kingdee/query.go |   71 ++++++++++++++++++++++++-----------
 1 files changed, 48 insertions(+), 23 deletions(-)

diff --git a/kingdee/query.go b/kingdee/query.go
index 65fd8d4..2da57a5 100644
--- a/kingdee/query.go
+++ b/kingdee/query.go
@@ -2,6 +2,7 @@
 
 import (
 	"encoding/json"
+	"errors"
 	"strings"
 
 	"kingdee-dbapi/config"
@@ -9,29 +10,57 @@
 	"kingdee-dbapi/nsqclient"
 )
 
-func QueryMsgHandle(data []byte) error {
-	var result []interface{}
+type QueryMsg struct {
+	Key     string // 璇锋眰
+	Command string
+	Success bool
+	Message string
+	Result  []byte
+}
 
-	var sql = string(data)
+func QueryMsgHandle(msg []byte) error {
+	var query QueryMsg
 
+	if err := json.Unmarshal(msg, &query); err != nil {
+		logger.Warn("瑙f瀽璇锋眰澶辫触, %s", err.Error())
+		return err
+	}
+
+	var sql = query.Command
 	logger.Debug("鎺ユ敹鍒版煡璇㈣姹�,%s", sql)
 
 	if !sqlCheck(sql) {
-		logger.Warn("璇嗗埆鍒板嵄闄╃殑sql璇彞, 鎷掔粷鎵ц. %s", sql)
-
-		return nil
+		query.Message = "鍗遍櫓鐨剆ql璇彞, 鎷掔粷鎵ц"
+		logger.Warn(query.Message)
+	} else {
+		result, err := execSqlCommand(sql)
+		if err != nil {
+			query.Message = err.Error()
+			logger.Warn("sql鎵ц澶辫触:%s", query.Message)
+		} else {
+			query.Result = result
+			query.Success = true
+			logger.Warn("sql鎵ц瀹屾垚.")
+		}
 	}
 
-	if db == nil {
-		logger.Debug("鏁版嵁搴撴湭杩炴帴")
+	replyData, _ := json.Marshal(query)
+	ok := nsqclient.Produce(config.Options.ReplyTopic, replyData)
+	logger.Warn("搴旂瓟鏌ヨ璇锋眰缁撴灉:%t, key:%s", ok, query.Key)
 
-		return nil
+	return nil
+}
+
+func execSqlCommand(sql string) ([]byte, error) {
+	var result []interface{}
+
+	if db == nil {
+		return nil, errors.New("鏁版嵁搴撴湭杩炴帴")
 	}
 
 	rows, err := db.Raw(sql).Rows()
 	if err != nil {
-		result = append(result, err.Error())
-		return err
+		return nil, err
 	}
 
 	var cols []string
@@ -44,13 +73,17 @@
 		//寤虹珛淇╀釜interface鏁扮粍锛宑olumnPointers涓瓨鍦╟olumns鐨勫湴鍧�
 		columns := make([]interface{}, len(cols))
 		columnPointers := make([]interface{}, len(cols))
-		for i, _ := range columns {
+		for i := 0; i < len(columns); i++ {
 			//璧嬪�煎湴鍧�
 			columnPointers[i] = &columns[i]
 		}
 
 		//鎵弿缁撴灉
-		rows.Scan(columnPointers...)
+		err = rows.Scan(columnPointers...)
+		if err != nil {
+			return nil, err
+		}
+
 		m := make(map[string]interface{})
 		for i, colName := range cols {
 			val := columnPointers[i].(*interface{})
@@ -60,17 +93,9 @@
 		result = append(result, m)
 	}
 
-	logger.Debug("鏁版嵁搴撹繑鍥炴暟鎹�%+v", result)
-	b, _ := json.Marshal(result)
+	rb, _ := json.Marshal(result)
 
-	ok := nsqclient.Produce(config.Options.ReplyTopic, b)
-	if !ok {
-		logger.Warn("搴旂瓟鏌ヨ璇锋眰澶辫触.")
-	} else {
-		logger.Debug("搴旂瓟鏌ヨ璇锋眰鎴愬姛. 鏁版嵁:%s", string(b))
-	}
-
-	return nil
+	return rb, nil
 }
 
 // 绠�鍗曡繃婊や笅sql璇彞,鎷掔粷澧炲垹鏀规搷浣�

--
Gitblit v1.8.0