视频场景切换检测
Published in:2025-04-03 |
Words: 5.7k | Reading time: 30min | reading:

背景

一个摄像机连续拍摄的一段视频片段,没有进行中断。一个镜头通常是视频叙事的基本单位。

2. 镜头识别的意义:

  • 视频分析和理解: 镜头识别是视频分析和理解的基础,可以帮助提取视频内容,进行结构化分析,例如场景检测、故事分割等。
  • 视频检索: 可以用于基于内容的视频检索,例如查找包含特定对象的镜头。
  • 视频摘要: 可以用于生成视频摘要,选取关键镜头来代表视频内容。
  • 视频编辑: 方便视频编辑人员对视频进行分割、重组等操作。
  • 视频监控: 异常事件检测,行为分析等。

3. 镜头识别的主要方法:

镜头识别主要基于分析连续帧之间的变化,通过检测场景边界来识别镜头。常见的方法包括:

  • 基于阈值的镜头识别:
    • 原理: 通过计算连续帧之间的差异(例如颜色直方图差异、像素差异、边缘差异等),当差异值超过预设的阈值时,则认为发生了镜头切换。
    • 优点: 简单快速,易于实现。
    • 缺点: 对阈值的选择敏感,容易受到光照变化、运动等因素的影响。
  • 基于统计模型的镜头识别:
    • 原理: 使用统计模型(例如隐马尔可夫模型 HMM)来描述视频帧的特征变化,通过检测模型状态的变化来识别镜头。
    • 优点: 对噪声和光照变化具有一定的鲁棒性。
    • 缺点: 模型训练需要大量的标注数据,计算复杂度较高。
  • 基于机器学习和深度学习的镜头识别:
    • 原理: 使用机器学习或深度学习模型(例如支持向量机 SVM、卷积神经网络 CNN、循环神经网络 RNN)来学习视频帧的特征,并对镜头边界进行分类。
    • 优点: 可以学习复杂的特征表示,具有较高的识别精度。
    • 缺点: 需要大量的标注数据进行训练,计算资源需求较高。

4. 具体步骤:

  1. 视频预处理:
    • 解码: 将视频文件解码为帧序列。
    • 降噪: 对帧序列进行降噪处理,例如使用高斯滤波或中值滤波。
    • 调整大小: 将帧序列调整到统一的大小,以便进行后续处理。
  2. 特征提取: 提取视频帧的特征,用于描述视频内容。常用的特征包括:
    • 颜色直方图: 描述图像的颜色分布。
    • 灰度共生矩阵 (GLCM): 描述图像的纹理特征。
    • 光流: 描述图像中物体的运动信息。
    • 局部二值模式 (LBP): 描述图像的局部纹理特征。
    • 深度学习特征: 使用预训练的深度学习模型(例如 VGGNet、ResNet)提取图像的特征。
  3. 差异计算: 计算连续帧之间的差异。 常用的差异计算方法包括:
    • 欧氏距离: 计算两个向量之间的距离。
    • 余弦相似度: 计算两个向量之间的角度。
    • 卡方距离: 计算两个直方图之间的距离。
  4. 镜头边界检测:
    • 阈值法: 当差异值超过预设的阈值时,则认为发生了镜头切换。
    • 机器学习方法: 使用机器学习模型对镜头边界进行分类。
  5. 后处理:
    • 合并短镜头: 将长度小于一定阈值的镜头合并到相邻的镜头中。
    • 平滑镜头边界: 对镜头边界进行平滑处理,以提高视频观看体验。

5. 技术细节:

  • 帧速率(Frame Rate): 影响分析的精度和计算成本。高帧速率提供更详细的信息,但也需要更多的计算资源。
  • 特征选择: 选择合适的特征对镜头识别的精度至关重要。 例如,对于快速运动的场景,光流特征可能更有效。
  • 阈值设定: 阈值的设定需要根据具体的视频内容进行调整。
  • 深度学习模型的选择: 选择合适的深度学习模型需要根据具体的应用场景进行考虑。

