# YOLO11 + RV1126B Water Flow Velocity Detection — Complete Hybrid Implementation
> **Core approach**: YOLO11 detects discrete tracer objects + optical flow covers tracer-free regions + result fusion
>
> **Hardware platform**: Rockchip RV1126B (1.2 TOPS NPU)
>
> Created: 2026-04-23
---
## 1. RV1126B Hardware Constraints
### 1.1 Chip Specifications
| Parameter | Spec | Impact on this project |
|------|------|-------------|
| **NPU** | 1.2 TOPS (INT8) | Only small models fit; YOLO11n is the practical ceiling |
| **CPU** | ARM Cortex-A7 (single core) | Optical flow runs on the CPU with limited headroom |
| **Memory** | 256 MB-512 MB DDR3 | Model must stay small; batch size = 1 |
| **Video codec** | H.264/H.265 hardware encode/decode | Use the VPU for decoding to offload the CPU |
| **ISP** | MIPI CSI camera input | Camera connects directly |
| **OS** | Buildroot / Debian (Linux) | Cross-compilation environment required |
### 1.2 Performance Expectations
| Model | Input size | Quantization | Expected FPS | Notes |
|------|---------|---------|---------|------|
| YOLO11n | 320×320 | INT8 | ~25-30 | **Recommended**; real-time capable |
| YOLO11n | 640×640 | INT8 | ~8-12 | Higher accuracy, but not real-time |
| YOLO11s | 320×320 | INT8 | ~12-15 | Memory may be tight |
| YOLO11n | 320×320 | FP16 | ~15-20 | Slightly higher accuracy, slower |
**Key decision: use YOLO11n + 320×320 + INT8 quantization.**
---
## 2. Overall Architecture
```
              ┌──────────┐   ┌──────────────┐   ┌───────────────────┐
              │ Camera   │──>│ VPU decode   │──>│ Frame buffer      │
              │ MIPI/USB │   │ (hardware)   │   │ (NV12)            │
              └──────────┘   └──────────────┘   └─────────┬─────────┘
                                                          │
                                ┌─────────────────────────┴─────┐
                                ▼                               ▼
                      ┌──────────────────┐           ┌──────────────────┐
                      │ NPU inference    │           │ CPU optical flow │
                      │ YOLO11n          │           │ Farneback /      │
                      │ INT8 RKNN        │           │ sparse LK        │
                      │ 320×320          │           │ ROI only         │
                      └────────┬─────────┘           └────────┬─────────┘
                               └──────────────┬───────────────┘
                                              ▼
                            ┌─────────────────────────────────┐
                            │ Fusion engine                   │
                            │  YOLO regions → precise tracks  │
                            │  flow regions → dense velocity  │
                            │  fusion → combined estimate     │
                            └───────────────┬─────────────────┘
                                            ▼
                            ┌─────────────────────────────────┐
                            │ Velocity calc + pixel→world     │
                            │ (homography / calibration)      │
                            └───────────────┬─────────────────┘
                                            ▼
                            ┌─────────────────────────────────┐
                            │ Output: JSON / MQTT / display   │
                            └─────────────────────────────────┘

Timing plan (target 30 FPS, 33 ms per frame):
├── VPU decode:        ~3 ms  (hardware)
├── YOLO11n inference: ~35 ms (NPU, INT8) → run every 2nd frame
├── Optical flow:      ~15 ms (CPU, sparse / downsampled)
├── Fusion:            ~2 ms  (CPU)
└── Effective cycle:   ~20 ms per frame (YOLO interleaved) → 15-20 FPS achievable
```
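As a back-of-the-envelope check of the interleaved schedule, the average per-frame cost amortizes the YOLO latency over the frames it skips (the numbers below are the estimates from the timing plan, not measurements; actual figures depend on the board and build):

```python
def interleaved_frame_time_ms(decode_ms, yolo_ms, flow_ms, fuse_ms, yolo_every=2):
    """Average per-frame cost when YOLO runs only on every `yolo_every`-th frame."""
    return decode_ms + yolo_ms / yolo_every + flow_ms + fuse_ms

avg_ms = interleaved_frame_time_ms(3, 35, 15, 2)  # 3 + 17.5 + 15 + 2 = 37.5 ms
fps = 1000 / avg_ms
```

Skipping optical flow on alternate frames as well (as section 7.3 does) brings the average down further, which is where the plan's ~20 ms figure comes from.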
---
## 3. Environment Setup
### 3.1 Host Side (training + model conversion)
```bash
# Run on an x86 Linux host (training and model conversion)
# 1. Create a Python environment
conda create -n yolo11-rv1126 python=3.10 -y
conda activate yolo11-rv1126
# 2. Install Ultralytics (YOLO11)
pip install ultralytics
# 3. Install RKNN Toolkit2
# Download: https://github.com/airockchip/rknn-toolkit2
# Pick the matching release (Ubuntu x86)
git clone https://github.com/airockchip/rknn-toolkit2.git
cd rknn-toolkit2/rknn-toolkit2/packages/
pip install rknn_toolkit2-*.whl
# 4. Other dependencies
pip install opencv-python numpy onnx onnxruntime
pip install roboflow     # dataset management (optional)
pip install supervision  # visualization
```
### 3.2 RV1126B Target Board Environment
```bash
# Run on the RV1126B board
# 1. Install the RKNN Runtime (inference only, no training)
# Obtain it from the Rockchip SDK or the board's system image
# Usually found at /usr/lib/librknnrt.so
# 2. Install the Python RKNN runtime
pip install rknn_toolkit2_lite  # lite build, inference only
# 3. Install OpenCV (optical flow and post-processing)
# Build via Buildroot or install via apt
apt-get install python3-opencv  # on Debian-based systems
# 4. Other dependencies
pip install numpy
```
### 3.3 Cross-Compilation Toolchain
```bash
# Install the Rockchip cross-compilation toolchain
# (obtain it from the Rockchip SDK)
# Set environment variables
export CROSS_COMPILE=arm-buildroot-linux-gnueabihf-
export CC=${CROSS_COMPILE}gcc
export CXX=${CROSS_COMPILE}g++
# Cross-compile OpenCV (if not included in Buildroot)
# Cross-compile any Python C extension modules as needed
```
---
## 4. YOLO11 Model Training
### 4.1 Dataset Preparation
```yaml
# water_flow.yaml
path: ./water_flow_dataset
train: images/train
val: images/val
nc: 1
names:
  - floating_object
```
```
Dataset layout:
water_flow_dataset/
├── images/
│   ├── train/   # 800-2000 images
│   └── val/     # 200-500 images
└── labels/
    ├── train/   # YOLO-format annotations
    └── val/
```
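Each annotation file holds one `class cx cy w h` line per object, with all four box values normalized to [0, 1]. A small validator catches malformed lines before training (a sketch; `validate_yolo_label_line` is a helper introduced here, not part of the project code):

```python
def validate_yolo_label_line(line, num_classes=1):
    """Check one line of a YOLO-format label file: 'cls cx cy w h', normalized."""
    parts = line.split()
    if len(parts) != 5:
        return False
    try:
        cls = int(parts[0])
        vals = [float(p) for p in parts[1:]]
    except ValueError:
        return False
    # Class index in range, all box values normalized to [0, 1]
    return 0 <= cls < num_classes and all(0.0 <= v <= 1.0 for v in vals)

validate_yolo_label_line("0 0.5 0.5 0.1 0.2")   # → True
validate_yolo_label_line("1 0.5 0.5 0.1 0.2")   # → False (class out of range for nc=1)
```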
### 4.2 Training YOLO11n
```python
from ultralytics import YOLO

# Load the YOLO11n pretrained model
model = YOLO('yolo11n.pt')

# Training configuration
results = model.train(
    # Dataset
    data='water_flow.yaml',
    # Training parameters
    epochs=150,
    imgsz=640,        # train at high resolution
    batch=16,
    device=0,
    patience=30,
    # Augmentation tuned for water-surface scenes
    hsv_h=0.015,      # small hue shifts
    hsv_s=0.7,        # large saturation shifts (surface glare)
    hsv_v=0.4,        # brightness shifts
    degrees=0.0,      # no rotation (fixed camera)
    translate=0.1,
    scale=0.5,        # large scale variation (targets at varying distances)
    fliplr=0.5,
    mosaic=1.0,
    mixup=0.1,
    copy_paste=0.1,   # copy-paste augmentation (scenes with few tracers)
    # Optimizer
    optimizer='AdamW',
    lr0=0.001,
    lrf=0.01,
    weight_decay=0.0005,
    # Saving
    project='runs/detect',
    name='water_flow_yolo11n',
    save_period=10,
)

# Validation
metrics = model.val()
print(f"mAP@50: {metrics.box.map50:.4f}")
print(f"mAP@50-95: {metrics.box.map:.4f}")
```
### 4.3 RV1126B-Specific Training Strategy
```python
# Key point: the RV1126B NPU runs INT8, so training should account for quantization.
from ultralytics import YOLO

# Option A: PTQ (post-training quantization) — simple, but typically 2-5% accuracy loss.
#           Export directly to INT8 (covered below).
# Option B: QAT (quantization-aware training) — accuracy loss < 1%; recommended.

class QATWaterFlowTrainer:
    """Quantization-aware-style training to make the model robust to INT8."""

    def __init__(self):
        self.model = YOLO('yolo11n.pt')

    def train_with_quantization(self, data_config, epochs=50):
        # Stage 1: train normally to convergence
        self.model.train(
            data=data_config,
            epochs=epochs,
            imgsz=640,
            batch=16,
        )
        # Stage 2: quantization-oriented fine-tuning
        # Note: Ultralytics has no built-in QAT; true QAT would require
        # manually inserting fake-quantization nodes.
        # Load the best weights
        model = YOLO('runs/detect/water_flow_yolo11n/weights/best.pt')
        # Simplified approach: fine-tune at a very low LR at the target resolution
        model.train(
            data=data_config,
            epochs=20,
            imgsz=320,        # fine-tune at the target inference resolution
            batch=32,
            lr0=0.0001,       # very low learning rate
            close_mosaic=10,  # disable mosaic to mimic inference conditions
        )
        return model
```
---
## 5. Model Export and RKNN Conversion
### 5.1 Export Pipeline
```
Trained YOLO11 PyTorch model (.pt)
        ▼ (torch.onnx.export)
ONNX model (.onnx)
        ▼ (RKNN Toolkit2)
RKNN model (.rknn) — INT8 quantized
        ▼ (deploy to RV1126B)
NPU inference
```
### 5.2 Exporting to ONNX
```python
from ultralytics import YOLO

# Load the trained model
model = YOLO('runs/detect/water_flow_yolo11n/weights/best.pt')

# Export to ONNX
# Key parameters:
# - opset 13: best-supported opset for RKNN
# - simplify: simplify the compute graph
# - imgsz 320: export at the target inference size for the RV1126B
model.export(
    format='onnx',
    imgsz=320,       # target inference resolution
    opset=13,        # ONNX opset version
    simplify=True,   # simplify the model
    dynamic=False,   # fixed input size (the NPU does not support dynamic shapes)
    half=False,      # no FP16; we quantize to INT8 later
    int8=False,      # do not quantize at ONNX export; RKNN handles quantization
)
# Output: runs/detect/water_flow_yolo11n/weights/best.onnx
print("ONNX export complete")
```
### 5.3 ONNX → RKNN Conversion (INT8 quantization)
```python
"""
export_rknn.py — run on the x86 host
Converts the ONNX model into an INT8-quantized RKNN model.
"""
from rknn.api import RKNN
import cv2
import numpy as np
import os

# ========== Configuration ==========
ONNX_MODEL = 'runs/detect/water_flow_yolo11n/weights/best.onnx'
RKNN_MODEL = 'yolo11n_water_flow_int8.rknn'
CALIBRATION_DIR = './calibration_images/'  # calibration dataset
DATASET_TXT = 'calibration_list.txt'       # list of calibration images
TARGET_PLATFORM = 'rv1126b'                # or 'rk3588', etc.

# ========== Prepare calibration data ==========
def prepare_calibration_images(image_dir, output_txt, num_images=100):
    """
    Prepare INT8 calibration data.
    100-500 representative images are needed.
    """
    images = [f for f in os.listdir(image_dir) if f.endswith(('.jpg', '.png'))]
    images = images[:num_images]
    with open(output_txt, 'w') as f:
        for img in images:
            f.write(os.path.join(image_dir, img) + '\n')
    print(f"Prepared {len(images)} calibration images")

prepare_calibration_images(CALIBRATION_DIR, DATASET_TXT)

# ========== RKNN conversion ==========
rknn = RKNN(verbose=True)

# 1. Configure
print("--> Configuring RKNN model")
ret = rknn.config(
    mean_values=[[0, 0, 0]],       # adjust to match the training-time normalization
    std_values=[[255, 255, 255]],  # YOLO usually divides by 255
    target_platform=TARGET_PLATFORM,
    quantized_dtype='asymmetric_affine-u8',  # INT8 quantization
    quantized_algorithm='KLD',               # KLD calibration
)
if ret != 0:
    print("Config failed!")
    exit()

# 2. Load the ONNX model
print("--> Loading ONNX model")
ret = rknn.load_onnx(model=ONNX_MODEL)
if ret != 0:
    print("Load ONNX failed!")
    exit()

# 3. Build (with INT8 quantization)
print("--> Building RKNN model with INT8 quantization")
ret = rknn.build(
    do_quantization=True,
    dataset=DATASET_TXT,
)
if ret != 0:
    print("Build failed!")
    exit()

# 4. Export the RKNN model
print(f"--> Exporting RKNN model to {RKNN_MODEL}")
ret = rknn.export_rknn(RKNN_MODEL)
if ret != 0:
    print("Export failed!")
    exit()

# 5. (Optional) validate on the host simulator
print("--> Testing on host simulator")
ret = rknn.init_runtime()  # no target: runs the host simulator
if ret != 0:
    print("Init runtime failed!")
    exit()

# Inference smoke test
test_img = cv2.imread(os.path.join(CALIBRATION_DIR, os.listdir(CALIBRATION_DIR)[0]))
test_img = cv2.resize(test_img, (320, 320))
test_img = cv2.cvtColor(test_img, cv2.COLOR_BGR2RGB)
outputs = rknn.inference(inputs=[test_img])
print(f"RKNN inference output shape: {[o.shape for o in outputs]}")

rknn.release()
print("Done!")
```
### 5.4 RKNN Quantization Options
```python
# Comparison of quantization strategies

# Strategy 1: symmetric quantization (fast, slightly lower accuracy)
rknn.config(
    quantized_dtype='symmetric_affine-u8',
    quantized_algorithm='MIN_MAX',
)

# Strategy 2: asymmetric quantization (recommended, better accuracy)
rknn.config(
    quantized_dtype='asymmetric_affine-u8',
    quantized_algorithm='KLD',
)

# Strategy 3: hybrid quantization (some layers INT8, some FP16)
rknn.config(
    quantized_dtype='mixed',
    # which layers stay FP16 must be specified in the model config
)

# Calibration tips:
# 1. Calibration data must cover all conditions (lighting, flow rates, weather)
# 2. At least 100 images; 200-500 recommended
# 3. Calibration images need no annotations, only raw frames
# 4. Image sizes must match the training setup
```
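A quick way to judge how much INT8 quantization hurt is to run the same image through the ONNX model (via onnxruntime) and the RKNN simulator, then compare the flattened outputs. The comparison metric itself is just cosine similarity (sketch below; wiring it up to the two runtimes is left out):

```python
import math

def cosine_similarity(a, b):
    """Cosine similarity between two flattened output vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na > 0 and nb > 0 else 0.0

# Rule of thumb: similarity > 0.99 after quantization is usually
# acceptable for a detection head.
cosine_similarity([1.0, 2.0, 3.0], [1.0, 2.0, 3.0])   # → 1.0
```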
---
## 6. YOLO11 Detection Module (RV1126B Side)
### 6.1 RKNN Inference Wrapper
```python
"""
yolo11_detector.py — YOLO11 detector for the RV1126B
Runs inference on the NPU via the RKNN Runtime.
"""
import cv2
import numpy as np
import time

try:
    from rknn.api import RKNN
except ImportError:
    # Mock for host-side development
    from rknn.mock_runtime import RKNN


class YOLO11Detector:
    """
    YOLO11n RKNN inference wrapper, tuned for the RV1126B.
    """

    # YOLO11 defaults
    INPUT_SIZE = 320
    CONF_THRESHOLD = 0.25
    NMS_THRESHOLD = 0.45
    NUM_CLASSES = 1

    def __init__(self, rknn_model_path, use_npu=True):
        self.rknn = RKNN()
        self.use_npu = use_npu
        # Load the model
        print(f"Loading RKNN model: {rknn_model_path}")
        ret = self.rknn.load_rknn(rknn_model_path)
        if ret != 0:
            raise RuntimeError("Failed to load RKNN model")
        # Initialize the runtime
        if use_npu:
            ret = self.rknn.init_runtime(target='rv1126b', device_id=None)
        else:
            ret = self.rknn.init_runtime(target=None)  # CPU simulation
        if ret != 0:
            raise RuntimeError("Failed to init RKNN runtime")
        print("YOLO11 Detector initialized")

    def preprocess(self, frame):
        """
        Preprocess: resize + BGR→RGB.
        Input:  BGR image (any size)
        Output: RGB numpy array (320×320) plus transform info
        """
        # Aspect-ratio-preserving resize + padding (letterbox)
        h, w = frame.shape[:2]
        scale = self.INPUT_SIZE / max(h, w)
        new_w, new_h = int(w * scale), int(h * scale)
        resized = cv2.resize(frame, (new_w, new_h))
        # Pad to 320×320
        padded = np.full((self.INPUT_SIZE, self.INPUT_SIZE, 3), 114, dtype=np.uint8)
        top = (self.INPUT_SIZE - new_h) // 2
        left = (self.INPUT_SIZE - new_w) // 2
        padded[top:top+new_h, left:left+new_w] = resized
        # BGR → RGB
        rgb = cv2.cvtColor(padded, cv2.COLOR_BGR2RGB)
        # Record the transform (needed to map boxes back to the original frame)
        transform_info = {
            'original_size': (w, h),
            'scale': scale,
            'padding': (left, top),
        }
        return rgb, transform_info

    def postprocess(self, outputs, transform_info, conf_threshold=None):
        """
        Parse the YOLO11 output.
        YOLO11 ONNX output layout: (1, 84, 8400) for COCO
          - 84 = 4 (bbox) + 80 (classes)
          - 8400 = number of anchor points
        For our single-class model:
          - 5 = 4 (bbox) + 1 (class)
        """
        if conf_threshold is None:
            conf_threshold = self.CONF_THRESHOLD
        # Output parsing
        # YOLO11 output: (1, num_classes + 4, num_anchors)
        pred = outputs[0]   # shape: (1, 5, 8400) for 1 class
        pred = pred[0].T    # shape: (5, 8400) → (8400, 5)
        boxes = []
        scores = []
        class_ids = []
        for i in range(pred.shape[0]):
            score = pred[i, 4]  # confidence (single class)
            if score < conf_threshold:
                continue
            # bbox (center + width/height)
            cx, cy, bw, bh = pred[i, 0:4]
            # Map back to original-image coordinates
            cx = (cx - transform_info['padding'][0]) / transform_info['scale']
            cy = (cy - transform_info['padding'][1]) / transform_info['scale']
            bw = bw / transform_info['scale']
            bh = bh / transform_info['scale']
            # Convert to x1, y1, x2, y2
            x1 = cx - bw / 2
            y1 = cy - bh / 2
            x2 = cx + bw / 2
            y2 = cy + bh / 2
            # Clip to the image bounds
            orig_w, orig_h = transform_info['original_size']
            x1 = max(0, x1)
            y1 = max(0, y1)
            x2 = min(orig_w, x2)
            y2 = min(orig_h, y2)
            boxes.append([x1, y1, x2, y2])
            scores.append(float(score))
            class_ids.append(0)
        if len(boxes) == 0:
            return []
        # NMS (cv2.dnn.NMSBoxes expects [x, y, w, h] boxes)
        nms_boxes = [[b[0], b[1], b[2] - b[0], b[3] - b[1]] for b in boxes]
        indices = cv2.dnn.NMSBoxes(
            nms_boxes, scores, conf_threshold, self.NMS_THRESHOLD
        )
        detections = []
        if len(indices) > 0:
            for i in np.array(indices).flatten():
                detections.append({
                    'bbox': boxes[i],
                    'score': scores[i],
                    'class_id': class_ids[i],
                    'center': (
                        (boxes[i][0] + boxes[i][2]) / 2,
                        (boxes[i][1] + boxes[i][3]) / 2
                    ),
                })
        return detections

    def detect(self, frame, conf_threshold=None):
        """
        Full detection pipeline: preprocess → NPU inference → postprocess.
        Args:
            frame: BGR image (numpy array)
            conf_threshold: confidence threshold
        Returns:
            list of detection dicts
        """
        rgb, transform_info = self.preprocess(frame)
        # NPU inference
        outputs = self.rknn.inference(inputs=[rgb])
        detections = self.postprocess(outputs, transform_info, conf_threshold)
        return detections

    def detect_batch(self, frames):
        """Batched detection (the RV1126B currently supports batch=1 only)."""
        return [self.detect(f) for f in frames]

    def get_fps(self, num_iterations=100, test_frame=None):
        """Measure end-to-end inference FPS."""
        if test_frame is None:
            test_frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
        times = []
        for _ in range(num_iterations):
            start = time.perf_counter()
            self.detect(test_frame)
            times.append(time.perf_counter() - start)
        avg_time = np.mean(times)
        fps = 1.0 / avg_time
        print(f"Average inference time: {avg_time*1000:.1f}ms, FPS: {fps:.1f}")
        return fps

    def release(self):
        self.rknn.release()
```
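The letterbox transform in `preprocess` and its inverse in `postprocess` are easy to get wrong; the coordinate round-trip can be checked in isolation (a pure-Python sketch of the same arithmetic, with the helper names introduced here for illustration):

```python
def letterbox_params(w, h, input_size=320):
    """Scale and padding used by the aspect-preserving resize above."""
    scale = input_size / max(w, h)
    new_w, new_h = int(w * scale), int(h * scale)
    left = (input_size - new_w) // 2
    top = (input_size - new_h) // 2
    return scale, left, top

def to_original(cx, cy, scale, left, top):
    """Map a point from letterboxed 320×320 space back to source coordinates."""
    return (cx - left) / scale, (cy - top) / scale

scale, left, top = letterbox_params(640, 480)   # → (0.5, 0, 40)
# A point at the letterboxed center maps back to the source center:
to_original(160, 160, scale, left, top)         # → (320.0, 240.0)
```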
### 6.2 Detection Performance Optimization
```python
class OptimizedDetector(YOLO11Detector):
    """Further-optimized detector for the RV1126B."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Preallocate the input buffer (avoid per-inference allocation)
        self._prealloc_buffer = np.zeros(
            (self.INPUT_SIZE, self.INPUT_SIZE, 3), dtype=np.uint8
        )

    def detect_fast(self, frame, conf_threshold=None):
        """
        Fast detection path — fewer memory copies.
        Optimizations:
        1. In-place resize into the preallocated buffer
        2. Skip the color conversion if the camera already outputs RGB
           (configure the output format in the RV1126B ISP)
        3. Reuse the NMS output buffer
        """
        # ... (same as detect(), with the memory optimizations above)
        raise NotImplementedError

    def detect_roi(self, frame, roi):
        """
        Detect only inside an ROI — a large speedup.
        roi: (x1, y1, x2, y2)
        """
        x1, y1, x2, y2 = roi
        roi_frame = frame[y1:y2, x1:x2]
        detections = self.detect(roi_frame)
        # Map coordinates back to the full frame
        for det in detections:
            det['bbox'] = [
                det['bbox'][0] + x1,
                det['bbox'][1] + y1,
                det['bbox'][2] + x1,
                det['bbox'][3] + y1,
            ]
            det['center'] = (
                det['center'][0] + x1,
                det['center'][1] + y1,
            )
        return detections
```
---
## 7. Optical Flow Module (CPU Side)
### 7.1 Why Optical Flow Is Needed
```
Limitations of YOLO detection:
├── Detects only discrete targets
├── No floating objects on the surface → no velocity measurement
├── Tracers too small → missed detections
└── Requires enough annotated data

What optical flow adds:
├── Tracks water-surface texture motion
├── Needs no discrete targets
├── Dense or semi-dense velocity field
└── But less accurate than YOLO trajectories

Hybrid advantages:
├── Tracers present → precise YOLO tracks (high accuracy)
├── No tracers → optical flow fills the gap (coverage)
└── Fusion of both → maximum robustness
```
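Whichever source produces it, the raw measurement is a pixel displacement over some number of frames; turning it into a velocity is the same arithmetic in both paths (a minimal sketch with made-up numbers):

```python
import math

def pixel_velocity(dx_px, dy_px, frames_elapsed, fps):
    """Convert a pixel displacement over N frames into pixels/second."""
    dt = frames_elapsed / fps
    return dx_px / dt, dy_px / dt

vx, vy = pixel_velocity(12.0, 5.0, 6, 30)   # 6 frames at 30 FPS = 0.2 s → (60.0, 25.0)
speed = math.hypot(vx, vy)                  # → 65.0 px/s
```

The pixel→meter conversion (homography or fixed calibration factor) is applied afterwards, in the fusion engine.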
### 7.2 Sparse Optical Flow (suited to the RV1126B CPU)
```python
"""
optical_flow.py — optical-flow velocity estimation
Optimized for the single-core RV1126B CPU.
"""
import cv2
import numpy as np
from collections import defaultdict


class SparseOpticalFlow:
    """
    Lucas-Kanade sparse optical flow.

    Advantages:
    - Low compute (only tracks feature points)
    - Suited to embedded CPUs
    - Can track water-surface texture features

    Strategy:
    - Track only in regions with no YOLO detections,
      or act as a supplement to YOLO detections
    """

    def __init__(self, frame_size):
        # Lucas-Kanade parameters
        self.lk_params = dict(
            winSize=(15, 15),   # window size
            maxLevel=2,         # pyramid levels
            criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)
        )
        # Feature-detection parameters
        self.feature_params = dict(
            maxCorners=100,     # maximum number of features
            qualityLevel=0.01,  # quality threshold
            minDistance=10,     # minimum spacing
            blockSize=3,
        )
        self.frame_size = frame_size
        # State
        self.prev_gray = None
        self.prev_points = []
        self._point_ids = []
        self.point_trajectories = defaultdict(list)
        self.point_id_counter = 0

    def process_frame(self, frame, yolo_detections=None, exclude_radius=20):
        """
        Process a single frame and track feature points.
        Args:
            frame: current BGR frame
            yolo_detections: YOLO results (optional)
            exclude_radius: margin around YOLO boxes to exclude
                (avoids double-tracking)
        Returns:
            flow_vectors: [(x1, y1, x2, y2, point_id), ...]
                per-feature displacement vectors
        """
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        if self.prev_gray is None:
            self.prev_gray = gray
            self._detect_features(gray, yolo_detections, exclude_radius)
            return []
        flow_vectors = []
        # Track existing feature points
        if len(self.prev_points) > 0:
            prev_pts = np.array(self.prev_points, dtype=np.float32)
            next_pts, status, errors = cv2.calcOpticalFlowPyrLK(
                self.prev_gray, gray, prev_pts, None, **self.lk_params
            )
            # Keep only valid points
            good_prev = []
            good_next = []
            valid_ids = []
            for i, (next_pt, status_flag, error) in enumerate(
                zip(next_pts, status, errors)
            ):
                if status_flag == 1 and error < 500:
                    # Must stay inside the frame
                    if (0 < next_pt[0] < self.frame_size[0] and
                            0 < next_pt[1] < self.frame_size[1]):
                        good_prev.append(self.prev_points[i])
                        good_next.append(next_pt.tolist())
                        valid_ids.append(self._point_ids[i])
            # Record trajectories
            for pid, next_pt in zip(valid_ids, good_next):
                self.point_trajectories[pid].append(next_pt)
                # Cap trajectory length
                if len(self.point_trajectories[pid]) > 30:
                    self.point_trajectories[pid] = \
                        self.point_trajectories[pid][-30:]
            # Build flow vectors
            for pid, p, n in zip(valid_ids, good_prev, good_next):
                flow_vectors.append((p[0], p[1], n[0], n[1], pid))
            # Update state
            self.prev_points = good_next
            self._point_ids = valid_ids
            # Top up with new features if too few remain
            if len(self.prev_points) < 50:
                tracked_pts, tracked_ids = self.prev_points, self._point_ids
                self._detect_features(gray, yolo_detections, exclude_radius)
                # _detect_features replaced the state with the new detections;
                # keep the tracked points and append up to 20 new ones
                self.prev_points = tracked_pts + self.prev_points[:20]
                self._point_ids = tracked_ids + self._point_ids[:20]
        else:
            self._detect_features(gray, yolo_detections, exclude_radius)
        self.prev_gray = gray
        return flow_vectors

    def _detect_features(self, gray, yolo_detections=None, exclude_radius=20):
        """
        Detect new feature points, excluding YOLO boxes
        (so the same objects are not tracked twice).
        """
        mask = np.ones(gray.shape[:2], dtype=np.uint8) * 255
        if yolo_detections:
            # Zero out the mask inside (dilated) YOLO boxes
            for det in yolo_detections:
                x1, y1, x2, y2 = [int(v) for v in det['bbox']]
                cv2.rectangle(mask, (x1 - exclude_radius, y1 - exclude_radius),
                              (x2 + exclude_radius, y2 + exclude_radius), 0, -1)
        points = cv2.goodFeaturesToTrack(gray, mask=mask, **self.feature_params)
        if points is not None:
            self.prev_points = points.reshape(-1, 2).tolist()
            self._point_ids = list(range(
                self.point_id_counter,
                self.point_id_counter + len(self.prev_points)
            ))
            self.point_id_counter += len(self.prev_points)

    def compute_velocity_field(self, fps):
        """
        Compute a velocity field from the tracked trajectories.
        Returns:
            velocities: [(x, y, vx, vy, speed, point_id), ...]
                position and velocity of each feature point
        """
        velocities = []
        for pid, traj in self.point_trajectories.items():
            if len(traj) < 5:
                continue
            # Use at most the last 15 samples
            recent = traj[-min(15, len(traj)):]
            start = recent[0]
            end = recent[-1]
            dx = end[0] - start[0]
            dy = end[1] - start[1]
            # N samples span N-1 inter-frame intervals
            frames_span = len(recent) - 1
            time_sec = frames_span / fps
            if time_sec == 0:
                continue
            vx = dx / time_sec  # pixels/second
            vy = dy / time_sec
            speed = np.sqrt(vx**2 + vy**2)
            # Current position
            current_pos = traj[-1]
            velocities.append((current_pos[0], current_pos[1],
                               vx, vy, speed, pid))
        return velocities

    def reset(self):
        """Reset all state."""
        self.prev_gray = None
        self.prev_points = []
        self._point_ids = []
        self.point_trajectories = defaultdict(list)


class DenseOpticalFlow:
    """
    Farneback dense optical flow.

    Note: full-resolution dense flow is too slow on the single-core
    RV1126B CPU; use downsampling plus an ROI.
    """

    def __init__(self, downsample_factor=4):
        self.downsample_factor = downsample_factor
        self.flow_params = dict(
            pyr_scale=0.5,
            levels=3,
            winsize=15,
            iterations=3,
            poly_n=5,
            poly_sigma=1.2,
            flags=0,
        )
        self.prev_gray = None

    def process_frame(self, frame, roi=None):
        """
        Compute dense flow on a downsampled frame.
        roi: restrict the output to a region of interest (x1, y1, x2, y2)
        """
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # Downsample for speed
        small = cv2.resize(gray, None,
                           fx=1/self.downsample_factor,
                           fy=1/self.downsample_factor)
        if self.prev_gray is None:
            self.prev_gray = small
            return None
        flow = cv2.calcOpticalFlowFarneback(
            self.prev_gray, small, None, **self.flow_params
        )
        self.prev_gray = small
        # Scale displacement magnitudes back to full-resolution pixels
        flow = flow * self.downsample_factor
        if roi is None:
            return cv2.resize(flow, (frame.shape[1], frame.shape[0]))
        # Upsample only the ROI
        x1, y1, x2, y2 = roi
        f = self.downsample_factor
        flow_roi = cv2.resize(
            flow[y1//f:y2//f, x1//f:x2//f],
            (x2 - x1, y2 - y1)
        )
        return flow_roi

    def get_average_flow(self, flow):
        """Average flow velocity over the field."""
        if flow is None:
            return 0, 0
        magnitude, angle = cv2.cartToPolar(flow[..., 0], flow[..., 1])
        # Discard motions that are too small or too large
        valid = (magnitude > 0.5) & (magnitude < 100)
        if np.sum(valid) == 0:
            return 0, 0
        avg_vx = np.mean(flow[..., 0][valid])
        avg_vy = np.mean(flow[..., 1][valid])
        return avg_vx, avg_vy
```
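Downstream code reduces a velocity field like the one `compute_velocity_field` returns to a single representative speed. Water-surface flow data is noisy (glints, birds, debris spray), so a median after MAD-based outlier rejection is more robust than a plain mean (a sketch; `robust_speed` is a helper introduced here, not part of the module above):

```python
from statistics import median

def robust_speed(speeds, mad_k=3.0):
    """Median speed after rejecting points beyond k median-absolute-deviations."""
    if not speeds:
        return None
    med = median(speeds)
    mad = median(abs(s - med) for s in speeds)
    if mad == 0:
        return med
    kept = [s for s in speeds if abs(s - med) <= mad_k * mad]
    return median(kept)

robust_speed([10, 11, 9, 10, 300])   # the 300 px/s outlier is rejected → 10
```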
### 7.3 Optical Flow Performance Tuning (RV1126B-specific)
```python
class OptimizedOpticalFlow:
    """
    Aggressive optimization for the single-core RV1126B CPU.

    Strategies:
    1. 4× downsampling (16× less compute)
    2. Frame skipping (compute flow every 2nd frame)
    3. Restrict computation to the water-surface ROI
    4. Cap the number of feature points
    5. NEON instructions (if OpenCV was built with them)
    """

    def __init__(self, frame_width, frame_height):
        self.frame_size = (frame_width, frame_height)
        # Water-surface ROI (set for the actual scene;
        # excludes sky and river banks)
        self.water_roi = (0, int(frame_height * 0.3),
                          frame_width, frame_height)
        # Downsampling
        self.scale = 0.25   # quarter resolution
        # Frame skipping
        self.skip_frames = 1  # compute every 2nd frame
        self.frame_counter = 0
        self.sparse_flow = SparseOpticalFlow(
            (int(frame_width * self.scale),
             int(frame_height * self.scale))
        )

    def process(self, frame, yolo_detections=None):
        """
        Optimized per-frame processing.
        Time budget on the RV1126B CPU: < 15 ms per processed frame.
        """
        self.frame_counter += 1
        if self.frame_counter % (self.skip_frames + 1) != 0:
            return []  # skip this frame
        # Crop the water-surface ROI
        x1, y1, x2, y2 = self.water_roi
        roi_frame = frame[y1:y2, x1:x2]
        # Downsample
        small = cv2.resize(roi_frame, None, fx=self.scale, fy=self.scale)
        # Map YOLO boxes into small-image coordinates
        small_detections = []
        if yolo_detections:
            for det in yolo_detections:
                bx1, by1, bx2, by2 = det['bbox']
                # Shift into the ROI, then scale down
                sbx1 = (bx1 - x1) * self.scale
                sby1 = (by1 - y1) * self.scale
                sbx2 = (bx2 - x1) * self.scale
                sby2 = (by2 - y1) * self.scale
                small_detections.append({
                    'bbox': [sbx1, sby1, sbx2, sby2],
                    **{k: v for k, v in det.items() if k != 'bbox'}
                })
        # Optical flow on the small image
        flows = self.sparse_flow.process_frame(small, small_detections)
        # Map results back to full-frame coordinates
        result = []
        for (fx1, fy1, fx2, fy2, pid) in flows:
            result.append((fx1 / self.scale + x1,
                           fy1 / self.scale + y1,
                           fx2 / self.scale + x1,
                           fy2 / self.scale + y1,
                           pid))
        return result
```
---
## 8. Fusion Engine
### 8.1 Fusion Strategy
```
Fusion logic:

Case 1: YOLO detects tracers + flow has data
├── Prefer the YOLO trajectories (discrete, precise)
├── Use flow as a cross-check (consistency test)
└── Deviation > threshold → flag as suspect

Case 2: no YOLO detections + flow has data
├── Use the flow result
├── Mark as "low confidence"
└── Require enough feature points (> 10)

Case 3: YOLO detects + no flow data
├── Use the YOLO trajectories
└── Mark as "normal"

Case 4: neither has data
└── No measurement possible → report invalid

Fusion formula (when both have data):
    v_fused = w_yolo × v_yolo + w_flow × v_flow
    w_yolo = 0.7, w_flow = 0.3  (YOLO weighted higher)

Consistency check:
    |v_yolo - v_flow| / max(v_yolo, v_flow) < threshold (0.3)
    → pass: fuse normally
    → fail: use YOLO only, flag anomaly
```
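The decision table above reduces to a small pure function (a sketch mirroring the rules, using the weights and threshold from this section; `fuse` is a name introduced here for illustration):

```python
def fuse(v_yolo=None, v_flow=None, w_yolo=0.7, w_flow=0.3, threshold=0.3):
    """Return (velocity, source) per the fusion rules above; (None, 'invalid') if no data."""
    if v_yolo is not None and v_flow is not None:
        m = max(v_yolo, v_flow)
        dev = abs(v_yolo - v_flow) / m if m > 0 else 0.0
        if dev < threshold:
            return w_yolo * v_yolo + w_flow * v_flow, 'fused'
        return v_yolo, 'yolo'   # inconsistent → trust YOLO, flag upstream
    if v_yolo is not None:
        return v_yolo, 'yolo'
    if v_flow is not None:
        return v_flow, 'flow'
    return None, 'invalid'

fuse(1.0, 1.1)   # 9% deviation, consistent → (1.03, 'fused')
fuse(1.0, 2.0)   # 50% deviation → (1.0, 'yolo')
```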
### 8.2 Fusion Engine Implementation
```python
"""
fusion_engine.py — YOLO + optical-flow fusion engine
"""
import cv2  # needed for perspectiveTransform in _pixel_to_meters
import numpy as np
from collections import defaultdict
from enum import Enum


class ConfidenceLevel(Enum):
    HIGH = "high"        # YOLO trajectories
    MEDIUM = "medium"    # YOLO + flow fusion
    LOW = "low"          # flow only
    INVALID = "invalid"  # no valid data


class VelocityFusionEngine:
    """
    Fuses YOLO detection trajectories with the optical-flow velocity field.

    Core logic:
    1. YOLO tracks give precise, discrete velocity samples
    2. Optical flow gives a dense background velocity field
    3. Fusing both yields a more complete velocity estimate
    """

    def __init__(self, pixels_per_meter=None, homography_matrix=None, fps=30):
        self.ppm = pixels_per_meter
        self.H = homography_matrix
        self.fps = fps
        # Fusion weights
        self.yolo_weight = 0.7
        self.flow_weight = 0.3
        self.consistency_threshold = 0.3  # 30% deviation threshold
        # YOLO trajectory storage
        self.yolo_trajectories = defaultdict(list)
        # Flow velocity field (sliding window over the last N frames)
        self.flow_velocity_history = []
        self.flow_history_size = 10
        # Result cache
        self.last_fused_velocity = None
        self.last_confidence = ConfidenceLevel.INVALID

    def update_yolo(self, detections, frame_idx):
        """
        Ingest YOLO detections.
        detections: list of {bbox, score, center, ...}
        """
        for det in detections:
            track_id = det.get('track_id', id(det))  # use tracker IDs when present
            center = det['center']
            self.yolo_trajectories[track_id].append(
                (frame_idx, center[0], center[1])
            )
            # Cap trajectory length
            if len(self.yolo_trajectories[track_id]) > 60:
                self.yolo_trajectories[track_id] = \
                    self.yolo_trajectories[track_id][-60:]

    def update_flow(self, flow_velocities):
        """
        Ingest the flow velocity field.
        flow_velocities: [(x, y, vx, vy, speed, ...), ...]
        """
        if not flow_velocities:
            self.flow_velocity_history.append([])
        else:
            speeds = [v[4] for v in flow_velocities]
            self.flow_velocity_history.append(speeds)
        # Cap history length
        if len(self.flow_velocity_history) > self.flow_history_size:
            self.flow_velocity_history = \
                self.flow_velocity_history[-self.flow_history_size:]

    def compute_fused_velocity(self, frame_idx):
        """
        Compute the fused velocity for the current frame.
        Returns:
            {
                'velocity': float,         # fused velocity (m/s)
                'velocity_pixel': float,   # pixel velocity
                'confidence': ConfidenceLevel,
                'source': str,             # 'yolo', 'flow', 'fused'
                'n_yolo_tracks': int,
                'n_flow_points': int,
            }
        """
        # 1. YOLO velocities
        yolo_velocities = []
        for track_id, traj in self.yolo_trajectories.items():
            v = self._compute_trajectory_velocity(traj)
            if v is not None:
                yolo_velocities.append(v)
        # 2. Flow speeds
        flow_speeds = []
        for speeds in self.flow_velocity_history[-5:]:  # last 5 frames
            flow_speeds.extend(speeds)
        # 3. Fuse
        result = {
            'n_yolo_tracks': len(yolo_velocities),
            'n_flow_points': len(flow_speeds),
        }
        if yolo_velocities and flow_speeds:
            # Both sources have data
            yolo_speed = np.median(yolo_velocities)
            flow_speed = np.median(flow_speeds)
            # Consistency check
            max_speed = max(yolo_speed, flow_speed)
            if max_speed > 0:
                deviation = abs(yolo_speed - flow_speed) / max_speed
            else:
                deviation = 0
            if deviation < self.consistency_threshold:
                # Consistent → weighted fusion
                fused = self.yolo_weight * yolo_speed + \
                        self.flow_weight * flow_speed
                result['velocity_pixel'] = fused
                result['confidence'] = ConfidenceLevel.MEDIUM
                result['source'] = 'fused'
            else:
                # Inconsistent → YOLO only
                result['velocity_pixel'] = yolo_speed
                result['confidence'] = ConfidenceLevel.HIGH
                result['source'] = 'yolo'
                result['warning'] = f'YOLO/Flow deviation: {deviation:.2f}'
        elif yolo_velocities:
            # YOLO only
            result['velocity_pixel'] = np.median(yolo_velocities)
            result['confidence'] = ConfidenceLevel.HIGH
            result['source'] = 'yolo'
        elif flow_speeds:
            # Flow only
            if len(flow_speeds) >= 5:  # require at least 5 feature points
                result['velocity_pixel'] = np.median(flow_speeds)
                result['confidence'] = ConfidenceLevel.LOW
                result['source'] = 'flow'
            else:
                result['confidence'] = ConfidenceLevel.INVALID
        else:
            result['confidence'] = ConfidenceLevel.INVALID
        # Pixel velocity → real-world velocity
        if result.get('velocity_pixel') is not None:
            result['velocity'] = self._pixel_to_meters(result['velocity_pixel'])
        else:
            result['velocity'] = None
        self.last_fused_velocity = result
        self.last_confidence = result['confidence']
        return result

    def _compute_trajectory_velocity(self, trajectory):
        """Velocity from one trajectory (pixels/second)."""
        if len(trajectory) < 5:
            return None
        # Piecewise averaging over overlapping segments
        segment_size = 10
        velocities = []
        for i in range(0, len(trajectory) - segment_size, segment_size // 2):
            seg = trajectory[i:i + segment_size]
            start_f, sx, sy = seg[0]
            end_f, ex, ey = seg[-1]
            dist = np.sqrt((ex - sx)**2 + (ey - sy)**2)
            time_sec = (end_f - start_f) / self.fps
            if time_sec > 0:
                velocities.append(dist / time_sec)
        if not velocities:
            # Trajectory shorter than one segment: use the whole span
            start_f, sx, sy = trajectory[0]
            end_f, ex, ey = trajectory[-1]
            time_sec = (end_f - start_f) / self.fps
            if time_sec <= 0:
                return None
            return float(np.sqrt((ex - sx)**2 + (ey - sy)**2) / time_sec)
        return float(np.median(velocities))

    def _pixel_to_meters(self, pixel_velocity):
        """Convert pixel velocity to m/s."""
        if self.H is not None:
            # Use the homography (accounts for perspective)
            # Sample the conversion factor at the frame center
            center_x = 320  # assumes a 640-wide frame
            center_y = 240  # assumes a 480-high frame
            pt = np.array([[[center_x, center_y]]], dtype=np.float32)
            pt_moved = np.array([[[center_x + 1, center_y]]], dtype=np.float32)
            real1 = cv2.perspectiveTransform(pt, self.H)[0][0]
            real2 = cv2.perspectiveTransform(pt_moved, self.H)[0][0]
            meter_per_pixel = np.linalg.norm(real2 - real1)
            return pixel_velocity * meter_per_pixel
        elif self.ppm:
            return pixel_velocity / self.ppm
        return pixel_velocity  # fall back to pixel velocity

    def get_summary(self, window_frames=300):
        """Summary statistics over the last N frames."""
        # Implement summary statistics analogous to the earlier sections
        pass

    def reset(self):
        """Reset all state."""
        self.yolo_trajectories.clear()
        self.flow_velocity_history.clear()
        self.last_fused_velocity = None
        self.last_confidence = ConfidenceLevel.INVALID
```
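The homography branch of `_pixel_to_meters` samples the meters-per-pixel factor by mapping two points one pixel apart through H. The same sampling written out without OpenCV shows what is computed (a sketch; the H below is a hypothetical calibration where 1 px = 1 cm on the water surface):

```python
def apply_homography(H, x, y):
    """Map an image point through a 3×3 homography (row-major nested lists)."""
    w = H[2][0] * x + H[2][1] * y + H[2][2]
    X = (H[0][0] * x + H[0][1] * y + H[0][2]) / w
    Y = (H[1][0] * x + H[1][1] * y + H[1][2]) / w
    return X, Y

H = [[0.01, 0.0, 0.0],   # hypothetical calibration: 1 px = 0.01 m
     [0.0, 0.01, 0.0],
     [0.0, 0.0, 1.0]]
x1, y1 = apply_homography(H, 320, 240)
x2, y2 = apply_homography(H, 321, 240)
meters_per_pixel = ((x2 - x1)**2 + (y2 - y1)**2) ** 0.5   # ≈ 0.01
```

Because the factor varies across a perspective image, sampling at a single point is only an approximation; sampling near where the measurement was made would be more accurate.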
---
## 9. Full Integrated Pipeline
### 9.1 Main Program
```python
"""
main.py — water flow velocity detection main program for the RV1126B

Usage:
    python3 main.py --model yolo11n_water_flow_int8.rknn \
        --input /dev/video0 \
        --calibration calibration.json \
        --output results/
"""
import cv2
import json
import time
import argparse
import numpy as np
from pathlib import Path
from datetime import datetime

from yolo11_detector import YOLO11Detector
from optical_flow import SparseOpticalFlow, OptimizedOpticalFlow
from fusion_engine import VelocityFusionEngine


class SimpleTracker:
    """
    Lightweight tracker — RV1126B edition.
    A full ByteTrack is too heavy here;
    this uses simplified IoU matching instead.
    """

    def __init__(self, max_lost=10, min_iou=0.3):
        self.tracks = {}  # track_id -> {last_bbox, last_center, lost_count}
        self.next_id = 0
        self.max_lost = max_lost
        self.min_iou = min_iou
    def update(self, detections):
        """
        Update the tracking state.
        detections: [{bbox, score, center}, ...]
        Returns: [{bbox, score, center, track_id}, ...]
        """
        if not detections:
            # Age every track
            for track in self.tracks.values():
                track['lost_count'] += 1
            # Drop tracks that have been lost too long
            self.tracks = {
                tid: t for tid, t in self.tracks.items()
                if t['lost_count'] <= self.max_lost
            }
            return []
        # IoU matching
        matched_det = set()
        for tid, track in list(self.tracks.items()):
            best_iou = 0
            best_det_idx = -1
            for i, det in enumerate(detections):
                if i in matched_det:
                    continue
                iou = self._compute_iou(track['last_bbox'], det['bbox'])
                if iou > best_iou:
                    best_iou = iou
                    best_det_idx = i
            if best_iou >= self.min_iou:
                # Match found
                track['last_bbox'] = detections[best_det_idx]['bbox']
                track['last_center'] = detections[best_det_idx]['center']
                track['lost_count'] = 0
                detections[best_det_idx]['track_id'] = tid
                matched_det.add(best_det_idx)
            else:
                track['lost_count'] += 1
        # Unmatched detections → new tracks
        for i, det in enumerate(detections):
            if i not in matched_det:
                new_id = self.next_id
                self.next_id += 1
                det['track_id'] = new_id
                self.tracks[new_id] = {
                    'last_bbox': det['bbox'],
                    'last_center': det['center'],
                    'lost_count': 0,
                }
        # Prune stale tracks
        self.tracks = {
            tid: t for tid, t in self.tracks.items()
            if t['lost_count'] <= self.max_lost
        }
        return detections

    def _compute_iou(self, box1, box2):
        """Intersection-over-union of two xyxy boxes."""
        x1 = max(box1[0], box2[0])
        y1 = max(box1[1], box2[1])
        x2 = min(box1[2], box2[2])
        y2 = min(box1[3], box2[3])
        inter_area = max(0, x2 - x1) * max(0, y2 - y1)
        box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
        box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
        union = box1_area + box2_area - inter_area
        return inter_area / union if union > 0 else 0
class WaterFlowVelocitySystem:
    """
    End-to-end water flow velocity system, RV1126B-optimized.
    """

    def __init__(self, args):
        # Load calibration parameters
        with open(args.calibration) as f:
            calib = json.load(f)
        ppm = calib.get('pixels_per_meter')
        H = calib.get('homography_matrix')
        if H:
            H = np.array(H)
        # Initialize components
        print("Initializing YOLO11 detector...")
        self.detector = YOLO11Detector(args.model, use_npu=True)
        print("Initializing optical flow...")
        self.optical_flow = SparseOpticalFlow((640, 480))
        print("Initializing tracker...")
        self.tracker = SimpleTracker()
        print("Initializing fusion engine...")
        self.fusion = VelocityFusionEngine(
            pixels_per_meter=ppm,
            homography_matrix=H,
            fps=args.fps,
        )
        # Configuration
        self.fps = args.fps
        self.run_yolo_every = 2  # run YOLO on every 2nd frame
        self.frame_count = 0
        # Result storage
        self.results = []
        self.output_dir = Path(args.output)
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def run_video(self, input_source):
        """
        Main processing loop.
        input_source: camera device path or video file path
        """
        cap = cv2.VideoCapture(input_source)
        if not cap.isOpened():
            raise RuntimeError(f"Cannot open {input_source}")
        # Drop the resolution to 640×480 to save compute
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
        frame_idx = 0
        fps_timer = []
        yolo_detections = []
        print("Starting processing...")
        print(f"Input: {input_source}")
        print(f"YOLO runs every {self.run_yolo_every} frames")
        try:
            while True:
                frame_start = time.perf_counter()
                ret, frame = cap.read()
                if not ret:
                    break
                # ---- Stage 1: YOLO detection (interleaved) ----
                if frame_idx % self.run_yolo_every == 0:
                    yolo_detections = self.detector.detect(frame)
                    # Tracking
                    tracked = self.tracker.update(yolo_detections)
                    # Feed the fusion engine
                    self.fusion.update_yolo(tracked, frame_idx)
                else:
                    # No YOLO this frame
                    tracked = []
                # ---- Stage 2: optical flow ----
                flow_vectors = self.optical_flow.process_frame(
                    frame,
                    yolo_detections if frame_idx % self.run_yolo_every == 0 else None
                )
                # Convert tracked displacements into a velocity field for fusion
                flow_field = self.optical_flow.compute_velocity_field(self.fps)
                self.fusion.update_flow(flow_field)
                # ---- Stage 3: fusion ----
                fusion_result = self.fusion.compute_fused_velocity(frame_idx)
                # ---- Stage 4: visualization ----
                self.frame_count = frame_idx
                display_frame = self._visualize(
                    frame, tracked, flow_vectors, fusion_result
                )
                # ---- Stage 5: record results ----
                if frame_idx % self.fps == 0:  # once per second
                    self.results.append({
                        'timestamp': datetime.now().isoformat(),
                        'frame': frame_idx,
                        **fusion_result,
                    })
                # Timing
                frame_time = time.perf_counter() - frame_start
                fps_timer.append(frame_time)
                if frame_idx % 30 == 0 and fps_timer:
                    avg_fps = len(fps_timer) / sum(fps_timer)
                    print(f"Frame {frame_idx}: {avg_fps:.1f} FPS | "
                          f"Velocity: {fusion_result.get('velocity') or 0:.3f} m/s | "
                          f"Confidence: {fusion_result.get('confidence', 'N/A')}")
                # Display (if a screen is attached)
                cv2.imshow("Water Flow Velocity", display_frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break
                frame_idx += 1
        except KeyboardInterrupt:
            print("\nInterrupted by user")
        finally:
            cap.release()
            cv2.destroyAllWindows()
            self._save_results()
            # Final statistics
            if fps_timer:
                avg_fps = len(fps_timer) / sum(fps_timer)
                print(f"\nAverage FPS: {avg_fps:.1f}")
            print(f"Total frames: {frame_idx}")
    def _visualize(self, frame, tracked, flow_vectors, fusion_result):
        """Render the overlay."""
        display = frame.copy()
        # Draw YOLO detections
        for det in tracked:
            x1, y1, x2, y2 = [int(v) for v in det['bbox']]
            tid = det.get('track_id', 0)
            cv2.rectangle(display, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(display, f"ID:{tid}", (x1, y1 - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
        # Draw flow vectors
        for (x1, y1, x2, y2, pid) in flow_vectors[:50]:  # at most 50 arrows
            cv2.arrowedLine(display,
                            (int(x1), int(y1)),
                            (int(x2), int(y2)),
                            (255, 0, 0), 1)
        # Draw the fusion result
        velocity = fusion_result.get('velocity')
        confidence = fusion_result.get('confidence')
        conf_name = getattr(confidence, 'value', 'invalid')
        if velocity is not None:
            # Color by confidence level
            if conf_name == 'high':
                color = (0, 255, 0)      # green
            elif conf_name == 'medium':
                color = (0, 255, 255)    # yellow
            elif conf_name == 'low':
                color = (0, 165, 255)    # orange
            else:
                color = (0, 0, 255)      # red
            cv2.putText(display,
                        f"Velocity: {velocity:.3f} m/s",
                        (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)
            cv2.putText(display,
                        f"Confidence: {conf_name}",
                        (10, 60),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 1)
        # Frame counter
        cv2.putText(display, f"Frame: {self.frame_count}",
                    (10, frame.shape[0] - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
        return display

    def _save_results(self):
        """Write results to disk."""
        output_file = self.output_dir / f"results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(output_file, 'w') as f:
            # default=str serializes the ConfidenceLevel enum
            json.dump(self.results, f, indent=2, default=str)
        print(f"Results saved to {output_file}")
def parse_args():
parser = argparse.ArgumentParser(description="Water Flow Velocity Detection on RV1126B")
parser.add_argument('--model', required=True, help='Path to RKNN model')
parser.add_argument('--input', default='/dev/video0', help='Camera device or video file')
parser.add_argument('--calibration', required=True, help='Calibration JSON file')
parser.add_argument('--output', default='./results', help='Output directory')
parser.add_argument('--fps', type=int, default=30, help='Video FPS')
return parser.parse_args()
if __name__ == '__main__':
args = parse_args()
system = WaterFlowVelocitySystem(args)
system.run_video(args.input)
```
### 9.2 Calibration File Format
```json
{
"calibration_date": "2026-04-23",
"location": "XX River, Section A",
"method": "homography",
"homography_matrix": [
[1.2e-3, 3.4e-5, -0.5],
[2.1e-5, 1.1e-3, -0.3],
[8.7e-7, 2.3e-7, 1.0]
],
"control_points": [
{
"pixel": [100, 200],
"real": [0.0, 0.0]
},
{
"pixel": [500, 180],
"real": [5.2, 0.3]
},
{
"pixel": [150, 600],
"real": [0.5, 3.1]
},
{
"pixel": [600, 580],
"real": [5.8, 3.4]
}
],
"camera": {
"resolution": "1920x1080",
"fps": 30,
"height_above_water": 3.5,
"angle_degrees": 45
},
"validation": {
"mean_error_meters": 0.15,
"max_error_meters": 0.28,
"num_validation_points": 4
}
}
```
---
## 10. Build and Deployment Workflow
### 10.1 Full Build Script
```bash
#!/bin/bash
# build_rv1126b.sh — full build & deploy pipeline
set -e

echo "=== YOLO11 + RV1126B Water Flow Detection Build ==="

# ========== Phase 1: train the model (x86 host) ==========
echo ""
echo "=== Phase 1: Training YOLO11 ==="
python train_yolo11.py \
    --data water_flow.yaml \
    --epochs 150 \
    --imgsz 640 \
    --batch 16 \
    --device 0
echo "Training complete!"

# ========== Phase 2: export ONNX ==========
echo ""
echo "=== Phase 2: Exporting ONNX ==="
python export_onnx.py \
    --model runs/detect/water_flow_yolo11n/weights/best.pt \
    --imgsz 320 \
    --output yolo11n_water_flow.onnx
echo "ONNX export complete!"

# ========== Phase 3: convert to RKNN INT8 ==========
echo ""
echo "=== Phase 3: Converting to RKNN (INT8) ==="
python export_rknn.py \
    --onnx yolo11n_water_flow.onnx \
    --output yolo11n_water_flow_int8.rknn \
    --calibration-dir ./calibration_images \
    --target rv1126b
echo "RKNN conversion complete!"

# ========== Phase 4: deploy to the RV1126B ==========
echo ""
echo "=== Phase 4: Deploying to RV1126B ==="
# Copy everything over to the device
scp yolo11n_water_flow_int8.rknn \
    calibration.json \
    main.py \
    yolo11_detector.py \
    optical_flow.py \
    fusion_engine.py \
    root@192.168.1.100:/app/water_flow/
echo "Deployment complete!"

echo ""
echo "To run on RV1126B:"
echo "  ssh root@192.168.1.100"
echo "  cd /app/water_flow"
echo "  python3 main.py --model yolo11n_water_flow_int8.rknn \\"
echo "                  --input /dev/video0 \\"
echo "                  --calibration calibration.json"
```
### 10.2 RV1126B SDK Integration

```bash
# When building a full firmware image with the official Rockchip SDK

# 1. Fetch the RV1126B SDK
repo init --repo-url https://github.com/rockchip-linux/repo \
    -u https://github.com/rockchip-linux/manifests \
    -b master -m rv1126b.xml
repo sync

# 2. Build the SDK
./build.sh lunch   # select rv1126b
./build.sh

# 3. Add the application to Buildroot
#    (create a custom package under external/)

# 4. Flash the firmware
./build.sh updateimg
./upgrade_tool ul RV1126B_XXX.img
```
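For step 3 ("add the application to Buildroot"), the usual mechanism is a `BR2_EXTERNAL` tree. A minimal layout sketch — the directory, symbol, and variable names here are assumptions for illustration, not taken from the Rockchip SDK:

```
external/
├── external.desc          # name: WATERFLOW
├── external.mk            # include $(sort $(wildcard $(BR2_EXTERNAL_WATERFLOW_PATH)/package/*/*.mk))
├── Config.in              # source "$BR2_EXTERNAL_WATERFLOW_PATH/package/water-flow/Config.in"
└── package/water-flow/
    ├── Config.in          # config BR2_PACKAGE_WATER_FLOW
    └── water-flow.mk      # WATER_FLOW_* variables, *_INSTALL_TARGET_CMDS
```

With plain Buildroot this is picked up via `make BR2_EXTERNAL=/path/to/external menuconfig`; check how the vendor `build.sh` forwards `BR2_EXTERNAL` before relying on it.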
### 10.3 C++ Inference (recommended for production)
```cpp
/*
 * yolo11_rknn.cpp — RKNN inference in C++ (better performance)
 *
 * Build:
 *   arm-linux-gnueabihf-g++ yolo11_rknn.cpp -o yolo11_detect \
 *     -lrknnrt -lopencv_core -lopencv_imgproc -lopencv_video \
 *     -I/usr/include/rknn
 */
#include <rknn_api.h>
#include <opencv2/opencv.hpp>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <vector>
#include <chrono>

// Detection result (must be defined before YOLO11RKNN, which uses it)
struct Detection {
    float x1, y1, x2, y2;
    float confidence;
    int class_id;
    int track_id;
};

class YOLO11RKNN {
public:
    explicit YOLO11RKNN(const char* model_path) {
        // Load the RKNN model file
        FILE* fp = fopen(model_path, "rb");
        if (!fp) {
            std::cerr << "Cannot open model: " << model_path << std::endl;
            exit(-1);
        }
        fseek(fp, 0, SEEK_END);
        size_t size = ftell(fp);
        fseek(fp, 0, SEEK_SET);
        void* model_data = malloc(size);
        fread(model_data, 1, size, fp);
        fclose(fp);

        // Initialize the RKNN context
        int ret = rknn_init(&ctx_, model_data, size, 0, NULL);
        free(model_data);
        if (ret < 0) {
            std::cerr << "rknn_init failed: " << ret << std::endl;
            exit(-1);
        }

        // Query input/output tensor counts
        rknn_input_output_num io_num;
        rknn_query(ctx_, RKNN_QUERY_IN_OUT_NUM, &io_num, sizeof(io_num));
        std::cout << "Input count: " << io_num.n_input << std::endl;
        std::cout << "Output count: " << io_num.n_output << std::endl;
    }

    std::vector<Detection> detect(const cv::Mat& frame) {
        // Preprocess
        cv::Mat input = preprocess(frame);

        // Set the input tensor (320x320x3, UINT8, NHWC)
        rknn_input inputs[1];
        memset(inputs, 0, sizeof(inputs));
        inputs[0].index = 0;
        inputs[0].type = RKNN_TENSOR_UINT8;
        inputs[0].size = 320 * 320 * 3;
        inputs[0].fmt = RKNN_TENSOR_NHWC;
        inputs[0].buf = input.data;
        rknn_inputs_set(ctx_, 1, inputs);

        // Inference
        rknn_run(ctx_, nullptr);

        // Fetch the output (as float, for post-processing)
        rknn_output outputs[1];
        memset(outputs, 0, sizeof(outputs));
        outputs[0].want_float = 1;
        rknn_outputs_get(ctx_, 1, outputs, NULL);

        // Post-process
        auto detections = postprocess(outputs[0].buf, outputs[0].size);
        rknn_outputs_release(ctx_, 1, outputs);
        return detections;
    }

    ~YOLO11RKNN() {
        rknn_destroy(ctx_);
    }

private:
    rknn_context ctx_;

    cv::Mat preprocess(const cv::Mat& frame) {
        // Note: a plain resize distorts the aspect ratio; switch to a
        // letterbox if coordinates must map back exactly
        cv::Mat resized, rgb;
        cv::resize(frame, resized, cv::Size(320, 320));
        cv::cvtColor(resized, rgb, cv::COLOR_BGR2RGB);
        return rgb;
    }

    std::vector<Detection> postprocess(void* buf, uint32_t size);
    // ... (post-processing implementation)
};
```
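The elided `postprocess` depends on the exact output layout of the exported model. As a reference point, here is a self-contained decode + class-wise NMS sketch. It assumes the common Ultralytics-style layout `[4+nc, N]` (rows `cx, cy, w, h` followed by per-class scores, already sigmoid-activated); verify the layout against your own ONNX/RKNN export before reusing it:

```cpp
// Decode an assumed [4+nc, N] row-major output tensor and run class-wise NMS.
#include <algorithm>
#include <vector>

struct Det { float x1, y1, x2, y2, conf; int cls; };

static float iou(const Det& a, const Det& b) {
    float ix1 = std::max(a.x1, b.x1), iy1 = std::max(a.y1, b.y1);
    float ix2 = std::min(a.x2, b.x2), iy2 = std::min(a.y2, b.y2);
    float inter = std::max(0.f, ix2 - ix1) * std::max(0.f, iy2 - iy1);
    float uni = (a.x2 - a.x1) * (a.y2 - a.y1)
              + (b.x2 - b.x1) * (b.y2 - b.y1) - inter;
    return uni > 0.f ? inter / uni : 0.f;
}

// out: pointer to (4+nc)*n floats, row-major; n = number of anchor cells
std::vector<Det> decode_nms(const float* out, int nc, int n,
                            float conf_thr = 0.25f, float iou_thr = 0.45f) {
    std::vector<Det> cand;
    for (int i = 0; i < n; ++i) {
        int best = 0; float score = 0.f;
        for (int c = 0; c < nc; ++c) {
            float s = out[(4 + c) * n + i];
            if (s > score) { score = s; best = c; }
        }
        if (score < conf_thr) continue;
        float cx = out[0 * n + i], cy = out[1 * n + i];
        float w  = out[2 * n + i], h  = out[3 * n + i];
        cand.push_back({cx - w / 2, cy - h / 2, cx + w / 2, cy + h / 2,
                        score, best});
    }
    // Greedy NMS: highest confidence first, suppress same-class overlaps
    std::sort(cand.begin(), cand.end(),
              [](const Det& a, const Det& b) { return a.conf > b.conf; });
    std::vector<Det> keep;
    for (const auto& d : cand) {
        bool ok = true;
        for (const auto& k : keep)
            if (k.cls == d.cls && iou(k, d) > iou_thr) { ok = false; break; }
        if (ok) keep.push_back(d);
    }
    return keep;
}
```

The thresholds mirror common YOLO defaults; box coordinates come out in input-image (320×320) space and still need scaling back to the original frame.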
---
## 11. Performance Optimization Checklist
### 11.1 Squeezing the RV1126B
```
Target: 15-20 FPS on the RV1126B

┌────────────────────────┬──────────────┬──────────────────────────────┐
│ Optimization           │ Expected gain│ Effort                       │
├────────────────────────┼──────────────┼──────────────────────────────┤
│ YOLO11n INT8           │ baseline     │ -                            │
│ Input 320 -> 256       │ +20% FPS     │ low (needs retrain/finetune) │
│ Detect every Nth frame │ +50% FPS     │ trivial                      │
│ C++ inference          │ +10% FPS     │ medium                       │
│ RGA hardware resize    │ +5% FPS      │ medium (RV1126B has an RGA)  │
│ Flow every Nth frame   │ +50% CPU     │ trivial                      │
│ Flow 4x downsampling   │ +300% CPU    │ trivial                      │
│ Pre-allocated buffers  │ -10% jitter  │ medium                       │
│ NEON intrinsics        │ +30% CPU     │ high                         │
└────────────────────────┴──────────────┴──────────────────────────────┘

Recommended combination:
  YOLO11n INT8 320x320 + skip-frame detection + C++ inference + RGA resize
  -> expected 15-20 FPS (YOLO and optical flow in parallel)

Optical flow alone:
  4x downsampling + skip-frame + ROI cropping
  -> expected 15-25 FPS (CPU)

Combined: 15+ FPS (alternating the two)
```
### 11.2 Memory Management
```python
# Memory optimization for the RV1126B
import cv2
import numpy as np

class MemoryOptimizedPipeline:
    """
    The RV1126B only has 256-512MB of RAM,
    so buffers must be managed strictly.
    """
    def __init__(self):
        # Pre-allocate every buffer up front
        self.frame_buffer = np.zeros((480, 640, 3), dtype=np.uint8)
        self.gray_buffer1 = np.zeros((480, 640), dtype=np.uint8)
        self.gray_buffer2 = np.zeros((480, 640), dtype=np.uint8)
        self.small_gray1 = np.zeros((120, 160), dtype=np.uint8)
        self.small_gray2 = np.zeros((120, 160), dtype=np.uint8)
        self.rknn_input = np.zeros((320, 320, 3), dtype=np.uint8)
        self.flow_field = np.zeros((120, 160, 2), dtype=np.float32)
        # Reuse these instead of re-allocating per frame
        # ...

    def process(self, raw_frame):
        # Copy straight into the pre-allocated buffer
        np.copyto(self.frame_buffer[:raw_frame.shape[0], :raw_frame.shape[1]], raw_frame)
        # Color conversion also writes into a pre-allocated dst,
        # instead of: gray = cv2.cvtColor(raw_frame, cv2.COLOR_BGR2GRAY) allocating anew
        cv2.cvtColor(self.frame_buffer, cv2.COLOR_BGR2GRAY, dst=self.gray_buffer1)
        # ...
```
---
## 12. Testing and Validation
### 12.1 Accuracy Validation Procedure
```
Step 1: lab validation
├── Test in a flume with known flow velocity
├── Compare the YOLO+flow output against a reference current meter
└── Record the error distribution

Step 2: field validation
├── Deploy on an actual river section
├── Measure simultaneously with floats or an ADCP
├── Test at low / medium / high velocities
└── Record behavior under different lighting conditions

Step 3: long-term stability
├── Run continuously for 7 days
├── Monitor for memory leaks
├── Monitor for accuracy drift
└── Record device stability
```
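For "record the error distribution" in step 1, a small helper like this covers the usual MAE / RMSE / relative-error summary (the numbers below are made-up examples, not real measurements):

```python
# Error statistics for accuracy validation: system output vs. reference meter.
import numpy as np

def error_report(measured, reference):
    """Compare measured velocities against reference readings (both in m/s)."""
    m = np.asarray(measured, dtype=float)
    r = np.asarray(reference, dtype=float)
    err = m - r
    return {
        "mae": float(np.mean(np.abs(err))),                 # mean absolute error
        "rmse": float(np.sqrt(np.mean(err ** 2))),          # root mean square error
        "mean_rel_pct": float(np.mean(np.abs(err) / r) * 100),
    }

# Illustrative run: three sections, system vs. reference current meter
stats = error_report(measured=[0.82, 1.15, 1.60], reference=[0.80, 1.20, 1.55])
print(stats)
```

Collecting these per flow regime (low/medium/high) and per lighting condition makes the weak spots of the fusion weights visible before tuning.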
### 12.2 Benchmark Script
```python
"""
benchmark.py — run benchmarks on the RV1126B
"""
import time
import numpy as np
import cv2


def benchmark_yolo(detector, num_runs=100):
    """YOLO inference benchmark"""
    test_frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)

    # Warm-up
    for _ in range(10):
        detector.detect(test_frame)

    # Timed runs
    times = []
    for _ in range(num_runs):
        start = time.perf_counter()
        detector.detect(test_frame)
        times.append(time.perf_counter() - start)

    avg = np.mean(times)
    p50 = np.percentile(times, 50)
    p95 = np.percentile(times, 95)
    p99 = np.percentile(times, 99)
    print(f"YOLO11 Benchmark ({num_runs} runs):")
    print(f"  Average: {avg*1000:.1f}ms ({1/avg:.1f} FPS)")
    print(f"  P50: {p50*1000:.1f}ms")
    print(f"  P95: {p95*1000:.1f}ms")
    print(f"  P99: {p99*1000:.1f}ms")
    print(f"  Min: {min(times)*1000:.1f}ms")
    print(f"  Max: {max(times)*1000:.1f}ms")


def benchmark_optical_flow(flow_processor, num_runs=100):
    """Optical-flow benchmark"""
    frame1 = np.random.randint(0, 255, (120, 160), dtype=np.uint8)
    frame2 = np.random.randint(0, 255, (120, 160), dtype=np.uint8)
    times = []
    for _ in range(num_runs):
        start = time.perf_counter()
        cv2.calcOpticalFlowFarneback(frame1, frame2, None,
                                     pyr_scale=0.5, levels=3,
                                     winsize=15, iterations=3,
                                     poly_n=5, poly_sigma=1.2, flags=0)
        times.append(time.perf_counter() - start)
        frame1, frame2 = frame2, frame1
    avg = np.mean(times)
    print(f"Optical Flow Benchmark ({num_runs} runs):")
    print(f"  Average: {avg*1000:.1f}ms ({1/avg:.1f} FPS)")
    print(f"  P95: {np.percentile(times, 95)*1000:.1f}ms")


def benchmark_full_pipeline(system, num_frames=300):
    """End-to-end pipeline benchmark"""
    test_frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
    times = []
    for i in range(num_frames):
        start = time.perf_counter()
        # Simulate one frame of processing (detect every other frame)
        if i % 2 == 0:
            system.detector.detect(test_frame)
        system.optical_flow.process_frame(test_frame, None)
        times.append(time.perf_counter() - start)
    avg = np.mean(times)
    print(f"Full Pipeline Benchmark ({num_frames} frames):")
    print(f"  Average: {avg*1000:.1f}ms ({1/avg:.1f} FPS)")
    print(f"  P95: {np.percentile(times, 95)*1000:.1f}ms")


if __name__ == '__main__':
    print("=" * 50)
    print("RV1126B Benchmark Suite")
    print("=" * 50)
    # Initialize the components on the RV1126B first, then:
    # benchmark_yolo(detector)
    # benchmark_optical_flow(flow)
    # benchmark_full_pipeline(system)
```
---
## 13. Project Layout
```
water-flow-rv1126b/
├── training/                        # training phase (x86 host)
│   ├── train_yolo11.py              # YOLO11 training script
│   ├── export_onnx.py               # ONNX export
│   ├── export_rknn.py               # RKNN conversion
│   ├── water_flow.yaml              # dataset config
│   └── calibration_images/          # INT8 calibration data
├── deployment/                      # deployment phase (RV1126B)
│   ├── main.py                      # main program
│   ├── yolo11_detector.py           # YOLO11 RKNN inference
│   ├── optical_flow.py              # optical-flow module
│   ├── fusion_engine.py             # fusion engine
│   ├── tracker.py                   # lightweight tracker
│   ├── calibration.json             # calibration parameters
│   └── yolo11n_water_flow_int8.rknn # RKNN model
├── cpp/                             # C++ production build
│   ├── yolo11_rknn.cpp
│   ├── optical_flow.cpp
│   ├── fusion_engine.cpp
│   ├── main.cpp
│   └── CMakeLists.txt
├── benchmark/
│   └── benchmark.py                 # performance benchmarks
├── docs/
│   └── calibration_guide.md         # calibration how-to
└── scripts/
    └── build_rv1126b.sh             # build & deploy script
```
---
## 14. Quick-Start Checklist
```
[ ]  1. Install Ultralytics + RKNN Toolkit2 on the x86 host
[ ]  2. Collect 1000+ images of surface tracers
[ ]  3. Annotate the data (COCO/YOLO format)
[ ]  4. Train the YOLO11n model (mAP@50 > 0.85)
[ ]  5. Export ONNX (imgsz=320)
[ ]  6. Prepare 100-200 calibration images
[ ]  7. Convert to an INT8 RKNN model
[ ]  8. Verify inference on the host with the RV1126B simulator
[ ]  9. Transfer everything to the RV1126B board
[ ] 10. Run camera calibration and generate calibration.json
[ ] 11. Run the benchmarks on the RV1126B
[ ] 12. Connect the camera and test end to end
[ ] 13. Validate accuracy against a traditional method
[ ] 14. Tune parameters (detection threshold, flow params, fusion weights)
```
---
*Created: 2026-04-23 | Platform: RV1126B | Model: YOLO11n*