YOLO11 + RV1126B 水流速度检测 — 混合方案完整实现
核心方案:YOLO11 检测离散示踪物 + 光流法补充无示踪物区域 + 结果融合
硬件平台:Rockchip RV1126B (1.2 TOPS NPU)
创建:2026-04-23
一、RV1126B 硬件约束分析
1.1 芯片规格
| 参数 | 规格 | 对我们的影响 |
|---|---|---|
| NPU | 1.2 TOPS (INT8) | 只能跑小模型,YOLO11n 是上限 |
| CPU | ARM Cortex-A7 (单核) | 光流法跑 CPU,性能有限 |
| 内存 | 256MB-512MB DDR3 | 模型不能太大,批处理=1 |
| 视频编码 | H.264/H.265 硬编解码 | 可以利用 VPU 解码减轻 CPU |
| ISP | 支持 MIPI CSI 相机输入 | 直接接摄像头 |
| 操作系统 | Buildroot / Debian (Linux) | 交叉编译环境 |
1.2 性能预期
| 模型 | 输入尺寸 | 量化方式 | 预期 FPS | 备注 |
|---|---|---|---|---|
| YOLO11n | 320×320 | INT8 | ~25-30 | 推荐,可以实时 |
| YOLO11n | 640×640 | INT8 | ~8-12 | 精度更高但不够实时 |
| YOLO11s | 320×320 | INT8 | ~12-15 | 可能内存紧张 |
| YOLO11n | 320×320 | FP16 | ~15-20 | 精度略高,速度慢 |
关键决策:使用 YOLO11n + 320×320 + INT8 量化
二、整体架构设计
┌────────────────────────────────────────────────────────────┐
│ RV1126B 处理流水线 │
│ │
│ ┌──────────┐ ┌────────────┐ ┌──────────────────┐ │
│ │ 摄像头 │───>│ VPU 硬解码 │───>│ 帧缓冲 (NV12) │ │
│ │ MIPI/USB │ │ (硬件加速) │ │ │ │
│ └──────────┘ └────────────┘ └────────┬─────────┘ │
│ │ │
│ ┌─────────────────┼──────────┐ │
│ │ │ │ │
│ ▼ ▼ │ │
│ ┌─────────────┐ ┌──────────────┐ │ │
│ │ NPU 推理 │ │ CPU 光流计算 │ │ │
│ │ YOLO11n │ │ Farneback │ │ │
│ │ INT8 RKNN │ │ 稀疏光流 │ │ │
│ │ 320×320 │ │ ROI 区域 │ │ │
│ └──────┬──────┘ └──────┬───────┘ │ │
│ │ │ │ │
│ ▼ ▼ │ │
│ ┌─────────────────────────────┐ │ │
│ │ 结果融合引擎 │ │ │
│ │ │ │ │
│ │ YOLO 区域 → 精确轨迹 │ │ │
│ │ 光流区域 → 稠密速度场 │ │ │
│ │ 融合 → 综合流速估计 │ │ │
│ └──────────────┬──────────────┘ │ │
│ │ │ │
│ ▼ │ │
│ ┌─────────────────────────────┐ │ │
│ │ 流速计算 + 像素→实际坐标转换 │ │ │
│ │ 单应性矩阵 / 标定因子 │ │ │
│ └──────────────┬──────────────┘ │ │
│ │ │ │
│ ▼ │ │
│ ┌─────────────────────────────┐ │ │
│ │ 结果输出: JSON / MQTT / 显示 │ │ │
│ └─────────────────────────────┘ │ │
└────────────────────────────────────────────────────────────┘
时序规划 (目标 30 FPS, 每帧 33ms):
├── VPU 解码: ~3ms (硬件)
├── YOLO11n 推理: ~35ms (NPU, INT8) → 超出单帧预算,每 2 帧跑一次,摊销 ~18ms/帧
├── 光流计算: ~15ms (CPU, 稀疏/降采样)
├── 结果融合: ~2ms (CPU)
└── 串行合计 ≈ 3 + 18 + 15 + 2 ≈ 38ms/帧;计入抓帧与可视化开销后,实际目标 15-20 FPS
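这个时序可以落成一个极简的调度骨架(示意代码;detector / flow / fusion 按第六、七、八章的接口假设,并非最终实现):
# 调度骨架示意:YOLO 隔帧、光流每帧、融合每帧
RUN_YOLO_EVERY = 2  # 每 2 帧跑一次 NPU 推理,摊销 ~18ms/帧

def process_stream(cap, detector, flow, fusion, fps=30):
    """逐帧产出融合结果的生成器(示意)"""
    frame_idx = 0
    last_detections = []
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        if frame_idx % RUN_YOLO_EVERY == 0:
            last_detections = detector.detect(frame)       # NPU ~35ms
            fusion.update_yolo(last_detections, frame_idx)
        flow.process_frame(frame, last_detections)          # CPU ~15ms
        fusion.update_flow(flow.compute_velocity_field(fps))
        yield fusion.compute_fused_velocity(frame_idx)      # ~2ms
        frame_idx += 1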
三、环境搭建
3.1 主机端(训练 + 模型转换)
# 在 x86 Linux 主机上操作(用于训练和模型转换)
# 1. 创建 Python 环境
conda create -n yolo11-rv1126 python=3.10 -y
conda activate yolo11-rv1126
# 2. 安装 Ultralytics (YOLO11)
pip install ultralytics
# 3. 安装 RKNN Toolkit2
# 下载: https://github.com/airockchip/rknn-toolkit2
# 选择对应版本(Ubuntu x86)
git clone https://github.com/airockchip/rknn-toolkit2.git
cd rknn-toolkit2/rknn-toolkit2/packages/
pip install rknn_toolkit2-*.whl
# 4. 其他依赖
pip install opencv-python numpy onnx onnxruntime
pip install roboflow # 数据管理(可选)
pip install supervision # 可视化
3.2 RV1126B 目标板环境
# 在 RV1126B 开发板上操作
# 1. 安装 RKNN Runtime(推理运行时,不含训练功能)
# 从 Rockchip SDK 或板子系统镜像中获取
# 通常在 /usr/lib/librknnrt.so
# 2. 安装 Python RKNN 运行时(RKNN Toolkit Lite2,只含推理)
# 该包不在 PyPI 上,需从 rknn-toolkit2 仓库获取对应架构的 wheel
pip install rknn_toolkit_lite2-*.whl
# 3. 安装 OpenCV(用于光流和后处理)
# 使用 Buildroot 编译或 apt 安装
apt-get install python3-opencv # 如果使用 Debian 系统
# 4. 其他依赖
pip install numpy
3.3 交叉编译工具链
# 安装 Rockchip 交叉编译工具链
# 从 Rockchip SDK 获取
# 设置环境变量
export CROSS_COMPILE=arm-buildroot-linux-gnueabihf-
export CC=${CROSS_COMPILE}gcc
export CXX=${CROSS_COMPILE}g++
# 交叉编译 OpenCV(如果 Buildroot 未包含)
# 交叉编译 Python C 扩展模块
四、YOLO11 模型训练
4.1 数据准备
# water_flow.yaml
path: ./water_flow_dataset
train: images/train
val: images/val
nc: 1
names:
- floating_object
数据集结构:
water_flow_dataset/
├── images/
│ ├── train/ # 800-2000 张
│ └── val/ # 200-500 张
└── labels/
├── train/ # YOLO 格式标注
└── val/
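标注完成后,建议先跑一个最小的完整性检查再开训。下面是示意脚本(check_dataset.py 为假设的文件名,目录结构按上面的约定):
# check_dataset.py — 校验 YOLO 格式数据集(示意)
from pathlib import Path

def check_split(root, split, nc=1):
    img_dir = Path(root) / 'images' / split
    lbl_dir = Path(root) / 'labels' / split
    images = sorted(p for p in img_dir.glob('*') if p.suffix in ('.jpg', '.png'))
    missing, bad = 0, 0
    for img in images:
        lbl = lbl_dir / (img.stem + '.txt')
        if not lbl.exists():
            missing += 1
            continue
        for line in lbl.read_text().splitlines():
            parts = line.split()
            # 每行: class cx cy w h,坐标均为 [0,1] 归一化
            if len(parts) != 5:
                bad += 1
                continue
            cls, *coords = parts
            if int(cls) >= nc or not all(0.0 <= float(v) <= 1.0 for v in coords):
                bad += 1
    print(f"{split}: {len(images)} images, {missing} missing labels, {bad} bad lines")

if __name__ == '__main__':
    for split in ('train', 'val'):
        check_split('./water_flow_dataset', split)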
4.2 YOLO11n 训练
from ultralytics import YOLO
# 加载 YOLO11n 预训练模型
model = YOLO('yolo11n.pt')
# 训练配置
results = model.train(
# 数据集
data='water_flow.yaml',
# 训练参数
epochs=150,
imgsz=640, # 训练时用高分辨率
batch=16,
device=0,
patience=30,
# 针对水面场景的数据增强
hsv_h=0.015, # 色调小幅变化
hsv_s=0.7, # 饱和度大变化(水面反光)
hsv_v=0.4, # 亮度变化
degrees=0.0, # 不旋转(相机固定)
translate=0.1,
scale=0.5, # 大尺度变化(适应不同距离目标)
fliplr=0.5,
mosaic=1.0,
mixup=0.1,
copy_paste=0.1, # 复制粘贴增强(示踪物少的场景)
# 优化器
optimizer='AdamW',
lr0=0.001,
lrf=0.01,
weight_decay=0.0005,
# 保存
project='runs/detect',
name='water_flow_yolo11n',
save_period=10,
)
# 验证
metrics = model.val()
print(f"mAP@50: {metrics.box.map50:.4f}")
print(f"mAP@50-95: {metrics.box.map:.4f}")
4.3 针对 RV1126B 的特殊训练策略
# 关键:RV1126B NPU 是 INT8 量化,训练时需要考虑量化感知
from ultralytics import YOLO
import torch
# 方案 A:PTQ(训练后量化)— 简单但精度损失 2-5%
# 直接导出 INT8,下面会讲到
# 方案 B:QAT(量化感知训练)— 精度损失 < 1%,推荐
class QATWaterFlowTrainer:
"""
量化感知训练,让模型适应 INT8 量化
"""
def __init__(self):
self.model = YOLO('yolo11n.pt')
def train_with_quantization(self, data_config, epochs=50):
# 第一阶段:正常训练到收敛
self.model.train(
data=data_config,
epochs=epochs,
imgsz=640,
batch=16,
)
# 第二阶段:模拟量化微调
# 注意:Ultralytics 不直接支持 QAT
# 需要手动添加 fake quantization
# 加载最佳权重
model = YOLO('runs/detect/water_flow_yolo11n/weights/best.pt')
# 添加量化模拟(简化的做法:降低学习率继续训练)
model.train(
data=data_config,
epochs=20,
imgsz=320, # 用目标推理分辨率微调
batch=32,
lr0=0.0001, # 极低学习率
close_mosaic=10, # 关闭 mosaic 模拟推理条件
)
return model
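用法示意(两阶段训练均在 x86 训练主机上执行):
# 先常规训练到收敛,再用 320 分辨率、极低学习率微调
trainer = QATWaterFlowTrainer()
model = trainer.train_with_quantization('water_flow.yaml', epochs=150)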
五、模型导出与 RKNN 转换
5.1 导出流程
训练好的 YOLO11 PyTorch 模型 (.pt)
│
▼ (torch.onnx.export)
ONNX 模型 (.onnx)
│
▼ (RKNN Toolkit2)
RKNN 模型 (.rknn) — INT8 量化
│
▼ (部署到 RV1126B)
NPU 推理
5.2 导出 ONNX
from ultralytics import YOLO
# 加载训练好的模型
model = YOLO('runs/detect/water_flow_yolo11n/weights/best.pt')
# 导出 ONNX
# 关键参数:
# - opset 13: RKNN 支持的最佳 opset
# - simplify: 简化计算图
# - imgsz 320: 导出目标推理尺寸(RV1126B 用 320)
model.export(
format='onnx',
imgsz=320, # 目标推理分辨率
opset=13, # ONNX opset 版本
simplify=True, # 简化模型
dynamic=False, # 固定输入尺寸(NPU 不支持动态)
half=False, # 不需要 FP16,用 INT8
int8=False, # ONNX 导出不量化,后面用 RKNN 量化
)
# 输出: runs/detect/water_flow_yolo11n/weights/best.onnx
print("ONNX 导出完成")
5.3 ONNX → RKNN 转换(INT8 量化)
"""
export_rknn.py — 在 x86 主机上运行
将 ONNX 模型转换为 RKNN INT8 量化模型
"""
from rknn.api import RKNN
import cv2
import numpy as np
import os
# ========== 配置 ==========
ONNX_MODEL = 'runs/detect/water_flow_yolo11n/weights/best.onnx'
RKNN_MODEL = 'yolo11n_water_flow_int8.rknn'
CALIBRATION_DIR = './calibration_images/' # 校准数据集
DATASET_TXT = 'calibration_list.txt' # 校准图片列表
TARGET_PLATFORM = 'rv1126b' # 或 'rk3588' 等
# ========== 准备校准数据 ==========
def prepare_calibration_images(image_dir, output_txt, num_images=100):
"""
准备 INT8 量化校准数据
需要 100-500 张代表性图片
"""
images = [f for f in os.listdir(image_dir) if f.endswith(('.jpg', '.png'))]
images = images[:num_images]
with open(output_txt, 'w') as f:
for img in images:
f.write(os.path.join(image_dir, img) + '\n')
print(f"Prepared {len(images)} calibration images")
# 准备校准数据
prepare_calibration_images(CALIBRATION_DIR, DATASET_TXT)
# ========== RKNN 转换 ==========
rknn = RKNN(verbose=True)
# 1. 配置
print("--> Configuring RKNN model")
ret = rknn.config(
mean_values=[[0, 0, 0]], # 根据训练时的归一化调整
std_values=[[255, 255, 255]], # YOLO 通常是 /255
target_platform=TARGET_PLATFORM,
quantized_dtype='asymmetric_quantized-8',  # INT8 量化(Toolkit2 的命名)
quantized_algorithm='kl_divergence',       # KL 散度校准算法
)
if ret != 0:
print("Config failed!")
exit()
# 2. 加载 ONNX
print("--> Loading ONNX model")
ret = rknn.load_onnx(model=ONNX_MODEL)
if ret != 0:
print("Load ONNX failed!")
exit()
# 3. 构建模型(含 INT8 量化)
print("--> Building RKNN model with INT8 quantization")
ret = rknn.build(
do_quantization=True,
dataset=DATASET_TXT,
)
if ret != 0:
print("Build failed!")
exit()
# 4. 导出 RKNN 模型
print(f"--> Exporting RKNN model to {RKNN_MODEL}")
ret = rknn.export_rknn(RKNN_MODEL)
if ret != 0:
print("Export failed!")
exit()
# 5. (可选)在主机上用模拟器验证推理
print("--> Testing on host (simulator)")
ret = rknn.init_runtime()  # 不传 target → 主机模拟器;传 target 需通过 adb 连接真机
if ret != 0:
print("Init runtime failed!")
exit()
# 推理测试
test_img = cv2.imread(os.path.join(CALIBRATION_DIR, os.listdir(CALIBRATION_DIR)[0]))
test_img = cv2.resize(test_img, (320, 320))
test_img = cv2.cvtColor(test_img, cv2.COLOR_BGR2RGB)
outputs = rknn.inference(inputs=[test_img])
print(f"RKNN inference output shape: {[o.shape for o in outputs]}")
rknn.release()
print("Done!")
5.4 RKNN 量化配置详解
# 不同量化策略对比(RKNN Toolkit2 的接口)
# 策略 1:MinMax 校准(默认,速度快,精度略低)
rknn.config(
    quantized_dtype='asymmetric_quantized-8',
    quantized_algorithm='normal',
)
# 策略 2:KL 散度校准(推荐,对激活值长尾分布更稳)
rknn.config(
    quantized_dtype='asymmetric_quantized-8',
    quantized_algorithm='kl_divergence',
)
# 策略 3:混合量化(部分层 INT8,部分层 FP16)
# Toolkit2 不用 config 开关,而是两步 API:
# rknn.hybrid_quantization_step1(...) 生成量化配置文件,
# 手工把精度敏感的层改成 float16 后,
# 再用 rknn.hybrid_quantization_step2(...) 重新构建
# 量化校准技巧:
# 1. 校准数据需要覆盖所有场景(不同光照、不同流速、不同天气)
# 2. 至少 100 张,推荐 200-500 张
# 3. 校准数据不需要标注,只需要原始图片
# 4. 图片尺寸需要和训练时一致
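量化完成后,建议在主机上先做一次 INT8/FP32 输出对齐检查再上板;Toolkit2 也自带 accuracy_analysis 接口做逐层分析。下面是一个示意脚本(quant_check.py、compare_outputs 等命名均为假设;rknn 对象需在同一会话里完成 build 并以模拟器模式 init_runtime()):
# quant_check.py — 对比 ONNX FP32 与 RKNN INT8 的输出(主机端示意)
import cv2
import numpy as np
import onnxruntime as ort

def cosine(a, b):
    a, b = a.flatten().astype(np.float32), b.flatten().astype(np.float32)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b) + 1e-9))

def compare_outputs(rknn, onnx_path, image_path):
    img = cv2.resize(cv2.imread(image_path), (320, 320))
    rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    # ONNX FP32 参考:NCHW、/255 归一化,与 rknn.config 的 mean/std 对应
    x = rgb.astype(np.float32).transpose(2, 0, 1)[None] / 255.0
    sess = ort.InferenceSession(onnx_path)
    ref = sess.run(None, {sess.get_inputs()[0].name: x})[0]
    out = rknn.inference(inputs=[rgb])[0]  # INT8 模拟器输出
    sim = cosine(ref, out)
    print(f"cosine similarity: {sim:.4f}")  # 经验上 > 0.99 比较安全
    return sim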
六、YOLO11 检测模块(RV1126B 端)
6.1 RKNN 推理封装
"""
yolo11_detector.py — RV1126B 端 YOLO11 检测器
使用 RKNN Runtime 在 NPU 上推理
"""
import cv2
import numpy as np
import time
try:
    # 板端:RKNN Toolkit Lite2 运行时
    from rknnlite.api import RKNNLite as RKNN
except ImportError:
    # 主机端开发:完整 Toolkit2(带模拟器)
    from rknn.api import RKNN
class YOLO11Detector:
"""
YOLO11n RKNN 推理封装
专为 RV1126B 优化
"""
# YOLO11 默认参数
INPUT_SIZE = 320
CONF_THRESHOLD = 0.25
NMS_THRESHOLD = 0.45
NUM_CLASSES = 1
def __init__(self, rknn_model_path, use_npu=True):
self.rknn = RKNN()
self.use_npu = use_npu
# 加载模型
print(f"Loading RKNN model: {rknn_model_path}")
ret = self.rknn.load_rknn(rknn_model_path)
if ret != 0:
raise RuntimeError("Failed to load RKNN model")
# 初始化运行时
# 板端 RKNNLite 默认在 NPU 上执行;主机端完整 Toolkit2 则进入模拟器
# (use_npu 参数保留,仅作标记)
ret = self.rknn.init_runtime()
if ret != 0:
    raise RuntimeError("Failed to init RKNN runtime")
print("YOLO11 Detector initialized")
def preprocess(self, frame):
"""
预处理: resize + BGR→RGB + normalize
输入: BGR 图像 (任意尺寸)
输出: RGB numpy array (320×320)
"""
# 保持宽高比的 resize + padding
h, w = frame.shape[:2]
scale = self.INPUT_SIZE / max(h, w)
new_w, new_h = int(w * scale), int(h * scale)
resized = cv2.resize(frame, (new_w, new_h))
# padding 到 320×320
padded = np.full((self.INPUT_SIZE, self.INPUT_SIZE, 3), 114, dtype=np.uint8)
top = (self.INPUT_SIZE - new_h) // 2
left = (self.INPUT_SIZE - new_w) // 2
padded[top:top+new_h, left:left+new_w] = resized
# BGR → RGB
rgb = cv2.cvtColor(padded, cv2.COLOR_BGR2RGB)
# 记录变换信息(用于后处理还原坐标)
transform_info = {
'original_size': (w, h),
'scale': scale,
'padding': (left, top),
}
return rgb, transform_info
def postprocess(self, outputs, transform_info, conf_threshold=None):
"""
解析 YOLO11 输出
YOLO11 ONNX 输出格式: (1, 84, 8400) — COCO 80 类、640×640 输入
- 84 = 4 (bbox) + 80 (classes)
- 8400 = 锚点数量(640 输入;320 输入时为 40²+20²+10² = 2100)
对于我们的自定义模型(1 类、320 输入):
- 输出为 (1, 5, 2100),5 = 4 (bbox) + 1 (class)
"""
if conf_threshold is None:
conf_threshold = self.CONF_THRESHOLD
# 输出解析
# YOLO11 输出: (1, num_classes + 4, num_anchors)
pred = outputs[0]  # shape: (1, 5, 2100),1 类、320 输入
pred = pred[0].T   # shape: (5, 2100) → (2100, 5)
boxes = []
scores = []
class_ids = []
for i in range(pred.shape[0]):
score = pred[i, 4] # 置信度(只有一个类别)
if score < conf_threshold:
continue
# bbox (中心点 + 宽高)
cx, cy, bw, bh = pred[i, 0:4]
# 还原到原始图像坐标
cx = (cx - transform_info['padding'][0]) / transform_info['scale']
cy = (cy - transform_info['padding'][1]) / transform_info['scale']
bw = bw / transform_info['scale']
bh = bh / transform_info['scale']
# 转为 x1, y1, x2, y2
x1 = cx - bw / 2
y1 = cy - bh / 2
x2 = cx + bw / 2
y2 = cy + bh / 2
# 裁剪到图像边界
orig_w, orig_h = transform_info['original_size']
x1 = max(0, x1)
y1 = max(0, y1)
x2 = min(orig_w, x2)
y2 = min(orig_h, y2)
boxes.append([x1, y1, x2, y2])
scores.append(float(score))
class_ids.append(0)
if len(boxes) == 0:
return []
# NMS(注意:cv2.dnn.NMSBoxes 要求 [x, y, w, h] 格式,不是 x1y1x2y2)
nms_boxes = [[b[0], b[1], b[2] - b[0], b[3] - b[1]] for b in boxes]
indices = cv2.dnn.NMSBoxes(
    nms_boxes, scores, conf_threshold, self.NMS_THRESHOLD
)
detections = []
if len(indices) > 0:
for i in indices.flatten():
detections.append({
'bbox': boxes[i],
'score': scores[i],
'class_id': class_ids[i],
'center': (
(boxes[i][0] + boxes[i][2]) / 2,
(boxes[i][1] + boxes[i][3]) / 2
),
})
return detections
def detect(self, frame, conf_threshold=None):
"""
完整检测流程: 预处理 → NPU推理 → 后处理
Args:
frame: BGR 图像 (numpy array)
conf_threshold: 置信度阈值
Returns:
list of detection dicts
"""
# 预处理
rgb, transform_info = self.preprocess(frame)
# NPU 推理
outputs = self.rknn.inference(inputs=[rgb])
# 后处理
detections = self.postprocess(outputs, transform_info, conf_threshold)
return detections
def detect_batch(self, frames):
"""批量检测(如果 NPU 支持,但目前 RV1126B batch=1)"""
return [self.detect(f) for f in frames]
def get_fps(self, num_iterations=100, test_frame=None):
"""
测量推理 FPS
"""
if test_frame is None:
test_frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
times = []
for _ in range(num_iterations):
start = time.perf_counter()
self.detect(test_frame)
elapsed = time.perf_counter() - start
times.append(elapsed)
avg_time = np.mean(times)
fps = 1.0 / avg_time
print(f"Average inference time: {avg_time*1000:.1f}ms, FPS: {fps:.1f}")
return fps
def release(self):
self.rknn.release()
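上板后可以先用几行脚本做冒烟测试(示意;test.jpg 为假设的测试图):
# 板端冒烟测试示意
import cv2
from yolo11_detector import YOLO11Detector

detector = YOLO11Detector('yolo11n_water_flow_int8.rknn')
frame = cv2.imread('test.jpg')
for det in detector.detect(frame):
    print(f"bbox={det['bbox']}, score={det['score']:.2f}")
detector.get_fps(num_iterations=50)  # 顺便测一下单帧延迟
detector.release()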
6.2 检测性能优化
class OptimizedDetector(YOLO11Detector):
"""
针对 RV1126B 的深度优化检测器
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 预分配内存(避免推理时分配)
self._prealloc_buffer = np.zeros(
(self.INPUT_SIZE, self.INPUT_SIZE, 3), dtype=np.uint8
)
def detect_fast(self, frame, conf_threshold=None):
"""
快速检测路径 — 减少内存拷贝
优化点:
1. 原地 resize(减少内存分配)
2. 跳过不必要的颜色转换(如果相机输出 RGB)
3. 预分配推理输入 buffer
4. 复用 NMS 输出 buffer
"""
# 如果相机可以输出 RGB,跳过 BGR→RGB 转换
# 在 RV1126B ISP 中配置输出格式
# ... (实现同上,但优化内存操作)
# 未做专门优化前,先退回通用路径,保证接口可用
return self.detect(frame, conf_threshold)
def detect_roi(self, frame, roi):
"""
只在 ROI 区域检测 — 大幅加速
roi: (x1, y1, x2, y2)
"""
x1, y1, x2, y2 = roi
roi_frame = frame[y1:y2, x1:x2]
detections = self.detect(roi_frame)
# 还原坐标
for det in detections:
det['bbox'] = [
det['bbox'][0] + x1,
det['bbox'][1] + y1,
det['bbox'][2] + x1,
det['bbox'][3] + y1,
]
det['center'] = (
det['center'][0] + x1,
det['center'][1] + y1,
)
return detections
七、光流法模块(CPU 端)
7.1 为什么需要光流法
YOLO 检测的局限:
├── 只能检测离散的目标
├── 水面无漂浮物时 → 无法测速
├── 示踪物太小 → 检测不到
└── 需要足够标注数据
光流法的补充:
├── 可以追踪水面纹理运动
├── 不需要离散目标
├── 稠密或半稠密速度场
└── 但精度低于 YOLO 轨迹
混合方案优势:
├── 有示踪物 → YOLO 精确轨迹(高精度)
├── 无示踪物 → 光流补充(覆盖盲区)
└── 两者结果融合 → 鲁棒性最强
7.2 稀疏光流法实现(适合 RV1126B CPU)
"""
optical_flow.py — 光流法流速估计
针对 RV1126B 单核 CPU 优化
"""
import cv2
import numpy as np
from collections import defaultdict
class SparseOpticalFlow:
"""
Lucas-Kanade 稀疏光流法
优势:
- 计算量小(只追踪特征点)
- 适合嵌入式 CPU
- 可以追踪水面纹理特征
策略:
- 只在 YOLO 未检测到的区域追踪
- 或作为 YOLO 检测的补充
"""
def __init__(self, frame_size):
# Lucas-Kanade 参数
self.lk_params = dict(
winSize=(15, 15), # 窗口大小
maxLevel=2, # 金字塔层数
criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03)
)
# 特征点检测参数
self.feature_params = dict(
maxCorners=100, # 最多特征点数
qualityLevel=0.01, # 质量阈值
minDistance=10, # 最小间距
blockSize=3,
)
self.frame_size = frame_size
# 状态
self.prev_gray = None
self.prev_points = []
self._point_ids = []  # 与 prev_points 一一对应的轨迹 ID
self.point_trajectories = defaultdict(list)
self.point_id_counter = 0
def process_frame(self, frame, yolo_detections=None,
exclude_radius=20):
"""
处理单帧,追踪特征点
Args:
frame: BGR 当前帧
yolo_detections: YOLO 检测结果(可选)
exclude_radius: 排除 YOLO 检测区域的半径(避免重复)
Returns:
flow_vectors: [(x1, y1, x2, y2, point_id), ...]
特征点位移向量
"""
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
if self.prev_gray is None:
self.prev_gray = gray
self._detect_features(gray, yolo_detections, exclude_radius)
return []
# 追踪特征点
if len(self.prev_points) > 0:
prev_pts = np.array(self.prev_points, dtype=np.float32).reshape(-1, 1, 2)
next_pts, status, errors = cv2.calcOpticalFlowPyrLK(
    self.prev_gray, gray, prev_pts, None, **self.lk_params
)
next_pts = next_pts.reshape(-1, 2)  # (N,1,2) → (N,2),后面直接按 (x, y) 取值
# 过滤有效点
good_prev = []
good_next = []
valid_ids = []
for i, (next_pt, status_flag, error) in enumerate(
zip(next_pts, status, errors)
):
if status_flag == 1 and error < 500:
# 检查是否在图像范围内
if (0 < next_pt[0] < self.frame_size[0] and
0 < next_pt[1] < self.frame_size[1]):
good_prev.append(self.prev_points[i])
good_next.append(next_pt.tolist())
valid_ids.append(self._point_ids[i])
# 记录轨迹
for pid, prev_pt, next_pt in zip(valid_ids, good_prev, good_next):
self.point_trajectories[pid].append(next_pt)
# 限制轨迹长度
if len(self.point_trajectories[pid]) > 30:
self.point_trajectories[pid] = \
self.point_trajectories[pid][-30:]
# 计算光流向量
flow_vectors = []
for pid, p, n in zip(valid_ids, good_prev, good_next):
flow_vectors.append((p[0], p[1], n[0], n[1], pid))
# 更新状态
self.prev_points = good_next
self._point_ids = valid_ids
# 补充新特征点(如果数量不足)
if len(self.prev_points) < 50:
    tracked_pts, tracked_ids = self.prev_points, self._point_ids
    self._detect_features(gray, yolo_detections, exclude_radius)
    # _detect_features 会覆盖 prev_points/_point_ids,
    # 这里把正在追踪的点合并回来;新点最多补 20 个,保持点与 ID 一一对应
    self.prev_points = tracked_pts + self.prev_points[:20]
    self._point_ids = tracked_ids + self._point_ids[:20]
else:
self._detect_features(gray, yolo_detections, exclude_radius)
flow_vectors = []
self.prev_gray = gray
return flow_vectors
def _detect_features(self, gray, yolo_detections=None,
exclude_radius=20):
"""
检测新的特征点
排除 YOLO 检测区域(避免重复追踪)
"""
mask = np.ones(gray.shape[:2], dtype=np.uint8) * 255
if yolo_detections:
# 在 YOLO 检测区域设置 mask=0(排除)
for det in yolo_detections:
x1, y1, x2, y2 = [int(v) for v in det['bbox']]
cv2.rectangle(mask, (x1 - exclude_radius, y1 - exclude_radius),
(x2 + exclude_radius, y2 + exclude_radius), 0, -1)
points = cv2.goodFeaturesToTrack(
gray, mask=mask, **self.feature_params
)
if points is not None:
self.prev_points = points.reshape(-1, 2).tolist()
self._point_ids = list(range(
self.point_id_counter,
self.point_id_counter + len(self.prev_points)
))
self.point_id_counter += len(self.prev_points)
def compute_velocity_field(self, fps):
"""
从追踪的轨迹计算速度场
Returns:
velocities: [(x, y, vx, vy, speed, point_id), ...]
每个特征点的位置、速度和轨迹 ID
"""
velocities = []
for pid, traj in self.point_trajectories.items():
if len(traj) < 5:
continue
# 使用最近 15 帧
recent = traj[-min(15, len(traj)):]
start = recent[0]
end = recent[-1]
dx = end[0] - start[0]
dy = end[1] - start[1]
frames_span = len(recent) - 1  # N 个点之间只有 N-1 个帧间隔
time_sec = frames_span / fps
if time_sec == 0:
continue
vx = dx / time_sec # 像素/秒
vy = dy / time_sec
speed = np.sqrt(vx**2 + vy**2)
# 当前位置
current_pos = traj[-1]
velocities.append((current_pos[0], current_pos[1],
vx, vy, speed, pid))
return velocities
def reset(self):
"""重置状态"""
self.prev_gray = None
self.prev_points = []
self._point_ids = []
self.point_trajectories = defaultdict(list)
class DenseOpticalFlow:
"""
Farneback 稠密光流法
注意:在 RV1126B 单核 CPU 上,全分辨率稠密光流太慢
使用降采样 + ROI 策略
"""
def __init__(self, downsample_factor=4):
self.downsample_factor = downsample_factor
self.flow_params = dict(
pyr_scale=0.5,
levels=3,
winsize=15,
iterations=3,
poly_n=5,
poly_sigma=1.2,
flags=0,
)
self.prev_gray = None
def process_frame(self, frame, roi=None):
"""
计算稠密光流(降采样)
roi: 只计算感兴趣区域 (x1, y1, x2, y2)
"""
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# 降采样加速
small = cv2.resize(gray, None,
fx=1/self.downsample_factor,
fy=1/self.downsample_factor)
if self.prev_gray is None:
self.prev_gray = small
return None
flow = cv2.calcOpticalFlowFarneback(
self.prev_gray, small, None, **self.flow_params
)
self.prev_gray = small
# 恢复到原始分辨率
# 注意:光流是在小图上算出来的,上采样后数值也要乘回 downsample_factor,
# 否则像素位移会被低估 downsample_factor 倍
if roi is None:
    flow = cv2.resize(flow, (frame.shape[1], frame.shape[0]))
    return flow * self.downsample_factor
# 只还原 ROI 区域
x1, y1, x2, y2 = roi
flow_roi = cv2.resize(
    flow[y1//self.downsample_factor:y2//self.downsample_factor,
         x1//self.downsample_factor:x2//self.downsample_factor],
    (x2-x1, y2-y1)
)
return flow_roi * self.downsample_factor
def get_average_flow(self, flow):
"""从光流场计算平均流速"""
if flow is None:
return 0, 0
magnitude, angle = cv2.cartToPolar(flow[..., 0], flow[..., 1])
# 过滤掉太小和太大的运动
valid = (magnitude > 0.5) & (magnitude < 100)
if np.sum(valid) == 0:
return 0, 0
avg_vx = np.mean(flow[..., 0][valid])
avg_vy = np.mean(flow[..., 1][valid])
return avg_vx, avg_vy
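可以在 optical_flow.py 末尾挂一个自测入口,单独验证光流模块(示意;test.mp4 为假设的测试视频,实际部署中由 main.py 调用):
if __name__ == '__main__':
    # 自测:对视频前 100 帧跑稀疏光流,打印速度场规模
    cap = cv2.VideoCapture('test.mp4')
    flow = SparseOpticalFlow((640, 480))
    for _ in range(100):
        ret, frame = cap.read()
        if not ret:
            break
        flow.process_frame(frame)
    for x, y, vx, vy, speed, pid in flow.compute_velocity_field(fps=30)[:10]:
        print(f"point {pid}: ({x:.0f},{y:.0f}) -> {speed:.1f} px/s")
    cap.release()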
7.3 光流法性能优化(RV1126B 专用)
class OptimizedOpticalFlow:
"""
针对 RV1126B 单核 CPU 的极致优化
优化策略:
1. 降采样 4x(减少 16 倍计算量)
2. 隔帧计算(每 2 帧算一次光流)
3. 只在水面 ROI 区域计算
4. 限制特征点数量
5. 使用 NEON 指令集(如果 OpenCV 编译时启用)
"""
def __init__(self, frame_width, frame_height):
self.frame_size = (frame_width, frame_height)
# 水面 ROI(需要根据实际场景设定)
# 排除天空和岸边
self.water_roi = (0, int(frame_height * 0.3),
frame_width, frame_height)
# 降采样
self.scale = 0.25 # 1/4 分辨率
# 隔帧
self.skip_frames = 1 # 每 2 帧计算一次
self.frame_counter = 0
# 光流跑在"水面 ROI 裁剪 + 降采样"后的小图上,
# 边界检查用的尺寸也要按 ROI 的宽高来算
roi_w = self.water_roi[2] - self.water_roi[0]
roi_h = self.water_roi[3] - self.water_roi[1]
self.sparse_flow = SparseOpticalFlow(
    (int(roi_w * self.scale), int(roi_h * self.scale))
)
def process(self, frame, yolo_detections=None):
"""
处理帧 — 优化版本
时间预算(RV1126B CPU):
└── 目标:每帧 < 15ms
"""
self.frame_counter += 1
if self.frame_counter % (self.skip_frames + 1) != 0:
return [] # 跳帧
# 裁剪水面 ROI
x1, y1, x2, y2 = self.water_roi
roi_frame = frame[y1:y2, x1:x2]
# 降采样
small = cv2.resize(roi_frame, None, fx=self.scale, fy=self.scale)
# 调整 YOLO 检测框到小图坐标
small_detections = []
if yolo_detections:
for det in yolo_detections:
bx1, by1, bx2, by2 = det['bbox']
# 转换到 ROI + 降采样坐标
sbx1 = (bx1 - x1) * self.scale
sby1 = (by1 - y1) * self.scale
sbx2 = (bx2 - x1) * self.scale
sby2 = (by2 - y1) * self.scale
small_detections.append({
'bbox': [sbx1, sby1, sbx2, sby2],
**{k: v for k, v in det.items() if k != 'bbox'}
})
# 光流计算(在小图上)
flows = self.sparse_flow.process_frame(small, small_detections)
# 还原到原图坐标
result = []
for (x1_f, y1_f, x2_f, y2_f, pid) in flows:
# 还原
rx1 = x1_f / self.scale + x1
ry1 = y1_f / self.scale + y1
rx2 = x2_f / self.scale + x1
ry2 = y2_f / self.scale + y1
result.append((rx1, ry1, rx2, ry2, pid))
return result
八、结果融合引擎
8.1 融合策略设计
融合逻辑:
场景 1: YOLO 检测到示踪物 + 光流有数据
├── 优先使用 YOLO 轨迹(离散、精确)
├── 光流作为验证(一致性检查)
└── 两者偏差 > 阈值 → 标记为可疑
场景 2: YOLO 未检测到 + 光流有数据
├── 使用光流结果
├── 标记为"低置信度"
└── 需要足够多的特征点(实现中为最近 5 帧合计 ≥ 5 个)
场景 3: YOLO 检测到 + 光流无数据
├── 使用 YOLO 轨迹
└── 标记为"正常"
场景 4: 两者都无数据
└── 无法测速 → 报告无效
融合公式:
当两者都有数据时:
v_fused = w_yolo × v_yolo + w_flow × v_flow
w_yolo = 0.7, w_flow = 0.3 (YOLO 权重更高)
一致性检查:
|v_yolo - v_flow| / max(v_yolo, v_flow) < threshold (0.3)
→ 通过:正常融合
→ 不通过:只使用 YOLO,标记异常
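代入一组数值验证上面的公式:
# 融合公式数值示例
v_yolo, v_flow = 1.20, 0.95                             # 单位 m/s
deviation = abs(v_yolo - v_flow) / max(v_yolo, v_flow)  # ≈ 0.208 < 0.3,一致性检查通过
v_fused = 0.7 * v_yolo + 0.3 * v_flow                   # ≈ 1.125 m/s,置信度 MEDIUM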
8.2 融合引擎实现
"""
fusion_engine.py — YOLO + 光流融合引擎
"""
import cv2  # _pixel_to_meters 里的 perspectiveTransform 需要
import numpy as np
from collections import defaultdict
from enum import Enum
class ConfidenceLevel(Enum):
HIGH = "high" # YOLO 轨迹
MEDIUM = "medium" # YOLO + 光流融合
LOW = "low" # 仅光流
INVALID = "invalid" # 无有效数据
class VelocityFusionEngine:
"""
融合 YOLO 检测轨迹和光流速度场
核心逻辑:
1. YOLO 轨迹提供精确的离散速度点
2. 光流提供稠密的背景速度场
3. 融合两者,得到更全面的速度估计
"""
def __init__(self, pixels_per_meter=None, homography_matrix=None,
fps=30):
self.ppm = pixels_per_meter
self.H = homography_matrix
self.fps = fps
# 融合权重
self.yolo_weight = 0.7
self.flow_weight = 0.3
self.consistency_threshold = 0.3 # 30% 偏差阈值
# YOLO 轨迹存储
self.yolo_trajectories = defaultdict(list)
# 光流速度场(最近 N 帧的滑动窗口)
self.flow_velocity_history = []
self.flow_history_size = 10
# 结果缓存
self.last_fused_velocity = None
self.last_confidence = ConfidenceLevel.INVALID
def update_yolo(self, detections, frame_idx):
"""
更新 YOLO 检测结果
detections: list of {bbox, score, center, ...}
"""
for det in detections:
track_id = det.get('track_id', id(det)) # 如果有追踪 ID
center = det['center']
self.yolo_trajectories[track_id].append(
(frame_idx, center[0], center[1])
)
# 限制轨迹长度
if len(self.yolo_trajectories[track_id]) > 60:
self.yolo_trajectories[track_id] = \
self.yolo_trajectories[track_id][-60:]
def update_flow(self, flow_velocities):
"""
更新光流速度场
flow_velocities: [(x, y, vx, vy, speed), ...]
"""
if not flow_velocities:
self.flow_velocity_history.append([])
else:
speeds = [v[4] for v in flow_velocities]
self.flow_velocity_history.append(speeds)
# 限制历史长度
if len(self.flow_velocity_history) > self.flow_history_size:
self.flow_velocity_history = \
self.flow_velocity_history[-self.flow_history_size:]
def compute_fused_velocity(self, frame_idx):
"""
计算当前帧的融合流速
Returns:
{
'velocity': float, # 融合后的流速 (m/s)
'velocity_pixel': float, # 像素速度
'confidence': ConfidenceLevel,
'source': str, # 'yolo', 'flow', 'fused'
'n_yolo_tracks': int,
'n_flow_points': int,
}
"""
# 1. 计算 YOLO 速度
yolo_velocities = []
for track_id, traj in self.yolo_trajectories.items():
v = self._compute_trajectory_velocity(traj)
if v is not None:
yolo_velocities.append(v)
# 2. 计算光流速度
flow_speeds = []
for speeds in self.flow_velocity_history[-5:]: # 最近 5 帧
flow_speeds.extend(speeds)
# 3. 融合
result = {
'n_yolo_tracks': len(yolo_velocities),
'n_flow_points': len(flow_speeds),
}
if yolo_velocities and flow_speeds:
# 两者都有数据
yolo_speed = np.median(yolo_velocities)
flow_speed = np.median(flow_speeds)
# 一致性检查
max_speed = max(yolo_speed, flow_speed)
if max_speed > 0:
deviation = abs(yolo_speed - flow_speed) / max_speed
else:
deviation = 0
if deviation < self.consistency_threshold:
# 一致 → 加权融合
fused = self.yolo_weight * yolo_speed + \
self.flow_weight * flow_speed
result['velocity_pixel'] = fused
result['confidence'] = ConfidenceLevel.MEDIUM
result['source'] = 'fused'
else:
# 不一致 → 只用 YOLO
result['velocity_pixel'] = yolo_speed
result['confidence'] = ConfidenceLevel.HIGH
result['source'] = 'yolo'
result['warning'] = f'YOLO/Flow deviation: {deviation:.2f}'
elif yolo_velocities:
# 只有 YOLO
result['velocity_pixel'] = np.median(yolo_velocities)
result['confidence'] = ConfidenceLevel.HIGH
result['source'] = 'yolo'
elif flow_speeds:
# 只有光流
if len(flow_speeds) >= 5: # 至少 5 个特征点
result['velocity_pixel'] = np.median(flow_speeds)
result['confidence'] = ConfidenceLevel.LOW
result['source'] = 'flow'
else:
result['confidence'] = ConfidenceLevel.INVALID
else:
result['confidence'] = ConfidenceLevel.INVALID
# 像素速度 → 实际速度
if result.get('velocity_pixel') is not None:
result['velocity'] = self._pixel_to_meters(
result['velocity_pixel']
)
else:
result['velocity'] = None
self.last_fused_velocity = result
self.last_confidence = result['confidence']
return result
def _compute_trajectory_velocity(self, trajectory):
"""从轨迹计算速度(像素/秒)"""
if len(trajectory) < 5:
return None
# 使用分段平均
segment_size = 10
velocities = []
for i in range(0, len(trajectory) - segment_size, segment_size // 2):
seg = trajectory[i:i + segment_size]
start_f, sx, sy = seg[0]
end_f, ex, ey = seg[-1]
dist = np.sqrt((ex - sx)**2 + (ey - sy)**2)
time_sec = (end_f - start_f) / self.fps
if time_sec > 0:
velocities.append(dist / time_sec)
if not velocities:
    # 轨迹长度不足一个分段(5 ≤ len < 10)时,退化为首尾两点估计
    (sf, sx, sy), (ef, ex, ey) = trajectory[0], trajectory[-1]
    time_sec = (ef - sf) / self.fps
    if time_sec <= 0:
        return None
    return float(np.sqrt((ex - sx)**2 + (ey - sy)**2) / time_sec)
return float(np.median(velocities))
def _pixel_to_meters(self, pixel_velocity):
"""像素速度转换为 m/s"""
if self.H is not None:
# 使用单应性矩阵(考虑透视)
# 在画面中心采样转换因子
center_x = 320 # 假设 640 宽
center_y = 240 # 假设 480 高
pt = np.array([[[center_x, center_y]]], dtype=np.float32)
pt_moved = np.array([[[center_x + 1, center_y]]], dtype=np.float32)
real1 = cv2.perspectiveTransform(pt, self.H)[0][0]
real2 = cv2.perspectiveTransform(pt_moved, self.H)[0][0]
meter_per_pixel = np.linalg.norm(real2 - real1)
return pixel_velocity * meter_per_pixel
elif self.ppm:
return pixel_velocity / self.ppm
return pixel_velocity # 返回像素速度
def get_summary(self, window_frames=300):
    """获取最近一次融合结果(完整的滑动窗口统计可按需扩展)"""
    return self.last_fused_velocity
def reset(self):
"""重置所有状态"""
self.yolo_trajectories.clear()
self.flow_velocity_history.clear()
self.last_fused_velocity = None
self.last_confidence = ConfidenceLevel.INVALID
九、完整集成流水线
9.1 主程序
"""
main.py — RV1126B 水流速度检测主程序
运行方式:
python3 main.py --model yolo11n_water_flow_int8.rknn \
--input /dev/video0 \
--calibration calibration.json \
--output results/
"""
import cv2
import json
import time
import argparse
import numpy as np
from pathlib import Path
from datetime import datetime
from yolo11_detector import YOLO11Detector
from optical_flow import SparseOpticalFlow, OptimizedOpticalFlow
from fusion_engine import VelocityFusionEngine
# SimpleTracker 直接在本文件中定义(见下);若拆分成独立的 tracker.py,再改回 import
class SimpleTracker:
"""
轻量级追踪器 — RV1126B 版
不使用完整的 ByteTrack(太重),
用简化的 IoU 匹配追踪
"""
def __init__(self, max_lost=10, min_iou=0.3):
self.tracks = {} # track_id -> {last_bbox, last_center, lost_count}
self.next_id = 0
self.max_lost = max_lost
self.min_iou = min_iou
def update(self, detections):
"""
更新追踪状态
detections: [{bbox, score, center}, ...]
Returns: [{bbox, score, center, track_id}, ...]
"""
if not detections:
# 所有轨迹增加丢失计数
for track in self.tracks.values():
track['lost_count'] += 1
# 清理丢失的
self.tracks = {
tid: t for tid, t in self.tracks.items()
if t['lost_count'] <= self.max_lost
}
return []
# IoU 匹配
matched_det = set()
for tid, track in list(self.tracks.items()):
best_iou = 0
best_det_idx = -1
for i, det in enumerate(detections):
if i in matched_det:
continue
iou = self._compute_iou(track['last_bbox'], det['bbox'])
if iou > best_iou:
best_iou = iou
best_det_idx = i
if best_iou >= self.min_iou:
# 匹配成功
track['last_bbox'] = detections[best_det_idx]['bbox']
track['last_center'] = detections[best_det_idx]['center']
track['lost_count'] = 0
detections[best_det_idx]['track_id'] = tid
matched_det.add(best_det_idx)
else:
track['lost_count'] += 1
# 未匹配的检测 → 新轨迹
for i, det in enumerate(detections):
if i not in matched_det:
new_id = self.next_id
self.next_id += 1
det['track_id'] = new_id
self.tracks[new_id] = {
'last_bbox': det['bbox'],
'last_center': det['center'],
'lost_count': 0,
}
# 清理
self.tracks = {
tid: t for tid, t in self.tracks.items()
if t['lost_count'] <= self.max_lost
}
return detections
def _compute_iou(self, box1, box2):
"""计算 IoU"""
x1 = max(box1[0], box2[0])
y1 = max(box1[1], box2[1])
x2 = min(box1[2], box2[2])
y2 = min(box1[3], box2[3])
inter_area = max(0, x2 - x1) * max(0, y2 - y1)
box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
union = box1_area + box2_area - inter_area
return inter_area / union if union > 0 else 0
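# SimpleTracker 用法示意(假设的两帧检测输入,演示 IoU 匹配后 ID 保持):
#   tracker = SimpleTracker()
#   f1 = tracker.update([{'bbox': [10, 10, 50, 50], 'score': 0.9, 'center': (30, 30)}])
#   f2 = tracker.update([{'bbox': [14, 10, 54, 50], 'score': 0.9, 'center': (34, 30)}])
#   assert f1[0]['track_id'] == f2[0]['track_id']  # IoU ≈ 0.82 ≥ 0.3,匹配成功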
class WaterFlowVelocitySystem:
"""
端到端水流速度检测系统
RV1126B 优化版本
"""
def __init__(self, args):
# 加载标定参数
with open(args.calibration) as f:
calib = json.load(f)
ppm = calib.get('pixels_per_meter')
H = calib.get('homography_matrix')
if H:
H = np.array(H)
# 初始化组件
print("Initializing YOLO11 detector...")
self.detector = YOLO11Detector(args.model, use_npu=True)
print("Initializing optical flow...")
self.optical_flow = SparseOpticalFlow((640, 480))
print("Initializing tracker...")
self.tracker = SimpleTracker()
print("Initializing fusion engine...")
self.fusion = VelocityFusionEngine(
pixels_per_meter=ppm,
homography_matrix=H,
fps=args.fps,
)
# 配置
self.fps = args.fps
self.run_yolo_every = 2 # 每 2 帧跑一次 YOLO
self.frame_count = 0
# 结果存储
self.results = []
self.output_dir = Path(args.output)
self.output_dir.mkdir(parents=True, exist_ok=True)
def run_video(self, input_source):
"""
主处理循环
input_source: 摄像头设备路径 或 视频文件路径
"""
cap = cv2.VideoCapture(input_source)
if not cap.isOpened():
raise RuntimeError(f"Cannot open {input_source}")
# 设置分辨率(降低到 640x480 以节省性能)
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
frame_idx = 0
fps_timer = []
print(f"Starting processing...")
print(f"Input: {input_source}")
print(f"YOLO runs every {self.run_yolo_every} frames")
try:
while True:
frame_start = time.perf_counter()
ret, frame = cap.read()
if not ret:
break
# ---- 阶段 1: YOLO 检测(隔帧) ----
if frame_idx % self.run_yolo_every == 0:
yolo_detections = self.detector.detect(frame)
# 追踪
tracked = self.tracker.update(yolo_detections)
# 更新融合引擎
self.fusion.update_yolo(tracked, frame_idx)
else:
# 不跑 YOLO,只更新追踪(预测模式)
tracked = []
# ---- 阶段 2: 光流计算 ----
flow_vectors = self.optical_flow.process_frame(
    frame,
    yolo_detections if frame_idx % self.run_yolo_every == 0 else None
)
# update_flow 需要的是速度元组 (x, y, vx, vy, speed, pid),
# 而不是 process_frame 返回的位移向量
flow_velocities = self.optical_flow.compute_velocity_field(self.fps)
self.fusion.update_flow(flow_velocities)
# ---- 阶段 3: 融合 ----
fusion_result = self.fusion.compute_fused_velocity(frame_idx)
# ---- 阶段 4: 可视化 ----
display_frame = self._visualize(
frame, tracked, flow_vectors, fusion_result
)
# ---- 阶段 5: 记录结果 ----
if frame_idx % self.fps == 0:  # 每秒记录一次
    record = dict(fusion_result)
    # ConfidenceLevel 枚举转成字符串,否则 json.dump 会报错
    if hasattr(record.get('confidence'), 'value'):
        record['confidence'] = record['confidence'].value
    self.results.append({
        'timestamp': datetime.now().isoformat(),
        'frame': frame_idx,
        **record,
    })
# 性能计时
frame_time = time.perf_counter() - frame_start
fps_timer.append(frame_time)
if frame_idx % 30 == 0 and fps_timer:
avg_fps = len(fps_timer) / sum(fps_timer)
print(f"Frame {frame_idx}: {avg_fps:.1f} FPS | "
f"Velocity: {fusion_result.get('velocity', 0):.3f} m/s | "
f"Confidence: {fusion_result.get('confidence', 'N/A')}")
# 显示(如果有屏幕)
cv2.imshow("Water Flow Velocity", display_frame)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
frame_idx += 1
self.frame_count = frame_idx  # 供 _visualize 显示帧号
except KeyboardInterrupt:
print("\nInterrupted by user")
finally:
cap.release()
cv2.destroyAllWindows()
self._save_results()
# 最终统计
if fps_timer:
avg_fps = len(fps_timer) / sum(fps_timer)
print(f"\nAverage FPS: {avg_fps:.1f}")
print(f"Total frames: {frame_idx}")
def _visualize(self, frame, tracked, flow_vectors, fusion_result):
"""可视化输出"""
display = frame.copy()
# 绘制 YOLO 检测结果
for det in tracked:
x1, y1, x2, y2 = [int(v) for v in det['bbox']]
tid = det.get('track_id', 0)
cv2.rectangle(display, (x1, y1), (x2, y2), (0, 255, 0), 2)
cv2.putText(display, f"ID:{tid}", (x1, y1 - 5),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)
# 绘制光流向量
for (x1, y1, x2, y2, pid) in flow_vectors[:50]: # 最多 50 条
cv2.arrowedLine(display,
(int(x1), int(y1)),
(int(x2), int(y2)),
(255, 0, 0), 1)
# 绘制融合结果
velocity = fusion_result.get('velocity')
confidence = fusion_result.get('confidence', 'invalid')
if velocity is not None:
# 颜色根据置信度
if confidence.value == 'high':
color = (0, 255, 0) # 绿
elif confidence.value == 'medium':
color = (0, 255, 255) # 黄
elif confidence.value == 'low':
color = (0, 165, 255) # 橙
else:
color = (0, 0, 255) # 红
cv2.putText(display,
f"Velocity: {velocity:.3f} m/s",
(10, 30),
cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)
cv2.putText(display,
f"Confidence: {confidence.value}",
(10, 60),
cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 1)
# 绘制 FPS
cv2.putText(display, f"Frame: {self.frame_count}",
(10, frame.shape[0] - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
return display
def _save_results(self):
"""保存结果到文件"""
output_file = self.output_dir / f"results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(output_file, 'w') as f:
json.dump(self.results, f, indent=2)
print(f"Results saved to {output_file}")
def parse_args():
parser = argparse.ArgumentParser(description="Water Flow Velocity Detection on RV1126B")
parser.add_argument('--model', required=True, help='Path to RKNN model')
parser.add_argument('--input', default='/dev/video0', help='Camera device or video file')
parser.add_argument('--calibration', required=True, help='Calibration JSON file')
parser.add_argument('--output', default='./results', help='Output directory')
parser.add_argument('--fps', type=int, default=30, help='Video FPS')
return parser.parse_args()
if __name__ == '__main__':
args = parse_args()
system = WaterFlowVelocitySystem(args)
system.run_video(args.input)
9.2 标定文件格式
{
"calibration_date": "2026-04-23",
"location": "XX River, Section A",
"method": "homography",
"homography_matrix": [
[1.2e-3, 3.4e-5, -0.5],
[2.1e-5, 1.1e-3, -0.3],
[8.7e-7, 2.3e-7, 1.0]
],
"control_points": [
{
"pixel": [100, 200],
"real": [0.0, 0.0]
},
{
"pixel": [500, 180],
"real": [5.2, 0.3]
},
{
"pixel": [150, 600],
"real": [0.5, 3.1]
},
{
"pixel": [600, 580],
"real": [5.8, 3.4]
}
],
"camera": {
"resolution": "1920x1080",
"fps": 30,
"height_above_water": 3.5,
"angle_degrees": 45
},
"validation": {
"mean_error_meters": 0.15,
"max_error_meters": 0.28,
"num_validation_points": 4
}
}
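homography_matrix 不必手填,可以由 control_points 直接求出。下面是生成脚本的示意(make_homography.py 为假设的文件名):
# make_homography.py — 由控制点求单应性矩阵(示意)
import json
import cv2
import numpy as np

with open('calibration.json') as f:
    calib = json.load(f)

pixel_pts = np.array([p['pixel'] for p in calib['control_points']], dtype=np.float32)
real_pts = np.array([p['real'] for p in calib['control_points']], dtype=np.float32)

# 4 个点即可解单应性;多于 4 个点时 RANSAC 可剔除坏点
H, mask = cv2.findHomography(pixel_pts, real_pts, cv2.RANSAC)
calib['homography_matrix'] = H.tolist()

with open('calibration.json', 'w') as f:
    json.dump(calib, f, indent=2)
print(H)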
十、构建与部署流程
10.1 完整构建脚本
#!/bin/bash
# build_rv1126b.sh — 完整的构建和部署流程
set -e
echo "=== YOLO11 + RV1126B Water Flow Detection Build ==="
# ========== 阶段 1: 训练模型 (x86 主机) ==========
echo ""
echo "=== Phase 1: Training YOLO11 ==="
python train_yolo11.py \
--data water_flow.yaml \
--epochs 150 \
--imgsz 640 \
--batch 16 \
--device 0
echo "Training complete!"
# ========== 阶段 2: 导出 ONNX ==========
echo ""
echo "=== Phase 2: Exporting ONNX ==="
python export_onnx.py \
--model runs/detect/water_flow_yolo11n/weights/best.pt \
--imgsz 320 \
--output yolo11n_water_flow.onnx
echo "ONNX export complete!"
# ========== 阶段 3: 转换为 RKNN INT8 ==========
echo ""
echo "=== Phase 3: Converting to RKNN (INT8) ==="
python export_rknn.py \
--onnx yolo11n_water_flow.onnx \
--output yolo11n_water_flow_int8.rknn \
--calibration-dir ./calibration_images \
--target rv1126b
echo "RKNN conversion complete!"
# ========== 阶段 4: 部署到 RV1126B ==========
echo ""
echo "=== Phase 4: Deploying to RV1126B ==="
# 传输到设备
scp yolo11n_water_flow_int8.rknn \
calibration.json \
main.py \
yolo11_detector.py \
optical_flow.py \
fusion_engine.py \
root@192.168.1.100:/app/water_flow/
echo "Deployment complete!"
echo ""
echo "To run on RV1126B:"
echo " ssh root@192.168.1.100"
echo " cd /app/water_flow"
echo " python3 main.py --model yolo11n_water_flow_int8.rknn \\"
echo " --input /dev/video0 \\"
echo " --calibration calibration.json"
10.2 RV1126B SDK 集成
# 如果使用 Rockchip 官方 SDK 构建完整固件
# 1. 获取 RV1126B SDK
repo init --repo-url https://github.com/rockchip-linux/repo \
-u https://github.com/rockchip-linux/manifests \
-b master -m rv1126b.xml
repo sync
# 2. 编译 SDK
./build.sh lunch # 选择 rv1126b
./build.sh
# 3. 将应用加入 Buildroot
# 在 external/ 下创建自定义包
# 4. 烧录固件
./build.sh updateimg
./upgrade_tool ul RV1126B_XXX.img
10.3 C++ 推理(生产环境推荐)
/*
* yolo11_rknn.cpp — C++ 版 RKNN 推理(性能更好)
*
* 编译:
* arm-linux-gnueabihf-g++ yolo11_rknn.cpp -o yolo11_detect \
* -lrknnrt -lopencv_core -lopencv_imgproc -lopencv_video \
* -I/usr/include/rknn
*/
#include <rknn_api.h>
#include <opencv2/opencv.hpp>
#include <iostream>
#include <vector>
#include <chrono>
// Detection 必须在 YOLO11RKNN 之前定义(成员函数的返回类型用到它)
struct Detection {
    float x1, y1, x2, y2;
    float confidence;
    int class_id;
    int track_id;
};

class YOLO11RKNN {
public:
YOLO11RKNN(const char* model_path) {
// 加载 RKNN 模型
FILE* fp = fopen(model_path, "rb");
fseek(fp, 0, SEEK_END);
size_t size = ftell(fp);
fseek(fp, 0, SEEK_SET);
void* model_data = malloc(size);
fread(model_data, 1, size, fp);
fclose(fp);
// 初始化
int ret = rknn_init(&ctx_, model_data, size, 0, NULL);
if (ret < 0) {
std::cerr << "rknn_init failed: " << ret << std::endl;
exit(-1);
}
free(model_data);
// 获取输入输出信息
rknn_input_output_num io_num;
rknn_query(ctx_, RKNN_QUERY_IN_OUT_NUM, &io_num, sizeof(io_num));
std::cout << "Input count: " << io_num.n_input << std::endl;
std::cout << "Output count: " << io_num.n_output << std::endl;
}
std::vector<Detection> detect(const cv::Mat& frame) {
// 预处理
cv::Mat input = preprocess(frame);
// 设置输入
rknn_input inputs[1];
memset(inputs, 0, sizeof(inputs));
inputs[0].index = 0;
inputs[0].type = RKNN_TENSOR_UINT8;
inputs[0].size = 320 * 320 * 3;
inputs[0].fmt = RKNN_TENSOR_NHWC;
inputs[0].buf = input.data;
rknn_inputs_set(ctx_, 1, inputs);
// 推理
rknn_run(ctx_, nullptr);
// 获取输出
rknn_output outputs[1];
memset(outputs, 0, sizeof(outputs));
outputs[0].want_float = 1;
rknn_outputs_get(ctx_, 1, outputs, NULL);
// 后处理
auto detections = postprocess(outputs[0].buf, outputs[0].size);
rknn_outputs_release(ctx_, 1, outputs);
return detections;
}
~YOLO11RKNN() {
rknn_destroy(ctx_);
}
private:
rknn_context ctx_;
cv::Mat preprocess(const cv::Mat& frame) {
    // 简化版:直接 resize,未做保持宽高比的 letterbox
    // (与 Python 版不同;若改用 letterbox,后处理坐标还原也要同步修改)
    cv::Mat resized, rgb;
    cv::resize(frame, resized, cv::Size(320, 320));
    cv::cvtColor(resized, rgb, cv::COLOR_BGR2RGB);
    return rgb;
}
// 后处理:解析 (1, 5, 2100) 输出 + NMS,实现从略
std::vector<Detection> postprocess(void* buf, uint32_t size);
};
十一、性能优化清单
11.1 RV1126B 极限优化
目标:在 RV1126B 上达到 15-20 FPS
┌──────────────────┬──────────┬────────────────────────────┐
│ 优化项 │ 预期收益 │ 实现难度 │
├──────────────────┼──────────┼────────────────────────────┤
│ YOLO11n INT8 │ 基准 │ - │
│ 输入 320→256 │ +20% FPS │ 低(需重新训练/微调) │
│ 隔帧检测 │ +50% FPS │ 极低 │
│ C++ 推理 │ +10% FPS │ 中 │
│ RGA 硬件缩放 │ +5% FPS │ 中(RV1126B 有 RGA 单元)│
│ 光流隔帧 │ +50% CPU │ 极低 │
│ 光流降采样 4x │ +300% CPU │ 极低 │
│ 内存预分配 │ -10% 抖动 │ 中 │
│ NEON 优化 │ +30% CPU │ 高 │
└──────────────────┴──────────┴────────────────────────────┘
推荐组合:
YOLO11n INT8 320×320 + 隔帧检测 + C++ 推理 + RGA 缩放
→ 预期 15-20 FPS(YOLO)+ 光流并行
光流单独:
降采样 4x + 隔帧 + ROI 裁剪
→ 预期 15-25 FPS(CPU)
综合:15 FPS 以上(两者交替执行)
11.2 内存管理
# RV1126B 内存优化
class MemoryOptimizedPipeline:
"""
RV1126B 只有 256-512MB 内存
需要严格管理
"""
def __init__(self):
# 预分配所有 buffer
self.frame_buffer = np.zeros((480, 640, 3), dtype=np.uint8)
self.gray_buffer1 = np.zeros((480, 640), dtype=np.uint8)
self.gray_buffer2 = np.zeros((480, 640), dtype=np.uint8)
self.small_gray1 = np.zeros((120, 160), dtype=np.uint8)
self.small_gray2 = np.zeros((120, 160), dtype=np.uint8)
self.rknn_input = np.zeros((320, 320, 3), dtype=np.uint8)
self.flow_field = np.zeros((120, 160, 2), dtype=np.float32)
# 复用而不是重新分配
# ...
def process(self, raw_frame):
# 直接拷贝到预分配 buffer,而不是每帧新建数组
np.copyto(self.frame_buffer[:raw_frame.shape[0], :raw_frame.shape[1]], raw_frame)
# 灰度转换同理,用 dst 参数写入预分配 buffer:
# cv2.cvtColor(raw_frame, cv2.COLOR_BGR2GRAY, dst=self.gray_buffer1)
# 而不是每帧 gray = cv2.cvtColor(...) 触发新分配
# ...
十二、测试与验证
12.1 精度验证流程
步骤 1:实验室验证
├── 在已知流速的水槽中测试
├── 对比 YOLO+光流 结果与标准流速仪
└── 记录误差分布
步骤 2:现场验证
├── 在实际河道部署
├── 同步使用浮标法或 ADCP 测量
├── 在不同流速下测试(低/中/高)
└── 记录不同光照条件下的表现
步骤 3:长期稳定性
├── 连续运行 7 天
├── 监控内存泄漏(监控脚本示意见下)
├── 监控精度漂移
└── 记录设备稳定性
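其中内存监控不需要第三方库,直接读 /proc 即可。示意脚本如下(mem_watch.py、rss_log.csv 均为假设的名字,Linux 专用):
# mem_watch.py — 记录目标进程 RSS,用于排查内存泄漏(示意)
import sys
import time

def rss_kib(pid):
    """从 /proc/<pid>/status 读取常驻内存,单位 KiB"""
    with open(f'/proc/{pid}/status') as f:
        for line in f:
            if line.startswith('VmRSS:'):
                return int(line.split()[1])
    return -1

if __name__ == '__main__':
    pid = sys.argv[1] if len(sys.argv) > 1 else 'self'
    with open('rss_log.csv', 'a') as log:
        while True:
            log.write(f"{int(time.time())},{rss_kib(pid)}\n")
            log.flush()
            time.sleep(60)  # 每分钟采样一次,7 天约 1 万行,RSS 持续上涨即疑似泄漏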
12.2 基准测试脚本
"""
benchmark.py — 在 RV1126B 上运行基准测试
"""
import time
import numpy as np
import cv2
def benchmark_yolo(detector, num_runs=100):
"""YOLO 推理基准测试"""
test_frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
# 预热
for _ in range(10):
detector.detect(test_frame)
# 测试
times = []
for _ in range(num_runs):
start = time.perf_counter()
detector.detect(test_frame)
times.append(time.perf_counter() - start)
avg = np.mean(times)
p50 = np.percentile(times, 50)
p95 = np.percentile(times, 95)
p99 = np.percentile(times, 99)
print(f"YOLO11 Benchmark ({num_runs} runs):")
print(f" Average: {avg*1000:.1f}ms ({1/avg:.1f} FPS)")
print(f" P50: {p50*1000:.1f}ms")
print(f" P95: {p95*1000:.1f}ms")
print(f" P99: {p99*1000:.1f}ms")
print(f" Min: {min(times)*1000:.1f}ms")
print(f" Max: {max(times)*1000:.1f}ms")
def benchmark_optical_flow(flow_processor, num_runs=100):
"""光流基准测试"""
frame1 = np.random.randint(0, 255, (120, 160), dtype=np.uint8)
frame2 = np.random.randint(0, 255, (120, 160), dtype=np.uint8)
times = []
for _ in range(num_runs):
start = time.perf_counter()
cv2.calcOpticalFlowFarneback(frame1, frame2, None,
pyr_scale=0.5, levels=3,
winsize=15, iterations=3,
poly_n=5, poly_sigma=1.2, flags=0)
times.append(time.perf_counter() - start)
frame1, frame2 = frame2, frame1
avg = np.mean(times)
print(f"Optical Flow Benchmark ({num_runs} runs):")
print(f" Average: {avg*1000:.1f}ms ({1/avg:.1f} FPS)")
print(f" P95: {np.percentile(times, 95)*1000:.1f}ms")
def benchmark_full_pipeline(system, num_frames=300):
"""完整流水线基准测试"""
test_frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
times = []
for i in range(num_frames):
start = time.perf_counter()
# 模拟一帧的处理
if i % 2 == 0:
system.detector.detect(test_frame)
system.optical_flow.process_frame(test_frame)
elapsed = time.perf_counter() - start
times.append(elapsed)
avg = np.mean(times)
print(f"Full Pipeline Benchmark ({num_frames} frames):")
print(f" Average: {avg*1000:.1f}ms ({1/avg:.1f} FPS)")
print(f" P95: {np.percentile(times, 95)*1000:.1f}ms")
if __name__ == '__main__':
print("=" * 50)
print("RV1126B Benchmark Suite")
print("=" * 50)
# 需要先在 RV1126B 上初始化组件
# benchmark_yolo(detector)
# benchmark_optical_flow(flow)
# benchmark_full_pipeline(system)
十三、项目文件结构
water-flow-rv1126b/
├── training/ # 训练阶段(x86 主机)
│ ├── train_yolo11.py # YOLO11 训练脚本
│ ├── export_onnx.py # ONNX 导出
│ ├── export_rknn.py # RKNN 转换
│ ├── water_flow.yaml # 数据集配置
│ └── calibration_images/ # INT8 校准数据
│
├── deployment/ # 部署阶段(RV1126B)
│ ├── main.py # 主程序
│ ├── yolo11_detector.py # YOLO11 RKNN 推理
│ ├── optical_flow.py # 光流模块
│ ├── fusion_engine.py # 融合引擎
│ ├── tracker.py # 轻量追踪器
│ ├── calibration.json # 标定参数
│ └── yolo11n_water_flow_int8.rknn # RKNN 模型
│
├── cpp/ # C++ 生产版本
│ ├── yolo11_rknn.cpp
│ ├── optical_flow.cpp
│ ├── fusion_engine.cpp
│ ├── main.cpp
│ └── CMakeLists.txt
│
├── benchmark/
│ └── benchmark.py # 性能基准测试
│
├── docs/
│ └── calibration_guide.md # 标定操作指南
│
└── scripts/
└── build_rv1126b.sh # 构建部署脚本
十四、快速启动检查清单
[ ] 1. 在 x86 主机上安装 Ultralytics + RKNN Toolkit2
[ ] 2. 采集 1000+ 张水面示踪物图像
[ ] 3. 标注数据(COCO/YOLO 格式)
[ ] 4. 训练 YOLO11n 模型(mAP@50 > 0.85)
[ ] 5. 导出 ONNX (imgsz=320)
[ ] 6. 准备 100-200 张校准图片
[ ] 7. 转换为 RKNN INT8 模型
[ ] 8. 在主机上模拟 RV1126B 推理验证
[ ] 9. 传输到 RV1126B 开发板
[ ] 10. 执行相机标定,生成 calibration.json
[ ] 11. 在 RV1126B 上运行基准测试
[ ] 12. 连接相机,端到端测试
[ ] 13. 与传统方法对比验证精度
[ ] 14. 调优参数(检测阈值、光流参数、融合权重)
创建:2026-04-23 | 平台:RV1126B | 模型:YOLO11n