yolov5s 模型识别人脸
Published in:2024-11-21 |
Words: 2.3k | Reading time: 10min | reading:

yolov5s 模型识别人脸

简介

YOLO(You Only Look Once)是一个广泛使用的实时目标检测算法,它能够在图片或视频中快速检测到目标并进行分类。YOLO 的优点在于其速度和准确度,能够在单个前向传播中同时进行目标定位和分类,从而实现实时目标检测。YOLO 在许多应用场景中都有很好的表现,包括自动驾驶、视频监控、无人机巡检等。

特点

1
2
3
4
5
#YOLO 通过以下步骤进行目标检测:

#输入图像: 将输入图像分割成网格,并为每个网格预测一个边界框和对应的类概率。
#目标检测: 每个网格预测一个边界框,包含位置(x, y, w, h)和类别概率。网络还预测是否存在目标。
#非极大值抑制(NMS): 通过 NMS 去除冗余的检测结果,仅保留最有可能的边界框。

安装使用

1
2
3
4
5
6
7
8
9
10

pip install torch
pip install cv2

#or

pip install torch torchvision torchaudio
pip install yolov5


demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
import torch
import cv2
import numpy as np
import time
import shutil
import os
from pathlib import Path
import logging
from concurrent.futures import ThreadPoolExecutor
import random
import threading

# 配置参数
MODEL_NAME = 'yolov5s' # 使用 YOLOv5s 模型,可根据需求选择 yolov5m, yolov5l, yolov5x
COCO_NAMES = r"C:\Users\DELL\PycharmProjects\audio_record_server\src\config\coco.names"
DETECTED_HUMANS_DIR = r"E:" # 存放检测到人的视频
BATCH_SIZE = 5 # 每批处理视频数
THREAD_COUNT = 4 # 线程数量
INTERVAL_DURATION_SECONDS = 60 # 每个检测段的时长(秒)
DETECTION_COUNT_PER_INTERVAL = 10 # 每个3分钟段的检测次数
DETECTION_THRESHOLD = 1 # 每个3分钟段的检测阈值
MIN_CONFIDENCE = 0.7 # 检测置信度阈值

# 线程锁
model_lock = threading.Lock()

# 如果目标目录不存在,则创建它
os.makedirs(DETECTED_HUMANS_DIR, exist_ok=True)

# 加载类别名称
with open(COCO_NAMES, "r") as f:
classes = [line.strip() for line in f.readlines()]

# 初始化日志
logging.basicConfig(
filename='log.log',
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
filemode='w',
)

def create_output_dir(output_dir):
"""创建输出目录"""
os.makedirs(output_dir, exist_ok=True)

def load_model(model_name='yolov5s'):
"""加载YOLOv5模型,返回模型对象"""
model = torch.hub.load('ultralytics/yolov5', model_name, pretrained=True)
# 设置模型为评估模式
model.eval()
return model

def get_random_detection_times(start_sec, end_sec, detection_count):
"""
在[start_sec, end_sec)区间内生成随机的检测时间点(秒)。
"""
if end_sec <= start_sec:
return []
return [random.uniform(start_sec, end_sec) for _ in range(detection_count)]

def detect_person_many(model, frame):
"""
对单帧图像进行人检测,返回是否仅检测到一个人。
"""
results = model(frame)
# 解析检测结果
detections = results.xyxy[0] # [xmin, ymin, xmax, ymax, confidence, class]
person_count = 0 # 记录检测到的人数

for *box, confidence, cls in detections:
if confidence >= MIN_CONFIDENCE and classes[int(cls)].lower() == "person":
person_count += 1
# 如果检测到超过一个人,直接返回 False
if person_count > 1:
return False

# 如果检测到正好一个人,返回 True
return person_count == 1

