Qwen3-ASR-0.6B实战教程:Python API调用方法,批量处理音频文件

1. 从Web界面到代码:为什么需要Python API?

如果你用过Qwen3-ASR-0.6B的Web界面,可能会觉得挺方便——上传文件、点击按钮、看到结果。但当你需要处理几十个、几百个音频文件时,一个个手动上传就太麻烦了。这时候,Python API的价值就体现出来了。

想象一下这些场景:

  • 你有一整天的会议录音需要转成文字稿
  • 你需要为一批视频文件自动生成字幕
  • 你要定期处理用户上传的语音消息
  • 你想把语音识别集成到自己的应用里

这些批量、自动化的任务,用Web界面操作效率太低。Python API让你能用代码控制整个流程,一次处理成百上千个文件,还能把结果直接保存到数据库或文件里。

Qwen3-ASR-0.6B的API设计得很简单,就是标准的HTTP接口,用Python的requests库几行代码就能调用。下面我就带你一步步掌握这个技能。

2. 环境准备:安装必要的Python库

在开始写代码之前,确保你的Python环境已经准备好了。如果你已经部署了Qwen3-ASR-0.6B镜像,那么服务端的环境已经配置好了。我们这里说的是客户端的环境——就是你写Python代码的电脑或服务器。

2.1 检查Python版本

打开终端或命令行,输入:

python3 --version

建议使用Python 3.8或更高版本。如果版本太低,可以去Python官网下载新版本。

2.2 安装必需的库

只需要两个核心库:

pip install requests

是的,就这么简单。requests库用来发送HTTP请求调用API,它是Python里最常用的HTTP客户端库。

如果你打算处理音频文件,可能还需要pydubsoundfile这样的音频处理库,但Qwen3-ASR-0.6B的API本身支持多种音频格式,所以不是必须的。

2.3 确认服务地址

确保你知道Qwen3-ASR-0.6B服务的访问地址。如果你在本地部署,通常是:

http://localhost:7860

如果在远程服务器上,需要换成服务器的IP地址和端口:

http://你的服务器IP:7860

你可以在浏览器里访问这个地址,看看Web界面能不能打开,确认服务正常运行。

3. 基础API调用:单个文件识别

我们先从最简单的开始:识别一个音频文件。这是所有复杂操作的基础。

3.1 理解API接口

Qwen3-ASR-0.6B提供了一个简单的HTTP接口:

  • URL: http://你的服务地址/api/transcribe
  • 方法: POST
  • 参数格式: multipart/form-data
  • 必需参数: audio (音频文件)
  • 可选参数:
    • language: 语言代码,如zh(中文)、en(英文),或auto(自动检测)
    • output_timestamps: truefalse,是否输出时间戳

3.2 完整代码示例

创建一个Python文件,比如transcribe_single.py,写入以下代码:

import requests
import json

def transcribe_single_file(audio_file_path, server_url="http://localhost:7860", language="auto"):
    """
    识别单个音频文件
    
    Args:
        audio_file_path: 音频文件的路径
        server_url: Qwen3-ASR服务地址
        language: 语言代码,默认自动检测
    
    Returns:
        识别结果,包含文本和时间戳信息
    """
    # 准备请求数据
    with open(audio_file_path, 'rb') as audio_file:
        files = {'audio': audio_file}
        data = {
            'language': language,
            'output_timestamps': 'true'  # 获取时间戳信息
        }
        
        # 发送请求
        try:
            response = requests.post(
                f"{server_url}/api/transcribe",
                files=files,
                data=data,
                timeout=60  # 设置超时时间
            )
            
            # 检查响应状态
            if response.status_code == 200:
                result = response.json()
                return {
                    'success': True,
                    'text': result.get('text', ''),
                    'timestamps': result.get('timestamps', []),
                    'language': result.get('language', 'unknown')
                }
            else:
                return {
                    'success': False,
                    'error': f"请求失败,状态码: {response.status_code}",
                    'details': response.text
                }
                
        except requests.exceptions.RequestException as e:
            return {
                'success': False,
                'error': f"网络请求错误: {str(e)}"
            }
        except json.JSONDecodeError as e:
            return {
                'success': False,
                'error': f"响应解析错误: {str(e)}",
                'raw_response': response.text[:200]  # 只取前200字符
            }

