diff --git a/专业领域/ByteTrack/YOLO+ByteTrack水流速度检测-可行性详细分析.md b/专业领域/ByteTrack/YOLO+ByteTrack水流速度检测-可行性详细分析.md new file mode 100755 index 0000000..fdcc6f6 --- /dev/null +++ b/专业领域/ByteTrack/YOLO+ByteTrack水流速度检测-可行性详细分析.md @@ -0,0 +1,1808 @@ +# YOLO + ByteTrack 水流速度检测 — 可行性详细分析与完整实现指南 + +> **结论先行:技术上完全可行,但精度和适用场景有明确边界。** +> +> 归档:2026-04-23 + +--- + +## 一、可行性总评 + +### 1.1 核心结论 + +| 维度 | 评级 | 说明 | +|------|------|------| +| **技术可行性** | ✅ 高 | YOLO 检测 + ByteTrack 追踪在原理上完全成立,已有类似应用 | +| **精度可行性** | ⚠️ 中 | 取决于标定质量、水面特征丰富度、相机角度;典型误差 5-15% | +| **工程可行性** | ✅ 高 | 成熟开源生态,GPU 可实时,CPU 勉强可跑 | +| **成本可行性** | ✅ 高 | 相机成本低(几百到几千),远低于 ADCP/雷达流速仪 | +| **部署可行性** | ⚠️ 中 | 边缘设备需要优化(TensorRT/INT8),但完全可做到 | + +### 1.2 与传统方法对比 + +| 方法 | 精度 | 成本 | 实时性 | 部署难度 | 适用场景 | +|------|------|------|--------|---------|---------| +| **ADCP** | ±1-2% | 5-20万 | 是 | 高(需入水) | 深水、精确测量 | +| **雷达流速仪** | ±2-5% | 2-10万 | 是 | 中 | 定点测量 | +| **LSPIV(传统)** | ±5-10% | 1-3万 | 中 | 中高 | 开阔水面 | +| **YOLO+ByteTrack** | ±5-15% | 0.5-2万 | 是 | 中 | 开阔水面、有示踪物 | +| **浮标测速** | ±5-10% | 0.1-1万 | 否 | 低 | 单次测量 | + +**关键洞察**:YOLO+ByteTrack 本质上是 **LSPIV(大尺度粒子图像测速)** 的现代深度学习变体。传统 LSPIV 用互相关算法追踪水面纹理,我们用 YOLO 检测离散示踪物 + ByteTrack 追踪轨迹。 + +--- + +## 二、适用场景与限制 + +### 2.1 适用的场景 + +✅ **适合使用此方案:** +- 开阔河道、渠道、排水口的水面流速测量 +- 有天然示踪物(树叶、泡沫、漂浮物)的水面 +- 有固定摄像头安装位置(桥梁、河岸建筑) +- 洪水监测、排污监测、生态流量监测 +- 无人机航拍水面流速调查 + +❌ **不适合使用此方案:** +- 非常清澈、无特征的水面(无法检测/追踪) +- 夜间无照明条件 +- 暴雨导致镜头模糊的情况 +- 需要精确垂向流速剖面(此方法仅测表面流速) +- 水面波动剧烈导致示踪物翻转的情况 + +### 2.2 关键精度影响因素 + +| 因素 | 影响程度 | 应对策略 | +|------|---------|---------| +| 相机标定误差 | 高 | 多点标定、定期验证 | +| 透视变形 | 高 | 单应性变换校正 | +| 示踪物跟随性 | 中 | 选择合适大小的示踪物 | +| 水面湍流 | 中 | 增加追踪帧数、平滑滤波 | +| 镜头畸变 | 低-中 | 相机标定校正 | +| 光照变化 | 中 | 数据增强、HDR | + +--- + +## 三、完整实现步骤 + +### Step 1:硬件准备与部署 + +#### 1.1 设备清单 + +``` +必备: +├── 工业/监控相机(1080p @ 30fps 以上) +│ ├── 推荐:Hikvision DS-2CD 系列 / Dahua IPC 系列 +│ ├── 预算方案:普通 USB 相机 + 防水外壳 +│ └── 高端方案:FLIR 全局快门相机(减少运动模糊) +├── 边缘计算设备 +│ ├── NVIDIA Jetson Orin Nano / NX(推荐) +│ ├── Jetson Xavier NX(中等预算) +│ └── 树莓派 5 + Coral TPU(仅推理,性能有限) +├── 固定支架/杆(确保相机稳定) +└── 参考标尺(用于像素-实际距离标定) + +可选: +├── 补光灯(夜间使用) +├── 偏振滤镜(减少水面反光) +├── 气象防护罩 +└── 4G/5G 模块(远程数据传输) +``` + +#### 1.2 相机部署位置选择 + +``` +最佳位置特征: +1. 俯视或斜视角度(30°-60° 俯角) +2. 覆盖足够长的测量断面(至少 5-10 米水面) +3. 避免逆光拍摄 +4. 相机到水面的距离和角度要固定不变 +5. 确保视野内有可用于标定的已知尺寸参考物 + +不推荐: +1. 正对水面(90° 俯视)— 透视信息不足 +2. 几乎水平拍摄 — 透视畸变过大 +3. 水面反光直射方向 +``` + +### Step 2:相机标定(关键步骤) + +#### 2.1 标定方法概述 + +有三种主要方法将像素坐标转换为实际坐标: + +**方法 A:简单比例因子法(最简单,适合相机正对河道)** + +```python +# 只需要一个已知长度的参考物 +# 例如:河面上已知 2 米宽的浮标 + +# 在图像上测量浮标的像素宽度 +reference_pixels = measure_width_in_image(image, reference_object) +real_width_meters = 2.0 + +pixels_per_meter = reference_pixels / real_width_meters +# 假设所有方向比例相同(仅在正对小范围区域时有效) +``` + +**方法 B:单应性变换法(推荐,适合斜视相机)** + +```python +import cv2 +import numpy as np + +class PerspectiveCalibrator: + """ + 通过地面控制点 (GCP) 建立像素坐标到实际坐标的映射 + 使用单应性矩阵 (Homography) 进行透视变换 + """ + def __init__(self): + self.homography_matrix = None + + def calibrate(self, pixel_points, real_points): + """ + pixel_points: Nx2 数组,图像上的控制点像素坐标 + real_points: Nx2 数组,对应的实际坐标 (米) + + 至少需要 4 个控制点 + """ + assert len(pixel_points) >= 4, "至少需要4个控制点" + + pixel_points = np.array(pixel_points, dtype=np.float32) + real_points = np.array(real_points, dtype=np.float32) + + # 计算单应性矩阵 + H, status = cv2.findHomography(pixel_points, real_points) + self.homography_matrix = H + + # 评估标定精度 + errors = [] + for p, r in zip(pixel_points, real_points): + # 将像素点变换到实际坐标 + p_hom = np.array([p[0], p[1], 1.0]) + r_pred = H @ p_hom + r_pred = r_pred[:2] / r_pred[2] # 归一化 + error = np.linalg.norm(r_pred - r) + errors.append(error) + + mean_error = np.mean(errors) + max_error = np.max(errors) + + print(f"标定精度 — 平均误差: {mean_error:.3f}m, 最大误差: {max_error:.3f}m") + return mean_error, max_error + + def pixel_to_real(self, pixel_coords): + """将像素坐标转换为实际坐标(米)""" + assert self.homography_matrix is not None, "请先标定" + + pts = np.array([[pixel_coords[0], pixel_coords[1]]], dtype=np.float32) + pts = cv2.perspectiveTransform(pts.reshape(1, -1, 2), self.homography_matrix) + return pts[0][0] + + def batch_pixel_to_real(self, pixel_coords_array): + """批量转换""" + pts = np.array(pixel_coords_array, dtype=np.float32).reshape(-1, 1, 2) + transformed = cv2.perspectiveTransform(pts, self.homography_matrix) + return transformed.reshape(-1, 2) + +# ---- 使用示例 ---- +calibrator = PerspectiveCalibrator() + +# 在图像上标记至少4个已知实际坐标的点 +# 例如:河道两岸的固定标志物、桥梁栏杆等 +pixel_control_points = [ + [100, 200], # 点1 的像素坐标 + [500, 180], # 点2 + [150, 600], # 点3 + [600, 580], # 点4 + [350, 400], # 点5(可选,提高精度) +] + +# 对应的实际坐标(需要实地测量,单位:米) +# 以某个基准点为原点建立平面坐标系 +real_control_points = [ + [0.0, 0.0], # 点1 的实际坐标 + [5.2, 0.3], # 点2 + [0.5, 3.1], # 点3 + [5.8, 3.4], # 点4 + [3.0, 1.8], # 点5 +] + +mean_err, max_err = calibrator.calibrate(pixel_control_points, real_control_points) +``` + +**方法 C:相机内参+外参标定(最精确,适合高精度需求)** + +```python +class FullCameraCalibrator: + """ + 使用棋盘格标定内参,再用PnP求解外参 + 适用于需要高精度的场景 + """ + def __init__(self): + self.camera_matrix = None + self.dist_coeffs = None + self.rvec = None + self.tvec = None + + def intrinsic_calibration(self, chessboard_images, chessboard_size=(9, 6), square_size=0.025): + """ + 使用棋盘格图像标定相机内参 + chessboard_images: 从不同角度拍摄的棋盘格图像列表 + chessboard_size: 棋盘格内角点数量 + square_size: 每个格子的实际物理尺寸(米) + """ + criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001) + objp = np.zeros((chessboard_size[0] * chessboard_size[1], 3), np.float32) + objp[:, :2] = np.mgrid[0:chessboard_size[0], 0:chessboard_size[1]].T.reshape(-1, 2) * square_size + + objpoints = [] # 实际3D点 + imgpoints = [] # 2D图像点 + + for img in chessboard_images: + gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) + ret, corners = cv2.findChessboardCorners(gray, chessboard_size, None) + if ret: + objpoints.append(objp) + corners2 = cv2.cornerSubPix(gray, corners, (11, 11), (-1, -1), criteria) + imgpoints.append(corners2) + + ret, self.camera_matrix, self.dist_coeffs, rvecs, tvecs = \ + cv2.calibrateCamera(objpoints, imgpoints, gray.shape[::-1], None, None) + + return ret + + def extrinsic_calibration(self, world_points, image_points): + """ + 使用已知世界坐标和对应图像点求解相机外参 + """ + world_points = np.array(world_points, dtype=np.float32) + image_points = np.array(image_points, dtype=np.float32) + + ret, self.rvec, self.tvec = cv2.solvePnP( + world_points, image_points, self.camera_matrix, self.dist_coeffs + ) + return ret + + def project_to_ground(self, pixel_point, ground_z=0.0): + """ + 将图像上的点投影到地平面(假设水面为 z=0 平面) + 需要考虑水面高度与参考平面的偏差 + """ + # 使用射线-平面相交方法 + # 详细实现略(涉及较多线性代数) + pass +``` + +#### 2.2 标定实操指南 + +``` +现场标定流程: + +1. 选择标定日,准备卷尺、GPS(可选)、标记物 + +2. 在相机视野内设置至少 4 个地面控制点 (GCP): + - 使用鲜艳的标记物(橙色锥桶、喷漆标记) + - 分布在测量区域内,不要太集中 + - 确保标记物在水面上方可见 + +3. 用卷尺测量各 GCP 之间的实际距离 + - 建立平面坐标系(以某个 GCP 为原点) + - 记录每个 GCP 的 (x, y) 实际坐标 + +4. 拍摄标定图像 + - 确保所有 GCP 在画面中清晰可见 + - 记录此时相机参数(焦距、角度)— 之后不能改变 + +5. 在图像上标记每个 GCP 的像素坐标 + - 使用标注工具或手动记录 + +6. 运行标定算法,检查误差 + - 平均误差 < 0.1m 为优秀 + - 平均误差 < 0.3m 为可用 + - 误差过大需重新标定 + +7. 保存标定参数(H 矩阵)供后续使用 +``` + +### Step 3:数据采集与标注 + +#### 3.1 数据采集策略 + +```python +import cv2 +import os +from pathlib import Path + +class DataCollector: + """ + 从相机视频流中采集训练数据 + """ + def __init__(self, source=0, output_dir="./raw_data"): + self.source = source + self.output_dir = Path(output_dir) + self.output_dir.mkdir(parents=True, exist_ok=True) + + def collect_frames(self, interval_seconds=0.5, max_frames=1000): + """ + 从视频流中按间隔抽取帧 + + 注意: + - 需要覆盖不同时间段(早/中/晚) + - 需要覆盖不同天气条件 + - 需要覆盖不同水流状态(枯水/洪水) + """ + cap = cv2.VideoCapture(self.source) + fps = cap.get(cv2.CAP_PROP_FPS) + frame_interval = int(fps * interval_seconds) + + frame_count = 0 + saved_count = 0 + + while saved_count < max_frames: + ret, frame = cap.read() + if not ret: + break + + if frame_count % frame_interval == 0: + filename = self.output_dir / f"frame_{saved_count:06d}.jpg" + cv2.imwrite(str(filename), frame) + saved_count += 1 + print(f"Saved {saved_count}/{max_frames}") + + frame_count += 1 + + cap.release() + print(f"Collected {saved_count} frames from {frame_count} total frames") +``` + +#### 3.2 数据标注 + +**标注策略**: + +``` +目标类别设计: + +基础方案(1类): +└── floating_object(所有漂浮物统一标注) + 优点:简单,数据需求少 + 缺点:可能混入无关漂浮物 + +精细方案(3-5类): +├── natural_debris(树叶、树枝等天然漂浮物) +├── foam_bubble(泡沫) +├── artificial_tracer(人工投放的示踪物) +├── ice_chunk(冰块,冬季) +└── wave_crest(浪尖,可选) + 优点:可以过滤不可靠的示踪物 + 缺点:标注成本高 + +推荐方案: +初期用 1 类方案快速验证 +后期根据需求扩展到多类 +``` + +**标注工具选择**: + +| 工具 | 适合场景 | 价格 | 特点 | +|------|---------|------|------| +| **Roboflow** | 快速标注+自动预处理 | 免费层可用 | 自动标注辅助,直接导出 COCO | +| **CVAT** | 视频标注 | 免费开源 | 支持视频插值,适合大量帧 | +| **Label Studio** | 灵活自定义 | 免费开源 | 可扩展,支持多人协作 | +| **LabelImg** | 简单图像标注 | 免费开源 | 轻量,仅支持 YOLO/PascalVOC | + +**数据量建议**: + +``` +最低要求: +├── 300-500 张标注图像(快速验证) +├── 覆盖至少 3 种不同光照条件 +└── 包含各种大小的示踪物 + +生产级别: +├── 2000-5000 张标注图像 +├── 覆盖全天候光照变化 +├── 包含不同天气条件 +├── 包含不同流速状态 +└── 预留 20% 作为验证集 +``` + +#### 3.3 标注质量控制 + +```python +# 标注检查脚本 +import json +import cv2 +import numpy as np + +def validate_annotations(coco_json_path, image_dir, min_area=100, max_area=500000): + """检查标注质量""" + with open(coco_json_path) as f: + data = json.load(f) + + issues = [] + + for ann in data['annotations']: + bbox = ann['bbox'] # [x, y, w, h] + area = bbox[2] * bbox[3] + + if area < min_area: + issues.append(f"标注 {ann['id']} 面积过小: {area}px²") + if area > max_area: + issues.append(f"标注 {ann['id']} 面积过大: {area}px²") + + # 检查边界框是否在图像范围内 + img_info = next(img for img in data['images'] if img['id'] == ann['image_id']) + if (bbox[0] + bbox[2] > img_info['width'] or + bbox[1] + bbox[3] > img_info['height']): + issues.append(f"标注 {ann['id']} 超出图像边界") + + if issues: + print(f"发现 {len(issues)} 个标注问题:") + for issue in issues[:20]: + print(f" - {issue}") + else: + print("标注质量检查通过 ✓") + + return issues +``` + +### Step 4:模型训练 + +#### 4.1 推荐方案:YOLOv8 + Ultralytics 生态 + +> 原始笔记推荐 YOLOX,但截至 2025-2026,**YOLOv8 已成为更优选择**: +> - Ultralytics 提供内置的 ByteTrack 支持 +> - API 更简洁,部署更简单 +> - 模型性能更优 +> - 活跃的社区和文档 + +```bash +# ===== 环境搭建 ===== + +# 1. 创建 conda 环境 +conda create -n waterflow python=3.10 -y +conda activate waterflow + +# 2. 安装 PyTorch(根据你的 CUDA 版本) +pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121 + +# 3. 安装 Ultralytics +pip install ultralytics + +# 4. 安装其他依赖 +pip install opencv-python numpy scipy pandas matplotlib seaborn +pip install supervision # 可视化工具 +pip install roboflow # 数据管理(可选) +``` + +#### 4.2 数据准备(YOLOv8 格式) + +```yaml +# dataset.yaml +path: ./water_flow_dataset +train: images/train +val: images/val +test: images/test + +nc: 1 # 类别数 +names: ['floating_object'] # 类别名 +``` + +``` +数据集目录结构: +water_flow_dataset/ +├── images/ +│ ├── train/ +│ │ ├── frame_0001.jpg +│ │ ├── frame_0002.jpg +│ │ └── ... +│ ├── val/ +│ │ └── ... +│ └── test/ +│ └── ... +└── labels/ + ├── train/ + │ ├── frame_0001.txt # YOLO 格式标注 + │ ├── frame_0002.txt + │ └── ... + ├── val/ + │ └── ... + └── test/ + └── ... +``` + +**YOLO 标注格式**(每行一个目标): +``` +class_id x_center y_center width height +``` +所有值都是归一化的(0-1 范围),相对于图像宽高。 + +#### 4.3 模型训练 + +```python +from ultralytics import YOLO + +# ===== 方案 A:使用预训练模型微调 ===== + +# 加载预训练模型(推荐从 YOLOv8n 开始) +model = YOLO('yolov8n.pt') + +# 训练 +results = model.train( + data='dataset.yaml', # 数据集配置文件 + epochs=100, # 训练轮数 + imgsz=640, # 输入尺寸 + batch=16, # 批次大小 + device=0, # GPU 设备 + patience=20, # 早停轮数 + save_period=10, # 保存间隔 + cache=True, # 缓存数据到内存加速 + # 数据增强参数 + hsv_h=0.015, # 色调增强(水面光照变化) + hsv_s=0.7, # 饱和度增强 + hsv_v=0.4, # 亮度增强 + degrees=0.0, # 不旋转(相机固定) + translate=0.1, # 平移 + scale=0.5, # 缩放 + fliplr=0.5, # 水平翻转 + mosaic=1.0, # Mosaic 增强 + mixup=0.1, # Mixup 增强 + # 优化器 + optimizer='AdamW', + lr0=0.001, + lrf=0.01, + weight_decay=0.0005, +) + +# ===== 方案 B:从 nano 到 small 的选择 ===== + +# YOLOv8n (3.2M 参数) — 边缘部署首选 +model_nano = YOLO('yolov8n.pt') + +# YOLOv8s (11.2M 参数) — 平衡选择 +model_small = YOLO('yolov8s.pt') + +# YOLOv8m (25.9M 参数) — 更高精度 +model_medium = YOLO('yolov8m.pt') +``` + +#### 4.4 训练监控与评估 + +```python +from ultralytics import YOLO + +# 加载训练好的模型 +model = YOLO('runs/detect/train/weights/best.pt') + +# 在验证集上评估 +metrics = model.val() + +print(f"mAP@50: {metrics.box.map50:.4f}") +print(f"mAP@50-95: {metrics.box.map:.4f}") +print(f"Precision: {metrics.box.mp:.4f}") +print(f"Recall: {metrics.box.mr:.4f}") + +# 目标:mAP@50 > 0.85 为良好,> 0.90 为优秀 + +# 可视化混淆矩阵 +model.val(plot=True) + +# 测试集推理可视化 +results = model.predict( + source='images/test/', + save=True, + save_txt=True, + conf=0.25, + iou=0.45 +) +``` + +#### 4.5 训练问题排查 + +``` +常见问题及解决方案: + +问题 1:mAP 低于 0.5 +├── 原因:数据量不足或标注质量差 +├── 解决:增加标注数据,检查标注一致性 +└── 解决:增加训练轮数,检查学习率 + +问题 2:小目标检测效果差 +├── 原因:YOLOv8 对小目标检测有局限 +├── 解决:使用 YOLOv8s 或更大的模型 +├── 解决:使用更高的输入分辨率(imgsz=1280) +└── 解决:使用 SAHI(切片辅助推理) + +问题 3:过拟合 +├── 原因:训练数据不足或分布单一 +├── 解决:增加数据增强(mixup, mosaic, copy-paste) +└── 解决:收集更多样化的训练数据 + +问题 4:水面反光导致误检 +├── 原因:反光区域被误认为示踪物 +├── 解决:在训练数据中增加反光场景 +├── 解决:使用偏振滤镜 +└── 解决:提高置信度阈值 +``` + +### Step 5:ByteTrack 追踪集成 + +#### 5.1 方案对比:内置追踪器 vs 独立 ByteTrack + +``` +Ultralytics YOLOv8 内置了多种追踪器: + +1. BoT-SORT(默认)— 推荐 + - ByteTrack 的改进版 + - 加入相机运动补偿 + - 加入 ReID 特征 + - API 最简单 + +2. ByteTrack + - 原版 ByteTrack + - 轻量快速 + - 无相机运动补偿 + +3. DeepOCSORT + - 加入深度特征的 OCSORT + - 对遮挡处理更好 + +推荐:默认使用 BoT-SORT,如果性能瓶颈再切换到 ByteTrack +``` + +#### 5.2 追踪推理实现 + +```python +from ultralytics import YOLO +from collections import defaultdict +import cv2 +import numpy as np + +class WaterFlowTracker: + """ + 水流速度检测追踪器 + 集成 YOLO 检测 + 追踪 + 速度计算 + """ + + def __init__( + self, + model_path: str, + pixels_per_meter: float = None, + homography_matrix: np.ndarray = None, + conf_threshold: float = 0.3, + iou_threshold: float = 0.45, + tracker_type: str = "botsort", # "botsort" or "bytetrack" + ): + # 加载模型 + self.model = YOLO(model_path) + + # 标定参数 + self.pixels_per_meter = pixels_per_meter + self.homography_matrix = homography_matrix + + # 追踪参数 + self.conf_threshold = conf_threshold + self.iou_threshold = iou_threshold + self.tracker_type = tracker_type + + # 轨迹存储 {track_id: [(frame_idx, x, y), ...]} + self.trajectories = defaultdict(list) + + # 速度记录 {track_id: velocity_mps} + self.velocities = {} + + # 追踪配置 + if tracker_type == "botsort": + self.track_kwargs = { + "tracker": "botsort.yaml", + "track_high_thresh": 0.5, + "track_low_thresh": 0.1, + "new_track_thresh": 0.6, + "track_buffer": 30, # 最多容忍丢失30帧 + "match_thresh": 0.8, + "proximity_thresh": 0.5, + "appearance_thresh": 0.25, + "with_reid": True, # 使用 ReID 特征 + "gmc_method": "sparseOptFlow", # 相机运动补偿 + "frame_rate": 30, + } + else: # bytetrack + self.track_kwargs = { + "tracker": "bytetrack.yaml", + "track_thresh": 0.5, + "track_buffer": 30, + "match_thresh": 0.8, + } + + def process_video(self, video_path: str, output_path: str = None, + skip_frames: int = 0, max_frames: int = None): + """ + 处理视频文件 + + Args: + video_path: 输入视频路径 + output_path: 输出视频路径(可选) + skip_frames: 跳过的初始帧数 + max_frames: 最多处理帧数(None 表示处理全部) + """ + cap = cv2.VideoCapture(video_path) + fps = cap.get(cv2.CAP_PROP_FPS) + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + + # 视频写入器 + writer = None + if output_path: + fourcc = cv2.VideoWriter_fourcc(*'mp4v') + writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height)) + + frame_idx = 0 + + while True: + ret, frame = cap.read() + if not ret: + break + + if frame_idx < skip_frames: + frame_idx += 1 + continue + + if max_frames and frame_idx - skip_frames >= max_frames: + break + + # 运行检测+追踪 + results = self.model.track( + frame, + persist=True, + conf=self.conf_threshold, + iou=self.iou_threshold, + verbose=False, + **self.track_kwargs + ) + + # 处理结果 + if results[0].boxes is not None and results[0].boxes.id is not None: + boxes = results[0].boxes.xyxy.cpu().numpy() + track_ids = results[0].boxes.id.cpu().numpy().astype(int) + confs = results[0].boxes.conf.cpu().numpy() + + for box, track_id, conf in zip(boxes, track_ids, confs): + # 计算中心点 + center_x = (box[0] + box[2]) / 2 + center_y = (box[1] + box[3]) / 2 + + # 更新轨迹 + self.trajectories[track_id].append((frame_idx, center_x, center_y)) + + # 计算速度(需要足够的历史帧) + if len(self.trajectories[track_id]) >= 5: + velocity = self._calculate_velocity(track_id, fps) + self.velocities[track_id] = velocity + + # 可视化(可选) + annotated_frame = results[0].plot() + + # 在画面上叠加速度信息 + if annotated_frame is not None: + self._draw_velocities(annotated_frame, boxes, track_ids) + + if writer: + writer.write(annotated_frame if annotated_frame is not None else frame) + + if frame_idx % 30 == 0: + print(f"Processed frame {frame_idx}") + + frame_idx += 1 + + cap.release() + if writer: + writer.release() + + print(f"Total frames processed: {frame_idx}") + print(f"Total tracks: {len(self.trajectories)}") + print(f"Tracks with velocity: {len(self.velocities)}") + + def _calculate_velocity(self, track_id: int, fps: float, + min_frames: int = 5, use_rolling: bool = True): + """ + 计算单个追踪对象的速度 + + Args: + track_id: 追踪 ID + fps: 视频帧率 + min_frames: 最少需要的帧数 + use_rolling: 是否使用滚动窗口(更稳定) + """ + traj = self.trajectories[track_id] + if len(traj) < min_frames: + return None + + if use_rolling: + # 使用最近 N 帧计算速度(对非匀速水流更准确) + window_size = min(15, len(traj)) + recent = traj[-window_size:] + else: + # 使用全部轨迹 + recent = traj + + start_frame, start_x, start_y = recent[0] + end_frame, end_x, end_y = recent[-1] + + # 像素位移 + dx_pixel = end_x - start_x + dy_pixel = end_y - start_y + + if self.homography_matrix is not None: + # 使用单应性矩阵转换 + start_real = self._pixel_to_real(start_x, start_y) + end_real = self._pixel_to_real(end_x, end_y) + dx_real = end_real[0] - start_real[0] + dy_real = end_real[1] - start_real[1] + distance_meters = np.sqrt(dx_real**2 + dy_real**2) + elif self.pixels_per_meter: + # 简单比例转换 + pixel_dist = np.sqrt(dx_pixel**2 + dy_pixel**2) + distance_meters = pixel_dist / self.pixels_per_meter + else: + # 返回像素速度 + time_sec = (end_frame - start_frame) / fps + return pixel_dist / time_sec # 像素/秒 + + # 时间 + time_sec = (end_frame - start_frame) / fps + if time_sec == 0: + return None + + # 速度(m/s) + velocity = distance_meters / time_sec + + return velocity + + def _pixel_to_real(self, px, py): + """像素坐标转实际坐标""" + pt = np.array([[[px, py]]], dtype=np.float32) + transformed = cv2.perspectiveTransform(pt, self.homography_matrix) + return transformed[0][0] + + def _draw_velocities(self, frame, boxes, track_ids): + """在画面上绘制速度信息""" + for box, track_id in zip(boxes, track_ids): + velocity = self.velocities.get(track_id) + if velocity is None: + continue + + # 在目标上方显示速度 + x = int((box[0] + box[2]) / 2) + y = int(box[1]) - 10 + + # 颜色根据速度(蓝-绿-红) + color = self._velocity_color(velocity) + + text = f"ID:{track_id} {velocity:.2f} m/s" + cv2.putText(frame, text, (x - 50, y), + cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2) + + def _velocity_color(self, velocity): + """根据速度返回颜色""" + if velocity < 0.5: + return (255, 0, 0) # 蓝色 - 慢 + elif velocity < 1.5: + return (0, 255, 0) # 绿色 - 中 + else: + return (0, 0, 255) # 红色 - 快 + + def get_summary(self): + """获取流速汇总统计""" + if not self.velocities: + return {"error": "No velocity data"} + + values = [v for v in self.velocities.values() if v is not None] + if not values: + return {"error": "No valid velocity data"} + + return { + "count": len(values), + "mean_velocity": np.mean(values), + "median_velocity": np.median(values), + "std_velocity": np.std(values), + "min_velocity": np.min(values), + "max_velocity": np.max(values), + "q25": np.percentile(values, 25), + "q75": np.percentile(values, 75), + } +``` + +#### 5.3 独立 ByteTrack 集成(备选方案) + +如果需要使用原版 ByteTrack(而非 Ultralytics 内置版): + +```bash +# 安装 ByteTrack +git clone https://github.com/ifzhang/ByteTrack.git +cd ByteTrack +pip install -r requirements.txt +python setup.py develop + +# 安装 Cython 版的 NMS(加速) +pip install cython +cd yolox/layers && python setup.py build_ext --inplace +``` + +```python +import cv2 +import numpy as np +from yolox.tracker.byte_tracker import BYTETracker + +class CustomByteTrackProcessor: + """ + 使用原版 ByteTrack 的处理器 + 适用于需要精细控制追踪参数的场景 + """ + + def __init__(self, track_thresh=0.5, track_buffer=30, match_thresh=0.8): + # ByteTrack 参数 + track_args = { + 'track_thresh': track_thresh, + 'track_buffer': track_buffer, + 'match_thresh': match_thresh, + 'frame_rate': 30, + } + self.tracker = BYTETracker(track_args) + + def process_frame(self, frame, detections): + """ + 处理单帧 + detections: list of [x1, y1, x2, y2, confidence, class_id] + """ + # 分离高分框和低分框 + det_boxes = np.array(detections) + + # 运行追踪 + online_targets = self.tracker.update(det_boxes, frame.shape[:2], (1, 1)) + + results = [] + for t in online_targets: + results.append({ + 'track_id': t.track_id, + 'bbox': t.tlbr, + 'score': t.score, + }) + + return results +``` + +### Step 6:速度计算与后处理 + +#### 6.1 速度计算进阶方法 + +```python +import numpy as np +from scipy.signal import savgol_filter +from collections import defaultdict + +class AdvancedVelocityCalculator: + """ + 进阶流速计算器 + 包含多种速度估计方法和滤波技术 + """ + + def __init__(self, homography_matrix=None, fps=30): + self.H = homography_matrix + self.fps = fps + + def point_velocity(self, trajectory, method='total'): + """ + 从轨迹计算速度 + + trajectory: [(frame_idx, x, y), ...] + + 方法: + - 'total': 总位移/总时间(简单但粗糙) + - 'segment': 分段速度后取平均(更稳健) + - 'derivative': 位置微分(捕捉瞬时速度) + - 'kalman': 卡尔曼滤波估计 + """ + if len(trajectory) < 3: + return None + + if method == 'total': + return self._total_displacement(trajectory) + elif method == 'segment': + return self._segment_average(trajectory) + elif method == 'derivative': + return self._derivative_method(trajectory) + elif method == 'kalman': + return self._kalman_estimate(trajectory) + + def _total_displacement(self, trajectory): + """首尾位移法""" + start = trajectory[0] + end = trajectory[-1] + + dx_m, dy_m = self._to_real_distance( + start[1], start[2], end[1], end[2] + ) + distance = np.sqrt(dx_m**2 + dy_m**2) + time = (end[0] - start[0]) / self.fps + + return distance / time if time > 0 else None + + def _segment_average(self, trajectory, segment_size=10): + """分段计算,取中位数(抗异常值)""" + velocities = [] + for i in range(0, len(trajectory) - segment_size, segment_size // 2): + segment = trajectory[i:i + segment_size] + v = self._total_displacement(segment) + if v is not None: + velocities.append(v) + + if not velocities: + return None + + # 剔除异常值 + velocities = np.array(velocities) + q1, q3 = np.percentile(velocities, [25, 75]) + iqr = q3 - q1 + filtered = velocities[(velocities >= q1 - 1.5*iqr) & + (velocities <= q3 + 1.5*iqr)] + + return np.median(filtered) if len(filtered) > 0 else None + + def _derivative_method(self, trajectory, smoothing=True): + """数值微分法""" + frames = np.array([t[0] for t in trajectory]) + xs = np.array([t[1] for t in trajectory]) + ys = np.array([t[2] for t in trajectory]) + + if smoothing: + # Savitzky-Golay 滤波 + window = min(len(xs), 11) + if window % 2 == 0: + window -= 1 + if window >= 5: + xs = savgol_filter(xs, window, 2) + ys = savgol_filter(ys, window, 2) + + # 数值微分 + dt = 1.0 / self.fps + vx = np.gradient(xs) / dt + vy = np.gradient(ys) / dt + + # 转换到实际距离 + speed_pixel = np.sqrt(vx**2 + vy**2) + + # 平均像素速度转换为 m/s + avg_pixel_speed = np.mean(speed_pixel) + + if self.H is not None: + # 在轨迹中间位置采样转换因子 + mid_idx = len(trajectory) // 2 + mid_x, mid_y = trajectory[mid_idx][1], trajectory[mid_idx][2] + ppm = self._get_pixels_per_meter_at(mid_x, mid_y) + return avg_pixel_speed / ppm + else: + return avg_pixel_speed # 像素/秒 + + def _kalman_estimate(self, trajectory): + """卡尔曼滤波估计""" + from filterpy.kalman import KalmanFilter + + # 初始化卡尔曼滤波 + kf = KalmanFilter(dim_x=4, dim_z=2) + + # 状态: [x, y, vx, vy] + kf.F = np.array([[1, 0, 1/self.fps, 0], + [0, 1, 0, 1/self.fps], + [0, 0, 1, 0], + [0, 0, 0, 1]]) + + kf.H = np.array([[1, 0, 0, 0], + [0, 1, 0, 0]]) + + kf.R *= 5 # 测量噪声 + kf.Q[0:2, 0:2] *= 0.01 # 过程噪声 + kf.P *= 100 + + # 初始化 + first = trajectory[0] + kf.x = np.array([first[1], first[2], 0, 0]) + + # 更新 + for i, (frame, x, y) in enumerate(trajectory[1:]): + kf.predict() + kf.update(np.array([x, y])) + + # 速度估计 + vx, vy = kf.x[2], kf.x[3] + + if self.H is not None: + mid_x = trajectory[len(trajectory)//2][1] + mid_y = trajectory[len(trajectory)//2][2] + ppm = self._get_pixels_per_meter_at(mid_x, mid_y) + return np.sqrt(vx**2 + vy**2) / ppm + + return np.sqrt(vx**2 + vy**2) + + def _to_real_distance(self, x1, y1, x2, y2): + """计算两像素点间的实际距离""" + if self.H is not None: + p1 = self._pixel_to_real(x1, y1) + p2 = self._pixel_to_real(x2, y2) + return p2[0] - p1[0], p2[1] - p1[1] + else: + return x2 - x1, y2 - y1 + + def _pixel_to_real(self, x, y): + pt = np.array([[[x, y]]], dtype=np.float32) + transformed = cv2.perspectiveTransform(pt, self.H) + return transformed[0][0] + + def _get_pixels_per_meter_at(self, x, y): + """获取指定位置的像素/米比例(考虑透视)""" + if self.H is None: + return 1.0 + + # 在点附近取两个相距 1 米的点 + real_near = np.array([[[x, y]]], dtype=np.float32) + real_far = np.array([[[x + 1.0, y]]], dtype=np.float32) + + # 反向变换到像素空间 + H_inv = np.linalg.inv(self.H) + px_near = cv2.perspectiveTransform(real_near, H_inv)[0][0] + px_far = cv2.perspectiveTransform(real_far, H_inv)[0][0] + + pixel_dist = np.linalg.norm(px_far - px_near) + return pixel_dist # 像素/米 +``` + +#### 6.2 多目标速度融合 + +```python +def compute_flow_velocity_summary(velocities, method='trimmed_mean', + trim_fraction=0.1): + """ + 从多个追踪对象的速度估计中计算综合流速 + + 方法: + - 'trimmed_mean': 截尾均值(去除极端值) + - 'median': 中位数(稳健) + - 'weighted': 按轨迹长度加权 + - 'mode': 众数(最常见速度) + """ + if not velocities: + return None + + vals = np.array([v for v in velocities if v is not None and v > 0]) + if len(vals) == 0: + return None + + if method == 'trimmed_mean': + sorted_vals = np.sort(vals) + n = len(sorted_vals) + trim_count = max(1, int(n * trim_fraction)) + trimmed = sorted_vals[trim_count:-trim_count] + return np.mean(trimmed) if len(trimmed) > 0 else np.median(vals) + + elif method == 'median': + return np.median(vals) + + elif method == 'weighted': + # weights = trajectory_lengths + pass # 需要传入权重 + + elif method == 'mode': + from scipy.stats import gaussian_kde + kde = gaussian_kde(vals) + x_eval = np.linspace(vals.min(), vals.max(), 1000) + densities = kde(x_eval) + return x_eval[np.argmax(densities)] + + return np.mean(vals) +``` + +### Step 7:完整流水线 + +```python +#!/usr/bin/env python3 +""" +水流速度检测完整流水线 +""" +import cv2 +import numpy as np +import yaml +import json +from datetime import datetime +from pathlib import Path +from ultralytics import YOLO +from collections import defaultdict + +class WaterFlowVelocityPipeline: + """ + 端到端水流速度检测流水线 + + 使用流程: + 1. 初始化:传入模型路径和标定参数 + 2. 处理视频:自动检测、追踪、计算速度 + 3. 获取结果:输出流速统计和可视化 + """ + + def __init__(self, config_path: str): + """从配置文件初始化""" + with open(config_path) as f: + self.config = yaml.safe_load(f) + + # 加载模型 + self.model = YOLO(self.config['model_path']) + + # 加载标定参数 + if 'homography_matrix' in self.config: + self.H = np.array(self.config['homography_matrix']) + else: + self.H = None + self.ppm = self.config.get('pixels_per_meter') + + # 初始化轨迹存储 + self.trajectories = defaultdict(list) + self.frame_count = 0 + + def run(self, video_path: str, output_dir: str): + """运行完整流水线""" + output_dir = Path(output_dir) + output_dir.mkdir(parents=True, exist_ok=True) + + # 1. 处理视频 + results = self._process_video(video_path) + + # 2. 计算速度 + velocities = self._calculate_all_velocities() + + # 3. 统计分析 + summary = self._compute_summary(velocities) + + # 4. 保存结果 + self._save_results(output_dir, summary, velocities) + + # 5. 生成报告 + self._generate_report(output_dir, summary) + + return summary + + def _process_video(self, video_path: str): + """处理视频,执行检测+追踪""" + cap = cv2.VideoCapture(video_path) + video_fps = cap.get(cv2.CAP_PROP_FPS) + + results = [] + frame_idx = 0 + + while True: + ret, frame = cap.read() + if not ret: + break + + # 检测 + 追踪 + det_results = self.model.track( + frame, + persist=True, + conf=self.config.get('conf_threshold', 0.3), + iou=self.config.get('iou_threshold', 0.45), + tracker="botsort.yaml", + verbose=False, + ) + + if det_results[0].boxes is not None and det_results[0].boxes.id is not None: + boxes = det_results[0].boxes.xyxy.cpu().numpy() + ids = det_results[0].boxes.id.cpu().numpy().astype(int) + + for box, track_id in zip(boxes, ids): + cx = (box[0] + box[2]) / 2 + cy = (box[1] + box[3]) / 2 + self.trajectories[track_id].append((frame_idx, cx, cy)) + + self.frame_count = frame_idx + frame_idx += 1 + + if frame_idx % 100 == 0: + print(f"Processed {frame_idx} frames, {len(self.trajectories)} tracks") + + cap.release() + return results + + def _calculate_all_velocities(self): + """计算所有追踪对象的速度""" + velocities = {} + + for track_id, traj in self.trajectories.items(): + if len(traj) < self.config.get('min_track_length', 10): + continue # 轨迹太短,不可靠 + + velocity = self._calculate_velocity(traj) + if velocity is not None and 0 < velocity < 10: + # 过滤不合理值(>10 m/s 的水流不太可能) + velocities[track_id] = velocity + + return velocities + + def _calculate_velocity(self, trajectory): + """计算单个轨迹的速度""" + if len(trajectory) < 5: + return None + + # 使用分段平均法 + segment_size = 15 + velocities = [] + + for i in range(0, len(trajectory) - segment_size, segment_size // 2): + seg = trajectory[i:i + segment_size] + start_f, sx, sy = seg[0] + end_f, ex, ey = seg[-1] + + if self.H is not None: + p1 = cv2.perspectiveTransform( + np.array([[[sx, sy]]], dtype=np.float32), self.H)[0][0] + p2 = cv2.perspectiveTransform( + np.array([[[ex, ey]]], dtype=np.float32), self.H)[0][0] + dist = np.linalg.norm(p2 - p1) + elif self.ppm: + dist = np.sqrt((ex-sx)**2 + (ey-sy)**2) / self.ppm + else: + continue + + time = (end_f - start_f) / 30.0 # 假设 30fps + if time > 0: + velocities.append(dist / time) + + if not velocities: + return None + + # 中位数滤波 + return float(np.median(velocities)) + + def _compute_summary(self, velocities): + """计算流速统计摘要""" + if not velocities: + return {"error": "No valid velocity data"} + + vals = np.array(list(velocities.values())) + + # 截尾均值 + sorted_v = np.sort(vals) + n = len(sorted_v) + trim = max(1, int(n * 0.1)) + trimmed = sorted_v[trim:-trim] + + return { + "timestamp": datetime.now().isoformat(), + "total_tracks": len(self.trajectories), + "valid_tracks": len(velocities), + "mean_velocity": float(np.mean(vals)), + "median_velocity": float(np.median(vals)), + "trimmed_mean": float(np.mean(trimmed)) if len(trimmed) > 0 else None, + "std": float(np.std(vals)), + "min": float(np.min(vals)), + "max": float(np.max(vals)), + "q25": float(np.percentile(vals, 25)), + "q75": float(np.percentile(vals, 75)), + "frame_count": self.frame_count, + } + + def _save_results(self, output_dir, summary, velocities): + """保存结果""" + # JSON 结果 + with open(output_dir / "results.json", 'w') as f: + json.dump({ + "summary": summary, + "per_track_velocities": {str(k): v for k, v in velocities.items()} + }, f, indent=2) + + # 轨迹 CSV + import csv + with open(output_dir / "trajectories.csv", 'w', newline='') as f: + writer = csv.writer(f) + writer.writerow(['track_id', 'frame', 'pixel_x', 'pixel_y', + 'real_x', 'real_y', 'velocity_mps']) + for track_id, traj in self.trajectories.items(): + v = velocities.get(track_id, 0) + for frame, px, py in traj: + if self.H is not None: + real = cv2.perspectiveTransform( + np.array([[[px, py]]], dtype=np.float32), self.H)[0][0] + writer.writerow([track_id, frame, px, py, + real[0], real[1], v]) + else: + writer.writerow([track_id, frame, px, py, '', '', v]) + + def _generate_report(self, output_dir, summary): + """生成文本报告""" + report = f""" +======================================== + 水流速度检测报告 +======================================== + 时间: {summary.get('timestamp', 'N/A')} + 处理帧数: {summary.get('frame_count', 'N/A')} + 总追踪数: {summary.get('total_tracks', 'N/A')} + 有效追踪数: {summary.get('valid_tracks', 'N/A')} +---------------------------------------- + 平均流速: {summary.get('mean_velocity', 'N/A'):.3f} m/s + 中位流速: {summary.get('median_velocity', 'N/A'):.3f} m/s + 截尾均值: {summary.get('trimmed_mean', 'N/A'):.3f} m/s + 标准差: {summary.get('std', 'N/A'):.3f} m/s + 最小流速: {summary.get('min', 'N/A'):.3f} m/s + 最大流速: {summary.get('max', 'N/A'):.3f} m/s + 25分位: {summary.get('q25', 'N/A'):.3f} m/s + 75分位: {summary.get('q75', 'N/A'):.3f} m/s +======================================== + """ + with open(output_dir / "report.txt", 'w') as f: + f.write(report) + + print(report) + + +# ===== 配置文件示例 ===== +""" +# config.yaml +model_path: "runs/detect/train/weights/best.pt" +conf_threshold: 0.3 +iou_threshold: 0.45 +min_track_length: 10 + +# 标定参数(二选一) +homography_matrix: + - [1.2e-3, 3.4e-5, -0.5] + - [2.1e-5, 1.1e-3, -0.3] + - [8.7e-7, 2.3e-7, 1.0] + +# 或者简单比例 +# pixels_per_meter: 50.0 +""" +``` + +--- + +## 四、精度验证与误差分析 + +### 4.1 精度验证方法 + +``` +现场验证流程: + +1. 同步测量法 + - 同时使用传统方法(流速仪/浮标法)测量 + - 对比 YOLO+ByteTrack 的测量结果 + - 计算相对误差 + +2. 已知流量法 + - 在人工渠道中,使用已知流量 + - 对比计算出的流速 + - Q = A × v(流量 = 截面积 × 流速) + +3. 多点验证法 + - 在河道不同位置设置多个测量区域 + - 每个区域独立计算流速 + - 对比空间一致性 + +精度目标: +├── 优秀:< 5% 相对误差 +├── 良好:5-10% 相对误差 +├── 可用:10-15% 相对误差 +└── 不可用:> 15% 相对误差 +``` + +### 4.2 误差来源分解 + +| 误差来源 | 典型贡献 | 减少方法 | +|---------|---------|---------| +| 相机标定误差 | 2-8% | 增加控制点、定期复标 | +| 透视校正误差 | 1-5% | 单应性矩阵、分区标定 | +| 示踪物跟随性 | 1-5% | 使用轻量示踪物 | +| 检测中心点抖动 | 1-3% | 轨迹平滑、卡尔曼滤波 | +| 追踪 ID 切换 | 2-5% | 调整追踪参数 | +| 时间同步误差 | < 1% | 使用视频帧率 | +| 水面波动 | 2-5% | 长时间平均 | +| **总误差** | **5-15%** | | + +--- + +## 五、部署方案 + +### 5.1 边缘部署(Jetson) + +```bash +# 在 Jetson Orin 上部署 + +# 1. 安装 JetPack SDK +# (从 NVIDIA 官网下载并烧录) + +# 2. 安装 PyTorch for Jetson +pip install torch torchvision --index-url https://download.pytorch.org/whl/cu118 + +# 3. 安装 Ultralytics +pip install ultralytics + +# 4. 模型优化 - TensorRT 转换 +from ultralytics import YOLO + +model = YOLO('best.pt') +model.export(format='engine', # TensorRT + half=True, # FP16 + int8=False, # INT8 需要校准 + device=0) + +# 5. 使用 TensorRT 模型推理 +model_trt = YOLO('best.engine') + +# 性能对比(参考值): +# YOLOv8n FP32: ~50 FPS on Orin Nano +# YOLOv8n FP16: ~80 FPS on Orin Nano +# YOLOv8n INT8: ~120 FPS on Orin Nano +# YOLOv8s FP16: ~40 FPS on Orin Nano +``` + +### 5.2 云端/服务器部署 + +```python +# 在 GPU 服务器上运行 +# 支持多路视频同时处理 + +from concurrent.futures import ThreadPoolExecutor +import threading + +class MultiCameraPipeline: + """多相机并发处理""" + + def __init__(self, config_files: list): + self.pipelines = [] + for cfg in config_files: + self.pipelines.append(WaterFlowVelocityPipeline(cfg)) + + def run_all(self, video_paths: list, output_dirs: list): + with ThreadPoolExecutor(max_workers=len(video_paths)) as executor: + futures = [] + for pipe, video, output in zip(self.pipelines, video_paths, output_dirs): + future = executor.submit(pipe.run, video, output) + futures.append(future) + + for future in futures: + result = future.result() + print(f"Result: {result}") +``` + +### 5.3 实时监控系统架构 + +``` +┌──────────────────────────────────────────────────────┐ +│ 实时监控系统 │ +│ │ +│ ┌──────────┐ ┌───────────┐ ┌───────────────┐ │ +│ │ 相机/IPC │───>│ 边缘设备 │───>│ 结果上报服务 │ │ +│ │ RTSP流 │ │ Jetson │ │ MQTT/HTTP │ │ +│ └──────────┘ │ 检测+追踪 │ └───────┬───────┘ │ +│ │ 速度计算 │ │ │ +│ └───────────┘ │ │ +│ ▼ │ +│ ┌───────────────┐ │ +│ │ 数据库/时序DB │ │ +│ │ InfluxDB/PgSQL │ │ +│ └───────┬───────┘ │ +│ │ │ +│ ▼ │ +│ ┌───────────────┐ │ +│ │ 可视化/告警 │ │ +│ │ Grafana/Web │ │ +│ └───────────────┘ │ +└──────────────────────────────────────────────────────┘ + +数据流: +1. 相机以 RTSP 推流到边缘设备 +2. Jetson 运行 YOLO + 追踪 + 速度计算 +3. 每 N 秒上报一次流速统计结果 +4. 数据存入时序数据库 +5. Grafana 展示实时流速曲线 +6. 流速超标时触发告警(短信/邮件/声音) +``` + +--- + +## 六、进阶优化 + +### 6.1 无自然示踪物时的方案 + +``` +当水面没有自然漂浮物时: + +方案 A:人工投放示踪物 +├── 环保可降解示踪球(直径 2-5cm) +├── 木屑/锯末(适用于小溪) +├── 生物可降解浮标 +└── 适用:短期测量 + +方案 B:基于水面纹理的光流法 +├── 使用 Lucas-Kanade 光流追踪水面纹理 +├── 不需要离散目标检测 +├── 但对水面纹理要求高 +└── 实现:cv2.calcOpticalFlowFarneback() + +方案 C:深度学习密集光流 +├── RAFT / FlowFormer 等光流模型 +├── 计算整个画面的稠密运动场 +├── 不需要检测器 +└── 计算量大,适合离线处理 + +方案 D:混合方案(推荐) +├── YOLO 检测离散示踪物 → 精确轨迹 +├── 光流法补充无示踪物区域 +└── 两者结果融合 +``` + +### 6.2 多摄像头融合 + +```python +def multi_camera_fusion(camera_results: list): + """ + 融合多个相机的流速测量结果 + + camera_results: list of { + 'camera_id': str, + 'mean_velocity': float, + 'std': float, + 'n_tracks': int, + 'roi': (x1, y1, x2, y2), # 相机覆盖区域 + } + """ + # 按轨迹数加权平均 + total_weight = sum(r['n_tracks'] for r in camera_results) + weighted_velocity = sum( + r['mean_velocity'] * r['n_tracks'] + for r in camera_results + ) / total_weight + + return { + 'fused_velocity': weighted_velocity, + 'camera_count': len(camera_results), + 'per_camera': camera_results, + } +``` + +### 6.3 流量估算(速度 → 流量) + +```python +def estimate_discharge(velocity, cross_section_area, velocity_coefficient=0.85): + """ + 从表面流速估算流量 + + Q = v_surface × α × A + + 其中: + - v_surface: 表面流速 (m/s) + - α: 表面流速到平均流速的转换系数(通常 0.8-0.9) + - A: 过水断面积 (m²) + + 注意:表面流速通常比平均流速大 15-20% + """ + return velocity * velocity_coefficient * cross_section_area + +# 示例: +# 表面流速 1.5 m/s,河宽 10m,平均水深 1m +# Q = 1.5 × 0.85 × (10 × 1) = 12.75 m³/s +``` + +--- + +## 七、项目时间规划 + +``` +周 1-2:需求分析与方案设计 +├── 确定测量场景和目标 +├── 选择相机位置和型号 +├── 制定标定方案 +└── 准备硬件设备 + +周 3-4:硬件部署与标定 +├── 安装相机和边缘计算设备 +├── 执行相机标定 +├── 采集初始标定数据 +└── 验证标定精度 + +周 5-6:数据采集与标注 +├── 采集不同条件下的视频 +├── 抽取关键帧进行标注 +├── 质量检查 +└── 准备训练数据集 + +周 7-8:模型训练与优化 +├── 训练 YOLO 检测器 +├── 评估和调整 +├── 集成追踪器 +└── 验证追踪效果 + +周 9-10:速度计算与验证 +├── 实现速度计算模块 +├── 与传统方法对比验证 +├── 误差分析 +└── 优化参数 + +周 11-12:系统集成与部署 +├── 端到端流水线整合 +├── 边缘设备部署 +├── 实时监控系统搭建 +└── 文档编写 + +总计:约 3 个月完成从 0 到生产部署 +``` + +--- + +## 八、成本估算 + +| 项目 | 低配方案 | 推荐方案 | 高配方案 | +|------|---------|---------|---------| +| 相机 | 500 元(USB 相机) | 3,000 元(IPC) | 10,000 元(工业相机) | +| 计算设备 | 800 元(Raspberry Pi) | 4,000 元(Jetson Orin Nano) | 8,000 元(Jetson AGX Orin) | +| 支架/配件 | 200 元 | 500 元 | 2,000 元 | +| 标定工具 | 100 元(卷尺) | 500 元 | 2,000 元(全站仪) | +| **硬件总计** | **~1,600 元** | **~8,000 元** | **~22,000 元** | +| 人工(标注+开发) | 5,000 元 | 15,000 元 | 30,000 元 | +| **总成本** | **~7,000 元** | **~23,000 元** | **~52,000 元** | + +对比:传统 ADCP 设备 5-20 万元,雷达流速仪 2-10 万元。 + +--- + +## 九、风险与对策 + +| 风险 | 概率 | 影响 | 对策 | +|------|------|------|------| +| 标定期失效(相机移动) | 中 | 高 | 牢固固定相机,定期复标 | +| 水面无示踪物 | 高 | 高 | 混合光流法方案 | +| 夜间无法工作 | 中 | 中 | 补光灯 / 红外相机 | +| 暴雨/雾天镜头模糊 | 中 | 中 | 防水罩 / 自动清洗 | +| ID 切换导致轨迹断裂 | 低 | 中 | 调整追踪参数 | +| 模型泛化能力差 | 中 | 高 | 多样化训练数据 | +| 边缘设备性能不足 | 低 | 中 | 模型量化 / 降低分辨率 | + +--- + +## 十、参考资源 + +### 学术论文 +- **ByteTrack**: Zhang et al., "ByteTrack: Multi-Object Tracking by Associating Every Detection Box", ECCV 2022 +- **LSPIV**: Fujita et al., "Large-scale Particle Image Velocimetry for flow analysis", 1998 +- **YOLOv8**: Ultralytics YOLOv8, https://github.com/ultralytics/ultralytics +- **STIV**: 空间时间图像测速法 (Space-Time Image Velocimetry) + +### 开源项目 +- [Ultralytics YOLOv8](https://github.com/ultralytics/ultralytics) — 检测+追踪一体框架 +- [ByteTrack](https://github.com/ifzhang/ByteTrack) — 原版 ByteTrack +- [OpenPIV](https://github.com/OpenPIV/openpiv-python) — 传统 PIV 实现 +- [rivertools](https://github.com/orgs/OpenRiverCam) — 河流监测开源项目 +- [ras-riv](https://github.com/HydroLogic/ras-riv) — 水文分析工具 + +### 商业产品 +- **FathomNet**: 水面监测云平台 +- **iSAR**: 雷达表面流速仪 +- **OTT HydroMet**: 水文监测综合方案 + +--- + +## 十一、快速上手检查清单 + +``` +[ ] 1. 确定测量场景(河道宽度、流速范围、光照条件) +[ ] 2. 选择并安装相机(固定位置、角度测试) +[ ] 3. 准备标定参考物(至少 4 个控制点) +[ ] 4. 执行相机标定,检查误差 < 0.3m +[ ] 5. 采集 300+ 张训练图像 +[ ] 6. 标注数据(Roboflow 或 CVAT) +[ ] 7. 训练 YOLOv8n 模型,目标 mAP@50 > 0.85 +[ ] 8. 集成 ByteTrack/BoT-SORT 追踪器 +[ ] 9. 处理测试视频,检查追踪效果 +[ ] 10. 与传统方法对比验证精度 +[ ] 11. 优化参数,减少误差 +[ ] 12. 部署到边缘设备,实现实时监测 +``` + +--- + +*整理:知识库管理员 | 归档:2026-04-23* diff --git a/专业领域/ByteTrack/YOLO+ByteTrack水流速度检测.md b/专业领域/ByteTrack/YOLO+ByteTrack水流速度检测.md index d681651..402e882 100755 --- a/专业领域/ByteTrack/YOLO+ByteTrack水流速度检测.md +++ b/专业领域/ByteTrack/YOLO+ByteTrack水流速度检测.md @@ -303,6 +303,7 @@ pip install opencv-python numpy scipy lap motmetrics ## AI工程索引 相关笔记: +- [[YOLO+ByteTrack水流速度检测-可行性详细分析]] — **完整可行性分析与实现指南(2026-04-23 更新)** - [[INDEX_AI工程]] - AI工程知识索引 - [[ClaudeCode完全研究]] - Claude Code 完整指南