实现技术

  • OpenCV: 一个强大的计算机视觉库,提供了图像处理、特征提取、视频分析等功能。
  • FFmpeg: 一个开源的多媒体框架,可以用于视频解码、编码、格式转换等操作。
  • Scikit-learn: 一个流行的机器学习库,提供了各种机器学习算法。
  • TensorFlow/PyTorch: 深度学习框架,可以用于构建和训练深度学习模型。
  • PySceneDetect: 一个专门用于场景分割(即镜头识别)的Python库,它基于OpenCV和NumPy。

技术选择

PySceneDetect:

  • 描述: PySceneDetect 是一个专门用于场景分割的 Python 库,它构建在 OpenCV 和 NumPy 之上。它提供了多种场景检测算法,包括基于内容的比较(ContentDetector)、阈值检测(ThresholdDetector)等。
  • 优点:
    • 易于使用:提供了简单的 API,方便快速上手。
    • 多种算法:内置了多种场景检测算法,可以根据视频内容选择合适的算法。
    • 可定制性:允许自定义场景检测算法。
    • 文档完善:拥有比较完善的文档。
  • 缺点:
    • 性能:对于非常大的视频,可能需要优化性能。
    • 精度:默认参数可能不适用于所有类型的视频,需要调整参数。

第三方库安装使用

1
pip install scenedetect opencv-python tqdm

简易demo

  • 检测视频中的场景切换帧,输出图片与csv文件
1
scenedetect --input goldeneye.mp4 detect-adaptive list-scenes save-images

关于场景切换检测阈值的确认

例如,对于detect-content,如果默认阈值27没有产生正确的结果,我们可以通过首先生成统计文件来确定适当的阈值:

scenedetect --input goldeneye.mp4 --stats goldeneye.stats.csv detect-adaptive
然后,我们可以绘制content_val列的值

  • 代码如下:

关键代码展示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
def generate_stats_csv(video_path, output_dir, threshold):
"""
Detects scenes and saves frame statistics (Frame Num, content_val) to a CSV.
Returns True on success, False on failure.
"""
thread_name = threading.current_thread().name
video_filename_base = os.path.splitext(os.path.basename(video_path))[0]
output_csv_path = os.path.join(output_dir, f"{video_filename_base}_stats.csv")
temp_stats_path = os.path.join(output_dir, f"{video_filename_base}_temp_full_stats.csv")

logger.info(f"[{thread_name}] Processing: {os.path.basename(video_path)}")
video_manager = None
stats_manager = None

try:
# 1. Initialize
video_manager = VideoStreamCv2(video_path)
stats_manager = StatsManager()
scene_manager = SceneManager(stats_manager)
scene_manager.add_detector(ContentDetector(threshold=threshold))

# 2. Get duration
try:
total_frames = video_manager.duration.get_frames()
if total_frames <= 0:
logger.warning(f"[{thread_name}] Video {os.path.basename(video_path)} reported 0 frames. Skipping.")
return False
except Exception as info_err:
logger.error(f"[{thread_name}] Error getting video duration for {os.path.basename(video_path)}: {info_err}", exc_info=True)
return False

# 3. Run detection
logger.debug(f"[{thread_name}] Running detect_scenes for {os.path.basename(video_path)}")
scene_manager.detect_scenes(frame_source=video_manager)
logger.debug(f"[{thread_name}] detect_scenes finished for {os.path.basename(video_path)}")

# 4. Save *full* statistics temporarily by passing a FILE HANDLE
logger.debug(f"[{thread_name}] Saving full stats to temporary file: {temp_stats_path}")
try:
# --- MODIFIED PART ---
with open(temp_stats_path, 'w', newline='', encoding='utf-8') as f:
stats_manager.save_to_csv(f) # Pass the open file handle 'f' directly
# ---------------------
if not os.path.exists(temp_stats_path):
logger.error(f"[{thread_name}] StatsManager failed to create the temporary stats file: {temp_stats_path}")
return False
except TypeError as te:
# Catch if even passing a handle is wrong
logger.error(f"[{thread_name}] TypeError calling save_to_csv for {os.path.basename(video_path)}. API mismatch. Error: {te}", exc_info=True)
return False
except Exception as save_err:
logger.error(f"[{thread_name}] Error saving temporary stats file {temp_stats_path}: {save_err}", exc_info=True)
return False