# 使用示例
if __name__ == "__main__":
    # 替换成你的音频文件路径
    audio_path = "example.wav"
    
    # 调用识别函数
    result = transcribe_single_file(audio_path)
    
    if result['success']:
        print("识别成功!")
        print(f"检测到的语言: {result['language']}")
        print(f"识别文本: {result['text']}")
        print(f"文本长度: {len(result['text'])} 字符")
        
        # 如果有时间戳,显示前几个
        if result['timestamps']:
            print("\n前5个词的时间戳:")
            for i, ts in enumerate(result['timestamps'][:5]):
                print(f"  '{ts['word']}': {ts['start']:.2f}s - {ts['end']:.2f}s")
    else:
        print(f"识别失败: {result['error']}")

3.3 代码详解

让我解释一下关键部分:

  1. 文件读取:用open(audio_file_path, 'rb')以二进制模式打开音频文件。'rb'表示只读二进制模式,适合传输文件。

  2. 请求构造

    • files={'audio': audio_file}:告诉requests这是一个文件上传
    • data字典包含其他参数,比如语言选择
  3. 错误处理

    • 网络错误(服务器没响应)
    • HTTP错误(服务器返回错误状态码)
    • JSON解析错误(响应格式不对)
  4. 结果解析:成功时返回一个字典,包含文本、时间戳和检测到的语言。

3.4 测试你的代码

找一段音频文件(WAV、MP3、FLAC格式都行),替换代码中的audio_path,然后运行:

python transcribe_single.py

如果一切正常,你会看到识别出的文字。如果出错了,根据错误信息排查问题,比如文件路径对不对、服务地址对不对、服务有没有启动。

4. 批量处理实战:处理整个文件夹的音频

单个文件识别只是开始,真正的威力在于批量处理。我们来写一个实用的批量处理脚本。

4.1 基础批量处理

创建一个新文件batch_process.py

import os
import glob
import json
import time
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed

def batch_transcribe_folder(input_folder, output_folder, server_url="http://localhost:7860", 
                           language="auto", max_workers=2):
    """
    批量处理文件夹中的所有音频文件
    
    Args:
        input_folder: 输入文件夹路径,包含音频文件
        output_folder: 输出文件夹路径,保存结果
        server_url: 服务地址
        language: 语言设置
        max_workers: 最大并发数,根据GPU性能调整
    
    Returns:
        处理统计信息
    """
    # 创建输出目录
    os.makedirs(output_folder, exist_ok=True)
    
    # 查找所有支持的音频文件
    audio_extensions = ['*.wav', '*.mp3', '*.flac', '*.ogg', '*.m4a']
    audio_files = []
    for ext in audio_extensions:
        audio_files.extend(glob.glob(os.path.join(input_folder, ext)))
    
    if not audio_files:
        print(f"在 {input_folder} 中没有找到音频文件")
        return {'total': 0, 'success': 0, 'failed': 0}
    
    print(f"找到 {len(audio_files)} 个音频文件,开始处理...")
    
    # 准备统计信息
    stats = {
        'total': len(audio_files),
        'success': 0,
        'failed': 0,
        'failed_files': [],
        'start_time': datetime.now()
    }
    
    # 处理单个文件的函数
    def process_single_file(audio_file):
        try:
            # 调用上一节的识别函数
            result = transcribe_single_file(audio_file, server_url, language)
            
            if result['success']:
                # 生成输出文件名(保持原文件名)
                base_name = os.path.splitext(os.path.basename(audio_file))[0]
                
                # 保存文本结果
                txt_path = os.path.join(output_folder, f"{base_name}.txt")
                with open(txt_path, 'w', encoding='utf-8') as f:
                    f.write(result['text'])
                
                # 保存完整结果(JSON格式,包含时间戳)
                json_path = os.path.join(output_folder, f"{base_name}.json")
                with open(json_path, 'w', encoding='utf-8') as f:
                    json.dump(result, f, ensure_ascii=False, indent=2)
                
                # 保存纯时间戳信息(方便后续处理)
                if result['timestamps']:
                    csv_path = os.path.join(output_folder, f"{base_name}_timestamps.csv")
                    with open(csv_path, 'w', encoding='utf-8') as f:
                        f.write("word,start,end\n")
                        for ts in result['timestamps']:
                            f.write(f"{ts['word']},{ts['start']},{ts['end']}\n")
                
                return {'file': audio_file, 'success': True, 'text_length': len(result['text'])}
            else:
                return {'file': audio_file, 'success': False, 'error': result['error']}
                
        except Exception as e:
            return {'file': audio_file, 'success': False, 'error': str(e)}
    
    # 使用线程池并发处理
    success_count = 0
    failed_count = 0
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # 提交所有任务
        future_to_file = {executor.submit(process_single_file, file): file for file in audio_files}
        
        # 处理完成的任务
        for i, future in enumerate(as_completed(future_to_file), 1):
            file_result = future.result()
            
            if file_result['success']:
                success_count += 1
                print(f"[{i}/{len(audio_files)}] ✓ {os.path.basename(file_result['file'])} "
                      f"({file_result['text_length']} 字符)")
            else:
                failed_count += 1
                stats['failed_files'].append({
                    'file': file_result['file'],
                    'error': file_result['error']
                })
                print(f"[{i}/{len(audio_files)}] ✗ {os.path.basename(file_result['file'])} "
                      f"- 错误: {file_result['error']}")
    
    # 更新统计信息
    stats['success'] = success_count
    stats['failed'] = failed_count
    stats['end_time'] = datetime.now()
    stats['duration'] = (stats['end_time'] - stats['start_time']).total_seconds()
    
    return stats