def detect_person(model, frame):
"""
对单帧图像进行人检测,返回是否检测到人的布尔值。
"""
results = model(frame)
# 解析检测结果
detections = results.xyxy[0] # [xmin, ymin, xmax, ymax, confidence, class]
person_detected = False
for *box, confidence, cls in detections:
if confidence >= MIN_CONFIDENCE and classes[int(cls)].lower() == "person":
person_detected = True
break
return person_detected

def detect_person_in_video(video_path, model):
"""处理单个视频,按照每3分钟段进行人检测"""
try:
logging.info(f"开始处理视频: {video_path}")
print(f"开始处理视频: {video_path}")
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
logging.error(f"无法打开视频文件: {video_path}")
print(f"无法打开视频文件: {video_path}")
return False

width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
total_seconds = total_frames / fps if fps else 0
logging.info(f"视频信息 - 宽度: {width}, 高度: {height}, 帧率: {fps}, 总时长: {total_seconds:.2f} 秒")
print(f"视频信息 - 宽度: {width}, 高度: {height}, 帧率: {fps}, 总时长: {total_seconds:.2f} 秒")

if total_seconds <= 0:
logging.error(f"视频时长无效: {video_path}")
print(f"视频时长无效: {video_path}")
cap.release()
return False

# 计算3分钟段数
num_intervals = int(total_seconds // INTERVAL_DURATION_SECONDS)
if num_intervals == 0:
logging.error(f"视频时长不足3分钟: {video_path}")
print(f"视频时长不足3分钟: {video_path}")
cap.release()
return False

passed_segments = 0 # 通过检测的段数

# 逐段检测
for interval_idx in range(num_intervals):
if passed_segments >= 3:
# 已经有足够的段通过,提前结束
logging.info(f"已达到足够的通过段数 ({passed_segments}),提前结束检测")
print(f"已达到足够的通过段数 ({passed_segments}),提前结束检测")
break

interval_start = interval_idx * INTERVAL_DURATION_SECONDS
interval_end = (interval_idx + 1) * INTERVAL_DURATION_SECONDS
detection_times = get_random_detection_times(interval_start, interval_end, DETECTION_COUNT_PER_INTERVAL)
logging.info(f"段 {interval_idx + 1}/{num_intervals} 检测时间点(秒): {detection_times}")
print(f"段 {interval_idx + 1}/{num_intervals} 检测时间点(秒): {detection_times}")

detections = 0 # 检测到人的次数

for detection_time in detection_times:
frame_number = int(detection_time * fps)
cap.set(cv2.CAP_PROP_POS_FRAMES, frame_number)
ret, frame = cap.read()
if not ret:
logging.warning(f"无法读取视频 {video_path} 的第 {frame_number} 帧")
print(f"无法读取视频 {video_path} 的第 {frame_number} 帧")
continue

# 确保帧内容有效
if frame is None or frame.size == 0:
logging.warning(f"读取到空帧,在时间 {detection_time:.2f} 秒")
print(f"读取到空帧,在时间 {detection_time:.2f} 秒")
continue

# 使用锁确保线程安全
with model_lock:
person_detected = detect_person_many(model, frame)

if person_detected:
detections += 1
logging.info(f"检测到人,在时间 {detection_time:.2f} 秒")
print(f"检测到人,在时间 {detection_time:.2f} 秒")
else:
logging.info(f"未检测到人,在时间 {detection_time:.2f} 秒")
print(f"未检测到人,在时间 {detection_time:.2f} 秒")

logging.info(f"段 {interval_idx + 1}/{num_intervals} 检测到人次数: {detections}/{DETECTION_COUNT_PER_INTERVAL}")
print(f"段 {interval_idx + 1}/{num_intervals} 检测到人次数: {detections}/{DETECTION_COUNT_PER_INTERVAL}")

if detections == DETECTION_THRESHOLD:
passed_segments += 1
logging.info(f"段 {interval_idx + 1} 通过检测,当前通过段数: {passed_segments}")
print(f"段 {interval_idx + 1} 通过检测,当前通过段数: {passed_segments}")
else:
logging.info(f"段 {interval_idx + 1} 未达到检测阈值")
print(f"段 {interval_idx + 1} 未达到检测阈值")

cap.release()

if passed_segments >= 3:
logging.info(f"视频处理完成且符合要求(通过段数: {passed_segments}): {video_path}")
print(f"视频处理完成且符合要求(通过段数: {passed_segments}): {video_path}")
return True # 符合要求
else:
logging.info(f"视频处理完成但不符合要求(通过段数: {passed_segments}): {video_path}")
print(f"视频处理完成但不符合要求(通过段数: {passed_segments}): {video_path}")
return False # 不符合要求

except Exception as e:
logging.error(f"处理视频时出错: {video_path}, 错误: {str(e)}")
print(f"处理视频时出错: {video_path}, 错误: {str(e)}")
return False

def process_batch(video_files, model):
"""处理一批视频文件"""
for video_file in video_files:
relative_path = os.path.relpath(video_file, r"E:\video竖屏_低分辨率")
detected_human_dir = os.path.join(DETECTED_HUMANS_DIR, os.path.dirname(relative_path))

create_output_dir(detected_human_dir)

if detect_person_in_video(video_file, model):
logging.info(f"检测到人且符合要求: {video_file}")
print(f"检测到人且符合要求: {video_file}")
shutil.move(video_file, os.path.join(detected_human_dir, os.path.basename(video_file)))
else:
logging.info(f"未检测到人或不符合要求: {video_file}")
print(f"未检测到人或不符合要求: {video_file}")

def main():
video_files = []
root_path = r"E:\video竖屏_低分辨率"
root = Path(root_path).resolve()
exclude_dirs = {'$RECYCLE.BIN', 'System Volume Information', 'Windows', 'Program Files', 'Program Files (x86)'}

logging.info(f"开始查找目录 {root_path} 下的视频文件")
print(f"开始查找目录 {root_path} 下的视频文件")

for extension in ('*.mp4', '*.avi', '*.mov', '*.mkv', '*.m4v', '*.WMV', '*.rmvb', '*.DAT', '*.VOB'):
video_files += [
str(file.resolve()) for file in root.rglob(extension)
if not any(excluded.lower() in (part.lower() for part in file.parts) for excluded in exclude_dirs)
]

if not video_files:
logging.error("没有找到符合条件的文件")
raise ValueError("没有找到符合条件的文件")

# 加载模型(在主线程中加载一次模型,供所有线程使用)
model = load_model(MODEL_NAME)

start_main_time = time.time()

logging.info(f"开始分批处理视频,共有 {len(video_files)} 个文件,批次大小为 {BATCH_SIZE}")
print(f"开始分批处理视频,共有 {len(video_files)} 个文件,批次大小为 {BATCH_SIZE}")

# 多线程处理
with ThreadPoolExecutor(max_workers=THREAD_COUNT) as executor:
for i in range(0, len(video_files), BATCH_SIZE):
batch = video_files[i:i + BATCH_SIZE]
executor.submit(process_batch, batch, model)

logging.info(f"所有视频文件处理完成,总耗时 {time.time() - start_main_time:.2f} 秒")
print(f"所有视频文件处理完成,总耗时 {time.time() - start_main_time:.2f} 秒")

if __name__ == "__main__":
main()

'''代码修改详解

### 1. **模型加载与初始化**

YOLOv5 使用 PyTorch 实现,通过 `torch.hub` 方便地加载模型。我们在主线程中加载一次模型,并在所有线程中共享该模型对象,以避免重复加载和节省内存。

'''
'''
def load_model(model_name='yolov5s'):
"""加载YOLOv5模型,返回模型对象"""
model = torch.hub.load('ultralytics/yolov5', model_name, pretrained=True)
# 设置模型为评估模式
model.eval()
return model
'''
Prev:
nlp scapy use
Next:
使用js log库和python FastApi完成 微信小程序日志保存与上传