# 5. Read the temp stats CSV and extract desired columns
# (Keep this part the same as the previous version)
logger.debug(f"[{thread_name}] Reading temporary stats file: {temp_stats_path}")
try:
stats_df = pd.read_csv(temp_stats_path, skipinitialspace=True)
stats_df.columns = stats_df.columns.str.strip()

if FRAME_NUM_COL not in stats_df.columns or CONTENT_VAL_COL not in stats_df.columns:
logger.error(f"[{thread_name}] Temporary stats file ({temp_stats_path}) is missing required columns. Found: {stats_df.columns.tolist()}. Expected: '{FRAME_NUM_COL}', '{CONTENT_VAL_COL}'")
# Keep the temp file for inspection if columns are missing
return False # Return failure

output_df = stats_df[[FRAME_NUM_COL, CONTENT_VAL_COL]]
logger.debug(f"[{thread_name}] Saving extracted stats to final file: {output_csv_path}")
output_df.to_csv(output_csv_path, index=False)

logger.info(f"[{thread_name}] Successfully generated stats CSV: {os.path.basename(output_csv_path)}")
return True # Return success

except pd.errors.EmptyDataError:
logger.warning(f"[{thread_name}] Temporary stats file was empty: {temp_stats_path}. No scenes likely detected or error during stats generation.")
# Create an empty output file with correct headers
pd.DataFrame(columns=[FRAME_NUM_COL, CONTENT_VAL_COL]).to_csv(output_csv_path, index=False)
logger.info(f"[{thread_name}] Created empty stats CSV (as temp was empty): {os.path.basename(output_csv_path)}")
return True # Count as processed successfully (empty result)
except KeyError as e:
logger.error(f"[{thread_name}] Column not found in temporary stats file ({temp_stats_path}): {e}. Available columns: {stats_df.columns.tolist()}")
# Keep the temp file for inspection
return False
except Exception as e:
logger.error(f"[{thread_name}] Error processing temporary stats file {temp_stats_path}: {e}", exc_info=True)
return False

except VideoOpenFailure as vf_err:
logger.error(f"[{thread_name}] Failed to open video {os.path.basename(video_path)}: {vf_err}")
return False
except Exception as e:
logger.error(f"[{thread_name}] Error during scene detection/stats generation for {os.path.basename(video_path)}: {e}", exc_info=True)
return False
finally:
if video_manager is not None:
try:
if hasattr(video_manager, '_cap') and video_manager._cap is not None and hasattr(video_manager._cap, 'release'):
video_manager._cap.release()
del video_manager
except Exception as del_e:
logger.warning(f"[{thread_name}] Exception while cleaning up video_manager for {os.path.basename(video_path)}: {del_e}")

# Clean up temporary stats file ONLY IF the final CSV was successfully created or was intentionally empty
if 'output_df' in locals() or ( 'stats_df' in locals() and stats_df.empty ): # Check if processing reached CSV saving/empty handling stage
if 'temp_stats_path' in locals() and os.path.exists(temp_stats_path):
try:
os.remove(temp_stats_path)
logger.debug(f"[{thread_name}] Removed temporary stats file: {temp_stats_path}")
except OSError as e:
logger.error(f"[{thread_name}] Failed to remove temporary stats file {temp_stats_path}: {e}")
else:
# If processing failed before reading/writing the final CSV, keep the temp file for debugging
if 'temp_stats_path' in locals() and os.path.exists(temp_stats_path):
logger.warning(f"[{thread_name}] Keeping temporary stats file due to processing error: {temp_stats_path}")


# ... (Rest of the code remains the same: process_video, find_video_files, move_video, main, __main__) ...
def main(root_dir, dest_dir, num_threads, threshold):
"""Main function to orchestrate the CSV generation process."""
root_dir = os.path.normpath(root_dir)
dest_dir = os.path.normpath(dest_dir)