# 使用示例
if __name__ == "__main__":
    # 配置参数
    input_dir = "/path/to/your/audio/files"  # 替换为你的音频文件夹
    output_dir = "/path/to/output/results"    # 替换为输出文件夹
    server_url = "http://localhost:7860"      # 服务地址
    
    # 运行批量处理
    print("开始批量处理音频文件...")
    statistics = batch_transcribe_folder(
        input_folder=input_dir,
        output_folder=output_dir,
        server_url=server_url,
        language="auto",      # 自动检测语言
        max_workers=2         # 并发数,根据GPU性能调整
    )
    
    # 打印统计信息
    print("\n" + "="*50)
    print("批量处理完成!")
    print(f"总文件数: {statistics['total']}")
    print(f"成功: {statistics['success']}")
    print(f"失败: {statistics['failed']}")
    print(f"总耗时: {statistics['duration']:.2f} 秒")
    
    if statistics['failed'] > 0:
        print(f"\n失败的文件:")
        for fail in statistics['failed_files']:
            print(f"  {os.path.basename(fail['file'])}: {fail['error']}")

4.2 批量处理的高级功能

上面的脚本已经很好用了,但我们可以让它更强大。添加一些实用功能:

def enhanced_batch_processing(input_folder, output_folder, server_url, 
                             language="auto", max_workers=2, retry_count=2):
    """
    增强版批量处理,包含重试机制和进度保存
    """
    # 创建必要的目录
    os.makedirs(output_folder, exist_ok=True)
    log_dir = os.path.join(output_folder, "logs")
    os.makedirs(log_dir, exist_ok=True)
    
    # 进度文件路径(支持中断后继续)
    progress_file = os.path.join(log_dir, "progress.json")
    
    # 加载之前的进度(如果存在)
    processed_files = set()
    if os.path.exists(progress_file):
        try:
            with open(progress_file, 'r', encoding='utf-8') as f:
                progress_data = json.load(f)
                processed_files = set(progress_data.get('processed', []))
            print(f"从进度文件恢复,已处理 {len(processed_files)} 个文件")
        except:
            print("进度文件损坏,重新开始")
    
    # 查找所有音频文件
    audio_files = []
    for ext in ['*.wav', '*.mp3', '*.flac', '*.ogg', '*.m4a']:
        audio_files.extend(glob.glob(os.path.join(input_folder, ext)))
    
    # 过滤掉已处理的文件
    todo_files = [f for f in audio_files if f not in processed_files]
    
    if not todo_files:
        print("没有需要处理的新文件")
        return
    
    print(f"找到 {len(todo_files)} 个待处理文件")
    
    # 带重试的处理函数
    def process_with_retry(audio_file, max_retries=retry_count):
        for attempt in range(max_retries + 1):
            try:
                result = transcribe_single_file(audio_file, server_url, language)
                if result['success']:
                    return {'success': True, 'result': result, 'attempts': attempt + 1}
                elif attempt < max_retries:
                    print(f"重试 {audio_file} (第{attempt + 1}次)")
                    time.sleep(1)  # 等待1秒后重试
            except Exception as e:
                if attempt < max_retries:
                    print(f"错误重试 {audio_file}: {str(e)}")
                    time.sleep(1)
        
        return {'success': False, 'error': '超过最大重试次数'}
    
    # 处理文件并保存进度
    success_count = 0
    failed_count = 0
    
    for i, audio_file in enumerate(todo_files, 1):
        print(f"[{i}/{len(todo_files)}] 处理: {os.path.basename(audio_file)}")
        
        result = process_with_retry(audio_file)
        
        if result['success']:
            # 保存结果文件(同上)
            save_results(audio_file, output_folder, result['result'])
            success_count += 1
        else:
            failed_count += 1
            # 记录失败信息
            log_error(audio_file, result['error'], log_dir)
        
        # 更新进度
        processed_files.add(audio_file)
        save_progress(progress_file, list(processed_files))
        
        # 每处理10个文件打印一次进度
        if i % 10 == 0:
            print(f"进度: {i}/{len(todo_files)} (成功: {success_count}, 失败: {failed_count})")
    
    return success_count, failed_count

