Nuclio与CVAT使用yolov8目标检测模型实现半自动化标注
Published in:2025-04-07 |
Words: 5.8k | Reading time: 26min | reading:

Nuclio Dashboard 是 Nuclio Serverless 平台的一个基于 Web 的图形用户界面(GUI),它允许用户方便地管理(创建、查看、编辑、删除、部署)Nuclio 函数、项目、事件源(Triggers)等资源。

核心概念:

  1. 部署 (Deployment) vs 构建 (Building from Source):

    • 部署: 对于大多数用户来说,“构建” Nuclio Dashboard 意味着部署预先构建好的 Docker 镜像。这是最常见和推荐的方式。你不需要从源代码编译它。
    • 从源码构建: 这是指获取 Nuclio Dashboard 的源代码并自行编译生成可执行文件或 Docker 镜像。这通常只在开发 Nuclio 本身或需要特殊定制时才需要,对于普通用户不推荐。
  2. 运行环境: Nuclio(及其 Dashboard)可以在多种环境中运行,最常见的是:

    • Standalone Docker: 在单个 Docker 主机上运行。
    • Kubernetes: 在 Kubernetes 集群中运行,这是生产环境中最常见的方式。

部署 Nuclio Dashboard 的方法以及如何使用它。


一、部署 Nuclio Dashboard

Nuclio Dashboard 通常与 Nuclio Controller 一起部署。Controller 是 Nuclio 的核心控制平面,而 Dashboard 是其 UI。

方法一:在 Standalone Docker 环境中部署

这是最简单的方法,适合本地测试和开发。通常,当你运行 Nuclio Controller 容器时,它已经包含了 Dashboard。

  1. 拉取并运行 Nuclio Controller 镜像:

    1
    docker run -p 8070:8070 -v /var/run/docker.sock:/var/run/docker.sock --name nuclio-dashboard quay.io/nuclio/dashboard:stable-amd64
    • -d: 后台运行容器。
    • -p 8070:8070: 将主机的 8070 端口映射到容器的 8070 端口(Nuclio Dashboard 默认监听此端口)。
    • -v /var/run/docker.sock:/var/run/docker.sock: 允许 Nuclio Controller 与宿主机的 Docker守护进程通信,以便构建和运行函数容器。(注意安全风险)
    • -v /tmp:/tmp: 提供临时文件存储。
    • --name nuclio-controller: 给容器命名。
    • nuclio/controller:stable: 使用的镜像。
    • --platform-kind "local""docker": 指定平台类型为本地 Docker 环境。
  2. 访问 Dashboard:
    部署成功后,在浏览器中打开 http://<你的主机IP>:8070http://localhost:8070 即可访问 Nuclio Dashboard。

方法二:在 Kubernetes 环境中部署

这是在生产或集群环境中最常用的方法。推荐使用 Helm Chart 来部署 Nuclio。

  1. 添加 Nuclio Helm 仓库 (如果尚未添加):

    1
    2
    helm repo add nuclio https://nuclio.github.io/nuclio/charts
    helm repo update
  2. 安装 Nuclio Chart:
    这将同时部署 Nuclio Controller(包含 Dashboard)和其他必要的组件(如 CRDs)。

    1
    2
    3
    4
    5
    # 创建一个 namespace (推荐)
    kubectl create namespace nuclio

    # 使用 Helm 安装
    helm install nuclio nuclio/nuclio --namespace nuclio
    • 你可以通过创建 values.yaml 文件或使用 --set 参数来自定义安装选项(例如,指定 Service 类型、资源限制等)。
  3. 访问 Dashboard:
    部署后,需要一种方式从集群外部访问 Dashboard Service。常用方法有:

    • Port Forwarding (用于临时访问/调试):
      1
      2
      3
      4
      # 找到 Nuclio Dashboard Pod 的名称 (通常包含 'controller')
      kubectl get pods -n nuclio
      # 假设 pod 名称为 nuclio-controller-xxxxxxxxxx-yyyyy
      kubectl port-forward -n nuclio <nuclio-controller-pod-name> 8070:8070
      然后在浏览器中访问 http://localhost:8070
    • LoadBalancer Service: 如果你的 Kubernetes 集群支持 LoadBalancer 类型的 Service,并且你在 Helm 安装时配置了 Dashboard Service 为 LoadBalancer 类型,你可以获取其外部 IP 地址来访问。
      1
      kubectl get svc -n nuclio # 查找 dashboard service 的 EXTERNAL-IP
      然后在浏览器中访问 http://<EXTERNAL-IP>:8070
    • NodePort Service: 如果配置为 NodePort,你可以通过 http://<任意节点IP>:<NodePort端口> 访问。
    • Ingress: 配置 Ingress 资源,通过 Ingress Controller 提供的域名或路径来访问 Dashboard。这是生产环境中最推荐的方式。

