From 4981480d533ab946a63638b9b57c2b388b4febba Mon Sep 17 00:00:00 2001 From: FNS Service Date: Thu, 23 Apr 2026 19:03:28 +0800 Subject: [PATCH] Update from Sync Service --- .../YOLO11+RV1126B水流速度检测-混合方案实现.md | 2213 +++++++++++++++++ 1 file changed, 2213 insertions(+) create mode 100755 专业领域/ByteTrack/YOLO11+RV1126B水流速度检测-混合方案实现.md diff --git a/专业领域/ByteTrack/YOLO11+RV1126B水流速度检测-混合方案实现.md b/专业领域/ByteTrack/YOLO11+RV1126B水流速度检测-混合方案实现.md new file mode 100755 index 0000000..f1e0395 --- /dev/null +++ b/专业领域/ByteTrack/YOLO11+RV1126B水流速度检测-混合方案实现.md @@ -0,0 +1,2213 @@ +# YOLO11 + RV1126B 水流速度检测 — 混合方案完整实现 + +> **核心方案**:YOLO11 检测离散示踪物 + 光流法补充无示踪物区域 + 结果融合 +> +> **硬件平台**:Rockchip RV1126B (1.2 TOPS NPU) +> +> 创建:2026-04-23 + +--- + +## 一、RV1126B 硬件约束分析 + +### 1.1 芯片规格 + +| 参数 | 规格 | 对我们的影响 | +|------|------|-------------| +| **NPU** | 1.2 TOPS (INT8) | 只能跑小模型,YOLO11n 是上限 | +| **CPU** | ARM Cortex-A7 (单核) | 光流法跑 CPU,性能有限 | +| **内存** | 256MB-512MB DDR3 | 模型不能太大,批处理=1 | +| **视频编码** | H.264/H.265 硬编解码 | 可以利用 VPU 解码减轻 CPU | +| **ISP** | 支持 MIPI CSI 相机输入 | 直接接摄像头 | +| **操作系统** | Buildroot / Debian (Linux) | 交叉编译环境 | + +### 1.2 性能预期 + +| 模型 | 输入尺寸 | 量化方式 | 预期 FPS | 备注 | +|------|---------|---------|---------|------| +| YOLO11n | 320×320 | INT8 | ~25-30 | **推荐**,可以实时 | +| YOLO11n | 640×640 | INT8 | ~8-12 | 精度更高但不够实时 | +| YOLO11s | 320×320 | INT8 | ~12-15 | 可能内存紧张 | +| YOLO11n | 320×320 | FP16 | ~15-20 | 精度略高,速度慢 | + +**关键决策:使用 YOLO11n + 320×320 + INT8 量化** + +--- + +## 二、整体架构设计 + +``` +┌────────────────────────────────────────────────────────────┐ +│ RV1126B 处理流水线 │ +│ │ +│ ┌──────────┐ ┌────────────┐ ┌──────────────────┐ │ +│ │ 摄像头 │───>│ VPU 硬解码 │───>│ 帧缓冲 (NV12) │ │ +│ │ MIPI/USB │ │ (硬件加速) │ │ │ │ +│ └──────────┘ └────────────┘ └────────┬─────────┘ │ +│ │ │ +│ ┌─────────────────┼──────────┐ │ +│ │ │ │ │ +│ ▼ ▼ │ │ +│ ┌─────────────┐ ┌──────────────┐ │ │ +│ │ NPU 推理 │ │ CPU 光流计算 │ │ │ +│ │ YOLO11n │ │ Farneback │ │ │ +│ │ INT8 RKNN │ │ 稀疏光流 │ │ │ +│ │ 320×320 │ │ ROI 区域 │ │ │ +│ 
└──────┬──────┘ └──────┬───────┘ │ │ +│ │ │ │ │ +│ ▼ ▼ │ │ +│ ┌─────────────────────────────┐ │ │ +│ │ 结果融合引擎 │ │ │ +│ │ │ │ │ +│ │ YOLO 区域 → 精确轨迹 │ │ │ +│ │ 光流区域 → 稠密速度场 │ │ │ +│ │ 融合 → 综合流速估计 │ │ │ +│ └──────────────┬──────────────┘ │ │ +│ │ │ │ +│ ▼ │ │ +│ ┌─────────────────────────────┐ │ │ +│ │ 流速计算 + 像素→实际坐标转换 │ │ │ +│ │ 单应性矩阵 / 标定因子 │ │ │ +│ └──────────────┬──────────────┘ │ │ +│ │ │ │ +│ ▼ │ │ +│ ┌─────────────────────────────┐ │ │ +│ │ 结果输出: JSON / MQTT / 显示 │ │ │ +│ └─────────────────────────────┘ │ │ +└──────────────────────────────────────────────────────┘ + +时序规划 (目标 30 FPS, 每帧 33ms): +├── VPU 解码: ~3ms (硬件) +├── YOLO11n 推理: ~35ms (NPU, INT8) → 可以每 2 帧跑一次 +├── 光流计算: ~15ms (CPU, 稀疏/降采样) +├── 结果融合: ~2ms (CPU) +└── 总周期: 每帧 ~20ms (YOLO 隔帧) → 可达 15-20 FPS +``` + +--- + +## 三、环境搭建 + +### 3.1 主机端(训练 + 模型转换) + +```bash +# 在 x86 Linux 主机上操作(用于训练和模型转换) + +# 1. 创建 Python 环境 +conda create -n yolo11-rv1126 python=3.10 -y +conda activate yolo11-rv1126 + +# 2. 安装 Ultralytics (YOLO11) +pip install ultralytics + +# 3. 安装 RKNN Toolkit2 +# 下载: https://github.com/airockchip/rknn-toolkit2 +# 选择对应版本(Ubuntu x86) +git clone https://github.com/airockchip/rknn-toolkit2.git +cd rknn-toolkit2/rknn-toolkit2/packages/ +pip install rknn_toolkit2-*.whl + +# 4. 其他依赖 +pip install opencv-python numpy onnx onnxruntime +pip install roboflow # 数据管理(可选) +pip install supervision # 可视化 +``` + +### 3.2 RV1126B 目标板环境 + +```bash +# 在 RV1126B 开发板上操作 + +# 1. 安装 RKNN Runtime(推理运行时,不含训练功能) +# 从 Rockchip SDK 或板子系统镜像中获取 +# 通常在 /usr/lib/librknnrt.so + +# 2. 安装 Python RKNN 运行时 +pip install rknn_toolkit2_lite # 轻量版,只含推理 + +# 3. 安装 OpenCV(用于光流和后处理) +# 使用 Buildroot 编译或 apt 安装 +apt-get install python3-opencv # 如果使用 Debian 系统 + +# 4. 
其他依赖 +pip install numpy +``` + +### 3.3 交叉编译工具链 + +```bash +# 安装 Rockchip 交叉编译工具链 +# 从 Rockchip SDK 获取 + +# 设置环境变量 +export CROSS_COMPILE=arm-buildroot-linux-gnueabihf- +export CC=${CROSS_COMPILE}gcc +export CXX=${CROSS_COMPILE}g++ + +# 交叉编译 OpenCV(如果 Buildroot 未包含) +# 交叉编译 Python C 扩展模块 +``` + +--- + +## 四、YOLO11 模型训练 + +### 4.1 数据准备 + +```yaml +# water_flow.yaml +path: ./water_flow_dataset +train: images/train +val: images/val + +nc: 1 +names: + - floating_object +``` + +``` +数据集结构: +water_flow_dataset/ +├── images/ +│ ├── train/ # 800-2000 张 +│ └── val/ # 200-500 张 +└── labels/ + ├── train/ # YOLO 格式标注 + └── val/ +``` + +### 4.2 YOLO11n 训练 + +```python +from ultralytics import YOLO + +# 加载 YOLO11n 预训练模型 +model = YOLO('yolo11n.pt') + +# 训练配置 +results = model.train( + # 数据集 + data='water_flow.yaml', + + # 训练参数 + epochs=150, + imgsz=640, # 训练时用高分辨率 + batch=16, + device=0, + patience=30, + + # 针对水面场景的数据增强 + hsv_h=0.015, # 色调小幅变化 + hsv_s=0.7, # 饱和度大变化(水面反光) + hsv_v=0.4, # 亮度变化 + degrees=0.0, # 不旋转(相机固定) + translate=0.1, + scale=0.5, # 大尺度变化(适应不同距离目标) + fliplr=0.5, + mosaic=1.0, + mixup=0.1, + copy_paste=0.1, # 复制粘贴增强(示踪物少的场景) + + # 优化器 + optimizer='AdamW', + lr0=0.001, + lrf=0.01, + weight_decay=0.0005, + + # 保存 + project='runs/detect', + name='water_flow_yolo11n', + save_period=10, +) + +# 验证 +metrics = model.val() +print(f"mAP@50: {metrics.box.map50:.4f}") +print(f"mAP@50-95: {metrics.box.map:.4f}") +``` + +### 4.3 针对 RV1126B 的特殊训练策略 + +```python +# 关键:RV1126B NPU 是 INT8 量化,训练时需要考虑量化感知 + +from ultralytics import YOLO +import torch + +# 方案 A:PTQ(训练后量化)— 简单但精度损失 2-5% +# 直接导出 INT8,下面会讲到 + +# 方案 B:QAT(量化感知训练)— 精度损失 < 1%,推荐 +class QATWaterFlowTrainer: + """ + 量化感知训练,让模型适应 INT8 量化 + """ + def __init__(self): + self.model = YOLO('yolo11n.pt') + + def train_with_quantization(self, data_config, epochs=50): + # 第一阶段:正常训练到收敛 + self.model.train( + data=data_config, + epochs=epochs, + imgsz=640, + batch=16, + ) + + # 第二阶段:模拟量化微调 + # 注意:Ultralytics 不直接支持 QAT + # 需要手动添加 fake 
quantization + + # 加载最佳权重 + model = YOLO('runs/detect/water_flow_yolo11n/weights/best.pt') + + # 添加量化模拟(简化的做法:降低学习率继续训练) + model.train( + data=data_config, + epochs=20, + imgsz=320, # 用目标推理分辨率微调 + batch=32, + lr0=0.0001, # 极低学习率 + close_mosaic=10, # 关闭 mosaic 模拟推理条件 + ) + + return model +``` + +--- + +## 五、模型导出与 RKNN 转换 + +### 5.1 导出流程 + +``` +训练好的 YOLO11 PyTorch 模型 (.pt) + │ + ▼ (torch.onnx.export) + ONNX 模型 (.onnx) + │ + ▼ (RKNN Toolkit2) + RKNN 模型 (.rknn) — INT8 量化 + │ + ▼ (部署到 RV1126B) + NPU 推理 +``` + +### 5.2 导出 ONNX + +```python +from ultralytics import YOLO + +# 加载训练好的模型 +model = YOLO('runs/detect/water_flow_yolo11n/weights/best.pt') + +# 导出 ONNX +# 关键参数: +# - opset 13: RKNN 支持的最佳 opset +# - simplify: 简化计算图 +# - imgsz 320: 导出目标推理尺寸(RV1126B 用 320) +model.export( + format='onnx', + imgsz=320, # 目标推理分辨率 + opset=13, # ONNX opset 版本 + simplify=True, # 简化模型 + dynamic=False, # 固定输入尺寸(NPU 不支持动态) + half=False, # 不需要 FP16,用 INT8 + int8=False, # ONNX 导出不量化,后面用 RKNN 量化 +) + +# 输出: runs/detect/water_flow_yolo11n/weights/best.onnx +print("ONNX 导出完成") +``` + +### 5.3 ONNX → RKNN 转换(INT8 量化) + +```python +""" +export_rknn.py — 在 x86 主机上运行 +将 ONNX 模型转换为 RKNN INT8 量化模型 +""" +from rknn.api import RKNN +import cv2 +import numpy as np +import os + +# ========== 配置 ========== +ONNX_MODEL = 'runs/detect/water_flow_yolo11n/weights/best.onnx' +RKNN_MODEL = 'yolo11n_water_flow_int8.rknn' +CALIBRATION_DIR = './calibration_images/' # 校准数据集 +DATASET_TXT = 'calibration_list.txt' # 校准图片列表 +TARGET_PLATFORM = 'rv1126b' # 或 'rk3588' 等 + +# ========== 准备校准数据 ========== +def prepare_calibration_images(image_dir, output_txt, num_images=100): + """ + 准备 INT8 量化校准数据 + 需要 100-500 张代表性图片 + """ + images = [f for f in os.listdir(image_dir) if f.endswith(('.jpg', '.png'))] + images = images[:num_images] + + with open(output_txt, 'w') as f: + for img in images: + f.write(os.path.join(image_dir, img) + '\n') + + print(f"Prepared {len(images)} calibration images") + +# 准备校准数据 
+prepare_calibration_images(CALIBRATION_DIR, DATASET_TXT) + +# ========== RKNN 转换 ========== +rknn = RKNN(verbose=True) + +# 1. 配置 +print("--> Configuring RKNN model") +ret = rknn.config( + mean_values=[[0, 0, 0]], # 根据训练时的归一化调整 + std_values=[[255, 255, 255]], # YOLO 通常是 /255 + target_platform=TARGET_PLATFORM, + quantized_dtype='asymmetric_affine-u8', # INT8 量化 + quantized_algorithm='KLD', # KLD 校准算法 +) +if ret != 0: + print("Config failed!") + exit() + +# 2. 加载 ONNX +print("--> Loading ONNX model") +ret = rknn.load_onnx(model=ONNX_MODEL) +if ret != 0: + print("Load ONNX failed!") + exit() + +# 3. 构建模型(含 INT8 量化) +print("--> Building RKNN model with INT8 quantization") +ret = rknn.build( + do_quantization=True, + dataset=DATASET_TXT, + num_preprocess=1, # 预处理放在 NPU 上 +) +if ret != 0: + print("Build failed!") + exit() + +# 4. 导出 RKNN 模型 +print(f"--> Exporting RKNN model to {RKNN_MODEL}") +ret = rknn.export_rknn(RKNN_MODEL) +if ret != 0: + print("Export failed!") + exit() + +# 5. (可选)在主机上模拟 RV1126B 推理验证 +print("--> Testing on host (simulating RV1126B)") +ret = rknn.init_runtime(target=TARGET_PLATFORM) +if ret != 0: + print("Init runtime failed!") + exit() + +# 推理测试 +test_img = cv2.imread(os.path.join(CALIBRATION_DIR, os.listdir(CALIBRATION_DIR)[0])) +test_img = cv2.resize(test_img, (320, 320)) +test_img = cv2.cvtColor(test_img, cv2.COLOR_BGR2RGB) + +outputs = rknn.inference(inputs=[test_img]) +print(f"RKNN inference output shape: {[o.shape for o in outputs]}") + +rknn.release() +print("Done!") +``` + +### 5.4 RKNN 量化配置详解 + +```python +# 不同量化策略对比 + +# 策略 1:对称量化(速度快,精度略低) +rknn.config( + quantized_dtype='symmetric_affine-u8', + quantized_algorithm='MIN_MAX', +) + +# 策略 2:非对称量化(推荐,精度更好) +rknn.config( + quantized_dtype='asymmetric_affine-u8', + quantized_algorithm='KLD', +) + +# 策略 3:混合量化(部分层 INT8,部分层 FP16) +rknn.config( + quantized_dtype='mixed', + # 需要在模型配置中指定哪些层用 FP16 +) + +# 量化校准技巧: +# 1. 校准数据需要覆盖所有场景(不同光照、不同流速、不同天气) +# 2. 至少 100 张,推荐 200-500 张 +# 3. 
校准数据不需要标注,只需要原始图片 +# 4. 图片尺寸需要和训练时一致 +``` + +--- + +## 六、YOLO11 检测模块(RV1126B 端) + +### 6.1 RKNN 推理封装 + +```python +""" +yolo11_detector.py — RV1126B 端 YOLO11 检测器 +使用 RKNN Runtime 在 NPU 上推理 +""" +import cv2 +import numpy as np +import time +try: + from rknn.api import RKNN +except ImportError: + # 主机端开发用 mock + from rknn.mock_runtime import RKNN + + +class YOLO11Detector: + """ + YOLO11n RKNN 推理封装 + 专为 RV1126B 优化 + """ + + # YOLO11 默认参数 + INPUT_SIZE = 320 + CONF_THRESHOLD = 0.25 + NMS_THRESHOLD = 0.45 + NUM_CLASSES = 1 + + def __init__(self, rknn_model_path, use_npu=True): + self.rknn = RKNN() + self.use_npu = use_npu + + # 加载模型 + print(f"Loading RKNN model: {rknn_model_path}") + ret = self.rknn.load_rknn(rknn_model_path) + if ret != 0: + raise RuntimeError("Failed to load RKNN model") + + # 初始化运行时 + if use_npu: + ret = self.rknn.init_runtime(target='rv1126b', device_id=None) + else: + ret = self.rknn.init_runtime(target=None) # CPU 模拟 + + if ret != 0: + raise RuntimeError("Failed to init RKNN runtime") + + print("YOLO11 Detector initialized") + + def preprocess(self, frame): + """ + 预处理: resize + BGR→RGB + normalize + 输入: BGR 图像 (任意尺寸) + 输出: RGB numpy array (320×320) + """ + # 保持宽高比的 resize + padding + h, w = frame.shape[:2] + scale = self.INPUT_SIZE / max(h, w) + new_w, new_h = int(w * scale), int(h * scale) + + resized = cv2.resize(frame, (new_w, new_h)) + + # padding 到 320×320 + padded = np.full((self.INPUT_SIZE, self.INPUT_SIZE, 3), 114, dtype=np.uint8) + top = (self.INPUT_SIZE - new_h) // 2 + left = (self.INPUT_SIZE - new_w) // 2 + padded[top:top+new_h, left:left+new_w] = resized + + # BGR → RGB + rgb = cv2.cvtColor(padded, cv2.COLOR_BGR2RGB) + + # 记录变换信息(用于后处理还原坐标) + transform_info = { + 'original_size': (w, h), + 'scale': scale, + 'padding': (left, top), + } + + return rgb, transform_info + + def postprocess(self, outputs, transform_info, conf_threshold=None): + """ + 解析 YOLO11 输出 + + YOLO11 ONNX 输出格式: (1, 84, 8400) for COCO + - 84 = 4 (bbox) + 80 (classes) 
+ - 8400 = 检测锚点数量 + + 对于我们的自定义模型: + - 84 = 4 (bbox) + 1 (class) + """ + if conf_threshold is None: + conf_threshold = self.CONF_THRESHOLD + + # 输出解析 + # YOLO11 输出: (1, num_classes + 4, num_anchors) + pred = outputs[0] # shape: (1, 5, 8400) for 1 class + pred = pred[0].T # shape: (8400, 5) → (8400, 5) + + boxes = [] + scores = [] + class_ids = [] + + for i in range(pred.shape[0]): + score = pred[i, 4] # 置信度(只有一个类别) + if score < conf_threshold: + continue + + # bbox (中心点 + 宽高) + cx, cy, bw, bh = pred[i, 0:4] + + # 还原到原始图像坐标 + cx = (cx - transform_info['padding'][0]) / transform_info['scale'] + cy = (cy - transform_info['padding'][1]) / transform_info['scale'] + bw = bw / transform_info['scale'] + bh = bh / transform_info['scale'] + + # 转为 x1, y1, x2, y2 + x1 = cx - bw / 2 + y1 = cy - bh / 2 + x2 = cx + bw / 2 + y2 = cy + bh / 2 + + # 裁剪到图像边界 + orig_w, orig_h = transform_info['original_size'] + x1 = max(0, x1) + y1 = max(0, y1) + x2 = min(orig_w, x2) + y2 = min(orig_h, y2) + + boxes.append([x1, y1, x2, y2]) + scores.append(float(score)) + class_ids.append(0) + + if len(boxes) == 0: + return [] + + # NMS + indices = cv2.dnn.NMSBoxes( + boxes, scores, conf_threshold, self.NMS_THRESHOLD + ) + + detections = [] + if len(indices) > 0: + for i in indices.flatten(): + detections.append({ + 'bbox': boxes[i], + 'score': scores[i], + 'class_id': class_ids[i], + 'center': ( + (boxes[i][0] + boxes[i][2]) / 2, + (boxes[i][1] + boxes[i][3]) / 2 + ), + }) + + return detections + + def detect(self, frame, conf_threshold=None): + """ + 完整检测流程: 预处理 → NPU推理 → 后处理 + + Args: + frame: BGR 图像 (numpy array) + conf_threshold: 置信度阈值 + + Returns: + list of detection dicts + """ + # 预处理 + rgb, transform_info = self.preprocess(frame) + + # NPU 推理 + outputs = self.rknn.inference(inputs=[rgb]) + + # 后处理 + detections = self.postprocess(outputs, transform_info, conf_threshold) + + return detections + + def detect_batch(self, frames): + """批量检测(如果 NPU 支持,但目前 RV1126B batch=1)""" + return 
[self.detect(f) for f in frames] + + def get_fps(self, num_iterations=100, test_frame=None): + """ + 测量推理 FPS + """ + if test_frame is None: + test_frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8) + + times = [] + for _ in range(num_iterations): + start = time.perf_counter() + self.detect(test_frame) + elapsed = time.perf_counter() - start + times.append(elapsed) + + avg_time = np.mean(times) + fps = 1.0 / avg_time + print(f"Average inference time: {avg_time*1000:.1f}ms, FPS: {fps:.1f}") + return fps + + def release(self): + self.rknn.release() +``` + +### 6.2 检测性能优化 + +```python +class OptimizedDetector(YOLO11Detector): + """ + 针对 RV1126B 的深度优化检测器 + """ + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + # 预分配内存(避免推理时分配) + self._prealloc_buffer = np.zeros( + (self.INPUT_SIZE, self.INPUT_SIZE, 3), dtype=np.uint8 + ) + + def detect_fast(self, frame, conf_threshold=None): + """ + 快速检测路径 — 减少内存拷贝 + + 优化点: + 1. 原地 resize(减少内存分配) + 2. 跳过不必要的颜色转换(如果相机输出 RGB) + 3. 预分配推理输入 buffer + 4. 复用 NMS 输出 buffer + """ + # 如果相机可以输出 RGB,跳过 BGR→RGB 转换 + # 在 RV1126B ISP 中配置输出格式 + + # ... 
(实现同上,但优化内存操作) + pass + + def detect_roi(self, frame, roi): + """ + 只在 ROI 区域检测 — 大幅加速 + roi: (x1, y1, x2, y2) + """ + x1, y1, x2, y2 = roi + roi_frame = frame[y1:y2, x1:x2] + detections = self.detect(roi_frame) + + # 还原坐标 + for det in detections: + det['bbox'] = [ + det['bbox'][0] + x1, + det['bbox'][1] + y1, + det['bbox'][2] + x1, + det['bbox'][3] + y1, + ] + det['center'] = ( + det['center'][0] + x1, + det['center'][1] + y1, + ) + + return detections +``` + +--- + +## 七、光流法模块(CPU 端) + +### 7.1 为什么需要光流法 + +``` +YOLO 检测的局限: +├── 只能检测离散的目标 +├── 水面无漂浮物时 → 无法测速 +├── 示踪物太小 → 检测不到 +└── 需要足够标注数据 + +光流法的补充: +├── 可以追踪水面纹理运动 +├── 不需要离散目标 +├── 稠密或半稠密速度场 +└── 但精度低于 YOLO 轨迹 + +混合方案优势: +├── 有示踪物 → YOLO 精确轨迹(高精度) +├── 无示踪物 → 光流补充(覆盖盲区) +└── 两者结果融合 → 鲁棒性最强 +``` + +### 7.2 稀疏光流法实现(适合 RV1126B CPU) + +```python +""" +optical_flow.py — 光流法流速估计 +针对 RV1126B 单核 CPU 优化 +""" +import cv2 +import numpy as np +from collections import defaultdict + + +class SparseOpticalFlow: + """ + Lucas-Kanade 稀疏光流法 + + 优势: + - 计算量小(只追踪特征点) + - 适合嵌入式 CPU + - 可以追踪水面纹理特征 + + 策略: + - 只在 YOLO 未检测到的区域追踪 + - 或作为 YOLO 检测的补充 + """ + + def __init__(self, frame_size): + # Lucas-Kanade 参数 + self.lk_params = dict( + winSize=(15, 15), # 窗口大小 + maxLevel=2, # 金字塔层数 + criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03) + ) + + # 特征点检测参数 + self.feature_params = dict( + maxCorners=100, # 最多特征点数 + qualityLevel=0.01, # 质量阈值 + minDistance=10, # 最小间距 + blockSize=3, + ) + + self.frame_size = frame_size + + # 状态 + self.prev_gray = None + self.prev_points = [] + self.point_trajectories = defaultdict(list) + self.point_id_counter = 0 + + def process_frame(self, frame, yolo_detections=None, + exclude_radius=20): + """ + 处理单帧,追踪特征点 + + Args: + frame: BGR 当前帧 + yolo_detections: YOLO 检测结果(可选) + exclude_radius: 排除 YOLO 检测区域的半径(避免重复) + + Returns: + flow_vectors: [(x1, y1, x2, y2, point_id), ...] 
+ 特征点位移向量 + """ + gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) + + if self.prev_gray is None: + self.prev_gray = gray + self._detect_features(gray, yolo_detections, exclude_radius) + return [] + + # 追踪特征点 + if len(self.prev_points) > 0: + prev_pts = np.array(self.prev_points, dtype=np.float32) + + next_pts, status, errors = cv2.calcOpticalFlowPyrLK( + self.prev_gray, gray, prev_pts, None, **self.lk_params + ) + + # 过滤有效点 + good_prev = [] + good_next = [] + valid_ids = [] + + for i, (next_pt, status_flag, error) in enumerate( + zip(next_pts, status, errors) + ): + if status_flag == 1 and error < 500: + # 检查是否在图像范围内 + if (0 < next_pt[0] < self.frame_size[0] and + 0 < next_pt[1] < self.frame_size[1]): + good_prev.append(self.prev_points[i]) + good_next.append(next_pt.tolist()) + valid_ids.append(self._point_ids[i]) + + # 记录轨迹 + for pid, prev_pt, next_pt in zip(valid_ids, good_prev, good_next): + self.point_trajectories[pid].append(next_pt) + # 限制轨迹长度 + if len(self.point_trajectories[pid]) > 30: + self.point_trajectories[pid] = \ + self.point_trajectories[pid][-30:] + + # 计算光流向量 + flow_vectors = [] + for pid, p, n in zip(valid_ids, good_prev, good_next): + flow_vectors.append((p[0], p[1], n[0], n[1], pid)) + + # 更新状态 + self.prev_points = good_next + self._point_ids = valid_ids + + # 补充新特征点(如果数量不足) + if len(self.prev_points) < 50: + self._detect_features(gray, yolo_detections, exclude_radius) + # 合并 + new_pts = [p for p in self.prev_points + if p not in good_next] + if new_pts: + self.prev_points.extend(new_pts[:20]) + + else: + self._detect_features(gray, yolo_detections, exclude_radius) + flow_vectors = [] + + self.prev_gray = gray + return flow_vectors + + def _detect_features(self, gray, yolo_detections=None, + exclude_radius=20): + """ + 检测新的特征点 + 排除 YOLO 检测区域(避免重复追踪) + """ + mask = np.ones(gray.shape[:2], dtype=np.uint8) * 255 + + if yolo_detections: + # 在 YOLO 检测区域设置 mask=0(排除) + for det in yolo_detections: + x1, y1, x2, y2 = [int(v) for v in det['bbox']] + 
cv2.rectangle(mask, (x1 - exclude_radius, y1 - exclude_radius), + (x2 + exclude_radius, y2 + exclude_radius), 0, -1) + + points = cv2.goodFeaturesToTrack( + gray, mask=mask, **self.feature_params + ) + + if points is not None: + self.prev_points = points.reshape(-1, 2).tolist() + self._point_ids = list(range( + self.point_id_counter, + self.point_id_counter + len(self.prev_points) + )) + self.point_id_counter += len(self.prev_points) + + def compute_velocity_field(self, fps): + """ + 从追踪的轨迹计算速度场 + + Returns: + velocities: [(x, y, vx, vy, speed), ...] + 每个特征点的位置和速度 + """ + velocities = [] + + for pid, traj in self.point_trajectories.items(): + if len(traj) < 5: + continue + + # 使用最近 15 帧 + recent = traj[-min(15, len(traj)):] + + start = recent[0] + end = recent[-1] + + dx = end[0] - start[0] + dy = end[1] - start[1] + + frames_span = len(recent) + time_sec = frames_span / fps + + if time_sec == 0: + continue + + vx = dx / time_sec # 像素/秒 + vy = dy / time_sec + speed = np.sqrt(vx**2 + vy**2) + + # 当前位置 + current_pos = traj[-1] + + velocities.append((current_pos[0], current_pos[1], + vx, vy, speed, pid)) + + return velocities + + def reset(self): + """重置状态""" + self.prev_gray = None + self.prev_points = [] + self.point_trajectories = defaultdict(list) + + +class DenseOpticalFlow: + """ + Farneback 稠密光流法 + + 注意:在 RV1126B 单核 CPU 上,全分辨率稠密光流太慢 + 使用降采样 + ROI 策略 + """ + + def __init__(self, downsample_factor=4): + self.downsample_factor = downsample_factor + + self.flow_params = dict( + pyr_scale=0.5, + levels=3, + winsize=15, + iterations=3, + poly_n=5, + poly_sigma=1.2, + flags=0, + ) + + self.prev_gray = None + + def process_frame(self, frame, roi=None): + """ + 计算稠密光流(降采样) + + roi: 只计算感兴趣区域 (x1, y1, x2, y2) + """ + gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) + + # 降采样加速 + small = cv2.resize(gray, None, + fx=1/self.downsample_factor, + fy=1/self.downsample_factor) + + if self.prev_gray is None: + self.prev_gray = small + return None + + flow = 
cv2.calcOpticalFlowFarneback( + self.prev_gray, small, None, **self.flow_params + ) + + self.prev_gray = small + + # 恢复到原始分辨率 + if roi is None: + flow = cv2.resize(flow, (frame.shape[1], frame.shape[0])) + else: + # 只还原 ROI 区域 + x1, y1, x2, y2 = roi + flow_roi = cv2.resize( + flow[y1//self.downsample_factor:y2//self.downsample_factor, + x1//self.downsample_factor:x2//self.downsample_factor], + (x2-x1, y2-y1) + ) + return flow_roi + + return flow + + def get_average_flow(self, flow): + """从光流场计算平均流速""" + if flow is None: + return 0, 0 + + magnitude, angle = cv2.cartToPolar(flow[..., 0], flow[..., 1]) + + # 过滤掉太小和太大的运动 + valid = (magnitude > 0.5) & (magnitude < 100) + + if np.sum(valid) == 0: + return 0, 0 + + avg_vx = np.mean(flow[..., 0][valid]) + avg_vy = np.mean(flow[..., 1][valid]) + + return avg_vx, avg_vy +``` + +### 7.3 光流法性能优化(RV1126B 专用) + +```python +class OptimizedOpticalFlow: + """ + 针对 RV1126B 单核 CPU 的极致优化 + + 优化策略: + 1. 降采样 4x(减少 16 倍计算量) + 2. 隔帧计算(每 2 帧算一次光流) + 3. 只在水面 ROI 区域计算 + 4. 限制特征点数量 + 5. 
使用 NEON 指令集(如果 OpenCV 编译时启用) + """ + + def __init__(self, frame_width, frame_height): + self.frame_size = (frame_width, frame_height) + + # 水面 ROI(需要根据实际场景设定) + # 排除天空和岸边 + self.water_roi = (0, int(frame_height * 0.3), + frame_width, frame_height) + + # 降采样 + self.scale = 0.25 # 1/4 分辨率 + + # 隔帧 + self.skip_frames = 1 # 每 2 帧计算一次 + self.frame_counter = 0 + + self.sparse_flow = SparseOpticalFlow( + (int(frame_width * self.scale), + int(frame_height * self.scale)) + ) + + def process(self, frame, yolo_detections=None): + """ + 处理帧 — 优化版本 + + 时间预算(RV1126B CPU): + └── 目标:每帧 < 15ms + """ + self.frame_counter += 1 + + if self.frame_counter % (self.skip_frames + 1) != 0: + return [] # 跳帧 + + # 裁剪水面 ROI + x1, y1, x2, y2 = self.water_roi + roi_frame = frame[y1:y2, x1:x2] + + # 降采样 + small = cv2.resize(roi_frame, None, fx=self.scale, fy=self.scale) + + # 调整 YOLO 检测框到小图坐标 + small_detections = [] + if yolo_detections: + for det in yolo_detections: + bx1, by1, bx2, by2 = det['bbox'] + # 转换到 ROI + 降采样坐标 + sbx1 = (bx1 - x1) * self.scale + sby1 = (by1 - y1) * self.scale + sbx2 = (bx2 - x1) * self.scale + sby2 = (by2 - y1) * self.scale + small_detections.append({ + 'bbox': [sbx1, sby1, sbx2, sby2], + **{k: v for k, v in det.items() if k != 'bbox'} + }) + + # 光流计算(在小图上) + flows = self.sparse_flow.process_frame(small, small_detections) + + # 还原到原图坐标 + result = [] + for (x1_f, y1_f, x2_f, y2_f, pid) in flows: + # 还原 + rx1 = x1_f / self.scale + x1 + ry1 = y1_f / self.scale + y1 + rx2 = x2_f / self.scale + x1 + ry2 = y2_f / self.scale + y1 + + result.append((rx1, ry1, rx2, ry2, pid)) + + return result +``` + +--- + +## 八、结果融合引擎 + +### 8.1 融合策略设计 + +``` +融合逻辑: + +场景 1: YOLO 检测到示踪物 + 光流有数据 +├── 优先使用 YOLO 轨迹(离散、精确) +├── 光流作为验证(一致性检查) +└── 两者偏差 > 阈值 → 标记为可疑 + +场景 2: YOLO 未检测到 + 光流有数据 +├── 使用光流结果 +├── 标记为"低置信度" +└── 需要足够多的特征点(> 10) + +场景 3: YOLO 检测到 + 光流无数据 +├── 使用 YOLO 轨迹 +└── 标记为"正常" + +场景 4: 两者都无数据 +└── 无法测速 → 报告无效 + +融合公式: + +当两者都有数据时: + v_fused = w_yolo × v_yolo + w_flow × v_flow + 
w_yolo = 0.7, w_flow = 0.3 (YOLO 权重更高) + +一致性检查: + |v_yolo - v_flow| / max(v_yolo, v_flow) < threshold (0.3) + → 通过:正常融合 + → 不通过:只使用 YOLO,标记异常 +``` + +### 8.2 融合引擎实现 + +```python +""" +fusion_engine.py — YOLO + 光流融合引擎 +""" +import numpy as np +from collections import defaultdict +from enum import Enum + + +class ConfidenceLevel(Enum): + HIGH = "high" # YOLO 轨迹 + MEDIUM = "medium" # YOLO + 光流融合 + LOW = "low" # 仅光流 + INVALID = "invalid" # 无有效数据 + + +class VelocityFusionEngine: + """ + 融合 YOLO 检测轨迹和光流速度场 + + 核心逻辑: + 1. YOLO 轨迹提供精确的离散速度点 + 2. 光流提供稠密的背景速度场 + 3. 融合两者,得到更全面的速度估计 + """ + + def __init__(self, pixels_per_meter=None, homography_matrix=None, + fps=30): + self.ppm = pixels_per_meter + self.H = homography_matrix + self.fps = fps + + # 融合权重 + self.yolo_weight = 0.7 + self.flow_weight = 0.3 + self.consistency_threshold = 0.3 # 30% 偏差阈值 + + # YOLO 轨迹存储 + self.yolo_trajectories = defaultdict(list) + + # 光流速度场(最近 N 帧的滑动窗口) + self.flow_velocity_history = [] + self.flow_history_size = 10 + + # 结果缓存 + self.last_fused_velocity = None + self.last_confidence = ConfidenceLevel.INVALID + + def update_yolo(self, detections, frame_idx): + """ + 更新 YOLO 检测结果 + + detections: list of {bbox, score, center, ...} + """ + for det in detections: + track_id = det.get('track_id', id(det)) # 如果有追踪 ID + center = det['center'] + self.yolo_trajectories[track_id].append( + (frame_idx, center[0], center[1]) + ) + # 限制轨迹长度 + if len(self.yolo_trajectories[track_id]) > 60: + self.yolo_trajectories[track_id] = \ + self.yolo_trajectories[track_id][-60:] + + def update_flow(self, flow_velocities): + """ + 更新光流速度场 + + flow_velocities: [(x, y, vx, vy, speed), ...] 
+ """ + if not flow_velocities: + self.flow_velocity_history.append([]) + else: + speeds = [v[4] for v in flow_velocities] + self.flow_velocity_history.append(speeds) + + # 限制历史长度 + if len(self.flow_velocity_history) > self.flow_history_size: + self.flow_velocity_history = \ + self.flow_velocity_history[-self.flow_history_size:] + + def compute_fused_velocity(self, frame_idx): + """ + 计算当前帧的融合流速 + + Returns: + { + 'velocity': float, # 融合后的流速 (m/s) + 'velocity_pixel': float, # 像素速度 + 'confidence': ConfidenceLevel, + 'source': str, # 'yolo', 'flow', 'fused' + 'n_yolo_tracks': int, + 'n_flow_points': int, + } + """ + # 1. 计算 YOLO 速度 + yolo_velocities = [] + for track_id, traj in self.yolo_trajectories.items(): + v = self._compute_trajectory_velocity(traj) + if v is not None: + yolo_velocities.append(v) + + # 2. 计算光流速度 + flow_speeds = [] + for speeds in self.flow_velocity_history[-5:]: # 最近 5 帧 + flow_speeds.extend(speeds) + + # 3. 融合 + result = { + 'n_yolo_tracks': len(yolo_velocities), + 'n_flow_points': len(flow_speeds), + } + + if yolo_velocities and flow_speeds: + # 两者都有数据 + yolo_speed = np.median(yolo_velocities) + flow_speed = np.median(flow_speeds) + + # 一致性检查 + max_speed = max(yolo_speed, flow_speed) + if max_speed > 0: + deviation = abs(yolo_speed - flow_speed) / max_speed + else: + deviation = 0 + + if deviation < self.consistency_threshold: + # 一致 → 加权融合 + fused = self.yolo_weight * yolo_speed + \ + self.flow_weight * flow_speed + result['velocity_pixel'] = fused + result['confidence'] = ConfidenceLevel.MEDIUM + result['source'] = 'fused' + else: + # 不一致 → 只用 YOLO + result['velocity_pixel'] = yolo_speed + result['confidence'] = ConfidenceLevel.HIGH + result['source'] = 'yolo' + result['warning'] = f'YOLO/Flow deviation: {deviation:.2f}' + + elif yolo_velocities: + # 只有 YOLO + result['velocity_pixel'] = np.median(yolo_velocities) + result['confidence'] = ConfidenceLevel.HIGH + result['source'] = 'yolo' + + elif flow_speeds: + # 只有光流 + if len(flow_speeds) >= 
5: # 至少 5 个特征点 + result['velocity_pixel'] = np.median(flow_speeds) + result['confidence'] = ConfidenceLevel.LOW + result['source'] = 'flow' + else: + result['confidence'] = ConfidenceLevel.INVALID + + else: + result['confidence'] = ConfidenceLevel.INVALID + + # 像素速度 → 实际速度 + if result.get('velocity_pixel') is not None: + result['velocity'] = self._pixel_to_meters( + result['velocity_pixel'] + ) + else: + result['velocity'] = None + + self.last_fused_velocity = result + self.last_confidence = result['confidence'] + return result + + def _compute_trajectory_velocity(self, trajectory): + """从轨迹计算速度(像素/秒)""" + if len(trajectory) < 5: + return None + + # 使用分段平均 + segment_size = 10 + velocities = [] + + for i in range(0, len(trajectory) - segment_size, segment_size // 2): + seg = trajectory[i:i + segment_size] + start_f, sx, sy = seg[0] + end_f, ex, ey = seg[-1] + + dist = np.sqrt((ex - sx)**2 + (ey - sy)**2) + time_sec = (end_f - start_f) / self.fps + + if time_sec > 0: + velocities.append(dist / time_sec) + + if not velocities: + return None + + return float(np.median(velocities)) + + def _pixel_to_meters(self, pixel_velocity): + """像素速度转换为 m/s""" + if self.H is not None: + # 使用单应性矩阵(考虑透视) + # 在画面中心采样转换因子 + center_x = 320 # 假设 640 宽 + center_y = 240 # 假设 480 高 + + pt = np.array([[[center_x, center_y]]], dtype=np.float32) + pt_moved = np.array([[[center_x + 1, center_y]]], dtype=np.float32) + + real1 = cv2.perspectiveTransform(pt, self.H)[0][0] + real2 = cv2.perspectiveTransform(pt_moved, self.H)[0][0] + + meter_per_pixel = np.linalg.norm(real2 - real1) + return pixel_velocity * meter_per_pixel + + elif self.ppm: + return pixel_velocity / self.ppm + + return pixel_velocity # 返回像素速度 + + def get_summary(self, window_frames=300): + """获取最近 N 帧的流速摘要""" + # 实现类似前面的统计摘要 + pass + + def reset(self): + """重置所有状态""" + self.yolo_trajectories.clear() + self.flow_velocity_history.clear() + self.last_fused_velocity = None + self.last_confidence = ConfidenceLevel.INVALID +``` + +--- + 
+## 九、完整集成流水线 + +### 9.1 主程序 + +```python +""" +main.py — RV1126B 水流速度检测主程序 + +运行方式: +python3 main.py --model yolo11n_water_flow_int8.rknn \ + --input /dev/video0 \ + --calibration calibration.json \ + --output results/ +""" +import cv2 +import json +import time +import argparse +import numpy as np +from pathlib import Path +from datetime import datetime + +from yolo11_detector import YOLO11Detector +from optical_flow import SparseOpticalFlow, OptimizedOpticalFlow +from fusion_engine import VelocityFusionEngine +from tracker import SimpleTracker # 下面会定义 + + +class SimpleTracker: + """ + 轻量级追踪器 — RV1126B 版 + + 不使用完整的 ByteTrack(太重), + 用简化的 IoU 匹配追踪 + """ + + def __init__(self, max_lost=10, min_iou=0.3): + self.tracks = {} # track_id -> {last_bbox, last_center, lost_count} + self.next_id = 0 + self.max_lost = max_lost + self.min_iou = min_iou + + def update(self, detections): + """ + 更新追踪状态 + + detections: [{bbox, score, center}, ...] + Returns: [{bbox, score, center, track_id}, ...] + """ + if not detections: + # 所有轨迹增加丢失计数 + for track in self.tracks.values(): + track['lost_count'] += 1 + # 清理丢失的 + self.tracks = { + tid: t for tid, t in self.tracks.items() + if t['lost_count'] <= self.max_lost + } + return [] + + # IoU 匹配 + matched_det = set() + for tid, track in list(self.tracks.items()): + best_iou = 0 + best_det_idx = -1 + + for i, det in enumerate(detections): + if i in matched_det: + continue + iou = self._compute_iou(track['last_bbox'], det['bbox']) + if iou > best_iou: + best_iou = iou + best_det_idx = i + + if best_iou >= self.min_iou: + # 匹配成功 + track['last_bbox'] = detections[best_det_idx]['bbox'] + track['last_center'] = detections[best_det_idx]['center'] + track['lost_count'] = 0 + detections[best_det_idx]['track_id'] = tid + matched_det.add(best_det_idx) + else: + track['lost_count'] += 1 + + # 未匹配的检测 → 新轨迹 + for i, det in enumerate(detections): + if i not in matched_det: + new_id = self.next_id + self.next_id += 1 + det['track_id'] = new_id + 
self.tracks[new_id] = { + 'last_bbox': det['bbox'], + 'last_center': det['center'], + 'lost_count': 0, + } + + # 清理 + self.tracks = { + tid: t for tid, t in self.tracks.items() + if t['lost_count'] <= self.max_lost + } + + return detections + + def _compute_iou(self, box1, box2): + """计算 IoU""" + x1 = max(box1[0], box2[0]) + y1 = max(box1[1], box2[1]) + x2 = min(box1[2], box2[2]) + y2 = min(box1[3], box2[3]) + + inter_area = max(0, x2 - x1) * max(0, y2 - y1) + + box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1]) + box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1]) + + union = box1_area + box2_area - inter_area + return inter_area / union if union > 0 else 0 + + +class WaterFlowVelocitySystem: + """ + 端到端水流速度检测系统 + + RV1126B 优化版本 + """ + + def __init__(self, args): + # 加载标定参数 + with open(args.calibration) as f: + calib = json.load(f) + + ppm = calib.get('pixels_per_meter') + H = calib.get('homography_matrix') + if H: + H = np.array(H) + + # 初始化组件 + print("Initializing YOLO11 detector...") + self.detector = YOLO11Detector(args.model, use_npu=True) + + print("Initializing optical flow...") + self.optical_flow = SparseOpticalFlow((640, 480)) + + print("Initializing tracker...") + self.tracker = SimpleTracker() + + print("Initializing fusion engine...") + self.fusion = VelocityFusionEngine( + pixels_per_meter=ppm, + homography_matrix=H, + fps=args.fps, + ) + + # 配置 + self.fps = args.fps + self.run_yolo_every = 2 # 每 2 帧跑一次 YOLO + self.frame_count = 0 + + # 结果存储 + self.results = [] + self.output_dir = Path(args.output) + self.output_dir.mkdir(parents=True, exist_ok=True) + + def run_video(self, input_source): + """ + 主处理循环 + + input_source: 摄像头设备路径 或 视频文件路径 + """ + cap = cv2.VideoCapture(input_source) + if not cap.isOpened(): + raise RuntimeError(f"Cannot open {input_source}") + + # 设置分辨率(降低到 640x480 以节省性能) + cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640) + cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) + + frame_idx = 0 + fps_timer = [] + + print(f"Starting processing...") + 
print(f"Input: {input_source}") + print(f"YOLO runs every {self.run_yolo_every} frames") + + try: + while True: + frame_start = time.perf_counter() + + ret, frame = cap.read() + if not ret: + break + + # ---- 阶段 1: YOLO 检测(隔帧) ---- + if frame_idx % self.run_yolo_every == 0: + yolo_detections = self.detector.detect(frame) + + # 追踪 + tracked = self.tracker.update(yolo_detections) + + # 更新融合引擎 + self.fusion.update_yolo(tracked, frame_idx) + else: + # 不跑 YOLO,只更新追踪(预测模式) + tracked = [] + + # ---- 阶段 2: 光流计算 ---- + flow_vectors = self.optical_flow.process_frame( + frame, + yolo_detections if frame_idx % self.run_yolo_every == 0 else None + ) + self.fusion.update_flow(flow_vectors) + + # ---- 阶段 3: 融合 ---- + fusion_result = self.fusion.compute_fused_velocity(frame_idx) + + # ---- 阶段 4: 可视化 ---- + display_frame = self._visualize( + frame, tracked, flow_vectors, fusion_result + ) + + # ---- 阶段 5: 记录结果 ---- + if frame_idx % self.fps == 0: # 每秒记录一次 + self.results.append({ + 'timestamp': datetime.now().isoformat(), + 'frame': frame_idx, + **fusion_result, + }) + + # 性能计时 + frame_time = time.perf_counter() - frame_start + fps_timer.append(frame_time) + + if frame_idx % 30 == 0 and fps_timer: + avg_fps = len(fps_timer) / sum(fps_timer) + print(f"Frame {frame_idx}: {avg_fps:.1f} FPS | " + f"Velocity: {fusion_result.get('velocity', 0):.3f} m/s | " + f"Confidence: {fusion_result.get('confidence', 'N/A')}") + + # 显示(如果有屏幕) + cv2.imshow("Water Flow Velocity", display_frame) + if cv2.waitKey(1) & 0xFF == ord('q'): + break + + frame_idx += 1 + + except KeyboardInterrupt: + print("\nInterrupted by user") + finally: + cap.release() + cv2.destroyAllWindows() + self._save_results() + + # 最终统计 + if fps_timer: + avg_fps = len(fps_timer) / sum(fps_timer) + print(f"\nAverage FPS: {avg_fps:.1f}") + print(f"Total frames: {frame_idx}") + + def _visualize(self, frame, tracked, flow_vectors, fusion_result): + """可视化输出""" + display = frame.copy() + + # 绘制 YOLO 检测结果 + for det in tracked: + x1, y1, 
x2, y2 = [int(v) for v in det['bbox']] + tid = det.get('track_id', 0) + + cv2.rectangle(display, (x1, y1), (x2, y2), (0, 255, 0), 2) + cv2.putText(display, f"ID:{tid}", (x1, y1 - 5), + cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1) + + # 绘制光流向量 + for (x1, y1, x2, y2, pid) in flow_vectors[:50]: # 最多 50 条 + cv2.arrowedLine(display, + (int(x1), int(y1)), + (int(x2), int(y2)), + (255, 0, 0), 1) + + # 绘制融合结果 + velocity = fusion_result.get('velocity') + confidence = fusion_result.get('confidence', 'invalid') + + if velocity is not None: + # 颜色根据置信度 + if confidence.value == 'high': + color = (0, 255, 0) # 绿 + elif confidence.value == 'medium': + color = (0, 255, 255) # 黄 + elif confidence.value == 'low': + color = (0, 165, 255) # 橙 + else: + color = (0, 0, 255) # 红 + + cv2.putText(display, + f"Velocity: {velocity:.3f} m/s", + (10, 30), + cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2) + + cv2.putText(display, + f"Confidence: {confidence.value}", + (10, 60), + cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 1) + + # 绘制 FPS + cv2.putText(display, f"Frame: {self.frame_count}", + (10, frame.shape[0] - 10), + cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1) + + return display + + def _save_results(self): + """保存结果到文件""" + output_file = self.output_dir / f"results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" + + with open(output_file, 'w') as f: + json.dump(self.results, f, indent=2) + + print(f"Results saved to {output_file}") + + +def parse_args(): + parser = argparse.ArgumentParser(description="Water Flow Velocity Detection on RV1126B") + parser.add_argument('--model', required=True, help='Path to RKNN model') + parser.add_argument('--input', default='/dev/video0', help='Camera device or video file') + parser.add_argument('--calibration', required=True, help='Calibration JSON file') + parser.add_argument('--output', default='./results', help='Output directory') + parser.add_argument('--fps', type=int, default=30, help='Video FPS') + return parser.parse_args() + + +if __name__ == 
'__main__': + args = parse_args() + system = WaterFlowVelocitySystem(args) + system.run_video(args.input) +``` + +### 9.2 标定文件格式 + +```json +{ + "calibration_date": "2026-04-23", + "location": "XX River, Section A", + + "method": "homography", + + "homography_matrix": [ + [1.2e-3, 3.4e-5, -0.5], + [2.1e-5, 1.1e-3, -0.3], + [8.7e-7, 2.3e-7, 1.0] + ], + + "control_points": [ + { + "pixel": [100, 200], + "real": [0.0, 0.0] + }, + { + "pixel": [500, 180], + "real": [5.2, 0.3] + }, + { + "pixel": [150, 600], + "real": [0.5, 3.1] + }, + { + "pixel": [600, 580], + "real": [5.8, 3.4] + } + ], + + "camera": { + "resolution": "1920x1080", + "fps": 30, + "height_above_water": 3.5, + "angle_degrees": 45 + }, + + "validation": { + "mean_error_meters": 0.15, + "max_error_meters": 0.28, + "num_validation_points": 4 + } +} +``` + +--- + +## 十、构建与部署流程 + +### 10.1 完整构建脚本 + +```bash +#!/bin/bash +# build_rv1126b.sh — 完整的构建和部署流程 + +set -e + +echo "=== YOLO11 + RV1126B Water Flow Detection Build ===" + +# ========== 阶段 1: 训练模型 (x86 主机) ========== +echo "" +echo "=== Phase 1: Training YOLO11 ===" + +python train_yolo11.py \ + --data water_flow.yaml \ + --epochs 150 \ + --imgsz 640 \ + --batch 16 \ + --device 0 + +echo "Training complete!" + +# ========== 阶段 2: 导出 ONNX ========== +echo "" +echo "=== Phase 2: Exporting ONNX ===" + +python export_onnx.py \ + --model runs/detect/water_flow_yolo11n/weights/best.pt \ + --imgsz 320 \ + --output yolo11n_water_flow.onnx + +echo "ONNX export complete!" + +# ========== 阶段 3: 转换为 RKNN INT8 ========== +echo "" +echo "=== Phase 3: Converting to RKNN (INT8) ===" + +python export_rknn.py \ + --onnx yolo11n_water_flow.onnx \ + --output yolo11n_water_flow_int8.rknn \ + --calibration-dir ./calibration_images \ + --target rv1126b + +echo "RKNN conversion complete!" 
+
+# ========== Phase 4: deploy to RV1126B ==========
+echo ""
+echo "=== Phase 4: Deploying to RV1126B ==="
+
+# Copy the model and every runtime module to the board.
+# BUGFIX: tracker.py was missing from this list even though it is part
+# of the deployment file set (see the project layout: deployment/tracker.py).
+scp yolo11n_water_flow_int8.rknn \
+    calibration.json \
+    main.py \
+    yolo11_detector.py \
+    optical_flow.py \
+    fusion_engine.py \
+    tracker.py \
+    root@192.168.1.100:/app/water_flow/
+
+echo "Deployment complete!"
+echo ""
+echo "To run on RV1126B:"
+echo " ssh root@192.168.1.100"
+echo " cd /app/water_flow"
+echo " python3 main.py --model yolo11n_water_flow_int8.rknn \\"
+echo "   --input /dev/video0 \\"
+echo "   --calibration calibration.json"
+```
+
+### 10.2 RV1126B SDK 集成
+
+```bash
+# Build a full firmware image with the official Rockchip SDK
+
+# 1. Fetch the RV1126B SDK
+repo init --repo-url https://github.com/rockchip-linux/repo \
+    -u https://github.com/rockchip-linux/manifests \
+    -b master -m rv1126b.xml
+repo sync
+
+# 2. Build the SDK
+./build.sh lunch # select rv1126b
+./build.sh
+
+# 3. Add the application to Buildroot
+# (create a custom package under external/)
+
+# 4. Flash the firmware
+./build.sh updateimg
+./upgrade_tool ul RV1126B_XXX.img
+```
+
+### 10.3 C++ 推理(生产环境推荐)
+
+```cpp
+/*
+ * yolo11_rknn.cpp — C++ RKNN inference (faster than the Python path)
+ *
+ * Build:
+ * arm-linux-gnueabihf-g++ yolo11_rknn.cpp -o yolo11_detect \
+ * -lrknnrt -lopencv_core -lopencv_imgproc -lopencv_video \
+ * -I/usr/include/rknn
+ */
+
+// FIX: the header names below were stripped in a previous edit
+// (angle-bracket contents lost); restored to what the code actually uses.
+#include "rknn_api.h"            // rknn_init / rknn_query / RKNN_* types
+#include <opencv2/opencv.hpp>    // cv::Mat, cv::resize, cv::cvtColor
+#include <cstdio>                // fopen / fread / fseek
+#include <cstdlib>               // malloc / free / exit
+#include <cstring>               // memset
+#include <iostream>              // std::cout / std::cerr
+#include <vector>                // std::vector
+
+class YOLO11RKNN {
+public:
+    // Load an .rknn model file from disk and initialise the NPU context.
+    YOLO11RKNN(const char* model_path) {
+        // Read the whole model blob into memory.
+        FILE* fp = fopen(model_path, "rb");
+        fseek(fp, 0, SEEK_END);
+        size_t size = ftell(fp);
+        fseek(fp, 0, SEEK_SET);
+
+        void* model_data = malloc(size);
+        fread(model_data, 1, size, fp);
+        fclose(fp);
+
+        // Hand the blob to the RKNN runtime.
+        int ret = rknn_init(&ctx_, model_data, size, 0, NULL);
+        if (ret < 0) {
+            std::cerr << "rknn_init failed: " << ret << std::endl;
+            exit(-1);
+        }
+
+        free(model_data);
+
+        // Query and print the model's input/output tensor counts.
+        rknn_input_output_num io_num;
+        rknn_query(ctx_, RKNN_QUERY_IN_OUT_NUM, &io_num, sizeof(io_num));
+        std::cout << "Input count: " << io_num.n_input << std::endl;
+        std::cout << "Output count: " << io_num.n_output << std::endl;
+    }
+
+    
std::vector detect(const cv::Mat& frame) { + // 预处理 + cv::Mat input = preprocess(frame); + + // 设置输入 + rknn_input inputs[1]; + memset(inputs, 0, sizeof(inputs)); + inputs[0].index = 0; + inputs[0].type = RKNN_TENSOR_UINT8; + inputs[0].size = 320 * 320 * 3; + inputs[0].fmt = RKNN_TENSOR_NHWC; + inputs[0].buf = input.data; + + rknn_inputs_set(ctx_, 1, inputs); + + // 推理 + rknn_run(ctx_, nullptr); + + // 获取输出 + rknn_output outputs[1]; + memset(outputs, 0, sizeof(outputs)); + outputs[0].want_float = 1; + rknn_outputs_get(ctx_, 1, outputs, NULL); + + // 后处理 + auto detections = postprocess(outputs[0].buf, outputs[0].size); + + rknn_outputs_release(ctx_, 1, outputs); + + return detections; + } + + ~YOLO11RKNN() { + rknn_destroy(ctx_); + } + +private: + rknn_context ctx_; + + cv::Mat preprocess(const cv::Mat& frame) { + cv::Mat resized, padded; + cv::resize(frame, resized, cv::Size(320, 320)); + cv::cvtColor(resized, padded, cv::COLOR_BGR2RGB); + return padded; + } + + // ... (后处理实现) +}; + +struct Detection { + float x1, y1, x2, y2; + float confidence; + int class_id; + int track_id; +}; +``` + +--- + +## 十一、性能优化清单 + +### 11.1 RV1126B 极限优化 + +``` +目标:在 RV1126B 上达到 15-20 FPS + +┌──────────────────┬──────────┬────────────────────────────┐ +│ 优化项 │ 预期收益 │ 实现难度 │ +├──────────────────┼──────────┼────────────────────────────┤ +│ YOLO11n INT8 │ 基准 │ - │ +│ 输入 320→256 │ +20% FPS │ 低(需重新训练/微调) │ +│ 隔帧检测 │ +50% FPS │ 极低 │ +│ C++ 推理 │ +10% FPS │ 中 │ +│ RGA 硬件缩放 │ +5% FPS │ 中(RV1126B 有 RGA 单元)│ +│ 光流隔帧 │ +50% CPU │ 极低 │ +│ 光流降采样 4x │ +300% CPU │ 极低 │ +│ 内存预分配 │ -10% 抖动 │ 中 │ +│ NEON 优化 │ +30% CPU │ 高 │ +└──────────────────┴──────────┴────────────────────────────┘ + +推荐组合: +YOLO11n INT8 320×320 + 隔帧检测 + C++ 推理 + RGA 缩放 +→ 预期 15-20 FPS(YOLO)+ 光流并行 + +光流单独: +降采样 4x + 隔帧 + ROI 裁剪 +→ 预期 15-25 FPS(CPU) + +综合:15 FPS 以上(两者交替执行) +``` + +### 11.2 内存管理 + +```python +# RV1126B 内存优化 + +class MemoryOptimizedPipeline: + """ + RV1126B 只有 256-512MB 内存 + 需要严格管理 + """ + + def __init__(self): + # 预分配所有 
buffer + self.frame_buffer = np.zeros((480, 640, 3), dtype=np.uint8) + self.gray_buffer1 = np.zeros((480, 640), dtype=np.uint8) + self.gray_buffer2 = np.zeros((480, 640), dtype=np.uint8) + self.small_gray1 = np.zeros((120, 160), dtype=np.uint8) + self.small_gray2 = np.zeros((120, 160), dtype=np.uint8) + self.rknn_input = np.zeros((320, 320, 3), dtype=np.uint8) + self.flow_field = np.zeros((120, 160, 2), dtype=np.float32) + + # 复用而不是重新分配 + # ... + + def process(self, raw_frame): + # 直接拷贝到预分配 buffer + np.copyto(self.frame_buffer[:raw_frame.shape[0], :raw_frame.shape[1]], raw_frame) + # 而不是: gray = cv2.cvtColor(raw_frame, cv2.COLOR_BGR2GRAY) + # ... +``` + +--- + +## 十二、测试与验证 + +### 12.1 精度验证流程 + +``` +步骤 1:实验室验证 +├── 在已知流速的水槽中测试 +├── 对比 YOLO+光流 结果与标准流速仪 +└── 记录误差分布 + +步骤 2:现场验证 +├── 在实际河道部署 +├── 同步使用浮标法或 ADCP 测量 +├── 在不同流速下测试(低/中/高) +└── 记录不同光照条件下的表现 + +步骤 3:长期稳定性 +├── 连续运行 7 天 +├── 监控内存泄漏 +├── 监控精度漂移 +└── 记录设备稳定性 +``` + +### 12.2 基准测试脚本 + +```python +""" +benchmark.py — 在 RV1126B 上运行基准测试 +""" +import time +import numpy as np +import cv2 + +def benchmark_yolo(detector, num_runs=100): + """YOLO 推理基准测试""" + test_frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8) + + # 预热 + for _ in range(10): + detector.detect(test_frame) + + # 测试 + times = [] + for _ in range(num_runs): + start = time.perf_counter() + detector.detect(test_frame) + times.append(time.perf_counter() - start) + + avg = np.mean(times) + p50 = np.percentile(times, 50) + p95 = np.percentile(times, 95) + p99 = np.percentile(times, 99) + + print(f"YOLO11 Benchmark ({num_runs} runs):") + print(f" Average: {avg*1000:.1f}ms ({1/avg:.1f} FPS)") + print(f" P50: {p50*1000:.1f}ms") + print(f" P95: {p95*1000:.1f}ms") + print(f" P99: {p99*1000:.1f}ms") + print(f" Min: {min(times)*1000:.1f}ms") + print(f" Max: {max(times)*1000:.1f}ms") + +def benchmark_optical_flow(flow_processor, num_runs=100): + """光流基准测试""" + frame1 = np.random.randint(0, 255, (120, 160), dtype=np.uint8) + frame2 = np.random.randint(0, 
255, (120, 160), dtype=np.uint8) + + times = [] + for _ in range(num_runs): + start = time.perf_counter() + cv2.calcOpticalFlowFarneback(frame1, frame2, None, + pyr_scale=0.5, levels=3, + winsize=15, iterations=3, + poly_n=5, poly_sigma=1.2, flags=0) + times.append(time.perf_counter() - start) + frame1, frame2 = frame2, frame1 + + avg = np.mean(times) + print(f"Optical Flow Benchmark ({num_runs} runs):") + print(f" Average: {avg*1000:.1f}ms ({1/avg:.1f} FPS)") + print(f" P95: {np.percentile(times, 95)*1000:.1f}ms") + +def benchmark_full_pipeline(system, num_frames=300): + """完整流水线基准测试""" + test_frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8) + + times = [] + for i in range(num_frames): + start = time.perf_counter() + + # 模拟一帧的处理 + if i % 2 == 0: + system.detector.detect(test_frame) + system.optical_flow.process_frame(test_frame) + + elapsed = time.perf_counter() - start + times.append(elapsed) + + avg = np.mean(times) + print(f"Full Pipeline Benchmark ({num_frames} frames):") + print(f" Average: {avg*1000:.1f}ms ({1/avg:.1f} FPS)") + print(f" P95: {np.percentile(times, 95)*1000:.1f}ms") + + +if __name__ == '__main__': + print("=" * 50) + print("RV1126B Benchmark Suite") + print("=" * 50) + + # 需要先在 RV1126B 上初始化组件 + # benchmark_yolo(detector) + # benchmark_optical_flow(flow) + # benchmark_full_pipeline(system) +``` + +--- + +## 十三、项目文件结构 + +``` +water-flow-rv1126b/ +├── training/ # 训练阶段(x86 主机) +│ ├── train_yolo11.py # YOLO11 训练脚本 +│ ├── export_onnx.py # ONNX 导出 +│ ├── export_rknn.py # RKNN 转换 +│ ├── water_flow.yaml # 数据集配置 +│ └── calibration_images/ # INT8 校准数据 +│ +├── deployment/ # 部署阶段(RV1126B) +│ ├── main.py # 主程序 +│ ├── yolo11_detector.py # YOLO11 RKNN 推理 +│ ├── optical_flow.py # 光流模块 +│ ├── fusion_engine.py # 融合引擎 +│ ├── tracker.py # 轻量追踪器 +│ ├── calibration.json # 标定参数 +│ └── yolo11n_water_flow_int8.rknn # RKNN 模型 +│ +├── cpp/ # C++ 生产版本 +│ ├── yolo11_rknn.cpp +│ ├── optical_flow.cpp +│ ├── fusion_engine.cpp +│ ├── main.cpp +│ └── 
CMakeLists.txt +│ +├── benchmark/ +│ └── benchmark.py # 性能基准测试 +│ +├── docs/ +│ └── calibration_guide.md # 标定操作指南 +│ +└── scripts/ + └── build_rv1126b.sh # 构建部署脚本 +``` + +--- + +## 十四、快速启动检查清单 + +``` +[ ] 1. 在 x86 主机上安装 Ultralytics + RKNN Toolkit2 +[ ] 2. 采集 1000+ 张水面示踪物图像 +[ ] 3. 标注数据(COCO/YOLO 格式) +[ ] 4. 训练 YOLO11n 模型(mAP@50 > 0.85) +[ ] 5. 导出 ONNX (imgsz=320) +[ ] 6. 准备 100-200 张校准图片 +[ ] 7. 转换为 RKNN INT8 模型 +[ ] 8. 在主机上模拟 RV1126B 推理验证 +[ ] 9. 传输到 RV1126B 开发板 +[ ] 10. 执行相机标定,生成 calibration.json +[ ] 11. 在 RV1126B 上运行基准测试 +[ ] 12. 连接相机,端到端测试 +[ ] 13. 与传统方法对比验证精度 +[ ] 14. 调优参数(检测阈值、光流参数、融合权重) +``` + +--- + +*创建:2026-04-23 | 平台:RV1126B | 模型:YOLO11n*