你这个需求可以很好地使用 适配器模式 来构建一个灵活的模型训练框架,用于适配各种不同类型的模型(如线性回归、神经网络、LLM、大数据分析等)。
下面我将为你提供一个完整的结构化训练流程框架,包含以下模块:
✅ 总体流程(模块划分)
User Request
↓
1. 参数输入(模型参数 + 因子参数 + 训练数据列表)
↓
2. 数据清洗(缺失值、字段映射、类型识别)
↓
3. 数据预处理(归一化 / 标准化)
↓
4. 模型适配(选择不同模型类)
↓
5. 模型训练(统一的 train 接口)
↓
6. 数据后处理(如反归一化)
↓
7. 模型保存(本地或上传 S3)
✅ 项目结构建议
ml_pipeline/
├── adapters/ # 模型适配器
│ ├── base.py # BaseModelAdapter
│ ├── linear_regression.py
│ ├── dnn.py
│ └── llm.py
├── data_cleaning/
│ ├── cleaner.py # 数据清洗工具
│ └── field_mapper.py # 字段映射/类型识别
├── preprocessing/
│ ├── preprocessor.py # 归一化等预处理逻辑
│ └── postprocessor.py # 反归一化
├── trainer.py # 训练流程主入口
├── utils/
│ ├── io.py # 数据文件读取(openpyxl + csv)
│ └── s3_uploader.py # 模型文件上传
└── config/
└── model_registry.py # 注册支持的模型类型
✅ 核心类与流程实现(关键代码)
1. BaseModelAdapter
(所有模型的统一接口)
# adapters/base.py
from abc import ABC, abstractmethod
class BaseModelAdapter(ABC):
def __init__(self, model_params: dict):
self.model_params = model_params
@abstractmethod
def preprocess(self, df):
pass
@abstractmethod
def train(self, train_data, test_data):
pass
@abstractmethod
def postprocess(self, results):
pass
@abstractmethod
def save_model(self, path: str):
pass
2. LinearRegressionAdapter
示例
# adapters/linear_regression.py
from sklearn.linear_model import LinearRegression
from .base import BaseModelAdapter
class LinearRegressionAdapter(BaseModelAdapter):
def __init__(self, model_params):
super().__init__(model_params)
self.model = LinearRegression(**model_params)
def preprocess(self, df):
return df # 线性模型通常不要求额外处理
def train(self, train_data, test_data):
X_train, y_train = train_data
self.model.fit(X_train, y_train)
return {"score": self.model.score(*test_data)}
def postprocess(self, results):
return results
def save_model(self, path):
import joblib
joblib.dump(self.model, path)
3. 训练流程主入口
# trainer.py
import time

from config.model_registry import model_registry
from data_cleaning.cleaner import clean_dataset
from preprocessing.preprocessor import preprocess_dataset
from preprocessing.postprocessor import postprocess_result
from utils.io import load_datasets
from utils.s3_uploader import upload_model_file
def train_pipeline(factor_params, model_params, model_type, dataset_files, publish=False):
# 1. 加载数据集
df = load_datasets(dataset_files)
# 2. 数据清洗
df_clean = clean_dataset(df, factor_params)
# 3. 数据预处理(归一化)
df_preprocessed, scaler = preprocess_dataset(df_clean, factor_params)
# 4. 分割训练测试集
from sklearn.model_selection import train_test_split
X = df_preprocessed.drop(columns=["target"])
y = df_preprocessed["target"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
# 5. 获取模型适配器
adapter_class = model_registry.get(model_type)
if not adapter_class:
raise ValueError(f"不支持的模型类型: {model_type}")
model = adapter_class(model_params)
# 6. 训练
train_result = model.train((X_train, y_train), (X_test, y_test))
# 7. 后处理
final_result = model.postprocess(train_result)
# 8. 模型保存
save_path = f"./output_models/{model_type}_{int(time.time())}.pkl"  # 注意:需确保 output_models 目录已存在
model.save_model(save_path)
if publish:
upload_model_file(save_path)
return final_result
4. 注册模型适配器
# config/model_registry.py
from adapters.linear_regression import LinearRegressionAdapter
from adapters.dnn import DNNAdapter
from adapters.llm import LLMAdapter
model_registry = {
"linear_regression": LinearRegressionAdapter,
"dnn": DNNAdapter,
"llm": LLMAdapter,
}
5. 清洗 + 预处理(示例)
# data_cleaning/cleaner.py
def clean_dataset(df, factor_params):
# 删除因子为空的行
for factor in factor_params:
df = df[df[factor["name"]].notnull()]
return df
# preprocessing/preprocessor.py
from sklearn.preprocessing import MinMaxScaler
def preprocess_dataset(df, factor_params):
numeric_cols = [f["name"] for f in factor_params if f.get("normalize") is True]
scaler = MinMaxScaler()
df[numeric_cols] = scaler.fit_transform(df[numeric_cols])
return df, scaler
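补充:上文 trainer.py 中引用的 load_datasets 尚未给出,下面是一个示意实现(假设 dataset_files 为本地 .xlsx / .csv 文件路径列表,.xlsx 通过 pandas 的 openpyxl 引擎读取;实际读取方式可按存储形式调整):
# utils/io.py(示意实现,并非唯一写法)
import pandas as pd

def load_datasets(dataset_files):
    """读取多个 xlsx / csv 文件并纵向合并为一个 DataFrame。"""
    frames = []
    for path in dataset_files:
        if str(path).lower().endswith(".xlsx"):
            frames.append(pd.read_excel(path, engine="openpyxl"))  # 依赖 openpyxl
        else:
            frames.append(pd.read_csv(path))
    return pd.concat(frames, ignore_index=True).drop_duplicates()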
✅ 示例调用
result = train_pipeline(
factor_params=[{"name": "feature1", "normalize": True}, {"name": "feature2", "normalize": False}],
model_params={"fit_intercept": True},
model_type="linear_regression",
dataset_files=["/path/data1.xlsx", "/path/data2.csv"],
publish=True
)
print(result)
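其中 publish=True 时调用的 upload_model_file 可以基于 boto3 实现,下面是一个示意(bucket 名称与 key 前缀均为假设的占位值,需替换成实际配置):
# utils/s3_uploader.py(示意,bucket / 前缀为占位值)
import os
import boto3

def upload_model_file(local_path, bucket="my-model-bucket", prefix="models/"):
    """把本地模型文件上传到 S3,返回对象 key。"""
    key = prefix + os.path.basename(local_path)
    s3 = boto3.client("s3")  # 凭证走默认的环境变量 / IAM 角色
    s3.upload_file(local_path, bucket, key)
    return key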
✅ 总结
- ✔ 使用适配器模式封装不同模型训练逻辑,便于扩展新模型。
- ✔ 数据清洗与预处理模块独立,便于复用和测试。
- ✔ 支持模型训练后发布(如上传到 S3)。
- ✔ 高度解耦、结构清晰,方便多人协作或服务化部署。
如果你希望我继续补全某个模块(如 DNNAdapter、反归一化、任务队列调用训练),可以告诉我,我会继续细化。
很好,这说明你需要构建一个灵活可配置的模型训练流程,其中某些步骤(如数据预处理、归一化、后处理、发布)可选执行,取决于模型类型或模型适配器的需要。
为此,我们可以增强适配器结构,使每个模型声明需要哪些流程步骤,并动态控制训练流程中各个节点的执行。
✅ 整体设计目标
我们希望实现如下形式的训练控制流程:
1. 模型适配器声明:我是否需要归一化?是否需要后处理?
2. 训练主流程读取声明内容,仅执行需要的步骤
✅ 修改点一:模型适配器接口扩展
我们为 BaseModelAdapter 增加特性声明:
# adapters/base.py
from abc import ABC, abstractmethod

class BaseModelAdapter(ABC):
def __init__(self, model_params: dict):
self.model_params = model_params
# ========= 新增:模型所需训练流程特性声明 =========
def need_preprocess(self) -> bool:
return False
def need_postprocess(self) -> bool:
return False
def need_normalization(self) -> bool:
return False
# ========= 原始接口 =========
@abstractmethod
def preprocess(self, df):
pass
@abstractmethod
def train(self, train_data, test_data):
pass
@abstractmethod
def postprocess(self, results):
pass
@abstractmethod
def save_model(self, path: str):
pass
✅ 修改点二:适配器按需实现声明方法
例如:线性回归需要归一化,不需要后处理
# adapters/linear_regression.py
class LinearRegressionAdapter(BaseModelAdapter):
def need_preprocess(self):
return True
def need_normalization(self):
return True
def need_postprocess(self):
return False
# ... 其他接口同前 ...
✅ 修改点三:训练主流程改造(按需执行)
# trainer.py
import time

def train_pipeline(factor_params, model_params, model_type, dataset_files, publish=False):
df = load_datasets(dataset_files)
df_clean = clean_dataset(df, factor_params)
adapter_class = model_registry.get(model_type)
if not adapter_class:
raise ValueError(f"不支持的模型类型: {model_type}")
model = adapter_class(model_params)
# ========= 是否需要预处理(归一化) =========
if model.need_preprocess():
if model.need_normalization():
df_clean, scaler = preprocess_dataset(df_clean, factor_params)
else:
df_clean, scaler = df_clean, None
else:
scaler = None
# ========= 分割训练集和测试集 =========
from sklearn.model_selection import train_test_split
X = df_clean.drop(columns=["target"])
y = df_clean["target"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
# ========= 模型训练 =========
train_result = model.train((X_train, y_train), (X_test, y_test))
# ========= 可选后处理 =========
if model.need_postprocess():
final_result = model.postprocess(train_result)
else:
final_result = train_result
# ========= 模型保存 & 可选发布 =========
save_path = f"./output_models/{model_type}_{int(time.time())}.pkl"
model.save_model(save_path)
if publish:
upload_model_file(save_path)
return final_result
✅ 总结:你获得了什么
功能 | 实现方式 |
---|---|
每步是否执行 | 模型适配器提供 .need_xxx() 接口进行控制 |
扩展支持的模型类型 | 只需实现新模型适配器类,声明需求并实现接口 |
灵活性 | 流程每个节点都可以按模型要求决定是否执行 |
解耦性 | 每个模块(清洗、预处理、训练、后处理)都可复用 |
如果你希望下一步我帮你:
- 添加某个具体模型适配器(如 DNN、LLM)
- 支持某些模型在训练过程中记录日志(用于前端显示)
- 通过队列或任务管理器(如 Celery)执行训练任务
欢迎继续告诉我,我会直接给出对应代码。
明白了,你的需求是:
在模型训练流程中,每个步骤(如归一化、预处理、后处理等)是否执行由两方决定:
- 模型制作者(适配器内部定义):设置默认流程行为,优先级最高。
- 前端用户配置:在适配器允许(未强制指定)的情况下,用户可以覆盖对应行为。
并且:
- 模型适配器优先级更高:如果适配器强制某步骤执行/跳过,则用户无法更改。
- 其次采用用户配置:如果适配器没有明确强制要求执行或跳过,则用户配置生效。
✅ 整体控制逻辑(流程节点执行判断)
我们引入一个通用判断函数:
def should_execute_step(
    step_name: str,
    user_control: dict,
    adapter_control: dict
) -> bool:
    """
    决定是否执行某一流程节点:
    - adapter_control 优先级更高(显式 True/False 表示强制执行或强制跳过)
    - 适配器未指定(缺失或为 None)时,再看用户是否传入控制值
    - 双方都未指定时,默认不执行(False)
    """
    adapter_value = adapter_control.get(step_name)
    if adapter_value is not None:
        return adapter_value
    user_value = user_control.get(step_name)
    return bool(user_value) if user_value is not None else False
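一个简单的调用示例,用来说明优先级(适配器显式声明时覆盖用户配置,未声明时才听用户的;以下取值均为演示用的假设值):
adapter_ctl = {"normalize": True, "postprocess": None}
user_ctl = {"normalize": False, "postprocess": True}
assert should_execute_step("normalize", user_ctl, adapter_ctl) is True      # 适配器强制执行,用户无法关闭
assert should_execute_step("postprocess", user_ctl, adapter_ctl) is True    # 适配器未指定(None),听用户的
assert should_execute_step("preprocess", user_ctl, adapter_ctl) is False    # 双方都未指定,默认不执行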
✅ 模型适配器定义接口(带默认行为)
每个模型可以通过适配器定义流程行为,例如:
from abc import ABC, abstractmethod

class BaseModelAdapter(ABC):
def __init__(self, model_params: dict):
self.model_params = model_params
@property
def pipeline_control(self) -> dict:
"""
定义模型默认行为:
{
"preprocess": True, # 表示模型需要预处理
"normalize": False, # 表示模型不需要归一化
"postprocess": None, # 表示是否需要后处理由用户决定
}
None 表示未指定,允许用户决定。
"""
return {}
@abstractmethod
def preprocess(self, df):
pass
@abstractmethod
def train(self, train_data, test_data):
pass
@abstractmethod
def postprocess(self, results):
pass
@abstractmethod
def save_model(self, path: str):
pass
示例:线性回归适配器强制需要归一化、不做后处理:
class LinearRegressionAdapter(BaseModelAdapter):
@property
def pipeline_control(self):
return {
"preprocess": True,
"normalize": True,
"postprocess": False
}
# 实现相关接口...
✅ 主训练流程调整(引入双重控制)
import time
from typing import IO, List, Union

def train_pipeline(
    factor_params: dict,
    model_params: dict,
    model_type: str,
    dataset_files: List[Union[str, IO]],
    user_pipeline_control: dict,
    publish: bool = False
):
df = load_datasets(dataset_files)
df_clean = clean_dataset(df, factor_params)
adapter_class = model_registry.get(model_type)
if not adapter_class:
raise ValueError(f"模型类型不支持: {model_type}")
model = adapter_class(model_params)
adapter_control = model.pipeline_control
# 判断是否执行各个步骤
do_preprocess = should_execute_step("preprocess", user_pipeline_control, adapter_control)
do_normalize = should_execute_step("normalize", user_pipeline_control, adapter_control)
do_postprocess = should_execute_step("postprocess", user_pipeline_control, adapter_control)
# ========= 预处理 =========
scaler = None
if do_preprocess:
if do_normalize:
df_clean, scaler = preprocess_dataset(df_clean, factor_params)
else:
df_clean, scaler = df_clean, None
# ========= 划分数据集 =========
from sklearn.model_selection import train_test_split
X = df_clean.drop(columns=["target"])
y = df_clean["target"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
# ========= 训练 =========
train_result = model.train((X_train, y_train), (X_test, y_test))
# ========= 后处理 =========
if do_postprocess:
final_result = model.postprocess(train_result)
else:
final_result = train_result
# ========= 保存 & 发布 =========
save_path = f"./output_models/{model_type}_{int(time.time())}.pkl"
model.save_model(save_path)
if publish:
upload_model_file(save_path)
return final_result
✅ 使用示例
# 用户传入的控制参数
user_pipeline_control = {
"preprocess": True,
"normalize": False,
"postprocess": True
}
train_pipeline(
factor_params={...},
model_params={...},
model_type="linear_regression",
dataset_files=["/path/to/data1.csv", "/path/to/data2.csv"],
user_pipeline_control=user_pipeline_control,
publish=True
)
即使用户设置 normalize=False,但只要 adapter.pipeline_control["normalize"] = True,仍会执行归一化。
✅ 总结
功能 | 实现方式 |
---|---|
每个节点控制是否执行 | 适配器 pipeline_control 和用户传参共同决定 |
优先级控制 | 适配器优先;用户传参为次选;否则默认 False |
新增模型适配简单 | 只需声明默认流程行为 + 实现训练接口即可 |
可维护性高 | 每个模块职责清晰(数据处理、训练逻辑、保存逻辑) |
如果你需要我继续提供:
- 流程结构封装为模块包
- 任务持久化与 Celery 队列集成
- 将训练流程日志回传前端
欢迎继续告诉我,我可以继续补充完整示例。
好的,以下是将“模型训练流程”封装为模块包,并支持将训练日志实时(流式)回传前端的完整设计思路与代码实现方案:
✅ 1. 模块结构设计
我们将整个训练流程封装为一个模块包,例如 ai_pipeline:
ai_pipeline/
├── __init__.py
├── base.py # 适配器基类与通用控制逻辑
├── adapters/ # 各种模型适配器
│ ├── __init__.py
│ ├── linear_regression.py
│ └── ...
├── data_handler.py # 数据加载、清洗、预处理等
├── pipeline.py # 主流程控制
├── logger.py # 日志工具(流式输出)
└── registry.py # 模型注册器
✅ 2. 流式日志 logger 实现(logger.py)
import sys
import time
from typing import Callable
class StreamLogger:
def __init__(self, log_func: Callable[[str], None] = print):
self.log_func = log_func
def log(self, msg: str):
timestamp = time.strftime("[%Y-%m-%d %H:%M:%S]")
self.log_func(f"{timestamp} {msg}")
# 用于队列或 WebSocket 场景的回调注入
logger = StreamLogger()
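注入自定义回调的用法示意(这里先把日志收集到列表,实际可以换成 WebSocket 的 send 或消息队列的 put):
# 用法示意:把日志写入列表,由其他组件消费
collected = []
ws_logger = StreamLogger(log_func=collected.append)
ws_logger.log("数据加载完成")
print(collected)  # 形如 ['[2025-01-01 12:00:00] 数据加载完成']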
✅ 3. 数据处理逻辑(data_handler.py)
略(你已经有了读取多个文件、字段匹配、归一化、反归一化等逻辑),只要封装成函数并接受 logger 实例即可。
例如:
def clean_and_preprocess(datasets, factor_config, normalize=False, logger=None):
logger = logger or print
logger("开始加载与合并数据集")
# 合并、清洗、归一化等操作...
✅ 4. 适配器定义(base.py)
from abc import ABC, abstractmethod
class BaseModelAdapter(ABC):
def __init__(self, model_params: dict, logger=None):
self.model_params = model_params
self.logger = logger or print
@property
def pipeline_control(self) -> dict:
return {}
@abstractmethod
def train(self, X_train, y_train, X_test, y_test):
pass
@abstractmethod
def save_model(self, path: str):
pass
def postprocess(self, results):
return results
✅ 5. 训练主流程(pipeline.py)
import time

from .registry import model_registry
from .data_handler import clean_and_preprocess
from .logger import logger
from .utils import should_execute_step
from sklearn.model_selection import train_test_split
def train_pipeline(
factor_params: dict,
model_params: dict,
model_type: str,
dataset_files: list,
user_pipeline_control: dict,
logger_instance=None
):
log = logger_instance or logger.log
log(f"开始训练任务: 模型类型={model_type}")
# 1. 加载模型适配器
model_class = model_registry.get(model_type)
if not model_class:
raise ValueError(f"不支持的模型类型: {model_type}")
model = model_class(model_params, logger=log)
adapter_control = model.pipeline_control
# 2. 数据处理
normalize = should_execute_step("normalize", user_pipeline_control, adapter_control)
df, scaler = clean_and_preprocess(dataset_files, factor_params, normalize, logger=log)
X = df.drop(columns=["target"])
y = df["target"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
# 3. 训练
log("开始模型训练...")
result = model.train(X_train, y_train, X_test, y_test)
# 4. 后处理(可选)
if should_execute_step("postprocess", user_pipeline_control, adapter_control):
result = model.postprocess(result)
# 5. 保存模型
model_path = f"./outputs/model_{model_type}_{int(time.time())}.pkl"
model.save_model(model_path)
log(f"模型保存到: {model_path}")
return result
✅ 6. WebSocket 回传(适用于 Web 前端)
Django + Channels 示例(consumer)
# consumers.py
from channels.generic.websocket import WebsocketConsumer
import threading
from ai_pipeline import train_pipeline
from ai_pipeline.logger import StreamLogger
class TrainingConsumer(WebsocketConsumer):
def connect(self):
self.accept()
def disconnect(self, code):
pass
def receive(self, text_data=None, bytes_data=None):
import json
payload = json.loads(text_data)
def send_log(msg):
self.send(json.dumps({"type": "log", "message": msg}))
logger = StreamLogger(log_func=send_log)
# 后台线程跑任务
threading.Thread(target=train_pipeline, kwargs={
"factor_params": payload["factorParams"],
"model_params": payload["modelParams"],
"model_type": payload["modelType"],
"dataset_files": payload["datasetFiles"],
"user_pipeline_control": payload.get("control", {}),
"logger_instance": logger.log
}).start()
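要让上面的 consumer 生效,还需要在 Channels 的路由里注册(示意,ws/training/ 这个 URL 路径是假设值):
# routing.py(示意)
from django.urls import re_path
from .consumers import TrainingConsumer

websocket_urlpatterns = [
    re_path(r"^ws/training/$", TrainingConsumer.as_asgi()),
]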
✅ 总结:关键特性一览
功能点 | 实现方式 |
---|---|
训练模块解耦封装 | ai_pipeline 包 |
支持多种模型适配 | 适配器模式 + 注册器 |
控制流程节点执行 | should_execute_step() 方法 |
双重控制(模型优先,用户可控) | pipeline_control + user config |
日志可回传 | 自定义 logger + WebSocket |
可集成 Celery | logger 与任务线程解耦 |
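表中提到的 Celery 集成,可以参考下面的最小示意(任务函数名与 broker 地址都是假设值;日志回调可以改成写数据库或推送消息,再由前端消费):
# tasks.py(示意)
from celery import Celery

from ai_pipeline.logger import StreamLogger
from ai_pipeline.pipeline import train_pipeline

app = Celery("ml_tasks", broker="redis://localhost:6379/0")  # broker 为假设值

@app.task
def run_training_task(payload: dict):
    logger = StreamLogger()  # 也可以注入自定义回调,把日志写库或推送给前端
    return train_pipeline(
        factor_params=payload["factor_params"],
        model_params=payload["model_params"],
        model_type=payload["model_type"],
        dataset_files=payload["dataset_files"],
        user_pipeline_control=payload.get("control", {}),
        logger_instance=logger.log,
    )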
如需我:
- 增加训练结果指标日志(R²、MAE等)
- 封装成 pip install 包结构
- 提供前端消息处理示例(如 Vue + Element Plus)
都可以继续扩展。是否需要我打包成一个可直接运行的项目结构示例?
# ai_pipeline/__init__.py
# 空文件,使 ai_pipeline 成为模块包
# ai_pipeline/logger.py
import time
from typing import Callable
class StreamLogger:
def __init__(self, log_func: Callable[[str], None] = print):
self.log_func = log_func
def log(self, msg: str):
timestamp = time.strftime("[%Y-%m-%d %H:%M:%S]")
self.log_func(f"{timestamp} {msg}")
logger = StreamLogger()
# ai_pipeline/utils.py
def should_execute_step(step: str, user_control: dict, adapter_control: dict) -> bool:
    # 适配器显式指定(非 None)时优先;否则看用户配置;双方都未指定则默认不执行
    value = adapter_control.get(step)
    if value is not None:
        return value
    return bool(user_control.get(step, False))
# ai_pipeline/base.py
from abc import ABC, abstractmethod
class BaseModelAdapter(ABC):
def __init__(self, model_params: dict, logger=None):
self.model_params = model_params
self.logger = logger or print
@property
def pipeline_control(self) -> dict:
return {}
@abstractmethod
def train(self, X_train, y_train, X_test, y_test):
pass
@abstractmethod
def save_model(self, path: str):
pass
def postprocess(self, results):
return results
# ai_pipeline/adapters/linear_regression.py
import joblib
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score
from ai_pipeline.base import BaseModelAdapter
class LinearRegressionAdapter(BaseModelAdapter):
def __init__(self, model_params: dict, logger=None):
super().__init__(model_params, logger)
self.model = LinearRegression(**model_params)
@property
def pipeline_control(self):
return {"normalize": True, "postprocess": True}
def train(self, X_train, y_train, X_test, y_test):
self.logger("训练线性回归模型...")
self.model.fit(X_train, y_train)
y_pred = self.model.predict(X_test)
mse = mean_squared_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
self.logger(f"模型评估 - MSE: {mse:.4f}, R2: {r2:.4f}")
return {"mse": mse, "r2": r2}
def save_model(self, path: str):
joblib.dump(self.model, path)
self.logger(f"模型保存到 {path}")
# ai_pipeline/registry.py
from ai_pipeline.adapters.linear_regression import LinearRegressionAdapter
model_registry = {
"linear_regression": LinearRegressionAdapter
}
# ai_pipeline/data_handler.py
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
# 示例因子配置:{"age": {"type": "numeric", "normalize": True}}
def clean_and_preprocess(dataset_files, factor_config, normalize=False, logger=None):
logger = logger or print
logger("加载并合并数据集...")
df_list = []
for path in dataset_files:
df = pd.read_csv(path)
df_list.append(df)
# 合并并对齐列名(顺序不一致时)
base_cols = df_list[0].columns
df_list = [df[base_cols] for df in df_list]
df = pd.concat(df_list, ignore_index=True).drop_duplicates()
logger("根据因子配置进行数据清洗...")
for col, config in factor_config.items():
if config.get("type") == "numeric":
df = df[df[col].notna()]
if "min" in config:
df = df[df[col] >= config["min"]]
if "max" in config:
df = df[df[col] <= config["max"]]
elif config.get("type") == "category":
allowed = config.get("values")
if allowed:
df = df[df[col].isin(allowed)]
scaler = None
if normalize:
numeric_cols = [col for col, conf in factor_config.items() if conf.get("type") == "numeric" and conf.get("normalize")]
if numeric_cols:
logger("对数值型字段进行归一化...")
scaler = MinMaxScaler()
df[numeric_cols] = scaler.fit_transform(df[numeric_cols])
return df, scaler
# ai_pipeline/pipeline.py
import time
from sklearn.model_selection import train_test_split
from ai_pipeline.registry import model_registry
from ai_pipeline.logger import logger
from ai_pipeline.utils import should_execute_step
from ai_pipeline.data_handler import clean_and_preprocess
def train_pipeline(
factor_params: dict,
model_params: dict,
model_type: str,
dataset_files: list,
user_pipeline_control: dict,
logger_instance=None
):
log = logger_instance or logger.log
log(f"开始训练任务: 模型类型={model_type}")
model_class = model_registry.get(model_type)
if not model_class:
raise ValueError(f"不支持的模型类型: {model_type}")
model = model_class(model_params, logger=log)
adapter_control = model.pipeline_control
normalize = should_execute_step("normalize", user_pipeline_control, adapter_control)
df, scaler = clean_and_preprocess(dataset_files, factor_params, normalize, logger=log)
X = df.drop(columns=["target"])
y = df["target"]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
log("开始模型训练...")
result = model.train(X_train, y_train, X_test, y_test)
if should_execute_step("postprocess", user_pipeline_control, adapter_control):
result = model.postprocess(result)
model_path = f"./outputs/model_{model_type}_{int(time.time())}.pkl"
model.save_model(model_path)
log(f"模型训练完成,保存至: {model_path}")
return result
### 🧠 模型适配器基类(base.py)
```python
# ai_pipeline/base.py
from abc import ABC, abstractmethod
class BaseModelAdapter(ABC):
def __init__(self, model_params, logger):
self.model_params = model_params
self.logger = logger
@abstractmethod
def train(self, X_train, y_train):
pass
@abstractmethod
def predict(self, X):
pass
@abstractmethod
def save(self, path):
pass
```
### ✅ 模型注册表(registry.py)
```python
# ai_pipeline/registry.py
from ai_pipeline.adapters.linear_regression import LinearRegressionAdapter
from ai_pipeline.adapters.dnn import DNNAdapter
MODEL_REGISTRY = {
"linear_regression": LinearRegressionAdapter,
"dnn": DNNAdapter,
}
```
### ➕ 新增 DNN 模型适配器(adapters/dnn.py)
```python
# ai_pipeline/adapters/dnn.py
from ai_pipeline.base import BaseModelAdapter
from sklearn.neural_network import MLPRegressor
import joblib
class DNNAdapter(BaseModelAdapter):
def __init__(self, model_params, logger):
super().__init__(model_params, logger)
self.model = MLPRegressor(**model_params)
def train(self, X_train, y_train):
self.logger("[DNN] 开始训练")
self.model.fit(X_train, y_train)
self.logger("[DNN] 训练完成")
def predict(self, X):
return self.model.predict(X)
def save(self, path):
joblib.dump(self.model, path)
```
### 🧪 示例测试调用更新
```python
from ai_pipeline.pipeline import train_pipeline
# 使用深度神经网络模型训练
result = train_pipeline(
factor_params={
"feature1": {"type": "numeric", "normalize": True},
"feature2": {"type": "numeric", "normalize": True},
},
model_params={
"hidden_layer_sizes": (50, 50),
"activation": "relu",
"max_iter": 200
},
model_type="dnn",
dataset_files=["dataset1.csv", "dataset2.csv"],
user_pipeline_control={
"normalize": True,
"postprocess": True,
}
)
```
### ✅ 流程兼容性说明
- 模型只需要继承 `BaseModelAdapter`
- 注册在 `registry.py` 的 `MODEL_REGISTRY` 中
- 控制流程结构和日志机制通用,适配任意结构化模型
如需添加支持分类模型、NLP 模型、或外部服务模型(如 OpenAI),结构完全适配,只需按上述结构新增适配器。是否还需要我添加一个如 HuggingFace BERT 的适配器?
### 🧠 模型适配器基类(base.py)
```python
# ai_pipeline/base.py
from abc import ABC, abstractmethod
class BaseModelAdapter(ABC):
def __init__(self, model_params, logger):
self.model_params = model_params # 模型参数(如层数、学习率等)
self.logger = logger # 日志函数或流式日志输出器
@abstractmethod
def train(self, X_train, y_train):
pass # 子类实现:训练模型
@abstractmethod
def predict(self, X):
pass # 子类实现:执行预测
@abstractmethod
def save(self, path):
pass # 子类实现:保存模型
```
### ✅ 模型注册表(registry.py)
```python
# ai_pipeline/registry.py
from ai_pipeline.adapters.linear_regression import LinearRegressionAdapter
from ai_pipeline.adapters.dnn import DNNAdapter
from ai_pipeline.adapters.huggingface import HuggingFaceTextAdapter
MODEL_REGISTRY = {
"linear_regression": LinearRegressionAdapter,
"dnn": DNNAdapter,
"huggingface_text": HuggingFaceTextAdapter,
}
```
### ➕ HuggingFace 文本模型适配器(adapters/huggingface.py)
```python
# ai_pipeline/adapters/huggingface.py
from ai_pipeline.base import BaseModelAdapter
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification
import torch
import os
class HuggingFaceTextAdapter(BaseModelAdapter):
def __init__(self, model_params, logger):
super().__init__(model_params, logger)
model_name = model_params.get("model_name", "distilbert-base-uncased-finetuned-sst-2-english")
self.logger(f"[HF] 加载模型: {model_name}")
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModelForSequenceClassification.from_pretrained(model_name)
self.pipeline = pipeline("sentiment-analysis", model=self.model, tokenizer=self.tokenizer)
def train(self, X_train, y_train):
self.logger("[HF] 文本模型当前为预训练推理模式,跳过训练阶段")
def predict(self, X):
    self.logger("[HF] 开始文本预测")
    # 统一转成字符串列表:支持 DataFrame、dict 或原始列表
    if hasattr(X, "columns"):        # pandas DataFrame
        texts = X["text"].tolist()
    elif isinstance(X, dict):
        texts = list(X["text"])
    else:
        texts = X
    return self.pipeline(texts)
def save(self, path):
self.logger("[HF] 保存模型和分词器")
self.model.save_pretrained(path)
self.tokenizer.save_pretrained(path)
```
### ✅ 说明(每行解释)
```python
model_name = model_params.get("model_name", "distilbert-base-uncased-finetuned-sst-2-english")
# 获取模型名称(用户传入或默认使用情感分析模型)
self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModelForSequenceClassification.from_pretrained(model_name)
# 加载 HuggingFace 模型和 tokenizer
self.pipeline = pipeline("sentiment-analysis", model=self.model, tokenizer=self.tokenizer)
# 封装为 transformers pipeline,便于直接推理
```
### 🧪 示例测试调用(文本模型)
```python
from ai_pipeline.pipeline import train_pipeline
result = train_pipeline(
factor_params={}, # 文本模型通常不需要数值因子
model_params={"model_name": "distilbert-base-uncased-finetuned-sst-2-english"},
model_type="huggingface_text",
dataset_files=["texts.csv"],
user_pipeline_control={
"normalize": False,
"postprocess": False,
}
)
```
### 📦 输入文本数据要求
- CSV 文件应包含 `text` 字段列:
```
text
"I love this product."
"This is terrible."
```
---
如需扩展为多标签分类、多语言模型,只需更换模型名和 pipeline 类型即可。
是否接下来需要接入类似 DeepSeek 或 OpenAI API 的推理模型?
## ✅ AI 平台通用模型训练流程模块(训练流程、适配器结构、API 推理模型支持)
---
## 📐 项目模块概览
```bash
ai_pipeline/
├── base.py # 模型适配器基类(定义接口)
├── pipeline.py # 主训练流程模块
├── registry.py # 模型注册表(用于模型类型查找)
├── adapters/
│ ├── __init__.py
│ ├── linear_regression.py # sklearn 线性回归模型适配器
│ ├── dnn.py # sklearn DNN 模型适配器
│ ├── huggingface.py # 本地 transformer 模型适配器
│ └── openai_api.py # 外部 OpenAI / DeepSeek 接口适配器
```
---
## 📦 模型适配器基类(base.py)
```python
# ai_pipeline/base.py
from abc import ABC, abstractmethod
class BaseModelAdapter(ABC):
def __init__(self, model_params, logger):
self.model_params = model_params # 模型初始化参数
self.logger = logger # 日志函数
@abstractmethod
def train(self, X_train, y_train):
pass # 训练方法
@abstractmethod
def predict(self, X):
pass # 预测方法
@abstractmethod
def save(self, path):
pass # 模型保存方法
```
---
## 🧠 外部 API 模型适配器(如 OpenAI 或 DeepSeek)(adapters/openai_api.py)
```python
# ai_pipeline/adapters/openai_api.py
import openai
from ai_pipeline.base import BaseModelAdapter
class OpenAIChatAdapter(BaseModelAdapter):
def __init__(self, model_params, logger):
super().__init__(model_params, logger)
self.api_key = model_params.get("api_key")
self.model = model_params.get("model", "gpt-4")
openai.api_key = self.api_key
logger(f"[OpenAI] 使用模型:{self.model}")
def train(self, X_train, y_train):
self.logger("[OpenAI] 外部 API 无需训练")
def predict(self, X):
prompt = X.get("prompt")
self.logger(f"[OpenAI] 请求提示词: {prompt}")
response = openai.ChatCompletion.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
stream=True
)
output = ""
for chunk in response:
delta = chunk["choices"][0]["delta"].get("content", "")
output += delta
self.logger(delta, stream=True) # 流式返回
return output
def save(self, path):
self.logger("[OpenAI] API 模型无需保存")
```
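注:上面的写法基于 openai<1.0 的旧版 SDK(openai.ChatCompletion);如果使用 openai>=1.0 的新版 SDK,流式调用大致如下(仅为示意):
```python
# openai>=1.0 的流式调用示意(密钥为占位值)
from openai import OpenAI

client = OpenAI(api_key="sk-xxx")
stream = client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": prompt}],
    stream=True,
)
for chunk in stream:
    delta = chunk.choices[0].delta.content or ""
    # 与上面相同:把 delta 逐块回传给 logger
```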
### 🔍 每行解释:
```python
openai.api_key = self.api_key
# 设置 OpenAI API 密钥
response = openai.ChatCompletion.create(..., stream=True)
# 启用流式响应,逐块接收模型输出
for chunk in response:
delta = chunk["choices"][0]["delta"].get("content", "")
self.logger(delta, stream=True)
# 将内容逐条流式输出给前端
```
---
## 🔌 模型注册表(registry.py)
```python
# ai_pipeline/registry.py
from ai_pipeline.adapters.linear_regression import LinearRegressionAdapter
from ai_pipeline.adapters.dnn import DNNAdapter
from ai_pipeline.adapters.huggingface import HuggingFaceTextAdapter
from ai_pipeline.adapters.openai_api import OpenAIChatAdapter
MODEL_REGISTRY = {
"linear_regression": LinearRegressionAdapter,
"dnn": DNNAdapter,
"huggingface_text": HuggingFaceTextAdapter,
"openai_chat": OpenAIChatAdapter,
}
```
---
## 🔁 通用训练流程封装模块(pipeline.py)
```python
# ai_pipeline/pipeline.py
import pandas as pd
from ai_pipeline.registry import MODEL_REGISTRY
def default_logger(msg, stream=False):
print("[LOG]", msg, flush=True) # 默认日志输出函数
def train_pipeline(factor_params, model_params, model_type,
dataset_files, user_pipeline_control=None,
base_pipeline_control=None, logger=default_logger):
logger("[Pipeline] 加载模型适配器")
model_cls = MODEL_REGISTRY[model_type]
model = model_cls(model_params, logger)
logger("[Pipeline] 读取并合并数据集")
df = pd.concat([pd.read_csv(f) for f in dataset_files], ignore_index=True)
df.drop_duplicates(inplace=True)
logger("[Pipeline] 应用字段映射和因子清洗")
df = df.dropna(subset=factor_params.get("required_fields", []))
pipeline_ctl = {"normalize": True, "postprocess": True} # 默认流程开关
if base_pipeline_control:
pipeline_ctl.update(base_pipeline_control)
if user_pipeline_control:
pipeline_ctl.update(user_pipeline_control)
if pipeline_ctl["normalize"]:
logger("[Pipeline] 执行归一化")
for col in factor_params.get("numeric_fields", []):
df[col] = (df[col] - df[col].mean()) / df[col].std()
logger("[Pipeline] 划分训练集和测试集")
train_df = df.sample(frac=0.8, random_state=42)
test_df = df.drop(train_df.index)
logger("[Pipeline] 开始训练模型")
X_train, y_train = train_df.drop("target", axis=1), train_df["target"]
model.train(X_train, y_train)
logger("[Pipeline] 模型训练完成,开始预测")
y_pred = model.predict(test_df.drop("target", axis=1))
if pipeline_ctl["postprocess"]:
logger("[Pipeline] 进行反归一化(伪操作)")
logger("[Pipeline] 保存模型")
model.save("saved_models/{}_model.bin".format(model_type))
return {"predictions": y_pred}
```
### 🔍 每行代码解释
```python
model_cls = MODEL_REGISTRY[model_type]
model = model_cls(model_params, logger)
# 获取模型类并初始化实例
pd.concat([pd.read_csv(f) for f in dataset_files])
# 合并多个 CSV 数据集
df.dropna(subset=factor_params.get("required_fields", []))
# 删除因子缺失值的样本
if pipeline_ctl["normalize"]:
for col in factor_params.get("numeric_fields", []):
df[col] = ...
# 对数值型字段进行归一化
model.train(X_train, y_train)
# 启动模型训练
model.predict(...)
# 执行预测(可返回数组或文本)
model.save(...)
# 保存模型到指定路径
```
---
## ✅ 示例使用:
```python
train_pipeline(
factor_params={
"required_fields": ["text"],
"numeric_fields": [],
},
model_params={
"api_key": "sk-xxx",
"model": "gpt-4"
},
model_type="openai_chat",
dataset_files=["chat_prompts.csv"],
user_pipeline_control={"normalize": False, "postprocess": False}
)
```
CSV 内容:
```csv
prompt
"请帮我总结这段话"
"解释什么是 Transformer 模型"
```
---
是否需要我为你加入 S3 自动上传、训练日志实时 WebSocket 回传、或模型发布逻辑?
# 完整的 AI 模型训练流程框架(含适配器 + 注册表 + 数据处理 + 模型实例)
本架构设计适用于:
- 多模型适配(线性回归、大语言模型、分析模型等)
- 模型训练流程节点的灵活控制(归一化、后处理等)
- 支持用户控制与模型创建者控制的优先级机制
- 多数据集合并处理、字段统一、清洗、归一化等
- 训练日志支持实时推送(可接入 WebSocket)
---
## 项目目录结构
```
ml_platform/
├── core/
│ ├── registry.py # 模型注册表
│ ├── base_model.py # 模型适配器基类
│ └── linear_regression.py # 示例模型:线性回归
├── pipeline/
│ └── trainer.py # 主训练流程控制器
├── preprocess/
│ ├── dataset_handler.py # 数据集读取与合并(支持 openpyxl)
│ └── cleaner.py # 数据清洗与预处理逻辑
└── run_train.py # 模拟训练流程入口
```
---
## core/registry.py
```python
# 模型注册中心,所有模型需注册进来以便训练器使用
MODEL_REGISTRY = {}
def register_model(model_type):
def wrapper(cls):
MODEL_REGISTRY[model_type] = cls
return cls
return wrapper
def get_model_class(model_type):
return MODEL_REGISTRY.get(model_type)
```
---
## core/base_model.py
```python
# 所有模型必须继承的基类
from abc import ABC, abstractmethod
class BaseModel(ABC):
def __init__(self, model_params, factor_params, control_config):
self.model_params = model_params
self.factor_params = factor_params
self.control_config = control_config # 归一化/后处理执行标志
@abstractmethod
def train(self, train_data):
pass
@abstractmethod
def predict(self, test_data):
pass
def should_normalize(self):
return self.control_config.get("normalize", False)
def should_postprocess(self):
return self.control_config.get("postprocess", False)
```
---
## core/linear_regression.py
```python
# 示例:线性回归模型
import numpy as np
from sklearn.linear_model import LinearRegression
from core.base_model import BaseModel
from core.registry import register_model
@register_model("linear_regression")
class LinearRegressionModel(BaseModel):
def __init__(self, model_params, factor_params, control_config):
super().__init__(model_params, factor_params, control_config)
self.model = LinearRegression(**self.model_params)
def train(self, train_data):
X, y = train_data.drop(columns=["target"]), train_data["target"]
self.model.fit(X, y)
def predict(self, test_data):
return self.model.predict(test_data)
```
---
## preprocess/dataset_handler.py
```python
# 使用 openpyxl 读取所有数据集内容,返回 pandas DataFrame
import pandas as pd
from openpyxl import load_workbook
import io
def read_excel_as_dataframe(file):
    if isinstance(file, str):
        # 文件路径
        wb = load_workbook(filename=file, read_only=True)
    elif isinstance(file, bytes):
        # 原始字节内容,需包装成 BytesIO
        wb = load_workbook(filename=io.BytesIO(file), read_only=True)
    else:
        # 文件对象(例如上传的文件句柄)
        wb = load_workbook(filename=io.BytesIO(file.read()), read_only=True)
sheet = wb.active
data = [[cell.value for cell in row] for row in sheet.iter_rows()]
headers, rows = data[0], data[1:]
return pd.DataFrame(rows, columns=headers)
def merge_datasets(files):
dfs = [read_excel_as_dataframe(f) for f in files]
# 对齐列名顺序后合并
columns = dfs[0].columns
aligned_dfs = [df[columns] for df in dfs]
merged = pd.concat(aligned_dfs, ignore_index=True).drop_duplicates()
return merged
```
---
## preprocess/cleaner.py
```python
# 数据清洗:包括因子缺失剔除、字段映射、归一化
import pandas as pd
def clean_data(df, factor_config):
for factor in factor_config:
name = factor["name"]
dtype = factor.get("type")
if dtype == "number":
min_val = factor.get("min")
max_val = factor.get("max")
enum = factor.get("enum")
if min_val is not None:
df = df[df[name] >= min_val]
if max_val is not None:
df = df[df[name] <= max_val]
if enum:
df = df[df[name].isin(enum)]
elif dtype == "string":
enum = factor.get("enum")
if enum:
df = df[df[name].isin(enum)]
df = df.dropna(subset=[f["name"] for f in factor_config])
return df
def normalize_data(df, factor_config):
for factor in factor_config:
if factor.get("normalize") and factor.get("type") == "number":
col = factor["name"]
min_val = df[col].min()
max_val = df[col].max()
df[col] = (df[col] - min_val) / (max_val - min_val)
return df
def split_data(df, train_ratio=0.8):
df = df.sample(frac=1).reset_index(drop=True)
split = int(len(df) * train_ratio)
return df[:split], df[split:]
```
---
## pipeline/trainer.py
```python
from core.registry import get_model_class
from preprocess.dataset_handler import merge_datasets
from preprocess.cleaner import clean_data, normalize_data, split_data
class TrainingPipeline:
def __init__(self, model_type, model_params, factor_params, dataset_files, control_flags):
self.model_type = model_type
self.model_params = model_params
self.factor_params = factor_params
self.dataset_files = dataset_files
self.control_flags = control_flags
def run(self):
print("[流程] 合并数据集...")
df = merge_datasets(self.dataset_files)
print("[流程] 清洗数据...")
df = clean_data(df, self.factor_params)
if self.control_flags.get("normalize"):
print("[流程] 数据归一化...")
df = normalize_data(df, self.factor_params)
print("[流程] 划分训练集/测试集...")
train_df, test_df = split_data(df)
print("[流程] 加载模型并训练...")
model_cls = get_model_class(self.model_type)
if not model_cls:
raise ValueError(f"不支持的模型类型: {self.model_type}")
model = model_cls(self.model_params, self.factor_params, self.control_flags)
model.train(train_df)
print("[流程] 训练完成,执行预测...")
preds = model.predict(test_df.drop(columns=["target"]))
if model.should_postprocess():
print("[流程] 执行后处理...")
# 可以添加反归一化等逻辑
print("[流程] 保存模型与结果...")
# TODO: 保存模型到 S3、本地等
return preds
```
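上面 run() 里“保存模型与结果”的 TODO,可以先用最简单的本地落盘补上(以下为示意,目录与命名规则都是假设值;辅助函数 save_model_locally 为此处临时引入):
```python
# pipeline/trainer.py 中可补充的保存逻辑(示意)
import os
import time

import joblib

def save_model_locally(model, model_type, out_dir="./saved_models"):
    os.makedirs(out_dir, exist_ok=True)
    path = os.path.join(out_dir, f"{model_type}_{int(time.time())}.pkl")
    joblib.dump(model.model, path)  # 保存适配器内部的 sklearn 模型对象
    return path
```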
---
## run_train.py(训练入口模拟)
```python
from pipeline.trainer import TrainingPipeline
# 模拟前端请求传参
model_type = "linear_regression"
model_params = {"fit_intercept": True}
factor_params = [
{"name": "age", "type": "number", "normalize": True, "min": 10, "max": 80},
{"name": "gender", "type": "string", "enum": ["male", "female"]}
]
dataset_files = ["./datasets/sample1.xlsx", "./datasets/sample2.xlsx"]
control_flags = {"normalize": True, "postprocess": False}
pipeline = TrainingPipeline(
model_type=model_type,
model_params=model_params,
factor_params=factor_params,
dataset_files=dataset_files,
control_flags=control_flags
)
preds = pipeline.run()
print("预测结果:", preds)
```
---
## ✅ 总结
该框架支持:
- 各种模型注册与训练流程解耦
- 使用 openpyxl 进行 Excel 读取
- 基于因子配置的数据清洗、归一化与拆分
- 模型训练与预测接口标准统一
- 控制逻辑灵活,可扩展其他模型
📌 如需:
- S3 发布与模型持久化
- 使用 Celery 加入任务队列
- 日志实时回传(WebSocket 或 SSE)
可继续扩展,我可以为你添加相关模块。
当然可以!我们以一个线性 DNN 回归模型为例,使用 PyTorch 编写完整训练流程,并展示如何使用训练集(train)、验证集(val)和测试集(test)。
我们会涵盖以下内容:
- 数据拆分(train/val/test)
- 构建模型
- 模型训练(使用 train、val)
- 模型评估(使用 test)
- 每一步代码都有详细注释 ✅
✅ 环境依赖(请先安装)
pip install torch pandas scikit-learn
✅ 完整代码(使用 PyTorch 实现线性 DNN 回归)
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np
# ======== 模拟数据准备 =========
# 假设我们有一个房价预测数据集
def generate_synthetic_data(n=1000):
np.random.seed(42)
X = np.random.rand(n, 3) * 100 # 三个特征:面积、卧室数、距离
y = 0.3 * X[:, 0] + 0.5 * X[:, 1] - 0.2 * X[:, 2] + np.random.randn(n) * 2
return pd.DataFrame(X, columns=["area", "bedrooms", "distance"]), pd.Series(y, name="price")
# 生成数据
X_df, y_series = generate_synthetic_data()
# ======== 数据预处理(归一化) =========
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_df)
X_scaled_df = pd.DataFrame(X_scaled, columns=X_df.columns)
# ======== 数据集拆分 =========
X_trainval, X_test, y_trainval, y_test = train_test_split(X_scaled_df, y_series, test_size=0.2, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_trainval, y_trainval, test_size=0.2, random_state=42)
# ======== PyTorch 数据集封装 =========
class RegressionDataset(Dataset):
def __init__(self, X, y):
self.X = torch.tensor(X.values, dtype=torch.float32)
self.y = torch.tensor(y.values, dtype=torch.float32).view(-1, 1)
def __len__(self):
return len(self.X)
def __getitem__(self, idx):
return self.X[idx], self.y[idx]
# 构建 Dataloader
train_loader = DataLoader(RegressionDataset(X_train, y_train), batch_size=32, shuffle=True)
val_loader = DataLoader(RegressionDataset(X_val, y_val), batch_size=32)
test_loader = DataLoader(RegressionDataset(X_test, y_test), batch_size=32)
# ======== 构建 DNN 模型 =========
class LinearDNNRegressor(nn.Module):
def __init__(self, input_dim):
super().__init__()
self.net = nn.Sequential(
nn.Linear(input_dim, 64),
nn.ReLU(),
nn.Linear(64, 32),
nn.ReLU(),
nn.Linear(32, 1) # 输出为一个回归值
)
def forward(self, x):
return self.net(x)
# 实例化模型、损失函数、优化器
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = LinearDNNRegressor(input_dim=X_train.shape[1]).to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
# ======== 模型训练过程(使用 train 和 val)=========
num_epochs = 50
best_val_loss = float("inf")
best_model_state = None
for epoch in range(num_epochs):
model.train()
train_loss = 0
for X_batch, y_batch in train_loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
optimizer.zero_grad()
y_pred = model(X_batch)
loss = criterion(y_pred, y_batch)
loss.backward()
optimizer.step()
train_loss += loss.item() * len(X_batch)
train_loss /= len(train_loader.dataset)
# 验证过程
model.eval()
val_loss = 0
with torch.no_grad():
for X_batch, y_batch in val_loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
y_pred = model(X_batch)
loss = criterion(y_pred, y_batch)
val_loss += loss.item() * len(X_batch)
val_loss /= len(val_loader.dataset)
# 打印日志
print(f"[Epoch {epoch+1}] Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}")
# 保存最优模型(注意:state_dict() 返回的是参数引用,这里复制一份快照)
if val_loss < best_val_loss:
    best_val_loss = val_loss
    best_model_state = {k: v.detach().clone() for k, v in model.state_dict().items()}
# 恢复最优模型
model.load_state_dict(best_model_state)
# ======== 模型测试(使用 test)=========
model.eval()
test_loss = 0
with torch.no_grad():
for X_batch, y_batch in test_loader:
X_batch, y_batch = X_batch.to(device), y_batch.to(device)
y_pred = model(X_batch)
loss = criterion(y_pred, y_batch)
test_loss += loss.item() * len(X_batch)
test_loss /= len(test_loader.dataset)
print(f"Test Loss (MSE): {test_loss:.4f}")
✅ 每一步解释
数据部分:
- generate_synthetic_data:生成三列特征和一个目标值的模拟数据。
- train_test_split:将数据分为训练集、验证集和测试集。
模型训练:
- 训练过程中每个 epoch:
- 使用训练集(train_loader)进行梯度更新。
- 使用验证集(val_loader)评估模型效果。
- 如果验证损失更优,就保存当前模型参数。
模型评估:
- 使用测试集(test_loader)在训练完成后一次性评估,报告最终模型在“未知数据”上的表现。
✅ 小结(训练 vs 验证 vs 测试)
阶段 | 目标 | 使用的数据 |
---|---|---|
训练 | 更新模型参数 | train_loader |
验证 | 提前停止或调参 | val_loader |
测试 | 模拟生产环境评估 | test_loader |
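如果想把表中“提前停止”真正落到代码里,可以在上面的训练循环中加一个简单的 patience 计数(示意,省略号处沿用前面的训练 / 验证代码):
# 早停示意:验证损失连续 patience 轮未改善就停止训练
patience, bad_epochs = 5, 0
for epoch in range(num_epochs):
    # ...同上:训练一轮并计算 train_loss、val_loss ...
    if val_loss < best_val_loss:
        best_val_loss, bad_epochs = val_loss, 0
        best_model_state = {k: v.detach().clone() for k, v in model.state_dict().items()}
    else:
        bad_epochs += 1
        if bad_epochs >= patience:
            print(f"验证损失连续 {patience} 轮未提升,在第 {epoch+1} 轮提前停止")
            break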
如果你后续要将这套训练结构封装成可配置的适配器模块(如你前面提到的训练服务),我可以帮你封装并加上日志、模型发布和预测流程的对接。
是否需要我将这段代码封装为训练服务类并适配你前面讨论的接口?
下面是一个动态生成神经网络结构(DNN)的实现示例,并支持:
- 🧠 动态设置层结构(隐藏层数量、大小)
- 🔢 自动识别类别特征并添加 Embedding 层
- 📐 Embedding 的维度(嵌入维度)根据类别数量动态确定
- 🧪 输入可为 pandas DataFrame
- ✅ 结构清晰、适合集成到前面训练流程中
✅ 使用示例前的说明
我们考虑如下输入特征场景:
- 连续特征(数值型):area,distance
- 类别特征(分类型):city,material(字符串或 int)
✅ 动态神经网络完整代码(含注释)
import torch
import torch.nn as nn
import pandas as pd
class DynamicDNN(nn.Module):
def __init__(self, df: pd.DataFrame,
categorical_cols: list[str],
continuous_cols: list[str],
hidden_layers: list[int] = [64, 32],
embedding_dim_rule=lambda n: min(50, (n + 1) // 2)):
"""
:param df: 原始 DataFrame,用于统计 embedding 信息
:param categorical_cols: 类别型列名
:param continuous_cols: 数值型列名
:param hidden_layers: 隐藏层的结构,例如 [64, 32]
:param embedding_dim_rule: 一个函数,输入类别数,输出 embedding 维度
"""
super().__init__()
self.categorical_cols = categorical_cols
self.continuous_cols = continuous_cols
# ===== 构建 Embedding 层 =====
self.embeddings = nn.ModuleDict()
total_emb_dim = 0
for col in categorical_cols:
num_classes = df[col].nunique()
emb_dim = embedding_dim_rule(num_classes)
self.embeddings[col] = nn.Embedding(num_classes, emb_dim)
total_emb_dim += emb_dim
self.input_dim = total_emb_dim + len(continuous_cols)
# ===== 构建隐藏层(全连接) =====
layers = []
prev_dim = self.input_dim
for h_dim in hidden_layers:
layers.append(nn.Linear(prev_dim, h_dim))
layers.append(nn.ReLU())
prev_dim = h_dim
layers.append(nn.Linear(prev_dim, 1)) # 回归任务输出一维
self.model = nn.Sequential(*layers)
def forward(self, x_cat: dict, x_cont: torch.Tensor):
"""
:param x_cat: dict,每个 key 是类别列名,value 是 LongTensor 的 column vector
:param x_cont: FloatTensor,连续特征的 batch 输入
"""
emb_list = []
for col in self.categorical_cols:
emb = self.embeddings[col](x_cat[col])
emb_list.append(emb)
x = torch.cat(emb_list + [x_cont], dim=1)
return self.model(x)
✅ 数据准备示例
import numpy as np
from sklearn.preprocessing import LabelEncoder
# 模拟数据
df = pd.DataFrame({
"area": np.random.rand(1000) * 100,
"distance": np.random.rand(1000) * 10,
"city": np.random.choice(["A", "B", "C", "D"], size=1000),
"material": np.random.choice(["wood", "brick", "steel"], size=1000),
"price": np.random.rand(1000) * 500
})
categorical_cols = ["city", "material"]
continuous_cols = ["area", "distance"]
# Label encode categorical columns
label_encoders = {}
for col in categorical_cols:
le = LabelEncoder()
df[col] = le.fit_transform(df[col])
label_encoders[col] = le
# 转换为张量输入
x_cat_tensor = {col: torch.tensor(df[col].values, dtype=torch.long) for col in categorical_cols}
x_cont_tensor = torch.tensor(df[continuous_cols].values, dtype=torch.float32)
y_tensor = torch.tensor(df["price"].values, dtype=torch.float32).view(-1, 1)
# 构建模型
model = DynamicDNN(df, categorical_cols, continuous_cols)
pred = model(x_cat_tensor, x_cont_tensor)
print("预测输出形状:", pred.shape) # [batch_size, 1]
✅ 输出说明
- Embedding 层的维度是动态确定的,如 city 有 4 个类别,则默认嵌入维度为 (4 + 1) // 2 = 2
- 模型结构由 hidden_layers 控制,可自由扩展
- 支持多列 embedding,同步处理连续值与类别值
- 最后输出 shape 为 [batch_size, 1],适用于回归
✅ 接入你现有框架的方法建议
- 将此 DynamicDNN 模块添加为模型适配器的实现类(如之前的 BaseModelAdapter 子类)。
- 在数据清洗过程中,自动识别类别列并传入 DynamicDNN。
- 模型训练器调用 model(x_cat_dict, x_cont_tensor) 进行前向传播。
- 自动构建 x_cat_dict 和 x_cont_tensor(这一步也可以集成在前面的 Dataset 封装中完成)。
如果你希望我帮你封装成适配器模型、添加训练和预测模块、加入日志或任务队列支持,我可以继续帮你构建下一个模块。是否继续?
下面是将 DNN 回归模型封装成适配器形式,并添加完整的训练、验证、测试与预测模块的代码。你可以将这套代码作为平台中“模型适配器”的一部分集成进通用训练流程中。
📦 目录结构建议
ml_adapters/
├── __init__.py
├── base.py # 基类定义
├── dnn_regression.py # 本次实现的 DNN 模型适配器
🧩 base.py – 通用模型适配器基类
# ml_adapters/base.py
from abc import ABC, abstractmethod
class BaseModelAdapter(ABC):
@abstractmethod
def train(self, train_data, val_data, **kwargs):
pass
@abstractmethod
def evaluate(self, test_data):
pass
@abstractmethod
def predict(self, input_data):
pass
@abstractmethod
def save(self, path):
pass
@abstractmethod
def load(self, path):
pass
🧠 dnn_regression.py – DNN 适配器实现
# ml_adapters/dnn_regression.py
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
import pandas as pd
import os
from .base import BaseModelAdapter
class DynamicDNN(nn.Module):
def __init__(self, df: pd.DataFrame, categorical_cols, continuous_cols, hidden_layers=[64, 32]):
super().__init__()
self.categorical_cols = categorical_cols
self.continuous_cols = continuous_cols
self.embeddings = nn.ModuleDict()
total_emb_dim = 0
for col in categorical_cols:
num_classes = df[col].nunique()
emb_dim = min(50, (num_classes + 1) // 2)
self.embeddings[col] = nn.Embedding(num_classes, emb_dim)
total_emb_dim += emb_dim
self.input_dim = total_emb_dim + len(continuous_cols)
layers = []
prev = self.input_dim
for h in hidden_layers:
layers.append(nn.Linear(prev, h))
layers.append(nn.ReLU())
prev = h
layers.append(nn.Linear(prev, 1))
self.model = nn.Sequential(*layers)
def forward(self, x_cat_dict, x_cont):
emb_list = [self.embeddings[k](x_cat_dict[k]) for k in self.categorical_cols]
x = torch.cat(emb_list + [x_cont], dim=1)
return self.model(x)
class DNNRegressionAdapter(BaseModelAdapter):
def __init__(self, df, categorical_cols, continuous_cols, target_col, hidden_layers=[64, 32], device=None):
self.df = df
self.categorical_cols = categorical_cols
self.continuous_cols = continuous_cols
self.target_col = target_col
self.hidden_layers = hidden_layers
self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
self.model = DynamicDNN(df, categorical_cols, continuous_cols, hidden_layers).to(self.device)
def _prepare_loader(self, df, batch_size=32, shuffle=True):
x_cat = {col: torch.tensor(df[col].values, dtype=torch.long).to(self.device) for col in self.categorical_cols}
x_cont = torch.tensor(df[self.continuous_cols].values, dtype=torch.float32).to(self.device)
y = torch.tensor(df[self.target_col].values, dtype=torch.float32).view(-1, 1).to(self.device)
return DataLoader(TensorDataset(x_cont, y, *x_cat.values()), batch_size=batch_size, shuffle=shuffle)
def _batch_to_input(self, batch):
x_cont, y, *cat_vals = batch
x_cat_dict = {col: val for col, val in zip(self.categorical_cols, cat_vals)}
return x_cat_dict, x_cont, y
def train(self, train_df, val_df=None, epochs=10, lr=0.001, batch_size=32):
train_loader = self._prepare_loader(train_df, batch_size)
val_loader = self._prepare_loader(val_df, batch_size, shuffle=False) if val_df is not None else None
optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)
criterion = nn.MSELoss()
for epoch in range(epochs):
self.model.train()
total_loss = 0
for batch in train_loader:
x_cat, x_cont, y = self._batch_to_input(batch)
pred = self.model(x_cat, x_cont)
loss = criterion(pred, y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
total_loss += loss.item()
print(f"[Train] Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(train_loader):.4f}")
if val_loader:
self.evaluate(val_df, prefix="[Val]")
def evaluate(self, test_df, prefix="[Test]"):
test_loader = self._prepare_loader(test_df, shuffle=False)
self.model.eval()
total_loss = 0
criterion = nn.MSELoss()
with torch.no_grad():
for batch in test_loader:
x_cat, x_cont, y = self._batch_to_input(batch)
pred = self.model(x_cat, x_cont)
loss = criterion(pred, y)
total_loss += loss.item()
print(f"{prefix} MSE: {total_loss/len(test_loader):.4f}")
def predict(self, input_df):
self.model.eval()
loader = self._prepare_loader(input_df, shuffle=False)
predictions = []
with torch.no_grad():
for batch in loader:
x_cat, x_cont, _ = self._batch_to_input(batch)
pred = self.model(x_cat, x_cont)
predictions.append(pred.cpu())
return torch.cat(predictions).numpy()
def save(self, path):
torch.save(self.model.state_dict(), path)
def load(self, path):
self.model.load_state_dict(torch.load(path, map_location=self.device))
✅ 测试示例
import pandas as pd
from sklearn.model_selection import train_test_split
from ml_adapters.dnn_regression import DNNRegressionAdapter
# 生成测试数据
df = pd.DataFrame({
"city": pd.Categorical(["A", "B", "A", "C", "B", "A"]).codes,
"material": pd.Categorical(["wood", "steel", "brick", "wood", "steel", "brick"]).codes,
"area": [50, 60, 70, 80, 90, 100],
"distance": [3, 4, 5, 6, 7, 8],
"price": [100, 120, 140, 160, 180, 200]
})
train_df, test_df = train_test_split(df, test_size=0.33)
model = DNNRegressionAdapter(
df=df,  # 用全量数据统计类别数,避免测试集中出现训练子集未见过的类别索引
categorical_cols=["city", "material"],
continuous_cols=["area", "distance"],
target_col="price"
)
model.train(train_df, test_df, epochs=10)
model.evaluate(test_df)
preds = model.predict(test_df)
print("预测结果:", preds)
# 模型保存 & 加载
model.save("dnn_model.pth")
model.load("dnn_model.pth")
✅ 支持集成任务流程
你可以把这个类注册到你的适配器工厂,例如:
ADAPTERS = {
"dnn_regression": DNNRegressionAdapter,
# future: "linear_regression": LinearRegressionAdapter,
}
是否需要我继续:
- 将其接入训练主流程管理器
- 添加日志记录与任务 ID 标识
- 提供模型发布(如 S3 上传)与预测服务接口
需要我继续完善哪些部分?
是的,可以并且推荐将归一化和反归一化逻辑抽象出来,作为可插拔的归一化策略模块(Strategy Pattern),这样可以:
- ✅ 支持多种归一化方法(MinMax、Z-score、Log 等);
- ✅ 保证归一化与反归一化是一一对应的;
- ✅ 保持训练模块解耦,只依赖统一接口;
- ✅ 易于测试和扩展。
✅ 一、结构设计(归一化策略接口)
normalization/
├── __init__.py
├── base.py # 抽象基类
├── minmax.py # MinMax归一化
├── zscore.py # ZScore标准化
├── factory.py # 创建策略的工厂方法
✅ 二、代码实现
base.py(抽象归一化接口)
from abc import ABC, abstractmethod
import pandas as pd
class Normalizer(ABC):
@abstractmethod
def fit(self, series: pd.Series):
"""计算归一化所需的参数"""
pass
@abstractmethod
def transform(self, series: pd.Series) -> pd.Series:
"""执行归一化"""
pass
@abstractmethod
def inverse_transform(self, series: pd.Series) -> pd.Series:
"""反归一化"""
pass
@abstractmethod
def get_params(self) -> dict:
"""返回参数(用于持久化或调试)"""
pass
minmax.py
from .base import Normalizer
import pandas as pd
class MinMaxNormalizer(Normalizer):
def __init__(self):
self.min_val = None
self.max_val = None
def fit(self, series: pd.Series):
self.min_val = float(series.min())
self.max_val = float(series.max())
def transform(self, series: pd.Series) -> pd.Series:
return (series - self.min_val) / (self.max_val - self.min_val + 1e-8)
def inverse_transform(self, series: pd.Series) -> pd.Series:
    return series * (self.max_val - self.min_val + 1e-8) + self.min_val  # 与 transform 的分母保持一致,保证可逆
def get_params(self):
return {"min": self.min_val, "max": self.max_val}
zscore.py
from .base import Normalizer
import pandas as pd
class ZScoreNormalizer(Normalizer):
def __init__(self):
self.mean = None
self.std = None
def fit(self, series: pd.Series):
self.mean = float(series.mean())
self.std = float(series.std())
def transform(self, series: pd.Series) -> pd.Series:
return (series - self.mean) / (self.std + 1e-8)
def inverse_transform(self, series: pd.Series) -> pd.Series:
return series * self.std + self.mean
def get_params(self):
return {"mean": self.mean, "std": self.std}
factory.py(策略工厂)
from .minmax import MinMaxNormalizer
from .zscore import ZScoreNormalizer
def get_normalizer(strategy: str):
"""
获取归一化策略
"""
strategy = strategy.lower()
if strategy == "minmax":
return MinMaxNormalizer()
elif strategy == "zscore":
return ZScoreNormalizer()
else:
raise ValueError(f"Unsupported normalization strategy: {strategy}")
✅ 三、在数据预处理中使用归一化策略
在 DataPreprocessor 中集成:
from typing import Dict

import pandas as pd

from normalization.factory import get_normalizer

class DataPreprocessor:
    def __init__(self, factor_configs: Dict[str, Dict]):
self.factor_configs = factor_configs
self.normalizers = {}
def normalize(self, df: pd.DataFrame) -> pd.DataFrame:
for col, cfg in self.factor_configs.items():
if cfg.get("type") == "numerical" and cfg.get("normalize") and not cfg.get("encoded"):
strategy = cfg.get("strategy", "minmax")
normalizer = get_normalizer(strategy)
normalizer.fit(df[col])
df[col] = normalizer.transform(df[col])
self.normalizers[col] = normalizer
return df
def inverse_normalize(self, df: pd.DataFrame) -> pd.DataFrame:
for col, normalizer in self.normalizers.items():
df[col] = normalizer.inverse_transform(df[col])
return df
✅ 四、字段配置中的扩展参数示例
factor_configs = {
"age": {
"type": "numerical",
"normalize": True,
"strategy": "minmax", # 也可以使用 "zscore"
"encoded": False
},
"height": {
"type": "numerical",
"normalize": True,
"strategy": "zscore",
"encoded": False
}
}
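结合上面的配置,一个最小的使用示意如下(数据为虚构,仅演示 normalize / inverse_normalize 的配对关系):
# 使用示意(数据为虚构)
import pandas as pd

df = pd.DataFrame({"age": [18, 30, 60], "height": [150.0, 172.5, 181.0]})
pre = DataPreprocessor(factor_configs)
df_norm = pre.normalize(df)                      # age 用 minmax,height 用 zscore
df_back = pre.inverse_normalize(df_norm.copy())  # 反归一化,还原到原始量纲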
✅ 五、优势总结
优点 | 描述 |
---|---|
✅ 插拔式策略 | 可以快速添加自定义归一化方式 |
✅ 接口统一 | 所有归一化类都有 fit/transform/inverse_transform |
✅ 模块解耦 | 训练器/预处理器不依赖具体实现细节 |
✅ 适配场景广 | 不同模型可要求不同字段归一化策略 |
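例如要新增“对数归一化”,只需按同样的接口再写一个策略类,并在工厂中注册即可(以下为示意,假设字段取值非负):
# normalization/log.py(示意:对数归一化,假设取值非负)
import numpy as np
import pandas as pd

from .base import Normalizer

class LogNormalizer(Normalizer):
    def fit(self, series: pd.Series):
        pass  # log1p / expm1 不需要拟合参数

    def transform(self, series: pd.Series) -> pd.Series:
        return np.log1p(series)

    def inverse_transform(self, series: pd.Series) -> pd.Series:
        return np.expm1(series)

    def get_params(self) -> dict:
        return {}

# 然后在 factory.get_normalizer 中增加一个分支:strategy == "log" 时返回 LogNormalizer()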
是否还需要我:
- 增加自定义归一化方式(如对数归一化、最大绝对值归一化)?
- 将归一化策略序列化保存,方便模型部署反归一化时加载使用?
- 加入日志记录和调试模式?
我可以根据你的使用场景继续封装。
为了在模型部署时支持 反归一化操作,我们需要将每个字段使用的归一化策略和其参数进行 序列化保存(保存到文件或数据库),并在加载模型时一并还原这些归一化配置。
✅ 一、核心目标
- 在训练阶段:
- 每个字段的归一化策略(如 MinMax、ZScore)和对应参数被保存;
- 在预测/部署阶段:
- 从保存的配置中加载归一化器对象,用于 inverse_transform 反归一化操作。
✅ 二、改造 Normalizer 接口(支持保存/加载)
base.py 增加 to_dict() / from_dict() 方法
from abc import ABC, abstractmethod
import pandas as pd
class Normalizer(ABC):
@abstractmethod
def fit(self, series: pd.Series):
pass
@abstractmethod
def transform(self, series: pd.Series) -> pd.Series:
pass
@abstractmethod
def inverse_transform(self, series: pd.Series) -> pd.Series:
pass
@abstractmethod
def get_params(self) -> dict:
pass
@abstractmethod
def to_dict(self) -> dict:
"""用于序列化保存"""
pass
@classmethod
@abstractmethod
def from_dict(cls, data: dict):
"""从序列化数据中还原归一化器"""
pass
✅ 三、实现 MinMaxNormalizer 的保存与加载
minmax.py
from .base import Normalizer
import pandas as pd
class MinMaxNormalizer(Normalizer):
def __init__(self, min_val=None, max_val=None):
self.min_val = min_val
self.max_val = max_val
def fit(self, series: pd.Series):
self.min_val = float(series.min())
self.max_val = float(series.max())
def transform(self, series: pd.Series) -> pd.Series:
return (series - self.min_val) / (self.max_val - self.min_val + 1e-8)
def inverse_transform(self, series: pd.Series) -> pd.Series:
    return series * (self.max_val - self.min_val + 1e-8) + self.min_val  # 与 transform 的分母保持一致,保证可逆
def get_params(self):
return {"min": self.min_val, "max": self.max_val}
def to_dict(self):
return {
"type": "minmax",
"min_val": self.min_val,
"max_val": self.max_val
}
@classmethod
def from_dict(cls, data: dict):
return cls(min_val=data["min_val"], max_val=data["max_val"])
✅ 四、通用加载函数
factory.py
扩展反序列化功能
from .minmax import MinMaxNormalizer
from .zscore import ZScoreNormalizer
NORMALIZER_CLASSES = {
"minmax": MinMaxNormalizer,
"zscore": ZScoreNormalizer
}
def get_normalizer(strategy: str):
strategy = strategy.lower()
if strategy in NORMALIZER_CLASSES:
return NORMALIZER_CLASSES[strategy]()
else:
raise ValueError(f"Unsupported normalization strategy: {strategy}")
def load_normalizer(data: dict):
strategy = data["type"]
if strategy in NORMALIZER_CLASSES:
return NORMALIZER_CLASSES[strategy].from_dict(data)
else:
raise ValueError(f"Unknown normalizer type in serialized data: {strategy}")
✅ 五、保存和加载示例(JSON 文件)
✅ 保存归一化器配置
import json
def save_normalizers(normalizers: dict, path: str):
"""
保存归一化器配置为 JSON 文件
"""
data = {col: normalizer.to_dict() for col, normalizer in normalizers.items()}
with open(path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2)
✅ 加载归一化器配置
from normalization.factory import load_normalizer
def load_normalizers(path: str) -> dict:
"""
加载归一化器配置
"""
with open(path, "r", encoding="utf-8") as f:
data = json.load(f)
return {col: load_normalizer(cfg) for col, cfg in data.items()}
✅ 六、在训练与预测阶段使用
训练结束时保存配置:
preprocessor = DataPreprocessor(factor_configs)
df = preprocessor.normalize(df)
save_normalizers(preprocessor.normalizers, "normalizers.json")
部署预测时加载配置并反归一化:
normalizers = load_normalizers("normalizers.json")
for col, normalizer in normalizers.items():
df[col] = normalizer.inverse_transform(df[col])
✅ 七、总结优势
功能点 | 说明 |
---|---|
✅ 归一化策略可选 | 每个字段独立选择合适的归一化策略 |
✅ 可反归一化 | 支持模型预测输出回转到原始单位 |
✅ 可持久化 | 可存为 JSON,适合模型部署与调试 |
✅ 模型解耦 | 模型本身只处理数值,不关心归一化策略 |
如果你需要,我也可以扩展支持:
- 将归一化策略保存在数据库中;
- 将 normalizers.json 自动与模型文件打包;
是否需要将它们集成进你当前的训练流程结构?
反归一化的使用位置取决于你的 目标 是什么:可视化指标、还原输出、或生成结果报告。下面是详细的说明:
✅ 一、反归一化应该使用在哪些场景?
📌 1. 预测结果输出前(部署或离线推理)
场景:你使用模型预测数据后,需要将模型输出还原为用户可理解的原始单位(如价格、温度、销售量等)。
- 位置:模型预测结果之后,模型输出 → inverse_transform
- 是否必须:✅必须,如果模型输出的是归一化后的值
# 模型预测
y_pred_norm = model.predict(X_test)
# 反归一化
y_pred = normalizer.inverse_transform(y_pred_norm)
📌 2. 评估指标计算前(MSE、MAE、R² 等)
场景:你希望评估指标的单位是“真实世界单位”,否则归一化数据下的 MAE、MSE 没有实际意义。
- 位置:模型预测 + 标签都需反归一化后再计算指标
- 是否必须:✅推荐(除非你就想看归一化单位下的误差)
# 反归一化
y_pred = normalizer.inverse_transform(y_pred_norm)
y_true = normalizer.inverse_transform(y_test)
# 再进行误差评估
mae = mean_absolute_error(y_true, y_pred)
📌 3. 训练集输出可视化(非必须)
场景:如果你在训练后想画图观察模型对训练集拟合得如何,通常你也会想看原始单位的输出。
- 位置:仅用于可视化、报告生成等环节
- 是否必须:❌可选
✅ 二、反归一化应该 不使用 的地方
🚫 1. 模型内部(例如 loss 计算)
- 在模型训练过程中,所有的 loss 计算、反向传播都应在 归一化后的数据 上进行。
- 不应在模型内部先对输出做 inverse_transform 再计算 loss。
# 错误 ❌
loss = MSE(y_true, normalizer.inverse_transform(model(X)))
# 正确 ✅
loss = MSE(y_true_normalized, model(X))
✅ 三、你的情况说明
我现在是在模型训练后的外部对训练数据进行的反归一化
✅ 正确做法应是:
- 训练前归一化输入和输出
- 训练时使用归一化后的数据
- 训练后不对训练数据反归一化
- 除非你要画图看模型效果,或输出预测结果
- 预测时模型输出进行反归一化
✅ 四、总结
阶段 | 是否归一化 | 是否反归一化 | 原因 |
---|---|---|---|
训练前数据 | ✅ 需要 | ❌ 不需要 | 提升模型训练稳定性 |
模型内部 | ✅ 使用归一化 | ❌ 不做反归一化 | loss 计算基于归一化 |
模型输出 | ✅ 输出归一化 | ✅ 再反归一化 | 提供用户真实单位的预测 |
评估指标 | ✅ 预测+标签反归一化 | ✅ | 保证指标有实际意义 |
可视化 | ✅ 反归一化 | ✅ | 呈现原始值曲线 |
是否需要我整合到你目前的模型训练和预测框架中?我可以直接帮你写一个完整的训练 + 预测 + 反归一化评估的示例。
下面是一个整合了训练、预测、反归一化、评估的完整流程代码示例,使用线性回归(或 DNN)模型为例,并且适配你的框架结构:
✅ 一、目录结构概览(推荐)
ml_platform/
├── normalizers/
│ └── base.py # 归一化策略基础类
│ └── minmax.py # MinMax归一化
├── models/
│ └── base_adapter.py # 模型适配器接口
│ └── linear.py # 线性模型适配器
├── trainer/
│ └── trainer.py # 核心训练与评估逻辑
├── utils/
│ └── io.py # 模型保存与归一化参数保存
└── main.py # 示例入口点
✅ 二、归一化组件(minmax.py)
# ml_platform/normalizers/minmax.py
import numpy as np
import json
class MinMaxNormalizer:
def __init__(self):
self.min = None
self.max = None
def fit(self, data: np.ndarray):
self.min = data.min(axis=0)
self.max = data.max(axis=0)
def transform(self, data: np.ndarray) -> np.ndarray:
return (data - self.min) / (self.max - self.min + 1e-8)
def inverse_transform(self, data: np.ndarray) -> np.ndarray:
return data * (self.max - self.min + 1e-8) + self.min
def save(self, filepath):
with open(filepath, 'w') as f:
json.dump({'min': self.min.tolist(), 'max': self.max.tolist()}, f)
def load(self, filepath):
with open(filepath, 'r') as f:
data = json.load(f)
self.min = np.array(data['min'])
self.max = np.array(data['max'])
✅ 三、模型适配器接口与线性实现(base_adapter.py, linear.py)
# ml_platform/models/base_adapter.py
class BaseModelAdapter:
def train(self, X_train, y_train, X_val, y_val): ...
def predict(self, X): ...
def save(self, path): ...
def load(self, path): ...
# ml_platform/models/linear.py
from sklearn.linear_model import LinearRegression
from .base_adapter import BaseModelAdapter
import joblib
class LinearModelAdapter(BaseModelAdapter):
def __init__(self):
self.model = LinearRegression()
def train(self, X_train, y_train, X_val=None, y_val=None):
self.model.fit(X_train, y_train)
def predict(self, X):
return self.model.predict(X)
def save(self, path):
joblib.dump(self.model, path)
def load(self, path):
self.model = joblib.load(path)
✅ 四、训练器实现(trainer.py)
# ml_platform/trainer/trainer.py
import numpy as np
from ml_platform.normalizers.minmax import MinMaxNormalizer
class ModelTrainer:
def __init__(self, model_adapter, normalization_enabled=True):
self.model = model_adapter
self.normalizer = MinMaxNormalizer() if normalization_enabled else None
def train(self, X_train, y_train, X_val=None, y_val=None):
if self.normalizer:
self.normalizer.fit(y_train)
y_train = self.normalizer.transform(y_train)
if y_val is not None:
y_val = self.normalizer.transform(y_val)
self.model.train(X_train, y_train, X_val, y_val)
def predict(self, X):
y_pred = self.model.predict(X)
if self.normalizer:
y_pred = self.normalizer.inverse_transform(y_pred)
return y_pred
def evaluate(self, X_test, y_test):
y_pred = self.model.predict(X_test)
if self.normalizer:
y_pred = self.normalizer.inverse_transform(y_pred)
y_test = self.normalizer.inverse_transform(y_test)
mae = np.mean(np.abs(y_test - y_pred))
mse = np.mean((y_test - y_pred) ** 2)
return {"MAE": mae, "MSE": mse}
✅ 五、测试入口(main.py)
# ml_platform/main.py
import numpy as np
from ml_platform.models.linear import LinearModelAdapter
from ml_platform.trainer.trainer import ModelTrainer
# 1. 模拟数据
X = np.random.rand(100, 5)
y = X @ np.array([1.5, -2, 0.5, 3.0, 0.2]) + 0.1 * np.random.randn(100)
# 2. 划分数据集
train_size = int(len(X) * 0.7)
val_size = int(len(X) * 0.1)
X_train, X_val, X_test = X[:train_size], X[train_size:train_size+val_size], X[train_size+val_size:]
y_train, y_val, y_test = y[:train_size], y[train_size:train_size+val_size], y[train_size+val_size:]
# 3. 模型初始化与训练
model_adapter = LinearModelAdapter()
trainer = ModelTrainer(model_adapter)
trainer.train(X_train, y_train.reshape(-1, 1), X_val, y_val.reshape(-1, 1))
# 4. 模型预测与评估
pred = trainer.predict(X_test)
metrics = trainer.evaluate(X_test, y_test.reshape(-1, 1))
print("预测值:", pred[:5].flatten())
print("评估结果:", metrics)
✅ 六、重点说明
模块 | 说明 |
---|---|
MinMaxNormalizer | 实现了 fit、transform、inverse_transform,可序列化 |
LinearModelAdapter | 适配 sklearn 的线性回归模型,封装 train、predict、save/load |
ModelTrainer | 负责整体训练流程,适配归一化、反归一化逻辑与评估 |
main.py | 模拟流程并展示整个训练、预测、评估过程 |
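一个最小的补充示意(基于上文 MinMaxNormalizer 已有的 save / load 方法;文件名 y_normalizer.json 与变量 X_new 仅为示例),展示训练侧保存归一化参数、部署侧加载后反归一化:
# 训练侧:保存已 fit 的目标归一化参数
trainer.normalizer.save("y_normalizer.json")
# 部署/推理侧:加载同一份参数,保证输出能正确还原为原始单位
from ml_platform.normalizers.minmax import MinMaxNormalizer
normalizer = MinMaxNormalizer()
normalizer.load("y_normalizer.json")
y_pred = normalizer.inverse_transform(model_adapter.predict(X_new))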
是否还希望我添加:
- 对归一化参数的保存/加载逻辑(部署使用)?
- 支持流式训练日志回传给前端?
- 接入任务队列(如 Celery)调用整个流程?
我们将归一化与反归一化模块作为流程节点 NormalizationNode,整合进现有的训练适配器结构中,并支持日志回传、策略保存、加载与切换归一化方法。以下是完整的模块集成方案。
✅ 模块结构说明
ml_pipeline/
│
├── base_node.py # 流程节点基类
├── normalization_strategies.py # 多种归一化策略定义
├── normalization_node.py # 封装的归一化流程节点
├── training_flow.py # 训练流程定义与任务运行管理
├── adapter_base.py # 模型训练适配器基类
├── dnn_regression_adapter.py # 示例:DNN回归模型适配器
└── utils/
└── logger.py # 日志回传模块
1️⃣ base_node.py
# ml_pipeline/base_node.py
from abc import ABC, abstractmethod
class BaseNode(ABC):
def __init__(self, name: str):
self.name = name
@abstractmethod
def run(self, context: dict) -> dict:
"""执行当前节点的逻辑,接收并返回 context"""
pass
2️⃣ normalization_strategies.py
# ml_pipeline/normalization_strategies.py
import pandas as pd
import joblib
from abc import ABC, abstractmethod
class NormalizationStrategy(ABC):
@abstractmethod
def fit(self, data: pd.DataFrame): pass
@abstractmethod
def transform(self, data: pd.DataFrame) -> pd.DataFrame: pass
@abstractmethod
def inverse_transform(self, data: pd.DataFrame) -> pd.DataFrame: pass
def save(self, path: str):
joblib.dump(self, path)
def load(self, path: str):
obj = joblib.load(path)
self.__dict__.update(obj.__dict__)
class MinMaxNormalization(NormalizationStrategy):
def __init__(self):
self.min = None
self.max = None
def fit(self, data: pd.DataFrame):
self.min = data.min()
self.max = data.max()
def transform(self, data: pd.DataFrame) -> pd.DataFrame:
return (data - self.min) / (self.max - self.min + 1e-8)
def inverse_transform(self, data: pd.DataFrame) -> pd.DataFrame:
return data * (self.max - self.min + 1e-8) + self.min
class StandardNormalization(NormalizationStrategy):
def __init__(self):
self.mean = None
self.std = None
def fit(self, data: pd.DataFrame):
self.mean = data.mean()
self.std = data.std()
def transform(self, data: pd.DataFrame) -> pd.DataFrame:
return (data - self.mean) / (self.std + 1e-8)
def inverse_transform(self, data: pd.DataFrame) -> pd.DataFrame:
return data * (self.std + 1e-8) + self.mean
def get_normalization_strategy(name: str) -> NormalizationStrategy:
if name == "minmax":
return MinMaxNormalization()
elif name == "standard":
return StandardNormalization()
else:
raise ValueError(f"Unknown normalization strategy: {name}")
3️⃣ normalization_node.py
# ml_pipeline/normalization_node.py
from .base_node import BaseNode
from .normalization_strategies import get_normalization_strategy
from typing import Optional
import pandas as pd
import os
class NormalizationNode(BaseNode):
def __init__(self, strategy_name: str, normalize_columns: Optional[list] = None, save_path: Optional[str] = None):
super().__init__("NormalizationNode")
self.strategy_name = strategy_name
self.normalize_columns = normalize_columns
self.save_path = save_path or f"./normalizers/{strategy_name}_normalizer.pkl"
os.makedirs(os.path.dirname(self.save_path), exist_ok=True)
def run(self, context: dict) -> dict:
df = context["train_data"]
strategy = get_normalization_strategy(self.strategy_name)
columns = self.normalize_columns or df.select_dtypes(include=['float', 'int']).columns.tolist()
strategy.fit(df[columns])
df[columns] = strategy.transform(df[columns])
context["train_data"] = df
strategy.save(self.save_path)
context["normalization_strategy"] = strategy
context["normalize_columns"] = columns
return context
4️⃣ 训练流程整合(training_flow.py)
# ml_pipeline/training_flow.py
from .normalization_node import NormalizationNode
from .base_node import BaseNode
from typing import List
import logging
logger = logging.getLogger(__name__)
class TrainingFlow:
def __init__(self, nodes: List[BaseNode]):
self.nodes = nodes
def run(self, context: dict):
for node in self.nodes:
logger.info(f"Running node: {node.name}")
context = node.run(context)
return context
✅ 示例使用(run_training.py)
from ml_pipeline.training_flow import TrainingFlow
from ml_pipeline.normalization_node import NormalizationNode
from mynodes.data_cleaning_node import DataCleaningNode
from mynodes.train_model_node import TrainModelNode
import pandas as pd
# 加载训练数据
df = pd.read_csv("data.csv")
context = {
"train_data": df,
"model_type": "dnn_regression"
}
flow = TrainingFlow(nodes=[
DataCleaningNode(),
NormalizationNode(strategy_name="minmax"),
TrainModelNode()
])
context = flow.run(context)
🔁 在模型推理时反归一化(例如预测)
# predict.py
from ml_pipeline.normalization_strategies import get_normalization_strategy
strategy = get_normalization_strategy("minmax")
strategy.load("./normalizers/minmax_normalizer.pkl")
# 假设 result 是预测输出
inverse_result = strategy.inverse_transform(result)
🔍 总结
步骤 | 功能 | 模块 |
---|---|---|
初始化 | 归一化节点添加到流程 | NormalizationNode |
执行 | 归一化训练数据 | run() 方法 |
保存 | 归一化策略持久化 | joblib.dump() |
预测 | 使用相同策略反归一化预测值 | inverse_transform() |
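作为补充示意,还可以增加一个假想的 EvaluationNode,直接消费 NormalizationNode 写入 context 的 normalization_strategy,在原始单位下计算指标(其中 y_pred_norm、y_true_norm 假定是与归一化列同名的 DataFrame,键名仅作示例):
# ml_pipeline/evaluation_node.py(示意)
import numpy as np
from .base_node import BaseNode

class EvaluationNode(BaseNode):
    def __init__(self):
        super().__init__("EvaluationNode")

    def run(self, context: dict) -> dict:
        strategy = context["normalization_strategy"]  # 由 NormalizationNode 写入
        y_pred = np.asarray(strategy.inverse_transform(context["y_pred_norm"]))
        y_true = np.asarray(strategy.inverse_transform(context["y_true_norm"]))
        context["metrics"] = {
            "MAE": float(np.mean(np.abs(y_true - y_pred))),
            "MSE": float(np.mean((y_true - y_pred) ** 2)),
        }
        return context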
需要我进一步为你接入模型适配器结构(比如 DNNRegressionAdapter)并结合流程节点自动运行吗?我也可以帮你生成对应的训练日志流式返回逻辑。
下面是完整机制的实现,确保训练和预测阶段使用一致的归一化策略,包括:
- ✅ 流程节点封装(归一化、反归一化)
- ✅ 策略保存与加载
- ✅ 模型元信息记录归一化策略路径
- ✅ 示例代码集成与注释
✅ 1. 归一化策略基类 + 多策略实现
# normalization/strategies/base.py
import abc
import pandas as pd
import pickle
class BaseNormalizer(abc.ABC):
def __init__(self, fields: list[str]):
self.fields = fields
@abc.abstractmethod
def fit(self, df: pd.DataFrame): ...
@abc.abstractmethod
def transform(self, df: pd.DataFrame) -> pd.DataFrame: ...
@abc.abstractmethod
def inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame: ...
def save(self, path: str):
with open(path, "wb") as f:
pickle.dump(self, f)
@classmethod
def load(cls, path: str) -> "BaseNormalizer":
with open(path, "rb") as f:
return pickle.load(f)
# normalization/strategies/minmax.py
import pandas as pd
from .base import BaseNormalizer
class MinMaxNormalizer(BaseNormalizer):
def __init__(self, fields: list[str]):
super().__init__(fields)
self.min = {}
self.max = {}
def fit(self, df: pd.DataFrame):
for field in self.fields:
self.min[field] = df[field].min()
self.max[field] = df[field].max()
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
df = df.copy()
for field in self.fields:
df[field] = (df[field] - self.min[field]) / (self.max[field] - self.min[field])
return df
def inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame:
df = df.copy()
for field in self.fields:
df[field] = df[field] * (self.max[field] - self.min[field]) + self.min[field]
return df
✅ 2. 归一化策略工厂
# normalization/factory.py
from .strategies.minmax import MinMaxNormalizer
def get_normalizer(name: str, fields: list[str]):
if name == "minmax":
return MinMaxNormalizer(fields)
raise ValueError(f"Unsupported normalization strategy: {name}")
✅ 3. 封装流程节点(Node)
# pipeline/nodes/normalize.py
import pandas as pd
from normalization.factory import get_normalizer
class NormalizationNode:
def __init__(self, strategy_name: str, fields: list[str], save_path: str):
self.strategy_name = strategy_name
self.fields = fields
self.save_path = save_path
def run(self, df: pd.DataFrame) -> pd.DataFrame:
strategy = get_normalizer(self.strategy_name, self.fields)
strategy.fit(df)
df_transformed = strategy.transform(df)
strategy.save(self.save_path)
return df_transformed
# pipeline/nodes/inverse_normalize.py
import pandas as pd
from normalization.factory import get_normalizer
class InverseNormalizationNode:
def __init__(self, strategy_name: str, fields: list[str], load_path: str):
self.strategy_name = strategy_name
self.fields = fields
self.load_path = load_path
def run(self, df: pd.DataFrame) -> pd.DataFrame:
# load() 为类方法,直接返回训练阶段保存的归一化器(含已拟合的参数)
strategy = get_normalizer(self.strategy_name, self.fields).load(self.load_path)
return strategy.inverse_transform(df)
✅ 4. 模型元信息保存格式(示例)
{
"model_id": "model_xyz",
"normalization": {
"strategy": "minmax",
"fields": ["age", "income"],
"path": "./models/model_xyz/normalizer_minmax.pkl"
}
}
可以保存在数据库字段、JSON配置或模型目录中。
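下面是一个只用标准库 json 的最小示意,把上述元信息随模型目录一起保存、读取(目录与文件名仅为示例):
import json
import os

def save_model_meta(model_dir: str, meta: dict) -> None:
    # 将归一化元信息写入模型目录下的 meta.json
    os.makedirs(model_dir, exist_ok=True)
    with open(os.path.join(model_dir, "meta.json"), "w", encoding="utf-8") as f:
        json.dump(meta, f, ensure_ascii=False, indent=2)

def load_model_meta(model_dir: str) -> dict:
    with open(os.path.join(model_dir, "meta.json"), "r", encoding="utf-8") as f:
        return json.load(f)

meta = {
    "model_id": "model_xyz",
    "normalization": {
        "strategy": "minmax",
        "fields": ["age", "income"],
        "path": "./models/model_xyz/normalizer_minmax.pkl",
    },
}
save_model_meta("./models/model_xyz", meta)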
✅ 5. 在训练流程中使用
df_raw = pd.read_csv(dataset_path)
fields_to_normalize = ["age", "income"]
# 步骤1:归一化
norm_node = NormalizationNode("minmax", fields_to_normalize, save_path="./models/model_xyz/normalizer_minmax.pkl")
df_normalized = norm_node.run(df_raw)
# 步骤2:训练模型...
✅ 6. 在预测流程中使用
df_input = pd.read_csv(input_path)
fields_to_normalize = ["age", "income"]
# 步骤1:归一化输入
strategy = get_normalizer("minmax", fields_to_normalize).load("./models/model_xyz/normalizer_minmax.pkl")
df_input_normalized = strategy.transform(df_input)
# 步骤2:预测
predictions = model.predict(df_input_normalized)
# 步骤3:反归一化
# 注意:模型输出通常是 ndarray,需要先转成含 target 列的 DataFrame;
# 且训练阶段必须对 target 字段 fit 并保存过对应的归一化参数,否则无法反归一化
df_pred = pd.DataFrame({"target": predictions})
inverse_node = InverseNormalizationNode("minmax", ["target"], "./models/model_xyz/normalizer_minmax.pkl")
df_output = inverse_node.run(df_pred)
✅ 下一步建议
你可以:
- ✅ 将这些节点整合入你已有的 TrainingPipeline 中
- ✅ 为预测流程增加相应的标准 Node
- ✅ 添加数据库记录,用于保存 normalizer_path
是否需要我为你:
- 添加其他归一化策略(如 Z-Score、Robust)?
- 将这些节点自动加入你的流程框架中并支持 YAML/JSON 配置方式加载?
- 为训练/预测流程封装完整的 CLI 或 Django API 接口?
告诉我你的下一步目标,我帮你完善。
当然可以!你完全可以不使用任何第三方库,使用 Python 标准库 + 自定义结构 来实现一个模块化、流程化的模型训练系统。
下面我将为你提供一个不依赖任何第三方框架(如 Airflow、Prefect、Hydra 等)的训练流程设计方案,包含:
- ✅ 核心流程引擎(支持链式 / 函数式)
- ✅ 配置驱动支持(基于字典或 JSON)
- ✅ 无第三方依赖
- ✅ 简洁清晰、适合平台扩展
✅ 一、核心思路
- 每个流程步骤是一个类,继承 BaseNode,实现 run(context) 方法。
- context 是一个 Python 字典,用于在步骤之间传递数据。
- Pipeline 类接收多个步骤,并依次调用它们的 run()。
✅ 二、完整示例代码(无第三方依赖)
1️⃣ base_node.py — 基础流程节点定义
# base_node.py
from abc import ABC, abstractmethod
class BaseNode(ABC):
def __init__(self, name: str):
self.name = name
@abstractmethod
def run(self, context: dict) -> dict:
"""处理 context 并返回更新后的 context"""
pass
2️⃣ 自定义几个节点(步骤模块)
# nodes.py
from base_node import BaseNode
class LoadDatasetNode(BaseNode):
def __init__(self, paths: list[str]):
super().__init__("LoadDataset")
self.paths = paths
def run(self, context: dict) -> dict:
print(f"[{self.name}] 加载数据: {self.paths}")
context["raw_data"] = f"Data from {self.paths}"
return context
class CleanDataNode(BaseNode):
def __init__(self, required_columns: list[str]):
super().__init__("CleanData")
self.required_columns = required_columns
def run(self, context: dict) -> dict:
print(f"[{self.name}] 清洗数据:保留 {self.required_columns}")
context["clean_data"] = f"Cleaned({context['raw_data']})"
return context
class TrainModelNode(BaseNode):
def __init__(self, model_type: str, params: dict):
super().__init__("TrainModel")
self.model_type = model_type
self.params = params
def run(self, context: dict) -> dict:
print(f"[{self.name}] 训练模型:{self.model_type} with {self.params}")
context["trained_model"] = f"Model({self.model_type})"
return context
3️⃣ pipeline.py — 流程管理器
# pipeline.py
from typing import List
from base_node import BaseNode
class Pipeline:
def __init__(self):
self.steps: List[BaseNode] = []
def add(self, step: BaseNode):
self.steps.append(step)
return self # 支持链式调用
def run(self, context=None):
context = context or {}
for step in self.steps:
print(f"\n➡️ 执行步骤:{step.name}")
context = step.run(context)
print("\n✅ 流程完成")
return context
4️⃣ 使用示例:链式方式调用流程
# run_pipeline.py
from pipeline import Pipeline
from nodes import LoadDatasetNode, CleanDataNode, TrainModelNode
pipeline = (
Pipeline()
.add(LoadDatasetNode(paths=["data1.xlsx", "data2.xlsx"]))
.add(CleanDataNode(required_columns=["age", "salary"]))
.add(TrainModelNode(model_type="dnn", params={"layers": [64, 32]}))
)
final_context = pipeline.run()
print("\n最终输出上下文:", final_context)
✅ 三、支持 JSON/字典 配置执行
用字典定义:
pipeline_config = [
{"type": "LoadDatasetNode", "params": {"paths": ["a.xlsx", "b.xlsx"]}},
{"type": "CleanDataNode", "params": {"required_columns": ["x", "y"]}},
{"type": "TrainModelNode", "params": {"model_type": "dnn", "params": {"layers": [64]}}}
]
加载执行(无需外部依赖):
# dynamic_pipeline.py
from nodes import LoadDatasetNode, CleanDataNode, TrainModelNode
from pipeline import Pipeline
NODE_MAP = {
"LoadDatasetNode": LoadDatasetNode,
"CleanDataNode": CleanDataNode,
"TrainModelNode": TrainModelNode,
}
def load_pipeline_from_config(config: list[dict]) -> Pipeline:
pipeline = Pipeline()
for step in config:
node_class = NODE_MAP[step["type"]]
node = node_class(**step["params"])
pipeline.add(node)
return pipeline
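补充一个加载方式的示意(仅依赖标准库 json):把上面的 pipeline_config 存成 JSON 文件后读入并执行,文件名 pipeline.json 仅为示例:
# run_from_json.py
import json
from dynamic_pipeline import load_pipeline_from_config

with open("pipeline.json", "r", encoding="utf-8") as f:
    config = json.load(f)  # 结构与上文 pipeline_config 相同

pipeline = load_pipeline_from_config(config)
final_context = pipeline.run()
print(final_context)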
✅ 四、特点总结
特性 | 说明 |
---|---|
✅ 无依赖 | 全部使用 Python 标准库 |
🔌 模块解耦 | 每个步骤单独封装,便于复用与测试 |
📦 易平台集成 | 可嵌入 Django/Celery 任意任务中 |
📋 可配置 | 字典或 JSON 控制流程 |
🧩 可扩展 | 新节点只需继承 BaseNode 即可 |
✅ 是否继续?
我可以帮你:
- 🔁 增加数据预处理、归一化、模型保存等 Node
- 🧪 生成一个测试运行示例(训练 + 模型保存 + 反归一化)
- ☁️ 对接 Celery 异步队列/日志流式输出
- 🛠️ 整理为一个 Python 包结构,供你项目导入使用
你想要我下一步完善哪部分?是否希望我提供完整的项目结构并封装为模块包?
很好,我们来扩展你的无第三方依赖训练流程框架:
✅ 目标
- 增加以下通用节点:
  - PreprocessNode(数据预处理,如缺失填充、类型转换等)
  - NormalizeNode(数值型归一化)
  - SaveModelNode(保存模型结构或权重信息)
- 每个节点支持一个可扩展的持久化机制(persist 开关 + save_result() 挂钩):
  - 可选是否持久化结果(如保存至 S3、本地或数据库)
  - 暂不实现逻辑,仅设计通用接口与结构
✅ 修改 BaseNode:添加持久化挂钩
# base_node.py
from abc import ABC, abstractmethod
class BaseNode(ABC):
def __init__(self, name: str, persist: bool = False):
self.name = name
self.persist = persist # 是否启用持久化
@abstractmethod
def run(self, context: dict) -> dict:
pass
def save_result(self, key: str, value):
"""用于保存结果,可扩展到 S3、数据库或本地"""
if self.persist:
print(f"[{self.name}] 🔄 模拟持久化保存:{key} -> {value[:50]}...")
# 真实实现中可调用 save_to_s3/save_to_db/save_to_disk 等
✅ 新增节点 1:预处理节点 PreprocessNode
# nodes.py
class PreprocessNode(BaseNode):
def __init__(self, fields: list[str], fillna: float = 0.0, persist: bool = False):
super().__init__("Preprocess", persist)
self.fields = fields
self.fillna = fillna
def run(self, context: dict) -> dict:
print(f"[{self.name}] 数据预处理:填充字段 {self.fields} 的缺失值为 {self.fillna}")
data = context["clean_data"]
processed = f"Preprocessed({data})"
context["preprocessed_data"] = processed
self.save_result("preprocessed_data", processed)
return context
✅ 新增节点 2:归一化节点 NormalizeNode
class NormalizeNode(BaseNode):
def __init__(self, fields: list[str], strategy: str = "minmax", persist: bool = False):
super().__init__("Normalize", persist)
self.fields = fields
self.strategy = strategy
def run(self, context: dict) -> dict:
print(f"[{self.name}] 对字段 {self.fields} 使用策略 {self.strategy} 归一化")
data = context.get("preprocessed_data") or context.get("clean_data")
normalized = f"Normalized({data})"
context["normalized_data"] = normalized
self.save_result("normalized_data", normalized)
return context
✅ 新增节点 3:模型保存节点 SaveModelNode
class SaveModelNode(BaseNode):
def __init__(self, save_path: str, persist: bool = False):
super().__init__("SaveModel", persist)
self.save_path = save_path
def run(self, context: dict) -> dict:
model = context.get("trained_model")
print(f"[{self.name}] 保存模型到路径 {self.save_path}")
self.save_result("model", f"ModelFile({model})")
context["model_path"] = self.save_path
return context
✅ 使用示例(链式或函数式均可)
pipeline = (
Pipeline()
.add(LoadDatasetNode(paths=["a.xlsx", "b.xlsx"]))
.add(CleanDataNode(required_columns=["age", "income"]))
.add(PreprocessNode(fields=["age", "income"], fillna=0.0, persist=True))
.add(NormalizeNode(fields=["age", "income"], persist=True))
.add(TrainModelNode(model_type="dnn", params={"layers": [64, 32]}))
.add(SaveModelNode(save_path="./model/dnn.pkl", persist=True))
)
context = pipeline.run()
✅ 持久化扩展建议
你可以通过策略类实现持久化支持:
class ResultSaver:
def __init__(self, mode="local"):
self.mode = mode # 可为 'local', 's3', 'db'
def save(self, key: str, value):
if self.mode == "local":
print(f"[Local] Save {key}")
elif self.mode == "s3":
print(f"[S3] Upload {key}")
elif self.mode == "db":
print(f"[DB] Insert {key}")
在每个 Node 中调用 ResultSaver().save(key, value) 即可轻松扩展。
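一个注入方式的最小示意(基于上文的 ResultSaver,并非最终实现):让 BaseNode 在构造时接收可选的 saver,save_result() 委托给它:
# base_node.py(示意:增加可注入的持久化策略)
from abc import ABC, abstractmethod

class BaseNode(ABC):
    def __init__(self, name: str, persist: bool = False, saver=None):
        self.name = name
        self.persist = persist
        self.saver = saver or ResultSaver(mode="local")  # 默认本地策略

    @abstractmethod
    def run(self, context: dict) -> dict:
        pass

    def save_result(self, key: str, value):
        if self.persist:
            self.saver.save(key, value)  # 委托给注入的 ResultSaver
这样切换 S3 / DB 只需在构造节点时传入不同 mode 的 ResultSaver,节点内部逻辑不变。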
✅ 小结
模块 | 说明 |
---|---|
PreprocessNode | 执行字段的缺失填充等预处理操作 |
NormalizeNode | 支持不同归一化策略(占位实现) |
SaveModelNode | 模拟模型文件保存 |
BaseNode.persist | 控制是否保存节点中间数据 |
save_result() | 模拟将结果写入存储系统,后续可注入实际逻辑 |
是否需要我为你:
- 💾 添加 ResultSaver 可注入策略示例
- ☁️ 整合 Django 中的 S3 存储(例如使用 boto3)作为 save_result() 的默认实现
- 🧪 为每个 Node 写一份单元测试/模拟上下文测试
告诉我你希望优先推进哪一块,我立刻为你扩展。
当然可以,我将为你详细解释这个模型训练流程的整体设计逻辑,以及每个部分的代码结构和作用。我们使用的是自定义流程系统,完全基于 Python 标准库构建,具备:
- 模块化(每一步都是一个 Node)
- 解耦(数据通过上下文 context 流转)
- 可扩展(支持模型、清洗、归一化等扩展)
- 可持久化(支持保存中间结果至 S3/数据库等)
✅ 一、整体流程概览图
训练流程(Pipeline)
┌─────────────────────────────┐
│ Pipeline.run() │
│ ┌─────┐ ┌──────┐ ┌─────┐ │
│ │Node1│→→→│Node2 │→→→│Node3│→→ ... │
│ └─────┘ └──────┘ └─────┘ │
└─────────────────────────────┘
每个 Node 执行 run(context),更新上下文并返回
✅ 二、核心结构说明
1. BaseNode:所有流程步骤的基类
# base_node.py
from abc import ABC, abstractmethod
class BaseNode(ABC):
def __init__(self, name: str, persist: bool = False):
self.name = name
self.persist = persist # 是否持久化当前节点输出
@abstractmethod
def run(self, context: dict) -> dict:
pass
def save_result(self, key: str, value):
"""持久化保存结果(目前为打印模拟)"""
if self.persist:
print(f"[{self.name}] 模拟保存 {key}: {str(value)[:50]}...")
✅ 功能说明:
- 提供统一的 run() 接口,所有流程节点必须实现。
- 提供 save_result(),预留持久化能力。
2. Pipeline:执行流程的调度器
# pipeline.py
class Pipeline:
def __init__(self):
self.steps = []
def add(self, step: BaseNode):
self.steps.append(step)
return self # 支持链式调用
def run(self, context=None):
context = context or {}
for step in self.steps:
print(f"➡️ 执行步骤:{step.name}")
context = step.run(context)
return context
✅ 功能说明:
- 接收多个 Node 实例
- 按顺序执行每个节点的 run(context),将返回结果传给下一个 Node
✅ 三、每个 Node 的作用及代码逻辑
3.1 加载数据 LoadDatasetNode
class LoadDatasetNode(BaseNode):
def __init__(self, paths):
super().__init__("LoadDataset")
self.paths = paths
def run(self, context):
print(f"[{self.name}] 加载数据:{self.paths}")
context["raw_data"] = f"Data from {self.paths}" # 实际应读取文件
return context
- 接收多个路径 → 模拟加载数据 → 保存为 raw_data(真实读取的示意见下)
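一个更接近真实读取的示意(保持“无第三方依赖”的约束,用标准库 csv;若数据是 xlsx 则需要 openpyxl,此处以 CSV 文件为例):
import csv
from base_node import BaseNode

class LoadDatasetNode(BaseNode):
    def __init__(self, paths):
        super().__init__("LoadDataset")
        self.paths = paths

    def run(self, context):
        rows = []
        for path in self.paths:
            with open(path, newline="", encoding="utf-8") as f:
                rows.extend(csv.DictReader(f))  # 每行读成 dict
        context["raw_data"] = rows  # list[dict] 形式的原始数据
        return context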
3.2 数据清洗 CleanDataNode
class CleanDataNode(BaseNode):
def __init__(self, required_columns):
super().__init__("CleanData")
self.required_columns = required_columns
def run(self, context):
print(f"[{self.name}] 清洗字段:{self.required_columns}")
data = context["raw_data"]
cleaned = f"Cleaned({data})"
context["clean_data"] = cleaned
return context
- 清洗:只保留指定字段 → 存入 clean_data
3.3 数据预处理 PreprocessNode
class PreprocessNode(BaseNode):
def __init__(self, fields, fillna=0.0, persist=False):
super().__init__("Preprocess", persist)
self.fields = fields
self.fillna = fillna
def run(self, context):
print(f"[{self.name}] 缺失值填充:字段={self.fields}, 填充={self.fillna}")
data = context["clean_data"]
processed = f"Preprocessed({data})"
context["preprocessed_data"] = processed
self.save_result("preprocessed_data", processed)
return context
- 执行缺失填充、字段校验等预处理逻辑
- 可选持久化保存(如到 S3)
3.4 数据归一化 NormalizeNode
class NormalizeNode(BaseNode):
def __init__(self, fields, strategy="minmax", persist=False):
super().__init__("Normalize", persist)
self.fields = fields
self.strategy = strategy
def run(self, context):
print(f"[{self.name}] 对字段 {self.fields} 使用 {self.strategy} 归一化")
data = context.get("preprocessed_data") or context["clean_data"]
normalized = f"Normalized({data})"
context["normalized_data"] = normalized
self.save_result("normalized_data", normalized)
return context
- 对数值型字段进行归一化
- 将归一化后的结果保存在 normalized_data
3.5 模型训练 TrainModelNode
class TrainModelNode(BaseNode):
def __init__(self, model_type, params):
super().__init__("TrainModel")
self.model_type = model_type
self.params = params
def run(self, context):
print(f"[{self.name}] 训练模型类型={self.model_type} 参数={self.params}")
model = f"TrainedModel({self.model_type})"
context["trained_model"] = model
return context
- 根据模型类型和参数训练(此处为模拟)
- 保存训练好的模型对象
3.6 模型保存 SaveModelNode
class SaveModelNode(BaseNode):
def __init__(self, save_path, persist=False):
super().__init__("SaveModel", persist)
self.save_path = save_path
def run(self, context):
model = context["trained_model"]
print(f"[{self.name}] 保存模型到 {self.save_path}")
self.save_result("model", f"ModelFile({model})")
context["model_path"] = self.save_path
return context
- 将模型保存路径保存到上下文(实际可调用 pickle/S3 保存)
- 使用 save_result() 预留真实保存接口(pickle 版本的示意见下)
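一个可运行的保存示意(用标准库 pickle 把 context 中的模型对象写到 save_path;真实项目中可换成 joblib 或上传 S3):
import os
import pickle
from base_node import BaseNode

class SaveModelNode(BaseNode):
    def __init__(self, save_path, persist=False):
        super().__init__("SaveModel", persist)
        self.save_path = save_path

    def run(self, context):
        model = context["trained_model"]
        os.makedirs(os.path.dirname(self.save_path) or ".", exist_ok=True)
        with open(self.save_path, "wb") as f:
            pickle.dump(model, f)  # 真正写入模型文件
        self.save_result("model", f"ModelFile({self.save_path})")
        context["model_path"] = self.save_path
        return context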
✅ 四、运行入口脚本示例
# run_pipeline.py
from pipeline import Pipeline
from nodes import LoadDatasetNode, CleanDataNode, PreprocessNode, NormalizeNode, TrainModelNode, SaveModelNode
pipeline = (
Pipeline()
.add(LoadDatasetNode(paths=["data1.xlsx", "data2.xlsx"]))
.add(CleanDataNode(required_columns=["age", "salary"]))
.add(PreprocessNode(fields=["age", "salary"], fillna=0, persist=True))
.add(NormalizeNode(fields=["age", "salary"], strategy="minmax", persist=True))
.add(TrainModelNode(model_type="dnn", params={"layers": [64, 32]}))
.add(SaveModelNode(save_path="./model/dnn.pkl", persist=True))
)
context = pipeline.run()
print("最终输出:", context)
✅ 总结逻辑流程图
context 结构
┌────────────┐
│ raw_data │ ← LoadDatasetNode
│ clean_data │ ← CleanDataNode
│ preprocessed_data ← PreprocessNode
│ normalized_data ← NormalizeNode
│ trained_model ← TrainModelNode
│ model_path ← SaveModelNode
└────────────┘
✅ 你下一步可以选择:
- 让我帮你将这个结构组织成完整的 Python 包目录
- 生成真实的数据处理与训练逻辑(使用 Pandas、PyTorch)
- 接入任务队列 Celery/Redis + 异步训练流程
- 接入 S3、PostgreSQL 进行持久化存储
告诉我你希望进一步完善哪个模块,我就继续为你实现。
这是一个非常重要也非常有价值的问题 —— 在训练流程中,每个节点(Node)之间确实可能存在数据依赖关系,我们需要用一套通用又清晰的方式来:
- 规范每个节点的输入(依赖)和输出(产出)
- 避免数据丢失或执行顺序出错
- 支持灵活插拔、重排流程的能力
✅ 总体方案:节点的输入输出依赖管理
我们为每个 BaseNode 增加两个显式属性:
属性 | 说明 |
---|---|
requires: list[str] | 依赖的 context 键,即前置步骤必须产出的内容 |
produces: list[str] | 本步骤输出的 context 键,用于后续步骤消费 |
✅ 1. 修改 BaseNode,支持依赖声明检查
class BaseNode(ABC):
def __init__(self, name: str, persist: bool = False):
self.name = name
self.persist = persist
self.requires: list[str] = [] # 依赖的 context key
self.produces: list[str] = [] # 本节点输出的 context key
@abstractmethod
def run(self, context: dict) -> dict:
pass
def save_result(self, key: str, value):
if self.persist:
print(f"[{self.name}] 模拟持久化:{key} -> {str(value)[:50]}")
def check_requirements(self, context: dict):
missing = [k for k in self.requires if k not in context]
if missing:
raise ValueError(f"[{self.name}] 缺少依赖数据: {missing}")
在 Pipeline.run() 中增加依赖检查:
def run(self, context=None):
context = context or {}
for step in self.steps:
print(f"\n➡️ 执行步骤:{step.name}")
step.check_requirements(context)
context = step.run(context)
return context
✅ 2. 示例:修改每个节点的实现
✅ LoadDatasetNode
class LoadDatasetNode(BaseNode):
def __init__(self, paths):
super().__init__("LoadDataset")
self.paths = paths
self.produces = ["raw_data"]
def run(self, context):
data = f"Data from {self.paths}"
context["raw_data"] = data
return context
✅ CleanDataNode
class CleanDataNode(BaseNode):
def __init__(self, required_columns):
super().__init__("CleanData")
self.required_columns = required_columns
self.requires = ["raw_data"]
self.produces = ["clean_data"]
def run(self, context):
data = context["raw_data"]
cleaned = f"Cleaned({data})"
context["clean_data"] = cleaned
return context
✅ NormalizeNode
class NormalizeNode(BaseNode):
def __init__(self, fields, strategy="minmax", persist=False):
super().__init__("Normalize", persist)
self.fields = fields
self.strategy = strategy
self.requires = ["preprocessed_data"] # 或 ["clean_data"]
self.produces = ["normalized_data"]
def run(self, context):
data = context.get("preprocessed_data") or context["clean_data"]
normalized = f"Normalized({data})"
context["normalized_data"] = normalized
self.save_result("normalized_data", normalized)
return context
✅ 3. 增强:自动检测节点顺序是否合理(可选)
在 Pipeline.run() 之前,做一个顺序合理性预检查:
def validate_dependencies(self):
available = set()
for step in self.steps:
missing = [key for key in step.requires if key not in available]
if missing:
raise ValueError(f"[{step.name}] 顺序错误,缺少依赖:{missing}")
available.update(step.produces)
调用:
def run(self, context=None):
self.validate_dependencies()
...
✅ 总结:设计收益
特性 | 实现 |
---|---|
明确数据流依赖 | 每个节点 requires 明确依赖哪些键 |
防止乱序执行 | Pipeline.validate_dependencies() 自动检查 |
保证流程健壮性 | 如果缺失依赖,提前抛错 |
易于维护 | 插拔节点时不用看内部实现,只看 requires 和 produces |
易于文档生成 | 可自动输出流程图与依赖说明 |
✅ 示例:自动文档生成(可选)
你甚至可以自动打印出流程:
for step in pipeline.steps:
print(f"🔧 {step.name}")
print(f" ➤ 输入依赖: {step.requires}")
print(f" ➤ 输出结果: {step.produces}")
✅ 下一步建议
我可以继续帮你:
- 🔧 提炼出 Node 的 schema 描述和注册机制
- 📊 生成流程图(例如输出为 Graphviz 或 Mermaid 图)
- 🧪 为每个节点设计单元测试和上下文模拟测试
- ☁️ 实现 save_result() 的持久化策略(如 S3、PostgreSQL)
你想优先处理哪一块?我可以马上开始。