PyAV用于视频剪辑处理
Published in:2025-08-14 |
Words: 4k | Reading time: 18min | reading:

PyAV用于视频剪辑处理

PyAV是FFmpeg库的Pythonic绑定。它允许你直接在Python代码中,以一种极其精细和高效的方式访问FFmpeg的内部功能。这意味着:

  • 内存操作:直接在内存中处理视频帧和音频样本,无需创建大量临时文件。
  • 精细控制:你可以完全控制容器的解复用、数据包的解码、帧的处理和编码的每一个环节。
  • 硬件加速:可以利用GPU(如macOS的VideoToolbox, Linux/Windows的CUDA/NVENC)来极大地加速解码和编码过程。
  • 高度集成:可以无缝地将视频帧与NumPy、Pillow等流行的Python库结合使用。

PyAV的核心概念

在深入代码之前,我们先了解几个核心概念,它们直接映射自FFmpeg:

  1. 容器 (Container): 指的是视频文件本身,比如一个.mp4.mkv文件。它是一个“容器”,里面装着各种数据流。
  2. 流 (Stream): 指的是容器内的数据轨道。一个视频文件通常至少包含一个视频流和一个音频流,有时还会有字幕流。
  3. 数据包 (Packet): 从流中读取的一小块压缩后的数据。
  4. 帧 (Frame): 一个数据包经过解码后得到的数据。对于视频流,它是一张图片;对于音频流,它是一段声音样本。

标准的处理流程是:打开容器 → 找到需要的流 → 从流中解复用(demux)数据包 → 解码(decode)数据包得到帧 → (处理帧) → 编码(encode)处理后的帧变回数据包 → 将数据包混合(mux)到新的输出容器 → 关闭容器

简单安装

1
2
pip install av
conda install av -c conda-forge

功能

  • libavformat: containers, audio/video/subtitle streams, packets;

  • libavdevice (by specifying a format to containers);

  • libavcodec: Codec, CodecContext, BitStreamFilterContext, audio/video frames, data planes, subtitles;

  • libavfilter: Filter, Graph;

  • libswscale: VideoReformatter;

  • libswresample: AudioResampler;

simple demo

1
2
3
4
5
6
7
import av

av.logging.set_level(av.logging.VERBOSE)
container = av.open(path_to_video)

for index, frame in enumerate(container.decode(video=0)):
frame.to_image().save(f"frame-{index:04d}.jpg")
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# --- 调度器函数 ---
def crop_video_pyav(input_video_path, output_video_path, ...):
"""
根据环境自动选择最佳方式裁剪视频。
"""
# 检查硬件加速是否受支持
if HW_ACCEL_SUPPORTED:
try:
# 优先尝试硬件加速
logger.info("检测到硬件加速支持,尝试 Metal 路径...")
success = _crop_video_pyav_metal(...)
if not success:
# 如果硬件路径明确返回失败,则降级
logger.warning("硬件加速路径执行失败,自动降级到纯软件模式。")
success = _crop_video_software(...)
return success
except Exception as e:
# 如果硬件路径意外崩溃,则降级
logger.error(f"硬件加速路径执行时发生意外异常: {e}")
logger.warning("自动降级到纯软件模式。")
return _crop_video_software(...)
else:
# 不支持硬件加速,直接走软件路径
logger.info("未检测到硬件加速支持,使用纯软件路径。")
return _crop_video_software(...)

接下来,我们将深入分析这两种核心实现。

Part 1: 健壮的基石 —— 纯软件 (CPU) 实现

这是我们后备方案的核心,也是理解PyAV基础操作的最佳入口。它不依赖任何特殊的硬件,可以在任何平台上运行。