img

二、使用 Nuclio Dashboard

当你成功访问 Nuclio Dashboard 后,你会看到一个用户友好的界面。以下是主要功能区域和常见操作:

  1. 概览 (Overview / Dashboard):

    • 通常是进入后的首页,显示项目和函数的摘要信息,例如总数、运行状态等。
  2. 项目 (Projects):

    • Nuclio 使用项目来组织函数。一个项目可以包含多个函数。
    • 创建项目: 点击 “New Project” 或类似按钮,输入项目名称和可选的描述、标签。
    • 查看项目: 点击项目名称进入项目详情页,可以看到该项目下的所有函数。
    • 编辑/删除项目: 在项目列表或详情页通常有编辑和删除的操作按钮。
  3. 函数 (Functions):

    • 这是 Nuclio 的核心。你可以在项目内部创建和管理函数。
    • 创建函数:
      • 进入一个项目,点击 “New Function” 或 “Create Function”。
      • 模板选择: 你可以选择一个预定义的模板(例如,Go 的 HTTP 处理、Python 的事件处理等),或者从头开始。
      • 配置:
        • Function Name: 函数的唯一名称。
        • Description/Labels: 可选的描述和标签。
        • Runtime: 选择函数的运行时语言(Go, Python, Node.js, Java, .NET Core, Shell, Binary)。
        • Source Code:
          • Inline Editor: 直接在 Web 编辑器中编写代码。
          • Upload: 上传代码文件或压缩包 (zip, jar)。
          • Git Repository: 从 Git 仓库拉取代码(需要配置)。
          • Image: 使用一个已经包含函数代码和依赖的 Docker 镜像。
        • Handler: 指定函数入口点(例如,Python 文件名:函数名,main:handler)。
        • Dependencies: 如果代码有外部依赖,需要在这里指定(例如,Python 的 requirements.txt 内容,Go 的 go.mod 等)。Nuclio 会在构建过程中安装它们。
        • Triggers: 配置触发函数的方式。最常见的是 http(通过 HTTP 请求触发),但也支持 Kafka, Kinesis, RabbitMQ, Cron (定时任务) 等多种事件源。需要配置触发器的详细信息(如 HTTP 路径、方法,Kafka 主题等)。
        • Environment Variables: 设置函数运行时可以访问的环境变量。
        • Volumes: (Kubernetes 环境) 挂载存储卷到函数 Pod。
        • Resources: 配置函数的资源请求和限制(CPU, Memory)。
        • Scaling: 配置自动伸缩参数(最小/最大副本数,目标 CPU 使用率等)。
        • Build Settings: 配置构建过程的参数,例如基础镜像、构建命令等。
    • 部署函数: 配置完成后,点击 “Deploy” 或 “Create”。Nuclio Controller 会开始构建函数镜像(如果需要)并将其实例化(在 Docker 中是启动容器,在 Kubernetes 中是创建 Deployment/Pod)。部署状态会显示在函数列表中(例如:Building, Ready, Error)。
    • 查看函数详情: 点击函数名称进入详情页。
      • Code & Configuration: 查看或编辑函数的代码和配置。
      • Triggers: 查看触发器信息,特别是 HTTP 触发器的调用 URL。
      • Logs: 查看函数的实时或历史日志输出。
      • Monitoring: 查看函数的调用次数、延迟、成功/失败率等指标(可能需要集成监控系统)。
      • Versions: 查看函数的部署历史版本。
    • 调用/测试函数: 对于 HTTP 触发器,可以直接在浏览器或使用 curl 等工具访问其 URL 来调用函数。Dashboard 有时也提供简单的调用测试界面。
    • 编辑/删除函数: 在函数列表或详情页进行操作。编辑后需要重新部署才能生效。
  4. API Gateways: (如果使用了 Nuclio 的 API Gateway 功能)

    • 用于创建和管理 API 网关,将外部请求路由到不同的 Nuclio 函数。
    • 可以配置路径、方法、认证、限流等。
  5. 设置/平台配置 (Settings / Platform Configuration): (可能在某些版本或模式下可见)

    • 查看 Nuclio 平台的配置信息。

