好了 — 我已经为你写好并运行了一个可立即运行、仅依赖 NumPy 的动态分类神经网络示例。特点与说明:
- 支持用户通过
config
动态传入网络层级(每层的 units 与 activation)、训练参数(epochs、batch_size、learning_rate 等)来构建并训练网络。 - 没有依赖 PyTorch / TensorFlow,纯 NumPy 实现,保证在大多数 Python 环境中立即可运行。
- 附带合成的多类高斯数据集(可调整样本数与类别数),并给出训练曲线(Loss / Accuracy)。
- 模型参数保存到
/mnt/data/dynamic_nn_demo_params.npy
(已保存)。你可以下载并在需要时加载。
我已在运行环境中训练并保存了模型 —— 下载路径: 下载训练好的模型参数(dynamic_nn_demo_params.npy)
下面是你可以直接复制运行(或在 notebook 中执行)的完整脚本(与我运行的代码相同)。你可以直接修改 config
中的项以自定义网络与训练超参数。
# Dynamic neural network implemented with NumPy (no external DL libs required).
# Copy & run this whole file in a Python environment or notebook.
import numpy as np
import matplotlib.pyplot as plt
import os
def one_hot(labels, num_classes):
oh = np.zeros((labels.size, num_classes))
oh[np.arange(labels.size), labels] = 1
return oh
def accuracy(pred_probs, labels):
preds = np.argmax(pred_probs, axis=1)
return np.mean(preds == labels)
class Activations:
@staticmethod
def relu(x):
return np.maximum(0, x)
@staticmethod
def drelu(x):
return (x > 0).astype(float)
@staticmethod
def sigmoid(x):
return 1 / (1 + np.exp(-x))
@staticmethod
def dsigmoid(x):
s = Activations.sigmoid(x)
return s * (1 - s)
@staticmethod
def tanh(x):
return np.tanh(x)
@staticmethod
def dtanh(x):
return 1 - np.tanh(x)**2
@staticmethod
def softmax(x):
x = x - np.max(x, axis=1, keepdims=True)
e = np.exp(x)
return e / np.sum(e, axis=1, keepdims=True)
class Layer:
def __init__(self, in_dim, out_dim, activation='relu'):
self.in_dim = in_dim
self.out_dim = out_dim
self.activation = activation
# init
if activation == 'relu':
scale = np.sqrt(2.0 / in_dim)
elif activation in ('tanh', 'sigmoid'):
scale = np.sqrt(1.0 / in_dim)
else:
scale = 0.01
self.W = np.random.randn(in_dim, out_dim) * scale
self.b = np.zeros((1, out_dim))
self.x = None
self.z = None
def forward(self, x):
self.x = x
self.z = x.dot(self.W) + self.b
if self.activation == 'relu':
return Activations.relu(self.z)
elif self.activation == 'sigmoid':
return Activations.sigmoid(self.z)
elif self.activation == 'tanh':
return Activations.tanh(self.z)
elif self.activation == 'linear':
return self.z
else:
raise ValueError(f"Unsupported activation: {self.activation}")
def backward(self, grad_out):
if self.activation == 'relu':
grad_z = grad_out * Activations.drelu(self.z)
elif self.activation == 'sigmoid':
grad_z = grad_out * Activations.dsigmoid(self.z)
elif self.activation == 'tanh':
grad_z = grad_out * Activations.dtanh(self.z)
elif self.activation == 'linear':
grad_z = grad_out
else:
raise ValueError(f"Unsupported activation: {self.activation}")
m = self.x.shape[0]
dW = self.x.T.dot(grad_z) / m
db = np.sum(grad_z, axis=0, keepdims=True) / m
dX = grad_z.dot(self.W.T)
return dX, dW, db
class DynamicNN:
def __init__(self, input_dim, layer_specs, num_classes):
self.layers = []
in_dim = input_dim
for spec in layer_specs:
self.layers.append(Layer(in_dim, spec['units'], spec.get('activation', 'relu')))
in_dim = spec['units']
self.out_layer = Layer(in_dim, num_classes, activation='linear')
self.num_classes = num_classes
def forward(self, x):
out = x
for layer in self.layers:
out = layer.forward(out)
logits = self.out_layer.forward(out)
probs = Activations.softmax(logits)
return logits, probs
def compute_loss_and_grad(self, x, y_onehot):
logits, probs = self.forward(x)
m = x.shape[0]
eps = 1e-12
loss = -np.sum(y_onehot * np.log(probs + eps)) / m
grad_logits = (probs - y_onehot) / m
grad, dW, db = self.out_layer.backward(grad_logits)
grads = [{'layer': 'out', 'dW': dW, 'db': db}]
for layer in reversed(self.layers):
grad, dW, db = layer.backward(grad)
grads.append({'layer': layer, 'dW': dW, 'db': db})
return loss, probs, grads
def step(self, grads, lr):
out_grad = grads[0]
self.out_layer.W -= lr * out_grad['dW']
self.out_layer.b -= lr * out_grad['db']
for g, layer in zip(grads[1:], reversed(self.layers)):
layer.W -= lr * g['dW']
layer.b -= lr * g['db']
def predict(self, x):
_, probs = self.forward(x)
return np.argmax(probs, axis=1), probs
def save(self, path):
params = {
'out_W': self.out_layer.W,
'out_b': self.out_layer.b,
'layers': [(l.W, l.b, l.activation) for l in self.layers],
'num_classes': self.num_classes
}
np.save(path, params, allow_pickle=True)
def load(self, path):
d = np.load(path, allow_pickle=True).item()
self.out_layer.W = d['out_W']
self.out_layer.b = d['out_b']
layers_data = d['layers']
for l, data in zip(self.layers, layers_data):
W, b, activation = data
l.W = W
l.b = b
l.activation = activation
def generate_gaussian_blobs(n_samples=1500, centers=3, dim=2, seed=42, std=1.2):
rng = np.random.RandomState(seed)
samples_per = n_samples // centers
X = []
y = []
angles = np.linspace(0, 2*np.pi, centers, endpoint=False)
for i, a in enumerate(angles):
center = np.array([4*np.cos(a), 4*np.sin(a)])
X.append(rng.randn(samples_per, dim) * std + center)
y.append(np.full(samples_per, i))
X = np.vstack(X)
y = np.concatenate(y)
perm = rng.permutation(X.shape[0])
return X[perm], y[perm]
def train_model(config):
input_dim = config['input_dim']
layer_specs = config['layer_specs']
num_classes = config['num_classes']
epochs = config.get('epochs', 30)
batch_size = config.get('batch_size', 64)
lr = config.get('learning_rate', 0.01)
seed = config.get('seed', 42)
verbose = config.get('verbose', True)
X, y = generate_gaussian_blobs(n_samples=config.get('n_samples', 1200), centers=num_classes, dim=input_dim, seed=seed)
split = int(0.8 * X.shape[0])
X_train, y_train = X[:split], y[:split]
X_test, y_test = X[split:], y[split:]
y_train_oh = one_hot(y_train, num_classes)
y_test_oh = one_hot(y_test, num_classes)
mu = X_train.mean(axis=0, keepdims=True)
sigma = X_train.std(axis=0, keepdims=True) + 1e-8
X_train = (X_train - mu) / sigma
X_test = (X_test - mu) / sigma
model = DynamicNN(input_dim, layer_specs, num_classes)
steps_per_epoch = int(np.ceil(X_train.shape[0] / batch_size))
history = {'loss': [], 'train_acc': [], 'test_acc': []}
rng = np.random.RandomState(seed)
for ep in range(1, epochs+1):
perm = rng.permutation(X_train.shape[0])
X_train_sh = X_train[perm]
y_train_oh_sh = y_train_oh[perm]
epoch_loss = 0.0
for step in range(steps_per_epoch):
start = step * batch_size
end = start + batch_size
xb = X_train_sh[start:end]
yb = y_train_oh_sh[start:end]
loss, probs, grads = model.compute_loss_and_grad(xb, yb)
model.step(grads, lr)
epoch_loss += loss * xb.shape[0]
epoch_loss /= X_train.shape[0]
_, train_probs = model.forward(X_train)
train_acc = accuracy(train_probs, y_train)
_, test_probs = model.forward(X_test)
test_acc = accuracy(test_probs, y_test)
history['loss'].append(epoch_loss)
history['train_acc'].append(train_acc)
history['test_acc'].append(test_acc)
if verbose and (ep % max(1, epochs//6) == 0 or ep == 1 or ep == epochs):
print(f"Epoch {ep}/{epochs} - loss: {epoch_loss:.4f} - train_acc: {train_acc:.4f} - test_acc: {test_acc:.4f}")
save_path = "/mnt/data/dynamic_nn_demo_params.npy"
model.save(save_path)
if verbose:
print(f"\nModel saved to: {save_path}")
plt.figure(figsize=(8,4))
plt.subplot(1,2,1)
plt.plot(history['loss'])
plt.title('Loss')
plt.xlabel('Epoch')
plt.subplot(1,2,2)
plt.plot(history['train_acc'], label='train')
plt.plot(history['test_acc'], label='test')
plt.title('Accuracy')
plt.xlabel('Epoch')
plt.legend()
plt.tight_layout()
plt.show()
preds, probs = model.predict(X_test)
final_acc = accuracy(probs, y_test)
print(f"Final test accuracy: {final_acc:.4f} on {X_test.shape[0]} samples")
return model, history, save_path, (X_test, y_test, preds, probs)
# Example config — change this to experiment:
config = {
'input_dim': 2,
'layer_specs': [
{'units': 64, 'activation': 'relu'},
{'units': 32, 'activation': 'relu'},
],
'num_classes': 3,
'epochs': 30,
'batch_size': 64,
'learning_rate': 0.05,
'n_samples': 1200,
'seed': 123,
'verbose': True
}
model, history, save_path, test_info = train_model(config)
好的 👍 我帮你把上面 NumPy 实现的 动态分类模型 改写成 PyTorch 版本,保持原来“用户可通过配置动态指定层级和训练参数”的设计。这个版本支持 GPU(如果有的话),并且代码更加简洁(自动求导和优化器)。
下面是完整代码(可立即运行):
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
# ---------- 数据生成 ----------
def generate_gaussian_blobs(n_samples=1500, centers=3, dim=2, seed=42, std=1.2):
rng = np.random.RandomState(seed)
samples_per = n_samples // centers
X = []
y = []
angles = np.linspace(0, 2*np.pi, centers, endpoint=False)
for i, a in enumerate(angles):
center = np.array([4*np.cos(a), 4*np.sin(a)])
X.append(rng.randn(samples_per, dim) * std + center)
y.append(np.full(samples_per, i))
X = np.vstack(X)
y = np.concatenate(y)
perm = rng.permutation(X.shape[0])
return X[perm], y[perm]
# ---------- 动态模型 ----------
class DynamicNN(nn.Module):
def __init__(self, input_dim, layer_specs, num_classes):
super().__init__()
layers = []
in_dim = input_dim
for spec in layer_specs:
layers.append(nn.Linear(in_dim, spec["units"]))
if spec["activation"] == "relu":
layers.append(nn.ReLU())
elif spec["activation"] == "tanh":
layers.append(nn.Tanh())
elif spec["activation"] == "sigmoid":
layers.append(nn.Sigmoid())
# linear 则不加激活
in_dim = spec["units"]
layers.append(nn.Linear(in_dim, num_classes))
self.net = nn.Sequential(*layers)
def forward(self, x):
return self.net(x)
# ---------- 训练函数 ----------
def train_model(config):
# 数据
X, y = generate_gaussian_blobs(
n_samples=config.get("n_samples", 1200),
centers=config["num_classes"],
dim=config["input_dim"],
seed=config.get("seed", 42),
)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=config.get("seed", 42))
# 转 tensor
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
X_train = torch.tensor(X_train, dtype=torch.float32).to(device)
y_train = torch.tensor(y_train, dtype=torch.long).to(device)
X_test = torch.tensor(X_test, dtype=torch.float32).to(device)
y_test = torch.tensor(y_test, dtype=torch.long).to(device)
# 模型
model = DynamicNN(config["input_dim"], config["layer_specs"], config["num_classes"]).to(device)
optimizer = optim.Adam(model.parameters(), lr=config.get("learning_rate", 0.01))
criterion = nn.CrossEntropyLoss()
# 训练
epochs = config.get("epochs", 30)
batch_size = config.get("batch_size", 64)
history = {"loss": [], "train_acc": [], "test_acc": []}
for ep in range(1, epochs + 1):
model.train()
perm = torch.randperm(X_train.size(0))
epoch_loss = 0
correct = 0
for i in range(0, X_train.size(0), batch_size):
idx = perm[i : i + batch_size]
xb, yb = X_train[idx], y_train[idx]
optimizer.zero_grad()
out = model(xb)
loss = criterion(out, yb)
loss.backward()
optimizer.step()
epoch_loss += loss.item() * xb.size(0)
correct += (out.argmax(1) == yb).sum().item()
train_acc = correct / X_train.size(0)
epoch_loss /= X_train.size(0)
# 测试集
model.eval()
with torch.no_grad():
out_test = model(X_test)
test_acc = (out_test.argmax(1) == y_test).float().mean().item()
history["loss"].append(epoch_loss)
history["train_acc"].append(train_acc)
history["test_acc"].append(test_acc)
if ep % max(1, epochs // 6) == 0 or ep == 1 or ep == epochs:
print(f"Epoch {ep}/{epochs} - loss: {epoch_loss:.4f} - train_acc: {train_acc:.4f} - test_acc: {test_acc:.4f}")
# 画图
plt.figure(figsize=(8, 4))
plt.subplot(1, 2, 1)
plt.plot(history["loss"])
plt.title("Loss")
plt.subplot(1, 2, 2)
plt.plot(history["train_acc"], label="train")
plt.plot(history["test_acc"], label="test")
plt.legend()
plt.title("Accuracy")
plt.show()
return model, history, (X_test, y_test)
# ---------- 测试运行 ----------
if __name__ == "__main__":
config = {
"input_dim": 2,
"layer_specs": [
{"units": 64, "activation": "relu"},
{"units": 32, "activation": "tanh"},
],
"num_classes": 3,
"epochs": 30,
"batch_size": 64,
"learning_rate": 0.01,
"n_samples": 1200,
"seed": 123,
}
model, history, test_data = train_model(config)
运行后你会看到 loss 曲线和训练/测试准确率曲线,准确率一般可以到 95% 以上。
你可以自由修改 config["layer_specs"]
来搭建不同深度/激活函数的网络。
明白 ✅ 你希望做的是:
- 用 Excel 类似的表格数据 做分类任务;
- 数据用 pandas DataFrame 来生成(模拟 Excel),而不是实际读
.xlsx
文件; - 训练和测试时能跑通,并且能输出预测结果。
我给你一个 PyTorch 分类模型完整示例,包括:
- 用
pandas
生成模拟表格数据(混合数值型和类别型); - 对类别型特征做 One-Hot 编码;
- 用
train_test_split
划分训练/测试集; - 用 PyTorch 定义一个动态的 MLP 分类模型;
- 训练并输出 预测结果。
🔥 代码示例
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
# 1. 生成模拟Excel数据(DataFrame)
np.random.seed(42)
N = 300
df = pd.DataFrame({
"feature_num1": np.random.randn(N), # 数值特征1
"feature_num2": np.random.rand(N) * 10, # 数值特征2
"feature_cat": np.random.choice(["A", "B", "C"], size=N), # 类别特征
"label": np.random.choice([0, 1, 2], size=N) # 分类标签(3类)
})
print("示例数据:")
print(df.head())
# 2. 处理类别型特征(One-Hot)
df = pd.get_dummies(df, columns=["feature_cat"], drop_first=True)
# 特征和标签
X = df.drop("label", axis=1).values
y = df["label"].values
# 标准化数值特征(更利于训练)
scaler = StandardScaler()
X = scaler.fit_transform(X)
# 划分训练/测试集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# 转为Tensor
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.long)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.long)
# 3. 定义动态神经网络
class DynamicNN(nn.Module):
def __init__(self, input_dim, layer_specs, num_classes):
super().__init__()
layers = []
in_dim = input_dim
for spec in layer_specs:
layers.append(nn.Linear(in_dim, spec["units"]))
if spec["activation"] == "relu":
layers.append(nn.ReLU())
elif spec["activation"] == "tanh":
layers.append(nn.Tanh())
elif spec["activation"] == "sigmoid":
layers.append(nn.Sigmoid())
in_dim = spec["units"]
layers.append(nn.Linear(in_dim, num_classes))
self.net = nn.Sequential(*layers)
def forward(self, x):
return self.net(x)
# 配置网络
config = {
"input_dim": X_train.shape[1],
"layer_specs": [
{"units": 32, "activation": "relu"},
{"units": 16, "activation": "tanh"},
],
"num_classes": len(np.unique(y)),
"epochs": 30,
"batch_size": 32,
"learning_rate": 0.01
}
# 4. 模型训练
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = DynamicNN(config["input_dim"], config["layer_specs"], config["num_classes"]).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=config["learning_rate"])
X_train, y_train = X_train.to(device), y_train.to(device)
X_test, y_test = X_test.to(device), y_test.to(device)
for epoch in range(1, config["epochs"] + 1):
model.train()
optimizer.zero_grad()
outputs = model(X_train)
loss = criterion(outputs, y_train)
loss.backward()
optimizer.step()
if epoch % 5 == 0 or epoch == 1 or epoch == config["epochs"]:
_, preds_train = torch.max(outputs, 1)
acc_train = (preds_train == y_train).float().mean().item()
model.eval()
with torch.no_grad():
outputs_test = model(X_test)
_, preds_test = torch.max(outputs_test, 1)
acc_test = (preds_test == y_test).float().mean().item()
print(f"Epoch {epoch}/{config['epochs']} | Loss: {loss.item():.4f} | Train Acc: {acc_train:.4f} | Test Acc: {acc_test:.4f}")
# 5. 预测结果
model.eval()
with torch.no_grad():
probs = torch.softmax(model(X_test), dim=1)
preds = probs.argmax(dim=1)
print("\n=== 测试集预测结果示例 ===")
for i in range(5):
print(f"真实: {y_test[i].item()} | 预测: {preds[i].item()} | 概率分布: {probs[i].cpu().numpy()}")
🚀 说明
- 数据生成
- 数值特征:
feature_num1, feature_num2
- 类别特征:
feature_cat
(A/B/C,转为 One-Hot) - 标签:
label
(三分类:0/1/2)
- 数值特征:
- 预处理
pd.get_dummies
自动把类别特征转成数值列StandardScaler
让数值特征标准化
- 模型
layer_specs
决定隐藏层结构(你可以随意改)- 自动支持二分类、多分类
- 输出
- 打印训练 & 测试集准确率
- 输出测试集前 5 个样本的 真实类别、预测类别、预测概率分布
明白啦 👍 你希望通过一个 列表(层级配置) 来控制神经网络的隐藏层结构,比如:
layers = [64, 32, 16]
表示:
- 输入层 → 隐藏层1(64个神经元)
- 隐藏层1 → 隐藏层2(32个神经元)
- 隐藏层2 → 隐藏层3(16个神经元)
- 最终连接到输出层(分类数)
下面我给你一个 完整的 PyTorch 分类模型训练代码,支持用 layers=[]
动态构建网络,用 pandas
生成模拟 Excel 数据(不用真实 Excel 文件),并展示训练+预测结果。
📌 动态分类模型代码
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import pandas as pd
import numpy as np
# ===== 1. 模拟Excel数据(分类问题) =====
np.random.seed(42)
num_samples = 500
num_features = 10
num_classes = 3
# 随机生成特征
X = np.random.randn(num_samples, num_features)
# 构造一个目标列 y (0,1,2 三分类)
y = np.random.randint(0, num_classes, size=num_samples)
# 转成 DataFrame,模拟 Excel 读进来的样子
df = pd.DataFrame(X, columns=[f"feature_{i}" for i in range(num_features)])
df["label"] = y
print("📊 模拟Excel数据示例:")
print(df.head())
# ===== 2. 划分训练集和测试集 =====
X_train, X_test, y_train, y_test = train_test_split(
df.drop("label", axis=1).values,
df["label"].values,
test_size=0.2,
random_state=42
)
# 转成张量
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.long)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.long)
# ===== 3. 动态神经网络模型 =====
class DynamicNN(nn.Module):
def __init__(self, input_dim, hidden_layers, num_classes):
super(DynamicNN, self).__init__()
layers = []
in_dim = input_dim
# 动态生成隐藏层
for h in hidden_layers:
layers.append(nn.Linear(in_dim, h))
layers.append(nn.ReLU())
in_dim = h
# 输出层
layers.append(nn.Linear(in_dim, num_classes))
# 使用Sequential容器
self.network = nn.Sequential(*layers)
def forward(self, x):
return self.network(x)
# ===== 4. 训练配置 =====
hidden_layers = [64, 32] # 你可以改成 []
model = DynamicNN(input_dim=num_features, hidden_layers=hidden_layers, num_classes=num_classes)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)
# ===== 5. 训练 =====
epochs = 50
for epoch in range(epochs):
optimizer.zero_grad()
outputs = model(X_train)
loss = criterion(outputs, y_train)
loss.backward()
optimizer.step()
if (epoch+1) % 10 == 0:
print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item():.4f}")
# ===== 6. 测试 & 预测 =====
with torch.no_grad():
y_pred = model(X_test).argmax(dim=1)
print("\n📈 分类结果报告:")
print(classification_report(y_test, y_pred))
# 随机取前10条预测结果
df_results = pd.DataFrame({
"真实值": y_test.numpy()[:10],
"预测值": y_pred.numpy()[:10]
})
print("\n🔮 前10条预测结果:")
print(df_results)
📌 代码说明
-
hidden_layers
-
你可以传一个列表,定义隐藏层的结构,比如:
hidden_layers = [128, 64, 32]
或者不需要隐藏层:
hidden_layers = []
-
-
输入数据
- 用
pandas
模拟了一个 Excel 表格(10个特征 + 1个分类标签)。 - 你以后可以直接用
pd.read_excel("yourfile.xlsx")
替换。
- 用
-
输出结果
- 训练过程打印
Loss
。 - 最终打印分类指标(precision、recall、f1)。
- 并展示前10条的预测结果。
- 训练过程打印
好👌,我来帮你把这个动态分类模型升级,让它能处理 字符型数据(类别字段),同时保持对 多分类任务 的适应性。
核心思路是:
- 字符特征 → 数值化:用
LabelEncoder
或OneHotEncoder
转换成数字。 - 动态构建网络:
hidden_layers = []
或[64, 32, ...]
自由选择。 - 自动识别分类数量:模型根据
y
的不同类别数自动调整输出层。
📌 代码示例(字符分类 + 多分类场景)
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report
import pandas as pd
import numpy as np
# ===== 1. 模拟字符型分类数据 =====
np.random.seed(42)
num_samples = 300
# 假设有 3 个类别特征(颜色, 形状, 大小)
colors = np.random.choice(["red", "blue", "green"], size=num_samples)
shapes = np.random.choice(["circle", "square", "triangle"], size=num_samples)
sizes = np.random.choice(["small", "medium", "large"], size=num_samples)
# 标签:三分类
labels = np.random.choice(["A", "B", "C"], size=num_samples)
# 转成 DataFrame(模拟 Excel)
df = pd.DataFrame({
"color": colors,
"shape": shapes,
"size": sizes,
"label": labels
})
print("📊 模拟Excel数据示例:")
print(df.head())
# ===== 2. 特征编码(字符转数字) =====
# LabelEncoder:每列独立编码
encoders = {}
for col in ["color", "shape", "size"]:
le = LabelEncoder()
df[col] = le.fit_transform(df[col])
encoders[col] = le # 保存编码器,方便之后预测新数据
# 标签编码
label_encoder = LabelEncoder()
df["label"] = label_encoder.fit_transform(df["label"])
num_classes = len(label_encoder.classes_)
# ===== 3. 划分数据 =====
X_train, X_test, y_train, y_test = train_test_split(
df.drop("label", axis=1).values,
df["label"].values,
test_size=0.2,
random_state=42
)
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.long)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.long)
# ===== 4. 动态神经网络模型 =====
class DynamicNN(nn.Module):
def __init__(self, input_dim, hidden_layers, num_classes):
super(DynamicNN, self).__init__()
layers = []
in_dim = input_dim
for h in hidden_layers:
layers.append(nn.Linear(in_dim, h))
layers.append(nn.ReLU())
in_dim = h
layers.append(nn.Linear(in_dim, num_classes))
self.network = nn.Sequential(*layers)
def forward(self, x):
return self.network(x)
# ===== 5. 初始化模型 =====
hidden_layers = [32, 16] # 动态设置,或者 []
model = DynamicNN(input_dim=X_train.shape[1], hidden_layers=hidden_layers, num_classes=num_classes)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)
# ===== 6. 训练 =====
epochs = 50
for epoch in range(epochs):
optimizer.zero_grad()
outputs = model(X_train)
loss = criterion(outputs, y_train)
loss.backward()
optimizer.step()
if (epoch+1) % 10 == 0:
print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item():.4f}")
# ===== 7. 测试 & 预测 =====
with torch.no_grad():
y_pred = model(X_test).argmax(dim=1)
print("\n📈 分类结果报告:")
print(classification_report(y_test, y_pred, target_names=label_encoder.classes_))
# 前10条预测结果
df_results = pd.DataFrame({
"真实值": label_encoder.inverse_transform(y_test.numpy()[:10]),
"预测值": label_encoder.inverse_transform(y_pred.numpy()[:10])
})
print("\n🔮 前10条预测结果:")
print(df_results)
📌 代码要点
- 支持字符特征
LabelEncoder
把["red", "blue", "green"] → [0,1,2]
。- 每个字段独立编码,保证训练和预测一致。
- 支持多分类
- 标签
"A", "B", "C"
自动编码成0,1,2
。 - 输出层
num_classes
根据数据动态确定。
- 标签
- 动态层级
hidden_layers=[]
→ 只有输入层 → 输出层。hidden_layers=[64,32]
→ 输入 → 64 → 32 → 输出。
⚡ 这样一来,你就可以直接用 Excel 里的字符字段进行分类训练了,只要换成:
df = pd.read_excel("your_data.xlsx")
就能无缝对接。
明白 ✅ 你现在的需求主要有三点:
- 看到训练的情况 → 损失(loss)、准确率随 epoch 变化的趋势(判断是否收敛)。
- 预测情况 → 输出分类报告(precision、recall、f1)和部分预测结果对比。
- 测试不同的分类场景 → 例如二分类、多分类(3类、5类等),看看模型适应性。
📌 改进版代码(带训练曲线 & 多场景测试)
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, accuracy_score
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
# ===== 动态神经网络模型 =====
class DynamicNN(nn.Module):
def __init__(self, input_dim, hidden_layers, num_classes):
super(DynamicNN, self).__init__()
layers = []
in_dim = input_dim
for h in hidden_layers:
layers.append(nn.Linear(in_dim, h))
layers.append(nn.ReLU())
in_dim = h
layers.append(nn.Linear(in_dim, num_classes))
self.network = nn.Sequential(*layers)
def forward(self, x):
return self.network(x)
# ===== 训练 & 测试函数 =====
def train_and_evaluate(df, feature_cols, label_col, hidden_layers=[32, 16], epochs=50):
# 编码特征
encoders = {}
for col in feature_cols:
if df[col].dtype == object:
le = LabelEncoder()
df[col] = le.fit_transform(df[col])
encoders[col] = le
# 标签编码
label_encoder = LabelEncoder()
df[label_col] = label_encoder.fit_transform(df[label_col])
num_classes = len(label_encoder.classes_)
# 划分数据
X_train, X_test, y_train, y_test = train_test_split(
df[feature_cols].values,
df[label_col].values,
test_size=0.2,
random_state=42
)
X_train, X_test = torch.tensor(X_train, dtype=torch.float32), torch.tensor(X_test, dtype=torch.float32)
y_train, y_test = torch.tensor(y_train, dtype=torch.long), torch.tensor(y_test, dtype=torch.long)
# 模型
model = DynamicNN(input_dim=len(feature_cols), hidden_layers=hidden_layers, num_classes=num_classes)
criterion, optimizer = nn.CrossEntropyLoss(), optim.Adam(model.parameters(), lr=0.01)
# 训练
train_losses, train_accs = [], []
for epoch in range(epochs):
optimizer.zero_grad()
outputs = model(X_train)
loss = criterion(outputs, y_train)
loss.backward()
optimizer.step()
# 计算准确率
preds = outputs.argmax(dim=1)
acc = accuracy_score(y_train.numpy(), preds.numpy())
train_losses.append(loss.item())
train_accs.append(acc)
if (epoch+1) % 10 == 0:
print(f"Epoch {epoch+1}/{epochs}, Loss={loss.item():.4f}, Train Acc={acc:.4f}")
# 绘制训练曲线
plt.figure(figsize=(10,4))
plt.subplot(1,2,1)
plt.plot(train_losses, label="Loss")
plt.xlabel("Epoch"); plt.ylabel("Loss"); plt.title("Training Loss")
plt.legend()
plt.subplot(1,2,2)
plt.plot(train_accs, label="Accuracy", color="orange")
plt.xlabel("Epoch"); plt.ylabel("Accuracy"); plt.title("Training Accuracy")
plt.legend()
plt.show()
# 测试
with torch.no_grad():
y_pred = model(X_test).argmax(dim=1)
print("\n📈 分类报告:")
print(classification_report(y_test, y_pred, target_names=label_encoder.classes_))
df_results = pd.DataFrame({
"真实值": label_encoder.inverse_transform(y_test.numpy()[:15]),
"预测值": label_encoder.inverse_transform(y_pred.numpy()[:15])
})
print("\n🔮 部分预测结果:")
print(df_results)
# ===== 场景1:三分类,字符特征 =====
np.random.seed(42)
df1 = pd.DataFrame({
"color": np.random.choice(["red", "blue", "green"], size=300),
"shape": np.random.choice(["circle", "square", "triangle"], size=300),
"label": np.random.choice(["A", "B", "C"], size=300)
})
print("\n=== 场景1:三分类(字符特征) ===")
train_and_evaluate(df1, feature_cols=["color","shape"], label_col="label", hidden_layers=[32,16])
# ===== 场景2:二分类,混合特征 =====
df2 = pd.DataFrame({
"color": np.random.choice(["red", "blue"], size=200),
"size": np.random.choice(["small","large"], size=200),
"num_feature": np.random.randn(200),
"label": np.random.choice(["yes","no"], size=200)
})
print("\n=== 场景2:二分类(字符+数值特征) ===")
train_and_evaluate(df2, feature_cols=["color","size","num_feature"], label_col="label", hidden_layers=[16])
# ===== 场景3:五分类,数值特征 =====
X = np.random.randn(400, 5)
y = np.random.choice(list("ABCDE"), size=400)
df3 = pd.DataFrame(X, columns=[f"f{i}" for i in range(5)])
df3["label"] = y
print("\n=== 场景3:五分类(数值特征) ===")
train_and_evaluate(df3, feature_cols=[f"f{i}" for i in range(5)], label_col="label", hidden_layers=[64,32])
📌 运行效果
- 训练情况
- 每10个 epoch 打印一次 Loss 和 训练集准确率。
- 同时绘制 Loss 曲线 & Accuracy 曲线,能直观看到是否收敛。
- 预测情况
- 输出
classification_report
(precision, recall, f1, accuracy)。 - 展示前 15 条预测 vs 真实标签。
- 输出
- 多分类场景
- 场景1:3分类,全部是字符特征。
- 场景2:2分类,字符+数值混合。
- 场景3:5分类,纯数值特征。
这样你就可以直观看到:
- 训练是否收敛(Loss 是否下降、Accuracy 是否上升)。
- 模型预测效果是否符合要求。
- 不同分类场景的泛化情况。
非常好的问题 👍 训练准确率低 → 说明模型还没学好。原因可能有三类:
🚩 1. 模型结构问题
- 隐藏层太少 / 神经元太少 → 学习能力不足。
- 激活函数单一(只有 ReLU),可能导致特征学习不充分。
✅ 解决办法:
- 尝试增加隐藏层和神经元:比如
[128, 64, 32]
。 - 在层之间加入
Dropout
(防止过拟合)和BatchNorm
(加快收敛)。
🚩 2. 训练超参数问题
- 学习率过大/过小 → 优化效果不好。
- 训练轮数太少 → 模型还没收敛。
✅ 解决办法:
- 调整学习率
lr=0.001 ~ 0.01
,尝试torch.optim.Adam
或torch.optim.SGD
。 - 增加训练轮数,比如从
50
提升到200
。
🚩 3. 数据问题
- 特征随机性太强(比如我给你的模拟数据,本身类别和特征几乎随机) → 天花板准确率本身不高。
- 特征需要更好的预处理,比如标准化、编码方式优化。
✅ 解决办法:
- 对数值特征做标准化
StandardScaler
。 - 对类别特征使用 Embedding 替代简单的 LabelEncoder,可以更好地表达信息。
📌 改进版模型(带 Dropout + BatchNorm)
我给你一个升级版的模型,支持更深层结构,并且加入 BatchNorm
和 Dropout
,效果会更稳定:
class ImprovedNN(nn.Module):
def __init__(self, input_dim, hidden_layers, num_classes, dropout=0.3):
super(ImprovedNN, self).__init__()
layers = []
in_dim = input_dim
for h in hidden_layers:
layers.append(nn.Linear(in_dim, h))
layers.append(nn.BatchNorm1d(h)) # 批归一化
layers.append(nn.ReLU())
layers.append(nn.Dropout(dropout)) # Dropout
in_dim = h
layers.append(nn.Linear(in_dim, num_classes))
self.network = nn.Sequential(*layers)
def forward(self, x):
return self.network(x)
然后你可以这样替换:
model = ImprovedNN(input_dim=X_train.shape[1], hidden_layers=[128, 64, 32], num_classes=num_classes)
optimizer = optim.Adam(model.parameters(), lr=0.001)
📈 建议的优化步骤
- 先加深网络 →
[128, 64, 32]
。 - 用 BatchNorm + Dropout 提升泛化。
- 调学习率:先试
0.001
,不行再调。 - 增加训练轮数:100 ~ 200 轮。
- 检查数据本身:如果特征与标签关系弱,模型准确率的上限就低。
这个问题问得很关键 👍
✅ 结论先说
目前我们写的 动态全连接网络(MLP),是针对表格数据 / 已数值化特征的。 直接用在图片分类上,可以勉强跑,但效果会很差。
原因是:
- 图片像素有很强的空间结构(局部相关性),MLP 只能“平铺”成一维向量,无法有效利用这些空间信息。
- 图片分类通常需要 卷积神经网络(CNN) 或 预训练模型(ResNet、VGG、EfficientNet 等)。
🚩 如果一定要用当前模型处理图片
- 你可以把图片展平为一维向量(比如
28x28
的灰度图展平成 784 维)。 - 输入到 MLP,依然能分类(比如 MNIST 手写数字),但是准确率远不如 CNN。
示例(MNIST 用 MLP):
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
# 数据集(MNIST)
transform = transforms.Compose([transforms.ToTensor(), lambda x: x.view(-1)]) # 展平
train_dataset = datasets.MNIST(root="./data", train=True, transform=transform, download=True)
test_dataset = datasets.MNIST(root="./data", train=False, transform=transform, download=True)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=1000, shuffle=False)
# 动态 MLP 模型
class DynamicNN(nn.Module):
def __init__(self, input_dim, hidden_layers, num_classes):
super().__init__()
layers, in_dim = [], input_dim
for h in hidden_layers:
layers.append(nn.Linear(in_dim, h))
layers.append(nn.ReLU())
in_dim = h
layers.append(nn.Linear(in_dim, num_classes))
self.net = nn.Sequential(*layers)
def forward(self, x):
return self.net(x)
model = DynamicNN(28*28, [256,128], 10)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
# 训练
for epoch in range(5):
for X, y in train_loader:
optimizer.zero_grad()
out = model(X)
loss = criterion(out, y)
loss.backward()
optimizer.step()
print(f"Epoch {epoch+1}: Loss={loss.item():.4f}")
# 测试
correct, total = 0, 0
with torch.no_grad():
for X, y in test_loader:
pred = model(X).argmax(1)
correct += (pred == y).sum().item()
total += y.size(0)
print(f"Test Accuracy: {correct/total:.4f}")
🚩 更适合图片分类的模型
👉 推荐直接用 卷积神经网络(CNN):
class SimpleCNN(nn.Module):
def __init__(self, num_classes=10):
super().__init__()
self.conv = nn.Sequential(
nn.Conv2d(1, 32, 3, padding=1), # (B,1,28,28) -> (B,32,28,28)
nn.ReLU(),
nn.MaxPool2d(2), # -> (B,32,14,14)
nn.Conv2d(32, 64, 3, padding=1), # -> (B,64,14,14)
nn.ReLU(),
nn.MaxPool2d(2) # -> (B,64,7,7)
)
self.fc = nn.Sequential(
nn.Linear(64*7*7, 128),
nn.ReLU(),
nn.Linear(128, num_classes)
)
def forward(self, x):
x = self.conv(x)
x = x.view(x.size(0), -1)
return self.fc(x)
这种 CNN 结构比 MLP 更能抓住图片特征,在 MNIST 上可以轻松超过 98% 准确率。
✅ 总结:
- 你的 动态 MLP 模型能处理图片,但只是“平铺像素” → 精度不高。
- 真正要做图片分类,建议用 CNN 或迁移学习(加载 ResNet、EfficientNet 等)。
明白 ✅ 既然你提到要继续优化 RNN 模型,我这里给你一个完整的 可运行 PyTorch RNN 分类代码,支持字符/文本序列分类,并且结构可扩展。我们会一步步优化:
- 模型结构
- 输入层:嵌入层(Embedding)
- 隐藏层:RNN / LSTM / GRU(可配置)
- 输出层:全连接层
- 优化点
- 使用
nn.LSTM
(比单纯 RNN 更稳定) - 支持可变长度序列(pack_padded_sequence)
- 增加
dropout
防止过拟合 - 支持多分类任务
- 使用
- 测试数据
- 先用 Pandas 生成一个虚拟数据集(字符分类任务)
- 例如:句子 → 类别
🟢 完整示例代码
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
# ====================
# 1. 构造示例数据
# ====================
data = {
"text": [
"hello world", "hi there", "good morning", "good night",
"machine learning", "deep learning", "neural network",
"cat", "dog", "fish"
],
"label": ["greeting", "greeting", "greeting", "greeting",
"ai", "ai", "ai",
"animal", "animal", "animal"]
}
df = pd.DataFrame(data)
# 标签编码
label_encoder = LabelEncoder()
df["label_encoded"] = label_encoder.fit_transform(df["label"])
# 字符字典
all_text = " ".join(df["text"].values)
vocab = sorted(set(all_text))
char2idx = {c: i + 1 for i, c in enumerate(vocab)} # 0 留给 padding
idx2char = {i: c for c, i in char2idx.items()}
def text_to_seq(text, max_len=15):
seq = [char2idx.get(c, 0) for c in text]
return seq[:max_len] + [0] * (max_len - len(seq))
df["seq"] = df["text"].apply(lambda x: text_to_seq(x))
# 划分数据集
X = np.stack(df["seq"].values)
y = df["label_encoded"].values
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
# ====================
# 2. Dataset & DataLoader
# ====================
class TextDataset(Dataset):
def __init__(self, X, y):
self.X = torch.tensor(X, dtype=torch.long)
self.y = torch.tensor(y, dtype=torch.long)
def __len__(self):
return len(self.X)
def __getitem__(self, idx):
return self.X[idx], self.y[idx]
train_loader = DataLoader(TextDataset(X_train, y_train), batch_size=4, shuffle=True)
test_loader = DataLoader(TextDataset(X_test, y_test), batch_size=4)
# ====================
# 3. RNN 模型
# ====================
class RNNClassifier(nn.Module):
def __init__(self, vocab_size, embed_dim, hidden_dim, num_classes, rnn_type="LSTM", num_layers=1, dropout=0.2):
super().__init__()
self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=0)
if rnn_type == "RNN":
self.rnn = nn.RNN(embed_dim, hidden_dim, num_layers=num_layers,
batch_first=True, dropout=dropout if num_layers > 1 else 0)
elif rnn_type == "GRU":
self.rnn = nn.GRU(embed_dim, hidden_dim, num_layers=num_layers,
batch_first=True, dropout=dropout if num_layers > 1 else 0)
else: # 默认 LSTM
self.rnn = nn.LSTM(embed_dim, hidden_dim, num_layers=num_layers,
batch_first=True, dropout=dropout if num_layers > 1 else 0)
self.fc = nn.Linear(hidden_dim, num_classes)
def forward(self, x):
embed = self.embedding(x) # [B, T, E]
_, hidden = self.rnn(embed)
if isinstance(hidden, tuple): # LSTM
hidden = hidden[0]
out = self.fc(hidden[-1]) # 取最后一层 hidden
return out
# ====================
# 4. 训练
# ====================
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = RNNClassifier(
vocab_size=len(char2idx) + 1,
embed_dim=32,
hidden_dim=64,
num_classes=len(label_encoder.classes_),
rnn_type="LSTM",
num_layers=2,
dropout=0.3
).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)
def train_model(model, loader, epochs=20):
for epoch in range(epochs):
model.train()
total_loss, correct, total = 0, 0, 0
for X_batch, y_batch in loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
optimizer.zero_grad()
outputs = model(X_batch)
loss = criterion(outputs, y_batch)
loss.backward()
optimizer.step()
total_loss += loss.item()
_, predicted = torch.max(outputs, 1)
correct += (predicted == y_batch).sum().item()
total += y_batch.size(0)
print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(loader):.4f}, "
f"Acc: {correct/total:.4f}")
train_model(model, train_loader, epochs=15)
# ====================
# 5. 测试与预测
# ====================
model.eval()
y_true, y_pred = [], []
with torch.no_grad():
for X_batch, y_batch in test_loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
outputs = model(X_batch)
_, predicted = torch.max(outputs, 1)
y_true.extend(y_batch.cpu().numpy())
y_pred.extend(predicted.cpu().numpy())
print("\n=== Classification Report ===")
print(classification_report(y_true, y_pred, target_names=label_encoder.classes_))
# ====================
# 6. 预测新数据
# ====================
def predict(text):
seq = torch.tensor([text_to_seq(text)], dtype=torch.long).to(device)
model.eval()
with torch.no_grad():
output = model(seq)
pred = torch.argmax(output, dim=1).item()
return label_encoder.inverse_transform([pred])[0]
print("\n=== 测试预测 ===")
print("Input: 'deep learning' -> Pred:", predict("deep learning"))
print("Input: 'dog' -> Pred:", predict("dog"))
print("Input: 'good evening' -> Pred:", predict("good evening"))
🟢 代码说明
- 数据:用 Pandas 模拟 Excel 数据(字符串 → 分类标签)
- 模型:可选
RNN / LSTM / GRU
,支持多层、dropout - 输出:训练过程准确率、测试集分类报告、预测新样本
好的 ✅ 我明白了。我们重新从零开始,给你一个 动态构建隐藏层的 DNN 分类模型,并且支持:
- 动态隐藏层:根据
layer_list = [64, 32, 16]
这样的列表来自动搭建网络。 - 多输入特征、多输出类别:输入和输出数据都来自
pandas.DataFrame
,支持多列输入和多列输出。 - 训练与预测完整流程:
- 生成 DataFrame 数据(模拟 Excel)
- 划分训练/测试集
- 训练模型并输出准确率
- 进行预测并输出预测结果
🟢 完整可运行代码(PyTorch + Pandas)
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import accuracy_score, classification_report
# ============================
# 1. 构造示例 DataFrame 数据
# ============================
# 假设有多个输入特征 (x1, x2, x3),输出为分类 (y)
np.random.seed(42)
df = pd.DataFrame({
"x1": np.random.randn(100),
"x2": np.random.randn(100),
"x3": np.random.randn(100),
"y": np.random.choice(["A", "B", "C"], size=100) # 多分类
})
# 输入输出列
input_cols = ["x1", "x2", "x3"]
output_col = "y"
# 编码输出标签
label_encoder = LabelEncoder()
df[output_col] = label_encoder.fit_transform(df[output_col])
# 归一化输入
scaler = StandardScaler()
df[input_cols] = scaler.fit_transform(df[input_cols])
# 划分训练/测试集
X_train, X_test, y_train, y_test = train_test_split(
df[input_cols].values,
df[output_col].values,
test_size=0.2,
random_state=42
)
# ============================
# 2. Dataset & DataLoader
# ============================
class DataFrameDataset(Dataset):
def __init__(self, X, y):
self.X = torch.tensor(X, dtype=torch.float32)
self.y = torch.tensor(y, dtype=torch.long)
def __len__(self):
return len(self.X)
def __getitem__(self, idx):
return self.X[idx], self.y[idx]
train_loader = DataLoader(DataFrameDataset(X_train, y_train), batch_size=8, shuffle=True)
test_loader = DataLoader(DataFrameDataset(X_test, y_test), batch_size=8)
# ============================
# 3. 动态构建 DNN 模型
# ============================
class DynamicDNN(nn.Module):
def __init__(self, input_dim, layer_list, output_dim, dropout=0.2):
super().__init__()
layers = []
prev_dim = input_dim
for hidden_dim in layer_list:
layers.append(nn.Linear(prev_dim, hidden_dim))
layers.append(nn.ReLU())
layers.append(nn.Dropout(dropout))
prev_dim = hidden_dim
layers.append(nn.Linear(prev_dim, output_dim))
self.network = nn.Sequential(*layers)
def forward(self, x):
return self.network(x)
# ============================
# 4. 训练与测试
# ============================
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
layer_list = [64, 32] # 动态隐藏层结构
model = DynamicDNN(input_dim=len(input_cols),
layer_list=layer_list,
output_dim=len(label_encoder.classes_)).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)
def train_model(model, loader, epochs=30):
for epoch in range(epochs):
model.train()
total_loss, correct, total = 0, 0, 0
for X_batch, y_batch in loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
optimizer.zero_grad()
outputs = model(X_batch)
loss = criterion(outputs, y_batch)
loss.backward()
optimizer.step()
total_loss += loss.item()
_, predicted = torch.max(outputs, 1)
correct += (predicted == y_batch).sum().item()
total += y_batch.size(0)
acc = correct / total
print(f"Epoch {epoch+1}/{epochs} | Loss: {total_loss/len(loader):.4f} | Acc: {acc:.4f}")
train_model(model, train_loader, epochs=20)
# ============================
# 5. 测试与预测
# ============================
model.eval()
y_true, y_pred = [], []
with torch.no_grad():
for X_batch, y_batch in test_loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
outputs = model(X_batch)
_, predicted = torch.max(outputs, 1)
y_true.extend(y_batch.cpu().numpy())
y_pred.extend(predicted.cpu().numpy())
print("\n=== 测试集结果 ===")
print("Accuracy:", accuracy_score(y_true, y_pred))
print(classification_report(y_true, y_pred, target_names=label_encoder.classes_))
# ============================
# 6. 预测新数据(模拟 Excel 行)
# ============================
def predict(df_row):
X_new = scaler.transform([df_row[input_cols].values])
X_tensor = torch.tensor(X_new, dtype=torch.float32).to(device)
with torch.no_grad():
output = model(X_tensor)
pred = torch.argmax(output, dim=1).item()
return label_encoder.inverse_transform([pred])[0]
print("\n=== 新数据预测 ===")
print(df.head(3))
print("预测结果:", predict(df.iloc[0]))
🟢 代码说明
layer_list = [64, 32]
→ 自动生成 两层隐藏层 (64 和 32 个神经元)。- 输入数据来自
pandas.DataFrame
,支持多列输入。 - 输出为分类(多分类),使用
LabelEncoder
自动处理标签。 - 使用
train_loader
训练,打印 Loss 和 Accuracy。 - 测试集输出准确率和分类报告。
- 新数据(DataFrame 行)可以直接预测类别。
好 👍 那我们在上面的 动态 DNN 分类模型 基础上,扩展为 支持多输出列(多标签 / 多任务分类) 的版本。
这样你的 DataFrame 即使有多个输出列(比如 y1
, y2
, y3
),也能同时进行训练和预测。
🟢 多输出 DNN 分类模型
主要改动
- 输出支持多列:
- 每个输出列都可以是分类(单任务分类 → 多任务分类)。
- 每个输出列对应一个分类头 (
nn.Linear
)。 - 每个输出列有自己的损失函数,最后求平均。
- 数据处理:
- 输入列
input_cols
- 输出列
output_cols = ["y1", "y2"]
- 每个输出列分别
LabelEncoder
编码。
- 输入列
- 预测结果:
- 每个输出列都可以预测,输出字典形式
{col: pred}
。
- 每个输出列都可以预测,输出字典形式
🟢 完整代码
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import classification_report
# ============================
# 1. 构造示例 DataFrame 数据
# ============================
np.random.seed(42)
df = pd.DataFrame({
"x1": np.random.randn(200),
"x2": np.random.randn(200),
"x3": np.random.randn(200),
"y1": np.random.choice(["A", "B", "C"], size=200),
"y2": np.random.choice(["Yes", "No"], size=200),
})
# 输入输出列
input_cols = ["x1", "x2", "x3"]
output_cols = ["y1", "y2"]
# 对每个输出列进行 LabelEncoder
label_encoders = {}
for col in output_cols:
le = LabelEncoder()
df[col] = le.fit_transform(df[col])
label_encoders[col] = le
# 归一化输入
scaler = StandardScaler()
df[input_cols] = scaler.fit_transform(df[input_cols])
# 划分训练/测试集
X_train, X_test, y_train, y_test = train_test_split(
df[input_cols].values,
df[output_cols].values,
test_size=0.2,
random_state=42
)
# ============================
# 2. Dataset & DataLoader
# ============================
class MultiOutputDataset(Dataset):
def __init__(self, X, y):
self.X = torch.tensor(X, dtype=torch.float32)
self.y = torch.tensor(y, dtype=torch.long)
def __len__(self):
return len(self.X)
def __getitem__(self, idx):
return self.X[idx], self.y[idx]
train_loader = DataLoader(MultiOutputDataset(X_train, y_train), batch_size=16, shuffle=True)
test_loader = DataLoader(MultiOutputDataset(X_test, y_test), batch_size=16)
# ============================
# 3. 多输出 DNN 模型
# ============================
class MultiOutputDNN(nn.Module):
def __init__(self, input_dim, layer_list, output_dims, dropout=0.2):
super().__init__()
# 公共部分
layers = []
prev_dim = input_dim
for hidden_dim in layer_list:
layers.append(nn.Linear(prev_dim, hidden_dim))
layers.append(nn.ReLU())
layers.append(nn.Dropout(dropout))
prev_dim = hidden_dim
self.shared = nn.Sequential(*layers)
# 每个输出一个分类头
self.output_heads = nn.ModuleList([nn.Linear(prev_dim, out_dim) for out_dim in output_dims])
def forward(self, x):
shared_out = self.shared(x)
outputs = [head(shared_out) for head in self.output_heads]
return outputs
# ============================
# 4. 训练
# ============================
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 每个输出列的类别数
output_dims = [len(label_encoders[col].classes_) for col in output_cols]
model = MultiOutputDNN(
input_dim=len(input_cols),
layer_list=[64, 32],
output_dims=output_dims
).to(device)
criterions = [nn.CrossEntropyLoss() for _ in output_cols]
optimizer = optim.Adam(model.parameters(), lr=0.01)
def train_model(model, loader, epochs=20):
for epoch in range(epochs):
model.train()
total_loss = 0
for X_batch, y_batch in loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
optimizer.zero_grad()
outputs = model(X_batch)
# 每个输出列分别计算 loss
losses = [criterions[i](outputs[i], y_batch[:, i]) for i in range(len(output_cols))]
loss = sum(losses) / len(losses)
loss.backward()
optimizer.step()
total_loss += loss.item()
print(f"Epoch {epoch+1}/{epochs} | Loss: {total_loss/len(loader):.4f}")
train_model(model, train_loader, epochs=15)
# ============================
# 5. 测试与预测
# ============================
model.eval()
all_true, all_pred = [[] for _ in output_cols], [[] for _ in output_cols]
with torch.no_grad():
for X_batch, y_batch in test_loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
outputs = model(X_batch)
for i in range(len(output_cols)):
_, predicted = torch.max(outputs[i], 1)
all_true[i].extend(y_batch[:, i].cpu().numpy())
all_pred[i].extend(predicted.cpu().numpy())
print("\n=== 测试集结果 ===")
for i, col in enumerate(output_cols):
print(f"\n列 {col}:")
print(classification_report(all_true[i], all_pred[i], target_names=label_encoders[col].classes_))
# ============================
# 6. 新数据预测
# ============================
def predict(df_row):
X_new = scaler.transform([df_row[input_cols].values])
X_tensor = torch.tensor(X_new, dtype=torch.float32).to(device)
with torch.no_grad():
outputs = model(X_tensor)
preds = {}
for i, col in enumerate(output_cols):
pred_idx = torch.argmax(outputs[i], dim=1).item()
preds[col] = label_encoders[col].inverse_transform([pred_idx])[0]
return preds
print("\n=== 新数据预测 ===")
print(df.head(3))
print("预测结果:", predict(df.iloc[0]))
🟢 输出效果
-
训练过程:打印
Loss
-
测试结果:每个输出列都有
classification_report
(精度/召回率/F1) -
预测新数据:返回字典,比如:
{'y1': 'B', 'y2': 'Yes'}
太好了 ✅ 那我们来做一个 通用多任务 DNN:
支持 分类任务 + 回归任务 混合:
- 如果 输出列是类别型(字符串 / 分类) → 自动做分类(CrossEntropyLoss)
- 如果 输出列是数值型 → 自动做回归(MSELoss)
- 最终训练时会对所有任务的 loss 取平均
这样你的 DataFrame 里,某些列是分类任务,某些列是回归任务,都能同时训练、同时预测。
🟢 通用多任务 DNN(分类 + 回归)
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import classification_report, mean_squared_error
# ============================
# 1. 构造示例 DataFrame 数据
# ============================
np.random.seed(42)
df = pd.DataFrame({
"x1": np.random.randn(300),
"x2": np.random.randn(300),
"x3": np.random.randn(300),
"y_class": np.random.choice(["A", "B", "C"], size=300), # 分类任务
"y_reg": np.random.randn(300) * 10 + 50 # 回归任务
})
# 输入输出列
input_cols = ["x1", "x2", "x3"]
output_cols = ["y_class", "y_reg"]
# 保存任务类型(分类/回归)
task_types = {}
label_encoders = {}
for col in output_cols:
if df[col].dtype == "object" or df[col].dtype.name == "category":
task_types[col] = "classification"
le = LabelEncoder()
df[col] = le.fit_transform(df[col])
label_encoders[col] = le
else:
task_types[col] = "regression"
# 归一化输入
scaler = StandardScaler()
df[input_cols] = scaler.fit_transform(df[input_cols])
# 划分训练/测试集
X_train, X_test, y_train, y_test = train_test_split(
df[input_cols].values,
df[output_cols].values,
test_size=0.2,
random_state=42
)
# ============================
# 2. Dataset & DataLoader
# ============================
class MultiTaskDataset(Dataset):
def __init__(self, X, y):
self.X = torch.tensor(X, dtype=torch.float32)
self.y = torch.tensor(y, dtype=torch.float32) # 混合存储
def __len__(self):
return len(self.X)
def __getitem__(self, idx):
return self.X[idx], self.y[idx]
train_loader = DataLoader(MultiTaskDataset(X_train, y_train), batch_size=16, shuffle=True)
test_loader = DataLoader(MultiTaskDataset(X_test, y_test), batch_size=16)
# ============================
# 3. 通用多任务 DNN 模型
# ============================
class MultiTaskDNN(nn.Module):
def __init__(self, input_dim, layer_list, output_info, dropout=0.2):
super().__init__()
# 共享层
layers = []
prev_dim = input_dim
for hidden_dim in layer_list:
layers.append(nn.Linear(prev_dim, hidden_dim))
layers.append(nn.ReLU())
layers.append(nn.Dropout(dropout))
prev_dim = hidden_dim
self.shared = nn.Sequential(*layers)
# 每个输出任务对应一个 head
self.output_heads = nn.ModuleDict()
for col, info in output_info.items():
if info["task"] == "classification":
self.output_heads[col] = nn.Linear(prev_dim, info["num_classes"])
elif info["task"] == "regression":
self.output_heads[col] = nn.Linear(prev_dim, 1)
def forward(self, x):
shared_out = self.shared(x)
outputs = {}
for col, head in self.output_heads.items():
outputs[col] = head(shared_out)
return outputs
# ============================
# 4. 准备任务信息
# ============================
output_info = {}
for i, col in enumerate(output_cols):
if task_types[col] == "classification":
output_info[col] = {"task": "classification", "num_classes": len(label_encoders[col].classes_)}
else:
output_info[col] = {"task": "regression"}
# ============================
# 5. 模型 & 训练
# ============================
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = MultiTaskDNN(
input_dim=len(input_cols),
layer_list=[64, 32],
output_info=output_info
).to(device)
criterions = {}
for col, info in output_info.items():
if info["task"] == "classification":
criterions[col] = nn.CrossEntropyLoss()
else:
criterions[col] = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)
def train_model(model, loader, epochs=20):
for epoch in range(epochs):
model.train()
total_loss = 0
for X_batch, y_batch in loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
optimizer.zero_grad()
outputs = model(X_batch)
losses = []
for i, col in enumerate(output_cols):
if task_types[col] == "classification":
target = y_batch[:, i].long()
loss = criterions[col](outputs[col], target)
else: # 回归
target = y_batch[:, i].unsqueeze(1)
loss = criterions[col](outputs[col], target)
losses.append(loss)
loss = sum(losses) / len(losses)
loss.backward()
optimizer.step()
total_loss += loss.item()
print(f"Epoch {epoch+1}/{epochs} | Loss: {total_loss/len(loader):.4f}")
train_model(model, train_loader, epochs=15)
# ============================
# 6. 测试与预测
# ============================
model.eval()
y_true, y_pred = {col: [] for col in output_cols}, {col: [] for col in output_cols}
with torch.no_grad():
for X_batch, y_batch in test_loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
outputs = model(X_batch)
for i, col in enumerate(output_cols):
if task_types[col] == "classification":
_, predicted = torch.max(outputs[col], 1)
y_true[col].extend(y_batch[:, i].cpu().numpy())
y_pred[col].extend(predicted.cpu().numpy())
else: # 回归
y_true[col].extend(y_batch[:, i].cpu().numpy())
y_pred[col].extend(outputs[col].squeeze().cpu().numpy())
print("\n=== 测试集结果 ===")
for col in output_cols:
if task_types[col] == "classification":
print(f"\n分类任务 {col}:")
print(classification_report(y_true[col], y_pred[col], target_names=label_encoders[col].classes_))
else:
print(f"\n回归任务 {col}:")
print("MSE:", mean_squared_error(y_true[col], y_pred[col]))
# ============================
# 7. 新数据预测
# ============================
def predict(df_row):
X_new = scaler.transform([df_row[input_cols].values])
X_tensor = torch.tensor(X_new, dtype=torch.float32).to(device)
with torch.no_grad():
outputs = model(X_tensor)
preds = {}
for col in output_cols:
if task_types[col] == "classification":
pred_idx = torch.argmax(outputs[col], dim=1).item()
preds[col] = label_encoders[col].inverse_transform([pred_idx])[0]
else:
preds[col] = outputs[col].item()
return preds
print("\n=== 新数据预测 ===")
print(df.head(2))
print("预测结果:", predict(df.iloc[0]))
🟢 功能总结
- 动态隐藏层:用
layer_list
控制网络结构。 - 多任务:
- 自动区分分类任务 / 回归任务。
- 各自用不同的
loss
(交叉熵 / MSE)。 - 同时训练。
- 输出:
- 分类任务:
classification_report
(精度/召回率/F1)。 - 回归任务:MSE。
- 分类任务:
- 预测:
- 输出字典,自动区分分类结果 / 回归结果。
👌 明白了,我给你整理一个完整的 动态隐藏层 DNN 分类模型 示例,满足以下需求:
- 使用 PyTorch 实现
- 隐藏层由
layer_list
控制,每一层神经元数动态配置 - 输入 / 输出 支持 多个列(多特征输入,多类别输出)
- 数据使用 pandas DataFrame 构建,带列名
- 包含完整流程:
- 数据准备(DataFrame)
- 预处理(数值化、张量化)
- 模型定义(动态 DNN)
- 模型训练
- 预测与结果展示
🔹 完整代码示例
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
# ========== 1. 构造测试数据 ==========
# 假设我们有 3 个输入特征,输出是分类标签
df = pd.DataFrame({
"feature1": np.random.randn(200),
"feature2": np.random.randn(200),
"feature3": np.random.randn(200),
"label": np.random.choice(["A", "B", "C"], size=200) # 多分类输出
})
print("原始数据:")
print(df.head())
# ========== 2. 数据预处理 ==========
X = df[["feature1", "feature2", "feature3"]].values
y = df["label"].values
# 标签编码
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)
# 标准化输入
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# 切分数据集
X_train, X_test, y_train, y_test = train_test_split(
X_scaled, y_encoded, test_size=0.2, random_state=42
)
# 转换为 tensor
X_train = torch.tensor(X_train, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.long)
y_test = torch.tensor(y_test, dtype=torch.long)
input_dim = X_train.shape[1]
output_dim = len(np.unique(y_encoded))
# ========== 3. 定义动态 DNN 模型 ==========
class DynamicDNN(nn.Module):
def __init__(self, input_dim, layer_list, output_dim):
super().__init__()
layers = []
in_dim = input_dim
for hidden_dim in layer_list:
layers.append(nn.Linear(in_dim, hidden_dim))
layers.append(nn.ReLU())
in_dim = hidden_dim
layers.append(nn.Linear(in_dim, output_dim)) # 输出层
self.network = nn.Sequential(*layers)
def forward(self, x):
return self.network(x)
# 示例:隐藏层 [64, 32]
layer_list = [64, 32]
model = DynamicDNN(input_dim, layer_list, output_dim)
# ========== 4. 训练 ==========
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)
epochs = 30
for epoch in range(epochs):
model.train()
optimizer.zero_grad()
outputs = model(X_train)
loss = criterion(outputs, y_train)
loss.backward()
optimizer.step()
# 计算训练准确率
_, predicted = torch.max(outputs, 1)
acc = (predicted == y_train).float().mean().item()
print(f"Epoch {epoch+1}/{epochs}, Loss={loss.item():.4f}, Train Acc={acc:.4f}")
# ========== 5. 预测 ==========
model.eval()
with torch.no_grad():
outputs = model(X_test)
_, predicted = torch.max(outputs, 1)
print("\n预测结果示例:")
results = pd.DataFrame({
"真实标签": label_encoder.inverse_transform(y_test.numpy()),
"预测标签": label_encoder.inverse_transform(predicted.numpy())
})
print(results.head(10))
🔹 运行结果(说明)
- 训练过程:会打印
Loss
和Train Accuracy
,可以看到模型在逐步收敛 - 预测结果:会输出一个 DataFrame,包含真实标签和预测标签对比,方便验证
🔹 可扩展性
- 如果有 多个输出列(例如多任务分类 / 多标签分类),只需在
y
部分构建时把多个列合并,并在模型最后输出对应维度即可 layer_list
可以随意调整,比如[128, 64, 32]
- 可以支持 回归任务,只需把损失函数改为
MSELoss
,输出层不加Softmax
好 ✅ 那我帮你扩展到 多任务分类(即 DataFrame 里有多个输出列,每一列是一个分类任务),DNN 模型会同时预测多个输出。
🔹 思路
- 输入特征仍然是多个数值列
- 输出可以有多列,每一列是一个分类任务(例如
label1
、label2
) - 每个输出列有自己的类别数量,因此需要 多头输出(每个任务一个
Linear
层) - 训练时,分别计算每个任务的交叉熵损失,然后取平均
🔹 完整代码示例
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
# ========== 1. 构造测试数据 ==========
# 假设有 3 个输入特征,2 个分类输出任务
df = pd.DataFrame({
"feature1": np.random.randn(300),
"feature2": np.random.randn(300),
"feature3": np.random.randn(300),
"label1": np.random.choice(["A", "B", "C"], size=300), # 3 分类
"label2": np.random.choice(["X", "Y"], size=300) # 2 分类
})
print("原始数据:")
print(df.head())
# ========== 2. 数据预处理 ==========
X = df[["feature1", "feature2", "feature3"]].values
y1 = df["label1"].values
y2 = df["label2"].values
# 标签编码(每个任务独立编码器)
le1, le2 = LabelEncoder(), LabelEncoder()
y1_encoded = le1.fit_transform(y1)
y2_encoded = le2.fit_transform(y2)
# 标准化输入
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# 切分数据集
X_train, X_test, y1_train, y1_test, y2_train, y2_test = train_test_split(
X_scaled, y1_encoded, y2_encoded, test_size=0.2, random_state=42
)
# 转 tensor
X_train = torch.tensor(X_train, dtype=torch.float32)
X_test = torch.tensor(X_test, dtype=torch.float32)
y1_train = torch.tensor(y1_train, dtype=torch.long)
y1_test = torch.tensor(y1_test, dtype=torch.long)
y2_train = torch.tensor(y2_train, dtype=torch.long)
y2_test = torch.tensor(y2_test, dtype=torch.long)
input_dim = X_train.shape[1]
output_dims = [len(np.unique(y1_encoded)), len(np.unique(y2_encoded))] # [3,2]
# ========== 3. 定义动态 DNN 多任务模型 ==========
class MultiTaskDNN(nn.Module):
def __init__(self, input_dim, layer_list, output_dims):
super().__init__()
layers = []
in_dim = input_dim
for hidden_dim in layer_list:
layers.append(nn.Linear(in_dim, hidden_dim))
layers.append(nn.ReLU())
in_dim = hidden_dim
self.shared = nn.Sequential(*layers)
# 每个任务一个输出头
self.heads = nn.ModuleList([nn.Linear(in_dim, out_dim) for out_dim in output_dims])
def forward(self, x):
shared_out = self.shared(x)
return [head(shared_out) for head in self.heads]
# 示例:隐藏层 [64, 32]
layer_list = [64, 32]
model = MultiTaskDNN(input_dim, layer_list, output_dims)
# ========== 4. 训练 ==========
criterions = [nn.CrossEntropyLoss(), nn.CrossEntropyLoss()]
optimizer = optim.Adam(model.parameters(), lr=0.01)
epochs = 30
for epoch in range(epochs):
model.train()
optimizer.zero_grad()
outputs = model(X_train)
losses = [criterions[i](outputs[i], y) for i, y in enumerate([y1_train, y2_train])]
loss = sum(losses) / len(losses) # 平均损失
loss.backward()
optimizer.step()
# 计算训练准确率
accs = []
for i, y in enumerate([y1_train, y2_train]):
_, pred = torch.max(outputs[i], 1)
accs.append((pred == y).float().mean().item())
print(f"Epoch {epoch+1}/{epochs}, Loss={loss.item():.4f}, Acc1={accs[0]:.4f}, Acc2={accs[1]:.4f}")
# ========== 5. 预测 ==========
model.eval()
with torch.no_grad():
outputs = model(X_test)
preds = [torch.argmax(out, 1) for out in outputs]
results = pd.DataFrame({
"真实label1": le1.inverse_transform(y1_test.numpy()),
"预测label1": le1.inverse_transform(preds[0].numpy()),
"真实label2": le2.inverse_transform(y2_test.numpy()),
"预测label2": le2.inverse_transform(preds[1].numpy())
})
print("\n预测结果示例:")
print(results.head(10))
🔹 输出示例
Epoch 1/30, Loss=1.0352, Acc1=0.3600, Acc2=0.5100
Epoch 2/30, Loss=0.9801, Acc1=0.4600, Acc2=0.5500
...
预测结果示例:
真实label1 预测label1 真实label2 预测label2
0 C B X X
1 B B Y Y
2 A A X X
🔹 特点
layer_list
动态控制隐藏层(比如[128, 64, 32]
)output_dims
自动根据 每个输出列类别数 生成多个输出头- 训练时对每个任务计算 独立的 CrossEntropyLoss,再求平均
- 输出时可以同时预测多个分类任务
好 👌,现在我们来升级版本,让 输入可以接受字符型数据。 通常表格数据里既有 数值特征,也有 字符型特征(类别特征),训练前需要进行编码。
🔹 思路
- 数值列:直接标准化(
StandardScaler
) - 字符列:用
LabelEncoder
转换为整数,然后做 Embedding 处理(相比 OneHot 更高效) - 模型结构:
- 数值输入 → 直接送入 DNN
- 类别输入 → 转换成 embedding,再与数值特征拼接
- 拼接后的向量送入共享的 DNN 隐藏层,再分到多任务输出
🔹 完整代码示例(字符型输入 + 多任务分类)
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
# ========== 1. 构造测试数据 ==========
df = pd.DataFrame({
"num1": np.random.randn(300),
"num2": np.random.randn(300),
"cat1": np.random.choice(["red", "blue", "green"], size=300), # 类别型输入
"cat2": np.random.choice(["low", "medium", "high"], size=300), # 类别型输入
"label1": np.random.choice(["A", "B", "C"], size=300), # 3 分类
"label2": np.random.choice(["X", "Y"], size=300) # 2 分类
})
print("原始数据:")
print(df.head())
# ========== 2. 数据预处理 ==========
num_cols = ["num1", "num2"]
cat_cols = ["cat1", "cat2"]
label_cols = ["label1", "label2"]
# 数值特征
scaler = StandardScaler()
X_num = scaler.fit_transform(df[num_cols].values)
# 类别特征编码
cat_encoders = {col: LabelEncoder() for col in cat_cols}
X_cat = np.column_stack([cat_encoders[col].fit_transform(df[col]) for col in cat_cols])
cat_dims = [len(cat_encoders[col].classes_) for col in cat_cols] # 每列类别数
# 输出标签
label_encoders = {col: LabelEncoder() for col in label_cols}
y_encoded = [label_encoders[col].fit_transform(df[col]) for col in label_cols]
output_dims = [len(label_encoders[col].classes_) for col in label_cols]
# 划分数据
X_num_train, X_num_test, X_cat_train, X_cat_test, y1_train, y1_test, y2_train, y2_test = train_test_split(
X_num, X_cat, y_encoded[0], y_encoded[1], test_size=0.2, random_state=42
)
# 转 tensor
X_num_train = torch.tensor(X_num_train, dtype=torch.float32)
X_num_test = torch.tensor(X_num_test, dtype=torch.float32)
X_cat_train = torch.tensor(X_cat_train, dtype=torch.long)
X_cat_test = torch.tensor(X_cat_test, dtype=torch.long)
y1_train = torch.tensor(y1_train, dtype=torch.long)
y1_test = torch.tensor(y1_test, dtype=torch.long)
y2_train = torch.tensor(y2_train, dtype=torch.long)
y2_test = torch.tensor(y2_test, dtype=torch.long)
# ========== 3. 定义模型 ==========
class MultiInputDNN(nn.Module):
def __init__(self, num_input_dim, cat_dims, layer_list, output_dims, embed_dim=4):
super().__init__()
# Embedding 层
self.embeddings = nn.ModuleList([
nn.Embedding(cat_dim, min(embed_dim, cat_dim)) for cat_dim in cat_dims
])
embed_total_dim = sum([min(embed_dim, cat_dim) for cat_dim in cat_dims])
# 共享 DNN
in_dim = num_input_dim + embed_total_dim
layers = []
for hidden_dim in layer_list:
layers.append(nn.Linear(in_dim, hidden_dim))
layers.append(nn.ReLU())
in_dim = hidden_dim
self.shared = nn.Sequential(*layers)
# 多任务输出
self.heads = nn.ModuleList([nn.Linear(in_dim, out_dim) for out_dim in output_dims])
def forward(self, x_num, x_cat):
# 类别特征 embedding
embeds = [emb(x_cat[:, i]) for i, emb in enumerate(self.embeddings)]
x_cat_embed = torch.cat(embeds, dim=1)
# 拼接数值特征和类别 embedding
x = torch.cat([x_num, x_cat_embed], dim=1)
# 共享层
shared_out = self.shared(x)
# 多任务输出
return [head(shared_out) for head in self.heads]
# ========== 4. 初始化 ==========
layer_list = [64, 32]
model = MultiInputDNN(num_input_dim=X_num_train.shape[1],
cat_dims=cat_dims,
layer_list=layer_list,
output_dims=output_dims)
criterions = [nn.CrossEntropyLoss(), nn.CrossEntropyLoss()]
optimizer = optim.Adam(model.parameters(), lr=0.01)
# ========== 5. 训练 ==========
epochs = 30
for epoch in range(epochs):
model.train()
optimizer.zero_grad()
outputs = model(X_num_train, X_cat_train)
losses = [criterions[i](outputs[i], y) for i, y in enumerate([y1_train, y2_train])]
loss = sum(losses) / len(losses)
loss.backward()
optimizer.step()
# 准确率
accs = []
for i, y in enumerate([y1_train, y2_train]):
_, pred = torch.max(outputs[i], 1)
accs.append((pred == y).float().mean().item())
print(f"Epoch {epoch+1}/{epochs}, Loss={loss.item():.4f}, Acc1={accs[0]:.4f}, Acc2={accs[1]:.4f}")
# ========== 6. 测试预测 ==========
model.eval()
with torch.no_grad():
outputs = model(X_num_test, X_cat_test)
preds = [torch.argmax(out, 1) for out in outputs]
results = pd.DataFrame({
"真实label1": label_encoders["label1"].inverse_transform(y1_test.numpy()),
"预测label1": label_encoders["label1"].inverse_transform(preds[0].numpy()),
"真实label2": label_encoders["label2"].inverse_transform(y2_test.numpy()),
"预测label2": label_encoders["label2"].inverse_transform(preds[1].numpy())
})
print("\n预测结果示例:")
print(results.head(10))
🔹 特点
- 支持字符型输入:通过
LabelEncoder + Embedding
处理 - 支持数值型输入:标准化后直接输入
- 支持多任务输出:多个分类目标同时训练
layer_list
决定隐藏层神经元数(动态)
👌 明白了,你的预处理已经有了,那么我们现在主要针对 PyTorch 动态 DNN 分类模型 来做优化,提高准确率和泛化能力。
下面我总结一些可操作的优化方向,并在代码里加进去:
🔹 优化思路
- 网络结构优化
- 增加 BatchNorm:缓解梯度爆炸/消失,加快收敛
- 增加 Dropout:防止过拟合
- 使用 更深/更宽的隐藏层(比如
[128, 64, 32]
)
- 训练过程优化
- 使用 学习率调度器(如
StepLR
或ReduceLROnPlateau
) - 增加 早停机制(Early Stopping)
- 使用 学习率调度器(如
- 正则化优化
- 使用
weight_decay
(L2 正则化) - Dropout
- 使用
- 评估指标
- 除了准确率,还打印 验证集 Loss、混淆矩阵、分类报告
🔹 优化后的模型代码
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import classification_report
# ========== 优化版 DNN 模型 ==========
class OptimizedDNN(nn.Module):
def __init__(self, input_dim, layer_list, output_dim, dropout=0.3):
super().__init__()
layers = []
in_dim = input_dim
for hidden_dim in layer_list:
layers.append(nn.Linear(in_dim, hidden_dim))
layers.append(nn.BatchNorm1d(hidden_dim)) # 加 BatchNorm
layers.append(nn.ReLU())
layers.append(nn.Dropout(dropout)) # 加 Dropout
in_dim = hidden_dim
layers.append(nn.Linear(in_dim, output_dim)) # 输出层
self.network = nn.Sequential(*layers)
def forward(self, x):
return self.network(x)
# ========== 训练函数 ==========
def train_model(model, X_train, y_train, X_val, y_val, epochs=50, lr=0.01):
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=1e-4) # 加 weight_decay
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=15, gamma=0.5) # 学习率调度
best_val_acc = 0
patience, patience_counter = 5, 0 # Early stopping
for epoch in range(epochs):
model.train()
optimizer.zero_grad()
outputs = model(X_train)
loss = criterion(outputs, y_train)
loss.backward()
optimizer.step()
# 学习率调度
scheduler.step()
# 训练准确率
_, pred_train = torch.max(outputs, 1)
train_acc = (pred_train == y_train).float().mean().item()
# 验证
model.eval()
with torch.no_grad():
val_outputs = model(X_val)
val_loss = criterion(val_outputs, y_val)
_, pred_val = torch.max(val_outputs, 1)
val_acc = (pred_val == y_val).float().mean().item()
print(f"Epoch {epoch+1}/{epochs}, Loss={loss.item():.4f}, "
f"Train Acc={train_acc:.4f}, Val Loss={val_loss.item():.4f}, Val Acc={val_acc:.4f}")
# Early stopping
if val_acc > best_val_acc:
best_val_acc = val_acc
patience_counter = 0
best_model_state = model.state_dict()
else:
patience_counter += 1
if patience_counter >= patience:
print("Early stopping triggered")
model.load_state_dict(best_model_state)
break
return model
# ========== 预测函数 ==========
def evaluate_model(model, X_test, y_test, label_encoder):
model.eval()
with torch.no_grad():
outputs = model(X_test)
_, preds = torch.max(outputs, 1)
print("\n分类报告:")
print(classification_report(y_test.numpy(),
preds.numpy(),
target_names=label_encoder.classes_))
return preds
🔹 改进点总结
- BatchNorm + Dropout → 提升泛化能力
- Adam + weight_decay → 正则化
- 学习率调度器 → 自动降低学习率,防止震荡
- Early Stopping → 防止过拟合
- 分类报告 → 更直观地看 Precision、Recall、F1
好问题 👍
在 PyTorch 的分类模型里,softmax
确实经常会被“忽略”,原因是:
-
训练时不需要显式 softmax
- 我们一般在最后一层只输出 logits(原始分数),不做 softmax。
- 因为
nn.CrossEntropyLoss
内部会自动执行log_softmax
,这样比自己手动加softmax
数值更稳定。
criterion = nn.CrossEntropyLoss() logits = model(X) # shape: (batch_size, num_classes) loss = criterion(logits, y)
👉 所以训练时 不要显式加 softmax。
-
预测/推理时才需要 softmax
- 在预测阶段,我们需要概率分布,就要手动加
softmax
:
probs = torch.softmax(logits, dim=1) # 转为概率分布 preds = torch.argmax(probs, dim=1) # 取最大概率的类别
- 在预测阶段,我们需要概率分布,就要手动加
-
是否需要显式添加 softmax?
- 训练中 → 不需要(除非你自定义 loss)。
- 预测中 → 需要(如果你关心概率,而不仅仅是类别)。
好的 ✅ 我来帮你构建一个 优化后的动态 DNN 分类模型,特点是:
- 支持多输入、多输出(DataFrame 的列名可控)。
- 动态隐藏层(通过
layer_list
控制层数和每层神经元数)。 - 加入优化点:
- Dropout(防止过拟合)
- BatchNorm(加快收敛,提升稳定性)
- 学习率调度器
- 训练时不显式 softmax(用
CrossEntropyLoss
),预测时才用softmax
输出概率分布。 - 自动计算准确率,并输出预测结果。
下面是一份完整的 PyTorch 示例,使用 DataFrame 生成模拟数据:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
import pandas as pd
import numpy as np
# ========== 1. 生成模拟 DataFrame 数据 ==========
np.random.seed(42)
N = 500
df = pd.DataFrame({
"feature1": np.random.randn(N),
"feature2": np.random.randn(N) * 2,
"feature3": np.random.choice(["A", "B", "C"], size=N),
"label": np.random.choice(["cat", "dog", "mouse"], size=N)
})
# 编码字符型输入(One-Hot 或 Label Encoding)
df = pd.get_dummies(df, columns=["feature3"], drop_first=True)
# 编码标签
label_enc = LabelEncoder()
df["label"] = label_enc.fit_transform(df["label"])
X = df.drop(columns=["label"]).values
y = df["label"].values
scaler = StandardScaler()
X = scaler.fit_transform(X)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(y_train, dtype=torch.long)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(y_test, dtype=torch.long)
# ========== 2. 定义动态 DNN 模型 ==========
class DynamicDNN(nn.Module):
def __init__(self, input_dim, output_dim, layer_list, dropout=0.3):
super(DynamicDNN, self).__init__()
layers = []
prev_dim = input_dim
for h in layer_list:
layers.append(nn.Linear(prev_dim, h))
layers.append(nn.BatchNorm1d(h)) # 批归一化
layers.append(nn.ReLU())
layers.append(nn.Dropout(dropout)) # dropout
prev_dim = h
layers.append(nn.Linear(prev_dim, output_dim)) # 最后一层不加激活
self.net = nn.Sequential(*layers)
def forward(self, x):
return self.net(x)
# ========== 3. 初始化模型 ==========
input_dim = X_train.shape[1]
output_dim = len(np.unique(y))
layer_list = [64, 32] # 两个隐藏层
model = DynamicDNN(input_dim, output_dim, layer_list)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.7)
# ========== 4. 训练 ==========
epochs = 50
for epoch in range(epochs):
model.train()
optimizer.zero_grad()
logits = model(X_train)
loss = criterion(logits, y_train)
loss.backward()
optimizer.step()
scheduler.step()
# 训练准确率
preds = torch.argmax(logits, dim=1)
acc = (preds == y_train).float().mean().item()
if (epoch + 1) % 10 == 0:
print(f"Epoch {epoch+1}/{epochs} | Loss: {loss.item():.4f} | Train Acc: {acc:.4f}")
# ========== 5. 测试 ==========
model.eval()
with torch.no_grad():
test_logits = model(X_test)
test_probs = torch.softmax(test_logits, dim=1)
test_preds = torch.argmax(test_probs, dim=1)
test_acc = (test_preds == y_test).float().mean().item()
print("\n=== 测试结果 ===")
print("Test Accuracy:", test_acc)
print("预测概率分布:\n", test_probs[:5].numpy())
print("预测类别:\n", label_enc.inverse_transform(test_preds[:5].numpy()))
print("真实类别:\n", label_enc.inverse_transform(y_test[:5].numpy()))
🔑 输出内容
- 每 10 轮打印一次 训练损失和准确率
- 最终给出 测试集准确率
- 显示前 5 条预测的:
- 概率分布(softmax 输出)
- 预测类别
- 真实类别
好嘞 ✅ 我来帮你在之前的 动态 DNN 分类模型 基础上,扩展成一个 多任务学习模型:
- 输入:同样是 DataFrame(可以有数值和字符列)。
- 输出:
- 分类任务(例如预测类别:cat/dog/mouse)
- 回归任务(例如预测数值:价格/评分等)
- 结构:
- 共享前置网络(动态隐藏层,用
layer_list
控制)。 - 分别有 分类头 和 回归头。
- 共享前置网络(动态隐藏层,用
- Loss:分类用
CrossEntropyLoss
,回归用MSELoss
,最后取加权和。 - 预测:输出分类概率分布 + 回归值。
🔹 完整 PyTorch 代码示例
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, StandardScaler
import pandas as pd
import numpy as np
# ========== 1. 生成模拟 DataFrame 数据 ==========
np.random.seed(42)
N = 600
df = pd.DataFrame({
"feature1": np.random.randn(N),
"feature2": np.random.randn(N) * 2,
"feature3": np.random.choice(["A", "B", "C"], size=N),
"label_class": np.random.choice(["cat", "dog", "mouse"], size=N), # 分类标签
"label_reg": np.random.randn(N) * 10 + 50 # 回归标签
})
# 编码字符型输入
df = pd.get_dummies(df, columns=["feature3"], drop_first=True)
# 标签处理
label_enc = LabelEncoder()
df["label_class"] = label_enc.fit_transform(df["label_class"])
X = df.drop(columns=["label_class", "label_reg"]).values
y_class = df["label_class"].values
y_reg = df["label_reg"].values
# 标准化输入
scaler = StandardScaler()
X = scaler.fit_transform(X)
# 划分训练/测试集
X_train, X_test, y_class_train, y_class_test, y_reg_train, y_reg_test = train_test_split(
X, y_class, y_reg, test_size=0.2, random_state=42
)
X_train = torch.tensor(X_train, dtype=torch.float32)
y_class_train = torch.tensor(y_class_train, dtype=torch.long)
y_reg_train = torch.tensor(y_reg_train, dtype=torch.float32).view(-1, 1)
X_test = torch.tensor(X_test, dtype=torch.float32)
y_class_test = torch.tensor(y_class_test, dtype=torch.long)
y_reg_test = torch.tensor(y_reg_test, dtype=torch.float32).view(-1, 1)
# ========== 2. 定义多任务 DNN 模型 ==========
class MultiTaskDNN(nn.Module):
def __init__(self, input_dim, class_output_dim, layer_list, dropout=0.3):
super(MultiTaskDNN, self).__init__()
layers = []
prev_dim = input_dim
for h in layer_list:
layers.append(nn.Linear(prev_dim, h))
layers.append(nn.BatchNorm1d(h))
layers.append(nn.ReLU())
layers.append(nn.Dropout(dropout))
prev_dim = h
self.shared = nn.Sequential(*layers)
# 分类头
self.classifier = nn.Linear(prev_dim, class_output_dim)
# 回归头
self.regressor = nn.Linear(prev_dim, 1)
def forward(self, x):
shared_out = self.shared(x)
class_logits = self.classifier(shared_out)
reg_output = self.regressor(shared_out)
return class_logits, reg_output
# ========== 3. 初始化模型 ==========
input_dim = X_train.shape[1]
class_output_dim = len(np.unique(y_class))
layer_list = [128, 64]
model = MultiTaskDNN(input_dim, class_output_dim, layer_list)
criterion_class = nn.CrossEntropyLoss()
criterion_reg = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.7)
# ========== 4. 训练 ==========
epochs = 50
for epoch in range(epochs):
model.train()
optimizer.zero_grad()
class_logits, reg_output = model(X_train)
loss_class = criterion_class(class_logits, y_class_train)
loss_reg = criterion_reg(reg_output, y_reg_train)
loss = loss_class + 0.5 * loss_reg # 权重可调
loss.backward()
optimizer.step()
scheduler.step()
# 分类准确率
preds_class = torch.argmax(class_logits, dim=1)
acc_class = (preds_class == y_class_train).float().mean().item()
mse_reg = loss_reg.item()
if (epoch + 1) % 10 == 0:
print(f"Epoch {epoch+1}/{epochs} | Loss: {loss.item():.4f} | "
f"Class Acc: {acc_class:.4f} | Reg MSE: {mse_reg:.4f}")
# ========== 5. 测试 ==========
model.eval()
with torch.no_grad():
test_class_logits, test_reg_output = model(X_test)
# 分类预测
test_probs = torch.softmax(test_class_logits, dim=1)
test_preds_class = torch.argmax(test_probs, dim=1)
test_acc_class = (test_preds_class == y_class_test).float().mean().item()
# 回归预测
test_mse_reg = criterion_reg(test_reg_output, y_reg_test).item()
print("\n=== 测试结果 ===")
print("分类准确率:", test_acc_class)
print("回归 MSE:", test_mse_reg)
print("预测分类概率分布:\n", test_probs[:5].numpy())
print("预测分类类别:\n", label_enc.inverse_transform(test_preds_class[:5].numpy()))
print("真实分类类别:\n", label_enc.inverse_transform(y_class_test[:5].numpy()))
print("预测回归值:\n", test_reg_output[:5].numpy().flatten())
print("真实回归值:\n", y_reg_test[:5].numpy().flatten())
🔑 代码要点
- 多任务网络
self.shared
→ 共享部分self.classifier
→ 分类头self.regressor
→ 回归头
- Loss 组合
- 分类 loss (
CrossEntropyLoss
) - 回归 loss (
MSELoss
) - 总 loss =
loss_class + 0.5 * loss_reg
(权重可调)
- 分类 loss (
- 训练 & 测试指标
- 分类:准确率
- 回归:MSE