package service import ( "bytes" "context" "encoding/json" "errors" "fmt" "github.com/fsnotify/fsnotify" "gorm.io/gorm" "io" "log" "mime/multipart" "net/http" "os" "os/exec" "path/filepath" "speechAnalysis/conf" "speechAnalysis/constvar" "speechAnalysis/models" "speechAnalysis/pkg/logx" "strings" "time" ) // Response 结构体用于存储响应体的内容 type Response struct { Code int `json:"code"` Msg string `json:"msg"` Result string `json:"result"` Score float64 `json:"score"` } func AnalysisAudio(filename string, targetURL string) (resp Response, err error) { file, err := os.Open(filename) if err != nil { return } defer file.Close() // 创建一个缓冲区来存储表单数据 var requestBody bytes.Buffer writer := multipart.NewWriter(&requestBody) // 创建一个表单字段,用于存储文件 fileWriter, err := writer.CreateFormFile("audio", filename) if err != nil { return } // 将文件内容复制到表单字段中 _, err = io.Copy(fileWriter, file) if err != nil { return } // 关闭表单写入器,以便写入末尾的边界 writer.Close() // 创建POST请求,指定URL和请求体 request, err := http.NewRequest("POST", targetURL, &requestBody) if err != nil { return } // 设置请求头,指定Content-Type为multipart/form-data request.Header.Set("Content-Type", writer.FormDataContentType()) // 发送请求 client := &http.Client{} response, err := client.Do(request) if err != nil { return } defer response.Body.Close() // 读取响应 body := &bytes.Buffer{} _, err = io.Copy(body, response.Body) if err != nil { return } err = json.NewDecoder(body).Decode(&resp) if err != nil { return } return } func Process(audioId uint) (err error) { audio, err := models.NewAudioSearch().SetID(audioId).First() if err != nil { return errors.New("查找音频失败") } if audio.AudioStatus != constvar.AudioStatusUploadOk && audio.AudioStatus != constvar.AudioStatusFailed { return errors.New("状态不正确") } err = models.NewAudioSearch().SetID(audioId).UpdateByMap(map[string]interface{}{"audio_status": constvar.AudioStatusProcessing}) if err != nil { return errors.New("DB错误") } go func() { var resp Response sz := audio.Size / 1024 / 1024 if sz > 2 { resp, err = AnalysisAudio(audio.FilePath, conf.AanlysisConf.LongUrl) } else { resp, err = AnalysisAudio(audio.FilePath, conf.AanlysisConf.Url) } if err != nil { logx.Errorf("err when AnalysisAudio:%v", err) _ = models.NewAudioSearch().SetID(audioId).UpdateByMap(map[string]interface{}{"audio_status": constvar.AudioStatusFailed}) return } if resp.Code != 0 { logx.Errorf("AnalysisAudio error return:%v", resp) _ = models.NewAudioSearch().SetID(audioId).UpdateByMap(map[string]interface{}{"audio_status": constvar.AudioStatusFailed}) return } logx.Infof("AnalysisAudio result: %v", resp) words := GetWordFromText(resp.Result, audio) err = models.WithTransaction(func(db *gorm.DB) error { err = models.NewAudioSearch().SetOrm(db).SetID(audioId).UpdateByMap(map[string]interface{}{ "audio_status": constvar.AudioStatusFinish, "score": resp.Score, "tags": strings.Join(words, ","), }) if err != nil { return err } err = models.NewAudioTextSearch().SetOrm(db).Save(&models.AudioText{ AudioID: audio.ID, AudioText: resp.Result, }) return err }) if err != nil { logx.Infof("AnalysisAudio success but update record failed: %v", err) _ = models.NewAudioSearch().SetID(audioId).UpdateByMap(map[string]interface{}{"audio_status": constvar.AudioStatusFailed}) return } }() return nil } func GetWordFromText(text string, audio *models.Audio) (words []string) { if audio == nil { return nil } wordRecords, err := models.NewWordSearch().SetLocomotiveNumber(audio.LocomotiveNumber).FindNotTotal() if err != nil || len(wordRecords) == 0 { return nil } for _, v := range wordRecords { if strings.Contains(text, v.Content) { words = append(words, v.Content) } } return words } func PreLoad(cxt context.Context) { mkdirErr := os.MkdirAll(conf.LocalConf.PreLoadPath, os.ModePerm) if mkdirErr != nil { logx.Errorf("function os.MkdirAll() err:%v", mkdirErr) } mkdirErr1 := os.MkdirAll(conf.LocalConf.StorePath, os.ModePerm) if mkdirErr1 != nil { logx.Errorf("function os.MkdirAll() err:%v", mkdirErr1) } //文件夹下新增音频文件时触发 watcher, err := fsnotify.NewWatcher() if err != nil { log.Fatal(err) } defer watcher.Close() err = watcher.Add(conf.LocalConf.PreLoadPath) if err != nil { log.Fatal(err) } audoF := func(eventName, fileName string, audio *models.Audio) bool { time.Sleep(time.Second * 1) //设置文件访问权限 err = os.Chmod(eventName, 0777) if err != nil { logx.Errorf(fmt.Sprintf("%s:%s", eventName, "设置文件权限失败")) } //校验文件命名 arr := strings.Split(fileName, "_") if len(arr) != 6 { logx.Errorf(fmt.Sprintf("%s:%s", fileName, "文件名称错误")) return false } timeStr := arr[4] + strings.Split(arr[5], ".")[0] t, err := time.ParseInLocation("20060102150405", timeStr, time.Local) if err != nil { logx.Errorf(fmt.Sprintf("%s:%s", fileName, "时间格式不对")) } //查重 _, err = models.NewAudioSearch().SetName(fileName).First() if err != gorm.ErrRecordNotFound { logx.Errorf(fmt.Sprintf("%s:%s", fileName, "重复上传")) return false } //将文件移动到uploads文件夹下 //判断storePath中末尾是否带 var src string if strings.HasSuffix(conf.LocalConf.StorePath, "/") { src = conf.LocalConf.StorePath + fileName } else { src = conf.LocalConf.StorePath + "/" + fileName } //err = os.Rename(eventName, src) //利用exec命令移动文件 cmd := exec.Command("mv", eventName, src) err = cmd.Run() if err != nil { logx.Errorf(fmt.Sprintf("%s:%s-%s", fileName, "移动文件失败", err.Error())) return false } // 读取文件大小 fileInfo, err := os.Stat(src) if err != nil { logx.Errorf(fmt.Sprintf("%s:%s", fileName, "获取文件大小失败")) return false } size := fileInfo.Size() fmt.Println("fileName:", fileName, "size:", size, "src1", src) audio.Name = fileName audio.Size = size audio.FilePath = src audio.AudioStatus = constvar.AudioStatusUploadOk audio.LocomotiveNumber = arr[0] audio.TrainNumber = arr[1] audio.DriverNumber = arr[2] audio.Station = arr[3] audio.OccurrenceAt = t audio.IsFollowed = 0 return true } txtF := func(filePath string, audio *models.Audio) bool { fileName := filepath.Base(filePath) //读取filepath文件内容到bts bts, err := os.ReadFile(filePath) if err != nil { logx.Errorf(fmt.Sprintf("%s:%s", filePath, "读取txt文件失败")) return false } //解析 交路号:123_公里标:321 fileds := string(bts) arr := strings.Split(fileds, "\n") if len(arr) != 2 { logx.Errorf(fmt.Sprintf("%s:%s", filePath, "读取txt文件内容格式不对")) return false } else { RouteNumber := strings.Split(arr[0], ":") KilometerMarker := strings.Split(arr[1], ":") if len(RouteNumber) > 1 && len(KilometerMarker) > 1 { audio.RouteNumber = RouteNumber[1] audio.KilometerMarker = KilometerMarker[1] } else { logx.Errorf(fmt.Sprintf("%s:%s", filePath, "文件内容格式不对")) return false } } var src string if strings.HasSuffix(conf.LocalConf.StorePath, "/") { src = conf.LocalConf.StorePath + fileName } else { src = conf.LocalConf.StorePath + "/" + fileName } //err = os.Rename(filePath, src) //利用exec命令移动文件 cmd := exec.Command("mv", filePath, src) err = cmd.Run() if err != nil { logx.Errorf(fmt.Sprintf("%s:%s", fileName, "移动文件失败")) return false } audio.TxtFilePath = src return true } //成对变量 pair := make(map[string]string) FOR: for { select { case <-cxt.Done(): fmt.Println("preload stop") break FOR // 退出循环 case event, ok := <-watcher.Events: if !ok { continue } if event.Op&fsnotify.Create == fsnotify.Create { // 文件名 fileName := filepath.Base(event.Name) //获取不带扩展名的文件名 name := strings.TrimSuffix(fileName, filepath.Ext(fileName)) //判断文件在pair中 if _, ok := pair[name]; !ok { pair[name] = event.Name } else { audio := &models.Audio{} isOk := true // 判断文件类型是否为.mp3或.wav if strings.ToLower(filepath.Ext(event.Name)) == ".mp3" || strings.ToLower(filepath.Ext(event.Name)) == ".wav" { isOk = audoF(event.Name, fileName, audio) && txtF(pair[name], audio) } if strings.ToLower(filepath.Ext(event.Name)) == ".txt" { isOk = audoF(pair[name], filepath.Base(pair[name]), audio) && txtF(event.Name, audio) } if !isOk { delete(pair, name) continue } if len(audio.Name) > 0 { if err = models.NewAudioSearch().Create(audio); err != nil { logx.Errorf(fmt.Sprintf("%s:%s", fileName, "数据库create失败")) continue } go func() { var trainInfoNames = []string{audio.LocomotiveNumber, audio.TrainNumber, audio.Station} // var ( info *models.TrainInfo err error parent models.TrainInfo ) for i := 0; i < 3; i++ { name := trainInfoNames[i] class := constvar.Class(i + 1) info, err = models.NewTrainInfoSearch().SetName(name).SetClass(class).First() if err == gorm.ErrRecordNotFound { info = &models.TrainInfo{ Name: name, Class: class, ParentID: parent.ID, } _ = models.NewTrainInfoSearch().Create(info) } parent = *info } }() } } } case err, ok := <-watcher.Errors: if !ok { logx.Errorf(err.Error()) } } } }