基于yolov8模型部署函数计算到nuclio

yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
apiVersion: nuclio.io/v1beta1
kind: Function
metadata:
name: yolov8-detector
namespace: nuclio # Or your preferred namespace
annotations:
nuclio.io/project-name: # Add your project name if using Nuclio projects
spec:
description: "YOLOv8 Object Detection Function using model copied during build"
runtime: python:3.9 # Ensure this matches the Python version you need
handler: main:handler # Points to main.py -> handler function
env:
- name: HTTP_PROXY
value: http://192.168.10.123:7890
- name: HTTPS_PROXY
value: http://192.168.10.123:7890 # 或者 https://... 如果代理本身用https
- name: NO_PROXY
value: localhost,127.0.0.1,kubernetes.default.svc # 添加其他内部地址

build:
path: . # Build context is still copied somewhere by Nuclio, just maybe not where expected.
commands:
- 'apt-get update && apt-get install -y --no-install-recommends libgl1-mesa-glx libglib2.0-0'
- 'pip install --default-timeout=600 --no-cache-dir torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu'
- 'pip install -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple --default-timeout=600 --no-cache-dir ultralytics Pillow nuclio-sdk numpy'

# ---- Runtime Configuration ----
# (Optional but Recommended) Set resource limits, especially memory for large models
resources:
limits:
memory: "4Gi" # Adjust memory limit based on yolov8x needs (e.g., 4Gi, 6Gi, 8Gi)

python code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82

import json
import base64
from PIL import Image
import io
import nuclio_sdk
from ultralytics import YOLO
import numpy as np

def init_context(context: nuclio_sdk.Context):
"""初始化函数上下文,加载模型"""
context.logger.info("Initializing YOLOv8 model...")
# 从函数工作目录加载模型文件 (假设 .pt 文件和 main.py 在同一目录)
# 或者提供模型的绝对路径(如果在 function.yaml 中挂载了)
model_path = "/detect/yolov8/yolov8x.pt"
model = YOLO(model_path)
context.user_data.model = model
context.logger.info("Model loaded successfully.")

def handler(context: nuclio_sdk.Context, event: nuclio_sdk.Event):
"""处理输入的 HTTP 请求"""
context.logger.info("Received event.")

# 检查请求体类型
if isinstance(event.body, bytes):
try:
data = json.loads(event.body.decode('utf-8'))
except Exception as e:
context.logger.error(f"Failed to decode JSON body: {e}")
return context.Response(body=f"Bad request: {e}", status_code=400)
elif isinstance(event.body, dict):
data = event.body
else:
return context.Response(body="Unsupported event body type", status_code=400)

# 从请求中获取 base64 编码的图像数据
if 'image' not in data:
return context.Response(body="Missing 'image' field in request body", status_code=400)

try:
image_b64 = data['image']
image_bytes = base64.b64decode(image_b64)
image = Image.open(io.BytesIO(image_bytes)).convert('RGB') # 确保是 RGB
except Exception as e:
context.logger.error(f"Failed to decode base64 image: {e}")
return context.Response(body=f"Invalid image data: {e}", status_code=400)

# 使用加载的模型进行预测
context.logger.info("Performing inference...")
model = context.user_data.model
results = model(image) # results 是一个列表,通常包含一个 Results 对象
context.logger.info("Inference complete.")

# 处理预测结果
detections = []
if results and results[0]:
result = results[0] # 获取第一个图像的结果
boxes = result.boxes # Boxes object
names = result.names # Class names dict {id: name}

for i in range(len(boxes)):
box = boxes[i]
xyxy = box.xyxy.cpu().numpy().flatten().tolist() # [x1, y1, x2, y2]
conf = float(box.conf.cpu().numpy()) # 置信度
cls_id = int(box.cls.cpu().numpy()) # 类别 ID
label = names[cls_id] # 类别名称

