保姆级教程:用Python的m3u8库搞定B站直播回放下载(附完整代码)
·
Python实战:B站直播回放高效下载与自动化处理指南
直播内容转瞬即逝,但知识沉淀需要持久。当技术分享、学术讲座或你钟爱的主播直播结束后,如何将那些精彩的直播回放保存下来?本教程将带你深入探索Python在B站直播回放下载领域的实战应用,从原理分析到代码实现,构建一套完整的自动化解决方案。
1. 直播回放技术原理与准备工作
1.1 M3U8流媒体协议解析
M3U8作为HTTP Live Streaming(HLS)协议的核心,其工作原理值得深入理解:
- 基础结构 :M3U8文件实质是一个文本格式的播放列表,包含多个TS(Transport Stream)视频片段索引
- 关键标签 :
  - `#EXT-X-VERSION`:指定协议版本
  - `#EXT-X-TARGETDURATION`:定义每个TS片段最大时长
  - `#EXTINF`:标记片段具体时长
  - `#EXT-X-ENDLIST`:表示视频流结束
# 典型M3U8文件示例
"""
#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:10
#EXTINF:9.009,
http://example.com/segment1.ts
#EXTINF:9.009,
http://example.com/segment2.ts
#EXT-X-ENDLIST
"""
1.2 环境配置与依赖安装
确保你的Python环境(建议3.7+)已安装以下关键库:
pip install m3u8 requests tqdm pycryptodome
注:pycryptodome仅在处理加密流时需要使用
1.3 B站直播回放特性分析
与实时直播相比,回放下载面临独特挑战:
| 特性对比 | 实时直播 | 直播回放 |
|---|---|---|
| 链接有效期 | 短时效(分钟级) | 较长(通常数天) |
| 片段组织方式 | 动态追加新片段 | 完整固定片段列表 |
| 加密情况 | 较少加密 | 可能采用AES-128加密 |
| 访问限制 | 需要维持心跳 | 可能需要特殊权限 |
2. 回放链接获取与解析实战
2.1 从网页端提取回放资源
通过浏览器开发者工具分析页面请求,定位关键接口:
- 打开目标回放页面
- 按F12进入开发者工具
- 切换到Network面板,过滤XHR请求
- 查找包含"playurl"或"m3u8"的请求
def extract_replay_id(url):
    """Extract the numeric room ID from a Bilibili live URL.

    Args:
        url: Text containing ``live.bilibili.com/<digits>``.

    Returns:
        The room ID as a string of digits.

    Raises:
        ValueError: If ``url`` is not a string or does not contain
            ``live.bilibili.com/<digits>``.
    """
    import re  # function-scope import: no module-level import block is visible

    if not isinstance(url, str):
        raise ValueError("无效的B站回放URL")
    # The dots are escaped so they only match a literal ".", otherwise hosts
    # such as "liveXbilibiliYcom" would be accepted as well.
    match = re.search(r"live\.bilibili\.com/(\d+)", url)
    if match is None:
        raise ValueError("无效的B站回放URL")
    return match.group(1)
2.2 通过API获取M3U8主清单
B站的后端API通常需要构造特定参数:
def get_m3u8_playlist(room_id):
    """Query the getRoomPlayInfo API and build a stream URL for a room.

    Args:
        room_id: Room ID sent as the ``room_id`` query parameter and used in
            the ``Referer`` header.

    Returns:
        The stream URL assembled from ``host`` + ``base_url`` (+ ``extra``
        when the response provides it).

    Raises:
        requests.RequestException: On network failure or a non-2xx response.
        RuntimeError: If the API reports a non-zero ``code`` or the response
            does not contain a usable stream entry.
    """
    api_url = "https://api.live.bilibili.com/xlive/web-room/v2/index/getRoomPlayInfo"
    params = {
        "room_id": room_id,
        "protocol": "0,1",
        "format": "0,1,2",
        "codec": "0,1",
        "qn": 10000,
        "platform": "web",
        "ptype": 8,
    }
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
        "Referer": f"https://live.bilibili.com/{room_id}",
    }
    # A timeout keeps a stalled connection from hanging the whole script.
    response = requests.get(api_url, params=params, headers=headers, timeout=10)
    response.raise_for_status()
    data = response.json()
    if data.get("code", 0) != 0:
        raise RuntimeError(f"获取播放信息失败: {data.get('message', '未知错误')}")

    try:
        streams = data["data"]["playurl_info"]["playurl"]["stream"]
        # NOTE(review): assumes each stream entry carries a "protocol_name"
        # field whose value is "http_hls" for the m3u8 variant - confirm
        # against a real response. Falls back to the first entry, which is
        # what the previous implementation always used.
        stream = next(
            (s for s in streams if s.get("protocol_name") == "http_hls"),
            streams[0],
        )
        codec = stream["format"][0]["codec"][0]
        url_info = codec["url_info"][0]
        # NOTE(review): "extra" appears to hold the signed query string that
        # must follow base_url - confirm. It is optional here so responses
        # without it still produce the same URL as before.
        return url_info["host"] + codec["base_url"] + url_info.get("extra", "")
    except (KeyError, IndexError, TypeError) as err:
        raise RuntimeError("未获取到可用的播放流") from err