# Ensure destination directory exists
try:
os.makedirs(dest_dir, exist_ok=True)
logger.info(f"Ensured destination directory exists: {dest_dir}")
except OSError as e:
logger.critical(f"Failed to create destination directory '{dest_dir}': {e}")
return # Cannot proceed without output directory
except Exception as e:
logger.critical(f"Unexpected error creating destination directory '{dest_dir}': {e}")
return

# Find video files, excluding any already in the destination
video_files_all = find_video_files(root_dir)
abs_dest_dir = os.path.abspath(dest_dir)
video_files = [f for f in video_files_all if not os.path.abspath(f).startswith(abs_dest_dir)]

if len(video_files) < len(video_files_all):
logger.warning(f"Filtered out {len(video_files_all) - len(video_files)} files potentially inside the destination directory '{dest_dir}'.")

if not video_files:
logger.error(f"No valid video files found in '{root_dir}' (excluding destination directory).")
return

logger.info(f"Starting stats CSV generation for {len(video_files)} videos from '{root_dir}' using {num_threads} threads.")
logger.info(f"Output CSV files will be saved in: {dest_dir}")

start_overall_time = time.time()
processed_count = 0
successful_runs = 0
failed_runs = 0

with ThreadPoolExecutor(max_workers=num_threads, thread_name_prefix='StatsGenWorker') as executor:
# Submit tasks: Pass video path, output dir, and threshold
futures = {executor.submit(generate_stats_csv, video_file, dest_dir, threshold): video_file for video_file in video_files}

logger.info("All tasks submitted. Waiting for completion...")
from concurrent.futures import as_completed
# Use tqdm with as_completed for better progress feedback
for future in tqdm(as_completed(futures), total=len(futures), desc="Overall Progress"):
video_file = futures[future] # Get associated video file
processed_count += 1
try:
success = future.result() # Get boolean result from generate_stats_csv
if success:
successful_runs += 1
else:
failed_runs += 1
# Error is already logged within the function
logger.debug(f"Task for {os.path.basename(video_file)} completed with failure (logged previously).")
except Exception as exc:
# Catch exceptions that might occur if future.result() itself fails unexpectedly
failed_runs += 1
logger.error(f"Unexpected error getting result for {os.path.basename(video_file)}: {exc}", exc_info=True)

end_overall_time = time.time()
logger.info(f"Stats CSV generation complete.")
logger.info(f"Summary: {len(video_files)} videos submitted. {successful_runs} processed successfully, {failed_runs} encountered errors.")
logger.info(f"Total execution time: {end_overall_time - start_overall_time:.2f} seconds.")


if __name__ == "__main__":
# --- Initial Checks ---
if not check_ffmpeg():
# Optional: Decide whether to proceed without FFmpeg warning or exit
# sys.exit(1)
pass
check_acceleration_support()

# --- Setup Directories ---
root_directory = r'/Volumes/shared/标注/影视'
destination_directory = r'./data'


# Use defaults if not using argparse
detection_threshold = DEFAULT_THRESHOLD
thread_count = DEFAULT_NUM_THREADS

# --- Directory Validation ---
if not os.path.isdir(root_directory):
logger.critical(f"Input root directory '{root_directory}' not found or is not a directory.")
sys.exit(1)
# Destination directory creation is handled in main()

# --- Run Main Process ---
main(root_directory, destination_directory, thread_count, detection_threshold)

测试demo

使用 PySceneDetect 快速分析视频 并输出各种参数到csv

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
# 检查硬件加速支持
def detect_scenes(video_path, threshold=30.0, downscale_width=640):
"""Detects scene changes in a video using PySceneDetect with VideoStreamCv2."""
video_manager = None
try:
# 1. 初始化
logging.debug(f"Initializing VideoStreamCv2 for: {os.path.basename(video_path)}")
video_manager = VideoStreamCv2(video_path)
logging.info(f"Initialized VideoStreamCv2 for {os.path.basename(video_path)}")