detections.append({
"label": label,
"confidence": conf,
# CVAT 通常需要 [xtl, ytl, xbr, ybr] 格式的整数坐标
"points": [int(xyxy[0]), int(xyxy[1]), int(xyxy[2]), int(xyxy[3])]
# 或者根据需要返回归一化坐标或其他格式
# "xtl": xyxy[0], "ytl": xyxy[1], "xbr": xyxy[2], "ybr": xyxy[3]
})

context.logger.info(f"Detected {len(detections)} objects.")

# 返回 JSON 格式的检测结果
return context.Response(body=json.dumps(detections),
content_type='application/json',
status_code=200)

web http 调用 yolov8模型

可输出 cvat v1.1格式的标注后xml文件,根据输入视频输出目标检测标注结果生成代码 获得标注结果后 压缩标注结果文件 导入cvat对应项目对应task

  • 代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
# -*- coding: utf-8 -
import cv2
import base64
import requests
import json
import os
import logging
import time
from datetime import timedelta
import xml.etree.ElementTree as ET
from xml.dom import minidom # 用于美化打印XML

# --- 配置区 ---
VIDEO_PATH = "/Users/zg/PycharmProjects/CVAT_model_nuclio/src/run/data/1.mp4" # <<<--- 修改这里: 你的视频文件的完整路径
NUCLIO_FUNCTION_URL = "http://192.168.10.158:32774" # YOLOv8检测器的端口
OUTPUT_DIR = "./data/" # 输出XML文件的目录
LOG_FILE = "./logs/video_processing_cvat_images_cn.log" # 日志文件名
FRAME_SKIP = 5 # 处理每 N 帧 (1 = 处理所有帧, 5 = 每5帧处理1帧)
REQUEST_TIMEOUT = 30 # 等待Nuclio响应的超时时间(秒)

# 创建日志和数据目录(如果不存在)
os.makedirs('./logs/', exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)


# --- 设置日志记录 ---
logging.basicConfig(
level=logging.INFO, # 日志级别
format='%(asctime)s - %(levelname)s - %(message)s', # 日志格式
handlers=[
logging.FileHandler(LOG_FILE, encoding='utf-8'), # 输出日志到文件,使用utf-8编码
logging.StreamHandler() # 同时输出日志到控制台
]
)


# --- 辅助函数 ---
def encode_frame_to_base64(frame):
"""将OpenCV读取的帧(NumPy数组)编码为Base64字符串。"""
try:
# 将帧编码为内存中的JPEG格式
is_success, buffer = cv2.imencode(".jpg", frame)
if not is_success:
logging.error("无法将帧编码为JPEG。")
return None
# 将字节缓冲区编码为Base64字符串
b64_string = base64.b64encode(buffer).decode('utf-8')
return b64_string
except Exception as e:
logging.error(f"帧编码过程中出错: {e}")
return None

def call_nuclio_detector(base64_image_string, frame_number):
"""将Base64编码的图像发送给Nuclio函数并返回检测结果。"""
payload = json.dumps({"image": base64_image_string})
headers = {'Content-Type': 'application/json'}

try:
start_time = time.time()
# 发送POST请求
response = requests.post(NUCLIO_FUNCTION_URL, headers=headers, data=payload, timeout=REQUEST_TIMEOUT)
end_time = time.time()
logging.info(f"帧 {frame_number}: Nuclio请求耗时 {end_time - start_time:.2f} 秒。")

# 检查响应状态码
if response.status_code == 200:
try:
detections = response.json()
# 验证检测结果的基本结构
if not isinstance(detections, list):
logging.error(f"帧 {frame_number}: Nuclio响应不是列表。收到类型: {type(detections)}")
return None
valid_detections = []
for det in detections:
if isinstance(det, dict) and 'label' in det and 'points' in det and 'confidence' in det:
if isinstance(det['points'], list) and len(det['points']) == 4:
valid_detections.append(det)
else:
logging.warning(f"帧 {frame_number}: 跳过无效'points'的检测结果: {det.get('points')}")
else:
logging.warning(f"帧 {frame_number}: 跳过缺少键或类型错误的检测结果: {det}")
logging.info(f"帧 {frame_number}: 收到 {len(valid_detections)} 个有效检测结果。")
return valid_detections
except json.JSONDecodeError:
logging.error(f"帧 {frame_number}: 解析Nuclio的JSON响应失败。状态码: {response.status_code}, 响应体(前200字符): {response.text[:200]}...")
return None
except Exception as e_val:
logging.error(f"帧 {frame_number}: 验证Nuclio响应结构时出错: {e_val}")
return None
else:
logging.error(f"帧 {frame_number}: Nuclio函数返回错误。状态码: {response.status_code}, 响应体(前200字符): {response.text[:200]}...")
return None
except requests.exceptions.Timeout:
logging.error(f"帧 {frame_number}: 请求Nuclio函数超时 ({REQUEST_TIMEOUT}秒)。")
return None
except requests.exceptions.RequestException as e:
logging.error(f"帧 {frame_number}: 请求Nuclio函数失败: {e}")
return None
except Exception as e:
logging.error(f"帧 {frame_number}: 调用Nuclio时发生意外错误: {e}")
return None