核心步骤

  1. 参数准备:
    最关键的一步是确保输出的分辨率永远是偶数。像 libx264 这样的H.264编码器无法处理宽度或高度为奇数的视频。

    1
    2
    final_w = w - (w % 2)
    final_h = h - (h % 2)
  2. 打开输入/输出容器:
    使用 av.open(),就像Python内置的 open() 一样简单,但它可以同时用于读和写。

    1
    2
    3
    with av.open(input_video_path, mode='r') as in_container:
    with av.open(output_video_path, mode='w') as out_container:
    # ...
  3. 创建输出流:
    最简单的方式是从输入流创建模板。这会自动复制编解码器、码率等大部分参数。然后我们再手动覆盖需要修改的参数,比如新的宽度和高度。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    # 查找所有输入流
    in_video_stream = in_container.streams.video[0]
    in_audio_stream = in_container.streams.audio[0]

    # 从模板创建输出流
    out_video_stream = out_container.add_stream_from_template(in_video_stream)
    out_audio_stream = out_container.add_stream_from_template(in_audio_stream)

    # 覆盖视频流的尺寸
    out_video_stream.width = final_w
    out_video_stream.height = final_h
  4. 核心处理循环:
    这是最精彩的部分,完美体现了PyAV与Pillow的结合。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    for packet in in_container.demux(streams_to_demux):
    if packet.stream.type == 'video':
    for frame in packet.decode():
    # a. 解码帧并转换为Pillow Image对象
    img = frame.to_image()

    # b. 使用Pillow的强大功能进行图像处理
    cropped_img = img.crop((x, y, x + w, y + h))
    if needs_scaling:
    cropped_img = cropped_img.resize((final_w, final_h))

    # c. 将处理后的Pillow Image转换回PyAV的VideoFrame
    new_frame = av.VideoFrame.from_image(cropped_img)

    # d. 编码新帧并混合到输出文件
    for out_packet in out_video_stream.encode(new_frame):
    out_container.mux(out_packet)

    elif packet.stream.type == 'audio':
    # 音频直通:不解码,直接复制数据包,效率最高
    packet.stream = out_audio_stream
    out_container.mux(packet)
  5. 冲洗 (Flush) 编码器:
    处理完所有帧后,编码器的内部可能还缓存着一些数据。我们通过发送一个 None 来告诉它结束编码,并清空所有缓冲区。

    1
    2
    for out_packet in out_video_stream.encode(None):
    out_container.mux(out_packet)

Part 2: 极致性能 —— Metal硬件加速 (GPU) 实现

这是脚本的“高性能模式”,专门为macOS设计。它尽可能地将所有操作都留在GPU上,避免了昂贵的CPU<->GPU数据拷贝。

核心步骤

  1. 创建硬件设备上下文:
    这是开启硬件加速的第一步,我们告诉PyAV我们要使用苹果的 videotoolbox 框架。

    1
    hw_device = av.hwdevice.Device("videotoolbox")
  2. 配置硬件解码:
    这是现代PyAV中配置硬件解码的关键。我们不是手动设置上下文,而是为输入流的 codec_context 提供一个 get_format 回调函数。当解码器准备好时,它会调用这个函数并提供一个它支持的像素格式列表。我们的函数只需从中选择 'videotoolbox' 格式即可。这告诉解码器:“请直接将帧解码到GPU显存中,不要下载到CPU内存。”

    1
    2
    3
    4
    5
    6
    7
    8
    hw_pix_fmt = 'videotoolbox'
    def get_hw_format(formats):
    for fmt in formats:
    if fmt.name == hw_pix_fmt:
    return fmt
    raise av.EncoderNotFoundError("未找到 'videotoolbox' 硬件格式。")

    in_stream.codec_context.get_format = get_hw_format
  3. 构建硬件滤镜图 (Filter Graph):
    裁剪和缩放是通过FFmpeg的滤镜系统完成的。PyAV允许我们用代码构建这个处理链。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    graph = av.filter.Graph()
    buffer_src = graph.add_buffer(template=in_stream) # 输入源
    buffer_sink = graph.add("buffersink") # 输出汇

    filter_chain = f"crop={...},scale={...}"

    # 链接:源 -> 滤镜链 -> 汇
    filters = graph.add_filter(filter_chain, "filters")
    buffer_src.link_to(filters)
    filters.link_to(buffer_sink)

    graph.configure()

    这里最巧妙的是,即使 cropscale 滤镜是纯CPU操作,FFmpeg也会在后台自动处理硬件帧的下载(GPU->CPU)、应用滤镜、再上传(CPU->GPU)的过程。

  4. 配置硬件编码:
    我们创建一个使用 h264_videotoolbox 编码器的输出流。它会自动接收来自滤镜图的、已经处理好的帧(这些帧可能在CPU上,也可能在GPU上,编码器会自动处理)。

    1
    2
    3
    4
    out_stream = out_container.add_stream("h264_videotoolbox", rate=rate)
    out_stream.width = final_w
    out_stream.height = final_h
    out_stream.pix_fmt = "nv12" # VideoToolbox 编码器偏好的像素格式
  5. 硬件处理循环:
    循环的主体结构类似,但操作对象变成了滤镜图。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    for frame in packet.decode(): # 解码出的 frame 是一个指向GPU显存的硬件帧
    graph.push(frame) # 将硬件帧推入滤镜图处理
    while True:
    try:
    filtered_frame = buffer_sink.pull() # 从滤镜图拉取处理好的帧
    for out_packet in out_stream.encode(filtered_frame):
    output_container.mux(out_packet)
    except (av.error.EOFError, av.error.BlockingIOError):
    break