def save_results(audio_file, output_folder, result):
    """保存识别结果"""
    base_name = os.path.splitext(os.path.basename(audio_file))[0]
    
    # 保存文本
    with open(os.path.join(output_folder, f"{base_name}.txt"), 'w', encoding='utf-8') as f:
        f.write(result['text'])
    
    # 保存JSON
    with open(os.path.join(output_folder, f"{base_name}.json"), 'w', encoding='utf-8') as f:
        json.dump(result, f, ensure_ascii=False, indent=2)

def log_error(audio_file, error, log_dir):
    """记录错误日志"""
    error_log = os.path.join(log_dir, "errors.log")
    with open(error_log, 'a', encoding='utf-8') as f:
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        f.write(f"[{timestamp}] {audio_file}: {error}\n")

def save_progress(progress_file, processed_files):
    """保存处理进度"""
    with open(progress_file, 'w', encoding='utf-8') as f:
        json.dump({'processed': processed_files}, f, ensure_ascii=False, indent=2)

5. 实际应用案例:会议录音自动整理

让我们看一个真实的例子:把会议录音自动整理成文字稿。

5.1 会议录音处理脚本

import os
import re
from datetime import timedelta

def process_meeting_recording(audio_file, output_dir, server_url="http://localhost:7860"):
    """
    处理会议录音,生成格式化的会议纪要
    
    Args:
        audio_file: 会议录音文件路径
        output_dir: 输出目录
        server_url: 语音识别服务地址
    """
    print(f"开始处理会议录音: {os.path.basename(audio_file)}")
    
    # 1. 语音识别
    result = transcribe_single_file(audio_file, server_url, language="zh")
    
    if not result['success']:
        print(f"语音识别失败: {result['error']}")
        return
    
    # 2. 基础文本清理
    raw_text = result['text']
    cleaned_text = clean_transcription_text(raw_text)
    
    # 3. 分割成段落(基于时间戳)
    paragraphs = split_into_paragraphs(result['timestamps'])
    
    # 4. 生成会议纪要
    meeting_minutes = generate_meeting_minutes(
        audio_file=audio_file,
        raw_text=raw_text,
        paragraphs=paragraphs,
        timestamps=result['timestamps']
    )
    
    # 5. 保存各种格式
    base_name = os.path.splitext(os.path.basename(audio_file))[0]
    
    # 原始识别结果
    with open(os.path.join(output_dir, f"{base_name}_raw.txt"), 'w', encoding='utf-8') as f:
        f.write(raw_text)
    
    # 清理后的文本
    with open(os.path.join(output_dir, f"{base_name}_cleaned.txt"), 'w', encoding='utf-8') as f:
        f.write(cleaned_text)
    
    # 格式化的会议纪要
    with open(os.path.join(output_dir, f"{base_name}_minutes.md"), 'w', encoding='utf-8') as f:
        f.write(meeting_minutes)
    
    # 段落分割结果
    with open(os.path.join(output_dir, f"{base_name}_paragraphs.json"), 'w', encoding='utf-8') as f:
        json.dump(paragraphs, f, ensure_ascii=False, indent=2)
    
    print(f"会议纪要生成完成,保存到: {output_dir}")