# --- XML 生成 ---
def pretty_print_xml(elem):
"""返回包含声明且格式化(美化)的XML字符串。"""
try:
# 将ElementTree元素转换为utf-8字节字符串
rough_string = ET.tostring(elem, 'utf-8')
# 使用minidom解析字节字符串
reparsed = minidom.parseString(rough_string)
# 使用toprettyxml进行格式化,包含缩进和XML声明
# 确保它返回utf-8编码的字节字符串,然后解码为python字符串
xml_str = reparsed.toprettyxml(indent=" ", encoding="utf-8").decode('utf-8')
return xml_str
except Exception as e:
logging.error(f"XML美化打印过程中出错: {e}")
# 返回None,表示需要调用函数中的后备方案
return None

def save_results_to_cvat_image_xml(all_results, xml_output_path, video_filename, frame_width, frame_height, total_processed_frames, original_total_frames):
"""将检测结果保存为 CVAT XML 1.1 for Images 格式。"""
logging.info(f"正在为 {len(all_results)} 个处理过的帧构建CVAT图像XML输出...")

# --- 构建XML结构 ---
root = ET.Element("annotations")
ET.SubElement(root, "version").text = "1.1"

# Meta元信息
meta = ET.SubElement(root, "meta")
task = ET.SubElement(meta, "task")
ET.SubElement(task, "id").text = "N/A" # 占位符
ET.SubElement(task, "name").text = video_filename # 使用视频名作为任务名
# 注意: size在这里表示标注的图像数量 (即处理过的帧数)
ET.SubElement(task, "size").text = str(total_processed_frames)
ET.SubElement(task, "mode").text = "annotation" # 模式改为 annotation (图像模式)
ET.SubElement(task, "overlap").text = "0"
ET.SubElement(task, "bugtracker").text = ""
current_time_utc = time.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + "+00:00" # 当前UTC时间
ET.SubElement(task, "created").text = current_time_utc
ET.SubElement(task, "updated").text = current_time_utc
ET.SubElement(task, "start_frame").text = "0"
# stop_frame对应最后一帧的索引(0-based)
# 我们基于处理过的帧来计算,找到最大的帧号
last_processed_frame_num = max(all_results.keys()) if all_results else 0
ET.SubElement(task, "stop_frame").text = str(last_processed_frame_num - 1 if last_processed_frame_num > 0 else 0)

ET.SubElement(task, "frame_filter").text = "" # 如果有跳帧,可以写在这里,但通常导入时不需要

# 提取并添加标签信息
labels = set()
if all_results:
for frame_dets in all_results.values():
if frame_dets:
for det in frame_dets:
labels.add(str(det.get('label', 'unknown')))

labels_elem = ET.SubElement(task, "labels")
if not labels: # 如果没有检测到任何标签,添加一个默认标签
logging.warning("在检测结果中未找到标签,添加默认'unknown'标签。")
labels.add('unknown')

for label_name in sorted(list(labels)):
label_elem = ET.SubElement(labels_elem, "label")
ET.SubElement(label_elem, "name").text = label_name
ET.SubElement(label_elem, "color").text = "" # CVAT会自动分配颜色
ET.SubElement(label_elem, "type").text = "rectangle" # 假设都是矩形框
ET.SubElement(label_elem, "attributes") # 这里不定义属性

# 添加最小化的 segments 信息 (CVAT格式要求)
segments = ET.SubElement(task, "segments")
segment = ET.SubElement(segments, "segment")
ET.SubElement(segment, "id").text = "0"
ET.SubElement(segment, "start").text = "0"
ET.SubElement(segment, "stop").text = str(original_total_frames - 1) # 使用原始视频的总帧数
ET.SubElement(segment, "url").text = "N/A" # 占位符