stats_manager = StatsManager()
scene_manager = SceneManager(stats_manager)

# 2. 添加场景检测器
scene_manager.add_detector(ContentDetector(threshold=threshold))

# 3. 获取视频信息 for Progress Bar
try:
total_frames = video_manager.duration.get_frames()
logging.debug(f"Video Info: Total Frames={total_frames}")
except Exception as info_err:
logging.error(f"Error getting video info (duration) for {os.path.basename(video_path)}: {info_err}", exc_info=True)
if video_manager is not None: del video_manager
return None, video_path

if total_frames <= 0:
logging.warning(f"Video {os.path.basename(video_path)} reported 0 frames. Skipping processing.")
if video_manager is not None: del video_manager
return 0, video_path

# 4. 设置(简化的)进度条并执行检测
# The frame-by-frame callback doesn't work with this version.
# The tqdm bar will show progress only after detect_scenes finishes for the file.
with tqdm(total=total_frames, desc=f"Processing {os.path.basename(video_path)}", unit="frame", leave=False) as pbar:

# 5. 执行场景检测 - Pass the video_manager as frame_source
scene_manager.detect_scenes(frame_source=video_manager)

# Manually update pbar to 100% after completion if needed,
# though tqdm might do this automatically on exit from 'with'.
pbar.n = total_frames # Set progress to max
pbar.refresh() # Refresh display


# 6. 获取结果
scene_list = scene_manager.get_scene_list()

logging.info(f"Detected {len(scene_list)} scenes in {os.path.basename(video_path)}")

# 7. 资源管理
# del video_manager

return len(scene_list), video_path

except cv2.error as cv_err:
logging.error(f"OpenCV error processing {os.path.basename(video_path)}: {cv_err}", exc_info=True)
return None, video_path
except VideoOpenFailure as vf_err:
logging.error(f"Failed to open video {os.path.basename(video_path)}: {vf_err}", exc_info=True)
return None, video_path
except Exception as e:
logging.error(f"Generic error processing {os.path.basename(video_path)}: {e}", exc_info=True)
return None, video_path
finally:
if 'video_manager' in locals() and video_manager is not None:
try: del video_manager
except Exception: pass


# ... (Keep process_video, find_video_files, move_video, main, __main__ block the same) ...
# ... [Rest of the code] ...

def process_video(video_path, threshold, dest_dir, downscale_width):
"""处理单个视频,捕获异常, 并移动视频"""
thread_name = threading.current_thread().name
logging.info(f"[{thread_name}] Starting processing for: {os.path.basename(video_path)}")
start_time = time.time()
num_scenes = None # Initialize
processed_video_path = video_path # Assume original path initially
try:
num_scenes, processed_video_path = detect_scenes(video_path, threshold, downscale_width)

# Check if detect_scenes returned None for path (shouldn't happen with current logic, but defensive)
if processed_video_path is None:
logging.error(f"[{thread_name}] Video path became None during processing for {video_path}. Skipping move.")
return # Explicitly return

if num_scenes is not None and num_scenes > 1:
# Check if the source file still exists before moving (it might fail during processing)
if os.path.exists(processed_video_path):
move_video(processed_video_path, dest_dir)
else:
logging.warning(
f"[{thread_name}] Source file {processed_video_path} not found after processing. Cannot move.")
elif num_scenes is not None: # Includes case where num_scenes is 0 because no frames were processed
logging.info(
f"[{thread_name}] Video {os.path.basename(processed_video_path)} is single scene or failed processing. No move needed.")
# else case (num_scenes is None) is implicitly handled by the warning below

if num_scenes is None:
# This case means detect_scenes failed and returned None for num_scenes
logging.warning(
f"[{thread_name}] Video {os.path.basename(processed_video_path)} failed processing entirely. No move needed.")

except Exception as e:
# Catch any unexpected error during the move decision or logging
logging.error(f"[{thread_name}] Top-level error in process_video for {os.path.basename(video_path)}: {e}",
exc_info=True)
finally:
end_time = time.time()
status = "failed" if num_scenes is None else f"{num_scenes} scenes"
logging.info(
f"[{thread_name}] Finished processing {os.path.basename(video_path)} ({status}) in {end_time - start_time:.2f} seconds.")