def clean_transcription_text(text):
    """清理识别文本中的常见问题"""
    # 移除多余的空白字符
    text = re.sub(r'\s+', ' ', text).strip()
    
    # 处理常见的识别错误(可以根据实际情况调整)
    replacements = {
        '呃': '', '嗯': '', '啊': '',  # 移除语气词
        '这个': '', '那个': '',        # 移除口头禅
        '就是': '', '然后': '',        # 移除连接词
    }
    
    for old, new in replacements.items():
        text = text.replace(old, new)
    
    # 分割过长的句子
    sentences = re.split(r'[。!?;]', text)
    sentences = [s.strip() for s in sentences if s.strip()]
    
    # 重新组合,每句一行
    return '\n'.join(sentences)

def split_into_paragraphs(timestamps, silence_threshold=2.0):
    """
    根据时间戳和静音段分割段落
    
    Args:
        timestamps: 单词时间戳列表
        silence_threshold: 静音阈值(秒),超过这个时间就分段落
    """
    if not timestamps:
        return []
    
    paragraphs = []
    current_para = []
    current_start = timestamps[0]['start']
    
    for i in range(len(timestamps)):
        current_word = timestamps[i]
        current_para.append(current_word['word'])
        
        # 检查是否需要分段
        if i < len(timestamps) - 1:
            next_word = timestamps[i + 1]
            silence_gap = next_word['start'] - current_word['end']
            
            if silence_gap > silence_threshold:
                # 当前段落结束
                paragraph_text = ''.join(current_para)
                paragraphs.append({
                    'text': paragraph_text,
                    'start': current_start,
                    'end': current_word['end'],
                    'duration': current_word['end'] - current_start
                })
                
                # 开始新段落
                current_para = []
                current_start = next_word['start']
    
    # 添加最后一个段落
    if current_para:
        paragraph_text = ''.join(current_para)
        paragraphs.append({
            'text': paragraph_text,
            'start': current_start,
            'end': timestamps[-1]['end'],
            'duration': timestamps[-1]['end'] - current_start
        })
    
    return paragraphs

def generate_meeting_minutes(audio_file, raw_text, paragraphs, timestamps):
    """生成格式化的会议纪要"""
    from datetime import datetime
    
    # 基本信息
    file_name = os.path.basename(audio_file)
    file_size = os.path.getsize(audio_file) / (1024 * 1024)  # MB
    total_duration = timestamps[-1]['end'] if timestamps else 0
    word_count = len(raw_text)
    
    # 构建Markdown格式的会议纪要
    minutes = f"""# 会议纪要

## 基本信息
- **文件名称**: {file_name}
- **文件大小**: {file_size:.2f} MB
- **音频时长**: {format_timestamp(total_duration)}
- **识别字数**: {word_count} 字
- **生成时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
- **处理工具**: Qwen3-ASR-0.6B 语音识别

## 内容摘要
{generate_summary(raw_text)}

## 完整转录
{format_transcription(paragraphs)}

## 关键信息提取
{extract_key_information(raw_text)}

## 行动项
{extract_action_items(raw_text)}
"""
    return minutes

def format_timestamp(seconds):
    """格式化时间戳为 HH:MM:SS"""
    return str(timedelta(seconds=int(seconds)))

def generate_summary(text, max_sentences=3):
    """生成简要摘要(这里简化处理,实际可以使用文本摘要模型)"""
    sentences = [s for s in re.split(r'[。!?]', text) if s.strip()]
    if len(sentences) <= max_sentences:
        return '\n'.join([f"- {s}" for s in sentences])
    else:
        return '\n'.join([f"- {s}" for s in sentences[:max_sentences]]) + "\n- ..."

def format_transcription(paragraphs):
    """格式化转录文本"""
    formatted = []
    for i, para in enumerate(paragraphs, 1):
        time_str = f"{format_timestamp(para['start'])} - {format_timestamp(para['end'])}"
        formatted.append(f"### 段落 {i} [{time_str}]\n{para['text']}\n")
    return '\n'.join(formatted)

def extract_key_information(text):
    """提取关键信息(这里简化处理)"""
    # 实际应用中可以使用关键词提取或NER模型
    keywords = ['项目', ' deadline', '预算', '需求', '方案', '问题', '解决']
    found = []
    for kw in keywords:
        if kw in text:
            found.append(kw)
    
    if found:
        return "涉及关键词: " + ", ".join(found)
    return "未检测到明显的关键词"

