- 2025-04-03
-
发表了主题帖:
【嵌入式Rust修炼营】初级任务1和2:Rust环境搭建,冒泡排序和文件打印输出
本帖最后由 qianmo2001 于 2025-4-3 21:41 编辑
很荣幸有机会参与EEWorld举办的Rust修炼营活动,本次活动使用Py32F030 Rust开发板,并在windows环境下对该开发板进行嵌入式Rust开发。Rust开发板如下:
开发环境搭建:
在Windows下进行Rust嵌入式开发需要MSCV开发组件,可通过[Microsoft C++ 生成工具 - Visual Studio](https://visualstudio.microsoft.com/zh-hans/visual-cpp-build-tools/)进行安装,无需安装Visual Studio本体,仅安装以下组件
MSVC v143 - VS 2022 C++ x64/x86 build tools (Latest)
Windows 11 SDK (10.0.22621.0)
安装完成后 从Rust的官网下载 rustup-init:https://www.rust-lang.org/zh-CN/双击运行,在CMD终端窗口一直回车即可。
安装完成后即可创建Rust项目
初级任务一:使用冒泡排序处理数组并打印
use rand::random_range;
const TEST_ARR_COUNT: usize = 16;
fn bubble_sort(arr: &mut [i32]) {
for i in 0..arr.len() - 1 {
for j in i + 1..arr.len() {
if arr[i] > arr[j] {
let tmp = arr[i];
arr[i] = arr[j];
arr[j] = tmp;
}
}
}
}
fn main() {
let mut test_arr: [i32; TEST_ARR_COUNT] = [0; TEST_ARR_COUNT];
for t in &mut test_arr {
*t = random_range(-100..100);
}
println!("befor: {:?}", test_arr);
bubble_sort(&mut test_arr);
println!("after: {:?}", test_arr);
}
运行结果
初级任务二:使用fs接口,打印文本文件
use std::{
fs::File,
io::{self, BufRead},
};
fn main() {
let arg = std::env::args();
let files: Vec<_> = arg.collect();
println!("file count: {}", files.len());
let mut iter = files.iter();
println!("exe name: {}", iter.next().unwrap());
while let Some(file_name) = iter.next() {
println!("file: {}", file_name);
match File::open(file_name) {
Ok(file) => {
let reader = io::BufReader::new(file);
for line in reader.lines() {
if let Ok(line) = line {
println!("{}", line)
}
}
}
Err(e) => {
println!("{:?}", e)
}
}
}
}
运行结果
- 2025-03-31
-
加入了学习《直播回放: 嵌入式Rust入门基础知识、解析动手实战Rust的三个任务》,观看 嵌入式Rust入门基础知识、解析动手实战Rust的三个任务
- 2025-03-18
-
回复了主题帖:
嵌入式Rust修炼营入围名单来啦,一起度过嵌入式Rust学习时光的小伙伴集合啦~
个人信息无误
- 2025-03-09
-
回复了主题帖:
【米尔 瑞芯微RK3576 工业AI开发板】5.使用开发板实机运行异常检测AI模型
小新生 发表于 2025-3-7 19:35
运行 SDK 里的 external/rknn-toolkit2/rknn-toolkit-lite2/examples/resnet18/ 的 demo 是可以的,说明开 ...
感谢回复,BUG改好了
-
发表了主题帖:
【米尔 瑞芯微RK3576 工业AI开发板】8.实时读取传感器数据,并实时推理
本帖最后由 qianmo2001 于 2025-3-9 22:12 编辑
在前面的内容中,我们已经完成了异常检测模型的设计,训练及部署,同时也完成了传感器数据的实时采集与显示。
接下来,我们将基于米尔 瑞芯微RK3576 工业AI开发板完成传感器数据的实时采集,并进行时序异常实时推理。
实时检测流程
数据采集:ADC 模块持续采集传感器数据,并存入缓冲区。
数据预处理:实时将采集到的数据转换为固定长度的时间序列,形成输入张量。
模型推理:利用 RKNN 部署的模型对数据进行前向计算,获取重构误差。
异常判断:比较重构误差与预设阈值,若超过阈值则标记为异常,并及时触发报警或记录日志。
采集数据方式
使用RK3567内置ADC采集传感器数据
def read_adc(channel):
raw_path = f"/sys/bus/iio/devices/iio:device0/in_voltage{channel}_raw"
scale_path = f"/sys/bus/iio/devices/iio:device0/in_voltage_scale"
with open(raw_path, 'r') as f_raw, open(scale_path, 'r') as f_scale:
raw = int(f_raw.read().strip())
scale = float(f_scale.read().strip())
return raw * scale
生成随机数模拟ADC采集数据
# 模拟 ADC 采样函数(实际使用时请替换为真实 ADC 读取代码)
def simulate_adc():
# 返回 0~2伏之间的随机电压值
return np.random.uniform(0, 2)
从npy文件中读取ADC采集数据
# 从 npy 文件中加载 ADC 数据(假设文件中存储一维数组)
adc_data = np.load('dataset/UCR/test.npy') # 请替换为实际文件路径
data_length = len(adc_data)
print("len(adc_data):",len(adc_data))
adc_index = 0 # 全局索引,记录当前采样位置 # 模拟 ADC 数据读取函数:每次从 adc_data 中读取一个数据,循环读取
def read_adc_from_file():
global adc_index, data_length, adc_data
sample = adc_data[adc_index]
adc_index = (adc_index + 1) % data_length
return sample
完整代码实现
from threading import Thread
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import numpy as np
import time
import matplotlib.pyplot as plt
from rknnlite.api import RKNNLite
# 全局参数
BUFFER_SIZE = 200 # ADC 数据缓冲区大小(显示窗口为200个采样点)
MODEL_WINDOW = 16 # 模型输入窗口大小
MAX_SCORE_HISTORY = 200 # 异常分数历史记录窗口大小
# 全局缓冲区,保存最新200个采样值
adc_buffer = np.zeros(BUFFER_SIZE)
# 保存历史异常分数
anomaly_scores = []
# 模拟 ADC 采样函数(实际使用时请替换为真实 ADC 读取代码)
def simulate_adc():
# 返回 0~2伏之间的随机电压值
return np.random.uniform(0, 2)
# 数据采集线程:不断更新 adc_buffer
class ADCReader(Thread):
def __init__(self):
super().__init__()
self.running = True
def run(self):
global adc_buffer
while self.running:
voltage = simulate_adc() # 模拟读取 ADC 电压
adc_buffer = np.roll(adc_buffer, -1)
adc_buffer[-1] = voltage
time.sleep(0.01) # 采样间隔10ms
def stop(self):
self.running = False
# 2. 创建 RKNN 对象
rknn = RKNNLite()
# 6. 导出 RKNN 模型
rknn_model_path = "tcn_autoencoder.rknn"
# 7. 加载 RKNN 模型
rknn.load_rknn(rknn_model_path)
# 8. 初始化 RKNN 模型
rknn.init_runtime(core_mask=RKNNLite.NPU_CORE_0)
# 初始化图形窗口,创建两个子图
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(8, 6))
# 上子图显示 ADC 数据,x轴为最近200个采样点
line1, = ax1.plot(np.arange(BUFFER_SIZE), adc_buffer, 'b-o', label="ADC Data")
ax1.set_title("Real-time ADC Data (Last 200 Samples)")
ax1.set_xlabel("Sample Index")
ax1.set_ylabel("Voltage (V)")
ax1.set_ylim(0, 2.5)
# 下子图显示历史异常分数,x轴范围固定为200
line2, = ax2.plot([], [], 'r-', label="Anomaly Score")
ax2.set_title("Anomaly Score Over Time")
ax2.set_xlabel("Time")
ax2.set_ylabel("MSE")
ax2.set_xlim(0, MAX_SCORE_HISTORY)
ax2.set_ylim(0, 50)
# 更新函数:预处理数据、模型推理并更新图形
def update(frame):
global anomaly_scores, adc_buffer
# 从 adc_buffer 中取出最新MODEL_WINDOW个采样点用于模型推理
window_data = adc_buffer[-MODEL_WINDOW:]
# 预处理:转换为 numpy 数组,并 reshape 为 (1,1,1,16)
input_np = np.array(window_data, dtype=np.float32).reshape(1, 1, 1, MODEL_WINDOW)
inputs = [input_np]
outputs = rknn.inference(inputs=inputs, data_format=['nchw'])
# 获取推理结果
output_data = outputs[0] # 获取模型输出(第一个输出)
print("output_data:", output_data.shape)
# 计算重建误差(均方误差)
reconstruction_error = np.mean((output_data - input_np) ** 2, axis=(2, 3)) # 计算每个样本的均方误差
score = reconstruction_error.item()
# 更新异常分数历史记录
anomaly_scores.append(score)
if len(anomaly_scores) > MAX_SCORE_HISTORY:
anomaly_scores.pop(0)
# 更新上子图:ADC 数据
line1.set_ydata(adc_buffer)
# 更新下子图:异常分数曲线
x_vals = np.arange(len(anomaly_scores))
line2.set_data(x_vals, anomaly_scores)
ax2.set_xlim(0, MAX_SCORE_HISTORY)
return line1, line2
# 启动 ADC 数据采集线程
adc_reader = ADCReader()
adc_reader.start()
# 启动动画:每100ms刷新一次
ani = FuncAnimation(fig, update, interval=100, blit=True)
plt.tight_layout()
plt.show()
# 程序退出时清理线程
adc_reader.stop()
adc_reader.join()
rknn.release()
从npy文件读取ADC采集数据,并实时推理,结果如下图(上部分为ADC实时数据,下部分为推理结果):
挑战与展望
实时性要求:在边缘设备上实现实时检测要求数据采集、预处理与模型推理三者协同高效运行。需要优化 ADC 采集和数据传输速度,同时采用轻量级模型结构和量化技术提升推理速度。
噪声与干扰:实际采集的传感器数据可能存在噪声。需要在预处理阶段设计合适的滤波算法,降低误检率。
模型适应性:工业环境下的工作状态复杂多变,模型需要通过大量数据训练来提升泛化能力,避免过拟合正常状态。
总结
利用 RK3576 开发板结合 ADC 采集真实传感器数据,再通过基于 TCN 自编码器的异常检测模型,可以实现边缘设备上的实时状态监测。该方案不仅能提升设备故障预警的时效性,还能降低人工监控成本,为工业自动化和智能制造提供有力支持。
希望本文能为同样致力于边缘智能检测的开发者提供参考和借鉴,欢迎大家在评论区交流讨论,共同推动工业智能化的发展!
-
发表了主题帖:
【米尔 瑞芯微RK3576 工业AI开发板】6.重构时序异常检测模型
本帖最后由 qianmo2001 于 2025-3-9 17:56 编辑
【米尔 瑞芯微RK3576 工业AI开发板】6.重构时序异常检测模型
在前面的测试中,时序异常检测模型可以在PC端仿真推理,但在开发板实机部署时,却在终端抛出“已放弃” 后,异常终止。
在米尔官方技术支持的帮助下,怀疑实际传入的 numpy 数组形状、数据类型,与模型期望的输入不完全一致。
在对米尔的回复内容进行分析(如下图)和对代码进行审查后,主要进行了以下三部分更改:
取消了模型的动态形状(dynamic_shape)支持,固定输入数据形状;
将异常检测 AI 模型中使用的1D卷积替换成2D卷积,用类似处理图像的方式处理时序数据;
将输入数据的维度从三维更改为四维,将输入数据格式与RKNN支持的数据格式NCHW匹配。
在对整个时序异常检测模型进行重构后,问题解决。
在米尔 瑞芯微RK3576 工业AI开发板上运行结果如下图:
重构后的异常检测代码如下(包括模型的训练和onnx模型导出):
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.onnx
from torch.utils.data import DataLoader, TensorDataset
from torch.nn import functional as F
from collections import deque
# 设置随机种子
torch.manual_seed(42)
np.random.seed(42)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 加载数据
def load_data(file_path):
data = np.load(file_path)
return data
# 滑动窗口切割函数
def sliding_window(data, window_size, step_size):
X = []
for i in range(0, len(data) - window_size, step_size):
window = data[i:i + window_size]
X.append(window)
return np.array(X)
# 自编码器 + TCN 模型
class TCN_Autoencoder(nn.Module):
def __init__(self, input_size, hidden_size, kernel_size=3, num_layers=3):
super(TCN_Autoencoder, self).__init__()
# TCN Encoder
self.encoder = nn.Sequential(
nn.Conv2d(input_size, hidden_size, kernel_size=(1, kernel_size), padding=(0,kernel_size // 2)),
nn.ReLU(),
nn.Conv2d(hidden_size, hidden_size, kernel_size=(1, kernel_size), padding=(0,kernel_size // 2)),
nn.ReLU(),
)
# TCN Decoder
self.decoder = nn.Sequential(
nn.ConvTranspose2d(hidden_size, hidden_size, kernel_size=(1, kernel_size), padding=(0,kernel_size // 2)),
nn.ReLU(),
nn.ConvTranspose2d(hidden_size, input_size, kernel_size=(1, kernel_size), padding=(0,kernel_size // 2)),
)
def forward(self, x):
# 输入 x 的形状应为 (batch_size, channels, seq_len)
x = self.encoder(x)
x = self.decoder(x)
return x
# 模型训练
def train_model(model, train_loader, num_epochs=15, lr=1e-3):
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=lr)
for epoch in range(num_epochs):
model.train()
running_loss = 0.0
for data in train_loader:
inputs = data[0].float().to(device)
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, inputs)
loss.backward()
optimizer.step()
running_loss += loss.item()
print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss / len(train_loader)}")
return model
# 模型推断
def detect_anomalies(model, data, threshold=0.1):
model.eval()
with torch.no_grad():
# data 此时为 torch.Tensor
inputs = data.float().to(device)
outputs = model(inputs)
loss = F.mse_loss(outputs, inputs, reduction='none')
loss = loss.mean(dim=1) # 求均值
anomalies = loss > threshold # 设定阈值
return anomalies.cpu().numpy(), loss.cpu().numpy()
# 主程序
if __name__ == "__main__":
# 数据加载
file_path = 'dataset/UCR/train.npy' # 替换为你的npy文件路径
data = load_data(file_path)
# 预处理数据:使用滑动窗口切割
window_size = 16
step_size = 1
data_windows = sliding_window(data, window_size, step_size)
print("Data windows dtype:", data_windows.dtype)
# 转换为 torch.Tensor 并进行转置
# 原始数据假设形状为 (num_windows, window_size, num_features)
# 转换后调整为 (num_windows, num_features, window_size) 以符合 nn.Conv1d 输入要求
data_windows = torch.from_numpy(data_windows).float().to(device)
data_windows = data_windows.transpose(1, 2)
data_windows = data_windows.unsqueeze(1)
# 数据集准备
train_loader = DataLoader(TensorDataset(data_windows), batch_size=32, shuffle=False)
# 定义模型,此时 input_size 为通道数(即 num_features)
input_size = data_windows.shape[2]
hidden_size = 64
model = TCN_Autoencoder(input_size, hidden_size).to(device)
# 训练模型
model = train_model(model, train_loader)
# 异常检测
anomalies, losses = detect_anomalies(model, data_windows)
print(f"Anomalies detected: {np.sum(anomalies)}")
import torch.onnx
import onnx
import onnxruntime as ort
# 导出 ONNX 模型
model.eval() # 确保模型处于 eval 模式
dummy_input = torch.randn(1,1, input_size, window_size).to(device) # 创建与模型输入形状一致的虚拟输入
onnx_filename = "tcn_autoencoder.onnx"
# torch.onnx.export(
# model,
# dummy_input,
# onnx_filename,
# export_params=True,
# opset_version=11,
# do_constant_folding=True,
# input_names=['input'],
# output_names=['output'],
# dynamic_axes={'input': {0: 'batch_size'}, 'output': {0: 'batch_size'}}
# )
torch.onnx.export(
model,
dummy_input,
onnx_filename,
export_params=True,
opset_version=11,
do_constant_folding=True,
input_names=['input'],
output_names=['output']
)
print(f"ONNX模型已导出至 {onnx_filename}")
# 使用 ONNX Runtime 测试导出的 ONNX 模型
# 加载 ONNX 模型并检查模型有效性
onnx_model = onnx.load(onnx_filename)
onnx.checker.check_model(onnx_model)
print("ONNX 模型检查通过。")
# 创建 ONNX Runtime 推理会话
ort_session = ort.InferenceSession(onnx_filename)
# 准备测试数据(这里使用训练数据的一个 batch 作示例)
test_input = data_windows[:1].cpu().numpy() # 形状为 (batch_size, num_features, window_size)
print("ONNX 模型输入的形状:", test_input.shape)
# 构造 ONNX Runtime 的输入字典
ort_inputs = {ort_session.get_inputs()[0].name: test_input}
# 进行推理
ort_outputs = ort_session.run(None, ort_inputs)
print("ONNX 模型输出的形状:", ort_outputs[0].shape)
# 对比推理结果(可选):计算均方误差
# 将 ONNX 输出转换为 torch.Tensor
onnx_result = torch.from_numpy(ort_outputs[0])
# 使用模型在 PyTorch 上计算输出
# with torch.no_grad():
# torch_output = model(data_windows[:32])
#
# mse = F.mse_loss(torch_output, onnx_result)
# print("PyTorch 与 ONNX 模型输出间的均方误差:", mse.item())
-
发表了主题帖:
【米尔 瑞芯微RK3576 工业AI开发板】7.传感器数据读取,并实时显示
本帖最后由 qianmo2001 于 2025-3-9 17:57 编辑
在进行时间序列异常检测时,我们希望采集的数据来自真实的传感器,并能实时检测设备及环境的异常状况。
在本方案中,ADC 用于实时采集传感器信号。通过配置 ADC 模块参数(如采样率、分辨率等),可以将模拟信号转换为数字信号,并通过嵌入式程序实时传输给处理单元进行预处理和后续异常检测。
使用RK3567内置ADC采集传感器数据
ADC通道确认
查阅RK3567开发板原理图,找到可用的内置ADC引脚(SAR ADC_VIN4)如下图。
确保输入信号在ADC参考电压范围内(0~1.8V)。若传感器输出超压,需设计分压电路。
查看ADC设备节点
# 查看ADC设备
ls /sys/bus/iio/devices/iio:device*
cat /sys/bus/iio/devices/iio:device0/name
Python代码实现(基于sysfs)
import time
def read_adc(channel):
raw_path = f"/sys/bus/iio/devices/iio:device0/in_voltage{channel}_raw"
scale_path = f"/sys/bus/iio/devices/iio:device0/in_voltage_scale"
with open(raw_path, 'r') as f_raw, open(scale_path, 'r') as f_scale:
raw = int(f_raw.read().strip())
scale = float(f_scale.read().strip())
voltage = raw * scale
return voltage
# 示例:连续采集通道0
while True:
voltage = read_adc(4)
print(f"电压值: {voltage:.3f} V")
time.sleep(0.001) # 采样间隔1ms(实际受系统调度限制)
ADC采集数据实时显示
方案一 Tkinter动态波形绘制
特点
零额外依赖:仅使用Python标准库(tkinter)
轻量级:代码行数少,资源占用低
实时更新:支持动态刷新波形
代码实现
import sys
import time
import numpy as np
import tkinter as tk
def read_adc(adc_channel):
raw_path = f"/sys/bus/iio/devices/iio:device0/in_voltage{adc_channel}_raw"
scale_path = f"/sys/bus/iio/devices/iio:device0/in_voltage_scale"
try:
with open(raw_path, 'r') as f_raw, open(scale_path, 'r') as f_scale:
raw = int(f_raw.read().strip())
scale = float(f_scale.read().strip())
voltage = raw * scale
return voltage
except Exception as e:
print(f"读取ADC失败: {e}")
return None
import tkinter as tk
import random # 模拟ADC数据,实际替换为读取函数
class SimpleADCPlot:
def __init__(self, root):
self.root = root
self.root.title("ADC波形显示")
# 画布设置
self.width, self.height = 800, 300
self.canvas = tk.Canvas(root, width=self.width, height=self.height, bg='black')
self.canvas.pack()
# 数据缓冲区
self.buffer_size = 100
self.data = [0] * self.buffer_size
# 定时器启动
self.update_plot()
def read_adc(self):
"""模拟ADC读取(替换为实际读取代码)"""
voltage = read_adc(4)*0.001
return voltage # 假设参考电压1.8V
def update_plot(self):
# 更新数据
self.data.pop(0)
self.data.append(self.read_adc())
# 清空画布
self.canvas.delete("all")
# 绘制波形
x_step = self.width / self.buffer_size
y_scale = self.height / 1.8 # 1.8V满量程
points = []
for i, val in enumerate(self.data):
x = i * x_step
y = self.height - val * y_scale # 翻转Y轴
points.extend([x, y])
self.canvas.create_line(points, fill="cyan", width=2)
# 定时刷新(间隔50ms ≈ 20FPS)
self.root.after(20, self.update_plot)
if __name__ == "__main__":
root = tk.Tk()
app = SimpleADCPlot(root)
root.mainloop()
运行结果
方案二 Matplotlib动画
特点
丰富的绘图功能:支持多种图表类型
数据导出方便:直接保存为图片或PDF
适合调试:交互式探索数据
实现代码
import time
import numpy as np
from threading import Thread
from matplotlib import pyplot as plt
from matplotlib.animation import FuncAnimation
# 全局变量
BUFFER_SIZE = 200
data = np.zeros(BUFFER_SIZE)
timestamps = np.arange(BUFFER_SIZE)
def read_adc(channel):
raw_path = f"/sys/bus/iio/devices/iio:device0/in_voltage{channel}_raw"
scale_path = f"/sys/bus/iio/devices/iio:device0/in_voltage_scale"
with open(raw_path, 'r') as f_raw, open(scale_path, 'r') as f_scale:
raw = int(f_raw.read().strip())
scale = float(f_scale.read().strip())
return raw * scale
# 数据采集线程
class ADCReader(Thread):
def __init__(self):
super().__init__()
self.running = True
def run(self):
global data
while self.running:
voltage = read_adc(4)*0.001 # 调用前面的read_adc函数
data = np.roll(data, -1)
data[-1] = voltage
time.sleep(0.001) # 采样间隔1ms
def stop(self):
self.running = False
# 初始化图形
fig, ax = plt.subplots()
line, = ax.plot(timestamps, data, 'r-')
ax.set_ylim(-0.2, 2.0) # 根据ADC参考电压调整
ax.set_xlabel('Samples')
ax.set_ylabel('Voltage (V)')
# 更新函数
def update(frame):
line.set_ydata(data)
return line,
# 启动线程和动画
reader = ADCReader()
reader.start()
ani = FuncAnimation(fig, update, interval=50, blit=True) # 50ms刷新
plt.show()
# 退出时清理
reader.stop()
reader.join()
运行结果
- 2025-03-06
-
回复了主题帖:
【米尔 瑞芯微RK3576 工业AI开发板】Deepseek-R1本地板级部署极速体验
lugl4313820 发表于 2025-3-5 21:55
部署后,使用对话,返回的效果如何,速度飞得起吗?
这才1.5b,没啥问题,内存加到8G,7b也能跑
- 2025-03-01
-
发表了主题帖:
【米尔 瑞芯微RK3576 工业AI开发板】Deepseek-R1本地板级部署极速体验
本帖最后由 qianmo2001 于 2025-3-1 18:53 编辑
【米尔 瑞芯微RK3576 工业AI开发板】Deepseek-R1本地板级部署快速体验
开发板配置:RK3567,4+32G(内存4G,emmc存储32G)
大模型版本:DeepSeek-R1-Distill-Qwen-1.5B
模型下载链接:https://console.box.lenovo.com/l/l0tXb8 密码:rkllm
模型文件:DeepSeek-R1-Distill-Qwen-1.5B_W4A16_RK3576.rkllm
测试代码链接:GitHub - airockchip/rknn-llm
以下所有操作均在米尔 瑞芯微RK3576 工业AI开发板上进行:
在开发板上下载上述测试代码并解压,进入如下解压目录:
cd rknn-llm-main/examples/DeepSeek-R1-Distill-Qwen-1.5B_Demo/deploy
修改build-linux.sh中编译工具目录为
GCC_COMPILER_PATH=/usr/bin/aarch64-none-linux-gnu
运行脚本
./build-linux.sh
生成编译文件llm_demo后,将上述链接中下载的DeepSeek-R1-Distill-Qwen-1.5B_W4A16_RK3576.rkllm大模型文件移动至llm_demo同级目录,运行指令:
export LD_LIBRARY_PATH=./lib
export RKLLM_LOG_LEVEL=1
./llm_demo ./DeepSeek-R1-Distill-Qwen-1.5B_w4a16_RK3576.rkllm 2048 4096
- 2025-02-25
-
回复了主题帖:
嵌入式Rust修炼营:动手写串口烧录工具和MCU例程,Rust达人Hunter直播带你入门Rust
-参与理由&个人编程基础:
本人对基于Rust的嵌入式开发有浓厚的兴趣及热情,希望可以借此机会深入了解Rust的相关内存安全机制,并掌握基于Rust的嵌入式开发方式。
在编程语言方面,本人有较为扎实的C语言、C++以及Python基础。熟悉MCU和Linux的相关驱动开发。积极参与并完成过本社区组织的测评活动。
-查看修炼任务和活动时间表,预估可以跟着完成几级任务(初级、中级、高级):
预估可以跟着完成初级、中级、高级任务。
-如探索过Rust,请说明Rust学习过程遇到难点,希望在参与活动中收获什么?
希望可以在本次活动中,学习和了解Rust基本理论,熟悉并掌握Rust在嵌入式中的开发方式,进而在后续的工作中进行运用。虽然目前Rust在嵌入式领域还不如C/C++那样普及,但随着工具链和生态的逐步完善,Rust有潜力成为嵌入式开发的重要语言之一。
-
回复了主题帖:
【米尔 瑞芯微RK3576 工业AI开发板】3.RKNN模型导出及仿真运行
Jacktang 发表于 2025-2-25 07:35
红色是模型计算的异常分数,这个看起来异常分数也太明显了
数据集使用的是在时序异常检测领域中常见的UCR数据集,正常数据和异常数据区分较为明显,对AII模型来说,难度不大。
-
回复了主题帖:
【米尔 瑞芯微RK3576 工业AI开发板】4.开发板RKNN运行环境搭建
Jacktang 发表于 2025-2-25 07:33
必须先激活conda环境后,才能开始安装RKNN-Toolkit-Lite?
是的,需要将RKNN-Toolkit-Lite安装到指定的conda环境中,RKNN-Toolkit-Lite对环境的python版本有要求。
也可以不使用conda,使用系统默认环境安装,但安装过程会更改默认环境,造成不必要的麻烦。
- 2025-02-24
-
发表了主题帖:
【米尔 瑞芯微RK3576 工业AI开发板】5.使用开发板实机运行异常检测AI模型
本帖最后由 qianmo2001 于 2025-2-25 14:29 编辑
【米尔 瑞芯微RK3576 工业AI开发板】5.使用开发板实机运行异常检测AI模型
开发板配置为4+32G
板载系统为出厂默认系统
RKNN-Toolkit-Lite和RKNN-Toolkit均来自米尔SDK中MYD-LR3576-Debian12-Distribution-L6.1.75-V1.0.0/MYD-LR3576/external/目录下。
rknn_toolkit2-2.0.0b0+9bab5682-cp38-cp38-linux_x86_64.whl
rknn_toolkit_lite2-2.0.0b0-cp38-cp38-linux_aarch64.whl
确保开发板中完成RKNN-Toolkit-Lite的安装并激活其所在的conda环境后,进入工程文件所在路径,执行以下命令,进行实际运行测试。
python runtime2.py
完整代码如下,完整工程见附件
import numpy as np
import time
import os
import matplotlib.pyplot as plt
from rknnlite.api import RKNNLite
# 滑动窗口切割函数
def sliding_window(data, window_size, step_size):
X = []
for i in range(0, len(data) - window_size, step_size):
window = data[i:i + window_size]
X.append(window)
return np.array(X)
# 1. 加载数据集 (假设数据保存在 'data.npy' 文件中)
window_size = 16
step_size = 1
data_path = 'dataset/UCR/test.npy' # 替换为您的实际数据路径
datax = np.load(data_path)
datax = datax[1000:1020]
print(datax.shape)
data = sliding_window(datax, window_size, step_size)
# 2. 创建 RKNN 对象
rknn = RKNNLite()
# 3. 配置 RKNN 模型(启用动态输入)
# rknn.config(target_platform='rk3576', dynamic_input=[[[1, 16, 1]]]) # 适应输入形状
# 4. 加载 ONNX 模型
# onnx_model_path = "tcn_autoencoder.onnx" # 替换为您实际的模型路径
# rknn.load_onnx(model=onnx_model_path, inputs=['input'], input_size_list=[[1, 16, 1]])
# 5. 编译模型,不启用量化(如果不需要量化)
# rknn.build(do_quantization=False)
# 6. 导出 RKNN 模型
rknn_model_path = "tcn_autoencoder.rknn"
# rknn.export_rknn(rknn_model_path)
# 7. 加载 RKNN 模型
rknn.load_rknn(rknn_model_path)
# 8. 初始化 RKNN 模型
rknn.init_runtime()
# 9. 准备输入数据并逐个进行推理
print(data.shape)
num_samples = data.shape[0] # 样本数量(num)
reconstruction_errors = [] # 用于存储每次推理的重建误差
# 遍历每个样本,逐个进行推理
for i in range(num_samples):
input_data = data[i:i+1].astype(np.float32) # 提取单个样本,形状为 (1, 16, 1)
# 执行推理
inputs = [input_data]
outputs = rknn.inference(inputs=inputs)
# 获取推理结果
output_data = outputs[0] # 获取模型输出(第一个输出)
# 计算重建误差(均方误差)
reconstruction_error = np.mean((output_data - input_data) ** 2, axis=(1, 2)) # 计算每个样本的均方误差
reconstruction_errors.append(reconstruction_error)
# 10. 将重建误差保存为 numpy 数组
reconstruction_errors = np.array(reconstruction_errors)
# 11. 设置阈值并检测异常点
threshold = 0.001 # 设置重建误差的阈值
anomalies = reconstruction_errors > threshold # 如果重建误差大于阈值,则为异常点
# 12. 输出检测到的异常点数量
print(f"检测到 {np.sum(anomalies)} 个异常点")
# 13. 输出所有重建误差
print("所有重建误差:", reconstruction_errors.shape)
# 创建图表和子图
fig, ax1 = plt.subplots()
# 绘制第一条曲线,使用默认的左侧Y轴
ax1.plot(datax[8:-8], label="Reconstruction Error",alpha=0.2, color='b')
ax1.set_xlabel('Sample Index')
ax1.set_ylabel('Reconstruction Error', color='b')
# 创建第二个Y轴
ax2 = ax1.twinx()
# 绘制第二条曲线,使用右侧Y轴
ax2.plot(reconstruction_errors, label="Reconstruction Error", color='r')
ax2.set_ylabel('Y values for ln(x)', color='r')
# 显示图表
plt.show()
rknn.release()
实机运行结果如下:
终端显示已放弃,怀疑是系统资源不足,进程被强制终止,
尝试对数据集进行裁剪,减少模型的输入,无果
重启开发板后,进行测试,发现程序在执行outputs = rknn.inference(inputs=inputs)这一行时出现异常,向终端抛出 “已放弃” 后,程序中断退出。
-
回复了主题帖:
【米尔 瑞芯微RK3576 工业AI开发板】1.异常检测模型设计
wangerxian 发表于 2025-2-24 09:04
这里的传感器获取到的是什么数据?
是从npy文件中用滑动窗口读出来。
从外接设备读传感器数据会涉及Linux相关驱动,这部分我还在学习。
-
发表了主题帖:
【米尔 瑞芯微RK3576 工业AI开发板】4.开发板RKNN运行环境搭建
【米尔 瑞芯微RK3576 工业AI开发板】4.开发板RKNN运行环境搭建
在开发板上,需要安装RKNN-Toolkit-Lite,以支持模型的推理和部署。
在安装RKNN-Toolkit-Lite之前需要在开发板上搭建相关环境
1. 准备相关文件
在PC端下载Miniconda安装包,RKNN-Toolkit-Lite以及准备在米尔 瑞芯微RK3576 工业AI开发板上运行的相关工程文件
2. 安装Miniconda
将终端切换至Miniconda所在路径,执行以下命令进行安装
./Miniconda3-latest-Linux-aarch64.sh
显示如下界面表示安装成功
重新打开终端,进入conda默认base环境,执行以下命令创建conda环境,并激活
conda create -n py38 python=3.8
conda activate py38
显示如下界面表示创建成功
3. 安装RKNN-Toolkit-Lite
激活conda环境后即可开始安装RKNN-Toolkit-Lite,安装命令如下:
pip install ./rknn_toolkit_lite2-2.0.0b0-cp38-cp38-linux_aarch64.whl
显示如下界面表示安装完成
4. 检验是否安装成功
执行以下命令,检验是否安装成功
python
from rknnlite.api import RKNNLite
如下图,无报错即为安装成功
- 2025-02-23
-
发表了主题帖:
【米尔 瑞芯微RK3576 工业AI开发板】3.RKNN模型导出及仿真运行
本帖最后由 qianmo2001 于 2025-2-23 23:11 编辑
【米尔 瑞芯微RK3576 工业AI开发板】3.RKNN模型导出及仿真运行
要将 ONNX 模型转换为 RKNN 模型(Rockchip Neural Network,适用于 Rockchip 设备的神经网络模型),可以使用 RKNN Toolkit。RKNN Toolkit 是 Rockchip 提供的工具,用于将深度学习模型转换为 RKNN 格式,以便在 Rockchip 设备上部署和运行。
1. 在PC端Linux系统下搭建相关环境
之前关于异常检测的模型设计,训练,验证及ONNX模型的导出,验证均在Windows环境下进行
后续RKNN模型的导出及仿真运行在虚拟机中的Linux环境下进行。
默认Linux环境中已经安装python相关环境及conda
新建一个conda环境并激活
conda creat -n py38rk3576 python=3.8 #新建一个conda环境
conda activate py38rk3576 #激活环境
2. 安装 RKNN Toolkit
RKNN Toolkit可以通过扫描开发板上的二维码,在米尔官网提供的SDK中获得,解压后文件所在目录如下图所示:
在该目录下打开终端,执行conda activate py38rk3576,激活环境。
执行以下命令安装相关必要的包,并安装RKNN Toolkit。
pip install -r requirements_cp38-2.0.0b0.txt
pip install ./rknn_toolkit2-2.0.0b0+9bab5682-cp38-cp38-linux_x86_64.whl
安装成功后终端显示如上图:
在终端中执行以下命令验证RKNN Toolkit是否可以正常使用。
python
from rknn.api import RKNN
终端结果如下图则完成安装。
3. 加载ONNX模型并将其转换为RKNN模型
完整代码如下:
# from rknn import RKNN
from rknn.api import RKNN
rknn = RKNN()
rknn.config(target_platform='rk3568', dynamic_input=[[[1, 100, 1]]])
onnx_model_path = "tcn_autoencoder.onnx"
rknn.load_onnx(model=onnx_model_path, inputs=['input'], input_size_list=[[1, 100, 1]])
rknn.build(do_quantization=False)
rknn_model_path = "tcn_autoencoder.rknn"
rknn.export_rknn(rknn_model_path)
print("ONNX")
终端输出如下:
4. 加载RKNN模型并仿真验证
完整代码如下:
import numpy as np
import time
import os
import matplotlib.pyplot as plt
from rknn.api import RKNN
# 滑动窗口切割函数
def sliding_window(data, window_size, step_size):
X = []
for i in range(0, len(data) - window_size, step_size):
window = data[i:i + window_size]
X.append(window)
return np.array(X)
# 1. 加载数据集 (假设数据保存在 'data.npy' 文件中)
window_size = 16
step_size = 1
data_path = 'dataset/UCR/test.npy' # 替换为您的实际数据路径
datax = np.load(data_path)
print(datax.shape)
data = sliding_window(datax, window_size, step_size)
# 2. 创建 RKNN 对象
rknn = RKNN()
# 3. 配置 RKNN 模型(启用动态输入)
rknn.config(target_platform='rk3576', dynamic_input=[[[1, 16, 1]]]) # 适应输入形状
# 4. 加载 ONNX 模型
onnx_model_path = "tcn_autoencoder.onnx" # 替换为您实际的模型路径
rknn.load_onnx(model=onnx_model_path, inputs=['input'], input_size_list=[[1, 16, 1]])
# 5. 编译模型,不启用量化(如果不需要量化)
rknn.build(do_quantization=False)
# 6. 导出 RKNN 模型
rknn_model_path = "tcn_autoencoder.rknn"
rknn.export_rknn(rknn_model_path)
# 7. 加载 RKNN 模型
rknn.load_rknn(rknn_model_path)
# 8. 初始化 RKNN 模型
rknn.init_runtime()
# 9. 准备输入数据并逐个进行推理
print(data.shape)
num_samples = data.shape[0] # 样本数量(num)
reconstruction_errors = [] # 用于存储每次推理的重建误差
# 遍历每个样本,逐个进行推理
for i in range(num_samples):
input_data = data[i:i+1].astype(np.float32) # 提取单个样本,形状为 (1, 16, 1)
# 执行推理
inputs = [input_data]
outputs = rknn.inference(inputs=inputs)
# 获取推理结果
output_data = outputs[0] # 获取模型输出(第一个输出)
# 计算重建误差(均方误差)
reconstruction_error = np.mean((output_data - input_data) ** 2, axis=(1, 2)) # 计算每个样本的均方误差
reconstruction_errors.append(reconstruction_error)
# 10. 将重建误差保存为 numpy 数组
reconstruction_errors = np.array(reconstruction_errors)
# 11. 设置阈值并检测异常点
threshold = 0.001 # 设置重建误差的阈值
anomalies = reconstruction_errors > threshold # 如果重建误差大于阈值,则为异常点
# 12. 输出检测到的异常点数量
print(f"检测到 {np.sum(anomalies)} 个异常点")
# 13. 输出所有重建误差
print("所有重建误差:", reconstruction_errors.shape)
# 创建图表和子图
fig, ax1 = plt.subplots()
# 绘制第一条曲线,使用默认的左侧Y轴
ax1.plot(datax[8:5892], label="Reconstruction Error",alpha=0.2, color='b')
ax1.set_xlabel('Sample Index')
ax1.set_ylabel('Reconstruction Error', color='b')
# 创建第二个Y轴
ax2 = ax1.twinx()
# 绘制第二条曲线,使用右侧Y轴
ax2.plot(reconstruction_errors, label="Reconstruction Error", color='r')
ax2.set_ylabel('Y values for ln(x)', color='r')
# 显示图表
plt.show()
rknn.release()
终端输出如下:
5. 运行结果(蓝色为输入数据,红色为模型计算的异常分数):
-
发表了主题帖:
【米尔 瑞芯微RK3576 工业AI开发板】2. onnx模型导出及验证
本帖最后由 qianmo2001 于 2025-2-23 21:25 编辑
###【米尔 瑞芯微RK3576 工业AI开发板】2. onnx模型导出及验证
为将异常检测模型部署在米尔 瑞芯微RK3576 工业AI开发板 上,需要将pytorch环境下的pt模型转换为onnx模型,再转为RK3567支持的RKNN模型。
执行命令,安装相关包。
pip install onnxruntime onnx -i https://pypi.tuna.tsinghua.edu.cn/simple
以下为pt模型转onnx模型及验证代码:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.onnx
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset
from torch import nn
from torch.nn import functional as F
from collections import deque
# 设置随机种子
torch.manual_seed(42)
np.random.seed(42)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 加载数据
def load_data(file_path):
data = np.load(file_path)
return data
# 滑动窗口切割函数
def sliding_window(data, window_size, step_size):
X = []
for i in range(0, len(data) - window_size, step_size):
window = data[i:i + window_size]
X.append(window)
return np.array(X)
# 自编码器 + TCN 模型
class TCN_Autoencoder(nn.Module):
def __init__(self, input_size, hidden_size, kernel_size=3, num_layers=3):
super(TCN_Autoencoder, self).__init__()
# TCN Encoder
self.encoder = nn.Sequential(
nn.Conv1d(input_size, hidden_size, kernel_size=kernel_size, padding=kernel_size // 2),
nn.ReLU(),
nn.Conv1d(hidden_size, hidden_size, kernel_size=kernel_size, padding=kernel_size // 2),
nn.ReLU(),
)
# TCN Decoder
self.decoder = nn.Sequential(
nn.ConvTranspose1d(hidden_size, hidden_size, kernel_size=kernel_size, padding=kernel_size // 2),
nn.ReLU(),
nn.ConvTranspose1d(hidden_size, input_size, kernel_size=kernel_size, padding=kernel_size // 2),
)
def forward(self, x):
x = x.transpose(1, 2) # 转换为 (batch_size, input_size, seq_len)
x = self.encoder(x)
x = self.decoder(x)
x = x.transpose(1, 2) # 转换回 (batch_size, seq_len, input_size)
return x
# 模型训练
def train_model(model, train_loader, num_epochs=10, lr=1e-3):
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=lr)
for epoch in range(num_epochs):
model.train()
running_loss = 0.0
for data in train_loader:
inputs = data[0].float().to(device)
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, inputs)
loss.backward()
optimizer.step()
running_loss += loss.item()
print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss / len(train_loader)}")
return model
# 模型推断
def detect_anomalies(model, data, threshold=0.1):
model.eval()
with torch.no_grad():
inputs = torch.tensor(data).float().to(device)
outputs = model(inputs)
loss = F.mse_loss(outputs, inputs, reduction='none')
loss = loss.mean(dim=1) # 求均值
anomalies = loss > threshold # 设定阈值
return anomalies.cpu().numpy(), loss.cpu().numpy()
# 主程序
if __name__ == "__main__":
# 数据加载
file_path = 'dataset/UCR/train.npy' # 替换为你的npy文件路径
data = load_data(file_path)
# data = data.T
# 预处理数据
window_size = 16
step_size = 1
data_windows = sliding_window(data, window_size, step_size)
# 标准化数据
scaler = StandardScaler()
data_windows = scaler.fit_transform(data_windows.reshape(-1, data_windows.shape[-1])).reshape(data_windows.shape)
# 数据集准备
train_data = torch.tensor(data_windows).float().to(device)
train_loader = DataLoader(TensorDataset(train_data), batch_size=32, shuffle=False)
# 定义模型
input_size = data_windows.shape[2] # 输入的特征数量
hidden_size = 64
model = TCN_Autoencoder(input_size, hidden_size).to(device)
# 训练模型
modelxx = train_model(model, train_loader)
# 异常检测
anomalies, losses = detect_anomalies(modelxx, data_windows)
print(f"Anomalies detected: {np.sum(anomalies)}")
import torch.onnx
# 将模型设置为评估模式
model.eval()
# 创建一个与模型输入相同形状的虚拟输入张量
# dummy_input = torch.randn(1, input_size, window_size).to(device)
dummy_input = torch.randn(1,window_size , input_size).to(device)
# 导出模型为 ONNX 格式
onnx_filename = "tcn_autoencoder.onnx"
torch.onnx.export(
model,
dummy_input,
onnx_filename,
export_params=True,
opset_version=11,
do_constant_folding=True,
input_names=['input'],
output_names=['output'],
dynamic_axes={'input': {0: 'batch_size'}, 'output': {0: 'batch_size'}}
)
import onnx
import onnxruntime as ort
import numpy as np
onnx_filename = "tcn_autoencoder.onnx"
# 加载 ONNX 模型
onnx_model = onnx.load(onnx_filename)
# 检查模型的有效性
onnx.checker.check_model(onnx_model)
print("ONNX 模型有效")
# 创建 ONNX Runtime 推理会话
ort_session = ort.InferenceSession(onnx_filename)
# 准备输入数据
# 假设 data_windows 是您的输入数据,形状为 (batch_size, window_size, input_size)
# 请根据实际情况调整输入数据的形状和预处理步骤
inputs = data_windows.astype(np.float32)
# 进行推理
ort_inputs = {ort_session.get_inputs()[0].name: inputs}
ort_outputs = ort_session.run(None, ort_inputs)
# 获取输出结果
outputs = ort_outputs[0]
# 处理输出结果
# 例如,计算重建误差
reconstruction_error = np.mean((outputs - inputs) ** 2, axis=(1, 2))
# 检测异常
threshold = 0.005
anomalies = reconstruction_error > threshold
print(f"检测到 {np.sum(anomalies)} 个异常")
代码如上,滑动显示全部。
onnx模型运行结果
-
发表了主题帖:
【米尔 瑞芯微RK3576 工业AI开发板】1.异常检测模型设计
本帖最后由 qianmo2001 于 2025-2-23 23:07 编辑
【米尔 瑞芯微RK3576 工业AI开发板】1.异常检测模型设计
引言
随着工业自动化和智能化的发展,基于AI的边缘计算逐渐成为各类设备智能化升级的核心。米尔瑞芯微RK3576工业AI开发板,作为一款高性能的AIoT平台,凭借其强大的AI计算能力和低功耗特点,广泛应用于各类工业检测任务。本次评测的主要目标是利用该开发板进行六面顶锥金属疲劳检测,具体目标是结合自编码器和TCN(时间卷积网络)模型,基于传感器数据进行时序异常检测和异常识别。
硬件概述
米尔电子推出的瑞芯微RK3576工业AI开发板是一款面向工业智能化和边缘计算场景的高性能开发平台,其核心优势在于强大的算力、丰富的接口以及工业级可靠性。
米尔瑞芯微RK3576开发板采用了Rockchip RK3576芯片,配备了四核Cortex-A72和四核Cortex-A53的CPU架构,支持6 TOPS(Tera Operations Per Second)的AI推理能力。该板卡内置NPU(神经网络处理单元),为深度学习模型提供加速计算,适合在边缘计算和工业物联网中部署AI应用。此外,RK3576具有强大的计算能力和较低的功耗,这对于要求高性能与长时间稳定运行的工业设备尤为重要。
开发板支持Linux操作系统,并提供丰富的接口,可以方便地进行数据采集和控制。开发板提供的计算资源使其成为AI部署的理想平台,能够在实时性要求高的任务中,尤其是在金属疲劳检测和异常分析中,提供快速且高效的推理能力。
模型设计与实现
本设计需要实现多变量时间序列异常检测,通过设计一个具有自编码器结构的TCN时间序列重建模型,对多变量时序数据进行压缩,学习数据的一般特征表示,最后对压缩数据进行重建,并与输入数据进行对比,计算重建误差。
该模型使用正常数据进行训练,对输入多变量时序数据进行建模,学习正常数据特征。在测试时,对于与正常数据有较大差异的异常数据,模型会输出较大的重建误差。通过对重建误差的分析,实现多变量时间序列的异常检测。
具有自编码器结构的TCN时间序列重建模型的训练及测试完整代码如下:
1. 数据加载与预处理:
为实现顶锥金属疲劳检测任务,首先需要对从传感器获取的时序数据进行加载和预处理。数据存储在npy文件中,这里使用公开的UCR数据集,预处理过程包括滑动窗口切割和标准化处理。具体步骤如下:
滑动窗口切割:通过滑动窗口技术,将传感器数据切割为多个时序数据片段,使其适合输入到神经网络模型中。
标准化: 使用StandardScaler对数据进行标准化处理,确保数据分布具有统一的均值和方差,有助于提升模型训练效果。
代码实现如下:
import numpy as np
from sklearn.preprocessing import StandardScaler
# 加载训练集和测试集
train_data = load_data('dataset/UCR/train.npy')
test_data = load_data('dataset/UCR/test.npy')
# 滑动窗口切割函数
def sliding_window(data, window_size, step_size):
X = []
for i in range(0, len(data) - window_size, step_size):
window = data[i:i + window_size]
X.append(window)
return np.array(X)
# 预处理数据
window_size = 100
step_size = 20
data_windows = sliding_window(data, window_size, step_size)
# 标准化训练集
scaler = StandardScaler()
train_windows = scaler.fit_transform(train_windows.reshape(-1, train_windows.shape[-1])).reshape(train_windows.shape)
# 预处理测试集
test_windows = sliding_window(test_data, window_size, step_size)
test_windows = scaler.transform(test_windows.reshape(-1, test_windows.shape[-1])).reshape(test_windows.shape)
2. 创建数据加载器:
使用 DataLoader 创建训练集和测试集的数据加载器:
train_tensor = torch.tensor(train_windows).float().to(device)
test_tensor = torch.tensor(test_windows).float().to(device)
train_loader = DataLoader(TensorDataset(train_tensor), batch_size=32, shuffle=True)
test_loader = DataLoader(TensorDataset(test_tensor), batch_size=32, shuffle=False)
3. 模型设计:
为了实现时序异常检测,选择结合自编码器(Autoencoder)和TCN(时间卷积网络)作为模型架构。自编码器能够通过学习数据的特征进行数据重构,而TCN则在处理时序数据时展现出强大的建模能力。模型结构包括一个编码器和解码器:
编码器部分: 由多个1D卷积层组成,负责从输入数据中提取特征。
解码器部分: 由多个转置卷积层组成,负责将编码后的数据进行重构。
模型设计代码如下:
class TCN_Autoencoder(nn.Module):
def __init__(self, input_size, hidden_size, kernel_size=3, num_layers=3):
super(TCN_Autoencoder, self).__init__()
# TCN Encoder
self.encoder = nn.Sequential(
nn.Conv1d(input_size, hidden_size, kernel_size=kernel_size, padding=kernel_size // 2),
nn.ReLU(),
nn.Conv1d(hidden_size, hidden_size, kernel_size=kernel_size, padding=kernel_size // 2),
nn.ReLU(),
)
# TCN Decoder
self.decoder = nn.Sequential(
nn.ConvTranspose1d(hidden_size, hidden_size, kernel_size=kernel_size, padding=kernel_size // 2),
nn.ReLU(),
nn.ConvTranspose1d(hidden_size, input_size, kernel_size=kernel_size, padding=kernel_size // 2),
)
def forward(self, x):
x = x.transpose(1, 2) # 转换为 (batch_size, input_size, seq_len)
x = self.encoder(x)
x = self.decoder(x)
x = x.transpose(1, 2) # 转换回 (batch_size, seq_len, input_size)
return x
模型的设计不仅能有效提取时序数据中的深层特征,还能通过自编码器的重构误差来检测数据的异常情况。
4. 模型训练与验证:
在PC端使用PyTorch框架进行模型的训练,采用均方误差(MSE)作为损失函数。训练过程包括数据的批量处理、模型训练、损失计算和参数优化。
input_size = train_windows.shape[2]
hidden_size = 64
model = TCN_Autoencoder(input_size, hidden_size).to(device)
# 训练模型
def train_model(model, train_loader, num_epochs=50, lr=1e-3):
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=lr)
for epoch in range(num_epochs):
model.train()
running_loss = 0.0
for data in train_loader:
inputs = data[0].float().to(device)
optimizer.zero_grad()
outputs = model(inputs)
loss = criterion(outputs, inputs)
loss.backward()
optimizer.step()
running_loss += loss.item()
print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss / len(train_loader)}")
return model
model = train_model(model, train_loader)
def test_model(model, test_loader):
model.eval()
with torch.no_grad():
total_loss = 0.0
for data in test_loader:
inputs = data[0].float().to(device)
outputs = model(inputs)
loss = F.mse_loss(outputs, inputs, reduction='none')
loss = loss.mean(dim=1) # 求均值
total_loss += loss.sum().item()
avg_loss = total_loss / len(test_loader.dataset)
print(f"Test Loss: {avg_loss}")
test_model(model, test_loader)
End
- 2024-12-13
-
回复了主题帖:
测评入围名单: 米尔 瑞芯微RK3576 工业AI开发板
个人信息无误,确认可以完成评测计划