场景切割

  • 根据视频内场景切换 切割视频
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255

# -*- coding: utf-8 -*-
def detect_scenes(video_path, threshold=30.0):
"""Detects scene changes in a video using PySceneDetect with VideoStreamCv2."""
video_manager = None
try:
logger.debug(f"Initializing VideoStreamCv2 for: {os.path.basename(video_path)}")
video_manager = VideoStreamCv2(video_path)
logger.info(f"Initialized VideoStreamCv2 for {os.path.basename(video_path)}")

stats_manager = StatsManager()
scene_manager = SceneManager(stats_manager)
scene_manager.add_detector(ContentDetector(threshold=threshold))

try:
total_frames = video_manager.duration.get_frames()
# base_timecode = video_manager.base_timecode # No longer needed for get_scene_list
logging.debug(f"Video Info: Total Frames={total_frames}")
except Exception as info_err:
logger.error(f"Error getting video info (duration) for {os.path.basename(video_path)}: {info_err}", exc_info=True)
return None, video_path, None # Return None for scene list

if total_frames <= 0:
logger.warning(f"Video {os.path.basename(video_path)} reported 0 frames. Skipping processing.")
return 0, video_path, [] # Return 0 scenes, empty list

# Execute scene detection (no progress bar here as callback wasn't reliable)
logger.debug(f"Starting scene detection for {os.path.basename(video_path)}")
scene_manager.detect_scenes(frame_source=video_manager)
logger.debug(f"Finished scene detection for {os.path.basename(video_path)}")

# Get scene list (without base_timecode)
scene_list = scene_manager.get_scene_list()

logger.info(f"Detected {len(scene_list)} scenes in {os.path.basename(video_path)}")

# Return scene count, path, and the actual scene list for splitting
return len(scene_list), video_path, scene_list

except cv2.error as cv_err:
logger.error(f"OpenCV error processing {os.path.basename(video_path)}: {cv_err}", exc_info=True)
return None, video_path, None
except VideoOpenFailure as vf_err:
logger.error(f"Failed to open video {os.path.basename(video_path)}: {vf_err}", exc_info=True)
return None, video_path, None
except Exception as e:
logger.error(f"Generic error during scene detection for {os.path.basename(video_path)}: {e}", exc_info=True)
return None, video_path, None
finally:
# Ensure the video file is closed/released by deleting the reference
if 'video_manager' in locals() and video_manager is not None:
try:
# Explicitly call internal capture release if possible
if hasattr(video_manager, '_cap') and video_manager._cap is not None and hasattr(video_manager._cap, 'release'):
video_manager._cap.release()
del video_manager
except Exception as del_e:
logger.warning(f"Exception while cleaning up video_manager for {os.path.basename(video_path)}: {del_e}")

def split_video_scene(input_path, output_path, start_timecode_str, end_timecode_str):
"""Splits a video segment using FFmpeg without re-encoding."""
thread_name = threading.current_thread().name
logger.info(f"[{thread_name}] Splitting scene: {os.path.basename(output_path)} ({start_timecode_str} -> {end_timecode_str})")

# Ensure output directory exists
os.makedirs(os.path.dirname(output_path), exist_ok=True)

# Construct FFmpeg command
# Using -ss after -i and -copyts is generally more frame-accurate for splitting with -c copy
command = [
'ffmpeg',
'-i', str(input_path), # Input file
'-ss', start_timecode_str, # Start time
'-to', end_timecode_str, # End time
'-copyts', # Copy timestamps
'-avoid_negative_ts', 'make_zero', # Handle timestamp issues
'-c', 'copy', # Copy codecs (no re-encoding)
'-y', # Overwrite output file if it exists
str(output_path) # Output file
]



