package radio import ( "bytes" "crypto/md5" "encoding/json" "fmt" "io" "net/http" "strconv" "time" "sundynix-go/global" "sundynix-go/model/radio" "sundynix-go/model/system" "sundynix-go/pkg/httpclient" "sundynix-go/utils/upload" "github.com/google/uuid" ) // TTSRequest TTS请求参数 type TTSRequest struct { Text string // 要转换的文本 Speaker string // 声音类型 ProgramId string // 节目ID } // SubmitTTSTask 提交长文本TTS任务 (异步) func (t *TTSService) SubmitTTSTask(req TTSRequest) (string, error) { if req.Text == "" { return "", fmt.Errorf("文本内容不能为空") } taskId, err := t.doSubmitTTSTask(req) if err != nil { return "", fmt.Errorf("提交TTS任务失败: %v", err) } go t.asyncProcessResult(req.ProgramId, taskId) return taskId, nil } // doSubmitTTSTask 提交任务 func (t *TTSService) doSubmitTTSTask(req TTSRequest) (string, error) { url := "https://openspeech.bytedance.com/api/v3/tts/submit" appID := global.Config.TTS.AppId accessKey := global.Config.TTS.AccessKey resourceID := global.Config.TTS.ResourceId if resourceID == "" { resourceID = "seed-tts-2.0" } speaker := req.Speaker if speaker == "" { speaker = "zh_male_dayi_uranus_bigtts" } bodyData := map[string]interface{}{ "user": map[string]interface{}{"uid": "123123"}, "unique_id": uuid.New().String(), "req_params": map[string]interface{}{ "text": req.Text, "speaker": speaker, "audio_params": map[string]interface{}{ "format": "mp3", "sample_rate": 24000, "enable_timestamp": true, }, "additions": "{}", }, } jsonBody, _ := json.Marshal(bodyData) httpReq, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonBody)) if err != nil { return "", err } httpReq.Header.Set("X-Api-App-Id", appID) httpReq.Header.Set("X-Api-Access-Key", accessKey) httpReq.Header.Set("X-Api-Resource-Id", resourceID) httpReq.Header.Set("X-Api-Request-Id", uuid.New().String()) httpReq.Header.Set("Content-Type", "application/json") httpReq.Header.Set("Connection", "keep-alive") resp, err := httpclient.GetClient().Do(httpReq) if err != nil { return "", err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { respBytes, _ := io.ReadAll(resp.Body) return "", fmt.Errorf("请求失败, 状态码: %d, 返回: %s", resp.StatusCode, string(respBytes)) } var result struct { Data struct { TaskId string `json:"task_id"` } `json:"data"` Message string `json:"message"` } if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { return "", err } if result.Data.TaskId == "" { return "", fmt.Errorf("未能获取任务ID: %s", result.Message) } return result.Data.TaskId, nil } // asyncProcessResult 异步处理TTS结果 func (t *TTSService) asyncProcessResult(programId, taskId string) { resultUrl, err := t.waitForResult(taskId) if err != nil { global.Logger.Error(fmt.Sprintf("TTS任务失败, TaskId: %s, Error: %v", taskId, err)) global.DB.Model(&radio.RadioProgram{}).Where("id = ?", programId).Update("audio_status", 0) return } audioData, err := t.downloadAudio(resultUrl) if err != nil { global.Logger.Error(fmt.Sprintf("下载音频失败, TaskId: %s, Error: %v", taskId, err)) global.DB.Model(&radio.RadioProgram{}).Where("id = ?", programId).Update("audio_status", 0) return } audioId, err := t.uploadToOSS(audioData, programId) if err != nil { global.Logger.Error(fmt.Sprintf("上传OSS失败, TaskId: %s, Error: %v", taskId, err)) global.DB.Model(&radio.RadioProgram{}).Where("id = ?", programId).Update("audio_status", 0) return } if err := global.DB.Model(&radio.RadioProgram{}).Where("id = ?", programId). Updates(map[string]interface{}{ "audio_id": audioId, "audio_status": 2, // 音频就绪 }).Error; err != nil { global.Logger.Error(fmt.Sprintf("更新节目音频ID失败, TaskId: %s, Error: %v", taskId, err)) global.DB.Model(&radio.RadioProgram{}).Where("id = ?", programId).Update("audio_status", 0) return } global.Logger.Info(fmt.Sprintf("TTS任务完成, TaskId: %s, ProgramId: %s, AudioId: %s", taskId, programId, audioId)) } // waitForResult 轮询查询任务结果,返回音频下载URL func (t *TTSService) waitForResult(taskId string) (string, error) { maxRetries := 10 interval := 30 * time.Second for i := 0; i < maxRetries; i++ { time.Sleep(interval) resultUrl, status, statusMsg, err := t.queryTaskResult(taskId) if err != nil { return "", err } switch status { case 1: return resultUrl, nil case 2: return "", fmt.Errorf("TTS合成失败: %s", statusMsg) default: global.Logger.Debug(fmt.Sprintf("TTS任务处理中, TaskId: %s, 重试次数: %d", taskId, i+1)) } } return "", fmt.Errorf("TTS任务超时, TaskId: %s", taskId) } // queryTaskResult 查询任务状态 func (t *TTSService) queryTaskResult(taskId string) (string, int, string, error) { url := "https://openspeech.bytedance.com/api/v3/tts/query" appID := global.Config.TTS.AppId accessKey := global.Config.TTS.AccessKey resourceID := global.Config.TTS.ResourceId if resourceID == "" { resourceID = "seed-tts-2.0" } bodyData := map[string]interface{}{"task_id": taskId} jsonBody, _ := json.Marshal(bodyData) httpReq, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonBody)) if err != nil { return "", 0, "", err } httpReq.Header.Set("X-Api-App-Id", appID) httpReq.Header.Set("X-Api-Access-Key", accessKey) httpReq.Header.Set("X-Api-Resource-Id", resourceID) httpReq.Header.Set("X-Api-Request-Id", uuid.New().String()) httpReq.Header.Set("Content-Type", "application/json") httpReq.Header.Set("Connection", "keep-alive") resp, err := httpclient.GetClient().Do(httpReq) if err != nil { return "", 0, "", err } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { respBytes, _ := io.ReadAll(resp.Body) return "", 0, "", fmt.Errorf("查询任务失败, 状态码: %d, 返回: %s", resp.StatusCode, string(respBytes)) } respBytes, _ := io.ReadAll(resp.Body) global.Logger.Info(fmt.Sprintf("火山查询原始结果: %s", string(respBytes))) var result struct { Data map[string]interface{} `json:"data"` Message string `json:"message"` } if err := json.Unmarshal(respBytes, &result); err != nil { return "", 0, "", err } status := 0 audioUrl := "" if result.Data != nil { // 官方文档定义: // 1: Running (正在处理) // 2: Success (处理成功) // 3: Failure (处理失败) volcStatus := 0 if statusVal, ok := result.Data["task_status"].(float64); ok { volcStatus = int(statusVal) } else if statusStr, ok := result.Data["task_status"].(string); ok { if statusStr == "1" { volcStatus = 1 } else if statusStr == "2" || statusStr == "success" || statusStr == "done" { volcStatus = 2 } else if statusStr == "3" || statusStr == "failed" || statusStr == "error" { volcStatus = 3 } } // 映射到内部状态: 0: 处理中, 1: 成功, 2: 失败 if volcStatus == 1 { status = 0 // Running } else if volcStatus == 2 { status = 1 // Success } else if volcStatus == 3 { status = 2 // Failure } if val, ok := result.Data["audio_url"].(string); ok { audioUrl = val } else if val, ok := result.Data["audio"].(string); ok { audioUrl = val } } if audioUrl != "" { status = 1 } if status == 1 && audioUrl == "" { // 任务状态为1时如果没有url,继续等待(轮询)避免直接返回空URL使下载失败 status = 0 } return audioUrl, status, result.Message, nil } // TTSService TTS服务 type TTSService struct{} var TTSServiceApp = new(TTSService) // downloadAudio 从URL下载音频数据 func (t *TTSService) downloadAudio(audioUrl string) ([]byte, error) { resp, err := httpclient.GetClient().Get(audioUrl) if err != nil { return nil, fmt.Errorf("下载音频失败: %v", err) } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { return nil, fmt.Errorf("下载音频HTTP错误: %d", resp.StatusCode) } audioData, err := io.ReadAll(resp.Body) if err != nil { return nil, fmt.Errorf("读取音频数据失败: %v", err) } return audioData, nil } // uploadToOSS 上传音频到OSS并保存到数据库 func (t *TTSService) uploadToOSS(audioData []byte, programId string) (string, error) { instance := upload.OssInstance() minioClient, ok := instance.(*upload.Minio) if !ok { return "", fmt.Errorf("获取MinIO客户端失败") } timestamp := time.Now().UnixMicro() timestr := strconv.FormatInt(timestamp, 10) key := fmt.Sprintf("audio/%s/%s.mp3", time.Now().Format("2006-01-02"), programId+"-"+timestr) filename := fmt.Sprintf("program-%s.mp3", programId) fileURL, err := minioClient.UploadBytes(audioData, key, "audio/mpeg") if err != nil { return "", fmt.Errorf("上传文件到OSS失败: %v", err) } hashStr := fmt.Sprintf("%x", md5.Sum(audioData)) oss := system.Oss{ Name: filename, Url: fileURL, Key: key, Suffix: "mp3", Tag: "mp3", MD5: hashStr, } if err := global.DB.Create(&oss).Error; err != nil { return "", fmt.Errorf("保存文件记录失败: %v", err) } return oss.Id, nil }