我们逐步讲解 DNN(Deep Neural Network,深度神经网络)模型的实现、原理以及设计方法,并附一个简单的实现示例。
一、什么是 DNN(深度神经网络)?
DNN 是一种前馈神经网络(Feedforward Neural Network),由多个隐藏层组成,每一层都包含多个神经元(神经节点)。常用于处理结构化数据、图像、语音、自然语言等任务。
基本结构:
- 输入层:接收原始数据。
- 隐藏层:通过非线性变换提取特征。
- 输出层:输出结果,如分类概率、数值预测等。
二、DNN 为什么可以实现预测?
DNN 模型本质是一个复合函数,它通过叠加多个线性变换和非线性激活函数,模拟复杂的输入输出映射关系。
例如,三层网络结构如下:
Output = f3(W3 · f2(W2 · f1(W1 · X + b1) + b2) + b3)
W
表示权重矩阵,b
表示偏置,f
是激活函数(如 ReLU)。- 每一层学到的权重组合和非线性可以逼近任意复杂函数(即通用逼近定理)。
三、DNN 如何实现:代码示例(PyTorch)
import torch
import torch.nn as nn
import torch.nn.functional as F
# 定义一个简单的 DNN 模型
class SimpleDNN(nn.Module):
def __init__(self, input_size, hidden_sizes, output_size):
super(SimpleDNN, self).__init__()
layers = []
last_size = input_size
for hidden_size in hidden_sizes:
layers.append(nn.Linear(last_size, hidden_size))
layers.append(nn.ReLU()) # 非线性激活函数
last_size = hidden_size
layers.append(nn.Linear(last_size, output_size)) # 输出层
self.network = nn.Sequential(*layers)
def forward(self, x):
return self.network(x)
# 模型实例化
model = SimpleDNN(input_size=10, hidden_sizes=[64, 32], output_size=1)
# 示例输入
x = torch.randn(5, 10) # 5个样本,10维特征
output = model(x)
print(output)
四、隐藏层怎么设计?有什么通用规则?
虽然没有绝对固定的设计方式,但以下是经验法则:
1. 层数(Depth):
- 浅层(1-2层):适合简单任务或小数据集。
- 深层(>3层):适合复杂任务,如图像、语音识别等。
2. 每层神经元数量(Width):
- 通常从输入大小向中间压缩或扩展,然后再到输出大小收缩。
- 可尝试:
- 递减:如
[128, 64, 32]
- 恒定:如
[64, 64, 64]
- 递减:如
3. 激活函数:
- 推荐使用
ReLU
、LeakyReLU
、或GELU
,比Sigmoid
/Tanh
更易训练。
4. 防止过拟合:
- 加入 Dropout 层。
- 使用 BatchNorm 层。
- 训练时使用正则化、早停等技巧。
五、复杂 DNN 的设计建议
- 逐步增加层数和宽度,观察验证集性能变化;
- 学习曲线分析:训练误差低但验证误差高 → 模型过拟合;
- 使用自动调参(如 Optuna、Ray Tune);
- 可视化网络结构和训练过程,便于调试;
- 利用预训练模型(在图像、文本领域尤为有效);
总结:DNN 是如何工作的?
- 多层神经元叠加构成一个复杂的函数;
- 使用反向传播算法训练权重;
- 每层通过线性变换 + 激活函数逐步提取高阶特征;
- 越复杂的问题,需要越深、越广的网络结构。
需要我为你设计一个适用于某个具体任务的 DNN 模型结构吗?比如预测房价、图像分类或文本情感分析等?
好的!请你先告诉我以下信息,这样我才能为你设计一个合适的 DNN 模型结构:
❓1. 任务类型是什么?
- 回归(预测房价、股票价格)
- 分类(二分类/多分类,如情感分析、图像识别)
- 其他(如多标签分类、时间序列预测)
❓2. 输入数据是什么样的?
- 有多少个特征(维度)?
- 是结构化表格数据、图像、文本,还是其他形式?
❓3. 预期输出是什么?
- 一个数值(回归)
- 一个类别标签(分类)
- 多个概率(多标签)
例如,如果你要做“房价预测”,你可以告诉我:
- 任务:回归
- 输入:结构化数据,有 15 个特征(如面积、位置、房间数等)
- 输出:一个连续的房价数值
只要你提供这些基本信息,我就可以给出模型结构、激活函数、损失函数、训练建议和代码实现。你现在打算实现哪一类任务?
很好!你的任务是一个回归问题,输入是结构化表格数据,目标是预测一个数值(典型的如房价预测、销售额预测等)。
✅ 1. 网络设计原则(适用于结构化数据回归)
- 输入层:特征数量 = 你的数据中每个样本的列数(比如 10 个字段就输入 10)。
- 隐藏层:
- 2~4 层通常就足够。
- 神经元个数可从 64 开始逐层减少(例如 64 → 32 → 16)。
- 激活函数:
ReLU
(快速收敛,避免梯度消失)。 - 输出层:
- 只有 1 个神经元,因为你只预测一个数值。
- 无激活函数(即线性输出)用于回归。
- 损失函数:
MSELoss
(均方误差)。
✅ 2. PyTorch 实现代码(适合入门回归任务)
import torch
import torch.nn as nn
import torch.optim as optim
# 假设你的输入特征有 10 个
class DNNRegressor(nn.Module):
def __init__(self, input_size):
super(DNNRegressor, self).__init__()
self.model = nn.Sequential(
nn.Linear(input_size, 64),
nn.ReLU(),
nn.Linear(64, 32),
nn.ReLU(),
nn.Linear(32, 16),
nn.ReLU(),
nn.Linear(16, 1) # 回归输出层,无激活
)
def forward(self, x):
return self.model(x)
# 模拟输入
input_size = 10
model = DNNRegressor(input_size)
# 损失函数和优化器
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# 示例训练(伪数据)
x_train = torch.randn(100, input_size)
y_train = torch.randn(100, 1)
for epoch in range(100):
model.train()
optimizer.zero_grad()
outputs = model(x_train)
loss = criterion(outputs, y_train)
loss.backward()
optimizer.step()
if (epoch + 1) % 10 == 0:
print(f"Epoch {epoch+1}, Loss: {loss.item():.4f}")
✅ 3. 训练建议
- 归一化输入数据(标准化特征,能显著提升训练效果);
- 批训练(Batch Training):使用
DataLoader
按小批量训练更稳定; - 学习率调优:可以尝试从
0.001
开始; - 早停法 Early Stopping:防止过拟合;
- 评估指标:除了 MSE,你还可以监控 MAE、R² 分数等;
✅ 4. 示例模型结构图
Input (10 features)
↓
Linear(10 → 64) + ReLU
↓
Linear(64 → 32) + ReLU
↓
Linear(32 → 16) + ReLU
↓
Linear(16 → 1) → 输出预测值
需要我帮你加上 DataLoader
或支持 CSV 文件读取、完整训练/验证流程吗?
非常好!下面是一个完整的 DNN 回归模型训练脚本,支持读取 CSV 文件、使用 DataLoader、标准化数据、训练和验证流程。我会逐段解释每部分代码的作用。
✅ 环境依赖
你需要先安装 PyTorch 和 Pandas(用于读取 CSV 文件):
pip install torch pandas scikit-learn
✅ 完整代码(含解释)
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
✅ 说明:
- 引入所需的库;
torch.utils.data.Dataset
是用于定义自定义数据集;StandardScaler
用于特征标准化(提升训练效果)。
# 自定义数据集类
class RegressionDataset(Dataset):
def __init__(self, X, y):
self.X = torch.tensor(X, dtype=torch.float32)
self.y = torch.tensor(y, dtype=torch.float32).view(-1, 1) # 保证输出维度一致
def __len__(self):
return len(self.X)
def __getitem__(self, idx):
return self.X[idx], self.y[idx]
✅ 说明:
- 把
numpy
格式的特征和标签转换为torch.Tensor
; - 实现了
__getitem__
,方便 DataLoader 调用。
# DNN 模型结构
class DNNRegressor(nn.Module):
def __init__(self, input_size):
super(DNNRegressor, self).__init__()
self.model = nn.Sequential(
nn.Linear(input_size, 64),
nn.ReLU(),
nn.Linear(64, 32),
nn.ReLU(),
nn.Linear(32, 16),
nn.ReLU(),
nn.Linear(16, 1) # 回归,输出一个值
)
def forward(self, x):
return self.model(x)
✅ 说明:
- 4 层线性层;
- 每层使用
ReLU
激活; - 最后一层没有激活函数,输出一个连续数值。
# 读取 CSV 文件(你可以换成自己的文件路径)
df = pd.read_csv("your_data.csv")
# 假设 'target' 是你要预测的列
X = df.drop(columns=["target"]).values
y = df["target"].values
# 标准化特征数据
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# 切分训练集和验证集
X_train, X_val, y_train, y_val = train_test_split(X_scaled, y, test_size=0.2, random_state=42)
# 构造数据集和加载器
train_dataset = RegressionDataset(X_train, y_train)
val_dataset = RegressionDataset(X_val, y_val)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32)
✅ 说明:
- 从 CSV 读取数据;
- 把
target
作为预测目标; - 标准化特征;
- 分出验证集(20%);
- 使用
DataLoader
方便批量训练。
# 初始化模型、损失函数、优化器
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = DNNRegressor(input_size=X.shape[1]).to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
✅ 说明:
- 将模型放到 GPU(如果可用);
- 使用均方误差作为损失函数;
- 使用 Adam 优化器训练网络参数。
# 训练和验证过程
num_epochs = 50
for epoch in range(num_epochs):
# Training
model.train()
train_loss = 0
for X_batch, y_batch in train_loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
optimizer.zero_grad()
output = model(X_batch)
loss = criterion(output, y_batch)
loss.backward()
optimizer.step()
train_loss += loss.item() * X_batch.size(0)
# Validation
model.eval()
val_loss = 0
with torch.no_grad():
for X_batch, y_batch in val_loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
output = model(X_batch)
loss = criterion(output, y_batch)
val_loss += loss.item() * X_batch.size(0)
train_loss /= len(train_loader.dataset)
val_loss /= len(val_loader.dataset)
print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")
✅ 说明:
- 每个 epoch 包含训练和验证两个阶段;
- 使用
model.train()
和model.eval()
控制行为; - 累计损失值,计算平均训练/验证误差。
✅ 总结结构图
CSV → pandas → 标准化 → DataLoader
↓
DNNRegressor
↓
MSELoss + Adam优化
↓
预测结果 + 验证评估
✅ 可选增强项(后续可加):
- 保存模型参数(
torch.save
) - 加入 EarlyStopping
- 添加 TensorBoard 日志可视化
- 封装为一个可复用类/训练器
如果你愿意,我可以基于这个模板帮你封装成一个训练脚本或模块,或者集成更复杂的功能(如自动调参、绘图等)。你希望运行的 CSV 文件样例长什么样?我可以帮你写一个样本。
非常好!下面我将为你扩展 DNN 回归训练框架,集成以下复杂功能,适合生产级训练:
✅ 集成功能概览
- ✅ 自动保存最佳模型(基于验证集 Loss 最低)
- ✅ Early Stopping(提前终止训练以避免过拟合)
- ✅ TensorBoard 可视化训练日志(训练/验证损失曲线)
- ✅ 更规范的训练流程封装
✅ 完整代码结构
我将完整训练流程拆成几个部分,组织更清晰:
train.py # 主程序入口
models.py # DNN 模型结构
dataset.py # 自定义数据集类
utils.py # 训练工具函数(early stopping、保存模型等)
logs/ # TensorBoard 日志目录
checkpoints/ # 模型保存目录
✅ 样例代码实现(以单文件形式展示,实际可按模块拆分)
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np
# ======================== 数据集定义 ========================
class RegressionDataset(Dataset):
def __init__(self, X, y):
self.X = torch.tensor(X, dtype=torch.float32)
self.y = torch.tensor(y, dtype=torch.float32).view(-1, 1)
def __len__(self):
return len(self.X)
def __getitem__(self, idx):
return self.X[idx], self.y[idx]
# ======================== 模型定义 ========================
class DNNRegressor(nn.Module):
def __init__(self, input_size):
super(DNNRegressor, self).__init__()
self.model = nn.Sequential(
nn.Linear(input_size, 128),
nn.ReLU(),
nn.Linear(128, 64),
nn.ReLU(),
nn.Linear(64, 32),
nn.ReLU(),
nn.Linear(32, 1)
)
def forward(self, x):
return self.model(x)
# ======================== 训练辅助功能 ========================
class EarlyStopping:
def __init__(self, patience=10, verbose=True, delta=1e-4):
self.patience = patience
self.verbose = verbose
self.counter = 0
self.best_loss = np.Inf
self.early_stop = False
self.delta = delta
def __call__(self, val_loss, model, path):
if val_loss < self.best_loss - self.delta:
self.best_loss = val_loss
self.counter = 0
torch.save(model.state_dict(), path)
if self.verbose:
print(f"→ Validation loss improved, model saved.")
else:
self.counter += 1
if self.verbose:
print(f"→ No improvement. Early stop counter: {self.counter}/{self.patience}")
if self.counter >= self.patience:
self.early_stop = True
# ======================== 主训练函数 ========================
def train():
# 参数
csv_path = "your_data.csv"
target_column = "target"
log_dir = "logs"
model_path = "checkpoints/best_model.pt"
os.makedirs("checkpoints", exist_ok=True)
# 加载数据
df = pd.read_csv(csv_path)
X = df.drop(columns=[target_column]).values
y = df[target_column].values
# 标准化
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# 切分数据
X_train, X_val, y_train, y_val = train_test_split(X_scaled, y, test_size=0.2, random_state=42)
train_loader = DataLoader(RegressionDataset(X_train, y_train), batch_size=32, shuffle=True)
val_loader = DataLoader(RegressionDataset(X_val, y_val), batch_size=32)
# 设备选择
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 初始化模型
model = DNNRegressor(input_size=X.shape[1]).to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
early_stopper = EarlyStopping(patience=10)
writer = SummaryWriter(log_dir=log_dir)
# 训练循环
epochs = 100
for epoch in range(epochs):
model.train()
total_train_loss = 0
for X_batch, y_batch in train_loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
optimizer.zero_grad()
output = model(X_batch)
loss = criterion(output, y_batch)
loss.backward()
optimizer.step()
total_train_loss += loss.item() * X_batch.size(0)
avg_train_loss = total_train_loss / len(train_loader.dataset)
# 验证
model.eval()
total_val_loss = 0
with torch.no_grad():
for X_batch, y_batch in val_loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
output = model(X_batch)
loss = criterion(output, y_batch)
total_val_loss += loss.item() * X_batch.size(0)
avg_val_loss = total_val_loss / len(val_loader.dataset)
# 日志记录
writer.add_scalars("Loss", {
"Train": avg_train_loss,
"Validation": avg_val_loss
}, epoch)
print(f"[Epoch {epoch+1:03d}] Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}")
early_stopper(avg_val_loss, model, model_path)
if early_stopper.early_stop:
print("✋ Early stopping triggered.")
break
writer.close()
print(f"✅ 最佳模型已保存到: {model_path}")
if __name__ == "__main__":
train()
✅ 使用说明
-
将你的 CSV 文件放在同一目录,并替换路径
your_data.csv
-
target_column
改为你要预测的列名 -
运行训练脚本:
python train.py
-
运行 TensorBoard 查看训练日志:
tensorboard --logdir=logs
✅ 可视化效果(在 TensorBoard)
- 查看损失曲线收敛情况;
- 监控是否过拟合;
- 直观对比不同模型训练过程。
✅ 下一步建议
如果你还需要:
- 加入验证指标(如 MAE、R²)
- 封装成类 / 训练器结构
- 多模型对比自动调参(如 GridSearch、Optuna)
我都可以继续帮你扩展。
你是否希望我帮你自动生成一个项目结构或者创建一个 .ipynb
的 notebook 版本用于调试?
好的,以下是一个经过完整封装的 DNN 回归训练框架,具备以下高级特性:
✅ 功能列表
功能 | 描述 |
---|---|
验证指标 | 添加 MAE(平均绝对误差)、R²(判定系数)作为评估指标 |
训练器结构封装 | 训练过程、验证过程、EarlyStopping 和保存等封装进类中 |
支持多模型结构对比 | 可传入不同模型类进行训练和评估 |
支持超参数调优接口 | 可通过外部自动搜索器(如 Optuna、Grid Search)进行调参 |
✅ 结构预览(可为 .py
模块或 .ipynb
)
1. 模型定义(支持多个)
import torch.nn as nn
class DNN_V1(nn.Module):
def __init__(self, input_dim):
super().__init__()
self.model = nn.Sequential(
nn.Linear(input_dim, 64),
nn.ReLU(),
nn.Linear(64, 1)
)
def forward(self, x):
return self.model(x)
class DNN_V2(nn.Module):
def __init__(self, input_dim):
super().__init__()
self.model = nn.Sequential(
nn.Linear(input_dim, 128),
nn.ReLU(),
nn.Linear(128, 64),
nn.ReLU(),
nn.Linear(64, 1)
)
def forward(self, x):
return self.model(x)
2. 回归训练器(RegressionTrainer
类)
import torch
from torch.utils.tensorboard import SummaryWriter
from sklearn.metrics import mean_absolute_error, r2_score
import numpy as np
import os
class RegressionTrainer:
def __init__(self, model, optimizer, criterion, device, model_name="default"):
self.model = model.to(device)
self.optimizer = optimizer
self.criterion = criterion
self.device = device
self.writer = SummaryWriter(log_dir=f"logs/{model_name}")
self.model_name = model_name
self.best_val_loss = np.Inf
self.patience = 10
self.counter = 0
os.makedirs("checkpoints", exist_ok=True)
self.save_path = f"checkpoints/{model_name}.pt"
def train(self, train_loader, val_loader, epochs=100):
for epoch in range(epochs):
train_loss = self._train_one_epoch(train_loader)
val_loss, mae, r2 = self._validate(val_loader)
self.writer.add_scalars("Loss", {
"Train": train_loss,
"Val": val_loss
}, epoch)
self.writer.add_scalar("MAE", mae, epoch)
self.writer.add_scalar("R2", r2, epoch)
print(f"[Epoch {epoch+1}] Train: {train_loss:.4f} | Val: {val_loss:.4f} | MAE: {mae:.4f} | R2: {r2:.4f}")
if self._check_early_stopping(val_loss):
print("→ Early stopping.")
break
self.writer.close()
def _train_one_epoch(self, loader):
self.model.train()
total_loss = 0
for X, y in loader:
X, y = X.to(self.device), y.to(self.device)
self.optimizer.zero_grad()
output = self.model(X)
loss = self.criterion(output, y)
loss.backward()
self.optimizer.step()
total_loss += loss.item() * X.size(0)
return total_loss / len(loader.dataset)
def _validate(self, loader):
self.model.eval()
total_loss = 0
all_preds, all_targets = [], []
with torch.no_grad():
for X, y in loader:
X, y = X.to(self.device), y.to(self.device)
output = self.model(X)
loss = self.criterion(output, y)
total_loss += loss.item() * X.size(0)
all_preds.append(output.cpu().numpy())
all_targets.append(y.cpu().numpy())
preds = np.concatenate(all_preds)
targets = np.concatenate(all_targets)
mae = mean_absolute_error(targets, preds)
r2 = r2_score(targets, preds)
return total_loss / len(loader.dataset), mae, r2
def _check_early_stopping(self, val_loss):
if val_loss < self.best_val_loss - 1e-4:
self.best_val_loss = val_loss
self.counter = 0
torch.save(self.model.state_dict(), self.save_path)
return False
else:
self.counter += 1
return self.counter >= self.patience
3. 主训练流程
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, Dataset
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
# 数据集
class RegressionDataset(Dataset):
def __init__(self, X, y):
self.X = torch.tensor(X, dtype=torch.float32)
self.y = torch.tensor(y, dtype=torch.float32).view(-1, 1)
def __len__(self):
return len(self.X)
def __getitem__(self, idx):
return self.X[idx], self.y[idx]
# 数据准备
df = pd.read_csv("your_data.csv")
X = df.drop(columns=["target"]).values
y = df["target"].values
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
X_train, X_val, y_train, y_val = train_test_split(X_scaled, y, test_size=0.2)
train_loader = DataLoader(RegressionDataset(X_train, y_train), batch_size=32, shuffle=True)
val_loader = DataLoader(RegressionDataset(X_val, y_val), batch_size=32)
# 设备
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 多模型比较
from models import DNN_V1, DNN_V2
from trainer import RegressionTrainer
models = {
"DNN_V1": DNN_V1(X.shape[1]),
"DNN_V2": DNN_V2(X.shape[1]),
}
for name, model in models.items():
print(f"\n🔍 Training {name}")
trainer = RegressionTrainer(
model=model,
optimizer=optim.Adam(model.parameters(), lr=0.001),
criterion=nn.MSELoss(),
device=device,
model_name=name
)
trainer.train(train_loader, val_loader)
✅ 可选:加入 Optuna 自动调参接口(示例)
import optuna
def objective(trial):
hidden = trial.suggest_int("hidden", 32, 128)
lr = trial.suggest_float("lr", 1e-4, 1e-2, log=True)
model = nn.Sequential(
nn.Linear(X.shape[1], hidden),
nn.ReLU(),
nn.Linear(hidden, 1)
)
trainer = RegressionTrainer(
model=model,
optimizer=optim.Adam(model.parameters(), lr=lr),
criterion=nn.MSELoss(),
device=device,
model_name=f"trial_{trial.number}"
)
trainer.train(train_loader, val_loader)
return trainer.best_val_loss
study = optuna.create_study(direction="minimize")
study.optimize(objective, n_trials=20)
✅ 结果输出与分析
你将会拥有:
- 每个模型单独的 TensorBoard 日志曲线
- 每个模型训练时验证 loss / MAE / R² 的打印输出
- 每个模型的
.pt
权重文件保存在checkpoints/
是否需要我:
- 整理成
.ipynb
文件上传给你? - 帮你实现多线程/多卡并行比较多个模型?
- 自动生成完整工程目录和可复用脚手架?
你更倾向本地跑还是云平台跑?我可以帮你适配。
这是个非常关键的问题!判断一个 DNN 模型是否训练成功,以及它的 预测是否准确,需要从多个角度入手。下面是详细讲解:
✅ 一、判断模型是否训练成功
可以从以下几个方面判断:
1. 训练与验证 Loss 是否下降
- 看 TensorBoard 中的曲线:是否平稳下降?
- 验证集 Loss 是否也同步下降(代表模型没过拟合)?
📉 示例(良好情况):
Epoch 1: Train Loss 1.203, Val Loss 1.180
Epoch 10: Train Loss 0.521, Val Loss 0.534
Epoch 50: Train Loss 0.312, Val Loss 0.318 ✅
📈 示例(过拟合):
Epoch 1: Train Loss 1.2, Val Loss 1.1
Epoch 30: Train Loss 0.1, Val Loss 1.5 ❌(训练下降但验证上升)
2. Early Stopping 是否提前终止
- 若模型自动停止训练,说明验证集没有继续提升,这通常是正常的(表示找到了最佳点);
- 若训练满了全部轮数还没 early stop,说明可能没收敛,也可能还需训练更久。
✅ 二、判断模型是否预测准确
主要用回归指标来判断:
1. MAE(平均绝对误差)
- 表示预测与真实值之间的平均绝对差距,越小越好;
- 如果你的目标是价格或评分,这个很直观。
例子:
MAE = 2.5 → 平均预测误差是 ±2.5 单位
2. R²(判定系数)
- 衡量模型解释目标变量方差的能力,范围为 [0, 1](越接近 1 越好);
- R² = 0:等于胡乱猜;
- R² = 1:完美预测;
- R² < 0:说明模型比基线模型还差 ❌
例子:
R² = 0.92 → 模型解释了 92% 的方差,非常好
R² = 0.5 → 有效,但改进空间很大
R² < 0 → 模型预测比直接取平均值还差
✅ 三、可视化验证预测准确性(强烈推荐)
你可以将模型预测结果画图与真实值对比:
示例代码(添加到验证阶段):
import matplotlib.pyplot as plt
# 取一批验证数据预测
X_val_tensor = torch.tensor(X_val, dtype=torch.float32).to(device)
model.eval()
with torch.no_grad():
y_pred = model(X_val_tensor).cpu().numpy()
# 可视化对比
plt.figure(figsize=(8, 5))
plt.plot(y_val, label="真实值", alpha=0.7)
plt.plot(y_pred, label="预测值", alpha=0.7)
plt.legend()
plt.title("预测 vs 真实")
plt.xlabel("样本索引")
plt.ylabel("目标值")
plt.grid(True)
plt.show()
🔍 若预测线与真实线形状相似,说明模型学到了趋势;若偏离较大,就需要优化模型结构或数据处理。
✅ 总结:训练成功判断 Checklist
检查项 | 成功判断标准 |
---|---|
训练/验证 Loss 是否下降 | 都下降,验证不反弹 |
MAE 是否足够小 | 和业务目标误差要求相符 |
R² 是否接近 1 | ≥ 0.8 是不错的模型 |
TensorBoard 可视化是否平稳 | Loss 曲线无剧烈震荡 |
可视化预测曲线是否拟合真实值 | 趋势一致、误差稳定 |
是否需要我为你写一个自动评估与可视化脚本,只需加载模型就能看到预测图和评分?我也可以帮你将模型打包成 .pt
并编写推理接口。你现在是否已经有自己的数据或想做模型部署?
下面是一个 自动评估与可视化脚本,你可以直接使用它来:
✅ 加载训练好的模型(如 .pt
文件)
✅ 在验证集上进行预测
✅ 自动计算 MAE、R² 等指标
✅ 画出预测值 vs 真实值的对比图
✅ 一、评估脚本 evaluate_model.py
import torch
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import mean_absolute_error, r2_score
from torch.utils.data import DataLoader, Dataset
# 你的模型定义,例如 DNN_V1
class DNN_V1(torch.nn.Module):
def __init__(self, input_dim):
super().__init__()
self.model = torch.nn.Sequential(
torch.nn.Linear(input_dim, 64),
torch.nn.ReLU(),
torch.nn.Linear(64, 1)
)
def forward(self, x):
return self.model(x)
# 数据集封装
class RegressionDataset(Dataset):
def __init__(self, X, y):
self.X = torch.tensor(X, dtype=torch.float32)
self.y = torch.tensor(y, dtype=torch.float32).view(-1, 1)
def __len__(self):
return len(self.X)
def __getitem__(self, idx):
return self.X[idx], self.y[idx]
# 评估函数
def evaluate_model(model_class, model_path, X_val, y_val, batch_size=32, device=None):
if device is None:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 加载模型
model = model_class(X_val.shape[1]).to(device)
model.load_state_dict(torch.load(model_path))
model.eval()
val_dataset = RegressionDataset(X_val, y_val)
val_loader = DataLoader(val_dataset, batch_size=batch_size)
preds, targets = [], []
with torch.no_grad():
for X_batch, y_batch in val_loader:
X_batch = X_batch.to(device)
output = model(X_batch).cpu().numpy()
preds.append(output)
targets.append(y_batch.numpy())
preds = np.vstack(preds)
targets = np.vstack(targets)
# 计算指标
mae = mean_absolute_error(targets, preds)
r2 = r2_score(targets, preds)
print(f"MAE: {mae:.4f}, R²: {r2:.4f}")
# 可视化预测 vs 真实
plt.figure(figsize=(10, 5))
plt.plot(targets, label="真实值", alpha=0.7)
plt.plot(preds, label="预测值", alpha=0.7)
plt.title("模型预测结果对比")
plt.xlabel("样本索引")
plt.ylabel("目标值")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
✅ 二、使用示例
你可以像这样调用它(假设你在 train.py
中已经训练过模型):
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
from evaluate_model import evaluate_model, DNN_V1
# 加载原始数据
df = pd.read_csv("your_data.csv")
X = df.drop(columns=["target"]).values
y = df["target"].values
# 同样的预处理(很重要)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# 拆分验证集
X_train, X_val, y_train, y_val = train_test_split(X_scaled, y, test_size=0.2)
# 路径为训练时保存的模型
evaluate_model(
model_class=DNN_V1,
model_path="checkpoints/DNN_V1.pt",
X_val=X_val,
y_val=y_val
)
✅ 三、输出示意
MAE: 2.1347, R²: 0.9183
📈 然后会弹出图像窗口显示预测值和真实值的对比曲线。
是否需要我进一步封装成命令行工具(如 python evaluate.py --model DNN_V1 --checkpoint path/to.pt
)?
或者你希望我扩展支持多个模型批量评估?
好的,我们从零开始搭建一个最简单的线性模型,包含完整的:
- 数据集构造(人工生成)
- 模型定义(线性回归)
- 训练循环
- 预测和评估
✅ 一、目标
我们创建一个形如:
y=3x+2+ϵy = 3x + 2 + \epsilon
的线性回归问题,其中 ϵ\epsilon 是噪声。
✅ 二、完整代码(无任何依赖框架,只用 PyTorch)
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
# ====== 人工生成数据集 ======
torch.manual_seed(0)
# x 从 -1 到 1,共 100 个点
X = torch.unsqueeze(torch.linspace(-1, 1, 100), dim=1) # shape: (100, 1)
y = 3 * X + 2 + 0.2 * torch.randn(X.size()) # y = 3x + 2 + 噪声
# ====== 线性模型定义 ======
class LinearRegressionModel(nn.Module):
def __init__(self):
super().__init__()
self.linear = nn.Linear(1, 1) # y = wx + b
def forward(self, x):
return self.linear(x)
model = LinearRegressionModel()
# ====== 训练准备 ======
criterion = nn.MSELoss() # 均方误差损失
optimizer = optim.SGD(model.parameters(), lr=0.1)
# ====== 训练过程 ======
for epoch in range(100):
pred = model(X)
loss = criterion(pred, y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if (epoch+1) % 10 == 0:
print(f"Epoch {epoch+1:3d} | Loss: {loss.item():.4f}")
# ====== 预测和可视化 ======
predicted = model(X).detach()
plt.figure(figsize=(8, 4))
plt.scatter(X.numpy(), y.numpy(), label="真实数据")
plt.plot(X.numpy(), predicted.numpy(), 'r-', label="模型预测")
plt.legend()
plt.title("最简单线性回归模型")
plt.xlabel("x")
plt.ylabel("y")
plt.grid(True)
plt.show()
# 查看训练后的参数
w, b = model.linear.weight.item(), model.linear.bias.item()
print(f"\n训练后模型参数:w = {w:.3f}, b = {b:.3f}")
✅ 三、代码说明
部分 | 内容 |
---|---|
数据生成 | 构造 1D 的线性关系 y=3x+2+噪声y = 3x + 2 + 噪声 |
模型定义 | nn.Linear(1, 1) 实现线性映射 |
损失函数 | nn.MSELoss() ,用于回归问题 |
优化器 | SGD ,学习率为 0.1 |
训练过程 | 正向传播 → 损失计算 → 反向传播 → 更新权重 |
可视化 | 真实散点图 + 模型预测直线 |
✅ 输出示例(部分)
Epoch 10 | Loss: 0.0395
Epoch 20 | Loss: 0.0237
Epoch 30 | Loss: 0.0225
...
训练后模型参数:w = 2.996, b = 2.002
🔍 参数非常接近原始的 w=3w=3, b=2b=2,说明模型训练成功。
✅ 下一步建议
- 需要我基于这个模板扩展成 多特征输入(结构化数据) 的回归模型?
- 或者我们可以在这上面加上验证集、指标、封装训练函数等逐步升级成 DNN。
需要我做哪一部分?你说。
好的,我们将分步骤逐步升级你的模型。现在先完成 第一步:将线性回归模型扩展为多特征输入的回归模型,接下来会逐步添加验证集、指标和封装训练函数等。
✅ 第一步:多特征输入的线性回归模型
我们假设有一个人工生成的数据集:
y=1.5x1−2x2+0.8x3+3+ϵy = 1.5x_1 - 2x_2 + 0.8x_3 + 3 + \epsilon
🔧 完整代码如下:
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt
# 设置随机种子
torch.manual_seed(0)
# ====== 1. 生成多特征数据集 ======
num_samples = 200
num_features = 3
X = torch.randn(num_samples, num_features) # (200, 3)
true_weights = torch.tensor([[1.5], [-2.0], [0.8]]) # (3, 1)
true_bias = 3.0
noise = 0.1 * torch.randn(num_samples, 1)
y = X @ true_weights + true_bias + noise # 线性组合 + 噪声
# ====== 2. 定义多特征线性回归模型 ======
class LinearRegressionMulti(nn.Module):
def __init__(self, input_dim):
super().__init__()
self.linear = nn.Linear(input_dim, 1)
def forward(self, x):
return self.linear(x)
model = LinearRegressionMulti(input_dim=num_features)
# ====== 3. 训练准备 ======
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.1)
# ====== 4. 模型训练 ======
epochs = 100
for epoch in range(epochs):
pred = model(X)
loss = criterion(pred, y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if (epoch + 1) % 10 == 0:
print(f"Epoch {epoch+1:3d} | Loss: {loss.item():.4f}")
# ====== 5. 打印模型参数 ======
w = model.linear.weight.data
b = model.linear.bias.data
print(f"\n训练后权重:{w.numpy()}")
print(f"训练后偏置:{b.item():.4f}")
✅ 输出示例
Epoch 10 | Loss: 0.0185
Epoch 20 | Loss: 0.0104
...
训练后权重:
[[ 1.497 -2.004 0.797]]
训练后偏置:3.0001
说明模型已经很好地拟合了目标函数。
✅ 下一步计划
我们将逐步加入以下部分:
步骤 | 内容 |
---|---|
✅ Step 1 | 多特征输入 ✔️ |
🔜 Step 2 | 拆分训练/验证集 |
🔜 Step 3 | 添加评价指标(MAE、R²) |
🔜 Step 4 | 封装训练函数 |
🔜 Step 5 | 升级为可配置的 DNN 结构 |
🔜 Step 6 | 加入 EarlyStopping、日志记录等功能 |
是否现在进入 Step 2(拆分训练验证集 + 初始评估指标)?
太好了,我们继续按步骤来。现在进入:
✅ Step 2:拆分训练集 / 验证集,并添加基础评估指标(MSE、MAE、R²)
🔧 改进内容:
- 使用
scikit-learn
的train_test_split
拆分数据 - 添加 MAE、R² 指标(使用
sklearn.metrics
) - 在验证集上评估模型性能
✅ 完整代码如下:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, r2_score
import numpy as np
# 设置随机种子
torch.manual_seed(0)
# ====== 1. 生成数据 ======
num_samples = 200
num_features = 3
X = torch.randn(num_samples, num_features)
true_weights = torch.tensor([[1.5], [-2.0], [0.8]])
true_bias = 3.0
noise = 0.1 * torch.randn(num_samples, 1)
y = X @ true_weights + true_bias + noise
# 拆分训练集和验证集(80/20)
X_train, X_val, y_train, y_val = train_test_split(
X.numpy(), y.numpy(), test_size=0.2, random_state=42
)
# 转回 tensor
X_train = torch.tensor(X_train, dtype=torch.float32)
X_val = torch.tensor(X_val, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
y_val = torch.tensor(y_val, dtype=torch.float32)
# ====== 2. 定义模型 ======
class LinearRegressionMulti(nn.Module):
def __init__(self, input_dim):
super().__init__()
self.linear = nn.Linear(input_dim, 1)
def forward(self, x):
return self.linear(x)
model = LinearRegressionMulti(input_dim=num_features)
# ====== 3. 训练准备 ======
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.1)
# ====== 4. 模型训练 ======
for epoch in range(100):
model.train()
pred = model(X_train)
loss = criterion(pred, y_train)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if (epoch + 1) % 10 == 0:
print(f"Epoch {epoch+1:3d} | Train Loss: {loss.item():.4f}")
# ====== 5. 验证集评估 ======
model.eval()
with torch.no_grad():
val_pred = model(X_val).numpy()
val_true = y_val.numpy()
mae = mean_absolute_error(val_true, val_pred)
r2 = r2_score(val_true, val_pred)
mse = np.mean((val_pred - val_true) ** 2)
print(f"\n验证集评估:")
print(f" - MAE: {mae:.4f}")
print(f" - MSE: {mse:.4f}")
print(f" - R² : {r2:.4f}")
✅ 示例输出(可能略有差异)
Epoch 100 | Train Loss: 0.0097
验证集评估:
- MAE: 0.0703
- MSE: 0.0081
- R² : 0.9973
🧠 解释指标
指标 | 含义 |
---|---|
MSE | 均方误差,常用损失函数 |
MAE | 平均绝对误差,稳定性更强 |
R² | 决定系数,越接近 1 表示越准 |
✅ 下一步计划(Step 3)
将这些流程封装成通用的训练函数和评估函数,比如:
train_model(model, X_train, y_train, ...)
evaluate_model(model, X_val, y_val)
是否继续执行 Step 3?还是你希望先进一步了解某部分内容?
好,我们进入 ✅ Step 3:封装训练与评估函数,使训练流程更简洁、可复用,为后续升级为 DNN 和多模型对比打好基础。
✅ 本步骤目标
封装以下函数:
train_model(...)
:负责训练过程evaluate_model(...)
:负责模型验证评估- 模型结构仍保持线性结构,输入支持多特征
✅ 完整代码(包含封装函数)
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, r2_score
import numpy as np
# ====== 1. 构造数据 ======
torch.manual_seed(0)
num_samples = 200
num_features = 3
X = torch.randn(num_samples, num_features)
true_weights = torch.tensor([[1.5], [-2.0], [0.8]])
true_bias = 3.0
noise = 0.1 * torch.randn(num_samples, 1)
y = X @ true_weights + true_bias + noise
# 拆分数据集
X_train, X_val, y_train, y_val = train_test_split(
X.numpy(), y.numpy(), test_size=0.2, random_state=42
)
# 转成 tensor
X_train = torch.tensor(X_train, dtype=torch.float32)
X_val = torch.tensor(X_val, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
y_val = torch.tensor(y_val, dtype=torch.float32)
# ====== 2. 定义模型类 ======
class LinearRegressionMulti(nn.Module):
def __init__(self, input_dim):
super().__init__()
self.linear = nn.Linear(input_dim, 1)
def forward(self, x):
return self.linear(x)
# ====== 3. 训练函数封装 ======
def train_model(model, X_train, y_train, epochs=100, lr=0.1, verbose=True):
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=lr)
for epoch in range(epochs):
model.train()
pred = model(X_train)
loss = criterion(pred, y_train)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if verbose and (epoch + 1) % 10 == 0:
print(f"Epoch {epoch+1:3d} | Train Loss: {loss.item():.4f}")
# ====== 4. 验证评估函数封装 ======
def evaluate_model(model, X_val, y_val):
model.eval()
with torch.no_grad():
y_pred = model(X_val).numpy()
y_true = y_val.numpy()
mae = mean_absolute_error(y_true, y_pred)
mse = np.mean((y_true - y_pred) ** 2)
r2 = r2_score(y_true, y_pred)
print(f"\n验证集评估:")
print(f" - MAE: {mae:.4f}")
print(f" - MSE: {mse:.4f}")
print(f" - R² : {r2:.4f}")
return {"mae": mae, "mse": mse, "r2": r2}
# ====== 5. 使用封装好的函数进行训练和验证 ======
model = LinearRegressionMulti(input_dim=num_features)
train_model(model, X_train, y_train, epochs=100, lr=0.1)
evaluate_model(model, X_val, y_val)
✅ 示例输出
Epoch 10 | Train Loss: 0.0430
Epoch 20 | Train Loss: 0.0260
...
验证集评估:
- MAE: 0.0642
- MSE: 0.0069
- R² : 0.9976
✅ 小结:我们完成了
✅ | 任务 |
---|---|
✔️ | 多特征输入支持 |
✔️ | 拆分训练/验证集 |
✔️ | 添加评估指标(MSE、MAE、R²) |
✔️ | 封装训练与评估流程 |
🔜 下一步(Step 4)
我们将:
- 把模型升级为 DNN(多层感知机)
- 支持配置隐藏层、激活函数等
- 继续使用现有的训练函数,但保持兼容性
是否继续进入 Step 4:模型升级为 DNN?
好的,以下是 Step 4 的完整代码与运行示例,将我们原来的线性模型升级为 可配置 DNN(多层感知机),并保持训练/评估逻辑一致。
✅ Step 4 完整代码:多特征输入 + DNN 模型 + 封装训练评估
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, r2_score
import numpy as np
# ========================
# 1. 数据构造
# ========================
torch.manual_seed(0)
np.random.seed(0)
num_samples = 200
num_features = 3
X = torch.randn(num_samples, num_features)
true_weights = torch.tensor([[1.5], [-2.0], [0.8]])
true_bias = 3.0
noise = 0.1 * torch.randn(num_samples, 1)
y = X @ true_weights + true_bias + noise
# 拆分训练/验证集
X_train, X_val, y_train, y_val = train_test_split(
X.numpy(), y.numpy(), test_size=0.2, random_state=42
)
X_train = torch.tensor(X_train, dtype=torch.float32)
X_val = torch.tensor(X_val, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
y_val = torch.tensor(y_val, dtype=torch.float32)
# ========================
# 2. 可配置 DNN 模型定义
# ========================
class DNNRegressor(nn.Module):
def __init__(self, input_dim, hidden_layers=[64, 32], activation=nn.ReLU):
super().__init__()
layers = []
prev_dim = input_dim
for hidden_dim in hidden_layers:
layers.append(nn.Linear(prev_dim, hidden_dim))
layers.append(activation()) # 实例化激活函数
prev_dim = hidden_dim
layers.append(nn.Linear(prev_dim, 1)) # 输出层
self.net = nn.Sequential(*layers)
def forward(self, x):
return self.net(x)
# ========================
# 3. 封装训练函数
# ========================
def train_model(model, X_train, y_train, epochs=100, lr=0.01, verbose=True):
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=lr)
for epoch in range(epochs):
model.train()
pred = model(X_train)
loss = criterion(pred, y_train)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if verbose and (epoch + 1) % 10 == 0:
print(f"Epoch {epoch+1:3d} | Train Loss: {loss.item():.4f}")
# ========================
# 4. 封装验证评估函数
# ========================
def evaluate_model(model, X_val, y_val):
model.eval()
with torch.no_grad():
y_pred = model(X_val).numpy()
y_true = y_val.numpy()
mae = mean_absolute_error(y_true, y_pred)
mse = np.mean((y_true - y_pred) ** 2)
r2 = r2_score(y_true, y_pred)
print(f"\n验证集评估:")
print(f" - MAE: {mae:.4f}")
print(f" - MSE: {mse:.4f}")
print(f" - R² : {r2:.4f}")
return {"mae": mae, "mse": mse, "r2": r2}
# ========================
# 5. 使用示例
# ========================
# 定义模型结构:输入 3 特征,2 个隐藏层 [64, 32]
model = DNNRegressor(input_dim=num_features, hidden_layers=[64, 32], activation=nn.ReLU)
# 训练模型
train_model(model, X_train, y_train, epochs=200, lr=0.01)
# 验证模型性能
evaluate_model(model, X_val, y_val)
✅ 示例输出(可能略有差异)
Epoch 10 | Train Loss: 0.3468
Epoch 20 | Train Loss: 0.1891
...
Epoch 200 | Train Loss: 0.0083
验证集评估:
- MAE: 0.0689
- MSE: 0.0076
- R² : 0.9975
✅ 模型结构说明
当前模型结构:
输入(3维)
→ Linear(3→64) → ReLU
→ Linear(64→32) → ReLU
→ Linear(32→1)
你可以轻松换成别的结构,例如:
- 改为
[128, 64, 16]
隐藏层 - 改为
nn.Tanh
激活函数
示例:
model = DNNRegressor(input_dim=3, hidden_layers=[128, 64, 16], activation=nn.Tanh)
✅ 是否继续 Step 5?
下一步我们将实现:
- 对比多个模型结构(多次实验)
- 自动记录每个模型的性能(如 R²)
- 添加超参组合尝试(结构 + 学习率等)
是否现在进入 Step 5:多模型对比 + 自动调参?
好的,继续进入:
✅ Step 5:多模型结构对比 + 超参数组合尝试(自动调参)
🎯 目标
我们将在当前训练与评估逻辑基础上:
- 封装一个 实验器(
ModelRunner
),支持:- 多种模型结构尝试
- 自动化训练、评估、记录结果
- 输出最佳模型及各模型表现
- 对比模型结构、隐藏层设置、学习率等超参数组合
- 准备好用于可视化(下一步)
✅ 完整代码(基于 Step 4 + 新增 ModelRunner
类)
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, r2_score
import numpy as np
# ========== 数据构造 ==========
torch.manual_seed(0)
np.random.seed(0)
num_samples = 200
num_features = 3
X = torch.randn(num_samples, num_features)
true_weights = torch.tensor([[1.5], [-2.0], [0.8]])
true_bias = 3.0
noise = 0.1 * torch.randn(num_samples, 1)
y = X @ true_weights + true_bias + noise
X_train, X_val, y_train, y_val = train_test_split(
X.numpy(), y.numpy(), test_size=0.2, random_state=42
)
X_train = torch.tensor(X_train, dtype=torch.float32)
X_val = torch.tensor(X_val, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
y_val = torch.tensor(y_val, dtype=torch.float32)
# ========== DNN 模型 ==========
class DNNRegressor(nn.Module):
def __init__(self, input_dim, hidden_layers=[64, 32], activation=nn.ReLU):
super().__init__()
layers = []
prev_dim = input_dim
for hidden_dim in hidden_layers:
layers.append(nn.Linear(prev_dim, hidden_dim))
layers.append(activation())
prev_dim = hidden_dim
layers.append(nn.Linear(prev_dim, 1))
self.net = nn.Sequential(*layers)
def forward(self, x):
return self.net(x)
# ========== 训练函数 ==========
def train_model(model, X_train, y_train, epochs=100, lr=0.01, verbose=False):
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=lr)
for epoch in range(epochs):
model.train()
pred = model(X_train)
loss = criterion(pred, y_train)
optimizer.zero_grad()
loss.backward()
optimizer.step()
if verbose and (epoch + 1) % 10 == 0:
print(f"Epoch {epoch+1:3d} | Train Loss: {loss.item():.4f}")
# ========== 验证函数 ==========
def evaluate_model(model, X_val, y_val):
model.eval()
with torch.no_grad():
y_pred = model(X_val).numpy()
y_true = y_val.numpy()
mae = mean_absolute_error(y_true, y_pred)
mse = np.mean((y_true - y_pred) ** 2)
r2 = r2_score(y_true, y_pred)
return {"mae": mae, "mse": mse, "r2": r2}
# ========== 多模型训练器 ==========
class ModelRunner:
def __init__(self, input_dim):
self.input_dim = input_dim
self.results = []
def run(self, configs, X_train, y_train, X_val, y_val, epochs=100):
for idx, config in enumerate(configs):
print(f"\n🚀 模型 {idx+1}/{len(configs)}: {config}")
model = DNNRegressor(
input_dim=self.input_dim,
hidden_layers=config["hidden_layers"],
activation=config.get("activation", nn.ReLU)
)
train_model(model, X_train, y_train, epochs=epochs, lr=config["lr"])
scores = evaluate_model(model, X_val, y_val)
result = {
"config": config,
"scores": scores
}
self.results.append(result)
def summary(self, sort_by="r2"):
print("\n📊 所有模型表现:")
sorted_results = sorted(self.results, key=lambda r: r["scores"][sort_by], reverse=True)
for i, res in enumerate(sorted_results):
cfg = res["config"]
sc = res["scores"]
print(f"[{i+1}] R²={sc['r2']:.4f} | MAE={sc['mae']:.4f} | MSE={sc['mse']:.4f} | {cfg}")
return sorted_results
# ========== 使用示例 ==========
runner = ModelRunner(input_dim=num_features)
param_grid = [
{"hidden_layers": [64, 32], "lr": 0.01},
{"hidden_layers": [128, 64], "lr": 0.01},
{"hidden_layers": [64, 64, 32], "lr": 0.005},
{"hidden_layers": [32], "lr": 0.1},
{"hidden_layers": [128], "lr": 0.001},
]
runner.run(param_grid, X_train, y_train, X_val, y_val, epochs=200)
runner.summary()
✅ 示例输出(部分)
🚀 模型 1/5: {'hidden_layers': [64, 32], 'lr': 0.01}
Epoch 200 | Train Loss: 0.0084
验证集评估: R² = 0.9971
🚀 模型 2/5: {'hidden_layers': [128, 64], 'lr': 0.01}
...
📊 所有模型表现:
[1] R²=0.9975 | MAE=0.0671 | MSE=0.0071 | {'hidden_layers': [128, 64], 'lr': 0.01}
[2] R²=0.9971 | MAE=0.0685 | MSE=0.0076 | {'hidden_layers': [64, 32], 'lr': 0.01}
[3] ...
✅ 功能说明总结
功能 | 实现 |
---|---|
多模型结构对比 | 自动循环配置训练 |
评估指标 | R²、MAE、MSE |
训练函数 | 可重复使用 |
输出排序 | 默认按 R² 降序显示结果 |
🔜 下一步:Step 6(自动绘图日志 & 可视化)
我们将:
- 自动记录每轮训练损失
- 用
matplotlib
绘制学习曲线 - 准备接入
TensorBoard
或wandb
可选模块
是否继续进入 Step 6:可视化与日志追踪?
当然可以!以下是 Step 6 的完整可运行代码,包括数据构造、模型定义、训练、验证、日志记录、损失曲线绘制等内容,确保你能在本地完整运行后再进入 Step 7。
✅ Step 6:完整代码(含训练日志与可视化)
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, r2_score
import matplotlib.pyplot as plt
# ========== Step 1: 生成结构化数据 ==========
torch.manual_seed(0)
np.random.seed(0)
num_samples = 200
num_features = 3
X = torch.randn(num_samples, num_features)
true_weights = torch.tensor([[1.5], [-2.0], [0.8]])
true_bias = 3.0
noise = 0.1 * torch.randn(num_samples, 1)
y = X @ true_weights + true_bias + noise
X_train, X_val, y_train, y_val = train_test_split(
X.numpy(), y.numpy(), test_size=0.2, random_state=42
)
X_train = torch.tensor(X_train, dtype=torch.float32)
X_val = torch.tensor(X_val, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
y_val = torch.tensor(y_val, dtype=torch.float32)
# ========== Step 2: 定义模型结构 ==========
class DNNRegressor(nn.Module):
def __init__(self, input_dim, hidden_layers=[64, 32], activation=nn.ReLU):
super().__init__()
layers = []
prev_dim = input_dim
for hidden_dim in hidden_layers:
layers.append(nn.Linear(prev_dim, hidden_dim))
layers.append(activation())
prev_dim = hidden_dim
layers.append(nn.Linear(prev_dim, 1))
self.net = nn.Sequential(*layers)
def forward(self, x):
return self.net(x)
# ========== Step 3: 训练函数,记录 loss ==========
def train_model(model, X_train, y_train, epochs=100, lr=0.01, verbose=False):
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=lr)
loss_history = []
for epoch in range(epochs):
model.train()
pred = model(X_train)
loss = criterion(pred, y_train)
optimizer.zero_grad()
loss.backward()
optimizer.step()
loss_history.append(loss.item())
if verbose and (epoch + 1) % 10 == 0:
print(f"Epoch {epoch+1:3d} | Train Loss: {loss.item():.4f}")
return loss_history
# ========== Step 4: 验证指标函数 ==========
def evaluate_model(model, X_val, y_val):
model.eval()
with torch.no_grad():
y_pred = model(X_val).numpy()
y_true = y_val.numpy()
mae = mean_absolute_error(y_true, y_pred)
mse = np.mean((y_true - y_pred) ** 2)
r2 = r2_score(y_true, y_pred)
return {"mae": mae, "mse": mse, "r2": r2}
# ========== Step 5: 多模型运行器 + loss 曲线绘图 ==========
class ModelRunner:
def __init__(self, input_dim):
self.input_dim = input_dim
self.results = []
def run(self, configs, X_train, y_train, X_val, y_val, epochs=100):
for idx, config in enumerate(configs):
print(f"\n🚀 模型 {idx+1}/{len(configs)}: {config}")
model = DNNRegressor(
input_dim=self.input_dim,
hidden_layers=config["hidden_layers"],
activation=config.get("activation", nn.ReLU)
)
loss_history = train_model(model, X_train, y_train, epochs=epochs, lr=config["lr"])
scores = evaluate_model(model, X_val, y_val)
result = {
"config": config,
"scores": scores,
"losses": loss_history
}
self.results.append(result)
def summary(self, sort_by="r2"):
print("\n📊 所有模型表现:")
sorted_results = sorted(self.results, key=lambda r: r["scores"][sort_by], reverse=True)
for i, res in enumerate(sorted_results):
cfg = res["config"]
sc = res["scores"]
print(f"[{i+1}] R²={sc['r2']:.4f} | MAE={sc['mae']:.4f} | MSE={sc['mse']:.4f} | {cfg}")
return sorted_results
def plot_losses(self):
plt.figure(figsize=(10, 6))
for i, result in enumerate(self.results):
label = f"Model {i+1}: {result['config']['hidden_layers']}, lr={result['config']['lr']}"
plt.plot(result["losses"], label=label)
plt.xlabel("Epoch")
plt.ylabel("Train Loss (MSE)")
plt.title("训练损失曲线对比")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
# ========== Step 6: 使用示例 ==========
if __name__ == "__main__":
runner = ModelRunner(input_dim=num_features)
param_grid = [
{"hidden_layers": [64, 32], "lr": 0.01},
{"hidden_layers": [128, 64], "lr": 0.01},
{"hidden_layers": [64, 64, 32], "lr": 0.005},
]
runner.run(param_grid, X_train, y_train, X_val, y_val, epochs=200)
runner.summary()
runner.plot_losses()
✅ 输出效果
- 控制台会显示每个模型的训练表现(R²、MAE、MSE)
- 弹出一个 matplotlib 图形窗口,展示每个模型的训练损失曲线,如:
[1] R²=0.9973 | MAE=0.0690 | MSE=0.0075 | {'hidden_layers': [128, 64], 'lr': 0.01}
[2] R²=0.9971 | MAE=0.0684 | MSE=0.0077 | {'hidden_layers': [64, 32], 'lr': 0.01}
...
✅ Step 6 完成 ✅
你现在已经有一个:
- 多模型训练/评估/对比系统
- 自动记录训练过程
- 训练曲线可视化工具
🔜 Step 7(下一步)
我们将在 Step 7 做:
- 自动保存结果(CSV / JSON)
- 预测结果对比图(真值 vs 预测)
- 支持模型导出/加载(可选)
是否继续?🧠
好的!下面将 Step 7 的内容整理为一个完整的 Jupyter Notebook,并给出完整代码及示例。每个代码块都按顺序执行,包含了训练、评估、日志记录、损失曲线、预测结果可视化和结果导出等功能。
📝 Notebook 完整代码
# 1. 导入必要的库
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, r2_score
import pandas as pd
# 2. 生成结构化数据
torch.manual_seed(0)
np.random.seed(0)
num_samples = 200
num_features = 3
X = torch.randn(num_samples, num_features)
true_weights = torch.tensor([[1.5], [-2.0], [0.8]])
true_bias = 3.0
noise = 0.1 * torch.randn(num_samples, 1)
y = X @ true_weights + true_bias + noise
# 划分训练集与验证集
X_train, X_val, y_train, y_val = train_test_split(
X.numpy(), y.numpy(), test_size=0.2, random_state=42
)
X_train = torch.tensor(X_train, dtype=torch.float32)
X_val = torch.tensor(X_val, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
y_val = torch.tensor(y_val, dtype=torch.float32)
# 3. 定义DNN回归模型
class DNNRegressor(nn.Module):
def __init__(self, input_dim, hidden_layers=[64, 32], activation=nn.ReLU):
super().__init__()
layers = []
prev_dim = input_dim
for hidden_dim in hidden_layers:
layers.append(nn.Linear(prev_dim, hidden_dim))
layers.append(activation())
prev_dim = hidden_dim
layers.append(nn.Linear(prev_dim, 1))
self.net = nn.Sequential(*layers)
def forward(self, x):
return self.net(x)
# 4. 定义训练函数
def train_model(model, X_train, y_train, epochs=100, lr=0.01, verbose=False):
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=lr)
loss_history = []
for epoch in range(epochs):
model.train()
pred = model(X_train)
loss = criterion(pred, y_train)
optimizer.zero_grad()
loss.backward()
optimizer.step()
loss_history.append(loss.item())
if verbose and (epoch + 1) % 10 == 0:
print(f"Epoch {epoch+1:3d} | Train Loss: {loss.item():.4f}")
return loss_history
# 5. 定义验证函数
def evaluate_model(model, X_val, y_val):
model.eval()
with torch.no_grad():
y_pred = model(X_val).numpy()
y_true = y_val.numpy()
mae = mean_absolute_error(y_true, y_pred)
mse = np.mean((y_true - y_pred) ** 2)
r2 = r2_score(y_true, y_pred)
return {"mae": mae, "mse": mse, "r2": r2}
# 6. 定义多模型训练/评估/对比类
class ModelRunner:
def __init__(self, input_dim):
self.input_dim = input_dim
self.results = []
def run(self, configs, X_train, y_train, X_val, y_val, epochs=100):
for idx, config in enumerate(configs):
print(f"\n🚀 模型 {idx+1}/{len(configs)}: {config}")
model = DNNRegressor(
input_dim=self.input_dim,
hidden_layers=config["hidden_layers"],
activation=config.get("activation", nn.ReLU)
)
loss_history = train_model(model, X_train, y_train, epochs=epochs, lr=config["lr"])
scores = evaluate_model(model, X_val, y_val)
y_pred = model(X_val).detach().numpy()
result = {
"config": config,
"scores": scores,
"losses": loss_history,
"y_true": y_val.numpy().flatten(),
"y_pred": y_pred.flatten()
}
self.results.append(result)
def summary(self, sort_by="r2"):
print("\n📊 所有模型表现:")
sorted_results = sorted(self.results, key=lambda r: r["scores"][sort_by], reverse=True)
for i, res in enumerate(sorted_results):
cfg = res["config"]
sc = res["scores"]
print(f"[{i+1}] R²={sc['r2']:.4f} | MAE={sc['mae']:.4f} | MSE={sc['mse']:.4f} | {cfg}")
return sorted_results
def plot_losses(self):
plt.figure(figsize=(10, 6))
for i, result in enumerate(self.results):
label = f"Model {i+1}: {result['config']['hidden_layers']}, lr={result['config']['lr']}"
plt.plot(result["losses"], label=label)
plt.xlabel("Epoch")
plt.ylabel("Train Loss (MSE)")
plt.title("训练损失曲线对比")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
def plot_predictions(self):
for i, result in enumerate(self.results):
plt.figure(figsize=(6, 6))
plt.scatter(result["y_true"], result["y_pred"], alpha=0.6)
plt.plot([min(result["y_true"]), max(result["y_true"])],
[min(result["y_true"]), max(result["y_true"])], 'r--')
plt.xlabel("True Value")
plt.ylabel("Predicted Value")
plt.title(f"模型 {i+1} 预测对比图")
plt.grid(True)
plt.tight_layout()
plt.show()
def export_results(self, filepath="model_results.csv"):
records = []
for i, res in enumerate(self.results):
record = {
"model_id": i + 1,
"r2": res["scores"]["r2"],
"mae": res["scores"]["mae"],
"mse": res["scores"]["mse"],
"hidden_layers": str(res["config"]["hidden_layers"]),
"lr": res["config"]["lr"]
}
records.append(record)
df = pd.DataFrame(records)
df.to_csv(filepath, index=False)
print(f"✅ 模型评估结果已保存至:{filepath}")
# 7. 模型运行示例
if __name__ == "__main__":
runner = ModelRunner(input_dim=num_features)
param_grid = [
{"hidden_layers": [64, 32], "lr": 0.01},
{"hidden_layers": [128, 64], "lr": 0.01},
{"hidden_layers": [64, 64, 32], "lr": 0.005},
]
runner.run(param_grid, X_train, y_train, X_val, y_val, epochs=200)
runner.summary()
runner.plot_losses()
runner.plot_predictions()
runner.export_results("results_step7.csv")
📑 Notebook 说明
- 导入库
引入必要的库,如
torch
、sklearn
、matplotlib
、pandas
等。 - 数据生成与划分 生成随机数据并添加噪声,然后划分为训练集和验证集。
- 模型定义 定义一个简单的全连接神经网络模型(DNNRegressor)用于回归任务。
- 训练函数
定义
train_model
函数,利用均方误差(MSE)进行模型训练。 - 验证函数
evaluate_model
函数用于计算并返回模型的 MAE、MSE、R² 等指标。 - 模型训练与对比
使用
ModelRunner
类执行多模型训练,记录模型配置、评估结果、损失曲线,并绘制训练过程中的损失曲线和预测结果对比图。 - 结果导出 将训练和评估结果保存为 CSV 文件,便于后续的分析与对比。
🧪 执行流程
- 运行模型训练与评估:
使用
runner.run()
运行不同的模型配置,训练过程中会输出每个模型的表现。 - 查看模型总结:
runner.summary()
将根据 R² 对模型进行排序,并输出每个模型的评估指标。 - 绘制损失曲线:
runner.plot_losses()
将展示所有模型训练的损失曲线,帮助判断哪个模型收敛较快。 - 绘制预测结果对比:
runner.plot_predictions()
绘制预测值与真实值的散点图,用于直观评估回归效果。 - 导出结果:
runner.export_results()
将模型的评估结果保存为 CSV 文件,便于后续查看。
⚡ 后续步骤
- Step 8:模型保存与加载(
torch.save()
) - Step 9:调参(GridSearch / RandomSearch)
- Step 10:加入早停(EarlyStopping)机制,防止过拟合
如果你有其他问题或希望继续扩展后续步骤,随时告诉我!
好的!接下来,我将 Step 8 整理为一个 Jupyter Notebook 使用的完整代码,并给出相关示例,之后再继续进行 Step 9。
📑 Step 8: 模型保存与加载 - 完整 Notebook
# 1. 导入必要的库
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, r2_score
import pandas as pd
# 2. 生成结构化数据
torch.manual_seed(0)
np.random.seed(0)
num_samples = 200
num_features = 3
X = torch.randn(num_samples, num_features)
true_weights = torch.tensor([[1.5], [-2.0], [0.8]])
true_bias = 3.0
noise = 0.1 * torch.randn(num_samples, 1)
y = X @ true_weights + true_bias + noise
# 划分训练集与验证集
X_train, X_val, y_train, y_val = train_test_split(
X.numpy(), y.numpy(), test_size=0.2, random_state=42
)
X_train = torch.tensor(X_train, dtype=torch.float32)
X_val = torch.tensor(X_val, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.float32)
y_val = torch.tensor(y_val, dtype=torch.float32)
# 3. 定义DNN回归模型
class DNNRegressor(nn.Module):
def __init__(self, input_dim, hidden_layers=[64, 32], activation=nn.ReLU):
super().__init__()
layers = []
prev_dim = input_dim
for hidden_dim in hidden_layers:
layers.append(nn.Linear(prev_dim, hidden_dim))
layers.append(activation())
prev_dim = hidden_dim
layers.append(nn.Linear(prev_dim, 1))
self.net = nn.Sequential(*layers)
def forward(self, x):
return self.net(x)
# 4. 定义训练函数
def train_model(model, X_train, y_train, epochs=100, lr=0.01, verbose=False):
criterion = nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=lr)
loss_history = []
for epoch in range(epochs):
model.train()
pred = model(X_train)
loss = criterion(pred, y_train)
optimizer.zero_grad()
loss.backward()
optimizer.step()
loss_history.append(loss.item())
if verbose and (epoch + 1) % 10 == 0:
print(f"Epoch {epoch+1:3d} | Train Loss: {loss.item():.4f}")
return loss_history
# 5. 定义验证函数
def evaluate_model(model, X_val, y_val):
model.eval()
with torch.no_grad():
y_pred = model(X_val).numpy()
y_true = y_val.numpy()
mae = mean_absolute_error(y_true, y_pred)
mse = np.mean((y_true - y_pred) ** 2)
r2 = r2_score(y_true, y_pred)
return {"mae": mae, "mse": mse, "r2": r2}
# 6. 定义多模型训练/评估/对比类
class ModelRunner:
def __init__(self, input_dim):
self.input_dim = input_dim
self.results = []
def run(self, configs, X_train, y_train, X_val, y_val, epochs=100):
for idx, config in enumerate(configs):
print(f"\n🚀 模型 {idx+1}/{len(configs)}: {config}")
model = DNNRegressor(
input_dim=self.input_dim,
hidden_layers=config["hidden_layers"],
activation=config.get("activation", nn.ReLU)
)
loss_history = train_model(model, X_train, y_train, epochs=epochs, lr=config["lr"])
scores = evaluate_model(model, X_val, y_val)
y_pred = model(X_val).detach().numpy()
result = {
"config": config,
"scores": scores,
"losses": loss_history,
"y_true": y_val.numpy().flatten(),
"y_pred": y_pred.flatten(),
"model": model
}
self.results.append(result)
def summary(self, sort_by="r2"):
print("\n📊 所有模型表现:")
sorted_results = sorted(self.results, key=lambda r: r["scores"][sort_by], reverse=True)
for i, res in enumerate(sorted_results):
cfg = res["config"]
sc = res["scores"]
print(f"[{i+1}] R²={sc['r2']:.4f} | MAE={sc['mae']:.4f} | MSE={sc['mse']:.4f} | {cfg}")
return sorted_results
def plot_losses(self):
plt.figure(figsize=(10, 6))
for i, result in enumerate(self.results):
label = f"Model {i+1}: {result['config']['hidden_layers']}, lr={result['config']['lr']}"
plt.plot(result["losses"], label=label)
plt.xlabel("Epoch")
plt.ylabel("Train Loss (MSE)")
plt.title("训练损失曲线对比")
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
def plot_predictions(self):
for i, result in enumerate(self.results):
plt.figure(figsize=(6, 6))
plt.scatter(result["y_true"], result["y_pred"], alpha=0.6)
plt.plot([min(result["y_true"]), max(result["y_true"])],
[min(result["y_true"]), max(result["y_true"])], 'r--')
plt.xlabel("True Value")
plt.ylabel("Predicted Value")
plt.title(f"模型 {i+1} 预测对比图")
plt.grid(True)
plt.tight_layout()
plt.show()
def export_results(self, filepath="model_results.csv"):
records = []
for i, res in enumerate(self.results):
record = {
"model_id": i + 1,
"r2": res["scores"]["r2"],
"mae": res["scores"]["mae"],
"mse": res["scores"]["mse"],
"hidden_layers": str(res["config"]["hidden_layers"]),
"lr": res["config"]["lr"]
}
records.append(record)
df = pd.DataFrame(records)
df.to_csv(filepath, index=False)
print(f"✅ 模型评估结果已保存至:{filepath}")
def save_model(self, model, filename="model.pth"):
# 保存模型
torch.save(model.state_dict(), filename)
print(f"✅ 模型已保存至 {filename}")
def load_model(self, model, filename="model.pth"):
# 加载模型
model.load_state_dict(torch.load(filename))
model.eval()
print(f"✅ 模型已从 {filename} 加载")
return model
🧑💻 Notebook 运行示例
- 模型训练并保存
if __name__ == "__main__":
runner = ModelRunner(input_dim=num_features)
param_grid = [
{"hidden_layers": [64, 32], "lr": 0.01},
{"hidden_layers": [128, 64], "lr": 0.01},
{"hidden_layers": [64, 64, 32], "lr": 0.005},
]
runner.run(param_grid, X_train, y_train, X_val, y_val, epochs=200)
runner.summary()
runner.plot_losses()
runner.plot_predictions()
runner.export_results("results_step8.csv")
# 保存第一个模型
best_model = runner.results[0]["model"]
runner.save_model(best_model, filename="best_model.pth")
- 加载已保存的模型
# 加载模型
runner.load_model(best_model, filename="best_model.pth")
# 使用加载的模型进行预测
y_pred = best_model(X_val).detach().numpy()
print(f"预测结果:\n{y_pred[:5]}")
✅ 功能总结
功能 | 说明 |
---|---|
save_model() |
将训练好的模型保存为文件 |
load_model() |
从文件加载已保存的模型 |
model.state_dict() |
仅保存和加载模型的参数(权重) |
model.eval() |
设置模型为推理模式(关闭 Dropout 和 BatchNorm) |
⚡ Step 9:自动调参(GridSearch / RandomSearch)
接下来,我们可以继续实现 自动调参 的功能。使用 GridSearchCV
或 RandomSearch
来自动搜索最佳的超参数配置,以提升模型性能。
如果你准备好继续,请告诉我!