try:
# Run FFmpeg command
# creationflags prevents console window popup on Windows
process = subprocess.run(
command,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True, # Raise CalledProcessError if ffmpeg returns non-zero exit code
creationflags=subprocess.CREATE_NO_WINDOW if platform.system() == "Windows" else 0,
encoding='utf-8', # Capture output as text
errors='replace' # Handle potential decoding errors in ffmpeg output
)
logger.info(f"[{thread_name}] Successfully split: {os.path.basename(output_path)}")
# logger.debug(f"FFmpeg stdout:\n{process.stdout}") # Usually empty for -c copy
# logger.debug(f"FFmpeg stderr:\n{process.stderr}") # Contains progress/info
return True
except subprocess.CalledProcessError as e:
logger.error(f"[{thread_name}] FFmpeg failed for {os.path.basename(output_path)}.")
logger.error(f" Command: {' '.join(command)}") # Log the command for easier debugging
logger.error(f" Return Code: {e.returncode}")
logger.error(f" Stderr:\n{e.stderr}")
# Optionally log stdout too: logger.error(f" Stdout:\n{e.stdout}")
return False
except FileNotFoundError:
logger.critical(f"[{thread_name}] FFmpeg command not found during split. Ensure FFmpeg is installed and in PATH.")
# No point continuing if ffmpeg isn't found for any split
raise # Re-raise to potentially stop the whole process
except Exception as e:
logger.error(f"[{thread_name}] Unexpected error splitting {os.path.basename(output_path)}: {e}", exc_info=True)
return False


def process_video(video_path, threshold, base_dest_dir, originals_dir, downscale_width):
"""Detects scenes, splits video if multiple scenes are found, and moves original."""
thread_name = threading.current_thread().name
base_name = os.path.splitext(os.path.basename(video_path))[0]
file_ext = os.path.splitext(video_path)[1]
logger.info(f"[{thread_name}] Starting processing for: {base_name}{file_ext}")
start_time = time.time()
split_success_count = 0
split_fail_count = 0

try:
num_scenes, _, scene_list = detect_scenes(video_path, threshold) # Get scene_list now

if num_scenes is None:
logger.warning(f"[{thread_name}] Video {base_name}{file_ext} failed scene detection. Skipping.")
# Optionally move failed files to a specific error directory
return # Stop processing this file

if num_scenes <= 1:
logger.info(f"[{thread_name}] Video {base_name}{file_ext} has {num_scenes} scene(s). No splitting needed.")
# Decide if you want to move single-scene videos somewhere else or leave them
# Example: move_video(video_path, os.path.join(base_dest_dir, "single_scene"))
else:
logger.info(f"[{thread_name}] Video {base_name}{file_ext} has {num_scenes} scenes. Starting split...")
# Create a subdirectory for this video's scenes
video_scene_dir = os.path.join(base_dest_dir, base_name)
os.makedirs(video_scene_dir, exist_ok=True)

for i, (start_tc, end_tc) in enumerate(scene_list):
scene_num = i + 1
output_filename = f"{base_name}_scene_{scene_num:03d}{file_ext}"
output_filepath = os.path.join(video_scene_dir, output_filename)

# Use get_timecode() for FFmpeg compatible strings
start_str = start_tc.get_timecode()
end_str = end_tc.get_timecode()

success = split_video_scene(video_path, output_filepath, start_str, end_str)
if success:
split_success_count += 1
else:
split_fail_count += 1

# After attempting all splits, move the original file
if split_fail_count == 0 and split_success_count > 0: # Only move original if all splits succeeded
logger.info(f"[{thread_name}] All {split_success_count} scenes split successfully for {base_name}{file_ext}. Moving original.")
move_video(video_path, originals_dir)
elif split_success_count > 0: # Some splits succeeded, some failed
logger.warning(f"[{thread_name}] Completed splitting for {base_name}{file_ext} with {split_fail_count} failures out of {num_scenes}. Original NOT moved.")
else: # All splits failed
logger.error(f"[{thread_name}] All {num_scenes} split attempts failed for {base_name}{file_ext}. Original NOT moved.")


