| | |
| | | { |
| | | "web_port": "10210", |
| | | "sql_addr": "os.smartai.com", |
| | | "sql_addr": "10.6.201.7", |
| | | "sql_db_name": "LZGS", |
| | | "sql_username": "sa", |
| | | "sql_password": "basic@2023", |
| | | "sql_username": "webapi", |
| | | "sql_password": "api2023", |
| | | "nsq_server": "fai365.com:4150", |
| | | "nsq_webapi": "http://121.31.232.83:9080/api/nsq/pub?topic=your_topic", |
| | | "nsq_webapi": "http://121.31.232.83:9080/api/nsq/pub", |
| | | "order_topic": "aps.wangpengfei.erp.seorder", |
| | | "inventory_topic": "aps.wangpengfei.erp.inventory", |
| | | "query_topic": "aps.wangpengfei.erp.k3resource", |
| | |
| | | |
| | | import ( |
| | | "encoding/json" |
| | | "errors" |
| | | "strings" |
| | | |
| | | "kingdee-dbapi/config" |
| | |
| | | "kingdee-dbapi/nsqclient" |
| | | ) |
| | | |
| | | func QueryMsgHandle(data []byte) error { |
| | | var result []interface{} |
| | | type QueryMsg struct { |
| | | Key string // 请求 |
| | | Command string |
| | | Success bool |
| | | Message string |
| | | Result []byte |
| | | } |
| | | |
| | | var sql = string(data) |
| | | func QueryMsgHandle(msg []byte) error { |
| | | var query QueryMsg |
| | | |
| | | if err := json.Unmarshal(msg, &query); err != nil { |
| | | logger.Warn("解析请求失败, %s", err.Error()) |
| | | return err |
| | | } |
| | | |
| | | var sql = query.Command |
| | | logger.Debug("接收到查询请求,%s", sql) |
| | | |
| | | if !sqlCheck(sql) { |
| | | logger.Warn("识别到危险的sql语句, 拒绝执行. %s", sql) |
| | | |
| | | return nil |
| | | query.Message = "危险的sql语句, 拒绝执行" |
| | | logger.Warn(query.Message) |
| | | } else { |
| | | result, err := execSqlCommand(sql) |
| | | if err != nil { |
| | | query.Message = err.Error() |
| | | logger.Warn("sql执行失败:%s", query.Message) |
| | | } else { |
| | | query.Result = result |
| | | query.Success = true |
| | | logger.Warn("sql执行完成.") |
| | | } |
| | | } |
| | | |
| | | if db == nil { |
| | | logger.Debug("数据库未连接") |
| | | replyData, _ := json.Marshal(query) |
| | | ok := nsqclient.Produce(config.Options.ReplyTopic, replyData) |
| | | logger.Warn("应答查询请求结果:%t, key:%s", ok, query.Key) |
| | | |
| | | return nil |
| | | return nil |
| | | } |
| | | |
| | | func execSqlCommand(sql string) ([]byte, error) { |
| | | var result []interface{} |
| | | |
| | | if db == nil { |
| | | return nil, errors.New("数据库未连接") |
| | | } |
| | | |
| | | rows, err := db.Raw(sql).Rows() |
| | | if err != nil { |
| | | result = append(result, err.Error()) |
| | | return err |
| | | return nil, err |
| | | } |
| | | |
| | | var cols []string |
| | |
| | | //建立俩个interface数组,columnPointers中存在columns的地址 |
| | | columns := make([]interface{}, len(cols)) |
| | | columnPointers := make([]interface{}, len(cols)) |
| | | for i, _ := range columns { |
| | | for i := 0; i < len(columns); i++ { |
| | | //赋值地址 |
| | | columnPointers[i] = &columns[i] |
| | | } |
| | | |
| | | //扫描结果 |
| | | rows.Scan(columnPointers...) |
| | | err = rows.Scan(columnPointers...) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | |
| | | m := make(map[string]interface{}) |
| | | for i, colName := range cols { |
| | | val := columnPointers[i].(*interface{}) |
| | |
| | | result = append(result, m) |
| | | } |
| | | |
| | | logger.Debug("数据库返回数据%+v", result) |
| | | b, _ := json.Marshal(result) |
| | | rb, _ := json.Marshal(result) |
| | | |
| | | ok := nsqclient.Produce(config.Options.ReplyTopic, b) |
| | | if !ok { |
| | | logger.Warn("应答查询请求失败.") |
| | | } else { |
| | | logger.Debug("应答查询请求成功. 数据:%s", string(b)) |
| | | } |
| | | |
| | | return nil |
| | | return rb, nil |
| | | } |
| | | |
| | | // 简单过滤下sql语句,拒绝增删改操作 |
| | |
| | | |
| | | import ( |
| | | "context" |
| | | "kingdee-dbapi/config" |
| | | "kingdee-dbapi/logger" |
| | | "time" |
| | | ) |
| | | |
| | | var ctx context.Context |
| | |
| | | //sql := []byte("select * from t_icitem where FItemID=3316") |
| | | //ok := nsqclient.Produce(config.Options.QueryTopic, sql) |
| | | //logger.Debug("测试请求接口, %v", ok) |
| | | // |
| | | //time.Sleep(time.Duration(config.Options.SyncInterval) * time.Second) |
| | | |
| | | time.Sleep(time.Duration(config.Options.SyncInterval) * time.Second) |
| | | } |
| | | } |
| | | } |
| | |
| | | "encoding/json" |
| | | "io/ioutil" |
| | | "kingdee-dbapi/logger" |
| | | "time" |
| | | |
| | | "kingdee-dbapi/cache" |
| | | "kingdee-dbapi/config" |
| | |
| | | } |
| | | } |
| | | |
| | | var invReportedCache = make(map[string]float64, 0) |
| | | var fullLoad bool |
| | | |
| | | func SendInventory() { |
| | | var list []kingdee.Inventory |
| | | |
| | | // 设置每天凌晨1点上报一次全量数据 |
| | | hour := time.Now().Hour() |
| | | if hour == 1 { |
| | | if fullLoad == false { |
| | | invReportedCache = make(map[string]float64, 0) |
| | | fullLoad = true |
| | | } |
| | | } else { |
| | | fullLoad = false |
| | | } |
| | | |
| | | if config.Options.Debug { |
| | | data, err := ioutil.ReadFile(inventoryLocalStore) |
| | |
| | | } |
| | | } else { |
| | | list = kingdee.ICInventory() |
| | | |
| | | logger.Debug("查询到%d条库存数据", len(list)) |
| | | |
| | | for i := 0; i < len(list); { |
| | | cacheKey := list[i].FNumber + list[i].FBatchNo + list[i].FStockNo |
| | | if qty, ok := invReportedCache[cacheKey]; ok && qty == list[i].FUnitQty { |
| | | list = append(list[:i], list[i+1:]...) |
| | | } else { |
| | | invReportedCache[cacheKey] = list[i].FUnitQty |
| | | i++ |
| | | } |
| | | } |
| | | } |
| | | |
| | | // 每次发 100 条 |
| | |
| | | ok := nsqclient.Produce(config.Options.InventoryTopic, b) |
| | | if !ok { |
| | | logger.Warn("库存数据上报失败") |
| | | |
| | | //上报失败, 缓存清空 |
| | | invReportedCache = make(map[string]float64, 0) |
| | | } else { |
| | | successCnt = end |
| | | } |
| | | } |
| | | |
| | | logger.Debug("已上报%d条库存数据", successCnt) |
| | | } |