2.3 处理多层嵌套M3U8结构
B站的流媒体常采用分级索引策略:
- 主M3U8包含不同码率的子列表
- 子M3U8包含实际TS片段信息
- 可能需要处理CDN切换逻辑
def resolve_m3u8_hierarchy(master_url):
    """Resolve a master playlist URL to the URL of its highest-bandwidth variant.

    Args:
        master_url: URL handed to ``m3u8.load``.

    Returns:
        ``master_url`` itself when the loaded playlist lists no variant
        playlists, otherwise the ``absolute_uri`` of the variant with the
        largest ``stream_info.bandwidth``.
    """
    master_playlist = m3u8.load(master_url)
    variants = master_playlist.playlists
    if not variants:
        return master_url  # already the final (media) playlist

    # A variant without a BANDWIDTH attribute reports None; treat it as 0 so
    # max() never compares None with int.
    best = max(variants, key=lambda variant: variant.stream_info.bandwidth or 0)
    return best.absolute_uri
3. 高效下载与合并策略
3.1 多线程分段下载优化
传统单线程下载效率低下,采用线程池可大幅提升速度:
from concurrent.futures import ThreadPoolExecutor, as_completed
def download_segment(segment_url, save_path, headers=None, timeout=30):
    """Download one segment to ``save_path``.

    Args:
        segment_url: URL of the segment.
        save_path: Destination file path; its parent directory is created
            when missing.
        headers: Optional HTTP headers passed to ``requests.get``.
        timeout: Timeout in seconds passed to ``requests.get``.

    Returns:
        True when the whole body was written, False on any network, HTTP or
        file-system error. A message is printed on failure and the partial
        file is removed.
    """
    import contextlib
    import os

    try:
        parent = os.path.dirname(save_path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        # The context manager releases the streamed connection even when the
        # body is only partially read.
        with requests.get(
            segment_url, headers=headers, stream=True, timeout=timeout
        ) as response:
            # Without this check an HTTP error page would be saved as a .ts
            # file and reported as success.
            response.raise_for_status()
            with open(save_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
        return True
    except (requests.RequestException, OSError) as e:
        print(f"下载失败 {segment_url}: {str(e)}")
        # Do not leave a truncated segment behind for the merge step.
        with contextlib.suppress(OSError):
            os.remove(save_path)
        return False
def parallel_download(segments, max_workers=8, headers=None):
    """Download all segments concurrently into ``temp/segment_<i>.ts``.

    Args:
        segments: Iterable of objects exposing ``absolute_uri``.
        max_workers: Size of the thread pool.
        headers: Optional HTTP headers forwarded to ``download_segment``.

    Returns:
        Sorted list of the indices whose download failed (empty when every
        segment succeeded or when ``segments`` is empty).
    """
    import os

    segments = list(segments)
    if not segments:
        return []

    os.makedirs("temp", exist_ok=True)
    failed = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future back to its index so failures can be identified.
        future_to_index = {
            executor.submit(
                download_segment,
                seg.absolute_uri,
                f"temp/segment_{i}.ts",
                headers,
            ): i
            for i, seg in enumerate(segments)
        }
        for future in as_completed(future_to_index):
            index = future_to_index[future]
            try:
                succeeded = future.result()
            except Exception as exc:  # boundary: one worker must not abort the rest
                print(f"下载失败 片段 {index}: {exc}")
                succeeded = False
            if not succeeded:
                failed.append(index)

    if failed:
        print("部分片段下载失败,可能影响最终视频完整性")
    return sorted(failed)
3.2 进度监控与断点续传
实现健壮的下载过程管理:
class DownloadProgress:
    """Byte counter that prints a single-line progress indicator.

    The counter is updated under a lock so ``update`` can be called from
    several threads.
    """

    def __init__(self, total):
        """Create a counter.

        Args:
            total: Expected total number of bytes; may be 0 when unknown.
        """
        import threading  # function-scope import: no module-level import visible

        self.total = total
        self.downloaded = 0
        self.lock = threading.Lock()

    def update(self, size):
        """Add ``size`` bytes to the counter and reprint the progress line."""
        with self.lock:
            self.downloaded += size
            # An unknown or zero total would otherwise raise ZeroDivisionError.
            percent = (self.downloaded / self.total) * 100 if self.total else 0.0
            print(f"\r进度: {percent:.1f}% | {self.downloaded}/{self.total} bytes", end='')
def resume_download(playlist, progress_file="progress.json"):
    """Download a playlist sequentially, resuming from a checkpoint file.

    The checkpoint is a JSON object ``{"last_segment": <index>}`` naming the
    last segment that was downloaded successfully. Segments are stored as
    ``temp/seg_<i>.ts``.

    Args:
        playlist: Object exposing ``segments``, each with ``absolute_uri``.
        progress_file: Path of the JSON checkpoint file.

    Returns:
        True when every remaining segment was downloaded, False when a
        download failed. On failure the checkpoint still points at the last
        good segment, so a later call retries from the failed one.
    """
    import json
    import os

    last_segment = -1
    if os.path.exists(progress_file):
        try:
            with open(progress_file, 'r') as f:
                last_segment = int(json.load(f)['last_segment'])
        except (OSError, ValueError, KeyError, TypeError):
            # Unreadable or corrupt checkpoint: start again from the beginning.
            last_segment = -1

    segments = playlist.segments
    total_size = sum(get_remote_size(s.absolute_uri) for s in segments)
    progress = DownloadProgress(total_size)

    for i, seg in enumerate(segments):
        local_path = f"temp/seg_{i}.ts"
        if i <= last_segment:
            # Count bytes from the earlier run so the percentage is
            # meaningful after a resume.
            if os.path.exists(local_path):
                progress.update(get_local_size(local_path))
            continue
        # Only checkpoint a segment that really arrived; previously a failed
        # download was recorded as done and silently skipped on resume.
        if not download_segment(seg.absolute_uri, local_path):
            return False
        with open(progress_file, 'w') as f:
            json.dump({'last_segment': i}, f)
        progress.update(get_local_size(local_path))
    return True
3.3 TS片段合并与格式转换
使用FFmpeg进行高效合并(需提前安装FFmpeg):
def merge_ts_files(output_path="output.mp4", segment_glob="temp/segment_*.ts",
                   list_path="file_list.txt"):
    """Concatenate downloaded TS segments into one file with FFmpeg.

    Args:
        output_path: Path of the merged output file.
        segment_glob: Glob pattern selecting the segment files.
        list_path: Path of the concat list file written for FFmpeg.

    Raises:
        FileNotFoundError: If no file matches ``segment_glob`` (raised before
            the list file is written), or if the ``ffmpeg`` executable cannot
            be started.
        subprocess.CalledProcessError: If FFmpeg exits with a non-zero status.
    """
    import glob
    import os
    import re
    import subprocess

    def _segment_order(path):
        # Sort by the trailing number in the file name. A plain string sort
        # would place segment_10 before segment_2 and scramble the video.
        numbers = re.findall(r"\d+", os.path.basename(path))
        return (int(numbers[-1]) if numbers else -1, path)

    files = sorted(glob.glob(segment_glob), key=_segment_order)
    if not files:
        raise FileNotFoundError(f"没有找到可合并的TS片段: {segment_glob}")

    # Build the concat list file.
    with open(list_path, 'w', encoding='utf-8') as f:
        for file in files:
            # Inside a single-quoted entry a literal quote is written as '\''.
            escaped = os.path.abspath(file).replace("'", "'\\''")
            f.write(f"file '{escaped}'\n")

    # Stream-copy: segments are concatenated without re-encoding.
    subprocess.run([
        "ffmpeg",
        "-f", "concat",
        "-safe", "0",
        "-i", list_path,
        "-c", "copy",
        output_path
    ], check=True)
    print(f"视频已成功合并至 {output_path}")
4. 高级功能与异常处理
4.1 解密AES-128加密流
当遇到加密流时的处理方案:
from Crypto.Cipher import AES
def decrypt_ts_file(encrypted_path, key, iv=None, media_sequence=0):
    """Decrypt one AES-128-CBC encrypted segment file and return the plaintext.

    Args:
        encrypted_path: Path of the encrypted segment.
        key: AES key bytes.
        iv: Initialization vector as 16 bytes, or as a hexadecimal string with
            an optional ``0x`` prefix (the form used in playlists). When
            empty or None, the IV is derived from ``media_sequence``.
        media_sequence: Media sequence number of the segment; only used when
            ``iv`` is not given.

    Returns:
        The decrypted bytes, with PKCS#7 padding removed when present.
    """
    block_size = 16

    with open(encrypted_path, 'rb') as f:
        encrypted_data = f.read()

    if not iv:
        # Without an explicit IV, HLS uses the segment's media sequence number
        # as a big-endian 128-bit value. The previous code let the cipher
        # choose a random IV, which corrupts the first block.
        iv = int(media_sequence).to_bytes(block_size, "big")
    elif isinstance(iv, str):
        hex_digits = iv[2:] if iv[:2].lower() == "0x" else iv
        iv = bytes.fromhex(hex_digits.zfill(block_size * 2))

    cipher = AES.new(key, AES.MODE_CBC, iv=iv)
    decrypted_data = cipher.decrypt(encrypted_data)

    # Strip PKCS#7 padding only when the tail is well-formed, so data that
    # was not padded is returned untouched.
    pad_len = decrypted_data[-1] if decrypted_data else 0
    if 1 <= pad_len <= block_size and decrypted_data.endswith(bytes([pad_len]) * pad_len):
        decrypted_data = decrypted_data[:-pad_len]
    return decrypted_data
def handle_encrypted_stream(playlist):
    """Decrypt ``temp/seg_<i>.ts`` files into ``decrypted/seg_<i>.ts``.

    The key described by ``playlist.keys[0]`` is downloaded once and applied
    to every segment.
    NOTE(review): playlists that rotate keys between segments are not
    handled; only the first key is used.

    Args:
        playlist: Parsed playlist exposing ``keys`` and ``segments``.

    Returns:
        False when the playlist declares no usable key (nothing is done),
        True after all segments have been decrypted.

    Raises:
        requests.RequestException: If the key cannot be downloaded.
        OSError: If a segment file cannot be read or written.
    """
    import os

    if not playlist.keys or not playlist.keys[0]:
        return False
    key_info = playlist.keys[0]

    # Prefer the absolute URI so a key URI written relative to the playlist
    # still resolves; fall back to the raw value when it cannot be computed.
    try:
        key_uri = key_info.absolute_uri
    except (AttributeError, ValueError):
        key_uri = key_info.uri
    if not key_uri:
        return False

    # Download the decryption key.
    key_response = requests.get(key_uri, timeout=10)
    key_response.raise_for_status()
    key = key_response.content

    # The playlist stores the IV as a hex string; the cipher needs bytes.
    raw_iv = key_info.iv
    if isinstance(raw_iv, str) and raw_iv:
        hex_digits = raw_iv[2:] if raw_iv[:2].lower() == "0x" else raw_iv
        explicit_iv = bytes.fromhex(hex_digits.zfill(32))
    else:
        explicit_iv = raw_iv or None
    first_sequence = getattr(playlist, "media_sequence", None) or 0

    os.makedirs("decrypted", exist_ok=True)
    # Decrypt every segment.
    for i, seg in enumerate(playlist.segments):
        encrypted_path = f"temp/seg_{i}.ts"
        decrypted_path = f"decrypted/seg_{i}.ts"
        # Without an explicit IV, HLS derives it from the segment's media
        # sequence number (big-endian, 128 bit).
        iv = explicit_iv or (first_sequence + i).to_bytes(16, "big")
        decrypted_data = decrypt_ts_file(encrypted_path, key, iv)
        with open(decrypted_path, 'wb') as f:
            f.write(decrypted_data)
    return True
4.2 常见错误处理策略
构建健壮的错误处理机制:
# Human-readable hints for the HTTP status codes this script reports.
ERROR_MAP = {
    403: "访问被拒绝,请检查Cookie设置",
    404: "资源不存在,可能直播回放已删除",
    412: "参数校验失败,请更新请求参数",
    503: "服务器暂时不可用,建议稍后重试"
}


def robust_request(url, max_retries=3, timeout=10):
    """GET ``url`` with retries and exponential back-off.

    Args:
        url: URL to fetch.
        max_retries: Maximum number of attempts.
        timeout: Per-attempt timeout in seconds.

    Returns:
        The successful ``requests.Response``, or None when every attempt
        failed or a 403/404 was received (those are not retried).
    """
    import time  # function-scope import: no module-level import visible here

    fatal_statuses = (403, 404)
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
            return response
        except requests.exceptions.RequestException as e:
            failed_response = getattr(e, "response", None)
            status = getattr(failed_response, "status_code", None)
            if isinstance(e, requests.exceptions.HTTPError) and status is not None:
                print(f"HTTP错误 {status}: {ERROR_MAP.get(status, '未知错误')}")
                if status in fatal_statuses:
                    break  # fatal error, retrying cannot help
            print(f"请求失败 (尝试 {attempt + 1}/{max_retries}): {str(e)}")
            # Back off only when another attempt follows; sleeping after the
            # final failure just delays returning None.
            if attempt + 1 < max_retries:
                time.sleep(2 ** attempt)  # exponential back-off
    return None
4.3 自动化元数据提取
从回放中提取有用信息:
def extract_metadata(room_id):
    """Fetch title, host name, start time and cover URL of a room.

    Args:
        room_id: Room ID sent as the ``room_id`` query parameter.

    Returns:
        Dict with keys ``title``, ``host``, ``start_time`` and ``cover_url``.
        ``start_time`` is a ``datetime`` built from ``live_start_time``, or
        None when that field is missing or 0.

    Raises:
        requests.RequestException: On network failure or a non-2xx response.
        RuntimeError: If the API reports a non-zero ``code`` or returns no
            ``data`` payload.
    """
    from datetime import datetime

    api_url = "https://api.live.bilibili.com/xlive/web-room/v1/index/getInfoByRoom"
    params = {"room_id": room_id}
    # Same browser-like User-Agent as the other API call in this file.
    headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"}
    response = requests.get(api_url, params=params, headers=headers, timeout=10)
    response.raise_for_status()
    data = response.json()
    if data.get("code", 0) != 0 or not data.get("data"):
        raise RuntimeError(f"获取房间信息失败: {data.get('message', '未知错误')}")

    room_info = data["data"]["room_info"]
    anchor = data["data"]["anchor_info"]["base_info"]
    started = room_info.get("live_start_time")
    return {
        "title": room_info["title"],
        "host": anchor["uname"],
        # A timestamp of 0 would otherwise be reported as 1970-01-01.
        "start_time": datetime.fromtimestamp(started) if started else None,
        "cover_url": room_info["cover"],
    }
5. 完整实现与使用示例
5.1 配置化下载管理器
创建可配置的下载器类:
class BiliReplayDownloader:
    """Holds download configuration, an HTTP session and the working directories."""

    # Defaults applied to every key the caller's config does not provide.
    DEFAULT_CONFIG = {
        "max_workers": 8,
        "temp_dir": "temp",
        "output_dir": "output",
        "retry_times": 3,
        "timeout": 30,
    }

    def __init__(self, config=None):
        """Create the session and make sure both directories exist.

        Args:
            config: Optional dict overriding any subset of ``DEFAULT_CONFIG``.
        """
        import os

        # Merge rather than replace, so a partial config (for example only
        # "output_dir") no longer causes KeyError on the missing keys.
        self.config = {**self.DEFAULT_CONFIG, **(config or {})}
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
            "Referer": "https://live.bilibili.com/"
        })
        os.makedirs(self.config["temp_dir"], exist_ok=True)
        os.makedirs(self.config["output_dir"], exist_ok=True)

    def clean_temp_files(self):
        """Delete the regular files directly inside the temp directory.

        Sub-directories are left alone and a file that cannot be removed is
        reported instead of raising, because this runs in a ``finally`` block
        and must not mask the original error.
        """
        import glob
        import os

        pattern = os.path.join(glob.escape(self.config['temp_dir']), "*")
        for path in glob.glob(pattern):
            if not os.path.isfile(path):
                continue
            try:
                os.remove(path)
            except OSError as e:
                print(f"清理临时文件失败 {path}: {e}")