except Exception as e:
logger.error(f"[{thread_name}] Top-level error processing {base_name}{file_ext}: {e}", exc_info=True)
finally:
end_time = time.time()
result_summary = f"{num_scenes} scenes detected" if num_scenes is not None else "failed detection"
if num_scenes is not None and num_scenes > 1:
result_summary += f" ({split_success_count} split OK, {split_fail_count} split failed)"
logger.info(f"[{thread_name}] Finished processing {base_name}{file_ext} ({result_summary}) in {end_time - start_time:.2f} seconds.")

def main(root_dir, dest_dir, num_threads=4, threshold=30.0, downscale_width=640):
"""Main function to process multiple videos in parallel."""
root_dir = os.path.normpath(root_dir)
dest_dir = os.path.normpath(dest_dir)

# Define directory for processed original files
originals_processed_dir = os.path.join(dest_dir, "originals_processed")
os.makedirs(originals_processed_dir, exist_ok=True)

# Filter out files already in the destination or originals directory
video_files_all = find_video_files(root_dir)
abs_dest_dir = os.path.abspath(dest_dir) # Includes originals_processed_dir
video_files = [f for f in video_files_all if not os.path.abspath(f).startswith(abs_dest_dir)]

if len(video_files) < len(video_files_all):
logger.warning(f"Filtered out {len(video_files_all) - len(video_files)} files potentially inside destination subdirectories.")

if not video_files:
logger.error(f"No valid video files found in {root_dir} (excluding destination).")
return

logging.info(f"Starting scene splitting on {len(video_files)} videos from {root_dir} using {num_threads} threads.")
logging.info(f"Output segments will be in subfolders under: {dest_dir}")
logging.info(f"Originals (if successfully split) will be moved to: {originals_processed_dir}")

start_overall_time = time.time()
processed_count = 0
successful_runs = 0
failed_runs = 0

with ThreadPoolExecutor(max_workers=num_threads, thread_name_prefix='SceneSplitWorker') as executor:
futures = {executor.submit(process_video, video_file, threshold, dest_dir, originals_processed_dir, downscale_width): video_file for video_file in video_files}

logging.info("All tasks submitted. Waiting for completion...")
from concurrent.futures import as_completed
for future in tqdm(as_completed(futures), total=len(futures), desc="Overall Progress"):
video_file = futures[future]
processed_count += 1
try:
future.result() # Check for exceptions from the thread
successful_runs += 1
except Exception as exc:
# Exception already logged in thread or called functions
failed_runs += 1
logging.debug(f"Worker thread for {os.path.basename(video_file)} failed top-level (already logged).")

end_overall_time = time.time()
logging.info(f"Scene splitting complete.")
logging.info(f"Summary: {len(video_files)} videos submitted. {successful_runs} processed without top-level errors, {failed_runs} encountered errors.")
logging.info(f"Total execution time: {end_overall_time - start_overall_time:.2f} seconds.")

if __name__ == "__main__":
# --- CHECK FFMPEG FIRST ---
if not check_ffmpeg():
sys.exit(1) # Exit if FFmpeg is not found

root_dir = r"/Volumes/shared/标注/影视"
dest_dir = r"/Volumes/shared/0328-1-detect-scene-ret"

# Standard directory checks
if not os.path.exists(root_dir) or not os.path.isdir(root_dir):
logging.critical(f"Root directory '{root_dir}' not found or is not a directory.")
sys.exit(1)
try:
os.makedirs(dest_dir, exist_ok=True)
logging.info(f"Ensured base destination directory exists: {dest_dir}")
except OSError as e:
logging.critical(f"Failed to create base destination directory '{dest_dir}': {e}")
sys.exit(1)
except Exception as e:
logging.critical(f"Unexpected error creating destination directory '{dest_dir}': {e}")
sys.exit(1)

check_acceleration_support()

# Run main process
# Adjust num_threads, threshold, downscale_width as needed
main(root_dir, dest_dir, num_threads=4, threshold=27.0, downscale_width=640)

测试结果展示

值中的峰值对应于输入视频中的场景中断。在某些情况下,可能需要相应地提高或降低阈值。

img

see

Next:
在ubuntu使用Postfix搭建邮件服务器