1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385
| import cv2 import base64 import requests import json import os import logging import time from datetime import timedelta import xml.etree.ElementTree as ET from xml.dom import minidom
# --- Script configuration ---------------------------------------------------
# Video file that is read frame by frame and sent to the detector.
VIDEO_PATH = "/Users/zg/PycharmProjects/CVAT_model_nuclio/src/run/data/1.mp4"
# HTTP endpoint of the Nuclio detector function (receives a JSON POST).
NUCLIO_FUNCTION_URL = "http://192.168.10.158:32774"
# Directory that receives the generated CVAT XML file.
OUTPUT_DIR = "./data/"
# File that receives a copy of every log record (also echoed to the console).
LOG_FILE = "./logs/video_processing_cvat_images_cn.log"
# Only every FRAME_SKIP-th frame is processed; values <= 1 process every frame.
FRAME_SKIP = 5
# Timeout, in seconds, for a single request to the Nuclio function.
REQUEST_TIMEOUT = 30

# Create the log directory from LOG_FILE itself (instead of a second,
# hard-coded path) so that changing LOG_FILE cannot leave the FileHandler
# pointing at a directory that does not exist.
os.makedirs(os.path.dirname(LOG_FILE) or ".", exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(LOG_FILE, encoding='utf-8'),
        logging.StreamHandler(),
    ],
)
def encode_frame_to_base64(frame):
    """Encode a frame read by OpenCV as a Base64 JPEG string.

    Returns the Base64 text on success. Returns None, after logging an
    error, when JPEG encoding reports failure or any exception is raised.
    """
    try:
        encoded_ok, jpeg_buffer = cv2.imencode(".jpg", frame)
        if encoded_ok:
            return base64.b64encode(jpeg_buffer).decode('utf-8')
        logging.error("无法将帧编码为JPEG。")
    except Exception as e:
        logging.error(f"帧编码过程中出错: {e}")
    return None
def call_nuclio_detector(base64_image_string, frame_number):
    """POST a Base64 image to the Nuclio function and return its detections.

    The image is sent as JSON ``{"image": <base64>}``. A 200 response must
    decode to a list; each entry is kept only if it is a dict with 'label',
    'points' and 'confidence' keys and 'points' is a list of exactly four
    items. Rejected entries are logged as warnings.

    Args:
        base64_image_string: Base64 text of the encoded frame.
        frame_number: Frame index, used only in log messages.

    Returns:
        The list of accepted detection dicts (possibly empty), or None when
        the request fails, times out, returns a non-200 status, or the
        response cannot be parsed/validated.
    """
    request_body = json.dumps({"image": base64_image_string})
    request_headers = {'Content-Type': 'application/json'}

    try:
        sent_at = time.time()
        response = requests.post(
            NUCLIO_FUNCTION_URL,
            headers=request_headers,
            data=request_body,
            timeout=REQUEST_TIMEOUT,
        )
        elapsed = time.time() - sent_at
        logging.info(f"帧 {frame_number}: Nuclio请求耗时 {elapsed:.2f} 秒。")

        # Guard clause: anything other than HTTP 200 is treated as a failure.
        if response.status_code != 200:
            logging.error(f"帧 {frame_number}: Nuclio函数返回错误。状态码: {response.status_code}, 响应体(前200字符): {response.text[:200]}...")
            return None

        try:
            parsed = response.json()
            if not isinstance(parsed, list):
                logging.error(f"帧 {frame_number}: Nuclio响应不是列表。收到类型: {type(parsed)}")
                return None

            accepted = []
            for det in parsed:
                has_required_keys = isinstance(det, dict) and all(
                    key in det for key in ('label', 'points', 'confidence')
                )
                if not has_required_keys:
                    logging.warning(f"帧 {frame_number}: 跳过缺少键或类型错误的检测结果: {det}")
                    continue

                box_points = det['points']
                if isinstance(box_points, list) and len(box_points) == 4:
                    accepted.append(det)
                else:
                    logging.warning(f"帧 {frame_number}: 跳过无效'points'的检测结果: {det.get('points')}")

            logging.info(f"帧 {frame_number}: 收到 {len(accepted)} 个有效检测结果。")
            return accepted
        except json.JSONDecodeError:
            logging.error(f"帧 {frame_number}: 解析Nuclio的JSON响应失败。状态码: {response.status_code}, 响应体(前200字符): {response.text[:200]}...")
            return None
        except Exception as e_val:
            logging.error(f"帧 {frame_number}: 验证Nuclio响应结构时出错: {e_val}")
            return None
    except requests.exceptions.Timeout:
        logging.error(f"帧 {frame_number}: 请求Nuclio函数超时 ({REQUEST_TIMEOUT}秒)。")
        return None
    except requests.exceptions.RequestException as e:
        logging.error(f"帧 {frame_number}: 请求Nuclio函数失败: {e}")
        return None
    except Exception as e:
        logging.error(f"帧 {frame_number}: 调用Nuclio时发生意外错误: {e}")
        return None