5.2 主流程集成
将各模块组合成完整流程:
def _decrypt_segments_in_place(playlist):
    """Decrypt the ``temp/segment_<i>.ts`` files in place with the playlist's first key."""
    import os

    key_info = playlist.keys[0]
    # Prefer the absolute URI so a key URI written relative to the playlist
    # still resolves.
    try:
        key_uri = key_info.absolute_uri
    except (AttributeError, ValueError):
        key_uri = key_info.uri
    if not key_uri:
        return

    key_response = requests.get(key_uri, timeout=10)
    key_response.raise_for_status()
    key = key_response.content

    # The playlist stores the IV as a hex string; the cipher needs bytes.
    raw_iv = key_info.iv
    if isinstance(raw_iv, str) and raw_iv:
        hex_digits = raw_iv[2:] if raw_iv[:2].lower() == "0x" else raw_iv
        explicit_iv = bytes.fromhex(hex_digits.zfill(32))
    else:
        explicit_iv = raw_iv or None
    first_sequence = getattr(playlist, "media_sequence", None) or 0

    for i, _segment in enumerate(playlist.segments):
        path = f"temp/segment_{i}.ts"
        if not os.path.exists(path):
            continue  # this segment failed to download; nothing to decrypt
        # Without an explicit IV, HLS derives it from the media sequence number.
        iv = explicit_iv or (first_sequence + i).to_bytes(16, "big")
        plain = decrypt_ts_file(path, key, iv)
        with open(path, 'wb') as f:
            f.write(plain)