ET.SubElement(task, "owner") # 空的 owner
ET.SubElement(task, "assignee") # 空的 assignee
ET.SubElement(task, "subset").text = "Default" # 默认子集

# 原始尺寸信息 (需要)
original_size = ET.SubElement(meta, "original_size")
ET.SubElement(original_size, "width").text = str(frame_width)
ET.SubElement(original_size, "height").text = str(frame_height)

ET.SubElement(meta, "dumped").text = current_time_utc # 导出时间

# 为每个处理过的帧添加 <image> 标注
if all_results:
for frame_number in sorted(all_results.keys()): # 按帧号排序
detections = all_results[frame_number]
if not detections: # 如果该帧没有检测结果,跳过(理论上不应发生,因为我们只存储有结果的帧)
continue

# 创建 <image> 元素
image_elem = ET.SubElement(root, "image")
# CVAT 使用 0-based 索引作为 id
image_elem.set("id", str(frame_number - 1))
# 生成一个规范的帧名 (可选,但推荐)
image_elem.set("name", f"frame_{frame_number:06d}") # 格式化帧号,如 frame_000123
image_elem.set("width", str(frame_width))
image_elem.set("height", str(frame_height))

# 在 <image> 元素内部添加 <box> 元素
for det in detections:
box_elem = ET.SubElement(image_elem, "box")
box_elem.set("label", str(det.get('label', 'unknown')))

points = det.get('points', [0, 0, 0, 0])
# 坐标使用两位小数的浮点数格式
box_elem.set("xtl", f"{float(points[0]):.2f}")
box_elem.set("ytl", f"{float(points[1]):.2f}")
box_elem.set("xbr", f"{float(points[2]):.2f}")
box_elem.set("ybr", f"{float(points[3]):.2f}")

# 对于图像格式,通常不需要 outside,但 occluded 可以保留
box_elem.set("occluded", "0") # 0 表示未遮挡 (假设)
# 在 CVAT for Images 格式中,不需要 keyframe 和 outside
# box_elem.set("outside", "0") # 通常不需要为图像设置outside

# 将置信度添加为属性
attr_conf = ET.SubElement(box_elem, "attribute", name="confidence")
try:
confidence_val = float(det.get('confidence', 0.0))
except (ValueError, TypeError):
confidence_val = 0.0
attr_conf.text = f"{confidence_val:.4f}"

# --- 结束构建XML结构 ---


# --- 写入 XML 文件 ---
try:
# 获取包含声明且格式化的完整XML字符串
full_xml_string = pretty_print_xml(root)

if full_xml_string:
# 在写入前确保字符串以 <?xml 开头
if not full_xml_string.strip().startswith("<?xml"):
logging.error("美化打印未能生成有效的XML开头。")
raise ValueError("美化打印失败。")

with open(xml_output_path, "w", encoding="utf-8") as f:
# 直接写入完整的字符串
f.write(full_xml_string)
logging.info(f"检测结果已保存至 CVAT 图像 XML: {xml_output_path}")
else:
# 如果 pretty_print_xml 返回 None,使用后备方案
logging.warning("美化打印失败,回退到基础XML写入器。")
tree = ET.ElementTree(root)
# 如果 Python >= 3.9, 可以使用 ET.indent 添加基本缩进
if hasattr(ET, 'indent'):
ET.indent(tree, space=" ", level=0)
# 使用 ElementTree 写入,并请求 XML 声明
tree.write(xml_output_path, encoding='utf-8', xml_declaration=True)
logging.info(f"检测结果已保存至基础 CVAT XML (后备方案): {xml_output_path}")

except Exception as e:
logging.error(f"保存结果到 CVAT XML 文件失败: {e}")


# --- 主处理逻辑 ---
def main():
logging.info(f"开始处理视频: {VIDEO_PATH}")
logging.info(f"Nuclio 函数 URL: {NUCLIO_FUNCTION_URL}")
logging.info(f"处理帧间隔: 每 {FRAME_SKIP} 帧")

if not os.path.exists(VIDEO_PATH):
logging.error(f"视频文件未找到: {VIDEO_PATH}")
return

cap = cv2.VideoCapture(VIDEO_PATH)
if not cap.isOpened():
logging.error(f"打开视频文件错误: {VIDEO_PATH}")
return

