
import base64
import json
import logging
import os
import time
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta, timezone
from xml.dom import minidom

import cv2
import requests
# --- Configuration -----------------------------------------------------------
# Video file that main() reads frame by frame.
VIDEO_PATH = "/Users/zg/PycharmProjects/CVAT_model_nuclio/src/run/data/1.mp4"
# HTTP endpoint that call_nuclio_detector() POSTs the encoded frames to.
NUCLIO_FUNCTION_URL = "http://192.168.10.158:32774"
# Directory that receives the generated CVAT XML file.
OUTPUT_DIR = "./data/"
LOG_FILE = "./logs/video_processing_cvat_images_cn.log"
# Only every FRAME_SKIP-th frame is sent to the detector (values <= 1 mean every frame).
FRAME_SKIP = 5
# Timeout, in seconds, for a single detector request.
REQUEST_TIMEOUT = 30

# Derive the log directory from LOG_FILE so that changing the log path cannot
# leave the FileHandler pointing at a directory that was never created.
os.makedirs(os.path.dirname(LOG_FILE) or ".", exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Log to both the log file (UTF-8, messages are Chinese) and the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(LOG_FILE, encoding='utf-8'),
        logging.StreamHandler(),
    ],
)
def encode_frame_to_base64(frame):
    """Encode an OpenCV frame as JPEG and return it as a Base64 string.

    Returns None (after logging the reason) when JPEG encoding reports
    failure or any exception is raised along the way.
    """
    try:
        ok, jpeg_buffer = cv2.imencode(".jpg", frame)
        if ok:
            return base64.b64encode(jpeg_buffer).decode('utf-8')
        logging.error("无法将帧编码为JPEG。")
        return None
    except Exception as exc:
        logging.error(f"帧编码过程中出错: {exc}")
        return None
def call_nuclio_detector(base64_image_string, frame_number):
    """POST one Base64-encoded image to the Nuclio function and return its detections.

    The response must be a JSON list. Only entries that are dicts carrying the
    keys 'label', 'points' and 'confidence', with 'points' being a list of
    exactly four items, are kept; everything else is skipped with a warning.

    Returns the list of accepted detections (possibly empty), or None when the
    request fails, times out, returns a non-200 status, or cannot be parsed.
    ``frame_number`` is only used to tag log messages.
    """
    request_body = json.dumps({"image": base64_image_string})
    request_headers = {'Content-Type': 'application/json'}

    try:
        started = time.time()
        response = requests.post(
            NUCLIO_FUNCTION_URL,
            headers=request_headers,
            data=request_body,
            timeout=REQUEST_TIMEOUT,
        )
        finished = time.time()
        logging.info(f"帧 {frame_number}: Nuclio请求耗时 {finished - started:.2f} 秒。")

        # Guard clause: anything but HTTP 200 is treated as a failed call.
        if response.status_code != 200:
            logging.error(f"帧 {frame_number}: Nuclio函数返回错误。状态码: {response.status_code}, 响应体(前200字符): {response.text[:200]}...")
            return None

        try:
            detections = response.json()
            if not isinstance(detections, list):
                logging.error(f"帧 {frame_number}: Nuclio响应不是列表。收到类型: {type(detections)}")
                return None

            accepted = []
            for det in detections:
                has_required_keys = (
                    isinstance(det, dict)
                    and 'label' in det
                    and 'points' in det
                    and 'confidence' in det
                )
                if not has_required_keys:
                    logging.warning(f"帧 {frame_number}: 跳过缺少键或类型错误的检测结果: {det}")
                    continue

                pts = det['points']
                if not (isinstance(pts, list) and len(pts) == 4):
                    logging.warning(f"帧 {frame_number}: 跳过无效'points'的检测结果: {det.get('points')}")
                    continue

                accepted.append(det)

            logging.info(f"帧 {frame_number}: 收到 {len(accepted)} 个有效检测结果。")
            return accepted
        except json.JSONDecodeError:
            logging.error(f"帧 {frame_number}: 解析Nuclio的JSON响应失败。状态码: {response.status_code}, 响应体(前200字符): {response.text[:200]}...")
            return None
        except Exception as validation_error:
            logging.error(f"帧 {frame_number}: 验证Nuclio响应结构时出错: {validation_error}")
            return None
    except requests.exceptions.Timeout:
        logging.error(f"帧 {frame_number}: 请求Nuclio函数超时 ({REQUEST_TIMEOUT}秒)。")
        return None
    except requests.exceptions.RequestException as request_error:
        logging.error(f"帧 {frame_number}: 请求Nuclio函数失败: {request_error}")
        return None
    except Exception as unexpected_error:
        logging.error(f"帧 {frame_number}: 调用Nuclio时发生意外错误: {unexpected_error}")
        return None