def extract_action_items(text):
    """提取行动项(这里简化处理)"""
    # 实际应用中可以使用更复杂的规则或模型
    action_patterns = [
        r'需要(.{5,20}?)完成',
        r'请(.{5,20}?)处理',
        r'下一步(.{5,20}?)',
        r'安排(.{5,20}?)'
    ]
    
    actions = []
    for pattern in action_patterns:
        matches = re.findall(pattern, text)
        actions.extend(matches)
    
    if actions:
        return '\n'.join([f"- {action}" for action in set(actions)])
    return "未检测到明确的行动项"

# 使用示例
if __name__ == "__main__":
    # 处理单个会议录音
    process_meeting_recording(
        audio_file="meeting_20240515.wav",
        output_dir="./meeting_minutes",
        server_url="http://localhost:7860"
    )
    
    # 或者批量处理会议录音
    meeting_files = glob.glob("/recordings/meetings/*.wav")
    for meeting_file in meeting_files:
        process_meeting_recording(meeting_file, "./meeting_minutes")

5.2 自动化工作流

你可以把这个脚本放到定时任务里,自动处理每天的会议录音:

import schedule
import time

def daily_meeting_processing():
    """每天自动处理会议录音"""
    # 1. 扫描指定目录的新录音文件
    recording_dir = "/daily_recordings"
    today = datetime.now().strftime("%Y%m%d")
    output_dir = f"./meeting_minutes/{today}"
    
    # 2. 处理所有新文件
    audio_files = glob.glob(os.path.join(recording_dir, "*.wav")) + \
                  glob.glob(os.path.join(recording_dir, "*.mp3"))
    
    for audio_file in audio_files:
        process_meeting_recording(audio_file, output_dir)
    
    # 3. 发送通知(可选)
    send_notification(f"已处理 {len(audio_files)} 个会议录音")
    
    print(f"{datetime.now()}: 完成每日会议录音处理")

# 设置每天下午6点执行
schedule.every().day.at("18:00").do(daily_meeting_processing)

print("会议录音自动处理服务已启动...")
while True:
    schedule.run_pending()
    time.sleep(60)

6. 性能优化与错误处理

6.1 调整并发数

批量处理时,并发数不是越大越好。要根据你的GPU性能来调整:

def find_optimal_workers(server_url, test_file, max_test=8):
    """
    测试找到最佳并发数
    """
    results = []
    
    for workers in range(1, max_test + 1):
        print(f"测试 {workers} 个并发...")
        
        start_time = time.time()
        
        # 同时处理多个测试文件
        with ThreadPoolExecutor(max_workers=workers) as executor:
            futures = [executor.submit(transcribe_single_file, test_file, server_url) 
                      for _ in range(10)]  # 测试10个任务
            
            # 等待所有完成
            for future in as_completed(futures):
                future.result()
        
        duration = time.time() - start_time
        avg_time = duration / 10
        
        results.append({
            'workers': workers,
            'total_time': duration,
            'avg_time': avg_time,
            'throughput': 10 / duration
        })
        
        print(f"  平均每个文件: {avg_time:.2f}秒, 吞吐量: {10/duration:.2f} 文件/秒")
    
    # 找到最佳并发数
    best = max(results, key=lambda x: x['throughput'])
    print(f"\n最佳并发数: {best['workers']}")
    print(f"最佳吞吐量: {best['throughput']:.2f} 文件/秒")
    
    return best['workers']

6.2 错误处理与重试

网络请求可能失败,需要完善的错误处理:

def robust_transcribe(audio_file, server_url, max_retries=3, timeout=30):
    """
    带重试机制的语音识别
    """
    for attempt in range(max_retries):
        try:
            result = transcribe_single_file(audio_file, server_url)
            
            if result['success']:
                return result
            elif '网络' in result.get('error', '') and attempt < max_retries - 1:
                # 网络错误,等待后重试
                wait_time = 2 ** attempt  # 指数退避
                print(f"网络错误,{wait_time}秒后重试...")
                time.sleep(wait_time)
                continue
            else:
                # 其他错误,直接返回
                return result
                
        except Exception as e:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt
                print(f"异常 {str(e)},{wait_time}秒后重试...")
                time.sleep(wait_time)
            else:
                return {
                    'success': False,
                    'error': f"重试{max_retries}次后失败: {str(e)}"
                }
    
    return {'success': False, 'error': '超过最大重试次数'}

6.3 内存和性能监控

处理大量文件时,监控资源使用:

import psutil
import threading

