import torch
import cv2
import numpy as np
import time
import shutil
import os
from pathlib import Path
import logging
from concurrent.futures import ThreadPoolExecutor
import random
import threading
MODEL_NAME = 'yolov5s'
COCO_NAMES = r"C:\Users\DELL\PycharmProjects\audio_record_server\src\config\coco.names"
DETECTED_HUMANS_DIR = r"E:"
BATCH_SIZE = 5                      # number of videos handed to each worker task
THREAD_COUNT = 4                    # worker threads in the pool
INTERVAL_DURATION_SECONDS = 60      # length of each detection interval
DETECTION_COUNT_PER_INTERVAL = 10   # random frames sampled per interval
DETECTION_THRESHOLD = 1             # minimum positive frames for an interval to pass
MIN_CONFIDENCE = 0.7                # minimum confidence for a "person" detection
model_lock = threading.Lock()
os.makedirs(DETECTED_HUMANS_DIR, exist_ok=True)
with open(COCO_NAMES, "r") as f:
    classes = [line.strip() for line in f.readlines()]

logging.basicConfig(
    filename='log.log',
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filemode='w',
)
def create_output_dir(output_dir):
    """Create the output directory if it does not exist."""
    os.makedirs(output_dir, exist_ok=True)
def load_model(model_name='yolov5s'):
    """Load the YOLOv5 model and return the model object."""
    model = torch.hub.load('ultralytics/yolov5', model_name, pretrained=True)
    model.eval()
    return model
def get_random_detection_times(start_sec, end_sec, detection_count):
    """Generate random detection timestamps (in seconds) within [start_sec, end_sec)."""
    if end_sec <= start_sec:
        return []
    return [random.uniform(start_sec, end_sec) for _ in range(detection_count)]
def detect_person_many(model, frame):
    """Run person detection on a single frame and return whether exactly one person is detected."""
    results = model(frame)
    detections = results.xyxy[0]
    person_count = 0

    for *box, confidence, cls in detections:
        if confidence >= MIN_CONFIDENCE and classes[int(cls)].lower() == "person":
            person_count += 1
            if person_count > 1:
                return False

    return person_count == 1
def detect_person(model, frame):
    """Run person detection on a single frame and return whether any person is detected."""
    results = model(frame)
    detections = results.xyxy[0]
    person_detected = False
    for *box, confidence, cls in detections:
        if confidence >= MIN_CONFIDENCE and classes[int(cls)].lower() == "person":
            person_detected = True
            break
    return person_detected
def detect_person_in_video(video_path, model):
    """Process a single video, running person detection on fixed-length intervals."""
    try:
        logging.info(f"Start processing video: {video_path}")
        print(f"Start processing video: {video_path}")
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            logging.error(f"Cannot open video file: {video_path}")
            print(f"Cannot open video file: {video_path}")
            return False

        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        total_seconds = total_frames / fps if fps else 0
        logging.info(f"Video info - width: {width}, height: {height}, fps: {fps}, duration: {total_seconds:.2f} s")
        print(f"Video info - width: {width}, height: {height}, fps: {fps}, duration: {total_seconds:.2f} s")

        if total_seconds <= 0:
            logging.error(f"Invalid video duration: {video_path}")
            print(f"Invalid video duration: {video_path}")
            cap.release()
            return False

        num_intervals = int(total_seconds // INTERVAL_DURATION_SECONDS)
        if num_intervals == 0:
            logging.error(f"Video is shorter than one detection interval: {video_path}")
            print(f"Video is shorter than one detection interval: {video_path}")
            cap.release()
            return False
        passed_segments = 0

        for interval_idx in range(num_intervals):
            if passed_segments >= 3:
                logging.info(f"Enough segments have passed ({passed_segments}), stopping detection early")
                print(f"Enough segments have passed ({passed_segments}), stopping detection early")
                break

            interval_start = interval_idx * INTERVAL_DURATION_SECONDS
            interval_end = (interval_idx + 1) * INTERVAL_DURATION_SECONDS
            detection_times = get_random_detection_times(interval_start, interval_end, DETECTION_COUNT_PER_INTERVAL)
            logging.info(f"Segment {interval_idx + 1}/{num_intervals} detection times (s): {detection_times}")
            print(f"Segment {interval_idx + 1}/{num_intervals} detection times (s): {detection_times}")

            detections = 0

            for detection_time in detection_times:
                frame_number = int(detection_time * fps)
                cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
                ret, frame = cap.read()
                if not ret:
                    logging.warning(f"Cannot read frame {frame_number} of video {video_path}")
                    print(f"Cannot read frame {frame_number} of video {video_path}")
                    continue

                if frame is None or frame.size == 0:
                    logging.warning(f"Empty frame read at {detection_time:.2f} s")
                    print(f"Empty frame read at {detection_time:.2f} s")
                    continue

                # The model object is shared across worker threads, so inference is serialized with a lock.
                with model_lock:
                    person_detected = detect_person_many(model, frame)

                if person_detected:
                    detections += 1
                    logging.info(f"Person detected at {detection_time:.2f} s")
                    print(f"Person detected at {detection_time:.2f} s")
                else:
                    logging.info(f"No person detected at {detection_time:.2f} s")
                    print(f"No person detected at {detection_time:.2f} s")

            logging.info(f"Segment {interval_idx + 1}/{num_intervals} person detections: {detections}/{DETECTION_COUNT_PER_INTERVAL}")
            print(f"Segment {interval_idx + 1}/{num_intervals} person detections: {detections}/{DETECTION_COUNT_PER_INTERVAL}")

            if detections >= DETECTION_THRESHOLD:
                passed_segments += 1
                logging.info(f"Segment {interval_idx + 1} passed, segments passed so far: {passed_segments}")
                print(f"Segment {interval_idx + 1} passed, segments passed so far: {passed_segments}")
            else:
                logging.info(f"Segment {interval_idx + 1} did not reach the detection threshold")
                print(f"Segment {interval_idx + 1} did not reach the detection threshold")
        cap.release()

        if passed_segments >= 3:
            logging.info(f"Video processed and meets the requirement (segments passed: {passed_segments}): {video_path}")
            print(f"Video processed and meets the requirement (segments passed: {passed_segments}): {video_path}")
            return True
        else:
            logging.info(f"Video processed but does not meet the requirement (segments passed: {passed_segments}): {video_path}")
            print(f"Video processed but does not meet the requirement (segments passed: {passed_segments}): {video_path}")
            return False

    except Exception as e:
        logging.error(f"Error while processing video {video_path}: {str(e)}")
        print(f"Error while processing video {video_path}: {str(e)}")
        return False
def process_batch(video_files, model):
    """Process one batch of video files."""
    for video_file in video_files:
        relative_path = os.path.relpath(video_file, r"E:\video竖屏_低分辨率")
        detected_human_dir = os.path.join(DETECTED_HUMANS_DIR, os.path.dirname(relative_path))

        create_output_dir(detected_human_dir)

        if detect_person_in_video(video_file, model):
            logging.info(f"Person detected and requirements met: {video_file}")
            print(f"Person detected and requirements met: {video_file}")
            shutil.move(video_file, os.path.join(detected_human_dir, os.path.basename(video_file)))
        else:
            logging.info(f"No person detected or requirements not met: {video_file}")
            print(f"No person detected or requirements not met: {video_file}")
def main():
    video_files = []
    root_path = r"E:\video竖屏_低分辨率"
    root = Path(root_path).resolve()
    exclude_dirs = {'$RECYCLE.BIN', 'System Volume Information', 'Windows', 'Program Files', 'Program Files (x86)'}

    logging.info(f"Searching for video files under {root_path}")
    print(f"Searching for video files under {root_path}")

    for extension in ('*.mp4', '*.avi', '*.mov', '*.mkv', '*.m4v', '*.WMV', '*.rmvb', '*.DAT', '*.VOB'):
        video_files += [
            str(file.resolve())
            for file in root.rglob(extension)
            if not any(excluded.lower() in (part.lower() for part in file.parts) for excluded in exclude_dirs)
        ]

    if not video_files:
        logging.error("No matching video files found")
        raise ValueError("No matching video files found")

    model = load_model(MODEL_NAME)

    start_main_time = time.time()

    logging.info(f"Start batch processing: {len(video_files)} files, batch size {BATCH_SIZE}")
    print(f"Start batch processing: {len(video_files)} files, batch size {BATCH_SIZE}")

    with ThreadPoolExecutor(max_workers=THREAD_COUNT) as executor:
        for i in range(0, len(video_files), BATCH_SIZE):
            batch = video_files[i:i + BATCH_SIZE]
            executor.submit(process_batch, batch, model)

    logging.info(f"All video files processed in {time.time() - start_main_time:.2f} s")
    print(f"All video files processed in {time.time() - start_main_time:.2f} s")


if __name__ == "__main__":
    main()
'''Code changes explained

### 1. **Model loading and initialization**

YOLOv5 is implemented in PyTorch, and the model can be loaded conveniently through `torch.hub`. The model is loaded once in the main thread and the same model object is shared by all worker threads, which avoids repeated loading and saves memory.
'''
'''
def load_model(model_name='yolov5s'):
    """Load the YOLOv5 model and return the model object."""
    model = torch.hub.load('ultralytics/yolov5', model_name, pretrained=True)
    # Set the model to evaluation mode
    model.eval()
    return model
'''
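'''
A minimal usage sketch (illustration only, not part of the script above), assuming the `load_model` helper defined in this file and a hypothetical test image `frame.jpg`: it shows how the once-loaded model is shared and called under a lock from any worker thread.
'''
'''
import threading
import cv2

model = load_model('yolov5s')    # load the model once, e.g. in the main thread
model_lock = threading.Lock()    # lock that serializes access to the shared model

frame = cv2.imread('frame.jpg')  # hypothetical test image
if frame is not None:
    with model_lock:             # only one thread runs inference at a time
        results = model(frame)
    # results.xyxy[0] holds one row per detection: x1, y1, x2, y2, confidence, class
    print(results.xyxy[0])
'''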