def pretty_print_xml(elem):
    """Serialize *elem* to an indented XML string that begins with an XML declaration.

    The element is serialized with ElementTree, re-parsed with minidom and
    pretty-printed as UTF-8. Returns None (after logging the error) if any of
    those steps raises.
    """
    try:
        dom = minidom.parseString(ET.tostring(elem, 'utf-8'))
        return dom.toprettyxml(indent=" ", encoding="utf-8").decode('utf-8')
    except Exception as exc:
        logging.error(f"XML美化打印过程中出错: {exc}")
        return None
def save_results_to_cvat_image_xml(all_results, xml_output_path, video_filename, frame_width, frame_height,
                                   total_processed_frames, original_total_frames):
    """Write detection results as a "CVAT for images 1.1" XML file.

    Args:
        all_results: Mapping of 1-based frame number to a list of detection
            dicts. Each detection is read through the keys 'label', 'points'
            (first four values used as xtl, ytl, xbr, ybr) and 'confidence'.
            None or an empty mapping produces a file without <image> elements.
        xml_output_path: Destination path of the XML file.
        video_filename: Stored as the task name in the <meta> section.
        frame_width: Written as the width of every image and of original_size.
        frame_height: Written as the height of every image and of original_size.
        total_processed_frames: Stored as the task size.
        original_total_frames: Used for the stop index of the single segment.

    Returns:
        None. Failures while serializing or writing the file are logged and
        swallowed; detections whose coordinates cannot be converted to floats
        are skipped with a warning instead of aborting the export.
    """
    # Tolerate None so the len()/max()/iteration below never crash.
    all_results = all_results or {}
    logging.info(f"正在为 {len(all_results)} 个处理过的帧构建CVAT图像XML输出...")

    root = ET.Element("annotations")
    ET.SubElement(root, "version").text = "1.1"

    # ---- <meta><task> -------------------------------------------------------
    meta = ET.SubElement(root, "meta")
    task = ET.SubElement(meta, "task")
    ET.SubElement(task, "id").text = "N/A"
    ET.SubElement(task, "name").text = video_filename
    ET.SubElement(task, "size").text = str(total_processed_frames)
    ET.SubElement(task, "mode").text = "annotation"
    ET.SubElement(task, "overlap").text = "0"
    ET.SubElement(task, "bugtracker").text = ""

    # time.strftime() has no "%f" directive and formats *local* time, so the
    # old timestamp was platform-dependent and wrongly labelled "+00:00".
    # Build a real UTC timestamp with millisecond precision instead.
    current_time_utc = datetime.now(timezone.utc).isoformat(sep=" ", timespec="milliseconds")
    ET.SubElement(task, "created").text = current_time_utc
    ET.SubElement(task, "updated").text = current_time_utc
    ET.SubElement(task, "start_frame").text = "0"

    # Frame numbers are 1-based while the XML ids are 0-based.
    last_processed_frame_num = max(all_results) if all_results else 0
    ET.SubElement(task, "stop_frame").text = str(max(last_processed_frame_num - 1, 0))
    ET.SubElement(task, "frame_filter").text = ""

    # Collect every distinct label that occurs in the detections.
    labels = {
        str(det.get('label', 'unknown'))
        for frame_dets in all_results.values() if frame_dets
        for det in frame_dets
    }

    labels_elem = ET.SubElement(task, "labels")
    if not labels:
        logging.warning("在检测结果中未找到标签,添加默认'unknown'标签。")
        labels.add('unknown')

    for label_name in sorted(labels):
        label_elem = ET.SubElement(labels_elem, "label")
        ET.SubElement(label_elem, "name").text = label_name
        ET.SubElement(label_elem, "color").text = ""
        ET.SubElement(label_elem, "type").text = "rectangle"
        ET.SubElement(label_elem, "attributes")

    segments = ET.SubElement(task, "segments")
    segment = ET.SubElement(segments, "segment")
    ET.SubElement(segment, "id").text = "0"
    ET.SubElement(segment, "start").text = "0"
    ET.SubElement(segment, "stop").text = str(original_total_frames - 1)
    ET.SubElement(segment, "url").text = "N/A"

    ET.SubElement(task, "owner")
    ET.SubElement(task, "assignee")
    ET.SubElement(task, "subset").text = "Default"

    original_size = ET.SubElement(meta, "original_size")
    ET.SubElement(original_size, "width").text = str(frame_width)
    ET.SubElement(original_size, "height").text = str(frame_height)

    ET.SubElement(meta, "dumped").text = current_time_utc

    # ---- <image> elements, one per frame that has detections -----------------
    for frame_number in sorted(all_results):
        detections = all_results[frame_number]
        if not detections:
            continue

        image_elem = ET.SubElement(root, "image")
        image_elem.set("id", str(frame_number - 1))
        image_elem.set("name", f"frame_{frame_number:06d}")
        image_elem.set("width", str(frame_width))
        image_elem.set("height", str(frame_height))

        for det in detections:
            points = det.get('points', [0, 0, 0, 0])
            # Convert before creating the <box>, so one malformed detection is
            # skipped instead of raising and losing the whole export.
            try:
                xtl, ytl, xbr, ybr = (float(value) for value in points[:4])
            except (TypeError, ValueError):
                logging.warning(f"帧 {frame_number}: 跳过坐标无效的检测框: {points!r}")
                continue

            box_elem = ET.SubElement(image_elem, "box")
            box_elem.set("label", str(det.get('label', 'unknown')))
            box_elem.set("xtl", f"{xtl:.2f}")
            box_elem.set("ytl", f"{ytl:.2f}")
            box_elem.set("xbr", f"{xbr:.2f}")
            box_elem.set("ybr", f"{ybr:.2f}")
            box_elem.set("occluded", "0")

            attr_conf = ET.SubElement(box_elem, "attribute", name="confidence")
            try:
                confidence_val = float(det.get('confidence', 0.0))
            except (ValueError, TypeError):
                confidence_val = 0.0
            attr_conf.text = f"{confidence_val:.4f}"

    # ---- Serialize and write --------------------------------------------------
    try:
        full_xml_string = pretty_print_xml(root)

        if full_xml_string:
            if not full_xml_string.strip().startswith("<?xml"):
                logging.error("美化打印未能生成有效的XML开头。")
                raise ValueError("美化打印失败。")

            with open(xml_output_path, "w", encoding="utf-8") as f:
                f.write(full_xml_string)
            logging.info(f"检测结果已保存至 CVAT 图像 XML: {xml_output_path}")
        else:
            # pretty_print_xml() returned nothing: fall back to ElementTree's own writer.
            logging.warning("美化打印失败,回退到基础XML写入器。")
            tree = ET.ElementTree(root)
            if hasattr(ET, 'indent'):  # ET.indent only exists on newer Python versions
                ET.indent(tree, space=" ", level=0)
            tree.write(xml_output_path, encoding='utf-8', xml_declaration=True)
            logging.info(f"检测结果已保存至基础 CVAT XML (后备方案): {xml_output_path}")
    except Exception as e:
        logging.error(f"保存结果到 CVAT XML 文件失败: {e}")