# 获取视频元数据
original_total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
fps = cap.get(cv2.CAP_PROP_FPS)
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
if original_total_frames <= 0:
logging.error(f"视频文件似乎为空或元数据错误 (总帧数: {original_total_frames})。")
cap.release()
return
logging.info(f"视频信息: 总帧数: {original_total_frames}, FPS: {fps:.2f}, 尺寸: {frame_width}x{frame_height}")

frame_count = 0 # 已读取的帧计数 (1-based for logging)
processed_frame_count = 0 # 实际处理的帧计数
all_results = {} # 存储结果: {帧号 (1-based): [检测列表]}
start_process_time = time.time() # 开始计时

while True:
# 读取一帧
ret, frame = cap.read()

# 检查是否成功读取
if not ret:
if frame_count < original_total_frames:
logging.warning(f"无法读取帧 {frame_count + 1} (总帧数 {original_total_frames}),假定视频结束。")
else:
logging.info("视频处理到达结尾。")
break # 退出循环

frame_count += 1 # 更新已读取帧计数

# --- 跳帧逻辑 ---
if FRAME_SKIP > 1 and frame_count % FRAME_SKIP != 0 :
continue # 跳过当前帧

# --- 处理当前帧 ---
processed_frame_count += 1 # 更新已处理帧计数
logging.info(f"正在处理帧 {frame_count}/{original_total_frames}...")

# 编码帧
b64_string = encode_frame_to_base64(frame)
if not b64_string:
logging.warning(f"因编码错误跳过帧 {frame_count}。")
continue

# 调用 Nuclio 检测器
detections = call_nuclio_detector(b64_string, frame_count)

# 存储检测结果 (仅当检测结果非空时存储)
if detections:
all_results[frame_count] = detections # 使用 1-based 帧号作为键

# --- 进度估计 (可选) ---
if processed_frame_count > 0 and processed_frame_count % 10 == 0: # 每处理10帧更新一次进度
try:
elapsed_time = time.time() - start_process_time
# 估计总共需要处理的帧数
estimated_total_to_process = (original_total_frames // FRAME_SKIP) if FRAME_SKIP > 1 else original_total_frames
if estimated_total_to_process > 0:
fraction_done = processed_frame_count / estimated_total_to_process
# 避免除零错误和进度超过100%的情况
if fraction_done > 0 and fraction_done <= 1:
total_estimated_time = elapsed_time / fraction_done
estimated_remaining_time = max(0, total_estimated_time - elapsed_time) # 确保不为负
logging.info(
f"进度: 帧 {frame_count}/{original_total_frames}。已处理 {processed_frame_count} 帧。预计剩余时间: {timedelta(seconds=int(estimated_remaining_time))}")
except ZeroDivisionError:
logging.warning("无法估计剩余时间 (除零错误)。")
except Exception as e_est:
logging.warning(f"无法估计剩余时间: {e_est}")


# --- 清理与保存 ---
cap.release() # 释放视频捕获对象
logging.info("视频捕获已释放。")

# 确定输出XML文件名
video_basename = os.path.basename(VIDEO_PATH)
video_name_no_ext, _ = os.path.splitext(video_basename)
# 文件名表明是CVAT图像格式
xml_output_filename = f"{video_name_no_ext}_cvat_images.xml"
xml_output_path = os.path.join(OUTPUT_DIR, xml_output_filename)

# 保存为 CVAT 图像 XML
if all_results:
# 传递处理过的帧数给size字段
save_results_to_cvat_image_xml(
all_results,
xml_output_path,
video_basename,
frame_width,
frame_height,
processed_frame_count, # 传递处理过的帧数
original_total_frames # 传递原始总帧数
)
else:
logging.warning("没有记录到任何检测结果,将不会创建CVAT XML文件。")

# --- 结束处理 ---
end_process_time = time.time()
total_time = max(0, end_process_time - start_process_time) # 确保总时间非负
logging.info(f"视频处理完成。总耗时: {timedelta(seconds=int(total_time))}")
logging.info(f"总共读取帧数: {frame_count}。实际处理帧数: {processed_frame_count}")


# --- 程序入口 ---
if __name__ == "__main__":
main()

see

Prev:
基于yolov8根据CVAT标注结果训练模型
Next:
ffmpeg分割视频与音频中的处理问题分析