def pretty_print_xml(elem):
    """Serialize an ElementTree element to an indented XML string.

    The element is serialized as UTF-8, re-parsed with minidom and
    pretty-printed, so the result starts with an XML declaration.

    Returns the formatted string, or None (after logging an error) if any
    step raises.
    """
    try:
        serialized = ET.tostring(elem, 'utf-8')
        document = minidom.parseString(serialized)
        pretty_bytes = document.toprettyxml(indent=" ", encoding="utf-8")
        return pretty_bytes.decode('utf-8')
    except Exception as e:
        logging.error(f"XML美化打印过程中出错: {e}")
        return None
def _cvat_utc_timestamp():
    """Return the current UTC time as 'YYYY-MM-DD HH:MM:SS.mmm+00:00'."""
    # Imported locally because the module only imports `timedelta` from datetime.
    from datetime import datetime, timezone

    # datetime.strftime supports %f (microseconds); time.strftime does not.
    # Dropping the last three digits leaves millisecond precision.
    now_utc = datetime.now(timezone.utc)
    return now_utc.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + "+00:00"


def _parse_box_coords(points):
    """Return the first four entries of `points` as floats, or None if unusable."""
    try:
        coords = [float(value) for value in points[:4]]
    except (TypeError, ValueError):
        return None
    return coords if len(coords) == 4 else None


def _collect_label_names(all_results):
    """Return the set of label names (as strings) used by any detection."""
    return {
        str(det.get('label', 'unknown'))
        for frame_dets in all_results.values()
        if frame_dets
        for det in frame_dets
    }