def main():
    """Run detection over the configured video and export the results as CVAT XML.

    Reads VIDEO_PATH frame by frame, sends every FRAME_SKIP-th frame to the
    detector via call_nuclio_detector(), collects the non-empty detection
    lists keyed by 1-based frame number, and finally writes them with
    save_results_to_cvat_image_xml() into OUTPUT_DIR. Problems are logged;
    the function returns early when the video is missing, cannot be opened,
    or reports a non-positive frame count.
    """
    logging.info(f"开始处理视频: {VIDEO_PATH}")
    logging.info(f"Nuclio 函数 URL: {NUCLIO_FUNCTION_URL}")
    logging.info(f"处理帧间隔: 每 {FRAME_SKIP} 帧")

    if not os.path.exists(VIDEO_PATH):
        logging.error(f"视频文件未找到: {VIDEO_PATH}")
        return

    cap = cv2.VideoCapture(VIDEO_PATH)
    if not cap.isOpened():
        logging.error(f"打开视频文件错误: {VIDEO_PATH}")
        return

    original_total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    if original_total_frames <= 0:
        logging.error(f"视频文件似乎为空或元数据错误 (总帧数: {original_total_frames})。")
        cap.release()
        return
    logging.info(f"视频信息: 总帧数: {original_total_frames}, FPS: {fps:.2f}, 尺寸: {frame_width}x{frame_height}")

    frame_count = 0
    processed_frame_count = 0
    all_results = {}
    start_process_time = time.time()

    # Loop-invariant: how many frames we expect to send to the detector.
    if FRAME_SKIP > 1:
        estimated_total_to_process = original_total_frames // FRAME_SKIP
    else:
        estimated_total_to_process = original_total_frames

    # try/finally guarantees the capture is released even if a frame raises.
    try:
        while True:
            ret, frame = cap.read()

            if not ret:
                if frame_count < original_total_frames:
                    logging.warning(f"无法读取帧 {frame_count + 1} (总帧数 {original_total_frames}),假定视频结束。")
                else:
                    logging.info("视频处理到达结尾。")
                break

            frame_count += 1

            # Only every FRAME_SKIP-th frame is processed.
            if FRAME_SKIP > 1 and frame_count % FRAME_SKIP != 0:
                continue

            processed_frame_count += 1
            logging.info(f"正在处理帧 {frame_count}/{original_total_frames}...")

            b64_string = encode_frame_to_base64(frame)
            if not b64_string:
                logging.warning(f"因编码错误跳过帧 {frame_count}。")
                continue

            detections = call_nuclio_detector(b64_string, frame_count)

            # Failed calls (None) and empty detection lists are not recorded.
            if detections:
                all_results[frame_count] = detections

            # Every 10 processed frames, log progress with a linear ETA.
            if processed_frame_count > 0 and processed_frame_count % 10 == 0:
                try:
                    elapsed_time = time.time() - start_process_time
                    if estimated_total_to_process > 0:
                        fraction_done = processed_frame_count / estimated_total_to_process
                        if 0 < fraction_done <= 1:
                            total_estimated_time = elapsed_time / fraction_done
                            estimated_remaining_time = max(0, total_estimated_time - elapsed_time)
                            logging.info(
                                f"进度: 帧 {frame_count}/{original_total_frames}。已处理 {processed_frame_count} 帧。预计剩余时间: {timedelta(seconds=int(estimated_remaining_time))}")
                except ZeroDivisionError:
                    logging.warning("无法估计剩余时间 (除零错误)。")
                except Exception as e_est:
                    logging.warning(f"无法估计剩余时间: {e_est}")
    finally:
        cap.release()
        logging.info("视频捕获已释放。")

    video_basename = os.path.basename(VIDEO_PATH)
    video_name_no_ext, _ = os.path.splitext(video_basename)
    xml_output_filename = f"{video_name_no_ext}_cvat_images.xml"
    xml_output_path = os.path.join(OUTPUT_DIR, xml_output_filename)

    if all_results:
        save_results_to_cvat_image_xml(
            all_results,
            xml_output_path,
            video_basename,
            frame_width,
            frame_height,
            processed_frame_count,
            original_total_frames,
        )
    else:
        logging.warning("没有记录到任何检测结果,将不会创建CVAT XML文件。")

    end_process_time = time.time()
    total_time = max(0, end_process_time - start_process_time)
    logging.info(f"视频处理完成。总耗时: {timedelta(seconds=int(total_time))}")
    logging.info(f"总共读取帧数: {frame_count}。实际处理帧数: {processed_frame_count}")


# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()