# YOLO11 + RV1126B 水流速度检测 — 混合方案完整实现 > **核心方案**:YOLO11 检测离散示踪物 + 光流法补充无示踪物区域 + 结果融合 > > **硬件平台**:Rockchip RV1126B (1.2 TOPS NPU) > > 创建:2026-04-23 --- ## 一、RV1126B 硬件约束分析 ### 1.1 芯片规格 | 参数 | 规格 | 对我们的影响 | |------|------|-------------| | **NPU** | 1.2 TOPS (INT8) | 只能跑小模型,YOLO11n 是上限 | | **CPU** | ARM Cortex-A7 (单核) | 光流法跑 CPU,性能有限 | | **内存** | 256MB-512MB DDR3 | 模型不能太大,批处理=1 | | **视频编码** | H.264/H.265 硬编解码 | 可以利用 VPU 解码减轻 CPU | | **ISP** | 支持 MIPI CSI 相机输入 | 直接接摄像头 | | **操作系统** | Buildroot / Debian (Linux) | 交叉编译环境 | ### 1.2 性能预期 | 模型 | 输入尺寸 | 量化方式 | 预期 FPS | 备注 | |------|---------|---------|---------|------| | YOLO11n | 320×320 | INT8 | ~25-30 | **推荐**,可以实时 | | YOLO11n | 640×640 | INT8 | ~8-12 | 精度更高但不够实时 | | YOLO11s | 320×320 | INT8 | ~12-15 | 可能内存紧张 | | YOLO11n | 320×320 | FP16 | ~15-20 | 精度略高,速度慢 | **关键决策:使用 YOLO11n + 320×320 + INT8 量化** --- ## 二、整体架构设计 ``` ┌────────────────────────────────────────────────────────────┐ │ RV1126B 处理流水线 │ │ │ │ ┌──────────┐ ┌────────────┐ ┌──────────────────┐ │ │ │ 摄像头 │───>│ VPU 硬解码 │───>│ 帧缓冲 (NV12) │ │ │ │ MIPI/USB │ │ (硬件加速) │ │ │ │ │ └──────────┘ └────────────┘ └────────┬─────────┘ │ │ │ │ │ ┌─────────────────┼──────────┐ │ │ │ │ │ │ │ ▼ ▼ │ │ │ ┌─────────────┐ ┌──────────────┐ │ │ │ │ NPU 推理 │ │ CPU 光流计算 │ │ │ │ │ YOLO11n │ │ Farneback │ │ │ │ │ INT8 RKNN │ │ 稀疏光流 │ │ │ │ │ 320×320 │ │ ROI 区域 │ │ │ │ └──────┬──────┘ └──────┬───────┘ │ │ │ │ │ │ │ │ ▼ ▼ │ │ │ ┌─────────────────────────────┐ │ │ │ │ 结果融合引擎 │ │ │ │ │ │ │ │ │ │ YOLO 区域 → 精确轨迹 │ │ │ │ │ 光流区域 → 稠密速度场 │ │ │ │ │ 融合 → 综合流速估计 │ │ │ │ └──────────────┬──────────────┘ │ │ │ │ │ │ │ ▼ │ │ │ ┌─────────────────────────────┐ │ │ │ │ 流速计算 + 像素→实际坐标转换 │ │ │ │ │ 单应性矩阵 / 标定因子 │ │ │ │ └──────────────┬──────────────┘ │ │ │ │ │ │ │ ▼ │ │ │ ┌─────────────────────────────┐ │ │ │ │ 结果输出: JSON / MQTT / 显示 │ │ │ │ └─────────────────────────────┘ │ │ └──────────────────────────────────────────────────────┘ 时序规划 (目标 30 FPS, 每帧 33ms): ├── VPU 解码: ~3ms (硬件) ├── YOLO11n 推理: ~35ms (NPU, 
INT8) → 可以每 2 帧跑一次
├── 光流计算: ~15ms (CPU, 稀疏/降采样)
├── 结果融合: ~2ms (CPU)
└── 总周期: 每帧 ~20ms (YOLO 隔帧) → 可达 15-20 FPS
```

---

## 三、环境搭建

### 3.1 主机端(训练 + 模型转换)

```bash
# 在 x86 Linux 主机上操作(用于训练和模型转换)

# 1. 创建 Python 环境
conda create -n yolo11-rv1126 python=3.10 -y
conda activate yolo11-rv1126

# 2. 安装 Ultralytics (YOLO11)
pip install ultralytics

# 3. 安装 RKNN Toolkit2
# 下载: https://github.com/airockchip/rknn-toolkit2
# 选择对应版本(Ubuntu x86)
git clone https://github.com/airockchip/rknn-toolkit2.git
cd rknn-toolkit2/rknn-toolkit2/packages/
pip install rknn_toolkit2-*.whl

# 4. 其他依赖
pip install opencv-python numpy onnx onnxruntime
pip install roboflow     # 数据管理(可选)
pip install supervision  # 可视化
```

### 3.2 RV1126B 目标板环境

```bash
# 在 RV1126B 开发板上操作

# 1. 安装 RKNN Runtime(推理运行时,不含训练功能)
# 从 Rockchip SDK 或板子系统镜像中获取
# 通常在 /usr/lib/librknnrt.so

# 2. 安装 Python RKNN 运行时(RKNN Toolkit Lite2,轻量版,只含推理)
pip install rknn_toolkit_lite2

# 3. 安装 OpenCV(用于光流和后处理)
# 使用 Buildroot 编译或 apt 安装
apt-get install python3-opencv  # 如果使用 Debian 系统

# 4. 其他依赖
pip install numpy
```

### 3.3 交叉编译工具链

```bash
# 安装 Rockchip 交叉编译工具链
# 从 Rockchip SDK 获取

# 设置环境变量
export CROSS_COMPILE=arm-buildroot-linux-gnueabihf-
export CC=${CROSS_COMPILE}gcc
export CXX=${CROSS_COMPILE}g++

# 交叉编译 OpenCV(如果 Buildroot 未包含)
# 交叉编译 Python C 扩展模块
```

---

## 四、YOLO11 模型训练

### 4.1 数据准备

```yaml
# water_flow.yaml
path: ./water_flow_dataset
train: images/train
val: images/val

nc: 1
names:
  - floating_object
```

```
数据集结构:
water_flow_dataset/
├── images/
│   ├── train/    # 800-2000 张
│   └── val/      # 200-500 张
└── labels/
    ├── train/    # YOLO 格式标注
    └── val/
```

### 4.2 YOLO11n 训练

```python
from ultralytics import YOLO

# 加载 YOLO11n 预训练模型
model = YOLO('yolo11n.pt')

# 训练配置
results = model.train(
    # 数据集
    data='water_flow.yaml',

    # 训练参数
    epochs=150,
    imgsz=640,        # 训练时用高分辨率
    batch=16,
    device=0,
    patience=30,

    # 针对水面场景的数据增强
    hsv_h=0.015,      # 色调小幅变化
    hsv_s=0.7,        # 饱和度大变化(水面反光)
    hsv_v=0.4,        # 亮度变化
    degrees=0.0,      # 不旋转(相机固定)
    translate=0.1,
    scale=0.5,        # 大尺度变化(适应不同距离目标)
    fliplr=0.5,
    mosaic=1.0,
    mixup=0.1,
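    # 注(假设性说明,非 Ultralytics 官方结论):copy_paste 增强通常
    # 依赖实例分割标注(segments)才能生效;若数据集只有检测框标注,
    # 该参数可能是空操作,届时可改为加大 mosaic/mixup 来缓解
    # 示踪物稀少的问题。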
copy_paste=0.1, # 复制粘贴增强(示踪物少的场景) # 优化器 optimizer='AdamW', lr0=0.001, lrf=0.01, weight_decay=0.0005, # 保存 project='runs/detect', name='water_flow_yolo11n', save_period=10, ) # 验证 metrics = model.val() print(f"mAP@50: {metrics.box.map50:.4f}") print(f"mAP@50-95: {metrics.box.map:.4f}") ``` ### 4.3 针对 RV1126B 的特殊训练策略 ```python # 关键:RV1126B NPU 是 INT8 量化,训练时需要考虑量化感知 from ultralytics import YOLO import torch # 方案 A:PTQ(训练后量化)— 简单但精度损失 2-5% # 直接导出 INT8,下面会讲到 # 方案 B:QAT(量化感知训练)— 精度损失 < 1%,推荐 class QATWaterFlowTrainer: """ 量化感知训练,让模型适应 INT8 量化 """ def __init__(self): self.model = YOLO('yolo11n.pt') def train_with_quantization(self, data_config, epochs=50): # 第一阶段:正常训练到收敛 self.model.train( data=data_config, epochs=epochs, imgsz=640, batch=16, ) # 第二阶段:模拟量化微调 # 注意:Ultralytics 不直接支持 QAT # 需要手动添加 fake quantization # 加载最佳权重 model = YOLO('runs/detect/water_flow_yolo11n/weights/best.pt') # 添加量化模拟(简化的做法:降低学习率继续训练) model.train( data=data_config, epochs=20, imgsz=320, # 用目标推理分辨率微调 batch=32, lr0=0.0001, # 极低学习率 close_mosaic=10, # 关闭 mosaic 模拟推理条件 ) return model ``` --- ## 五、模型导出与 RKNN 转换 ### 5.1 导出流程 ``` 训练好的 YOLO11 PyTorch 模型 (.pt) │ ▼ (torch.onnx.export) ONNX 模型 (.onnx) │ ▼ (RKNN Toolkit2) RKNN 模型 (.rknn) — INT8 量化 │ ▼ (部署到 RV1126B) NPU 推理 ``` ### 5.2 导出 ONNX ```python from ultralytics import YOLO # 加载训练好的模型 model = YOLO('runs/detect/water_flow_yolo11n/weights/best.pt') # 导出 ONNX # 关键参数: # - opset 13: RKNN 支持的最佳 opset # - simplify: 简化计算图 # - imgsz 320: 导出目标推理尺寸(RV1126B 用 320) model.export( format='onnx', imgsz=320, # 目标推理分辨率 opset=13, # ONNX opset 版本 simplify=True, # 简化模型 dynamic=False, # 固定输入尺寸(NPU 不支持动态) half=False, # 不需要 FP16,用 INT8 int8=False, # ONNX 导出不量化,后面用 RKNN 量化 ) # 输出: runs/detect/water_flow_yolo11n/weights/best.onnx print("ONNX 导出完成") ``` ### 5.3 ONNX → RKNN 转换(INT8 量化) ```python """ export_rknn.py — 在 x86 主机上运行 将 ONNX 模型转换为 RKNN INT8 量化模型 """ from rknn.api import RKNN import cv2 import numpy as np import os # ========== 配置 ========== ONNX_MODEL = 
'runs/detect/water_flow_yolo11n/weights/best.onnx'
RKNN_MODEL = 'yolo11n_water_flow_int8.rknn'
CALIBRATION_DIR = './calibration_images/'   # 校准数据集
DATASET_TXT = 'calibration_list.txt'        # 校准图片列表
TARGET_PLATFORM = 'rv1126b'                 # 或 'rk3588' 等

# ========== 准备校准数据 ==========
def prepare_calibration_images(image_dir, output_txt, num_images=100):
    """
    准备 INT8 量化校准数据
    需要 100-500 张代表性图片
    """
    images = [f for f in os.listdir(image_dir)
              if f.endswith(('.jpg', '.png'))]
    images = images[:num_images]

    with open(output_txt, 'w') as f:
        for img in images:
            f.write(os.path.join(image_dir, img) + '\n')

    print(f"Prepared {len(images)} calibration images")

# 准备校准数据
prepare_calibration_images(CALIBRATION_DIR, DATASET_TXT)

# ========== RKNN 转换 ==========
rknn = RKNN(verbose=True)

# 1. 配置
# 注:量化参数的取值随 RKNN Toolkit 版本而异,例如 Toolkit2 中为
# quantized_dtype='asymmetric_quantized-8'、
# quantized_algorithm='normal'/'mmse'/'kl_divergence',
# 请以所用版本的官方文档为准
print("--> Configuring RKNN model")
ret = rknn.config(
    mean_values=[[0, 0, 0]],        # 根据训练时的归一化调整
    std_values=[[255, 255, 255]],   # YOLO 通常是 /255
    target_platform=TARGET_PLATFORM,
    quantized_dtype='asymmetric_affine-u8',  # INT8 量化
    quantized_algorithm='KLD',               # KLD 校准算法
)
if ret != 0:
    print("Config failed!")
    exit()

# 2. 加载 ONNX
print("--> Loading ONNX model")
ret = rknn.load_onnx(model=ONNX_MODEL)
if ret != 0:
    print("Load ONNX failed!")
    exit()

# 3. 构建模型(含 INT8 量化)
print("--> Building RKNN model with INT8 quantization")
ret = rknn.build(
    do_quantization=True,
    dataset=DATASET_TXT,
)
if ret != 0:
    print("Build failed!")
    exit()

# 4. 导出 RKNN 模型
print(f"--> Exporting RKNN model to {RKNN_MODEL}")
ret = rknn.export_rknn(RKNN_MODEL)
if ret != 0:
    print("Export failed!")
    exit()

# 5.
(可选)在主机上模拟 RV1126B 推理验证 print("--> Testing on host (simulating RV1126B)") ret = rknn.init_runtime(target=TARGET_PLATFORM) if ret != 0: print("Init runtime failed!") exit() # 推理测试 test_img = cv2.imread(os.path.join(CALIBRATION_DIR, os.listdir(CALIBRATION_DIR)[0])) test_img = cv2.resize(test_img, (320, 320)) test_img = cv2.cvtColor(test_img, cv2.COLOR_BGR2RGB) outputs = rknn.inference(inputs=[test_img]) print(f"RKNN inference output shape: {[o.shape for o in outputs]}") rknn.release() print("Done!") ``` ### 5.4 RKNN 量化配置详解 ```python # 不同量化策略对比 # 策略 1:对称量化(速度快,精度略低) rknn.config( quantized_dtype='symmetric_affine-u8', quantized_algorithm='MIN_MAX', ) # 策略 2:非对称量化(推荐,精度更好) rknn.config( quantized_dtype='asymmetric_affine-u8', quantized_algorithm='KLD', ) # 策略 3:混合量化(部分层 INT8,部分层 FP16) rknn.config( quantized_dtype='mixed', # 需要在模型配置中指定哪些层用 FP16 ) # 量化校准技巧: # 1. 校准数据需要覆盖所有场景(不同光照、不同流速、不同天气) # 2. 至少 100 张,推荐 200-500 张 # 3. 校准数据不需要标注,只需要原始图片 # 4. 图片尺寸需要和训练时一致 ``` --- ## 六、YOLO11 检测模块(RV1126B 端) ### 6.1 RKNN 推理封装 ```python """ yolo11_detector.py — RV1126B 端 YOLO11 检测器 使用 RKNN Runtime 在 NPU 上推理 """ import cv2 import numpy as np import time try: from rknn.api import RKNN except ImportError: # 主机端开发用 mock from rknn.mock_runtime import RKNN class YOLO11Detector: """ YOLO11n RKNN 推理封装 专为 RV1126B 优化 """ # YOLO11 默认参数 INPUT_SIZE = 320 CONF_THRESHOLD = 0.25 NMS_THRESHOLD = 0.45 NUM_CLASSES = 1 def __init__(self, rknn_model_path, use_npu=True): self.rknn = RKNN() self.use_npu = use_npu # 加载模型 print(f"Loading RKNN model: {rknn_model_path}") ret = self.rknn.load_rknn(rknn_model_path) if ret != 0: raise RuntimeError("Failed to load RKNN model") # 初始化运行时 if use_npu: ret = self.rknn.init_runtime(target='rv1126b', device_id=None) else: ret = self.rknn.init_runtime(target=None) # CPU 模拟 if ret != 0: raise RuntimeError("Failed to init RKNN runtime") print("YOLO11 Detector initialized") def preprocess(self, frame): """ 预处理: resize + BGR→RGB + normalize 输入: BGR 图像 (任意尺寸) 输出: RGB numpy array 
(320×320)
        """
        # 保持宽高比的 resize + padding
        h, w = frame.shape[:2]
        scale = self.INPUT_SIZE / max(h, w)
        new_w, new_h = int(w * scale), int(h * scale)

        resized = cv2.resize(frame, (new_w, new_h))

        # padding 到 320×320
        padded = np.full((self.INPUT_SIZE, self.INPUT_SIZE, 3),
                         114, dtype=np.uint8)
        top = (self.INPUT_SIZE - new_h) // 2
        left = (self.INPUT_SIZE - new_w) // 2
        padded[top:top+new_h, left:left+new_w] = resized

        # BGR → RGB
        rgb = cv2.cvtColor(padded, cv2.COLOR_BGR2RGB)

        # 记录变换信息(用于后处理还原坐标)
        transform_info = {
            'original_size': (w, h),
            'scale': scale,
            'padding': (left, top),
        }

        return rgb, transform_info

    def postprocess(self, outputs, transform_info, conf_threshold=None):
        """
        解析 YOLO11 输出

        YOLO11 ONNX 输出格式: (1, 84, 8400) for COCO(640 输入)
        - 84 = 4 (bbox) + 80 (classes)
        - 8400 = 检测锚点数量(320 输入时为 2100)

        对于我们的单类别自定义模型:
        - 5 = 4 (bbox) + 1 (class)
        """
        if conf_threshold is None:
            conf_threshold = self.CONF_THRESHOLD

        # 输出解析
        # YOLO11 输出: (1, num_classes + 4, num_anchors)
        pred = outputs[0]   # shape: (1, 5, N),单类别
        pred = pred[0].T    # shape: (1, 5, N) → (N, 5)

        boxes = []
        scores = []
        class_ids = []

        for i in range(pred.shape[0]):
            score = pred[i, 4]  # 置信度(只有一个类别)
            if score < conf_threshold:
                continue

            # bbox (中心点 + 宽高)
            cx, cy, bw, bh = pred[i, 0:4]

            # 还原到原始图像坐标
            cx = (cx - transform_info['padding'][0]) / transform_info['scale']
            cy = (cy - transform_info['padding'][1]) / transform_info['scale']
            bw = bw / transform_info['scale']
            bh = bh / transform_info['scale']

            # 转为 x1, y1, x2, y2
            x1 = cx - bw / 2
            y1 = cy - bh / 2
            x2 = cx + bw / 2
            y2 = cy + bh / 2

            # 裁剪到图像边界
            orig_w, orig_h = transform_info['original_size']
            x1 = max(0, x1)
            y1 = max(0, y1)
            x2 = min(orig_w, x2)
            y2 = min(orig_h, y2)

            boxes.append([x1, y1, x2, y2])
            scores.append(float(score))
            class_ids.append(0)

        if len(boxes) == 0:
            return []

        # NMS(注意:cv2.dnn.NMSBoxes 要求 [x, y, w, h] 格式,
        # 直接传 x1y1x2y2 会得到错误的 IoU)
        nms_boxes = [[b[0], b[1], b[2] - b[0], b[3] - b[1]] for b in boxes]
        indices = cv2.dnn.NMSBoxes(
            nms_boxes, scores, conf_threshold, self.NMS_THRESHOLD
        )

        detections = []
        if len(indices) > 0:
            for i in indices.flatten():
                detections.append({
                    'bbox': boxes[i],
                    'score': scores[i],
                    'class_id':
class_ids[i], 'center': ( (boxes[i][0] + boxes[i][2]) / 2, (boxes[i][1] + boxes[i][3]) / 2 ), }) return detections def detect(self, frame, conf_threshold=None): """ 完整检测流程: 预处理 → NPU推理 → 后处理 Args: frame: BGR 图像 (numpy array) conf_threshold: 置信度阈值 Returns: list of detection dicts """ # 预处理 rgb, transform_info = self.preprocess(frame) # NPU 推理 outputs = self.rknn.inference(inputs=[rgb]) # 后处理 detections = self.postprocess(outputs, transform_info, conf_threshold) return detections def detect_batch(self, frames): """批量检测(如果 NPU 支持,但目前 RV1126B batch=1)""" return [self.detect(f) for f in frames] def get_fps(self, num_iterations=100, test_frame=None): """ 测量推理 FPS """ if test_frame is None: test_frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8) times = [] for _ in range(num_iterations): start = time.perf_counter() self.detect(test_frame) elapsed = time.perf_counter() - start times.append(elapsed) avg_time = np.mean(times) fps = 1.0 / avg_time print(f"Average inference time: {avg_time*1000:.1f}ms, FPS: {fps:.1f}") return fps def release(self): self.rknn.release() ``` ### 6.2 检测性能优化 ```python class OptimizedDetector(YOLO11Detector): """ 针对 RV1126B 的深度优化检测器 """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) # 预分配内存(避免推理时分配) self._prealloc_buffer = np.zeros( (self.INPUT_SIZE, self.INPUT_SIZE, 3), dtype=np.uint8 ) def detect_fast(self, frame, conf_threshold=None): """ 快速检测路径 — 减少内存拷贝 优化点: 1. 原地 resize(减少内存分配) 2. 跳过不必要的颜色转换(如果相机输出 RGB) 3. 预分配推理输入 buffer 4. 复用 NMS 输出 buffer """ # 如果相机可以输出 RGB,跳过 BGR→RGB 转换 # 在 RV1126B ISP 中配置输出格式 # ... 
(实现同上,但优化内存操作) pass def detect_roi(self, frame, roi): """ 只在 ROI 区域检测 — 大幅加速 roi: (x1, y1, x2, y2) """ x1, y1, x2, y2 = roi roi_frame = frame[y1:y2, x1:x2] detections = self.detect(roi_frame) # 还原坐标 for det in detections: det['bbox'] = [ det['bbox'][0] + x1, det['bbox'][1] + y1, det['bbox'][2] + x1, det['bbox'][3] + y1, ] det['center'] = ( det['center'][0] + x1, det['center'][1] + y1, ) return detections ``` --- ## 七、光流法模块(CPU 端) ### 7.1 为什么需要光流法 ``` YOLO 检测的局限: ├── 只能检测离散的目标 ├── 水面无漂浮物时 → 无法测速 ├── 示踪物太小 → 检测不到 └── 需要足够标注数据 光流法的补充: ├── 可以追踪水面纹理运动 ├── 不需要离散目标 ├── 稠密或半稠密速度场 └── 但精度低于 YOLO 轨迹 混合方案优势: ├── 有示踪物 → YOLO 精确轨迹(高精度) ├── 无示踪物 → 光流补充(覆盖盲区) └── 两者结果融合 → 鲁棒性最强 ``` ### 7.2 稀疏光流法实现(适合 RV1126B CPU) ```python """ optical_flow.py — 光流法流速估计 针对 RV1126B 单核 CPU 优化 """ import cv2 import numpy as np from collections import defaultdict class SparseOpticalFlow: """ Lucas-Kanade 稀疏光流法 优势: - 计算量小(只追踪特征点) - 适合嵌入式 CPU - 可以追踪水面纹理特征 策略: - 只在 YOLO 未检测到的区域追踪 - 或作为 YOLO 检测的补充 """ def __init__(self, frame_size): # Lucas-Kanade 参数 self.lk_params = dict( winSize=(15, 15), # 窗口大小 maxLevel=2, # 金字塔层数 criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03) ) # 特征点检测参数 self.feature_params = dict( maxCorners=100, # 最多特征点数 qualityLevel=0.01, # 质量阈值 minDistance=10, # 最小间距 blockSize=3, ) self.frame_size = frame_size # 状态 self.prev_gray = None self.prev_points = [] self.point_trajectories = defaultdict(list) self.point_id_counter = 0 def process_frame(self, frame, yolo_detections=None, exclude_radius=20): """ 处理单帧,追踪特征点 Args: frame: BGR 当前帧 yolo_detections: YOLO 检测结果(可选) exclude_radius: 排除 YOLO 检测区域的半径(避免重复) Returns: flow_vectors: [(x1, y1, x2, y2, point_id), ...] 
特征点位移向量
        """
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        if self.prev_gray is None:
            self.prev_gray = gray
            self._detect_features(gray, yolo_detections, exclude_radius)
            return []

        # 追踪特征点
        if len(self.prev_points) > 0:
            prev_pts = np.array(self.prev_points,
                                dtype=np.float32).reshape(-1, 1, 2)
            next_pts, status, errors = cv2.calcOpticalFlowPyrLK(
                self.prev_gray, gray, prev_pts, None, **self.lk_params
            )
            # 统一形状,避免 (N,1,2)/(N,1) 带来的逐元素比较歧义
            next_pts = next_pts.reshape(-1, 2)
            status = status.reshape(-1)
            errors = errors.reshape(-1)

            # 过滤有效点
            good_prev = []
            good_next = []
            valid_ids = []

            for i, (next_pt, status_flag, error) in enumerate(
                zip(next_pts, status, errors)
            ):
                if status_flag == 1 and error < 500:
                    # 检查是否在图像范围内
                    if (0 < next_pt[0] < self.frame_size[0] and
                            0 < next_pt[1] < self.frame_size[1]):
                        good_prev.append(self.prev_points[i])
                        good_next.append(next_pt.tolist())
                        valid_ids.append(self._point_ids[i])

            # 记录轨迹
            for pid, prev_pt, next_pt in zip(valid_ids, good_prev, good_next):
                self.point_trajectories[pid].append(next_pt)
                # 限制轨迹长度
                if len(self.point_trajectories[pid]) > 30:
                    self.point_trajectories[pid] = \
                        self.point_trajectories[pid][-30:]

            # 计算光流向量
            flow_vectors = []
            for pid, p, n in zip(valid_ids, good_prev, good_next):
                flow_vectors.append((p[0], p[1], n[0], n[1], pid))

            # 更新状态
            self.prev_points = good_next
            self._point_ids = valid_ids

            # 补充新特征点(如果数量不足)
            # 注意:_detect_features 会重置 prev_points/_point_ids,
            # 因此先保存已追踪的点,检测后再合并,保持点与 ID 一一对应
            if len(self.prev_points) < 50:
                kept_points, kept_ids = self.prev_points, self._point_ids
                self._detect_features(gray, yolo_detections, exclude_radius)
                self.prev_points = kept_points + self.prev_points[:20]
                self._point_ids = kept_ids + self._point_ids[:20]
        else:
            self._detect_features(gray, yolo_detections, exclude_radius)
            flow_vectors = []

        self.prev_gray = gray
        return flow_vectors

    def _detect_features(self, gray, yolo_detections=None, exclude_radius=20):
        """
        检测新的特征点
        排除 YOLO 检测区域(避免重复追踪)
        """
        mask = np.ones(gray.shape[:2], dtype=np.uint8) * 255

        if yolo_detections:
            # 在 YOLO 检测区域设置 mask=0(排除)
            for det in yolo_detections:
                x1, y1, x2, y2 = [int(v) for v in det['bbox']]
                cv2.rectangle(mask,
                              (x1 - exclude_radius, y1 - exclude_radius),
                              (x2 + exclude_radius, y2 + exclude_radius),
                              0, -1)

        points = cv2.goodFeaturesToTrack(
            gray,
mask=mask,
            **self.feature_params
        )

        if points is not None:
            self.prev_points = points.reshape(-1, 2).tolist()
            self._point_ids = list(range(
                self.point_id_counter,
                self.point_id_counter + len(self.prev_points)
            ))
            self.point_id_counter += len(self.prev_points)
        else:
            # 未检测到特征点时也要重置,保持点与 ID 对齐
            self.prev_points = []
            self._point_ids = []

    def compute_velocity_field(self, fps):
        """
        从追踪的轨迹计算速度场

        Returns:
            velocities: [(x, y, vx, vy, speed, point_id), ...]
                        每个特征点的位置和速度
        """
        velocities = []

        for pid, traj in self.point_trajectories.items():
            if len(traj) < 5:
                continue

            # 使用最近 15 帧
            recent = traj[-min(15, len(traj)):]
            start = recent[0]
            end = recent[-1]

            dx = end[0] - start[0]
            dy = end[1] - start[1]
            # n 个轨迹点之间只有 n-1 个帧间隔
            frames_span = len(recent) - 1
            time_sec = frames_span / fps

            if time_sec == 0:
                continue

            vx = dx / time_sec  # 像素/秒
            vy = dy / time_sec
            speed = np.sqrt(vx**2 + vy**2)

            # 当前位置
            current_pos = traj[-1]
            velocities.append((current_pos[0], current_pos[1],
                               vx, vy, speed, pid))

        return velocities

    def reset(self):
        """重置状态"""
        self.prev_gray = None
        self.prev_points = []
        self.point_trajectories = defaultdict(list)


class DenseOpticalFlow:
    """
    Farneback 稠密光流法

    注意:在 RV1126B 单核 CPU 上,全分辨率稠密光流太慢
    使用降采样 + ROI 策略
    """

    def __init__(self, downsample_factor=4):
        self.downsample_factor = downsample_factor
        self.flow_params = dict(
            pyr_scale=0.5,
            levels=3,
            winsize=15,
            iterations=3,
            poly_n=5,
            poly_sigma=1.2,
            flags=0,
        )
        self.prev_gray = None

    def process_frame(self, frame, roi=None):
        """
        计算稠密光流(降采样)
        roi: 只计算感兴趣区域 (x1, y1, x2, y2)
        """
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # 降采样加速
        small = cv2.resize(gray, None,
                           fx=1/self.downsample_factor,
                           fy=1/self.downsample_factor)

        if self.prev_gray is None:
            self.prev_gray = small
            return None

        flow = cv2.calcOpticalFlowFarneback(
            self.prev_gray, small, None, **self.flow_params
        )
        self.prev_gray = small

        # resize 只放大光流场的空间分辨率,不会放大位移值本身,
        # 位移需乘回降采样倍数才是原图像素尺度
        flow = flow * self.downsample_factor

        # 恢复到原始分辨率
        if roi is None:
            flow = cv2.resize(flow, (frame.shape[1], frame.shape[0]))
        else:
            # 只还原 ROI 区域
            x1, y1, x2, y2 = roi
            flow_roi = cv2.resize(
                flow[y1//self.downsample_factor:y2//self.downsample_factor,
                     x1//self.downsample_factor:x2//self.downsample_factor],
                (x2-x1, y2-y1)
            )
return flow_roi return flow def get_average_flow(self, flow): """从光流场计算平均流速""" if flow is None: return 0, 0 magnitude, angle = cv2.cartToPolar(flow[..., 0], flow[..., 1]) # 过滤掉太小和太大的运动 valid = (magnitude > 0.5) & (magnitude < 100) if np.sum(valid) == 0: return 0, 0 avg_vx = np.mean(flow[..., 0][valid]) avg_vy = np.mean(flow[..., 1][valid]) return avg_vx, avg_vy ``` ### 7.3 光流法性能优化(RV1126B 专用) ```python class OptimizedOpticalFlow: """ 针对 RV1126B 单核 CPU 的极致优化 优化策略: 1. 降采样 4x(减少 16 倍计算量) 2. 隔帧计算(每 2 帧算一次光流) 3. 只在水面 ROI 区域计算 4. 限制特征点数量 5. 使用 NEON 指令集(如果 OpenCV 编译时启用) """ def __init__(self, frame_width, frame_height): self.frame_size = (frame_width, frame_height) # 水面 ROI(需要根据实际场景设定) # 排除天空和岸边 self.water_roi = (0, int(frame_height * 0.3), frame_width, frame_height) # 降采样 self.scale = 0.25 # 1/4 分辨率 # 隔帧 self.skip_frames = 1 # 每 2 帧计算一次 self.frame_counter = 0 self.sparse_flow = SparseOpticalFlow( (int(frame_width * self.scale), int(frame_height * self.scale)) ) def process(self, frame, yolo_detections=None): """ 处理帧 — 优化版本 时间预算(RV1126B CPU): └── 目标:每帧 < 15ms """ self.frame_counter += 1 if self.frame_counter % (self.skip_frames + 1) != 0: return [] # 跳帧 # 裁剪水面 ROI x1, y1, x2, y2 = self.water_roi roi_frame = frame[y1:y2, x1:x2] # 降采样 small = cv2.resize(roi_frame, None, fx=self.scale, fy=self.scale) # 调整 YOLO 检测框到小图坐标 small_detections = [] if yolo_detections: for det in yolo_detections: bx1, by1, bx2, by2 = det['bbox'] # 转换到 ROI + 降采样坐标 sbx1 = (bx1 - x1) * self.scale sby1 = (by1 - y1) * self.scale sbx2 = (bx2 - x1) * self.scale sby2 = (by2 - y1) * self.scale small_detections.append({ 'bbox': [sbx1, sby1, sbx2, sby2], **{k: v for k, v in det.items() if k != 'bbox'} }) # 光流计算(在小图上) flows = self.sparse_flow.process_frame(small, small_detections) # 还原到原图坐标 result = [] for (x1_f, y1_f, x2_f, y2_f, pid) in flows: # 还原 rx1 = x1_f / self.scale + x1 ry1 = y1_f / self.scale + y1 rx2 = x2_f / self.scale + x1 ry2 = y2_f / self.scale + y1 result.append((rx1, ry1, rx2, ry2, pid)) return 
result ``` --- ## 八、结果融合引擎 ### 8.1 融合策略设计 ``` 融合逻辑: 场景 1: YOLO 检测到示踪物 + 光流有数据 ├── 优先使用 YOLO 轨迹(离散、精确) ├── 光流作为验证(一致性检查) └── 两者偏差 > 阈值 → 标记为可疑 场景 2: YOLO 未检测到 + 光流有数据 ├── 使用光流结果 ├── 标记为"低置信度" └── 需要足够多的特征点(> 10) 场景 3: YOLO 检测到 + 光流无数据 ├── 使用 YOLO 轨迹 └── 标记为"正常" 场景 4: 两者都无数据 └── 无法测速 → 报告无效 融合公式: 当两者都有数据时: v_fused = w_yolo × v_yolo + w_flow × v_flow w_yolo = 0.7, w_flow = 0.3 (YOLO 权重更高) 一致性检查: |v_yolo - v_flow| / max(v_yolo, v_flow) < threshold (0.3) → 通过:正常融合 → 不通过:只使用 YOLO,标记异常 ``` ### 8.2 融合引擎实现 ```python """ fusion_engine.py — YOLO + 光流融合引擎 """ import numpy as np from collections import defaultdict from enum import Enum class ConfidenceLevel(Enum): HIGH = "high" # YOLO 轨迹 MEDIUM = "medium" # YOLO + 光流融合 LOW = "low" # 仅光流 INVALID = "invalid" # 无有效数据 class VelocityFusionEngine: """ 融合 YOLO 检测轨迹和光流速度场 核心逻辑: 1. YOLO 轨迹提供精确的离散速度点 2. 光流提供稠密的背景速度场 3. 融合两者,得到更全面的速度估计 """ def __init__(self, pixels_per_meter=None, homography_matrix=None, fps=30): self.ppm = pixels_per_meter self.H = homography_matrix self.fps = fps # 融合权重 self.yolo_weight = 0.7 self.flow_weight = 0.3 self.consistency_threshold = 0.3 # 30% 偏差阈值 # YOLO 轨迹存储 self.yolo_trajectories = defaultdict(list) # 光流速度场(最近 N 帧的滑动窗口) self.flow_velocity_history = [] self.flow_history_size = 10 # 结果缓存 self.last_fused_velocity = None self.last_confidence = ConfidenceLevel.INVALID def update_yolo(self, detections, frame_idx): """ 更新 YOLO 检测结果 detections: list of {bbox, score, center, ...} """ for det in detections: track_id = det.get('track_id', id(det)) # 如果有追踪 ID center = det['center'] self.yolo_trajectories[track_id].append( (frame_idx, center[0], center[1]) ) # 限制轨迹长度 if len(self.yolo_trajectories[track_id]) > 60: self.yolo_trajectories[track_id] = \ self.yolo_trajectories[track_id][-60:] def update_flow(self, flow_velocities): """ 更新光流速度场 flow_velocities: [(x, y, vx, vy, speed), ...] 
""" if not flow_velocities: self.flow_velocity_history.append([]) else: speeds = [v[4] for v in flow_velocities] self.flow_velocity_history.append(speeds) # 限制历史长度 if len(self.flow_velocity_history) > self.flow_history_size: self.flow_velocity_history = \ self.flow_velocity_history[-self.flow_history_size:] def compute_fused_velocity(self, frame_idx): """ 计算当前帧的融合流速 Returns: { 'velocity': float, # 融合后的流速 (m/s) 'velocity_pixel': float, # 像素速度 'confidence': ConfidenceLevel, 'source': str, # 'yolo', 'flow', 'fused' 'n_yolo_tracks': int, 'n_flow_points': int, } """ # 1. 计算 YOLO 速度 yolo_velocities = [] for track_id, traj in self.yolo_trajectories.items(): v = self._compute_trajectory_velocity(traj) if v is not None: yolo_velocities.append(v) # 2. 计算光流速度 flow_speeds = [] for speeds in self.flow_velocity_history[-5:]: # 最近 5 帧 flow_speeds.extend(speeds) # 3. 融合 result = { 'n_yolo_tracks': len(yolo_velocities), 'n_flow_points': len(flow_speeds), } if yolo_velocities and flow_speeds: # 两者都有数据 yolo_speed = np.median(yolo_velocities) flow_speed = np.median(flow_speeds) # 一致性检查 max_speed = max(yolo_speed, flow_speed) if max_speed > 0: deviation = abs(yolo_speed - flow_speed) / max_speed else: deviation = 0 if deviation < self.consistency_threshold: # 一致 → 加权融合 fused = self.yolo_weight * yolo_speed + \ self.flow_weight * flow_speed result['velocity_pixel'] = fused result['confidence'] = ConfidenceLevel.MEDIUM result['source'] = 'fused' else: # 不一致 → 只用 YOLO result['velocity_pixel'] = yolo_speed result['confidence'] = ConfidenceLevel.HIGH result['source'] = 'yolo' result['warning'] = f'YOLO/Flow deviation: {deviation:.2f}' elif yolo_velocities: # 只有 YOLO result['velocity_pixel'] = np.median(yolo_velocities) result['confidence'] = ConfidenceLevel.HIGH result['source'] = 'yolo' elif flow_speeds: # 只有光流 if len(flow_speeds) >= 5: # 至少 5 个特征点 result['velocity_pixel'] = np.median(flow_speeds) result['confidence'] = ConfidenceLevel.LOW result['source'] = 'flow' else: 
result['confidence'] = ConfidenceLevel.INVALID
        else:
            result['confidence'] = ConfidenceLevel.INVALID

        # 像素速度 → 实际速度
        if result.get('velocity_pixel') is not None:
            result['velocity'] = self._pixel_to_meters(
                result['velocity_pixel']
            )
        else:
            result['velocity'] = None

        self.last_fused_velocity = result
        self.last_confidence = result['confidence']
        return result

    def _compute_trajectory_velocity(self, trajectory):
        """从轨迹计算速度(像素/秒)"""
        if len(trajectory) < 5:
            return None

        # 使用分段平均(+1 保证轨迹长度恰为 segment_size 时也能取到一段)
        segment_size = 10
        velocities = []
        for i in range(0, len(trajectory) - segment_size + 1,
                       segment_size // 2):
            seg = trajectory[i:i + segment_size]
            start_f, sx, sy = seg[0]
            end_f, ex, ey = seg[-1]

            dist = np.sqrt((ex - sx)**2 + (ey - sy)**2)
            time_sec = (end_f - start_f) / self.fps

            if time_sec > 0:
                velocities.append(dist / time_sec)

        if not velocities:
            return None

        return float(np.median(velocities))

    def _pixel_to_meters(self, pixel_velocity):
        """像素速度转换为 m/s"""
        if self.H is not None:
            import cv2  # 本模块头部未导入 cv2,仅单应性变换需要

            # 使用单应性矩阵(考虑透视)
            # 在画面中心采样转换因子(假设 640×480 输入,
            # 分辨率不同时需相应调整)
            center_x = 320
            center_y = 240
            pt = np.array([[[center_x, center_y]]], dtype=np.float32)
            pt_moved = np.array([[[center_x + 1, center_y]]],
                                dtype=np.float32)

            real1 = cv2.perspectiveTransform(pt, self.H)[0][0]
            real2 = cv2.perspectiveTransform(pt_moved, self.H)[0][0]
            meter_per_pixel = np.linalg.norm(real2 - real1)

            return pixel_velocity * meter_per_pixel
        elif self.ppm:
            return pixel_velocity / self.ppm

        return pixel_velocity  # 未标定时返回像素速度

    def get_summary(self, window_frames=300):
        """获取最近 N 帧的流速摘要"""
        # 实现类似前面的统计摘要
        pass

    def reset(self):
        """重置所有状态"""
        self.yolo_trajectories.clear()
        self.flow_velocity_history.clear()
        self.last_fused_velocity = None
        self.last_confidence = ConfidenceLevel.INVALID
```

---

## 九、完整集成流水线

### 9.1 主程序

```python
"""
main.py — RV1126B 水流速度检测主程序

运行方式:
    python3 main.py --model yolo11n_water_flow_int8.rknn \
                    --input /dev/video0 \
                    --calibration calibration.json \
                    --output results/
"""
import cv2
import json
import time
import argparse
import numpy as np
from pathlib import Path
from datetime import datetime

from yolo11_detector import YOLO11Detector
from optical_flow import SparseOpticalFlow, OptimizedOpticalFlow
from fusion_engine import VelocityFusionEngine
# SimpleTracker 直接在本文件内定义(见下方),无需单独的 tracker 模块


class SimpleTracker:
    """
    轻量级追踪器 — RV1126B 版
    不使用完整的 ByteTrack(太重),用简化的 IoU 匹配追踪
    """

    def __init__(self, max_lost=10, min_iou=0.3):
        self.tracks = {}  # track_id -> {last_bbox, last_center, lost_count}
        self.next_id = 0
        self.max_lost = max_lost
        self.min_iou = min_iou

    def update(self, detections):
        """
        更新追踪状态

        detections: [{bbox, score, center}, ...]
        Returns: [{bbox, score, center, track_id}, ...]
        """
        if not detections:
            # 所有轨迹增加丢失计数
            for track in self.tracks.values():
                track['lost_count'] += 1
            # 清理丢失的
            self.tracks = {
                tid: t for tid, t in self.tracks.items()
                if t['lost_count'] <= self.max_lost
            }
            return []

        # IoU 匹配
        matched_det = set()

        for tid, track in list(self.tracks.items()):
            best_iou = 0
            best_det_idx = -1

            for i, det in enumerate(detections):
                if i in matched_det:
                    continue
                iou = self._compute_iou(track['last_bbox'], det['bbox'])
                if iou > best_iou:
                    best_iou = iou
                    best_det_idx = i

            if best_iou >= self.min_iou:
                # 匹配成功
                track['last_bbox'] = detections[best_det_idx]['bbox']
                track['last_center'] = detections[best_det_idx]['center']
                track['lost_count'] = 0
                detections[best_det_idx]['track_id'] = tid
                matched_det.add(best_det_idx)
            else:
                track['lost_count'] += 1

        # 未匹配的检测 → 新轨迹
        for i, det in enumerate(detections):
            if i not in matched_det:
                new_id = self.next_id
                self.next_id += 1
                det['track_id'] = new_id
                self.tracks[new_id] = {
                    'last_bbox': det['bbox'],
                    'last_center': det['center'],
                    'lost_count': 0,
                }

        # 清理
        self.tracks = {
            tid: t for tid, t in self.tracks.items()
            if t['lost_count'] <= self.max_lost
        }

        return detections

    def _compute_iou(self, box1, box2):
        """计算 IoU"""
        x1 = max(box1[0], box2[0])
        y1 = max(box1[1], box2[1])
        x2 = min(box1[2], box2[2])
        y2 = min(box1[3], box2[3])

        inter_area = max(0, x2 - x1) * max(0, y2 - y1)
        box1_area = (box1[2] - box1[0]) * (box1[3] -
box1[1]) box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1]) union = box1_area + box2_area - inter_area return inter_area / union if union > 0 else 0 class WaterFlowVelocitySystem: """ 端到端水流速度检测系统 RV1126B 优化版本 """ def __init__(self, args): # 加载标定参数 with open(args.calibration) as f: calib = json.load(f) ppm = calib.get('pixels_per_meter') H = calib.get('homography_matrix') if H: H = np.array(H) # 初始化组件 print("Initializing YOLO11 detector...") self.detector = YOLO11Detector(args.model, use_npu=True) print("Initializing optical flow...") self.optical_flow = SparseOpticalFlow((640, 480)) print("Initializing tracker...") self.tracker = SimpleTracker() print("Initializing fusion engine...") self.fusion = VelocityFusionEngine( pixels_per_meter=ppm, homography_matrix=H, fps=args.fps, ) # 配置 self.fps = args.fps self.run_yolo_every = 2 # 每 2 帧跑一次 YOLO self.frame_count = 0 # 结果存储 self.results = [] self.output_dir = Path(args.output) self.output_dir.mkdir(parents=True, exist_ok=True) def run_video(self, input_source): """ 主处理循环 input_source: 摄像头设备路径 或 视频文件路径 """ cap = cv2.VideoCapture(input_source) if not cap.isOpened(): raise RuntimeError(f"Cannot open {input_source}") # 设置分辨率(降低到 640x480 以节省性能) cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480) frame_idx = 0 fps_timer = [] print(f"Starting processing...") print(f"Input: {input_source}") print(f"YOLO runs every {self.run_yolo_every} frames") try: while True: frame_start = time.perf_counter() ret, frame = cap.read() if not ret: break # ---- 阶段 1: YOLO 检测(隔帧) ---- if frame_idx % self.run_yolo_every == 0: yolo_detections = self.detector.detect(frame) # 追踪 tracked = self.tracker.update(yolo_detections) # 更新融合引擎 self.fusion.update_yolo(tracked, frame_idx) else: # 不跑 YOLO,只更新追踪(预测模式) tracked = [] # ---- 阶段 2: 光流计算 ---- flow_vectors = self.optical_flow.process_frame( frame, yolo_detections if frame_idx % self.run_yolo_every == 0 else None ) self.fusion.update_flow(flow_vectors) # ---- 阶段 3: 融合 ---- 
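                # 假设性优化提示:这里每帧都做一次融合;实测融合本身约
                # 2ms,若 RV1126B CPU 紧张,可改为每 N 帧融合一次并
                # 沿用上一次的 fusion_result,对输出平滑度影响很小。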
                fusion_result = self.fusion.compute_fused_velocity(frame_idx)

                # ---- 阶段 4: 可视化 ----
                display_frame = self._visualize(
                    frame, tracked, flow_vectors, fusion_result
                )

                # ---- 阶段 5: 记录结果 ----
                if frame_idx % self.fps == 0:  # 每秒记录一次
                    self.results.append({
                        'timestamp': datetime.now().isoformat(),
                        'frame': frame_idx,
                        **fusion_result,
                    })

                # 性能计时
                frame_time = time.perf_counter() - frame_start
                fps_timer.append(frame_time)
                if frame_idx % 30 == 0 and fps_timer:
                    avg_fps = len(fps_timer) / sum(fps_timer)
                    print(f"Frame {frame_idx}: {avg_fps:.1f} FPS | "
                          f"Velocity: {fusion_result.get('velocity', 0):.3f} m/s | "
                          f"Confidence: {fusion_result.get('confidence', 'N/A')}")

                # 显示(如果有屏幕)
                cv2.imshow("Water Flow Velocity", display_frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

                frame_idx += 1
                self.frame_count = frame_idx  # 供可视化显示帧号

        except KeyboardInterrupt:
            print("\nInterrupted by user")
        finally:
            cap.release()
            cv2.destroyAllWindows()
            self._save_results()

        # 最终统计
        if fps_timer:
            avg_fps = len(fps_timer) / sum(fps_timer)
            print(f"\nAverage FPS: {avg_fps:.1f}")
            print(f"Total frames: {frame_idx}")

    def _visualize(self, frame, tracked, flow_vectors, fusion_result):
        """可视化输出"""
        display = frame.copy()

        # 绘制 YOLO 检测结果
        for det in tracked:
            x1, y1, x2, y2 = [int(v) for v in det['bbox']]
            tid = det.get('track_id', 0)
            cv2.rectangle(display, (x1, y1), (x2, y2), (0, 255, 0), 2)
            cv2.putText(display, f"ID:{tid}", (x1, y1 - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1)

        # 绘制光流向量
        for (x1, y1, x2, y2, pid) in flow_vectors[:50]:  # 最多 50 条
            cv2.arrowedLine(display, (int(x1), int(y1)),
                            (int(x2), int(y2)), (255, 0, 0), 1)

        # 绘制融合结果
        velocity = fusion_result.get('velocity')
        confidence = fusion_result.get('confidence', 'invalid')
        if velocity is not None:
            # confidence 可能是枚举(带 .value),也可能是默认字符串,统一取字符串
            conf_str = getattr(confidence, 'value', str(confidence))
            # 颜色根据置信度
            if conf_str == 'high':
                color = (0, 255, 0)      # 绿
            elif conf_str == 'medium':
                color = (0, 255, 255)    # 黄
            elif conf_str == 'low':
                color = (0, 165, 255)    # 橙
            else:
                color = (0, 0, 255)      # 红

            cv2.putText(display, f"Velocity: {velocity:.3f} m/s", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)
            cv2.putText(display, f"Confidence: {conf_str}", (10, 60),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 1)

        # 绘制帧号
        cv2.putText(display, f"Frame: {self.frame_count}",
                    (10, frame.shape[0] - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

        return display

    def _save_results(self):
        """保存结果到文件"""
        output_file = self.output_dir / f"results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(output_file, 'w') as f:
            json.dump(self.results, f, indent=2)
        print(f"Results saved to {output_file}")


def parse_args():
    parser = argparse.ArgumentParser(description="Water Flow Velocity Detection on RV1126B")
    parser.add_argument('--model', required=True, help='Path to RKNN model')
    parser.add_argument('--input', default='/dev/video0', help='Camera device or video file')
    parser.add_argument('--calibration', required=True, help='Calibration JSON file')
    parser.add_argument('--output', default='./results', help='Output directory')
    parser.add_argument('--fps', type=int, default=30, help='Video FPS')
    return parser.parse_args()


if __name__ == '__main__':
    args = parse_args()
    system = WaterFlowVelocitySystem(args)
    system.run_video(args.input)
```

### 9.2 标定文件格式

```json
{
  "calibration_date": "2026-04-23",
  "location": "XX River, Section A",
  "method": "homography",
  "homography_matrix": [
    [1.2e-3, 3.4e-5, -0.5],
    [2.1e-5, 1.1e-3, -0.3],
    [8.7e-7, 2.3e-7, 1.0]
  ],
  "control_points": [
    { "pixel": [100, 200], "real": [0.0, 0.0] },
    { "pixel": [500, 180], "real": [5.2, 0.3] },
    { "pixel": [150, 600], "real": [0.5, 3.1] },
    { "pixel": [600, 580], "real": [5.8, 3.4] }
  ],
  "camera": {
    "resolution": "1920x1080",
    "fps": 30,
    "height_above_water": 3.5,
    "angle_degrees": 45
  },
  "validation": {
    "mean_error_meters": 0.15,
    "max_error_meters": 0.28,
    "num_validation_points": 4
  }
}
```

---

## 十、构建与部署流程

### 10.1 完整构建脚本

```bash
#!/bin/bash
# build_rv1126b.sh — 完整的构建和部署流程
set -e

echo "=== YOLO11 + RV1126B Water Flow Detection Build ==="

# ========== 阶段 1: 训练模型 (x86 主机) ==========
echo ""
echo "=== Phase 1: Training YOLO11 ==="
python train_yolo11.py \
    --data water_flow.yaml \
    --epochs 150 \
    --imgsz 640 \
    --batch 16 \
    --device 0
echo "Training complete!"

# ========== 阶段 2: 导出 ONNX ==========
echo ""
echo "=== Phase 2: Exporting ONNX ==="
python export_onnx.py \
    --model runs/detect/water_flow_yolo11n/weights/best.pt \
    --imgsz 320 \
    --output yolo11n_water_flow.onnx
echo "ONNX export complete!"

# ========== 阶段 3: 转换为 RKNN INT8 ==========
echo ""
echo "=== Phase 3: Converting to RKNN (INT8) ==="
python export_rknn.py \
    --onnx yolo11n_water_flow.onnx \
    --output yolo11n_water_flow_int8.rknn \
    --calibration-dir ./calibration_images \
    --target rv1126b
echo "RKNN conversion complete!"

# ========== 阶段 4: 部署到 RV1126B ==========
echo ""
echo "=== Phase 4: Deploying to RV1126B ==="
# 传输到设备
scp yolo11n_water_flow_int8.rknn \
    calibration.json \
    main.py \
    yolo11_detector.py \
    optical_flow.py \
    fusion_engine.py \
    root@192.168.1.100:/app/water_flow/
echo "Deployment complete!"

echo ""
echo "To run on RV1126B:"
echo "  ssh root@192.168.1.100"
echo "  cd /app/water_flow"
echo "  python3 main.py --model yolo11n_water_flow_int8.rknn \\"
echo "      --input /dev/video0 \\"
echo "      --calibration calibration.json"
```

### 10.2 RV1126B SDK 集成

```bash
# 如果使用 Rockchip 官方 SDK 构建完整固件

# 1. 获取 RV1126B SDK
repo init --repo-url https://github.com/rockchip-linux/repo \
    -u https://github.com/rockchip-linux/manifests \
    -b master -m rv1126b.xml
repo sync

# 2. 编译 SDK
./build.sh lunch   # 选择 rv1126b
./build.sh

# 3. 将应用加入 Buildroot
#    在 external/ 下创建自定义包

# 4. 烧录固件
./build.sh updateimg
./upgrade_tool ul RV1126B_XXX.img
```

### 10.3 C++ 推理(生产环境推荐)

```cpp
/*
 * yolo11_rknn.cpp — C++ 版 RKNN 推理(性能更好)
 *
 * 编译:
 *   arm-linux-gnueabihf-g++ yolo11_rknn.cpp -o yolo11_detect \
 *       -lrknnrt -lopencv_core -lopencv_imgproc -lopencv_video \
 *       -I/usr/include/rknn
 */
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <vector>
#include <opencv2/opencv.hpp>
#include <rknn_api.h>

struct Detection {
    float x1, y1, x2, y2;
    float confidence;
    int class_id;
    int track_id;
};

class YOLO11RKNN {
public:
    YOLO11RKNN(const char* model_path) {
        // 加载 RKNN 模型
        FILE* fp = fopen(model_path, "rb");
        fseek(fp, 0, SEEK_END);
        size_t size = ftell(fp);
        fseek(fp, 0, SEEK_SET);
        void* model_data = malloc(size);
        fread(model_data, 1, size, fp);
        fclose(fp);

        // 初始化
        int ret = rknn_init(&ctx_, model_data, size, 0, NULL);
        if (ret < 0) {
            std::cerr << "rknn_init failed: " << ret << std::endl;
            exit(-1);
        }
        free(model_data);

        // 获取输入输出信息
        rknn_input_output_num io_num;
        rknn_query(ctx_, RKNN_QUERY_IN_OUT_NUM, &io_num, sizeof(io_num));
        std::cout << "Input count: " << io_num.n_input << std::endl;
        std::cout << "Output count: " << io_num.n_output << std::endl;
    }

    std::vector<Detection> detect(const cv::Mat& frame) {
        // 预处理
        cv::Mat input = preprocess(frame);

        // 设置输入
        rknn_input inputs[1];
        memset(inputs, 0, sizeof(inputs));
        inputs[0].index = 0;
        inputs[0].type = RKNN_TENSOR_UINT8;
        inputs[0].size = 320 * 320 * 3;
        inputs[0].fmt = RKNN_TENSOR_NHWC;
        inputs[0].buf = input.data;
        rknn_inputs_set(ctx_, 1, inputs);

        // 推理
        rknn_run(ctx_, nullptr);

        // 获取输出
        rknn_output outputs[1];
        memset(outputs, 0, sizeof(outputs));
        outputs[0].want_float = 1;
        rknn_outputs_get(ctx_, 1, outputs, NULL);

        // 后处理
        auto detections = postprocess(outputs[0].buf, outputs[0].size);
        rknn_outputs_release(ctx_, 1, outputs);
        return detections;
    }

    ~YOLO11RKNN() { rknn_destroy(ctx_); }

private:
    rknn_context ctx_;

    cv::Mat preprocess(const cv::Mat& frame) {
        cv::Mat resized, rgb;
        cv::resize(frame, resized, cv::Size(320, 320));
        cv::cvtColor(resized, rgb, cv::COLOR_BGR2RGB);
        return rgb;
    }

    // ...(后处理实现:解码输出张量 + NMS,略)
    std::vector<Detection> postprocess(void* buf, uint32_t size);
};
```

---

## 十一、性能优化清单

### 11.1 RV1126B 极限优化

```
目标:在 RV1126B 上达到 15-20 FPS

┌──────────────────┬────────────┬────────────────────────────┐
│ 优化项           │ 预期收益   │ 实现难度                   │
├──────────────────┼────────────┼────────────────────────────┤
│ YOLO11n INT8     │ 基准       │ -                          │
│ 输入 320→256     │ +20% FPS   │ 低(需重新训练/微调)      │
│ 隔帧检测         │ +50% FPS   │ 极低                       │
│ C++ 推理         │ +10% FPS   │ 中                         │
│ RGA 硬件缩放     │ +5% FPS    │ 中(RV1126B 有 RGA 单元)  │
│ 光流隔帧         │ +50% CPU   │ 极低                       │
│ 光流降采样 4x    │ +300% CPU  │ 极低                       │
│ 内存预分配       │ -10% 抖动  │ 中                         │
│ NEON 优化        │ +30% CPU   │ 高                         │
└──────────────────┴────────────┴────────────────────────────┘

推荐组合:
  YOLO11n INT8 320×320 + 隔帧检测 + C++ 推理 + RGA 缩放
    → 预期 15-20 FPS(YOLO)+ 光流并行

光流单独:
  降采样 4x + 隔帧 + ROI 裁剪
    → 预期 15-25 FPS(CPU)

综合:15 FPS 以上(两者交替执行)
```

### 11.2 内存管理

```python
# RV1126B 内存优化
import cv2
import numpy as np


class MemoryOptimizedPipeline:
    """RV1126B 只有 256-512MB 内存,需要严格管理"""

    def __init__(self):
        # 预分配所有 buffer,运行期复用而不是重新分配
        self.frame_buffer = np.zeros((480, 640, 3), dtype=np.uint8)
        self.gray_buffer1 = np.zeros((480, 640), dtype=np.uint8)
        self.gray_buffer2 = np.zeros((480, 640), dtype=np.uint8)
        self.small_gray1 = np.zeros((120, 160), dtype=np.uint8)
        self.small_gray2 = np.zeros((120, 160), dtype=np.uint8)
        self.rknn_input = np.zeros((320, 320, 3), dtype=np.uint8)
        self.flow_field = np.zeros((120, 160, 2), dtype=np.float32)

    def process(self, raw_frame):
        # 直接拷贝到预分配 buffer
        np.copyto(self.frame_buffer[:raw_frame.shape[0], :raw_frame.shape[1]],
                  raw_frame)
        # 而不是每帧新建数组: gray = cv2.cvtColor(raw_frame, cv2.COLOR_BGR2GRAY)
        # ...
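
# —— 补充示例(非原文,用来说明上面"复用而不是重新分配"的做法)——
# OpenCV 的 cvtColor / resize 等函数接受 dst 参数:传入形状、类型匹配的
# 预分配数组时会直接写入该数组,避免每帧分配新内存。
# 函数名 gray_downsample_into 为示例自拟,并非部署代码中的实际接口。
def gray_downsample_into(frame, gray_buf, small_buf):
    """BGR → 灰度 → 降采样,结果写入预分配 buffer(示意实现)"""
    cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY, dst=gray_buf)
    cv2.resize(gray_buf, (small_buf.shape[1], small_buf.shape[0]),
               dst=small_buf, interpolation=cv2.INTER_AREA)
    return small_buf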
```

---

## 十二、测试与验证

### 12.1 精度验证流程

```
步骤 1:实验室验证
├── 在已知流速的水槽中测试
├── 对比 YOLO+光流结果与标准流速仪
└── 记录误差分布

步骤 2:现场验证
├── 在实际河道部署
├── 同步使用浮标法或 ADCP 测量
├── 在不同流速下测试(低/中/高)
└── 记录不同光照条件下的表现

步骤 3:长期稳定性
├── 连续运行 7 天
├── 监控内存泄漏
├── 监控精度漂移
└── 记录设备稳定性
```

### 12.2 基准测试脚本

```python
"""
benchmark.py — 在 RV1126B 上运行基准测试
"""
import time

import cv2
import numpy as np


def benchmark_yolo(detector, num_runs=100):
    """YOLO 推理基准测试"""
    test_frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)

    # 预热
    for _ in range(10):
        detector.detect(test_frame)

    # 测试
    times = []
    for _ in range(num_runs):
        start = time.perf_counter()
        detector.detect(test_frame)
        times.append(time.perf_counter() - start)

    avg = np.mean(times)
    p50 = np.percentile(times, 50)
    p95 = np.percentile(times, 95)
    p99 = np.percentile(times, 99)
    print(f"YOLO11 Benchmark ({num_runs} runs):")
    print(f"  Average: {avg*1000:.1f}ms ({1/avg:.1f} FPS)")
    print(f"  P50: {p50*1000:.1f}ms")
    print(f"  P95: {p95*1000:.1f}ms")
    print(f"  P99: {p99*1000:.1f}ms")
    print(f"  Min: {min(times)*1000:.1f}ms")
    print(f"  Max: {max(times)*1000:.1f}ms")


def benchmark_optical_flow(flow_processor, num_runs=100):
    """光流基准测试"""
    frame1 = np.random.randint(0, 255, (120, 160), dtype=np.uint8)
    frame2 = np.random.randint(0, 255, (120, 160), dtype=np.uint8)

    times = []
    for _ in range(num_runs):
        start = time.perf_counter()
        cv2.calcOpticalFlowFarneback(frame1, frame2, None,
                                     pyr_scale=0.5, levels=3, winsize=15,
                                     iterations=3, poly_n=5, poly_sigma=1.2,
                                     flags=0)
        times.append(time.perf_counter() - start)
        frame1, frame2 = frame2, frame1

    avg = np.mean(times)
    print(f"Optical Flow Benchmark ({num_runs} runs):")
    print(f"  Average: {avg*1000:.1f}ms ({1/avg:.1f} FPS)")
    print(f"  P95: {np.percentile(times, 95)*1000:.1f}ms")


def benchmark_full_pipeline(system, num_frames=300):
    """完整流水线基准测试"""
    test_frame = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)

    times = []
    for i in range(num_frames):
        start = time.perf_counter()
        # 模拟一帧的处理(YOLO 隔帧执行)
        if i % 2 == 0:
            system.detector.detect(test_frame)
        system.optical_flow.process_frame(test_frame)
        elapsed = time.perf_counter() - start
        times.append(elapsed)

    avg = np.mean(times)
    print(f"Full Pipeline Benchmark ({num_frames} frames):")
    print(f"  Average: {avg*1000:.1f}ms ({1/avg:.1f} FPS)")
    print(f"  P95: {np.percentile(times, 95)*1000:.1f}ms")


if __name__ == '__main__':
    print("=" * 50)
    print("RV1126B Benchmark Suite")
    print("=" * 50)
    # 需要先在 RV1126B 上初始化各组件后再调用:
    # benchmark_yolo(detector)
    # benchmark_optical_flow(flow)
    # benchmark_full_pipeline(system)
```

---

## 十三、项目文件结构

```
water-flow-rv1126b/
├── training/                        # 训练阶段(x86 主机)
│   ├── train_yolo11.py              # YOLO11 训练脚本
│   ├── export_onnx.py               # ONNX 导出
│   ├── export_rknn.py               # RKNN 转换
│   ├── water_flow.yaml              # 数据集配置
│   └── calibration_images/          # INT8 校准数据
│
├── deployment/                      # 部署阶段(RV1126B)
│   ├── main.py                      # 主程序
│   ├── yolo11_detector.py           # YOLO11 RKNN 推理
│   ├── optical_flow.py              # 光流模块
│   ├── fusion_engine.py             # 融合引擎
│   ├── tracker.py                   # 轻量追踪器
│   ├── calibration.json             # 标定参数
│   └── yolo11n_water_flow_int8.rknn # RKNN 模型
│
├── cpp/                             # C++ 生产版本
│   ├── yolo11_rknn.cpp
│   ├── optical_flow.cpp
│   ├── fusion_engine.cpp
│   ├── main.cpp
│   └── CMakeLists.txt
│
├── benchmark/
│   └── benchmark.py                 # 性能基准测试
│
├── docs/
│   └── calibration_guide.md         # 标定操作指南
│
└── scripts/
    └── build_rv1126b.sh             # 构建部署脚本
```

---

## 十四、快速启动检查清单

```
[ ]  1. 在 x86 主机上安装 Ultralytics + RKNN Toolkit2
[ ]  2. 采集 1000+ 张水面示踪物图像
[ ]  3. 标注数据(COCO/YOLO 格式)
[ ]  4. 训练 YOLO11n 模型(mAP@50 > 0.85)
[ ]  5. 导出 ONNX(imgsz=320)
[ ]  6. 准备 100-200 张校准图片
[ ]  7. 转换为 RKNN INT8 模型
[ ]  8. 在主机上模拟 RV1126B 推理验证
[ ]  9. 传输到 RV1126B 开发板
[ ] 10. 执行相机标定,生成 calibration.json
[ ] 11. 在 RV1126B 上运行基准测试
[ ] 12. 连接相机,端到端测试
[ ] 13. 与传统方法对比验证精度
[ ] 14. 调优参数(检测阈值、光流参数、融合权重)
```

---

*创建:2026-04-23 | 平台:RV1126B | 模型:YOLO11n*
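
附:9.2 节标定文件里 `homography_matrix` 的用法可以用下面的小例子说明(补充示例,非原文代码;`pixel_to_world` 为本示例自拟的函数名,矩阵取自 9.2 节的示例值):

```python
import numpy as np

def pixel_to_world(H, px, py):
    """标准单应性变换:像素坐标 → 世界坐标(米)"""
    v = H @ np.array([px, py, 1.0])
    return v[0] / v[2], v[1] / v[2]

# 9.2 节示例中的单应性矩阵
H = np.array([[1.2e-3, 3.4e-5, -0.5],
              [2.1e-5, 1.1e-3, -0.3],
              [8.7e-7, 2.3e-7, 1.0]])

# 假设某示踪物相邻两帧从像素 (100, 200) 移到 (112, 203),帧率 30 FPS:
x1, y1 = pixel_to_world(H, 100, 200)
x2, y2 = pixel_to_world(H, 112, 203)
speed = np.hypot(x2 - x1, y2 - y1) * 30  # m/s
print(f"speed = {speed:.3f} m/s")
```

先转换到世界坐标再求位移,而不是先求像素位移再乘比例因子:透视相机下每个像素对应的实际长度随位置变化,顺序颠倒会引入系统误差。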