实际处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339

# 检查当前 PyAV 环境是否支持硬件加速
HW_ACCEL_SUPPORTED = hasattr(av, 'hwdevice')

if HW_ACCEL_SUPPORTED:
# 进一步检查平台是否为 macOS
if platform.system() == 'Darwin':
logger.info("硬件加速 (VideoToolbox) 可用。将优先使用 Metal 路径。")
else:
HW_ACCEL_SUPPORTED = False
logger.warning("PyAV 支持硬件加速,但当前系统不是 macOS。VideoToolbox 不可用。")
logger.warning("所有视频处理将回退到纯软件模式。")
else:
logger.warning("当前 PyAV 环境中未找到硬件加速模块 ('av.hwdevice')。")
logger.warning("所有视频处理将回退到纯软件模式,速度会较慢。")
logger.warning("要解决此问题,请确保您的项目解释器配置正确,并使用了完整编译的 PyAV 库。")


# --- 2. 主调度器函数 ---
# 您的外部代码应该只调用这个函数
@logger.catch
def crop_video_pyav(
input_video_path: str, output_video_path: str,
crop_x: float, crop_y: float, crop_w: float, crop_h: float,
log_queue=None, # log_queue 和其他参数保留以兼容您的接口
min_short_side_output_px: Optional[int] = None,
assigned_gpu_id: Optional[int] = None,
**kwargs # 捕获任何其他未使用的参数
) -> bool:
"""
根据环境自动选择最佳方式裁剪视频。

如果支持硬件加速,则尝试使用Metal路径;否则,或Metal路径失败时,
自动回退到纯软件路径。
"""
crop_params = {
'x': crop_x, 'y': crop_y, 'w': crop_w, 'h': crop_h
}

if HW_ACCEL_SUPPORTED:
try:
# 优先尝试硬件加速路径
logger.info(f"检测到硬件加速支持,尝试 Metal 路径...")
success = _crop_video_pyav_metal(
input_video_path, output_video_path, crop_params,
min_short_side_output_px
)
if not success:
logger.warning("硬件加速路径执行失败,自动降级到纯软件模式。")
success = _crop_video_software(
input_video_path, output_video_path, crop_params,
min_short_side_output_px
)
return success
except Exception as e:
logger.opt(exception=True).error(f"硬件加速路径执行时发生意外异常: {e}")
logger.warning("自动降级到纯软件模式。")
return _crop_video_software(
input_video_path, output_video_path, crop_params,
min_short_side_output_px
)
else:
# 如果一开始就不支持硬件加速,直接走软件路径
logger.info(f"未检测到硬件加速支持,使用纯软件路径。")
return _crop_video_software(
input_video_path, output_video_path, crop_params,
min_short_side_output_px
)


# --- 3. 硬件加速实现 (内部函数) ---
@logger.catch
def _crop_video_pyav_metal(
input_video_path: str, output_video_path: str,
crop_rect: Dict,
min_short_side_output_px: Optional[int] = None,
) -> bool:
"""
使用 PyAV 和 VideoToolbox (Metal) 进行硬件加速的视频裁剪。
这是一个内部函数,假设硬件支持已确认。
"""
logger.info(f"Metal路径: 开始处理 {os.path.basename(input_video_path)}")

# 参数准备
crop_w_f = int(crop_rect['w']) - (int(crop_rect['w']) % 2)
crop_h_f = int(crop_rect['h']) - (int(crop_rect['h']) % 2)
crop_x_f = int(crop_rect['x']) - (int(crop_rect['x']) % 2)
crop_y_f = int(crop_rect['y']) - (int(crop_rect['y']) % 2)
if crop_w_f <= 0 or crop_h_f <= 0: return False

final_w, final_h = crop_w_f, crop_h_f
needs_scaling = False
if min_short_side_output_px and min(crop_w_f, crop_h_f) < min_short_side_output_px:
needs_scaling = True
scale_factor = min_short_side_output_px / min(crop_w_f, crop_h_f)
final_w = math.ceil((crop_w_f * scale_factor) / 2) * 2
final_h = math.ceil((crop_h_f * scale_factor) / 2) * 2

input_container = None
output_container = None

try:
# --- 硬件设备与容器 ---
logger.debug("Metal路径: 创建 VideoToolbox 上下文。")
hw_device = av.hwdevice.Device("videotoolbox")

input_container = av.open(input_video_path, mode='r')
output_container = av.open(output_video_path, mode='w')
in_stream = input_container.streams.video[0]
in_stream.thread_type = "AUTO"