class ResourceMonitor:
    """资源使用监控器"""
    
    def __init__(self, interval=5):
        self.interval = interval
        self.monitoring = False
        self.stats = []
        
    def start(self):
        """开始监控"""
        self.monitoring = True
        self.thread = threading.Thread(target=self._monitor_loop)
        self.thread.daemon = True
        self.thread.start()
        
    def stop(self):
        """停止监控"""
        self.monitoring = False
        if self.thread:
            self.thread.join(timeout=2)
            
    def _monitor_loop(self):
        """监控循环"""
        while self.monitoring:
            # 获取CPU使用率
            cpu_percent = psutil.cpu_percent(interval=1)
            
            # 获取内存使用
            memory = psutil.virtual_memory()
            
            # 获取GPU使用(如果可用)
            gpu_info = self._get_gpu_info()
            
            self.stats.append({
                'timestamp': datetime.now().isoformat(),
                'cpu_percent': cpu_percent,
                'memory_percent': memory.percent,
                'memory_used_gb': memory.used / (1024**3),
                'gpu_info': gpu_info
            })
            
            time.sleep(self.interval)
    
    def _get_gpu_info(self):
        """获取GPU信息(需要nvidia-smi)"""
        try:
            import subprocess
            result = subprocess.run(
                ['nvidia-smi', '--query-gpu=utilization.gpu,memory.used,memory.total', 
                 '--format=csv,noheader,nounits'],
                capture_output=True,
                text=True
            )
            if result.returncode == 0:
                gpu_data = result.stdout.strip().split(', ')
                return {
                    'gpu_utilization': float(gpu_data[0]),
                    'memory_used': float(gpu_data[1]),
                    'memory_total': float(gpu_data[2])
                }
        except:
            pass
        return None
    
    def print_summary(self):
        """打印监控摘要"""
        if not self.stats:
            print("没有监控数据")
            return
            
        print("\n资源使用统计:")
        print(f"监控时长: {len(self.stats) * self.interval} 秒")
        print(f"平均CPU使用率: {sum(s['cpu_percent'] for s in self.stats)/len(self.stats):.1f}%")
        print(f"平均内存使用率: {sum(s['memory_percent'] for s in self.stats)/len(self.stats):.1f}%")
        
        if self.stats[0]['gpu_info']:
            avg_gpu = sum(s['gpu_info']['gpu_utilization'] for s in self.stats if s['gpu_info'])/len(self.stats)
            print(f"平均GPU使用率: {avg_gpu:.1f}%")

# 使用监控器
monitor = ResourceMonitor(interval=10)
monitor.start()

# 执行批量处理
batch_transcribe_folder(...)

monitor.stop()
monitor.print_summary()

7. 总结:从单文件到批量处理的完整方案

通过这篇教程,你应该已经掌握了Qwen3-ASR-0.6B Python API的完整使用方法。让我们回顾一下关键点:

7.1 核心要点总结

  1. API调用很简单:本质上就是一个HTTP POST请求,上传音频文件,获取识别结果
  2. 批量处理是核心价值:用Python脚本可以自动化处理大量文件,节省大量时间
  3. 错误处理很重要:网络问题、服务问题都可能发生,要有重试机制
  4. 并发可以提高效率:但要根据GPU性能调整,不是越多越好
  5. 结果可以多种格式保存:纯文本、带时间戳的JSON、CSV等,根据后续用途选择

7.2 实用建议

  1. 开始之前:先测试单个文件,确保服务正常,API调用成功
  2. 处理大量文件时:使用进度保存,支持中断后继续
  3. 性能调优:先用小批量测试找到最佳并发数
  4. 结果验证:定期抽查识别结果,确保质量
  5. 日志记录:记录处理过程中的所有错误,方便排查问题

7.3 扩展思路

掌握了基础用法后,你还可以考虑:

  • 集成到Web应用:用Flask或FastAPI包装成Web服务
  • 实时语音识别:结合WebSocket实现实时转录
  • 多语言混合识别:处理包含多种语言的音频
  • 结合其他AI服务:识别后自动翻译、摘要、分类等

Qwen3-ASR-0.6B的API虽然简单,但功能强大。通过Python脚本,你可以把它集成到各种工作流中,真正实现语音处理的自动化。


获取更多AI镜像

想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。

Logo

小龙虾开发者社区是 CSDN 旗下专注 OpenClaw 生态的官方阵地,聚焦技能开发、插件实践与部署教程,为开发者提供可直接落地的方案、工具与交流平台,助力高效构建与落地 AI 应用

更多推荐