# YOLO + ByteTrack Water Flow Velocity Detection — Detailed Feasibility Analysis and Full Implementation Guide

> **Bottom line up front: technically entirely feasible, but accuracy and applicable scenarios have clear limits.**
>
> Archived: 2026-04-23

---

## 1. Overall Feasibility Assessment

### 1.1 Core Conclusions
| Dimension | Rating | Notes |
|------|------|------|
| **Technical feasibility** | ✅ High | YOLO detection + ByteTrack tracking is sound in principle; similar applications already exist |
| **Accuracy feasibility** | ⚠️ Medium | Depends on calibration quality, richness of surface features, camera angle; typical error 5-15% |
| **Engineering feasibility** | ✅ High | Mature open-source ecosystem; real-time on GPU, barely workable on CPU |
| **Cost feasibility** | ✅ High | Cameras are cheap (hundreds to a few thousand CNY), far below ADCP/radar velocimeters |
| **Deployment feasibility** | ⚠️ Medium | Edge devices need optimization (TensorRT/INT8), but entirely achievable |
### 1.2 Comparison with Traditional Methods

| Method | Accuracy | Cost (CNY) | Real-time | Deployment difficulty | Use cases |
|------|------|------|--------|---------|---------|
| **ADCP** | ±1-2% | 50k-200k | Yes | High (goes in the water) | Deep water, precise measurement |
| **Radar velocimeter** | ±2-5% | 20k-100k | Yes | Medium | Fixed-point measurement |
| **LSPIV (traditional)** | ±5-10% | 10k-30k | Medium | Medium-high | Open water surfaces |
| **YOLO+ByteTrack** | ±5-15% | 5k-20k | Yes | Medium | Open water with tracers |
| **Float gauging** | ±5-10% | 1k-10k | No | Low | One-off measurements |

**Key insight**: YOLO+ByteTrack is essentially a modern deep-learning variant of **LSPIV (Large-Scale Particle Image Velocimetry)**. Traditional LSPIV tracks surface texture with cross-correlation; here, YOLO detects discrete tracers and ByteTrack links them into trajectories.
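To make the LSPIV connection concrete: the classical approach estimates the displacement of the surface texture between two frames via cross-correlation. A minimal NumPy sketch of that core operation, on synthetic data (`cross_correlation_shift` is an illustrative helper, not taken from any LSPIV library):

```python
import numpy as np

def cross_correlation_shift(patch_a, patch_b):
    """Estimate the integer pixel shift of patch_a relative to patch_b
    via FFT cross-correlation — the core operation of classical LSPIV."""
    a = patch_a - patch_a.mean()
    b = patch_b - patch_b.mean()
    corr = np.fft.ifft2(np.fft.fft2(a) * np.conj(np.fft.fft2(b))).real
    peak = np.unravel_index(np.argmax(corr), corr.shape)
    # Indices above N/2 wrap around to negative shifts
    return tuple(p if p <= s // 2 else p - s for p, s in zip(peak, corr.shape))

# Synthetic "water texture" moved 3 px down and 5 px right between frames
rng = np.random.default_rng(0)
frame_t0 = rng.random((64, 64))
frame_t1 = np.roll(frame_t0, shift=(3, 5), axis=(0, 1))
dy, dx = cross_correlation_shift(frame_t1, frame_t0)
```

Dividing the recovered `(dy, dx)` by the frame interval and a pixel-to-meter scale yields a surface velocity — exactly what the YOLO+ByteTrack pipeline replaces with explicit tracer trajectories.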
---

## 2. Applicable Scenarios and Limitations

### 2.1 Suitable Scenarios

✅ **Good fits for this approach:**
- Surface velocity measurement in open rivers, channels, and outfalls
- Water surfaces with natural tracers (leaves, foam, floating debris)
- A fixed camera mounting position (bridge, riverbank structure)
- Flood monitoring, discharge/pollution monitoring, ecological flow monitoring
- UAV aerial surveys of surface flow

❌ **Poor fits for this approach:**
- Very clear, featureless water (nothing to detect or track)
- Night-time with no illumination
- Heavy rain blurring the lens
- Applications needing a precise vertical velocity profile (this method measures surface velocity only)
- Violent surface waves that flip the tracers
### 2.2 Key Factors Affecting Accuracy

| Factor | Impact | Mitigation |
|------|---------|---------|
| Camera calibration error | High | Multi-point calibration, periodic re-verification |
| Perspective distortion | High | Homography-based correction |
| How well tracers follow the flow | Medium | Choose appropriately sized tracers |
| Surface turbulence | Medium | More tracking frames, smoothing filters |
| Lens distortion | Low-medium | Camera (intrinsic) calibration |
| Lighting changes | Medium | Data augmentation, HDR |
---

## 3. Full Implementation Steps

### Step 1: Hardware Preparation and Deployment

#### 1.1 Equipment List
```
Required:
├── Industrial/surveillance camera (1080p @ 30 fps or better)
│   ├── Recommended: Hikvision DS-2CD series / Dahua IPC series
│   ├── Budget option: ordinary USB camera + waterproof housing
│   └── High-end option: FLIR global-shutter camera (less motion blur)
├── Edge compute device
│   ├── NVIDIA Jetson Orin Nano / NX (recommended)
│   ├── Jetson Xavier NX (mid budget)
│   └── Raspberry Pi 5 + Coral TPU (inference only, limited performance)
├── Fixed mount/pole (keeps the camera stable)
└── Reference scale (for pixel-to-distance calibration)

Optional:
├── Fill light (night operation)
├── Polarizing filter (reduces surface glare)
├── Weatherproof enclosure
└── 4G/5G module (remote data transfer)
```
#### 1.2 Choosing the Camera Mounting Position
```
Characteristics of a good position:
1. Overhead or oblique view (30°-60° depression angle)
2. Covers a long enough measurement section (at least 5-10 m of water surface)
3. Avoids shooting into the light
4. Camera-to-water distance and angle stay fixed
5. Field of view contains reference objects of known size for calibration

Not recommended:
1. Straight down (90° nadir view) — insufficient perspective information
2. Nearly horizontal — perspective distortion too large
3. Aimed into direct surface glare
```
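As a quick sanity check when planning the mount, the horizontal distance from the camera to the point where its optical axis meets the water follows from the mounting height and depression angle. A small helper for this planning step (not part of the measurement pipeline, names are illustrative):

```python
import math

def ground_distance(camera_height_m, depression_deg):
    """Horizontal distance from the camera's base to the point where the
    optical axis meets the water, given mounting height and tilt-down angle."""
    return camera_height_m / math.tan(math.radians(depression_deg))

# A camera 5 m above the water tilted down 45° looks at a point 5 m out;
# at a 30° depression the same camera looks roughly 8.7 m out.
```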
### Step 2: Camera Calibration (the Critical Step)

#### 2.1 Calibration Methods Overview

There are three main ways to convert pixel coordinates into real-world coordinates:

**Method A: Simple scale factor (simplest; camera facing the channel head-on)**
```python
# Only one reference object of known length is needed,
# e.g. a buoy on the river known to be 2 m wide.

# Measure the buoy's width in pixels on the image
reference_pixels = measure_width_in_image(image, reference_object)
real_width_meters = 2.0

pixels_per_meter = reference_pixels / real_width_meters
# Assumes the same scale in all directions
# (valid only for a small, head-on region)
```
**Method B: Homography transform (recommended for oblique cameras)**
```python
import cv2
import numpy as np

class PerspectiveCalibrator:
    """
    Maps pixel coordinates to real-world coordinates via ground
    control points (GCPs), using a homography (perspective transform).
    """
    def __init__(self):
        self.homography_matrix = None

    def calibrate(self, pixel_points, real_points):
        """
        pixel_points: Nx2 array of control-point pixel coordinates
        real_points:  Nx2 array of corresponding real-world coordinates (meters)

        At least 4 control points are required.
        """
        assert len(pixel_points) >= 4, "At least 4 control points required"

        pixel_points = np.array(pixel_points, dtype=np.float32)
        real_points = np.array(real_points, dtype=np.float32)

        # Compute the homography matrix
        H, status = cv2.findHomography(pixel_points, real_points)
        self.homography_matrix = H

        # Evaluate calibration accuracy (reprojection error on the GCPs;
        # with exactly 4 points this is trivially ~0, so use 5+ points
        # for a meaningful estimate)
        errors = []
        for p, r in zip(pixel_points, real_points):
            # Map the pixel point to real-world coordinates
            p_hom = np.array([p[0], p[1], 1.0])
            r_pred = H @ p_hom
            r_pred = r_pred[:2] / r_pred[2]  # normalize homogeneous coordinates
            errors.append(np.linalg.norm(r_pred - r))

        mean_error = np.mean(errors)
        max_error = np.max(errors)

        print(f"Calibration accuracy — mean error: {mean_error:.3f} m, max error: {max_error:.3f} m")
        return mean_error, max_error

    def pixel_to_real(self, pixel_coords):
        """Convert pixel coordinates to real-world coordinates (meters)."""
        assert self.homography_matrix is not None, "Calibrate first"

        pts = np.array([[pixel_coords[0], pixel_coords[1]]], dtype=np.float32)
        pts = cv2.perspectiveTransform(pts.reshape(1, -1, 2), self.homography_matrix)
        return pts[0][0]

    def batch_pixel_to_real(self, pixel_coords_array):
        """Batch conversion."""
        pts = np.array(pixel_coords_array, dtype=np.float32).reshape(-1, 1, 2)
        transformed = cv2.perspectiveTransform(pts, self.homography_matrix)
        return transformed.reshape(-1, 2)

# ---- Usage example ----
calibrator = PerspectiveCalibrator()

# Mark at least 4 points of known real-world coordinates on the image,
# e.g. fixed markers on both banks, bridge railings, etc.
pixel_control_points = [
    [100, 200],  # point 1, pixel coordinates
    [500, 180],  # point 2
    [150, 600],  # point 3
    [600, 580],  # point 4
    [350, 400],  # point 5 (optional, improves accuracy)
]

# Corresponding real-world coordinates (surveyed on site, in meters),
# in a planar coordinate system with some reference point as the origin
real_control_points = [
    [0.0, 0.0],  # point 1, real-world coordinates
    [5.2, 0.3],  # point 2
    [0.5, 3.1],  # point 3
    [5.8, 3.4],  # point 4
    [3.0, 1.8],  # point 5
]

mean_err, max_err = calibrator.calibrate(pixel_control_points, real_control_points)
```
**Method C: Full intrinsic + extrinsic calibration (most accurate; for high-precision needs)**
```python
import cv2
import numpy as np

class FullCameraCalibrator:
    """
    Calibrate intrinsics with a chessboard, then solve extrinsics with PnP.
    For scenarios that need high accuracy.
    """
    def __init__(self):
        self.camera_matrix = None
        self.dist_coeffs = None
        self.rvec = None
        self.tvec = None

    def intrinsic_calibration(self, chessboard_images, chessboard_size=(9, 6), square_size=0.025):
        """
        Calibrate camera intrinsics from chessboard images.
        chessboard_images: images of the chessboard taken from different angles
        chessboard_size:   number of inner corners
        square_size:       physical size of one square (meters)
        """
        criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 30, 0.001)
        objp = np.zeros((chessboard_size[0] * chessboard_size[1], 3), np.float32)
        objp[:, :2] = np.mgrid[0:chessboard_size[0], 0:chessboard_size[1]].T.reshape(-1, 2) * square_size

        objpoints = []  # 3D points in the world
        imgpoints = []  # 2D points in the image

        for img in chessboard_images:
            gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
            ret, corners = cv2.findChessboardCorners(gray, chessboard_size, None)
            if ret:
                objpoints.append(objp)
                corners2 = cv2.cornerSubPix(gray, corners, (11, 11), (-1, -1), criteria)
                imgpoints.append(corners2)

        ret, self.camera_matrix, self.dist_coeffs, rvecs, tvecs = \
            cv2.calibrateCamera(objpoints, imgpoints, gray.shape[::-1], None, None)

        return ret

    def extrinsic_calibration(self, world_points, image_points):
        """
        Solve the camera extrinsics from known world points and their image points.
        """
        world_points = np.array(world_points, dtype=np.float32)
        image_points = np.array(image_points, dtype=np.float32)

        ret, self.rvec, self.tvec = cv2.solvePnP(
            world_points, image_points, self.camera_matrix, self.dist_coeffs
        )
        return ret

    def project_to_ground(self, pixel_point, ground_z=0.0):
        """
        Project an image point onto the ground plane (water surface assumed z=0).
        Account for any offset between the water level and the reference plane.
        """
        # Ray-plane intersection method
        # (implementation omitted; involves a fair amount of linear algebra)
        pass
```
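The `project_to_ground` stub above omits the ray-plane intersection. A self-contained sketch of that step, assuming `K` is the intrinsic matrix and `R`, `t` are world-to-camera extrinsics (i.e. `X_cam = R @ X_world + t`, with `R` obtainable from `rvec` via `cv2.Rodrigues`) and ignoring lens distortion:

```python
import numpy as np

def project_pixel_to_plane(pixel, K, R, t, plane_z=0.0):
    """Intersect the camera ray through `pixel` with the plane z = plane_z
    (the water surface), returning the 3D world point."""
    # Ray direction in camera coordinates (lens distortion ignored)
    d_cam = np.linalg.inv(K) @ np.array([pixel[0], pixel[1], 1.0])
    # Camera center and ray direction in world coordinates
    C = -R.T @ t           # camera center in the world frame
    d_world = R.T @ d_cam  # ray direction in the world frame
    # Solve C_z + s * d_z = plane_z along the ray
    s = (plane_z - C[2]) / d_world[2]
    return C + s * d_world
```

If the water level differs from the calibrated reference plane, pass the measured stage as `plane_z` instead of 0.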
#### 2.2 Hands-On Calibration Guide
```
On-site calibration procedure:

1. Pick a calibration day; prepare a tape measure, GPS (optional), and markers

2. Place at least 4 ground control points (GCPs) in the camera's field of view:
   - Use bright markers (orange cones, spray-paint marks)
   - Spread them across the measurement area, not clustered together
   - Make sure the markers are visible above the water surface

3. Measure the actual distances between GCPs with the tape measure
   - Define a planar coordinate system (one GCP as the origin)
   - Record each GCP's real-world (x, y) coordinates

4. Capture the calibration image
   - All GCPs must be clearly visible in the frame
   - Record the camera settings (focal length, angle) — they must not change afterwards

5. Mark each GCP's pixel coordinates on the image
   - Use an annotation tool or record them manually

6. Run the calibration algorithm and check the error
   - Mean error < 0.1 m is excellent
   - Mean error < 0.3 m is usable
   - If the error is too large, recalibrate

7. Save the calibration parameters (the H matrix) for later use
```
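Step 7 above saves the H matrix for reuse at inference time; a minimal sketch of that persistence (the file name and helper names are illustrative):

```python
import json
import numpy as np

def save_homography(H, path="homography.json"):
    """Persist the calibrated homography so the deployed tracker can reload it."""
    with open(path, "w") as f:
        json.dump({"homography_matrix": np.asarray(H).tolist()}, f)

def load_homography(path="homography.json"):
    """Load a previously saved homography as a 3x3 float array."""
    with open(path) as f:
        return np.array(json.load(f)["homography_matrix"], dtype=np.float64)
```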
### Step 3: Data Collection and Annotation

#### 3.1 Data Collection Strategy
```python
import cv2
from pathlib import Path

class DataCollector:
    """
    Collects training data from a camera video stream.
    """
    def __init__(self, source=0, output_dir="./raw_data"):
        self.source = source
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def collect_frames(self, interval_seconds=0.5, max_frames=1000):
        """
        Sample frames from the stream at a fixed interval.

        Notes:
        - Cover different times of day (morning/noon/evening)
        - Cover different weather conditions
        - Cover different flow regimes (low water / flood)
        """
        cap = cv2.VideoCapture(self.source)
        fps = cap.get(cv2.CAP_PROP_FPS) or 30  # some sources report 0
        frame_interval = max(1, int(fps * interval_seconds))

        frame_count = 0
        saved_count = 0

        while saved_count < max_frames:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_count % frame_interval == 0:
                filename = self.output_dir / f"frame_{saved_count:06d}.jpg"
                cv2.imwrite(str(filename), frame)
                saved_count += 1
                print(f"Saved {saved_count}/{max_frames}")

            frame_count += 1

        cap.release()
        print(f"Collected {saved_count} frames from {frame_count} total frames")
```
#### 3.2 Data Annotation

**Annotation strategy**:
```
Target class design:

Basic scheme (1 class):
└── floating_object (all floaters labeled as one class)
    Pros: simple, needs little data
    Cons: may pick up irrelevant floating objects

Fine-grained scheme (3-5 classes):
├── natural_debris (leaves, branches, other natural floaters)
├── foam_bubble (foam)
├── artificial_tracer (deliberately released tracers)
├── ice_chunk (ice, in winter)
└── wave_crest (wave crests, optional)
    Pros: unreliable tracers can be filtered out
    Cons: higher annotation cost

Recommended:
Start with the 1-class scheme for quick validation,
then expand to multiple classes as needs grow.
```
**Choosing an annotation tool**:
| Tool | Best for | Price | Notes |
|------|---------|------|------|
| **Roboflow** | Fast annotation + automatic preprocessing | Free tier available | Assisted auto-labeling, direct COCO export |
| **CVAT** | Video annotation | Free, open source | Interpolation between frames, good for many frames |
| **Label Studio** | Flexible customization | Free, open source | Extensible, multi-user collaboration |
| **LabelImg** | Simple image annotation | Free, open source | Lightweight; YOLO/PascalVOC only |
**Recommended data volume**:
```
Minimum:
├── 300-500 annotated images (quick validation)
├── At least 3 different lighting conditions
└── Tracers of various sizes

Production grade:
├── 2000-5000 annotated images
├── Lighting variation across the whole day
├── Different weather conditions
├── Different flow regimes
└── Reserve 20% as a validation set
```
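The 20% validation reservation above can be done with a small deterministic helper (the helper name and file names are illustrative):

```python
import random

def split_dataset(image_paths, val_fraction=0.2, seed=42):
    """Shuffle reproducibly and reserve val_fraction of the images for validation."""
    rng = random.Random(seed)
    paths = sorted(image_paths)
    rng.shuffle(paths)
    n_val = max(1, int(len(paths) * val_fraction))
    return paths[n_val:], paths[:n_val]  # (train, val)
```

Fixing the seed keeps the split stable across re-runs, so later annotation passes never leak validation images into training.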
#### 3.3 Annotation Quality Control
```python
# Annotation sanity-check script
import json

def validate_annotations(coco_json_path, image_dir, min_area=100, max_area=500000):
    """Check annotation quality in a COCO-format file."""
    with open(coco_json_path) as f:
        data = json.load(f)

    issues = []

    for ann in data['annotations']:
        bbox = ann['bbox']  # [x, y, w, h]
        area = bbox[2] * bbox[3]

        if area < min_area:
            issues.append(f"Annotation {ann['id']} too small: {area}px²")
        if area > max_area:
            issues.append(f"Annotation {ann['id']} too large: {area}px²")

        # Check that the box lies within the image bounds
        img_info = next(img for img in data['images'] if img['id'] == ann['image_id'])
        if (bbox[0] + bbox[2] > img_info['width'] or
                bbox[1] + bbox[3] > img_info['height']):
            issues.append(f"Annotation {ann['id']} extends beyond the image")

    if issues:
        print(f"Found {len(issues)} annotation issues:")
        for issue in issues[:20]:
            print(f"  - {issue}")
    else:
        print("Annotation quality check passed ✓")

    return issues
```
### Step 4: Model Training

#### 4.1 Recommended Stack: YOLOv8 + the Ultralytics Ecosystem

> The original notes recommended YOLOX, but as of 2025-2026 **YOLOv8 is the better choice**:
> - Ultralytics ships built-in ByteTrack support
> - Simpler API, easier deployment
> - Better model performance
> - Active community and documentation
```bash
# ===== Environment setup =====

# 1. Create a conda environment
conda create -n waterflow python=3.10 -y
conda activate waterflow

# 2. Install PyTorch (match your CUDA version)
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121

# 3. Install Ultralytics
pip install ultralytics

# 4. Install the remaining dependencies
pip install opencv-python numpy scipy pandas matplotlib seaborn
pip install supervision  # visualization utilities
pip install roboflow     # data management (optional)
```
#### 4.2 Data Preparation (YOLOv8 Format)
```yaml
# dataset.yaml
path: ./water_flow_dataset
train: images/train
val: images/val
test: images/test

nc: 1  # number of classes
names: ['floating_object']  # class names
```
```
Dataset directory layout:
water_flow_dataset/
├── images/
│   ├── train/
│   │   ├── frame_0001.jpg
│   │   ├── frame_0002.jpg
│   │   └── ...
│   ├── val/
│   │   └── ...
│   └── test/
│       └── ...
└── labels/
    ├── train/
    │   ├── frame_0001.txt  # YOLO-format labels
    │   ├── frame_0002.txt
    │   └── ...
    ├── val/
    │   └── ...
    └── test/
        └── ...
```
**YOLO label format** (one object per line):
```
class_id x_center y_center width height
```
All values are normalized to the 0-1 range, relative to the image width and height.
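Converting a pixel-space box to this normalized format is one division per field; a small hedged helper (not from any particular annotation tool):

```python
def to_yolo_line(class_id, x1, y1, x2, y2, img_w, img_h):
    """Convert a pixel-space box (x1, y1, x2, y2) into one YOLO label line."""
    xc = (x1 + x2) / 2 / img_w   # normalized box-center x
    yc = (y1 + y2) / 2 / img_h   # normalized box-center y
    w = (x2 - x1) / img_w        # normalized width
    h = (y2 - y1) / img_h        # normalized height
    return f"{class_id} {xc:.6f} {yc:.6f} {w:.6f} {h:.6f}"

# A 200x100 px box at (100, 100) in a 1000x500 image:
# to_yolo_line(0, 100, 100, 300, 200, 1000, 500)
# → "0 0.200000 0.300000 0.200000 0.200000"
```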
#### 4.3 Model Training
```python
from ultralytics import YOLO

# ===== Option A: fine-tune a pretrained model =====

# Load a pretrained model (starting from YOLOv8n is recommended)
model = YOLO('yolov8n.pt')

# Train
results = model.train(
    data='dataset.yaml',   # dataset config file
    epochs=100,            # training epochs
    imgsz=640,             # input size
    batch=16,              # batch size
    device=0,              # GPU device
    patience=20,           # early-stopping patience
    save_period=10,        # checkpoint interval
    cache=True,            # cache data in memory for speed
    # Augmentation parameters
    hsv_h=0.015,           # hue jitter (surface lighting changes)
    hsv_s=0.7,             # saturation jitter
    hsv_v=0.4,             # brightness jitter
    degrees=0.0,           # no rotation (camera is fixed)
    translate=0.1,         # translation
    scale=0.5,             # scaling
    fliplr=0.5,            # horizontal flip
    mosaic=1.0,            # Mosaic augmentation
    mixup=0.1,             # Mixup augmentation
    # Optimizer
    optimizer='AdamW',
    lr0=0.001,
    lrf=0.01,
    weight_decay=0.0005,
)

# ===== Option B: choosing between nano and small =====

# YOLOv8n (3.2M params) — first choice for edge deployment
model_nano = YOLO('yolov8n.pt')

# YOLOv8s (11.2M params) — balanced choice
model_small = YOLO('yolov8s.pt')

# YOLOv8m (25.9M params) — higher accuracy
model_medium = YOLO('yolov8m.pt')
```
#### 4.4 Training Monitoring and Evaluation
```python
from ultralytics import YOLO

# Load the trained model
model = YOLO('runs/detect/train/weights/best.pt')

# Evaluate on the validation set
metrics = model.val()

print(f"mAP@50: {metrics.box.map50:.4f}")
print(f"mAP@50-95: {metrics.box.map:.4f}")
print(f"Precision: {metrics.box.mp:.4f}")
print(f"Recall: {metrics.box.mr:.4f}")

# Targets: mAP@50 > 0.85 is good, > 0.90 is excellent

# Plot the confusion matrix and other charts
model.val(plots=True)

# Inference + visualization on the test set
results = model.predict(
    source='images/test/',
    save=True,
    save_txt=True,
    conf=0.25,
    iou=0.45
)
```
#### 4.5 Training Troubleshooting
```
Common problems and fixes:

Problem 1: mAP below 0.5
├── Cause: too little data or poor annotation quality
├── Fix: add annotated data; check annotation consistency
└── Fix: train longer; check the learning rate

Problem 2: poor small-object detection
├── Cause: YOLOv8 has limits on small objects
├── Fix: use YOLOv8s or a larger model
├── Fix: use a higher input resolution (imgsz=1280)
└── Fix: use SAHI (slicing-aided hyper inference)

Problem 3: overfitting
├── Cause: insufficient or too-homogeneous training data
├── Fix: stronger augmentation (mixup, mosaic, copy-paste)
└── Fix: collect more varied training data

Problem 4: false positives from surface glare
├── Cause: glare regions mistaken for tracers
├── Fix: add glare scenes to the training data
├── Fix: use a polarizing filter
└── Fix: raise the confidence threshold
```
### Step 5: ByteTrack Tracking Integration

#### 5.1 Options: Built-In Trackers vs Standalone ByteTrack
```
Ultralytics YOLOv8 ships several trackers:

1. BoT-SORT (default) — recommended
   - An improved ByteTrack
   - Adds camera-motion compensation
   - Adds ReID features
   - Simplest API

2. ByteTrack
   - The original ByteTrack
   - Lightweight and fast
   - No camera-motion compensation

3. DeepOCSORT
   - OC-SORT with deep appearance features
   - Handles occlusion better

Recommendation: start with the default BoT-SORT; switch to ByteTrack only if it becomes a performance bottleneck.
```
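Note that Ultralytics reads tracker thresholds from a YAML file passed via `model.track(..., tracker=...)`, not from keyword arguments. A custom ByteTrack config might look like the following (field names follow the stock `bytetrack.yaml` shipped with recent Ultralytics releases; treat the values as starting points):

```yaml
# my_bytetrack.yaml — used as: model.track(frame, tracker="my_bytetrack.yaml")
tracker_type: bytetrack
track_high_thresh: 0.5   # threshold for the first (high-score) association
track_low_thresh: 0.1    # threshold for the second (low-score) association
new_track_thresh: 0.6    # confidence needed to start a new track
track_buffer: 30         # frames a lost track is kept alive
match_thresh: 0.8        # IoU threshold for matching
fuse_score: true         # fuse detection score into the IoU cost
```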
#### 5.2 Tracking Inference Implementation
```python
from ultralytics import YOLO
from collections import defaultdict
import cv2
import numpy as np

class WaterFlowTracker:
    """
    Water-flow velocity tracker:
    YOLO detection + tracking + velocity computation.
    """

    def __init__(
        self,
        model_path: str,
        pixels_per_meter: float = None,
        homography_matrix: np.ndarray = None,
        conf_threshold: float = 0.3,
        iou_threshold: float = 0.45,
        tracker_type: str = "botsort",  # "botsort" or "bytetrack"
    ):
        # Load the model
        self.model = YOLO(model_path)

        # Calibration parameters
        self.pixels_per_meter = pixels_per_meter
        self.homography_matrix = homography_matrix

        # Tracking parameters
        self.conf_threshold = conf_threshold
        self.iou_threshold = iou_threshold
        self.tracker_type = tracker_type

        # Trajectory store {track_id: [(frame_idx, x, y), ...]}
        self.trajectories = defaultdict(list)

        # Velocity store {track_id: velocity_mps}
        self.velocities = {}

        # Tracker configuration. Note: Ultralytics reads tracker thresholds
        # from the tracker YAML file, not from model.track() keyword
        # arguments. To tune parameters such as track_high_thresh (0.5),
        # track_low_thresh (0.1), new_track_thresh (0.6), track_buffer
        # (30 — tolerate up to 30 lost frames), match_thresh (0.8), or
        # BoT-SORT's ReID and camera-motion-compensation (gmc_method)
        # options, copy the stock YAML, edit it, and point `tracker` at it.
        if tracker_type == "botsort":
            self.track_kwargs = {"tracker": "botsort.yaml"}
        else:  # bytetrack
            self.track_kwargs = {"tracker": "bytetrack.yaml"}

    def process_video(self, video_path: str, output_path: str = None,
                      skip_frames: int = 0, max_frames: int = None):
        """
        Process a video file.

        Args:
            video_path: input video path
            output_path: output video path (optional)
            skip_frames: number of initial frames to skip
            max_frames: maximum number of frames to process (None = all)
        """
        cap = cv2.VideoCapture(video_path)
        fps = cap.get(cv2.CAP_PROP_FPS)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Video writer
        writer = None
        if output_path:
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            writer = cv2.VideoWriter(output_path, fourcc, fps, (width, height))

        frame_idx = 0

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            if frame_idx < skip_frames:
                frame_idx += 1
                continue

            if max_frames and frame_idx - skip_frames >= max_frames:
                break

            # Run detection + tracking
            results = self.model.track(
                frame,
                persist=True,
                conf=self.conf_threshold,
                iou=self.iou_threshold,
                verbose=False,
                **self.track_kwargs
            )

            # Handle the results
            if results[0].boxes is not None and results[0].boxes.id is not None:
                boxes = results[0].boxes.xyxy.cpu().numpy()
                track_ids = results[0].boxes.id.cpu().numpy().astype(int)
                confs = results[0].boxes.conf.cpu().numpy()

                for box, track_id, conf in zip(boxes, track_ids, confs):
                    # Box center
                    center_x = (box[0] + box[2]) / 2
                    center_y = (box[1] + box[3]) / 2

                    # Update the trajectory
                    self.trajectories[track_id].append((frame_idx, center_x, center_y))

                    # Compute velocity (needs enough history)
                    if len(self.trajectories[track_id]) >= 5:
                        velocity = self._calculate_velocity(track_id, fps)
                        self.velocities[track_id] = velocity

                # Visualization (optional)
                annotated_frame = results[0].plot()

                # Overlay velocity labels on the frame
                if annotated_frame is not None:
                    self._draw_velocities(annotated_frame, boxes, track_ids)

                if writer:
                    writer.write(annotated_frame if annotated_frame is not None else frame)

            if frame_idx % 30 == 0:
                print(f"Processed frame {frame_idx}")

            frame_idx += 1

        cap.release()
        if writer:
            writer.release()

        print(f"Total frames processed: {frame_idx}")
        print(f"Total tracks: {len(self.trajectories)}")
        print(f"Tracks with velocity: {len(self.velocities)}")

    def _calculate_velocity(self, track_id: int, fps: float,
                            min_frames: int = 5, use_rolling: bool = True):
        """
        Compute the velocity of one tracked object.

        Args:
            track_id: track ID
            fps: video frame rate
            min_frames: minimum number of frames required
            use_rolling: use a rolling window (more stable)
        """
        traj = self.trajectories[track_id]
        if len(traj) < min_frames:
            return None

        if use_rolling:
            # Use only the most recent N frames (better for unsteady flow)
            window_size = min(15, len(traj))
            recent = traj[-window_size:]
        else:
            # Use the whole trajectory
            recent = traj

        start_frame, start_x, start_y = recent[0]
        end_frame, end_x, end_y = recent[-1]

        # Elapsed time
        time_sec = (end_frame - start_frame) / fps
        if time_sec == 0:
            return None

        # Pixel displacement
        dx_pixel = end_x - start_x
        dy_pixel = end_y - start_y

        if self.homography_matrix is not None:
            # Convert via the homography
            start_real = self._pixel_to_real(start_x, start_y)
            end_real = self._pixel_to_real(end_x, end_y)
            dx_real = end_real[0] - start_real[0]
            dy_real = end_real[1] - start_real[1]
            distance_meters = np.sqrt(dx_real**2 + dy_real**2)
        elif self.pixels_per_meter:
            # Simple scale conversion
            pixel_dist = np.sqrt(dx_pixel**2 + dy_pixel**2)
            distance_meters = pixel_dist / self.pixels_per_meter
        else:
            # No calibration available: return pixel velocity (pixels/second)
            pixel_dist = np.sqrt(dx_pixel**2 + dy_pixel**2)
            return pixel_dist / time_sec

        # Velocity (m/s)
        return distance_meters / time_sec

    def _pixel_to_real(self, px, py):
        """Convert pixel coordinates to real-world coordinates."""
        pt = np.array([[[px, py]]], dtype=np.float32)
        transformed = cv2.perspectiveTransform(pt, self.homography_matrix)
        return transformed[0][0]

    def _draw_velocities(self, frame, boxes, track_ids):
        """Draw velocity labels on the frame."""
        for box, track_id in zip(boxes, track_ids):
            velocity = self.velocities.get(track_id)
            if velocity is None:
                continue

            # Show the velocity above the object
            x = int((box[0] + box[2]) / 2)
            y = int(box[1]) - 10

            # Color by velocity (blue-green-red)
            color = self._velocity_color(velocity)

            text = f"ID:{track_id} {velocity:.2f} m/s"
            cv2.putText(frame, text, (x - 50, y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

    def _velocity_color(self, velocity):
        """Map a velocity to a BGR color."""
        if velocity < 0.5:
            return (255, 0, 0)  # blue - slow
        elif velocity < 1.5:
            return (0, 255, 0)  # green - medium
        else:
            return (0, 0, 255)  # red - fast

    def get_summary(self):
        """Summary statistics over all track velocities."""
        if not self.velocities:
            return {"error": "No velocity data"}

        values = [v for v in self.velocities.values() if v is not None]
        if not values:
            return {"error": "No valid velocity data"}

        return {
            "count": len(values),
            "mean_velocity": np.mean(values),
            "median_velocity": np.median(values),
            "std_velocity": np.std(values),
            "min_velocity": np.min(values),
            "max_velocity": np.max(values),
            "q25": np.percentile(values, 25),
            "q75": np.percentile(values, 75),
        }
```
#### 5.3 Standalone ByteTrack Integration (Fallback)

If you need the original ByteTrack (rather than the Ultralytics built-in version):
```bash
# Install ByteTrack
git clone https://github.com/ifzhang/ByteTrack.git
cd ByteTrack
pip install -r requirements.txt
python setup.py develop

# Install the Cython NMS (faster)
pip install cython
cd yolox/layers && python setup.py build_ext --inplace
```
```python
import numpy as np
from types import SimpleNamespace
from yolox.tracker.byte_tracker import BYTETracker

class CustomByteTrackProcessor:
    """
    Processor built on the original ByteTrack,
    for scenarios that need fine-grained control of the tracking parameters.
    """

    def __init__(self, track_thresh=0.5, track_buffer=30, match_thresh=0.8):
        # BYTETracker expects an argparse-style namespace, not a dict
        track_args = SimpleNamespace(
            track_thresh=track_thresh,
            track_buffer=track_buffer,
            match_thresh=match_thresh,
            mot20=False,
        )
        self.tracker = BYTETracker(track_args, frame_rate=30)

    def process_frame(self, frame, detections):
        """
        Process a single frame.
        detections: list of [x1, y1, x2, y2, confidence]
        """
        # ByteTrack itself splits these into high- and low-score boxes
        det_boxes = np.array(detections)

        # Run the tracker. img_info and img_size are used for rescaling;
        # pass the frame size for both when boxes are already in frame coordinates.
        h, w = frame.shape[:2]
        online_targets = self.tracker.update(det_boxes, (h, w), (h, w))

        results = []
        for t in online_targets:
            results.append({
                'track_id': t.track_id,
                'bbox': t.tlbr,
                'score': t.score,
            })

        return results
```
### Step 6: Velocity Computation and Post-Processing

#### 6.1 Advanced Velocity Estimation
```python
import cv2
import numpy as np
from scipy.signal import savgol_filter

class AdvancedVelocityCalculator:
    """
    Advanced flow-velocity calculator with several
    estimation methods and filtering techniques.
    """

    def __init__(self, homography_matrix=None, fps=30):
        self.H = homography_matrix
        self.fps = fps

    def point_velocity(self, trajectory, method='total'):
        """
        Compute velocity from a trajectory.

        trajectory: [(frame_idx, x, y), ...]

        Methods:
        - 'total': total displacement / total time (simple but coarse)
        - 'segment': average over segments (more robust)
        - 'derivative': differentiate positions (captures instantaneous velocity)
        - 'kalman': Kalman-filter estimate
        """
        if len(trajectory) < 3:
            return None

        if method == 'total':
            return self._total_displacement(trajectory)
        elif method == 'segment':
            return self._segment_average(trajectory)
        elif method == 'derivative':
            return self._derivative_method(trajectory)
        elif method == 'kalman':
            return self._kalman_estimate(trajectory)

    def _total_displacement(self, trajectory):
        """Start-to-end displacement method."""
        start = trajectory[0]
        end = trajectory[-1]

        dx_m, dy_m = self._to_real_distance(
            start[1], start[2], end[1], end[2]
        )
        distance = np.sqrt(dx_m**2 + dy_m**2)
        time = (end[0] - start[0]) / self.fps

        return distance / time if time > 0 else None

    def _segment_average(self, trajectory, segment_size=10):
        """Per-segment velocities, then the median (outlier-resistant)."""
        velocities = []
        for i in range(0, len(trajectory) - segment_size, segment_size // 2):
            segment = trajectory[i:i + segment_size]
            v = self._total_displacement(segment)
            if v is not None:
                velocities.append(v)

        if not velocities:
            return None

        # Drop outliers via the IQR rule
        velocities = np.array(velocities)
        q1, q3 = np.percentile(velocities, [25, 75])
        iqr = q3 - q1
        filtered = velocities[(velocities >= q1 - 1.5*iqr) &
                              (velocities <= q3 + 1.5*iqr)]

        return np.median(filtered) if len(filtered) > 0 else None

    def _derivative_method(self, trajectory, smoothing=True):
        """Numerical differentiation."""
        xs = np.array([t[1] for t in trajectory])
        ys = np.array([t[2] for t in trajectory])

        if smoothing:
            # Savitzky-Golay filter
            window = min(len(xs), 11)
            if window % 2 == 0:
                window -= 1
            if window >= 5:
                xs = savgol_filter(xs, window, 2)
                ys = savgol_filter(ys, window, 2)

        # Numerical differentiation
        dt = 1.0 / self.fps
        vx = np.gradient(xs) / dt
        vy = np.gradient(ys) / dt

        # Speed in pixel space
        speed_pixel = np.sqrt(vx**2 + vy**2)

        # Mean pixel speed, then convert to m/s
        avg_pixel_speed = np.mean(speed_pixel)

        if self.H is not None:
            # Sample the conversion factor at the trajectory midpoint
            mid_idx = len(trajectory) // 2
            mid_x, mid_y = trajectory[mid_idx][1], trajectory[mid_idx][2]
            ppm = self._get_pixels_per_meter_at(mid_x, mid_y)
            return avg_pixel_speed / ppm
        else:
            return avg_pixel_speed  # pixels/second

    def _kalman_estimate(self, trajectory):
        """Kalman-filter estimate."""
        from filterpy.kalman import KalmanFilter

        # Initialize the Kalman filter
        kf = KalmanFilter(dim_x=4, dim_z=2)

        # State: [x, y, vx, vy]
        kf.F = np.array([[1, 0, 1/self.fps, 0],
                         [0, 1, 0, 1/self.fps],
                         [0, 0, 1, 0],
                         [0, 0, 0, 1]])

        kf.H = np.array([[1, 0, 0, 0],
                         [0, 1, 0, 0]])

        kf.R *= 5               # measurement noise
        kf.Q[0:2, 0:2] *= 0.01  # process noise
        kf.P *= 100

        # Initial state
        first = trajectory[0]
        kf.x = np.array([first[1], first[2], 0, 0])

        # Filter updates
        for frame, x, y in trajectory[1:]:
            kf.predict()
            kf.update(np.array([x, y]))

        # Velocity estimate
        vx, vy = kf.x[2], kf.x[3]

        if self.H is not None:
            mid_x = trajectory[len(trajectory)//2][1]
            mid_y = trajectory[len(trajectory)//2][2]
            ppm = self._get_pixels_per_meter_at(mid_x, mid_y)
            return np.sqrt(vx**2 + vy**2) / ppm

        return np.sqrt(vx**2 + vy**2)

    def _to_real_distance(self, x1, y1, x2, y2):
        """Real-world distance components between two pixel points."""
        if self.H is not None:
            p1 = self._pixel_to_real(x1, y1)
            p2 = self._pixel_to_real(x2, y2)
            return p2[0] - p1[0], p2[1] - p1[1]
        else:
            return x2 - x1, y2 - y1

    def _pixel_to_real(self, x, y):
        pt = np.array([[[x, y]]], dtype=np.float32)
        transformed = cv2.perspectiveTransform(pt, self.H)
        return transformed[0][0]

    def _get_pixels_per_meter_at(self, px, py):
        """Local pixels-per-meter at a pixel location (accounts for perspective)."""
        if self.H is None:
            return 1.0

        # Map the pixel to real-world coordinates first
        rx, ry = self._pixel_to_real(px, py)

        # Take two real-world points 1 m apart around that location
        real_near = np.array([[[rx, ry]]], dtype=np.float32)
        real_far = np.array([[[rx + 1.0, ry]]], dtype=np.float32)

        # Map them back into pixel space
        H_inv = np.linalg.inv(self.H)
        px_near = cv2.perspectiveTransform(real_near, H_inv)[0][0]
        px_far = cv2.perspectiveTransform(real_far, H_inv)[0][0]

        pixel_dist = np.linalg.norm(px_far - px_near)
        return pixel_dist  # pixels per meter
```
#### 6.2 Fusing Velocities Across Objects
```python
import numpy as np

def compute_flow_velocity_summary(velocities, method='trimmed_mean',
                                  trim_fraction=0.1):
    """
    Combine per-track velocity estimates into one flow velocity.

    Methods:
    - 'trimmed_mean': trimmed mean (drops extremes)
    - 'median': median (robust)
    - 'weighted': weighted by trajectory length
    - 'mode': most common velocity
    """
    if not velocities:
        return None

    vals = np.array([v for v in velocities if v is not None and v > 0])
    if len(vals) == 0:
        return None

    if method == 'trimmed_mean':
        sorted_vals = np.sort(vals)
        n = len(sorted_vals)
        trim_count = max(1, int(n * trim_fraction))
        trimmed = sorted_vals[trim_count:-trim_count]
        return np.mean(trimmed) if len(trimmed) > 0 else np.median(vals)

    elif method == 'median':
        return np.median(vals)

    elif method == 'weighted':
        # weights = trajectory lengths; they would need to be passed in,
        # so this currently falls through to the plain mean below
        pass

    elif method == 'mode':
        from scipy.stats import gaussian_kde
        kde = gaussian_kde(vals)
        x_eval = np.linspace(vals.min(), vals.max(), 1000)
        densities = kde(x_eval)
        return x_eval[np.argmax(densities)]

    return np.mean(vals)
```
### Step 7: End-to-End Pipeline
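The pipeline below reads its settings from a YAML file; a minimal config matching the keys it accesses might look like this (paths and values are placeholders):

```yaml
# config.yaml
model_path: runs/detect/train/weights/best.pt
conf_threshold: 0.3
iou_threshold: 0.45
min_track_length: 10
# Either a 3x3 homography matrix (from Step 2)...
homography_matrix:
  - [0.010, 0.000, -1.00]
  - [0.000, 0.012, -2.40]
  - [0.000, 0.000, 1.00]
# ...or, if homography_matrix is omitted, a constant scale:
# pixels_per_meter: 85.0
```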
```python
|
||
#!/usr/bin/env python3
|
||
"""
|
||
水流速度检测完整流水线
|
||
"""
|
||
import cv2
|
||
import numpy as np
|
||
import yaml
|
||
import json
|
||
from datetime import datetime
|
||
from pathlib import Path
|
||
from ultralytics import YOLO
|
||
from collections import defaultdict
|
||
|
||
class WaterFlowVelocityPipeline:
|
||
"""
|
||
端到端水流速度检测流水线
|
||
|
||
使用流程:
|
||
1. 初始化:传入模型路径和标定参数
|
||
2. 处理视频:自动检测、追踪、计算速度
|
||
3. 获取结果:输出流速统计和可视化
|
||
"""
|
||
|
||
def __init__(self, config_path: str):
|
||
"""从配置文件初始化"""
|
||
with open(config_path) as f:
|
||
self.config = yaml.safe_load(f)
|
||
|
||
# 加载模型
|
||
self.model = YOLO(self.config['model_path'])
|
||
|
||
# 加载标定参数
|
||
if 'homography_matrix' in self.config:
|
||
self.H = np.array(self.config['homography_matrix'])
|
||
else:
|
||
self.H = None
|
||
self.ppm = self.config.get('pixels_per_meter')
|
||
|
||
# 初始化轨迹存储
|
||
self.trajectories = defaultdict(list)
|
||
self.frame_count = 0
|
||
|
||
def run(self, video_path: str, output_dir: str):
|
||
"""运行完整流水线"""
|
||
output_dir = Path(output_dir)
|
||
output_dir.mkdir(parents=True, exist_ok=True)
|
||
|
||
# 1. 处理视频
|
||
results = self._process_video(video_path)
|
||
|
||
# 2. 计算速度
|
||
velocities = self._calculate_all_velocities()
|
||
|
||
# 3. 统计分析
|
||
summary = self._compute_summary(velocities)
|
||
|
||
# 4. 保存结果
|
||
self._save_results(output_dir, summary, velocities)
|
||
|
||
# 5. 生成报告
|
||
self._generate_report(output_dir, summary)
|
||
|
||
return summary
|
||
|
||
    def _process_video(self, video_path: str):
        """Process the video: run detection + tracking, collect trajectories."""
        cap = cv2.VideoCapture(video_path)
        # Remember the true frame rate for velocity computation later
        self.video_fps = cap.get(cv2.CAP_PROP_FPS) or 30.0

        frame_idx = 0

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Detection + tracking
            det_results = self.model.track(
                frame,
                persist=True,
                conf=self.config.get('conf_threshold', 0.3),
                iou=self.config.get('iou_threshold', 0.45),
                tracker="bytetrack.yaml",
                verbose=False,
            )

            if det_results[0].boxes is not None and det_results[0].boxes.id is not None:
                boxes = det_results[0].boxes.xyxy.cpu().numpy()
                ids = det_results[0].boxes.id.cpu().numpy().astype(int)

                for box, track_id in zip(boxes, ids):
                    cx = (box[0] + box[2]) / 2
                    cy = (box[1] + box[3]) / 2
                    self.trajectories[track_id].append((frame_idx, cx, cy))

            frame_idx += 1

            if frame_idx % 100 == 0:
                print(f"Processed {frame_idx} frames, {len(self.trajectories)} tracks")

        self.frame_count = frame_idx
        cap.release()

    def _calculate_all_velocities(self):
        """Compute a velocity for every tracked object."""
        velocities = {}

        for track_id, traj in self.trajectories.items():
            if len(traj) < self.config.get('min_track_length', 10):
                continue  # Track too short to be reliable

            velocity = self._calculate_velocity(traj)
            if velocity is not None and 0 < velocity < 10:
                # Filter implausible values (flow above 10 m/s is unlikely)
                velocities[track_id] = velocity

        return velocities

    def _calculate_velocity(self, trajectory):
        """Compute the velocity of a single trajectory."""
        if len(trajectory) < 5:
            return None

        # Piecewise averaging over overlapping segments, median-filtered at the end.
        # Short tracks (< 15 points) are treated as a single segment.
        segment_size = min(15, len(trajectory))
        velocities = []

        for i in range(0, len(trajectory) - segment_size + 1, max(1, segment_size // 2)):
            seg = trajectory[i:i + segment_size]
            start_f, sx, sy = seg[0]
            end_f, ex, ey = seg[-1]

            if self.H is not None:
                # Map pixel coordinates to ground coordinates via the homography
                p1 = cv2.perspectiveTransform(
                    np.array([[[sx, sy]]], dtype=np.float32), self.H)[0][0]
                p2 = cv2.perspectiveTransform(
                    np.array([[[ex, ey]]], dtype=np.float32), self.H)[0][0]
                dist = np.linalg.norm(p2 - p1)
            elif self.ppm:
                dist = np.sqrt((ex - sx)**2 + (ey - sy)**2) / self.ppm
            else:
                continue

            # Use the frame rate recorded during processing (fallback: 30 fps)
            fps = getattr(self, 'video_fps', 30.0)
            time = (end_f - start_f) / fps
            if time > 0:
                velocities.append(dist / time)

        if not velocities:
            return None

        # Median filter across segment velocities
        return float(np.median(velocities))

    def _compute_summary(self, velocities):
        """Compute a statistical summary of the flow velocities."""
        if not velocities:
            return {"error": "No valid velocity data"}

        vals = np.array(list(velocities.values()))

        # Trimmed mean (drop the top and bottom 10%)
        sorted_v = np.sort(vals)
        n = len(sorted_v)
        trim = max(1, int(n * 0.1))
        trimmed = sorted_v[trim:-trim]

        return {
            "timestamp": datetime.now().isoformat(),
            "total_tracks": len(self.trajectories),
            "valid_tracks": len(velocities),
            "mean_velocity": float(np.mean(vals)),
            "median_velocity": float(np.median(vals)),
            "trimmed_mean": float(np.mean(trimmed)) if len(trimmed) > 0 else None,
            "std": float(np.std(vals)),
            "min": float(np.min(vals)),
            "max": float(np.max(vals)),
            "q25": float(np.percentile(vals, 25)),
            "q75": float(np.percentile(vals, 75)),
            "frame_count": self.frame_count,
        }

    def _save_results(self, output_dir, summary, velocities):
        """Save results to disk."""
        # JSON results
        with open(output_dir / "results.json", 'w') as f:
            json.dump({
                "summary": summary,
                "per_track_velocities": {str(k): v for k, v in velocities.items()}
            }, f, indent=2)

        # Trajectory CSV
        import csv
        with open(output_dir / "trajectories.csv", 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['track_id', 'frame', 'pixel_x', 'pixel_y',
                             'real_x', 'real_y', 'velocity_mps'])
            for track_id, traj in self.trajectories.items():
                v = velocities.get(track_id, 0)
                for frame, px, py in traj:
                    if self.H is not None:
                        real = cv2.perspectiveTransform(
                            np.array([[[px, py]]], dtype=np.float32), self.H)[0][0]
                        writer.writerow([track_id, frame, px, py,
                                         real[0], real[1], v])
                    else:
                        writer.writerow([track_id, frame, px, py, '', '', v])

    def _generate_report(self, output_dir, summary):
        """Generate a plain-text report."""
        def fmt(v):
            # Guard against missing/None values so formatting never crashes
            return f"{v:.3f}" if isinstance(v, (int, float)) else "N/A"

        report = f"""
========================================
       Water Flow Velocity Report
========================================
Timestamp:        {summary.get('timestamp', 'N/A')}
Frames processed: {summary.get('frame_count', 'N/A')}
Total tracks:     {summary.get('total_tracks', 'N/A')}
Valid tracks:     {summary.get('valid_tracks', 'N/A')}
----------------------------------------
Mean velocity:    {fmt(summary.get('mean_velocity'))} m/s
Median velocity:  {fmt(summary.get('median_velocity'))} m/s
Trimmed mean:     {fmt(summary.get('trimmed_mean'))} m/s
Std deviation:    {fmt(summary.get('std'))} m/s
Min velocity:     {fmt(summary.get('min'))} m/s
Max velocity:     {fmt(summary.get('max'))} m/s
25th percentile:  {fmt(summary.get('q25'))} m/s
75th percentile:  {fmt(summary.get('q75'))} m/s
========================================
"""
        with open(output_dir / "report.txt", 'w') as f:
            f.write(report)

        print(report)


# ===== Example configuration file =====
"""
# config.yaml
model_path: "runs/detect/train/weights/best.pt"
conf_threshold: 0.3
iou_threshold: 0.45
min_track_length: 10

# Calibration parameters (choose one)
homography_matrix:
  - [1.2e-3, 3.4e-5, -0.5]
  - [2.1e-5, 1.1e-3, -0.3]
  - [8.7e-7, 2.3e-7, 1.0]

# Or simply a fixed scale
# pixels_per_meter: 50.0
"""
```

---

## 4. Accuracy Validation and Error Analysis

### 4.1 Validation Methods

```
On-site validation workflow:

1. Simultaneous measurement
   - Measure with a conventional method (current meter / float method) at the same time
   - Compare against the YOLO+ByteTrack result
   - Compute the relative error

2. Known-discharge method
   - In an artificial channel, use a known discharge
   - Compare against the computed velocity
   - Q = A × v (discharge = cross-section area × velocity)

3. Multi-point validation
   - Set up measurement regions at several locations along the channel
   - Compute the velocity independently in each region
   - Check spatial consistency

Accuracy targets:
├── Excellent: < 5% relative error
├── Good:      5-10% relative error
├── Usable:    10-15% relative error
└── Unusable:  > 15% relative error
```

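The relative-error comparison from step 1, mapped onto the accuracy tiers above, can be sketched in a few lines (a minimal illustration; the function names `relative_error` and `accuracy_grade` are ours, not from any library):

```python
def relative_error(measured: float, reference: float) -> float:
    """Relative error of a camera-based measurement against a reference method."""
    return abs(measured - reference) / reference

def accuracy_grade(rel_err: float) -> str:
    """Map a relative error onto the accuracy tiers listed above."""
    if rel_err < 0.05:
        return "excellent"
    if rel_err < 0.10:
        return "good"
    if rel_err < 0.15:
        return "usable"
    return "unusable"

# Example: camera pipeline reports 1.42 m/s, the current meter reads 1.50 m/s
err = relative_error(1.42, 1.50)
print(f"{err:.1%} -> {accuracy_grade(err)}")  # prints "5.3% -> good"
```

Run the same comparison over several simultaneous measurements and report the median relative error, not a single pairing.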
### 4.2 Error Source Breakdown

| Error source | Typical contribution | Mitigation |
|---------|---------|---------|
| Camera calibration error | 2-8% | More control points, periodic recalibration |
| Perspective correction error | 1-5% | Homography matrix, zoned calibration |
| Tracer following behavior | 1-5% | Use lightweight tracers |
| Detection centroid jitter | 1-3% | Trajectory smoothing, Kalman filter |
| Tracking ID switches | 2-5% | Tune tracker parameters |
| Time synchronization error | < 1% | Use the video frame rate |
| Surface waves | 2-5% | Average over longer periods |
| **Total error** | **5-15%** | |
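The centroid-jitter row above mentions trajectory smoothing; the lightest-weight version is a moving average over each track before velocities are computed (a minimal numpy sketch under our own naming, with a Kalman filter being the heavier alternative):

```python
import numpy as np

def smooth_trajectory(points: np.ndarray, window: int = 5) -> np.ndarray:
    """Moving-average smoothing of an (N, 2) array of (x, y) centroids."""
    if len(points) < window:
        return points
    kernel = np.ones(window) / window
    # mode='valid' avoids edge artifacts; the track shrinks by window-1 points
    xs = np.convolve(points[:, 0], kernel, mode='valid')
    ys = np.convolve(points[:, 1], kernel, mode='valid')
    return np.stack([xs, ys], axis=1)

# A jittery but overall-linear track: smoothing preserves the drift,
# while the frame-to-frame jitter amplitude shrinks
raw = np.array([[i + 0.3 * (-1) ** i, 2.0 * i] for i in range(10)], dtype=float)
smoothed = smooth_trajectory(raw)
```

Because smoothing shortens the track, apply it before the minimum-track-length filter, not after.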

---

## 5. Deployment Options

### 5.1 Edge Deployment (Jetson)

```bash
# Deployment on a Jetson Orin

# 1. Install the JetPack SDK
#    (download and flash from the NVIDIA website)

# 2. Install PyTorch for Jetson
pip install torch torchvision --index-url https://download.pytorch.org/whl/cu118

# 3. Install Ultralytics
pip install ultralytics
```

```python
# 4. Model optimization - TensorRT conversion
from ultralytics import YOLO

model = YOLO('best.pt')
model.export(format='engine',  # TensorRT
             half=True,        # FP16
             int8=False,       # INT8 requires calibration data
             device=0)

# 5. Inference with the TensorRT model
model_trt = YOLO('best.engine')

# Performance comparison (reference values):
# YOLOv8n FP32: ~50 FPS on Orin Nano
# YOLOv8n FP16: ~80 FPS on Orin Nano
# YOLOv8n INT8: ~120 FPS on Orin Nano
# YOLOv8s FP16: ~40 FPS on Orin Nano
```

### 5.2 Cloud / Server Deployment

```python
# Run on a GPU server
# Supports processing multiple video streams concurrently

from concurrent.futures import ThreadPoolExecutor

class MultiCameraPipeline:
    """Concurrent processing for multiple cameras."""

    def __init__(self, config_files: list):
        self.pipelines = []
        for cfg in config_files:
            self.pipelines.append(WaterFlowVelocityPipeline(cfg))

    def run_all(self, video_paths: list, output_dirs: list):
        with ThreadPoolExecutor(max_workers=len(video_paths)) as executor:
            futures = []
            for pipe, video, output in zip(self.pipelines, video_paths, output_dirs):
                future = executor.submit(pipe.run, video, output)
                futures.append(future)

            for future in futures:
                result = future.result()
                print(f"Result: {result}")
```

### 5.3 Real-Time Monitoring System Architecture

```
┌──────────────────────────────────────────────────────┐
│               Real-Time Monitoring System            │
│                                                      │
│  ┌──────────┐    ┌───────────┐    ┌───────────────┐  │
│  │ Camera / │───>│ Edge unit │───>│ Reporting     │  │
│  │ IPC RTSP │    │ Jetson:   │    │ service       │  │
│  └──────────┘    │ detect +  │    │ MQTT/HTTP     │  │
│                  │ track +   │    └───────┬───────┘  │
│                  │ velocity  │            │          │
│                  └───────────┘            │          │
│                                           ▼          │
│                                 ┌────────────────┐   │
│                                 │ Time-series DB │   │
│                                 │ InfluxDB/PgSQL │   │
│                                 └───────┬────────┘   │
│                                         │            │
│                                         ▼            │
│                                 ┌────────────────┐   │
│                                 │ Dashboards &   │   │
│                                 │ alerts         │   │
│                                 │ Grafana/Web    │   │
│                                 └────────────────┘   │
└──────────────────────────────────────────────────────┘

Data flow:
1. The camera streams RTSP to the edge device
2. The Jetson runs YOLO + tracking + velocity computation
3. Velocity statistics are reported every N seconds
4. Data is stored in a time-series database
5. Grafana displays the live velocity curve
6. Alerts fire (SMS/email/audible) when velocity exceeds thresholds
```

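Steps 3 and 6 of the data flow boil down to serializing a per-interval payload and evaluating the alarm condition at the edge. A minimal sketch of that payload builder; the field names and the 3.0 m/s threshold are our assumptions, and the actual transport (MQTT publish or HTTP POST) is deliberately left out:

```python
import json
from datetime import datetime, timezone

ALARM_THRESHOLD_MPS = 3.0  # assumed site-specific limit

def build_report_payload(camera_id: str, summary: dict) -> str:
    """Serialize one reporting interval's velocity statistics as JSON."""
    payload = {
        "camera_id": camera_id,
        "reported_at": datetime.now(timezone.utc).isoformat(),
        "mean_velocity_mps": summary.get("mean_velocity"),
        "median_velocity_mps": summary.get("median_velocity"),
        "valid_tracks": summary.get("valid_tracks"),
        # Alarm flag is evaluated at the edge so downstream only routes it
        "alarm": (summary.get("mean_velocity") or 0) > ALARM_THRESHOLD_MPS,
    }
    return json.dumps(payload)

msg = build_report_payload(
    "cam-01", {"mean_velocity": 3.4, "median_velocity": 3.2, "valid_tracks": 17})
# The resulting string can then be published over MQTT or POSTed over HTTP.
```

Keeping the alarm decision on the edge device means alerts still work during backhaul outages.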

---

## 6. Advanced Optimizations

### 6.1 Options When No Natural Tracers Are Present

```
When the water surface has no natural floating material:

Option A: Deploy artificial tracers
├── Eco-friendly biodegradable tracer balls (2-5 cm diameter)
├── Wood chips / sawdust (suitable for small streams)
├── Biodegradable floats
└── Best for: short-term measurement campaigns

Option B: Optical flow on water-surface texture
├── Track surface texture with sparse (Lucas-Kanade) or dense optical flow
├── No discrete object detection required
├── But demands rich surface texture
└── Implementation: cv2.calcOpticalFlowFarneback()

Option C: Deep-learning dense optical flow
├── Flow models such as RAFT / FlowFormer
├── Compute a dense motion field over the whole frame
├── No detector needed
└── Computationally heavy; suited to offline processing

Option D: Hybrid approach (recommended)
├── YOLO detects discrete tracers → precise trajectories
├── Optical flow fills in regions without tracers
└── Fuse the two results
```

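Whichever flow method produces the field, the post-processing for Options B and C is the same: convert a dense flow field in pixels/frame (e.g. the output of `cv2.calcOpticalFlowFarneback`) into a metric surface velocity. A hedged numpy-only sketch, with `flow_to_surface_velocity` being our own name:

```python
import numpy as np

def flow_to_surface_velocity(flow: np.ndarray, pixels_per_meter: float,
                             fps: float) -> float:
    """
    Convert a dense optical-flow field of shape (H, W, 2), in pixels/frame,
    into a robust surface-velocity estimate in m/s.
    """
    # Per-pixel displacement magnitude in pixels/frame
    mag = np.linalg.norm(flow, axis=2)
    # Median is robust against specular glints and outlier vectors
    median_px_per_frame = np.median(mag)
    return float(median_px_per_frame * fps / pixels_per_meter)

# Synthetic field: uniform 2 px/frame drift in x, at 30 fps and 50 px/m
flow = np.zeros((120, 160, 2))
flow[..., 0] = 2.0
v = flow_to_surface_velocity(flow, pixels_per_meter=50.0, fps=30.0)
# 2 px/frame × 30 frame/s ÷ 50 px/m = 1.2 m/s
```

In practice, restrict the median to a water-only ROI; on a real frame the flow field also covers banks and sky.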

### 6.2 Multi-Camera Fusion

```python
def multi_camera_fusion(camera_results: list):
    """
    Fuse velocity measurements from multiple cameras.

    camera_results: list of {
        'camera_id': str,
        'mean_velocity': float,
        'std': float,
        'n_tracks': int,
        'roi': (x1, y1, x2, y2),  # area covered by the camera
    }
    """
    # Weighted average, weighted by track count
    total_weight = sum(r['n_tracks'] for r in camera_results)
    if total_weight == 0:
        return None  # no valid tracks from any camera
    weighted_velocity = sum(
        r['mean_velocity'] * r['n_tracks']
        for r in camera_results
    ) / total_weight

    return {
        'fused_velocity': weighted_velocity,
        'camera_count': len(camera_results),
        'per_camera': camera_results,
    }
```

### 6.3 Discharge Estimation (Velocity → Discharge)

```python
def estimate_discharge(velocity, cross_section_area, velocity_coefficient=0.85):
    """
    Estimate discharge from the surface velocity.

    Q = v_surface × α × A

    where:
    - v_surface: surface velocity (m/s)
    - α: surface-to-depth-averaged velocity coefficient (typically 0.8-0.9)
    - A: wetted cross-section area (m²)

    Note: the surface velocity is typically 15-20% higher than the
    depth-averaged velocity.
    """
    return velocity * velocity_coefficient * cross_section_area

# Example:
# Surface velocity 1.5 m/s, river width 10 m, mean depth 1 m
# Q = 1.5 × 0.85 × (10 × 1) = 12.75 m³/s
```

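When the cross-section is irregular, the single-area formula above is normally applied per subsection and summed (the standard velocity-area idea). A hedged sketch with made-up subsection numbers; `discharge_by_subsections` is our own helper, not part of the pipeline:

```python
import numpy as np

def discharge_by_subsections(widths, depths, surface_velocities, alpha=0.85):
    """
    Velocity-area method: Q = Σ α · v_i · (w_i · d_i)

    widths, depths in m; surface_velocities in m/s; returns m³/s.
    """
    widths = np.asarray(widths, dtype=float)
    depths = np.asarray(depths, dtype=float)
    v = np.asarray(surface_velocities, dtype=float)
    return float(np.sum(alpha * v * widths * depths))

# Three 3-m-wide subsections of a small channel (illustrative numbers only)
q = discharge_by_subsections([3, 3, 3], [0.8, 1.2, 0.9], [1.2, 1.6, 1.1])
```

With a single subsection this degenerates to the `estimate_discharge` formula; each subsection velocity can come from tracks crossing that part of the frame.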
---

## 7. Project Timeline

```
Weeks 1-2: Requirements analysis and design
├── Define the measurement scenario and goals
├── Choose camera placement and model
├── Draft the calibration plan
└── Procure hardware

Weeks 3-4: Hardware installation and calibration
├── Install the camera and edge compute device
├── Perform camera calibration
├── Collect initial calibration data
└── Verify calibration accuracy

Weeks 5-6: Data collection and annotation
├── Record video under varied conditions
├── Extract key frames for annotation
├── Quality checks
└── Assemble the training dataset

Weeks 7-8: Model training and tuning
├── Train the YOLO detector
├── Evaluate and iterate
├── Integrate the tracker
└── Validate tracking quality

Weeks 9-10: Velocity computation and validation
├── Implement the velocity computation module
├── Validate against conventional methods
├── Error analysis
└── Tune parameters

Weeks 11-12: System integration and deployment
├── Assemble the end-to-end pipeline
├── Deploy to the edge device
├── Stand up the real-time monitoring system
└── Write documentation

Total: roughly 3 months from zero to production deployment
```

---

## 8. Cost Estimate

| Item | Budget option | Recommended | High-end |
|------|---------|---------|---------|
| Camera | ¥500 (USB camera) | ¥3,000 (IP camera) | ¥10,000 (industrial camera) |
| Compute device | ¥800 (Raspberry Pi) | ¥4,000 (Jetson Orin Nano) | ¥8,000 (Jetson AGX Orin) |
| Mounts / accessories | ¥200 | ¥500 | ¥2,000 |
| Calibration tools | ¥100 (tape measure) | ¥500 | ¥2,000 (total station) |
| **Hardware subtotal** | **~¥1,600** | **~¥8,000** | **~¥22,000** |
| Labor (annotation + development) | ¥5,000 | ¥15,000 | ¥30,000 |
| **Total** | **~¥7,000** | **~¥23,000** | **~¥52,000** |

For comparison: a conventional ADCP runs ¥50,000-200,000, a radar velocity meter ¥20,000-100,000.

---

## 9. Risks and Mitigations

| Risk | Probability | Impact | Mitigation |
|------|------|------|------|
| Calibration invalidated (camera moved) | Medium | High | Mount the camera rigidly, recalibrate periodically |
| No tracers on the water surface | High | High | Hybrid optical-flow approach |
| Cannot operate at night | Medium | Medium | Supplemental lighting / IR camera |
| Lens blur in heavy rain / fog | Medium | Medium | Weather housing / automatic wiper |
| ID switches breaking tracks | Low | Medium | Tune tracker parameters |
| Poor model generalization | Medium | High | Diversify training data |
| Insufficient edge compute | Low | Medium | Model quantization / lower resolution |

---

## 10. Reference Resources

### Academic Papers
- **ByteTrack**: Zhang et al., "ByteTrack: Multi-Object Tracking by Associating Every Detection Box", ECCV 2022
- **LSPIV**: Fujita et al., "Large-scale Particle Image Velocimetry for flow analysis", 1998
- **YOLOv8**: Ultralytics YOLOv8, https://github.com/ultralytics/ultralytics
- **STIV**: Space-Time Image Velocimetry

### Open-Source Projects
- [Ultralytics YOLOv8](https://github.com/ultralytics/ultralytics) — integrated detection + tracking framework
- [ByteTrack](https://github.com/ifzhang/ByteTrack) — original ByteTrack implementation
- [OpenPIV](https://github.com/OpenPIV/openpiv-python) — classic PIV implementation
- [rivertools](https://github.com/orgs/OpenRiverCam) — open-source river monitoring project
- [ras-riv](https://github.com/HydroLogic/ras-riv) — hydrological analysis tools

### Commercial Products
- **FathomNet**: water-surface monitoring cloud platform
- **iSAR**: radar surface velocity meter
- **OTT HydroMet**: integrated hydrological monitoring solutions

---

## 11. Quick-Start Checklist

```
[ ] 1.  Define the measurement scenario (channel width, velocity range, lighting)
[ ] 2.  Select and install the camera (fixed position, test the viewing angle)
[ ] 3.  Prepare calibration references (at least 4 control points)
[ ] 4.  Run camera calibration; verify error < 0.3 m
[ ] 5.  Collect 300+ training images
[ ] 6.  Annotate the data (Roboflow or CVAT)
[ ] 7.  Train a YOLOv8n model; target mAP@50 > 0.85
[ ] 8.  Integrate the ByteTrack/BoT-SORT tracker
[ ] 9.  Process test videos; check tracking quality
[ ] 10. Validate accuracy against conventional methods
[ ] 11. Tune parameters to reduce error
[ ] 12. Deploy to the edge device for real-time monitoring
```

---

*Compiled by: Knowledge Base Admin | Archived: 2026-04-23*