def save_results_to_cvat_image_xml(all_results, xml_output_path, video_filename, frame_width, frame_height,
                                   total_processed_frames, original_total_frames):
    """Write detection results as a "CVAT for images 1.1" XML file.

    Args:
        all_results: Mapping of frame number -> list of detection dicts. Each
            detection provides 'label', 'points' (xtl, ytl, xbr, ybr) and
            'confidence'. The image id written to the XML is
            ``frame_number - 1``.
        xml_output_path: Destination path of the XML file.
        video_filename: Stored as the task name.
        frame_width: Frame width, written to every <image> and to <original_size>.
        frame_height: Frame height, written to every <image> and to <original_size>.
        total_processed_frames: Stored as the task size.
        original_total_frames: Frame count of the source video; the single
            segment stops at ``original_total_frames - 1``.

    Returns:
        None. Frames without detections produce no <image> element. Boxes
        whose 'points' cannot be converted to four floats are skipped with a
        warning instead of aborting the export. Failures while serializing or
        writing the file are logged, not raised.
    """
    logging.info(f"正在为 {len(all_results)} 个处理过的帧构建CVAT图像XML输出...")

    root = ET.Element("annotations")
    ET.SubElement(root, "version").text = "1.1"

    # ---- <meta><task> ------------------------------------------------------
    meta = ET.SubElement(root, "meta")
    task = ET.SubElement(meta, "task")
    ET.SubElement(task, "id").text = "N/A"
    ET.SubElement(task, "name").text = video_filename
    ET.SubElement(task, "size").text = str(total_processed_frames)
    ET.SubElement(task, "mode").text = "annotation"
    ET.SubElement(task, "overlap").text = "0"
    ET.SubElement(task, "bugtracker").text = ""

    current_time_utc = _cvat_utc_timestamp()
    ET.SubElement(task, "created").text = current_time_utc
    ET.SubElement(task, "updated").text = current_time_utc
    ET.SubElement(task, "start_frame").text = "0"

    # Frame numbers in all_results are converted to 0-based ids by
    # subtracting one; clamp at 0 for the empty case.
    last_processed_frame_num = max(all_results.keys()) if all_results else 0
    ET.SubElement(task, "stop_frame").text = str(max(last_processed_frame_num - 1, 0))
    ET.SubElement(task, "frame_filter").text = ""

    labels = _collect_label_names(all_results) if all_results else set()
    labels_elem = ET.SubElement(task, "labels")
    if not labels:
        logging.warning("在检测结果中未找到标签,添加默认'unknown'标签。")
        labels.add('unknown')

    for label_name in sorted(labels):
        label_elem = ET.SubElement(labels_elem, "label")
        ET.SubElement(label_elem, "name").text = label_name
        ET.SubElement(label_elem, "color").text = ""
        ET.SubElement(label_elem, "type").text = "rectangle"
        ET.SubElement(label_elem, "attributes")

    segments = ET.SubElement(task, "segments")
    segment = ET.SubElement(segments, "segment")
    ET.SubElement(segment, "id").text = "0"
    ET.SubElement(segment, "start").text = "0"
    ET.SubElement(segment, "stop").text = str(original_total_frames - 1)
    ET.SubElement(segment, "url").text = "N/A"

    ET.SubElement(task, "owner")
    ET.SubElement(task, "assignee")
    ET.SubElement(task, "subset").text = "Default"

    original_size = ET.SubElement(meta, "original_size")
    ET.SubElement(original_size, "width").text = str(frame_width)
    ET.SubElement(original_size, "height").text = str(frame_height)

    ET.SubElement(meta, "dumped").text = current_time_utc

    # ---- <image> elements --------------------------------------------------
    for frame_number in sorted(all_results or {}):
        detections = all_results[frame_number]
        if not detections:
            continue

        # Validate coordinates first so one malformed detection cannot abort
        # the whole export after the video has already been processed.
        usable_boxes = []
        for det in detections:
            coords = _parse_box_coords(det.get('points', [0, 0, 0, 0]))
            if coords is None:
                logging.warning(f"帧 {frame_number}: 跳过坐标无效的检测框: {det.get('points')}")
                continue
            usable_boxes.append((det, coords))
        if not usable_boxes:
            continue

        image_elem = ET.SubElement(root, "image")
        image_elem.set("id", str(frame_number - 1))
        # NOTE(review): the name uses frame_number while the id uses
        # frame_number - 1 -- confirm this matches the image names in the
        # target CVAT task.
        image_elem.set("name", f"frame_{frame_number:06d}")
        image_elem.set("width", str(frame_width))
        image_elem.set("height", str(frame_height))

        for det, (xtl, ytl, xbr, ybr) in usable_boxes:
            box_elem = ET.SubElement(image_elem, "box")
            box_elem.set("label", str(det.get('label', 'unknown')))
            box_elem.set("xtl", f"{xtl:.2f}")
            box_elem.set("ytl", f"{ytl:.2f}")
            box_elem.set("xbr", f"{xbr:.2f}")
            box_elem.set("ybr", f"{ybr:.2f}")
            box_elem.set("occluded", "0")

            attr_conf = ET.SubElement(box_elem, "attribute", name="confidence")
            try:
                confidence_val = float(det.get('confidence', 0.0))
            except (ValueError, TypeError):
                confidence_val = 0.0
            attr_conf.text = f"{confidence_val:.4f}"

    # ---- write file --------------------------------------------------------
    try:
        full_xml_string = pretty_print_xml(root)

        if full_xml_string:
            if not full_xml_string.strip().startswith("<?xml"):
                logging.error("美化打印未能生成有效的XML开头。")
                raise ValueError("美化打印失败。")

            with open(xml_output_path, "w", encoding="utf-8") as f:
                f.write(full_xml_string)
            logging.info(f"检测结果已保存至 CVAT 图像 XML: {xml_output_path}")
        else:
            # Pretty-printing returned nothing: fall back to ElementTree's writer.
            logging.warning("美化打印失败,回退到基础XML写入器。")
            tree = ET.ElementTree(root)
            if hasattr(ET, 'indent'):  # ET.indent is not available on older Pythons
                ET.indent(tree, space=" ", level=0)
            tree.write(xml_output_path, encoding='utf-8', xml_declaration=True)
            logging.info(f"检测结果已保存至基础 CVAT XML (后备方案): {xml_output_path}")
    except Exception as e:
        logging.error(f"保存结果到 CVAT XML 文件失败: {e}")
