Qwen3-ASR-0.6B实战教程:Python API调用方法,批量处理音频文件
Qwen3-ASR-0.6B实战教程:Python API调用方法,批量处理音频文件
1. 从Web界面到代码:为什么需要Python API?
如果你用过Qwen3-ASR-0.6B的Web界面,可能会觉得挺方便——上传文件、点击按钮、看到结果。但当你需要处理几十个、几百个音频文件时,一个个手动上传就太麻烦了。这时候,Python API的价值就体现出来了。
想象一下这些场景:
- 你有一整天的会议录音需要转成文字稿
- 你需要为一批视频文件自动生成字幕
- 你要定期处理用户上传的语音消息
- 你想把语音识别集成到自己的应用里
这些批量、自动化的任务,用Web界面操作效率太低。Python API让你能用代码控制整个流程,一次处理成百上千个文件,还能把结果直接保存到数据库或文件里。
Qwen3-ASR-0.6B的API设计得很简单,就是标准的HTTP接口,用Python的requests库几行代码就能调用。下面我就带你一步步掌握这个技能。
2. 环境准备:安装必要的Python库
在开始写代码之前,确保你的Python环境已经准备好了。如果你已经部署了Qwen3-ASR-0.6B镜像,那么服务端的环境已经配置好了。我们这里说的是客户端的环境——就是你写Python代码的电脑或服务器。
2.1 检查Python版本
打开终端或命令行,输入:
python3 --version
建议使用Python 3.8或更高版本。如果版本太低,可以去Python官网下载新版本。
2.2 安装必需的库
只需要两个核心库:
pip install requests
是的,就这么简单。requests库用来发送HTTP请求调用API,它是Python里最常用的HTTP客户端库。
如果你打算处理音频文件,可能还需要pydub或soundfile这样的音频处理库,但Qwen3-ASR-0.6B的API本身支持多种音频格式,所以不是必须的。
2.3 确认服务地址
确保你知道Qwen3-ASR-0.6B服务的访问地址。如果你在本地部署,通常是:
http://localhost:7860
如果在远程服务器上,需要换成服务器的IP地址和端口:
http://你的服务器IP:7860
你可以在浏览器里访问这个地址,看看Web界面能不能打开,确认服务正常运行。
3. 基础API调用:单个文件识别
我们先从最简单的开始:识别一个音频文件。这是所有复杂操作的基础。
3.1 理解API接口
Qwen3-ASR-0.6B提供了一个简单的HTTP接口:
- URL:
http://你的服务地址/api/transcribe - 方法: POST
- 参数格式: multipart/form-data
- 必需参数:
audio(音频文件) - 可选参数:
language: 语言代码,如zh(中文)、en(英文),或auto(自动检测)output_timestamps:true或false,是否输出时间戳
3.2 完整代码示例
创建一个Python文件,比如transcribe_single.py,写入以下代码:
import requests
import json
def transcribe_single_file(audio_file_path, server_url="http://localhost:7860", language="auto"):
"""
识别单个音频文件
Args:
audio_file_path: 音频文件的路径
server_url: Qwen3-ASR服务地址
language: 语言代码,默认自动检测
Returns:
识别结果,包含文本和时间戳信息
"""
# 准备请求数据
with open(audio_file_path, 'rb') as audio_file:
files = {'audio': audio_file}
data = {
'language': language,
'output_timestamps': 'true' # 获取时间戳信息
}
# 发送请求
try:
response = requests.post(
f"{server_url}/api/transcribe",
files=files,
data=data,
timeout=60 # 设置超时时间
)
# 检查响应状态
if response.status_code == 200:
result = response.json()
return {
'success': True,
'text': result.get('text', ''),
'timestamps': result.get('timestamps', []),
'language': result.get('language', 'unknown')
}
else:
return {
'success': False,
'error': f"请求失败,状态码: {response.status_code}",
'details': response.text
}
except requests.exceptions.RequestException as e:
return {
'success': False,
'error': f"网络请求错误: {str(e)}"
}
except json.JSONDecodeError as e:
return {
'success': False,
'error': f"响应解析错误: {str(e)}",
'raw_response': response.text[:200] # 只取前200字符
}
# 使用示例
if __name__ == "__main__":
# 替换成你的音频文件路径
audio_path = "example.wav"
# 调用识别函数
result = transcribe_single_file(audio_path)
if result['success']:
print("识别成功!")
print(f"检测到的语言: {result['language']}")
print(f"识别文本: {result['text']}")
print(f"文本长度: {len(result['text'])} 字符")
# 如果有时间戳,显示前几个
if result['timestamps']:
print("\n前5个词的时间戳:")
for i, ts in enumerate(result['timestamps'][:5]):
print(f" '{ts['word']}': {ts['start']:.2f}s - {ts['end']:.2f}s")
else:
print(f"识别失败: {result['error']}")
3.3 代码详解
让我解释一下关键部分:
-
文件读取:用
open(audio_file_path, 'rb')以二进制模式打开音频文件。'rb'表示只读二进制模式,适合传输文件。 -
请求构造:
files={'audio': audio_file}:告诉requests这是一个文件上传data字典包含其他参数,比如语言选择
-
错误处理:
- 网络错误(服务器没响应)
- HTTP错误(服务器返回错误状态码)
- JSON解析错误(响应格式不对)
-
结果解析:成功时返回一个字典,包含文本、时间戳和检测到的语言。
3.4 测试你的代码
找一段音频文件(WAV、MP3、FLAC格式都行),替换代码中的audio_path,然后运行:
python transcribe_single.py
如果一切正常,你会看到识别出的文字。如果出错了,根据错误信息排查问题,比如文件路径对不对、服务地址对不对、服务有没有启动。
4. 批量处理实战:处理整个文件夹的音频
单个文件识别只是开始,真正的威力在于批量处理。我们来写一个实用的批量处理脚本。
4.1 基础批量处理
创建一个新文件batch_process.py:
import os
import glob
import json
import time
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
def batch_transcribe_folder(input_folder, output_folder, server_url="http://localhost:7860",
language="auto", max_workers=2):
"""
批量处理文件夹中的所有音频文件
Args:
input_folder: 输入文件夹路径,包含音频文件
output_folder: 输出文件夹路径,保存结果
server_url: 服务地址
language: 语言设置
max_workers: 最大并发数,根据GPU性能调整
Returns:
处理统计信息
"""
# 创建输出目录
os.makedirs(output_folder, exist_ok=True)
# 查找所有支持的音频文件
audio_extensions = ['*.wav', '*.mp3', '*.flac', '*.ogg', '*.m4a']
audio_files = []
for ext in audio_extensions:
audio_files.extend(glob.glob(os.path.join(input_folder, ext)))
if not audio_files:
print(f"在 {input_folder} 中没有找到音频文件")
return {'total': 0, 'success': 0, 'failed': 0}
print(f"找到 {len(audio_files)} 个音频文件,开始处理...")
# 准备统计信息
stats = {
'total': len(audio_files),
'success': 0,
'failed': 0,
'failed_files': [],
'start_time': datetime.now()
}
# 处理单个文件的函数
def process_single_file(audio_file):
try:
# 调用上一节的识别函数
result = transcribe_single_file(audio_file, server_url, language)
if result['success']:
# 生成输出文件名(保持原文件名)
base_name = os.path.splitext(os.path.basename(audio_file))[0]
# 保存文本结果
txt_path = os.path.join(output_folder, f"{base_name}.txt")
with open(txt_path, 'w', encoding='utf-8') as f:
f.write(result['text'])
# 保存完整结果(JSON格式,包含时间戳)
json_path = os.path.join(output_folder, f"{base_name}.json")
with open(json_path, 'w', encoding='utf-8') as f:
json.dump(result, f, ensure_ascii=False, indent=2)
# 保存纯时间戳信息(方便后续处理)
if result['timestamps']:
csv_path = os.path.join(output_folder, f"{base_name}_timestamps.csv")
with open(csv_path, 'w', encoding='utf-8') as f:
f.write("word,start,end\n")
for ts in result['timestamps']:
f.write(f"{ts['word']},{ts['start']},{ts['end']}\n")
return {'file': audio_file, 'success': True, 'text_length': len(result['text'])}
else:
return {'file': audio_file, 'success': False, 'error': result['error']}
except Exception as e:
return {'file': audio_file, 'success': False, 'error': str(e)}
# 使用线程池并发处理
success_count = 0
failed_count = 0
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# 提交所有任务
future_to_file = {executor.submit(process_single_file, file): file for file in audio_files}
# 处理完成的任务
for i, future in enumerate(as_completed(future_to_file), 1):
file_result = future.result()
if file_result['success']:
success_count += 1
print(f"[{i}/{len(audio_files)}] ✓ {os.path.basename(file_result['file'])} "
f"({file_result['text_length']} 字符)")
else:
failed_count += 1
stats['failed_files'].append({
'file': file_result['file'],
'error': file_result['error']
})
print(f"[{i}/{len(audio_files)}] ✗ {os.path.basename(file_result['file'])} "
f"- 错误: {file_result['error']}")
# 更新统计信息
stats['success'] = success_count
stats['failed'] = failed_count
stats['end_time'] = datetime.now()
stats['duration'] = (stats['end_time'] - stats['start_time']).total_seconds()
return stats
# 使用示例
if __name__ == "__main__":
# 配置参数
input_dir = "/path/to/your/audio/files" # 替换为你的音频文件夹
output_dir = "/path/to/output/results" # 替换为输出文件夹
server_url = "http://localhost:7860" # 服务地址
# 运行批量处理
print("开始批量处理音频文件...")
statistics = batch_transcribe_folder(
input_folder=input_dir,
output_folder=output_dir,
server_url=server_url,
language="auto", # 自动检测语言
max_workers=2 # 并发数,根据GPU性能调整
)
# 打印统计信息
print("\n" + "="*50)
print("批量处理完成!")
print(f"总文件数: {statistics['total']}")
print(f"成功: {statistics['success']}")
print(f"失败: {statistics['failed']}")
print(f"总耗时: {statistics['duration']:.2f} 秒")
if statistics['failed'] > 0:
print(f"\n失败的文件:")
for fail in statistics['failed_files']:
print(f" {os.path.basename(fail['file'])}: {fail['error']}")
4.2 批量处理的高级功能
上面的脚本已经很好用了,但我们可以让它更强大。添加一些实用功能:
def enhanced_batch_processing(input_folder, output_folder, server_url,
language="auto", max_workers=2, retry_count=2):
"""
增强版批量处理,包含重试机制和进度保存
"""
# 创建必要的目录
os.makedirs(output_folder, exist_ok=True)
log_dir = os.path.join(output_folder, "logs")
os.makedirs(log_dir, exist_ok=True)
# 进度文件路径(支持中断后继续)
progress_file = os.path.join(log_dir, "progress.json")
# 加载之前的进度(如果存在)
processed_files = set()
if os.path.exists(progress_file):
try:
with open(progress_file, 'r', encoding='utf-8') as f:
progress_data = json.load(f)
processed_files = set(progress_data.get('processed', []))
print(f"从进度文件恢复,已处理 {len(processed_files)} 个文件")
except:
print("进度文件损坏,重新开始")
# 查找所有音频文件
audio_files = []
for ext in ['*.wav', '*.mp3', '*.flac', '*.ogg', '*.m4a']:
audio_files.extend(glob.glob(os.path.join(input_folder, ext)))
# 过滤掉已处理的文件
todo_files = [f for f in audio_files if f not in processed_files]
if not todo_files:
print("没有需要处理的新文件")
return
print(f"找到 {len(todo_files)} 个待处理文件")
# 带重试的处理函数
def process_with_retry(audio_file, max_retries=retry_count):
for attempt in range(max_retries + 1):
try:
result = transcribe_single_file(audio_file, server_url, language)
if result['success']:
return {'success': True, 'result': result, 'attempts': attempt + 1}
elif attempt < max_retries:
print(f"重试 {audio_file} (第{attempt + 1}次)")
time.sleep(1) # 等待1秒后重试
except Exception as e:
if attempt < max_retries:
print(f"错误重试 {audio_file}: {str(e)}")
time.sleep(1)
return {'success': False, 'error': '超过最大重试次数'}
# 处理文件并保存进度
success_count = 0
failed_count = 0
for i, audio_file in enumerate(todo_files, 1):
print(f"[{i}/{len(todo_files)}] 处理: {os.path.basename(audio_file)}")
result = process_with_retry(audio_file)
if result['success']:
# 保存结果文件(同上)
save_results(audio_file, output_folder, result['result'])
success_count += 1
else:
failed_count += 1
# 记录失败信息
log_error(audio_file, result['error'], log_dir)
# 更新进度
processed_files.add(audio_file)
save_progress(progress_file, list(processed_files))
# 每处理10个文件打印一次进度
if i % 10 == 0:
print(f"进度: {i}/{len(todo_files)} (成功: {success_count}, 失败: {failed_count})")
return success_count, failed_count
def save_results(audio_file, output_folder, result):
"""保存识别结果"""
base_name = os.path.splitext(os.path.basename(audio_file))[0]
# 保存文本
with open(os.path.join(output_folder, f"{base_name}.txt"), 'w', encoding='utf-8') as f:
f.write(result['text'])
# 保存JSON
with open(os.path.join(output_folder, f"{base_name}.json"), 'w', encoding='utf-8') as f:
json.dump(result, f, ensure_ascii=False, indent=2)
def log_error(audio_file, error, log_dir):
"""记录错误日志"""
error_log = os.path.join(log_dir, "errors.log")
with open(error_log, 'a', encoding='utf-8') as f:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
f.write(f"[{timestamp}] {audio_file}: {error}\n")
def save_progress(progress_file, processed_files):
"""保存处理进度"""
with open(progress_file, 'w', encoding='utf-8') as f:
json.dump({'processed': processed_files}, f, ensure_ascii=False, indent=2)
5. 实际应用案例:会议录音自动整理
让我们看一个真实的例子:把会议录音自动整理成文字稿。
5.1 会议录音处理脚本
import os
import re
from datetime import timedelta
def process_meeting_recording(audio_file, output_dir, server_url="http://localhost:7860"):
"""
处理会议录音,生成格式化的会议纪要
Args:
audio_file: 会议录音文件路径
output_dir: 输出目录
server_url: 语音识别服务地址
"""
print(f"开始处理会议录音: {os.path.basename(audio_file)}")
# 1. 语音识别
result = transcribe_single_file(audio_file, server_url, language="zh")
if not result['success']:
print(f"语音识别失败: {result['error']}")
return
# 2. 基础文本清理
raw_text = result['text']
cleaned_text = clean_transcription_text(raw_text)
# 3. 分割成段落(基于时间戳)
paragraphs = split_into_paragraphs(result['timestamps'])
# 4. 生成会议纪要
meeting_minutes = generate_meeting_minutes(
audio_file=audio_file,
raw_text=raw_text,
paragraphs=paragraphs,
timestamps=result['timestamps']
)
# 5. 保存各种格式
base_name = os.path.splitext(os.path.basename(audio_file))[0]
# 原始识别结果
with open(os.path.join(output_dir, f"{base_name}_raw.txt"), 'w', encoding='utf-8') as f:
f.write(raw_text)
# 清理后的文本
with open(os.path.join(output_dir, f"{base_name}_cleaned.txt"), 'w', encoding='utf-8') as f:
f.write(cleaned_text)
# 格式化的会议纪要
with open(os.path.join(output_dir, f"{base_name}_minutes.md"), 'w', encoding='utf-8') as f:
f.write(meeting_minutes)
# 段落分割结果
with open(os.path.join(output_dir, f"{base_name}_paragraphs.json"), 'w', encoding='utf-8') as f:
json.dump(paragraphs, f, ensure_ascii=False, indent=2)
print(f"会议纪要生成完成,保存到: {output_dir}")
def clean_transcription_text(text):
"""清理识别文本中的常见问题"""
# 移除多余的空白字符
text = re.sub(r'\s+', ' ', text).strip()
# 处理常见的识别错误(可以根据实际情况调整)
replacements = {
'呃': '', '嗯': '', '啊': '', # 移除语气词
'这个': '', '那个': '', # 移除口头禅
'就是': '', '然后': '', # 移除连接词
}
for old, new in replacements.items():
text = text.replace(old, new)
# 分割过长的句子
sentences = re.split(r'[。!?;]', text)
sentences = [s.strip() for s in sentences if s.strip()]
# 重新组合,每句一行
return '\n'.join(sentences)
def split_into_paragraphs(timestamps, silence_threshold=2.0):
"""
根据时间戳和静音段分割段落
Args:
timestamps: 单词时间戳列表
silence_threshold: 静音阈值(秒),超过这个时间就分段落
"""
if not timestamps:
return []
paragraphs = []
current_para = []
current_start = timestamps[0]['start']
for i in range(len(timestamps)):
current_word = timestamps[i]
current_para.append(current_word['word'])
# 检查是否需要分段
if i < len(timestamps) - 1:
next_word = timestamps[i + 1]
silence_gap = next_word['start'] - current_word['end']
if silence_gap > silence_threshold:
# 当前段落结束
paragraph_text = ''.join(current_para)
paragraphs.append({
'text': paragraph_text,
'start': current_start,
'end': current_word['end'],
'duration': current_word['end'] - current_start
})
# 开始新段落
current_para = []
current_start = next_word['start']
# 添加最后一个段落
if current_para:
paragraph_text = ''.join(current_para)
paragraphs.append({
'text': paragraph_text,
'start': current_start,
'end': timestamps[-1]['end'],
'duration': timestamps[-1]['end'] - current_start
})
return paragraphs
def generate_meeting_minutes(audio_file, raw_text, paragraphs, timestamps):
"""生成格式化的会议纪要"""
from datetime import datetime
# 基本信息
file_name = os.path.basename(audio_file)
file_size = os.path.getsize(audio_file) / (1024 * 1024) # MB
total_duration = timestamps[-1]['end'] if timestamps else 0
word_count = len(raw_text)
# 构建Markdown格式的会议纪要
minutes = f"""# 会议纪要
## 基本信息
- **文件名称**: {file_name}
- **文件大小**: {file_size:.2f} MB
- **音频时长**: {format_timestamp(total_duration)}
- **识别字数**: {word_count} 字
- **生成时间**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
- **处理工具**: Qwen3-ASR-0.6B 语音识别
## 内容摘要
{generate_summary(raw_text)}
## 完整转录
{format_transcription(paragraphs)}
## 关键信息提取
{extract_key_information(raw_text)}
## 行动项
{extract_action_items(raw_text)}
"""
return minutes
def format_timestamp(seconds):
"""格式化时间戳为 HH:MM:SS"""
return str(timedelta(seconds=int(seconds)))
def generate_summary(text, max_sentences=3):
"""生成简要摘要(这里简化处理,实际可以使用文本摘要模型)"""
sentences = [s for s in re.split(r'[。!?]', text) if s.strip()]
if len(sentences) <= max_sentences:
return '\n'.join([f"- {s}" for s in sentences])
else:
return '\n'.join([f"- {s}" for s in sentences[:max_sentences]]) + "\n- ..."
def format_transcription(paragraphs):
"""格式化转录文本"""
formatted = []
for i, para in enumerate(paragraphs, 1):
time_str = f"{format_timestamp(para['start'])} - {format_timestamp(para['end'])}"
formatted.append(f"### 段落 {i} [{time_str}]\n{para['text']}\n")
return '\n'.join(formatted)
def extract_key_information(text):
"""提取关键信息(这里简化处理)"""
# 实际应用中可以使用关键词提取或NER模型
keywords = ['项目', ' deadline', '预算', '需求', '方案', '问题', '解决']
found = []
for kw in keywords:
if kw in text:
found.append(kw)
if found:
return "涉及关键词: " + ", ".join(found)
return "未检测到明显的关键词"
def extract_action_items(text):
"""提取行动项(这里简化处理)"""
# 实际应用中可以使用更复杂的规则或模型
action_patterns = [
r'需要(.{5,20}?)完成',
r'请(.{5,20}?)处理',
r'下一步(.{5,20}?)',
r'安排(.{5,20}?)'
]
actions = []
for pattern in action_patterns:
matches = re.findall(pattern, text)
actions.extend(matches)
if actions:
return '\n'.join([f"- {action}" for action in set(actions)])
return "未检测到明确的行动项"
# 使用示例
if __name__ == "__main__":
# 处理单个会议录音
process_meeting_recording(
audio_file="meeting_20240515.wav",
output_dir="./meeting_minutes",
server_url="http://localhost:7860"
)
# 或者批量处理会议录音
meeting_files = glob.glob("/recordings/meetings/*.wav")
for meeting_file in meeting_files:
process_meeting_recording(meeting_file, "./meeting_minutes")
5.2 自动化工作流
你可以把这个脚本放到定时任务里,自动处理每天的会议录音:
import schedule
import time
def daily_meeting_processing():
"""每天自动处理会议录音"""
# 1. 扫描指定目录的新录音文件
recording_dir = "/daily_recordings"
today = datetime.now().strftime("%Y%m%d")
output_dir = f"./meeting_minutes/{today}"
# 2. 处理所有新文件
audio_files = glob.glob(os.path.join(recording_dir, "*.wav")) + \
glob.glob(os.path.join(recording_dir, "*.mp3"))
for audio_file in audio_files:
process_meeting_recording(audio_file, output_dir)
# 3. 发送通知(可选)
send_notification(f"已处理 {len(audio_files)} 个会议录音")
print(f"{datetime.now()}: 完成每日会议录音处理")
# 设置每天下午6点执行
schedule.every().day.at("18:00").do(daily_meeting_processing)
print("会议录音自动处理服务已启动...")
while True:
schedule.run_pending()
time.sleep(60)
6. 性能优化与错误处理
6.1 调整并发数
批量处理时,并发数不是越大越好。要根据你的GPU性能来调整:
def find_optimal_workers(server_url, test_file, max_test=8):
"""
测试找到最佳并发数
"""
results = []
for workers in range(1, max_test + 1):
print(f"测试 {workers} 个并发...")
start_time = time.time()
# 同时处理多个测试文件
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = [executor.submit(transcribe_single_file, test_file, server_url)
for _ in range(10)] # 测试10个任务
# 等待所有完成
for future in as_completed(futures):
future.result()
duration = time.time() - start_time
avg_time = duration / 10
results.append({
'workers': workers,
'total_time': duration,
'avg_time': avg_time,
'throughput': 10 / duration
})
print(f" 平均每个文件: {avg_time:.2f}秒, 吞吐量: {10/duration:.2f} 文件/秒")
# 找到最佳并发数
best = max(results, key=lambda x: x['throughput'])
print(f"\n最佳并发数: {best['workers']}")
print(f"最佳吞吐量: {best['throughput']:.2f} 文件/秒")
return best['workers']
6.2 错误处理与重试
网络请求可能失败,需要完善的错误处理:
def robust_transcribe(audio_file, server_url, max_retries=3, timeout=30):
"""
带重试机制的语音识别
"""
for attempt in range(max_retries):
try:
result = transcribe_single_file(audio_file, server_url)
if result['success']:
return result
elif '网络' in result.get('error', '') and attempt < max_retries - 1:
# 网络错误,等待后重试
wait_time = 2 ** attempt # 指数退避
print(f"网络错误,{wait_time}秒后重试...")
time.sleep(wait_time)
continue
else:
# 其他错误,直接返回
return result
except Exception as e:
if attempt < max_retries - 1:
wait_time = 2 ** attempt
print(f"异常 {str(e)},{wait_time}秒后重试...")
time.sleep(wait_time)
else:
return {
'success': False,
'error': f"重试{max_retries}次后失败: {str(e)}"
}
return {'success': False, 'error': '超过最大重试次数'}
6.3 内存和性能监控
处理大量文件时,监控资源使用:
import psutil
import threading
class ResourceMonitor:
"""资源使用监控器"""
def __init__(self, interval=5):
self.interval = interval
self.monitoring = False
self.stats = []
def start(self):
"""开始监控"""
self.monitoring = True
self.thread = threading.Thread(target=self._monitor_loop)
self.thread.daemon = True
self.thread.start()
def stop(self):
"""停止监控"""
self.monitoring = False
if self.thread:
self.thread.join(timeout=2)
def _monitor_loop(self):
"""监控循环"""
while self.monitoring:
# 获取CPU使用率
cpu_percent = psutil.cpu_percent(interval=1)
# 获取内存使用
memory = psutil.virtual_memory()
# 获取GPU使用(如果可用)
gpu_info = self._get_gpu_info()
self.stats.append({
'timestamp': datetime.now().isoformat(),
'cpu_percent': cpu_percent,
'memory_percent': memory.percent,
'memory_used_gb': memory.used / (1024**3),
'gpu_info': gpu_info
})
time.sleep(self.interval)
def _get_gpu_info(self):
"""获取GPU信息(需要nvidia-smi)"""
try:
import subprocess
result = subprocess.run(
['nvidia-smi', '--query-gpu=utilization.gpu,memory.used,memory.total',
'--format=csv,noheader,nounits'],
capture_output=True,
text=True
)
if result.returncode == 0:
gpu_data = result.stdout.strip().split(', ')
return {
'gpu_utilization': float(gpu_data[0]),
'memory_used': float(gpu_data[1]),
'memory_total': float(gpu_data[2])
}
except:
pass
return None
def print_summary(self):
"""打印监控摘要"""
if not self.stats:
print("没有监控数据")
return
print("\n资源使用统计:")
print(f"监控时长: {len(self.stats) * self.interval} 秒")
print(f"平均CPU使用率: {sum(s['cpu_percent'] for s in self.stats)/len(self.stats):.1f}%")
print(f"平均内存使用率: {sum(s['memory_percent'] for s in self.stats)/len(self.stats):.1f}%")
if self.stats[0]['gpu_info']:
avg_gpu = sum(s['gpu_info']['gpu_utilization'] for s in self.stats if s['gpu_info'])/len(self.stats)
print(f"平均GPU使用率: {avg_gpu:.1f}%")
# 使用监控器
monitor = ResourceMonitor(interval=10)
monitor.start()
# 执行批量处理
batch_transcribe_folder(...)
monitor.stop()
monitor.print_summary()
7. 总结:从单文件到批量处理的完整方案
通过这篇教程,你应该已经掌握了Qwen3-ASR-0.6B Python API的完整使用方法。让我们回顾一下关键点:
7.1 核心要点总结
- API调用很简单:本质上就是一个HTTP POST请求,上传音频文件,获取识别结果
- 批量处理是核心价值:用Python脚本可以自动化处理大量文件,节省大量时间
- 错误处理很重要:网络问题、服务问题都可能发生,要有重试机制
- 并发可以提高效率:但要根据GPU性能调整,不是越多越好
- 结果可以多种格式保存:纯文本、带时间戳的JSON、CSV等,根据后续用途选择
7.2 实用建议
- 开始之前:先测试单个文件,确保服务正常,API调用成功
- 处理大量文件时:使用进度保存,支持中断后继续
- 性能调优:先用小批量测试找到最佳并发数
- 结果验证:定期抽查识别结果,确保质量
- 日志记录:记录处理过程中的所有错误,方便排查问题
7.3 扩展思路
掌握了基础用法后,你还可以考虑:
- 集成到Web应用:用Flask或FastAPI包装成Web服务
- 实时语音识别:结合WebSocket实现实时转录
- 多语言混合识别:处理包含多种语言的音频
- 结合其他AI服务:识别后自动翻译、摘要、分类等
Qwen3-ASR-0.6B的API虽然简单,但功能强大。通过Python脚本,你可以把它集成到各种工作流中,真正实现语音处理的自动化。
获取更多AI镜像
想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。
更多推荐


所有评论(0)