# --- 配置硬件解码 (现代方式) ---
logger.debug("Metal路径: 配置硬件解码。")
hw_pix_fmt = 'videotoolbox'

def get_hw_format(formats):
for fmt in formats:
if fmt.name == hw_pix_fmt:
return fmt
raise av.EncoderNotFoundError(f"未找到 '{hw_pix_fmt}' 硬件格式。")

in_stream.codec_context.get_format = get_hw_format

# --- 配置滤镜图 ---
# FFmpeg 会在后台自动插入 hwupload/hwdownload 滤镜
logger.debug("Metal路径: 配置滤镜图。")
graph = av.filter.Graph()
buffer_src = graph.add_buffer(template=in_stream)

filter_chain = f"crop={crop_w_f}:{crop_h_f}:{crop_x_f}:{crop_y_f}"
if needs_scaling:
filter_chain += f",scale={final_w}:{final_h}"

# 滤镜图的终点是 buffer_sink
buffer_sink = graph.add("buffersink")

# 链接滤镜
buffer_src.link_to(graph.add_filter(filter_chain, "filters"))
graph.get_filter("filters").link_to(buffer_sink)

graph.configure()
logger.info(f"Metal路径: 滤镜图配置成功: '{filter_chain}'")

# --- 配置硬件编码 ---
logger.debug("Metal路径: 配置硬件编码。")
rate = in_stream.base_rate or in_stream.guessed_rate or in_stream.average_rate
out_stream = output_container.add_stream("h264_videotoolbox", rate=rate)
out_stream.width = final_w
out_stream.height = final_h
out_stream.pix_fmt = "nv12" # VideoToolbox 编码器通常使用 nv12
out_stream.time_base = in_stream.time_base

# --- 音频流处理 (直通) ---
in_audio_stream = next((s for s in input_container.streams if s.type == 'audio'), None)
if in_audio_stream:
out_audio_stream = output_container.add_stream('aac', template=in_audio_stream)
streams_to_demux = (in_stream, in_audio_stream)
else:
out_audio_stream = None
streams_to_demux = in_stream

# --- 核心处理循环 ---
for packet in input_container.demux(streams_to_demux):
if packet.dts is None: continue

if packet.stream.type == 'video':
for frame in packet.decode():
graph.push(frame)
while True:
try:
filtered_frame = buffer_sink.pull()
for out_packet in out_stream.encode(filtered_frame):
output_container.mux(out_packet)
except (av.error.EOFError, av.error.BlockingIOError):
break
elif out_audio_stream and packet.stream.type == 'audio':
packet.stream = out_audio_stream
output_container.mux(packet)

# --- 冲洗(Flush) ---
logger.debug("Metal路径: 冲洗滤镜和编码器。")
graph.push(None)
while True:
try:
filtered_frame = buffer_sink.pull()
for out_packet in out_stream.encode(filtered_frame):
output_container.mux(out_packet)
except (av.error.EOFError, av.error.BlockingIOError):
break

for out_packet in out_stream.encode(None):
output_container.mux(out_packet)

# 正常关闭
output_container.close()
input_container.close()

logger.info(f"Metal路径: 视频处理成功 -> {os.path.basename(output_video_path)}")
return True

except Exception as e:
logger.opt(exception=True).error(f"Metal路径处理时发生致命错误: {e}")
# 清理
if output_container:
output_container.close()
if input_container:
input_container.close()
if os.path.exists(output_video_path):
try:
os.remove(output_video_path)
except OSError:
pass
return False


# --- 4. 纯软件备用方案 (内部函数) ---
@logger.catch
def _crop_video_software(
input_video_path: str, output_video_path: str,
crop_rect: Dict,
min_short_side_output_px: Optional[int] = None,
) -> bool:
"""
使用纯软件 (CPU) 进行视频裁剪的备用方法。
这个方法总是可用的,但速度比硬件加速慢。
它确保输出尺寸为偶数,以兼容H.264等编码器。
"""
logger.info(f"软件路径: 开始处理 {os.path.basename(input_video_path)}")

input_container = None
output_container = None
success = False

try:
# --- 1. 参数准备 (关键修正部分) ---
x, y, w, h = int(crop_rect['x']), int(crop_rect['y']), int(crop_rect['w']), int(crop_rect['h'])
if w <= 0 or h <= 0:
logger.error(f"裁剪尺寸无效 (w={w}, h={h})。")
return False

# 初始的最终尺寸就是裁剪后的尺寸
final_w, final_h = w, h
needs_scaling = False

# 检查是否需要缩放
if min_short_side_output_px and min(w, h) < min_short_side_output_px:
needs_scaling = True
scale_factor = min_short_side_output_px / min(w, h)
final_w = int(w * scale_factor)
final_h = int(h * scale_factor)