def main(replay_url):
    """Run the whole pipeline: resolve, download, decrypt if needed, merge.

    Args:
        replay_url: URL containing the room ID.

    Temporary files are cleaned up whether or not the run succeeds.
    """
    import os
    import re

    downloader = BiliReplayDownloader()
    try:
        # Step 1: extract the room ID.
        room_id = extract_replay_id(replay_url)
        print(f"提取到房间ID: {room_id}")

        # Step 2: fetch metadata.
        metadata = extract_metadata(room_id)
        print(f"开始下载: {metadata['title']} by {metadata['host']}")

        # Step 3: resolve the playlist URL.
        master_url = get_m3u8_playlist(room_id)
        final_url = resolve_m3u8_hierarchy(master_url)

        # Step 4: parse the segment list.
        playlist = m3u8.load(final_url)

        # Step 5: download first. Decryption used to run before any segment
        # existed on disk.
        print(f"开始下载 {len(playlist.segments)} 个片段...")
        parallel_download(playlist.segments)

        # Step 6: decrypt the downloaded files in place, so the merge step
        # (which reads temp/segment_*.ts) picks up the plaintext.
        if playlist.keys and playlist.keys[0]:
            print("检测到加密流,正在处理解密...")
            _decrypt_segments_in_place(playlist)

        # Step 7: merge. Characters that are illegal in file names are
        # replaced so a title such as "A/B: live" cannot break the path.
        raw_name = f"{metadata['host']}_{metadata['title'][:20]}"
        safe_name = re.sub(r'[\\/:*?"<>|\x00-\x1f]', "_", raw_name).strip() or str(room_id)
        output_filename = f"{safe_name}.mp4"
        merge_ts_files(os.path.join(downloader.config["output_dir"], output_filename))
        print("下载完成!")
    finally:
        downloader.clean_temp_files()
if __name__ == "__main__":
    # Script entry point: prompt for the URL and hand it straight to main().
    main(input("请输入B站直播回放URL: "))
5.3 使用建议与最佳实践
- 定时执行 :对于系列直播,可设置定时任务自动检测新回放
- 质量选择 :通过修改API参数中的 qn 值获取不同画质
- 存储管理 :定期清理临时文件,对大视频考虑分片存储
- 合规使用 :仅下载个人有权访问的内容,尊重版权
# Example: run a check for new replays once a day.
import schedule
import time


def check_new_replays():
    """Placeholder: put your own new-replay detection logic here."""


# Register the job with the scheduler for 10:00 every day.
schedule.every().day.at("10:00").do(check_new_replays)

# Poll the scheduler once a minute; this loop never returns.
while True:
    schedule.run_pending()
    time.sleep(60)
更多推荐


所有评论(0)