From 8a5f5dc40e82bf98e307082726ebe860002e019f Mon Sep 17 00:00:00 2001 From: gigibox <gigibox@163.com> Date: 星期三, 21 六月 2023 15:45:31 +0800 Subject: [PATCH] 修改库存数据为增量上报 --- kingdee/query.go | 71 ++++++++++++++++++++++++----------- 1 files changed, 48 insertions(+), 23 deletions(-) diff --git a/kingdee/query.go b/kingdee/query.go index 65fd8d4..2da57a5 100644 --- a/kingdee/query.go +++ b/kingdee/query.go @@ -2,6 +2,7 @@ import ( "encoding/json" + "errors" "strings" "kingdee-dbapi/config" @@ -9,29 +10,57 @@ "kingdee-dbapi/nsqclient" ) -func QueryMsgHandle(data []byte) error { - var result []interface{} +type QueryMsg struct { + Key string // 璇锋眰 + Command string + Success bool + Message string + Result []byte +} - var sql = string(data) +func QueryMsgHandle(msg []byte) error { + var query QueryMsg + if err := json.Unmarshal(msg, &query); err != nil { + logger.Warn("瑙f瀽璇锋眰澶辫触, %s", err.Error()) + return err + } + + var sql = query.Command logger.Debug("鎺ユ敹鍒版煡璇㈣姹�,%s", sql) if !sqlCheck(sql) { - logger.Warn("璇嗗埆鍒板嵄闄╃殑sql璇彞, 鎷掔粷鎵ц. %s", sql) - - return nil + query.Message = "鍗遍櫓鐨剆ql璇彞, 鎷掔粷鎵ц" + logger.Warn(query.Message) + } else { + result, err := execSqlCommand(sql) + if err != nil { + query.Message = err.Error() + logger.Warn("sql鎵ц澶辫触:%s", query.Message) + } else { + query.Result = result + query.Success = true + logger.Warn("sql鎵ц瀹屾垚.") + } } - if db == nil { - logger.Debug("鏁版嵁搴撴湭杩炴帴") + replyData, _ := json.Marshal(query) + ok := nsqclient.Produce(config.Options.ReplyTopic, replyData) + logger.Warn("搴旂瓟鏌ヨ璇锋眰缁撴灉:%t, key:%s", ok, query.Key) - return nil + return nil +} + +func execSqlCommand(sql string) ([]byte, error) { + var result []interface{} + + if db == nil { + return nil, errors.New("鏁版嵁搴撴湭杩炴帴") } rows, err := db.Raw(sql).Rows() if err != nil { - result = append(result, err.Error()) - return err + return nil, err } var cols []string @@ -44,13 +73,17 @@ //寤虹珛淇╀釜interface鏁扮粍锛宑olumnPointers涓瓨鍦╟olumns鐨勫湴鍧� columns := make([]interface{}, len(cols)) columnPointers := make([]interface{}, len(cols)) - for i, _ := range columns { + for i := 0; i < len(columns); i++ { //璧嬪�煎湴鍧� columnPointers[i] = &columns[i] } //鎵弿缁撴灉 - rows.Scan(columnPointers...) + err = rows.Scan(columnPointers...) + if err != nil { + return nil, err + } + m := make(map[string]interface{}) for i, colName := range cols { val := columnPointers[i].(*interface{}) @@ -60,17 +93,9 @@ result = append(result, m) } - logger.Debug("鏁版嵁搴撹繑鍥炴暟鎹�%+v", result) - b, _ := json.Marshal(result) + rb, _ := json.Marshal(result) - ok := nsqclient.Produce(config.Options.ReplyTopic, b) - if !ok { - logger.Warn("搴旂瓟鏌ヨ璇锋眰澶辫触.") - } else { - logger.Debug("搴旂瓟鏌ヨ璇锋眰鎴愬姛. 鏁版嵁:%s", string(b)) - } - - return nil + return rb, nil } // 绠�鍗曡繃婊や笅sql璇彞,鎷掔粷澧炲垹鏀规搷浣� -- Gitblit v1.8.0