# *** 核心修正:确保最终的输出尺寸永远是偶数 ***
# 这对于 H.264 (libx264) 和许多其他编码器至关重要
final_w = final_w - (final_w % 2)
final_h = final_h - (final_h % 2)

if final_w <= 0 or final_h <= 0:
logger.error(f"计算后的输出尺寸无效 ({final_w}x{final_h})。")
return False

# --- 2. 打开容器并设置流 ---
with av.open(input_video_path, mode='r') as in_container:
with av.open(output_video_path, mode='w') as out_container:

# a. 查找所有输入流
in_video_stream = next((s for s in in_container.streams if s.type == 'video'), None)
if not in_video_stream:
logger.error("输入文件中未找到视频流。")
return False
in_video_stream.thread_type = "AUTO"

in_audio_stream = next((s for s in in_container.streams if s.type == 'audio'), None)
in_subtitle_stream = next((s for s in in_container.streams if s.type == 'subtitle'), None)

# b. 为每个输入流创建对应的输出流
out_video_stream = out_container.add_stream_from_template(in_video_stream)
out_video_stream.width = final_w
out_video_stream.height = final_h

out_audio_stream = out_container.add_stream_from_template(in_audio_stream) if in_audio_stream else None
out_subtitle_stream = out_container.add_stream_from_template(
in_subtitle_stream) if in_subtitle_stream else None

streams_to_demux = [s for s in [in_video_stream, in_audio_stream, in_subtitle_stream] if s]

# --- 3. 核心处理循环 ---
for packet in in_container.demux(streams_to_demux):
if packet.dts is None:
continue

if packet.stream.type == 'video':
for frame in packet.decode():
img = frame.to_image()

# 裁剪
cropped_img = img.crop((x, y, x + w, y + h))

# 如果需要,进行缩放
if needs_scaling:
# Pillow 的 resize 需要一个 (width, height) 元组
cropped_img = cropped_img.resize((final_w, final_h))

new_frame = av.VideoFrame.from_image(cropped_img)
new_frame.pts = frame.pts

for out_packet in out_video_stream.encode(new_frame):
out_container.mux(out_packet)

elif packet.stream.type == 'audio' and out_audio_stream:
packet.stream = out_audio_stream
out_container.mux(packet)

elif packet.stream.type == 'subtitle' and out_subtitle_stream:
packet.stream = out_subtitle_stream
out_container.mux(packet)

# --- 4. 冲洗(Flush)视频编码器 ---
logger.debug("软件路径: 冲洗视频编码器。")
for out_packet in out_video_stream.encode(None):
out_container.mux(out_packet)

success = True
logger.info(f"软件路径: 视频处理成功 -> {os.path.basename(output_video_path)}")
return True

except Exception as e:
logger.opt(exception=True).error(f"软件路径处理时发生错误: {e}")
return False
finally:
if output_container and not output_container.closed:
output_container.close()
if input_container and not input_container.closed:
input_container.close()
if not success and os.path.exists(output_video_path):
try:
os.remove(output_video_path)
logger.info(f"已清理失败的输出文件: {output_video_path}")
except OSError as err:
logger.warning(f"清理失败的输出文件时出错: {err}")

one more thing

  • gpu acc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# GPU Acceleration Defaults
GPU_ACCELERATION_DEFAULT = {
"gpu_opencv_decode": False,
"opencv_cuda_device": 0, # Default/primary GPU for OpenCV CUDA operations
"ffmpeg_hwaccel": "videotoolbox", # e.g., "cuda", "qsv", "vaapi", "videotoolbox"
"ffmpeg_gpu_encoder": "h264_videotoolbox", # e.g., "h264_nvenc", "hevc_qsv", h264_videotoolbox
"gpu_ids_to_use": "0" # New: Comma-separated list of GPU IDs (e.g., "0,1") for general processing
}

GPU_ACCELERATION_DEFAULT_win = {
"gpu_opencv_decode": False,
"opencv_cuda_device": 0, # Default/primary GPU for OpenCV CUDA operations
"ffmpeg_hwaccel": "cuda", # e.g., "cuda", "qsv", "vaapi", "videotoolbox"
"ffmpeg_gpu_encoder": "h264_nvenc", # e.g., "h264_nvenc", "hevc_qsv", h264_videotoolbox
"gpu_ids_to_use": "0" # New: Comma-separated list of GPU IDs (e.g., "0,1") for general processing
}

See

Prev:
使用 Docker Compose 快速部署 Shadowsocks 代理服务教程
Next:
数字水印系统实现