| | |
| | | |
| | | import ( |
| | | "encoding/json" |
| | | "errors" |
| | | "strings" |
| | | |
| | | "kingdee-dbapi/config" |
| | |
| | | "kingdee-dbapi/nsqclient" |
| | | ) |
| | | |
// QueryMsg is the JSON envelope used for both a query request and its reply.
// The handler decodes a request into it, fills in the outcome fields and
// marshals the same value back as the reply.
//
// The fields carry no json tags, so the wire keys are the Go field names.
type QueryMsg struct {
	Key     string // request key; sent back unchanged in the reply
	Command string // SQL text to execute
	Success bool   // set to true only when the command executed without error
	Message string // rejection or execution error text; empty on success
	Result  []byte // JSON-encoded rows; encoding/json emits []byte as base64
}
| | | func QueryMsgHandle(msg []byte) error { |
| | | var query QueryMsg |
| | | |
| | | if err := json.Unmarshal(msg, &query); err != nil { |
| | | logger.Warn("解析请求失败, %s", err.Error()) |
| | | return err |
| | | } |
| | | |
| | | var sql = query.Command |
| | | logger.Debug("接收到查询请求,%s", sql) |
| | | |
| | | if !sqlCheck(sql) { |
| | | logger.Warn("识别到危险的sql语句, 拒绝执行. %s", sql) |
| | | |
| | | return nil |
| | | query.Message = "危险的sql语句, 拒绝执行" |
| | | logger.Warn(query.Message) |
| | | } else { |
| | | result, err := execSqlCommand(sql) |
| | | if err != nil { |
| | | query.Message = err.Error() |
| | | logger.Warn("sql执行失败:%s", query.Message) |
| | | } else { |
| | | query.Result = result |
| | | query.Success = true |
| | | logger.Warn("sql执行完成.") |
| | | } |
| | | } |
| | | |
| | | if db == nil { |
| | | logger.Debug("数据库未连接") |
| | | replyData, _ := json.Marshal(query) |
| | | ok := nsqclient.Produce(config.Options.ReplyTopic, replyData) |
| | | logger.Warn("应答查询请求结果:%t, key:%s", ok, query.Key) |
| | | |
| | | return nil |
| | | return nil |
| | | } |
| | | |
| | | func execSqlCommand(sql string) ([]byte, error) { |
| | | var result []interface{} |
| | | |
| | | if db == nil { |
| | | return nil, errors.New("数据库未连接") |
| | | } |
| | | |
| | | rows, err := db.Raw(sql).Rows() |
| | | if err != nil { |
| | | result = append(result, err.Error()) |
| | | return err |
| | | return nil, err |
| | | } |
| | | |
| | | var cols []string |
| | |
| | | //建立俩个interface数组,columnPointers中存在columns的地址 |
| | | columns := make([]interface{}, len(cols)) |
| | | columnPointers := make([]interface{}, len(cols)) |
| | | for i, _ := range columns { |
| | | for i := 0; i < len(columns); i++ { |
| | | //赋值地址 |
| | | columnPointers[i] = &columns[i] |
| | | } |
| | | |
| | | //扫描结果 |
| | | rows.Scan(columnPointers...) |
| | | err = rows.Scan(columnPointers...) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | |
| | | m := make(map[string]interface{}) |
| | | for i, colName := range cols { |
| | | val := columnPointers[i].(*interface{}) |
| | |
| | | result = append(result, m) |
| | | } |
| | | |
| | | logger.Debug("数据库返回数据%+v", result) |
| | | b, _ := json.Marshal(result) |
| | | rb, _ := json.Marshal(result) |
| | | |
| | | ok := nsqclient.Produce(config.Options.ReplyTopic, b) |
| | | if !ok { |
| | | logger.Warn("应答查询请求失败.") |
| | | } else { |
| | | logger.Debug("应答查询请求成功. 数据:%s", string(b)) |
| | | } |
| | | |
| | | return nil |
| | | return rb, nil |
| | | } |
| | | |
| | | // 简单过滤下sql语句,拒绝增删改操作 |