def _log_progress_estimate(frame_count, processed_frame_count, original_total_frames, start_process_time):
    """Log progress and a rough remaining-time estimate; estimation errors are logged as warnings."""
    try:
        elapsed_time = time.time() - start_process_time
        estimated_total_to_process = (original_total_frames // FRAME_SKIP) if FRAME_SKIP > 1 else original_total_frames
        if estimated_total_to_process <= 0:
            return
        fraction_done = processed_frame_count / estimated_total_to_process
        # Only report when the fraction is meaningful (the frame-count
        # metadata can under-report, pushing the fraction above 1).
        if not 0 < fraction_done <= 1:
            return
        total_estimated_time = elapsed_time / fraction_done
        estimated_remaining_time = max(0, total_estimated_time - elapsed_time)
        logging.info(
            f"进度: 帧 {frame_count}/{original_total_frames}。已处理 {processed_frame_count} 帧。预计剩余时间: {timedelta(seconds=int(estimated_remaining_time))}")
    except ZeroDivisionError:
        logging.warning("无法估计剩余时间 (除零错误)。")
    except Exception as e_est:
        logging.warning(f"无法估计剩余时间: {e_est}")


def main():
    """Run detection over the configured video and export the results as CVAT XML.

    Reads VIDEO_PATH frame by frame, sends every FRAME_SKIP-th frame to the
    Nuclio function, collects the non-empty detection lists keyed by frame
    number (counted from 1), and writes them to
    ``<OUTPUT_DIR>/<video name>_cvat_images.xml``. No XML file is written
    when no detections were collected. Returns None.
    """
    logging.info(f"开始处理视频: {VIDEO_PATH}")
    logging.info(f"Nuclio 函数 URL: {NUCLIO_FUNCTION_URL}")
    logging.info(f"处理帧间隔: 每 {FRAME_SKIP} 帧")

    if not os.path.exists(VIDEO_PATH):
        logging.error(f"视频文件未找到: {VIDEO_PATH}")
        return

    cap = cv2.VideoCapture(VIDEO_PATH)
    if not cap.isOpened():
        logging.error(f"打开视频文件错误: {VIDEO_PATH}")
        return

    # try/finally guarantees the capture is released even if something in the
    # processing loop raises (or the run is interrupted).
    try:
        original_total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        if original_total_frames <= 0:
            logging.error(f"视频文件似乎为空或元数据错误 (总帧数: {original_total_frames})。")
            return
        logging.info(f"视频信息: 总帧数: {original_total_frames}, FPS: {fps:.2f}, 尺寸: {frame_width}x{frame_height}")

        frame_count = 0
        processed_frame_count = 0
        all_results = {}
        start_process_time = time.time()

        while True:
            ret, frame = cap.read()

            if not ret:
                if frame_count < original_total_frames:
                    logging.warning(f"无法读取帧 {frame_count + 1} (总帧数 {original_total_frames}),假定视频结束。")
                else:
                    logging.info("视频处理到达结尾。")
                break

            frame_count += 1

            # Frame sampling: keep only frames whose number is a multiple of FRAME_SKIP.
            if FRAME_SKIP > 1 and frame_count % FRAME_SKIP != 0:
                continue

            processed_frame_count += 1
            logging.info(f"正在处理帧 {frame_count}/{original_total_frames}...")

            b64_string = encode_frame_to_base64(frame)
            if not b64_string:
                logging.warning(f"因编码错误跳过帧 {frame_count}。")
                continue

            detections = call_nuclio_detector(b64_string, frame_count)

            # Failed requests (None) and empty detection lists are not stored.
            if detections:
                all_results[frame_count] = detections

            if processed_frame_count > 0 and processed_frame_count % 10 == 0:
                _log_progress_estimate(frame_count, processed_frame_count, original_total_frames, start_process_time)
    finally:
        cap.release()
        logging.info("视频捕获已释放。")

    video_basename = os.path.basename(VIDEO_PATH)
    video_name_no_ext, _ = os.path.splitext(video_basename)
    xml_output_filename = f"{video_name_no_ext}_cvat_images.xml"
    xml_output_path = os.path.join(OUTPUT_DIR, xml_output_filename)

    if all_results:
        save_results_to_cvat_image_xml(
            all_results,
            xml_output_path,
            video_basename,
            frame_width,
            frame_height,
            processed_frame_count,
            original_total_frames,
        )
    else:
        logging.warning("没有记录到任何检测结果,将不会创建CVAT XML文件。")

    end_process_time = time.time()
    total_time = max(0, end_process_time - start_process_time)
    logging.info(f"视频处理完成。总耗时: {timedelta(seconds=int(total_time))}")
    logging.info(f"总共读取帧数: {frame_count}。实际处理帧数: {processed_frame_count}")
# Run the processing pipeline only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|