训练流程测试

Posted by Shallow Dreamer on June 2, 2025

你这个需求可以很好地使用 适配器模式 来构建一个灵活的模型训练框架,用于适配各种不同类型的模型(如线性回归、神经网络、LLM、大数据分析等)。

下面我将为你提供一个完整的结构化训练流程框架,包含以下模块:


✅ 总体流程(模块划分)

User Request
   ↓
1. 参数输入(模型参数 + 因子参数 + 训练数据列表)
   ↓
2. 数据清洗(缺失值、字段映射、类型识别)
   ↓
3. 数据预处理(归一化 / 标准化)
   ↓
4. 模型适配(选择不同模型类)
   ↓
5. 模型训练(统一的 train 接口)
   ↓
6. 数据后处理(如反归一化)
   ↓
7. 模型保存(本地或上传 S3)

✅ 项目结构建议

ml_pipeline/
├── adapters/                # 模型适配器
│   ├── base.py              # BaseModelAdapter
│   ├── linear_regression.py
│   ├── dnn.py
│   └── llm.py
├── data_cleaning/
│   ├── cleaner.py           # 数据清洗工具
│   └── field_mapper.py      # 字段映射/类型识别
├── preprocessing/
│   ├── preprocessor.py      # 归一化等预处理逻辑
│   └── postprocessor.py     # 反归一化
├── trainer.py               # 训练流程主入口
├── utils/
│   ├── io.py                # 数据文件读取(openpyxl + csv)
│   └── s3_uploader.py       # 模型文件上传
└── config/
    └── model_registry.py    # 注册支持的模型类型

✅ 核心类与流程实现(关键代码)

1. BaseModelAdapter(所有模型的统一接口)

# adapters/base.py
from abc import ABC, abstractmethod

class BaseModelAdapter(ABC):
    def __init__(self, model_params: dict):
        self.model_params = model_params

    @abstractmethod
    def preprocess(self, df):
        pass

    @abstractmethod
    def train(self, train_data, test_data):
        pass

    @abstractmethod
    def postprocess(self, results):
        pass

    @abstractmethod
    def save_model(self, path: str):
        pass

2. LinearRegressionAdapter 示例

# adapters/linear_regression.py
from sklearn.linear_model import LinearRegression
from .base import BaseModelAdapter

class LinearRegressionAdapter(BaseModelAdapter):
    def __init__(self, model_params):
        super().__init__(model_params)
        self.model = LinearRegression(**model_params)

    def preprocess(self, df):
        return df  # 线性模型通常不要求额外处理

    def train(self, train_data, test_data):
        X_train, y_train = train_data
        self.model.fit(X_train, y_train)
        return {"score": self.model.score(*test_data)}

    def postprocess(self, results):
        return results

    def save_model(self, path):
        import joblib
        joblib.dump(self.model, path)

3. 训练流程主入口

# trainer.py
import time

from config.model_registry import model_registry
from data_cleaning.cleaner import clean_dataset
from preprocessing.preprocessor import preprocess_dataset
from preprocessing.postprocessor import postprocess_result
from utils.io import load_datasets
from utils.s3_uploader import upload_model_file

def train_pipeline(factor_params, model_params, model_type, dataset_files, publish=False):
    # 1. 加载数据集
    df = load_datasets(dataset_files)

    # 2. 数据清洗
    df_clean = clean_dataset(df, factor_params)

    # 3. 数据预处理(归一化)
    df_preprocessed, scaler = preprocess_dataset(df_clean, factor_params)

    # 4. 分割训练测试集
    from sklearn.model_selection import train_test_split
    X = df_preprocessed.drop(columns=["target"])
    y = df_preprocessed["target"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    # 5. 获取模型适配器
    adapter_class = model_registry.get(model_type)
    if not adapter_class:
        raise ValueError(f"不支持的模型类型: {model_type}")
    model = adapter_class(model_params)

    # 6. 训练
    train_result = model.train((X_train, y_train), (X_test, y_test))

    # 7. 后处理
    final_result = model.postprocess(train_result)

    # 8. 模型保存
    save_path = f"./output_models/{model_type}_{int(time.time())}.pkl"
    model.save_model(save_path)

    if publish:
        upload_model_file(save_path)

    return final_result
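
trainer.py 中引用的 upload_model_file 在上文并未给出实现;下面是一个假设使用 boto3 的最小示意(bucket 名称、前缀均为占位示例,凭证与存储桶需按实际环境配置):

# utils/s3_uploader.py(示意)
import os
import boto3

def upload_model_file(local_path: str, bucket: str = "my-model-bucket", prefix: str = "models/"):
    """把本地模型文件上传到 S3,返回对象 key(bucket / prefix 仅为示例)。"""
    s3 = boto3.client("s3")
    key = prefix + os.path.basename(local_path)
    s3.upload_file(local_path, bucket, key)
    return key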

4. 注册模型适配器

# config/model_registry.py
from adapters.linear_regression import LinearRegressionAdapter
from adapters.dnn import DNNAdapter
from adapters.llm import LLMAdapter

model_registry = {
    "linear_regression": LinearRegressionAdapter,
    "dnn": DNNAdapter,
    "llm": LLMAdapter,
}

5. 清洗 + 预处理(示例)

# data_cleaning/cleaner.py
def clean_dataset(df, factor_params):
    # 删除因子为空的行
    for factor in factor_params:
        df = df[df[factor["name"]].notnull()]
    return df

# preprocessing/preprocessor.py
from sklearn.preprocessing import MinMaxScaler

def preprocess_dataset(df, factor_params):
    numeric_cols = [f["name"] for f in factor_params if f.get("normalize") is True]
    scaler = MinMaxScaler()
    df[numeric_cols] = scaler.fit_transform(df[numeric_cols])
    return df, scaler
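
上面 trainer.py 使用的 load_datasets(utils/io.py)可以按如下思路实现;这里假设各数据集列结构一致,xlsx 由 pandas 借助 openpyxl 解析(仅为示意):

# utils/io.py(示意)
import pandas as pd

def load_datasets(dataset_files):
    """读取多个 xlsx / csv 文件并纵向合并为一个 DataFrame。"""
    frames = []
    for path in dataset_files:
        if str(path).endswith((".xlsx", ".xls")):
            frames.append(pd.read_excel(path))  # 依赖 openpyxl 解析 Excel
        else:
            frames.append(pd.read_csv(path))
    return pd.concat(frames, ignore_index=True).drop_duplicates()

项目结构中提到的字段映射/类型识别(field_mapper.py)上文同样没有示例,这里补充一个假设性的最小示意,函数名 map_and_infer_fields 为自拟:

# data_cleaning/field_mapper.py(示意)
import pandas as pd

def map_and_infer_fields(df: pd.DataFrame, field_mapping: dict) -> pd.DataFrame:
    """按 field_mapping(原始列名 -> 标准列名)重命名列,并对能解析为数值的列做类型识别。"""
    df = df.rename(columns=field_mapping)
    for col in df.columns:
        converted = pd.to_numeric(df[col], errors="coerce")
        if converted.notna().mean() > 0.9:  # 绝大多数值可解析为数值时,视为数值列
            df[col] = converted
    return df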

✅ 示例调用

result = train_pipeline(
    factor_params=[{"name": "feature1", "normalize": True}, {"name": "feature2", "normalize": False}],
    model_params={"fit_intercept": True},
    model_type="linear_regression",
    dataset_files=["/path/data1.xlsx", "/path/data2.csv"],
    publish=True
)
print(result)

✅ 总结

  • ✔ 使用适配器模式封装不同模型训练逻辑,便于扩展新模型。
  • ✔ 数据清洗与预处理模块独立,便于复用和测试。
  • ✔ 支持模型训练后发布(如上传到 S3)。
  • ✔ 高度解耦、结构清晰,方便多人协作或服务化部署。

如果你希望我继续补全某个模块(如 DNNAdapter、反归一化、任务队列调用训练),可以告诉我,我会继续细化。

很好,这说明你需要构建一个灵活可配置的模型训练流程,其中某些步骤(如数据预处理、归一化、后处理、发布)可选执行,取决于模型类型或模型适配器的需要。

为此,我们可以增强适配器结构,使每个模型声明需要哪些流程步骤,并动态控制训练流程中各个节点的执行。


✅ 整体设计目标

我们希望实现如下形式的训练控制流程:

1. 模型适配器声明:我是否需要归一化?是否需要后处理?
2. 训练主流程读取声明内容,仅执行需要的步骤

✅ 修改点一:模型适配器接口扩展

我们为 BaseModelAdapter 增加特性声明:

# adapters/base.py
class BaseModelAdapter(ABC):
    def __init__(self, model_params: dict):
        self.model_params = model_params

    # ========= 新增:模型所需训练流程特性声明 =========
    def need_preprocess(self) -> bool:
        return False

    def need_postprocess(self) -> bool:
        return False

    def need_normalization(self) -> bool:
        return False

    # ========= 原始接口 =========
    @abstractmethod
    def preprocess(self, df):
        pass

    @abstractmethod
    def train(self, train_data, test_data):
        pass

    @abstractmethod
    def postprocess(self, results):
        pass

    @abstractmethod
    def save_model(self, path: str):
        pass

✅ 修改点二:适配器按需实现声明方法

例如:线性回归需要归一化,不需要后处理

# adapters/linear_regression.py
class LinearRegressionAdapter(BaseModelAdapter):
    def need_preprocess(self):
        return True

    def need_normalization(self):
        return True

    def need_postprocess(self):
        return False

    # ... 其他接口同前 ...

✅ 修改点三:训练主流程改造(按需执行)

# trainer.py
import time

def train_pipeline(factor_params, model_params, model_type, dataset_files, publish=False):
    df = load_datasets(dataset_files)
    df_clean = clean_dataset(df, factor_params)

    adapter_class = model_registry.get(model_type)
    if not adapter_class:
        raise ValueError(f"不支持的模型类型: {model_type}")
    model = adapter_class(model_params)

    # ========= 是否需要预处理(归一化) =========
    if model.need_preprocess():
        if model.need_normalization():
            df_clean, scaler = preprocess_dataset(df_clean, factor_params)
        else:
            df_clean, scaler = df_clean, None
    else:
        scaler = None

    # ========= 分割训练集和测试集 =========
    from sklearn.model_selection import train_test_split
    X = df_clean.drop(columns=["target"])
    y = df_clean["target"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    # ========= 模型训练 =========
    train_result = model.train((X_train, y_train), (X_test, y_test))

    # ========= 可选后处理 =========
    if model.need_postprocess():
        final_result = model.postprocess(train_result)
    else:
        final_result = train_result

    # ========= 模型保存 & 可选发布 =========
    save_path = f"./output_models/{model_type}_{int(time.time())}.pkl"
    model.save_model(save_path)

    if publish:
        upload_model_file(save_path)

    return final_result

✅ 总结:你获得了什么

| 功能 | 实现方式 |
| --- | --- |
| 每步是否执行 | 模型适配器提供 .need_xxx() 接口进行控制 |
| 扩展支持的模型类型 | 只需实现新模型适配器类,声明需求并实现接口 |
| 灵活性 | 流程每个节点都可以按模型要求决定是否执行 |
| 解耦性 | 每个模块(清洗、预处理、训练、后处理)都可复用 |

如果你希望下一步我帮你:

  • 添加某个具体模型适配器(如 DNN、LLM)
  • 支持某些模型在训练过程中记录日志(用于前端显示)
  • 通过队列或任务管理器(如 Celery)执行训练任务

欢迎继续告诉我,我会直接给出对应代码。

明白了,你的需求是:

在模型训练流程中,每个步骤(如归一化、预处理、后处理等)是否执行,由以下两方共同决定:

  1. 模型制作者(适配器内部定义):设置默认流程行为(最高优先级)。
  2. 前端用户配置:用户可以覆盖部分行为,如果适配器允许。

并且:

  • 模型适配器优先级高:如果适配器强制某步骤执行/跳过,则用户无法控制。
  • 用户控制其次生效:如果适配器没有明确强制要求执行或跳过,则按用户配置执行。

✅ 整体控制逻辑(流程节点执行判断)

我们引入一个通用判断函数:

def should_execute_step(
    step_name: str,
    user_control: dict,
    adapter_control: dict
) -> bool:
    """
    决定是否执行某一流程节点:
    - adapter_control 优先级更高(必须执行或必须跳过)
    - 否则看用户是否传入控制值
    - 否则默认执行为 False
    """
    if step_name in adapter_control:
        return adapter_control[step_name]
    return user_control.get(step_name, False)

✅ 模型适配器定义接口(带默认行为)

每个模型可以通过适配器定义流程行为,例如:

class BaseModelAdapter(ABC):
    def __init__(self, model_params: dict):
        self.model_params = model_params

    @property
    def pipeline_control(self) -> dict:
        """
        定义模型默认行为:
        {
            "preprocess": True,   # 表示模型需要预处理
            "normalize": False,   # 表示模型不需要归一化
            "postprocess": None,  # 表示是否需要后处理由用户决定
        }
        None 表示未指定,允许用户决定。
        """
        return {}

    @abstractmethod
    def preprocess(self, df):
        pass

    @abstractmethod
    def train(self, train_data, test_data):
        pass

    @abstractmethod
    def postprocess(self, results):
        pass

    @abstractmethod
    def save_model(self, path: str):
        pass

示例:线性回归适配器强制需要归一化、不做后处理:

class LinearRegressionAdapter(BaseModelAdapter):
    @property
    def pipeline_control(self):
        return {
            "preprocess": True,
            "normalize": True,
            "postprocess": False
        }

    # 实现相关接口...

✅ 主训练流程调整(引入双重控制)

import time
from typing import IO, List, Union

def train_pipeline(
    factor_params: dict,
    model_params: dict,
    model_type: str,
    dataset_files: List[Union[str, IO]],
    user_pipeline_control: dict,
    publish: bool = False
):
    df = load_datasets(dataset_files)
    df_clean = clean_dataset(df, factor_params)

    adapter_class = model_registry.get(model_type)
    if not adapter_class:
        raise ValueError(f"模型类型不支持: {model_type}")

    model = adapter_class(model_params)
    adapter_control = model.pipeline_control

    # 判断是否执行各个步骤
    do_preprocess = should_execute_step("preprocess", user_pipeline_control, adapter_control)
    do_normalize = should_execute_step("normalize", user_pipeline_control, adapter_control)
    do_postprocess = should_execute_step("postprocess", user_pipeline_control, adapter_control)

    # ========= 预处理 =========
    scaler = None
    if do_preprocess:
        if do_normalize:
            df_clean, scaler = preprocess_dataset(df_clean, factor_params)
        else:
            df_clean, scaler = df_clean, None

    # ========= 划分数据集 =========
    from sklearn.model_selection import train_test_split
    X = df_clean.drop(columns=["target"])
    y = df_clean["target"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    # ========= 训练 =========
    train_result = model.train((X_train, y_train), (X_test, y_test))

    # ========= 后处理 =========
    if do_postprocess:
        final_result = model.postprocess(train_result)
    else:
        final_result = train_result

    # ========= 保存 & 发布 =========
    save_path = f"./output_models/{model_type}_{int(time.time())}.pkl"
    model.save_model(save_path)

    if publish:
        upload_model_file(save_path)

    return final_result

✅ 使用示例

# 用户传入的控制参数
user_pipeline_control = {
    "preprocess": True,
    "normalize": False,
    "postprocess": True
}

train_pipeline(
    factor_params={...},
    model_params={...},
    model_type="linear_regression",
    dataset_files=["/path/to/data1.csv", "/path/to/data2.csv"],
    user_pipeline_control=user_pipeline_control,
    publish=True
)

即使用户设置 normalize=False,只要 adapter.pipeline_control["normalize"] 为 True,仍会执行归一化。
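
下面用一个小例子演示这个优先级(adapter_control / user_control 的取值仅为示意):

adapter_control = {"normalize": True}                      # 适配器强制要求归一化
user_control = {"normalize": False, "postprocess": True}   # 用户想关闭归一化、开启后处理

should_execute_step("normalize", user_control, adapter_control)    # True:适配器声明优先
should_execute_step("postprocess", user_control, adapter_control)  # True:适配器未声明,采用用户配置
should_execute_step("clean", user_control, adapter_control)        # False:双方都未声明,默认不执行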


✅ 总结

| 功能 | 实现方式 |
| --- | --- |
| 每个节点控制是否执行 | 适配器 pipeline_control 和用户传参共同决定 |
| 优先级控制 | 适配器优先;用户传参为次选;否则默认 False |
| 新增模型适配简单 | 只需声明默认流程行为 + 实现训练接口即可 |
| 可维护性高 | 每个模块职责清晰(数据处理、训练逻辑、保存逻辑) |

如果你需要我继续提供:

  • 流程结构封装为模块包
  • 任务持久化与 Celery 队列集成
  • 将训练流程日志回传前端

欢迎继续告诉我,我可以继续补充完整示例。

好的,以下是将“模型训练流程”封装为模块包,并支持将训练日志实时(流式)回传前端的完整设计思路与代码实现方案:


✅ 1. 模块结构设计

我们将整个训练流程封装为一个模块包,例如 ai_pipeline

ai_pipeline/
├── __init__.py
├── base.py                  # 适配器基类与通用控制逻辑
├── adapters/                # 各种模型适配器
│   ├── __init__.py
│   ├── linear_regression.py
│   └── ...
├── data_handler.py          # 数据加载、清洗、预处理等
├── pipeline.py              # 主流程控制
├── logger.py                # 日志工具(流式输出)
└── registry.py              # 模型注册器

✅ 2. 流式日志 logger 实现(logger.py)

import sys
import time
from typing import Callable


class StreamLogger:
    def __init__(self, log_func: Callable[[str], None] = print):
        self.log_func = log_func

    def log(self, msg: str):
        timestamp = time.strftime("[%Y-%m-%d %H:%M:%S]")
        self.log_func(f"{timestamp} {msg}")

# 用于队列或 WebSocket 场景的回调注入
logger = StreamLogger()

✅ 3. 数据处理逻辑(data_handler.py)

略(你已经有了读取多个文件、字段匹配、归一化、反归一化等逻辑),只要封装成函数并接受 logger 实例即可。

例如:

def clean_and_preprocess(datasets, factor_config, normalize=False, logger=None):
    logger = logger or print
    logger("开始加载与合并数据集")
    # 合并、清洗、归一化等操作...

✅ 4. 适配器定义(base.py)

from abc import ABC, abstractmethod


class BaseModelAdapter(ABC):
    def __init__(self, model_params: dict, logger=None):
        self.model_params = model_params
        self.logger = logger or print

    @property
    def pipeline_control(self) -> dict:
        return {}

    @abstractmethod
    def train(self, X_train, y_train, X_test, y_test):
        pass

    @abstractmethod
    def save_model(self, path: str):
        pass

    def postprocess(self, results):
        return results

✅ 5. 训练主流程(pipeline.py)

import time

from .registry import model_registry
from .data_handler import clean_and_preprocess
from .logger import logger
from .base import BaseModelAdapter
from .utils import should_execute_step
from sklearn.model_selection import train_test_split


def train_pipeline(
    factor_params: dict,
    model_params: dict,
    model_type: str,
    dataset_files: list,
    user_pipeline_control: dict,
    logger_instance=None
):
    log = logger_instance or logger.log
    log(f"开始训练任务: 模型类型={model_type}")

    # 1. 加载模型适配器
    model_class = model_registry.get(model_type)
    if not model_class:
        raise ValueError(f"不支持的模型类型: {model_type}")

    model = model_class(model_params, logger=log)
    adapter_control = model.pipeline_control

    # 2. 数据处理
    normalize = should_execute_step("normalize", user_pipeline_control, adapter_control)
    df, scaler = clean_and_preprocess(dataset_files, factor_params, normalize, logger=log)

    X = df.drop(columns=["target"])
    y = df["target"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    # 3. 训练
    log("开始模型训练...")
    result = model.train(X_train, y_train, X_test, y_test)

    # 4. 后处理(可选)
    if should_execute_step("postprocess", user_pipeline_control, adapter_control):
        result = model.postprocess(result)

    # 5. 保存模型
    model_path = f"./outputs/model_{model_type}_{int(time.time())}.pkl"
    model.save_model(model_path)
    log(f"模型保存到: {model_path}")

    return result

✅ 6. WebSocket 回传(适用于 Web 前端)

Django + Channels 示例(consumer)

# consumers.py
from channels.generic.websocket import WebsocketConsumer
import threading
from ai_pipeline import train_pipeline
from ai_pipeline.logger import StreamLogger

class TrainingConsumer(WebsocketConsumer):
    def connect(self):
        self.accept()

    def disconnect(self, code):
        pass

    def receive(self, text_data=None, bytes_data=None):
        import json
        payload = json.loads(text_data)
        
        def send_log(msg):
            self.send(json.dumps({"type": "log", "message": msg}))

        logger = StreamLogger(log_func=send_log)

        # 后台线程跑任务
        threading.Thread(target=train_pipeline, kwargs={
            "factor_params": payload["factorParams"],
            "model_params": payload["modelParams"],
            "model_type": payload["modelType"],
            "dataset_files": payload["datasetFiles"],
            "user_pipeline_control": payload.get("control", {}),
            "logger_instance": logger.log
        }).start()
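
要让该 Consumer 生效,还需在 Channels 的路由中注册;下面是一个假设性的 routing.py 示意(URL 路径 ws/train/ 为自拟):

# routing.py(示意)
from django.urls import re_path
from .consumers import TrainingConsumer

websocket_urlpatterns = [
    re_path(r"ws/train/$", TrainingConsumer.as_asgi()),
]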

✅ 总结:关键特性一览

| 功能点 | 实现方式 |
| --- | --- |
| 训练模块解耦封装 | ai_pipeline 包 |
| 支持多种模型适配 | 适配器模式 + 注册器 |
| 控制流程节点执行 | should_execute_step() 方法 |
| 双重控制(模型优先,用户可控) | pipeline_control + user config |
| 日志可回传 | 自定义 logger + WebSocket |
| 可集成 Celery | logger 与任务线程解耦 |
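
表中提到的 Celery 集成可以参考下面这个假设性的最小示意(broker 地址、模块名均为示例,日志回传需结合实际消息通道):

# tasks.py(示意)
from celery import Celery
from ai_pipeline.pipeline import train_pipeline

app = Celery("ml_tasks", broker="redis://localhost:6379/0")  # broker 地址仅为示例

@app.task
def train_task(factor_params, model_params, model_type, dataset_files, user_pipeline_control):
    # 在 worker 中同步执行训练;返回结果需可序列化
    return train_pipeline(
        factor_params=factor_params,
        model_params=model_params,
        model_type=model_type,
        dataset_files=dataset_files,
        user_pipeline_control=user_pipeline_control,
    )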

如需我:

  • 增加训练结果指标日志(R²、MAE等)
  • 封装成 pip install 包结构
  • 提供前端消息处理示例(如 Vue + Element Plus)

都可以继续扩展。是否需要我打包成一个可直接运行的项目结构示例?

# ai_pipeline/__init__.py
# 空文件,使 ai_pipeline 成为模块包

# ai_pipeline/logger.py
import time
from typing import Callable

class StreamLogger:
    def __init__(self, log_func: Callable[[str], None] = print):
        self.log_func = log_func

    def log(self, msg: str):
        timestamp = time.strftime("[%Y-%m-%d %H:%M:%S]")
        self.log_func(f"{timestamp} {msg}")

logger = StreamLogger()

# ai_pipeline/utils.py
def should_execute_step(step: str, user_control: dict, adapter_control: dict):
    if step in adapter_control:
        return adapter_control[step]
    return user_control.get(step, False)

# ai_pipeline/base.py
from abc import ABC, abstractmethod

class BaseModelAdapter(ABC):
    def __init__(self, model_params: dict, logger=None):
        self.model_params = model_params
        self.logger = logger or print

    @property
    def pipeline_control(self) -> dict:
        return {}

    @abstractmethod
    def train(self, X_train, y_train, X_test, y_test):
        pass

    @abstractmethod
    def save_model(self, path: str):
        pass

    def postprocess(self, results):
        return results

# ai_pipeline/adapters/linear_regression.py
import joblib
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error, r2_score
from ai_pipeline.base import BaseModelAdapter

class LinearRegressionAdapter(BaseModelAdapter):
    def __init__(self, model_params: dict, logger=None):
        super().__init__(model_params, logger)
        self.model = LinearRegression(**model_params)

    @property
    def pipeline_control(self):
        return {"normalize": True, "postprocess": True}

    def train(self, X_train, y_train, X_test, y_test):
        self.logger("训练线性回归模型...")
        self.model.fit(X_train, y_train)
        y_pred = self.model.predict(X_test)
        mse = mean_squared_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)
        self.logger(f"模型评估 - MSE: {mse:.4f}, R2: {r2:.4f}")
        return {"mse": mse, "r2": r2}

    def save_model(self, path: str):
        joblib.dump(self.model, path)
        self.logger(f"模型保存到 {path}")

# ai_pipeline/registry.py
from ai_pipeline.adapters.linear_regression import LinearRegressionAdapter

model_registry = {
    "linear_regression": LinearRegressionAdapter
}

# ai_pipeline/data_handler.py
import pandas as pd
from sklearn.preprocessing import MinMaxScaler

# 示例因子配置:{"age": {"type": "numeric", "normalize": True}}
def clean_and_preprocess(dataset_files, factor_config, normalize=False, logger=None):
    logger = logger or print
    logger("加载并合并数据集...")
    
    df_list = []
    for path in dataset_files:
        df = pd.read_csv(path)
        df_list.append(df)

    # 合并并对齐列名(顺序不一致时)
    base_cols = df_list[0].columns
    df_list = [df[base_cols] for df in df_list]
    df = pd.concat(df_list, ignore_index=True).drop_duplicates()

    logger("根据因子配置进行数据清洗...")
    for col, config in factor_config.items():
        if config.get("type") == "numeric":
            df = df[df[col].notna()]
            if "min" in config:
                df = df[df[col] >= config["min"]]
            if "max" in config:
                df = df[df[col] <= config["max"]]
        elif config.get("type") == "category":
            allowed = config.get("values")
            if allowed:
                df = df[df[col].isin(allowed)]

    scaler = None
    if normalize:
        numeric_cols = [col for col, conf in factor_config.items() if conf.get("type") == "numeric" and conf.get("normalize")]
        if numeric_cols:
            logger("对数值型字段进行归一化...")
            scaler = MinMaxScaler()
            df[numeric_cols] = scaler.fit_transform(df[numeric_cols])

    return df, scaler

# ai_pipeline/pipeline.py
import time
from sklearn.model_selection import train_test_split
from ai_pipeline.registry import model_registry
from ai_pipeline.logger import logger
from ai_pipeline.utils import should_execute_step
from ai_pipeline.data_handler import clean_and_preprocess

def train_pipeline(
    factor_params: dict,
    model_params: dict,
    model_type: str,
    dataset_files: list,
    user_pipeline_control: dict,
    logger_instance=None
):
    log = logger_instance or logger.log
    log(f"开始训练任务: 模型类型={model_type}")

    model_class = model_registry.get(model_type)
    if not model_class:
        raise ValueError(f"不支持的模型类型: {model_type}")

    model = model_class(model_params, logger=log)
    adapter_control = model.pipeline_control

    normalize = should_execute_step("normalize", user_pipeline_control, adapter_control)
    df, scaler = clean_and_preprocess(dataset_files, factor_params, normalize, logger=log)

    X = df.drop(columns=["target"])
    y = df["target"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    log("开始模型训练...")
    result = model.train(X_train, y_train, X_test, y_test)

    if should_execute_step("postprocess", user_pipeline_control, adapter_control):
        result = model.postprocess(result)

    model_path = f"./outputs/model_{model_type}_{int(time.time())}.pkl"
    model.save_model(model_path)
    log(f"模型训练完成,保存至: {model_path}")

    return result

### 🧠 模型适配器基类(base.py)
```python
# ai_pipeline/base.py
from abc import ABC, abstractmethod

class BaseModelAdapter(ABC):
    def __init__(self, model_params, logger):
        self.model_params = model_params
        self.logger = logger

    @abstractmethod
    def train(self, X_train, y_train):
        pass

    @abstractmethod
    def predict(self, X):
        pass

    @abstractmethod
    def save(self, path):
        pass
```

### ✅ 模型注册表(registry.py)
```python
# ai_pipeline/registry.py
from ai_pipeline.adapters.linear_regression import LinearRegressionAdapter
from ai_pipeline.adapters.dnn import DNNAdapter

MODEL_REGISTRY = {
    "linear_regression": LinearRegressionAdapter,
    "dnn": DNNAdapter,
}
```

### ➕ 新增 DNN 模型适配器(adapters/dnn.py)
```python
# ai_pipeline/adapters/dnn.py
from ai_pipeline.base import BaseModelAdapter
from sklearn.neural_network import MLPRegressor
import joblib

class DNNAdapter(BaseModelAdapter):
    def __init__(self, model_params, logger):
        super().__init__(model_params, logger)
        self.model = MLPRegressor(**model_params)

    def train(self, X_train, y_train):
        self.logger("[DNN] 开始训练")
        self.model.fit(X_train, y_train)
        self.logger("[DNN] 训练完成")

    def predict(self, X):
        return self.model.predict(X)

    def save(self, path):
        joblib.dump(self.model, path)
```

### 🧪 示例测试调用更新
```python
from ai_pipeline.pipeline import train_pipeline

# 使用深度神经网络模型训练
result = train_pipeline(
    factor_params={
        "feature1": {"type": "numeric", "normalize": True},
        "feature2": {"type": "numeric", "normalize": True},
    },
    model_params={
        "hidden_layer_sizes": (50, 50),
        "activation": "relu",
        "max_iter": 200
    },
    model_type="dnn",
    dataset_files=["dataset1.csv", "dataset2.csv"],
    user_pipeline_control={
        "normalize": True,
        "postprocess": True,
    }
)
```

### ✅ 流程兼容性说明
- 模型只需要继承 `BaseModelAdapter`
- 注册在 `registry.py` 的 `MODEL_REGISTRY` 中
- 控制流程、结构和日志机制通用,可适配任意结构化模型

如需添加支持分类模型、NLP 模型或外部服务模型(如 OpenAI),结构完全适配,只需按上述结构新增适配器。是否还需要我添加一个如 HuggingFace BERT 的适配器?

### 🧠 模型适配器基类(base.py)
```python
# ai_pipeline/base.py
from abc import ABC, abstractmethod

class BaseModelAdapter(ABC):
    def __init__(self, model_params, logger):
        self.model_params = model_params  # 模型参数(如层数、学习率等)
        self.logger = logger              # 日志函数或流式日志输出器

    @abstractmethod
    def train(self, X_train, y_train):
        pass  # 子类实现:训练模型

    @abstractmethod
    def predict(self, X):
        pass  # 子类实现:执行预测

    @abstractmethod
    def save(self, path):
        pass  # 子类实现:保存模型
```

### ✅ 模型注册表(registry.py)
```python
# ai_pipeline/registry.py
from ai_pipeline.adapters.linear_regression import LinearRegressionAdapter
from ai_pipeline.adapters.dnn import DNNAdapter
from ai_pipeline.adapters.huggingface import HuggingFaceTextAdapter

MODEL_REGISTRY = {
    "linear_regression": LinearRegressionAdapter,
    "dnn": DNNAdapter,
    "huggingface_text": HuggingFaceTextAdapter,
}
```

### ➕ HuggingFace 文本模型适配器(adapters/huggingface.py)
```python
# ai_pipeline/adapters/huggingface.py
from ai_pipeline.base import BaseModelAdapter
from transformers import pipeline, AutoTokenizer, AutoModelForSequenceClassification
import torch
import os

class HuggingFaceTextAdapter(BaseModelAdapter):
    def __init__(self, model_params, logger):
        super().__init__(model_params, logger)
        model_name = model_params.get("model_name", "distilbert-base-uncased-finetuned-sst-2-english")
        self.logger(f"[HF] 加载模型: {model_name}")
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForSequenceClassification.from_pretrained(model_name)
        self.pipeline = pipeline("sentiment-analysis", model=self.model, tokenizer=self.tokenizer)

    def train(self, X_train, y_train):
        self.logger("[HF] 文本模型当前为预训练推理模式,跳过训练阶段")

    def predict(self, X):
        self.logger("[HF] 开始文本预测")
        texts = X["text"] if isinstance(X, dict) else X  # 支持 dict(含 text 键)或原始文本列表
        return self.pipeline(texts)

    def save(self, path):
        self.logger("[HF] 保存模型和分词器")
        self.model.save_pretrained(path)
        self.tokenizer.save_pretrained(path)
```

### ✅ 说明(每行解释)
```python
model_name = model_params.get("model_name", "distilbert-base-uncased-finetuned-sst-2-english")
# 获取模型名称(用户传入或默认使用情感分析模型)

self.tokenizer = AutoTokenizer.from_pretrained(model_name)
self.model = AutoModelForSequenceClassification.from_pretrained(model_name)
# 加载 HuggingFace 模型和 tokenizer

self.pipeline = pipeline("sentiment-analysis", model=self.model, tokenizer=self.tokenizer)
# 封装为 transformers pipeline,便于直接推理
```

### 🧪 示例测试调用(文本模型)
```python
from ai_pipeline.pipeline import train_pipeline

result = train_pipeline(
    factor_params={},  # 文本模型通常不需要数值因子
    model_params={"model_name": "distilbert-base-uncased-finetuned-sst-2-english"},
    model_type="huggingface_text",
    dataset_files=["texts.csv"],
    user_pipeline_control={
        "normalize": False,
        "postprocess": False,
    }
)
```

### 📦 输入文本数据要求
- CSV 文件应包含 `text` 字段列
```
text
"I love this product."
"This is terrible."
```

---
如需扩展为多标签分类、多语言模型,只需更换模型名和 pipeline 类型即可。
是否接下来需要接入类似 DeepSeek 或 OpenAI API 的推理模型?

## ✅ AI 平台通用模型训练流程模块(训练流程、适配器结构、API 推理模型支持)

---

## 📐 项目模块概览
```bash
ai_pipeline/
├── base.py                    # 模型适配器基类(定义接口)
├── pipeline.py                # 主训练流程模块
├── registry.py                # 模型注册表(用于模型类型查找)
├── adapters/
│   ├── __init__.py
│   ├── linear_regression.py   # sklearn 线性回归模型适配器
│   ├── dnn.py                 # sklearn DNN 模型适配器
│   ├── huggingface.py         # 本地 transformer 模型适配器
│   └── openai_api.py          # 外部 OpenAI / DeepSeek 接口适配器
```

---

## 📦 模型适配器基类(base.py)
```python
# ai_pipeline/base.py
from abc import ABC, abstractmethod

class BaseModelAdapter(ABC):
    def __init__(self, model_params, logger):
        self.model_params = model_params  # 模型初始化参数
        self.logger = logger              # 日志函数

    @abstractmethod
    def train(self, X_train, y_train):
        pass  # 训练方法

    @abstractmethod
    def predict(self, X):
        pass  # 预测方法

    @abstractmethod
    def save(self, path):
        pass  # 模型保存方法
```

---

## 🧠 外部 API 模型适配器(如 OpenAI 或 DeepSeek)(adapters/openai_api.py)
```python
# ai_pipeline/adapters/openai_api.py
import openai
from ai_pipeline.base import BaseModelAdapter

class OpenAIChatAdapter(BaseModelAdapter):
    def __init__(self, model_params, logger):
        super().__init__(model_params, logger)
        self.api_key = model_params.get("api_key")
        self.model = model_params.get("model", "gpt-4")
        openai.api_key = self.api_key
        logger(f"[OpenAI] 使用模型:{self.model}")

    def train(self, X_train, y_train):
        self.logger("[OpenAI] 外部 API 无需训练")

    def predict(self, X):
        prompt = X.get("prompt")
        self.logger(f"[OpenAI] 请求提示词: {prompt}")

        response = openai.ChatCompletion.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )

        output = ""
        for chunk in response:
            delta = chunk["choices"][0]["delta"].get("content", "")
            output += delta
            self.logger(delta, stream=True)  # 流式返回
        return output

    def save(self, path):
        self.logger("[OpenAI] API 模型无需保存")
```

### 🔍 每行解释:
```python
openai.api_key = self.api_key
# 设置 OpenAI API 密钥

response = openai.ChatCompletion.create(..., stream=True)
# 启用流式响应,逐块接收模型输出

for chunk in response:
    delta = chunk["choices"][0]["delta"].get("content", "")
    self.logger(delta, stream=True)
# 将内容逐条流式输出给前端
```
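
上述写法基于旧版 openai Python SDK(1.0 之前)的 `ChatCompletion` 接口;若使用 1.x 及以上版本,流式调用大致如下(仅作示意,具体以官方 SDK 文档为准):
```python
# openai>=1.0 的等价流式写法(示意,沿用适配器内的 self.api_key / self.model / self.logger)
from openai import OpenAI

client = OpenAI(api_key=self.api_key)
stream = client.chat.completions.create(
    model=self.model,
    messages=[{"role": "user", "content": prompt}],
    stream=True,
)
output = ""
for chunk in stream:
    delta = chunk.choices[0].delta.content or ""
    output += delta
    self.logger(delta, stream=True)
```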

---

## 🔌 模型注册表(registry.py)
```python
# ai_pipeline/registry.py
from ai_pipeline.adapters.linear_regression import LinearRegressionAdapter
from ai_pipeline.adapters.dnn import DNNAdapter
from ai_pipeline.adapters.huggingface import HuggingFaceTextAdapter
from ai_pipeline.adapters.openai_api import OpenAIChatAdapter

MODEL_REGISTRY = {
    "linear_regression": LinearRegressionAdapter,
    "dnn": DNNAdapter,
    "huggingface_text": HuggingFaceTextAdapter,
    "openai_chat": OpenAIChatAdapter,
}
```

---

## 🔁 通用训练流程封装模块(pipeline.py)
```python
# ai_pipeline/pipeline.py
import pandas as pd
from ai_pipeline.registry import MODEL_REGISTRY

def default_logger(msg, stream=False):
    print("[LOG]", msg, flush=True)  # 默认日志输出函数

def train_pipeline(factor_params, model_params, model_type,
                   dataset_files, user_pipeline_control=None,
                   base_pipeline_control=None, logger=default_logger):

    logger("[Pipeline] 加载模型适配器")
    model_cls = MODEL_REGISTRY[model_type]
    model = model_cls(model_params, logger)

    logger("[Pipeline] 读取并合并数据集")
    df = pd.concat([pd.read_csv(f) for f in dataset_files], ignore_index=True)
    df.drop_duplicates(inplace=True)

    logger("[Pipeline] 应用字段映射和因子清洗")
    df = df.dropna(subset=factor_params.get("required_fields", []))

    pipeline_ctl = {"normalize": True, "postprocess": True}  # 默认流程开关
    if base_pipeline_control:
        pipeline_ctl.update(base_pipeline_control)
    if user_pipeline_control:
        pipeline_ctl.update(user_pipeline_control)

    if pipeline_ctl["normalize"]:
        logger("[Pipeline] 执行归一化")
        for col in factor_params.get("numeric_fields", []):
            df[col] = (df[col] - df[col].mean()) / df[col].std()

    logger("[Pipeline] 划分训练集和测试集")
    train_df = df.sample(frac=0.8, random_state=42)
    test_df = df.drop(train_df.index)

    logger("[Pipeline] 开始训练模型")
    X_train, y_train = train_df.drop("target", axis=1), train_df["target"]
    model.train(X_train, y_train)

    logger("[Pipeline] 模型训练完成,开始预测")
    y_pred = model.predict(test_df.drop("target", axis=1))

    if pipeline_ctl["postprocess"]:
        logger("[Pipeline] 进行反归一化(伪操作)")

    logger("[Pipeline] 保存模型")
    model.save("saved_models/{}_model.bin".format(model_type))

    return {"predictions": y_pred}
```

### 🔍 每行代码解释
```python
model_cls = MODEL_REGISTRY[model_type]
model = model_cls(model_params, logger)
# 获取模型类并初始化实例

pd.concat([pd.read_csv(f) for f in dataset_files])
# 合并多个 CSV 数据集

df.dropna(subset=factor_params.get("required_fields", []))
# 删除因子缺失值的样本

if pipeline_ctl["normalize"]:
    for col in factor_params.get("numeric_fields", []):
        df[col] = ...
# 对数值型字段进行归一化

model.train(X_train, y_train)
# 启动模型训练

model.predict(...)
# 执行预测(可返回数组或文本)

model.save(...)
# 保存模型到指定路径
```

---

## ✅ 示例使用:
```python
train_pipeline(
    factor_params={
        "required_fields": ["text"],
        "numeric_fields": [],
    },
    model_params={
        "api_key": "sk-xxx",
        "model": "gpt-4"
    },
    model_type="openai_chat",
    dataset_files=["chat_prompts.csv"],
    user_pipeline_control={"normalize": False, "postprocess": False}
)
```

CSV 内容
```csv
prompt
"请帮我总结这段话"
"解释什么是 Transformer 模型"
```

---

是否需要我为你加入 S3 自动上传、训练日志实时 WebSocket 回传或模型发布逻辑?

# 完整的 AI 模型训练流程框架(含适配器 + 注册表 + 数据处理 + 模型实例)

本架构设计适用于:
- 多模型适配(线性回归、大语言模型、分析模型等)
- 模型训练流程节点的灵活控制(归一化、后处理等)
- 支持用户控制与模型创建者控制的优先级机制
- 多数据集合并处理(字段统一、清洗、归一化等)
- 训练日志支持实时推送(可接入 WebSocket)

---

## 项目目录结构
```
ml_platform/
├── core/
│   ├── registry.py                # 模型注册表
│   ├── base_model.py              # 模型适配器基类
│   └── linear_regression.py       # 示例模型:线性回归
├── pipeline/
│   └── trainer.py                 # 主训练流程控制器
├── preprocess/
│   ├── dataset_handler.py         # 数据集读取与合并(支持 openpyxl)
│   └── cleaner.py                 # 数据清洗与预处理逻辑
└── run_train.py                 # 模拟训练流程入口
```

---

## core/registry.py
```python
# 模型注册中心,所有模型需注册进来以便训练器使用
MODEL_REGISTRY = {}

def register_model(model_type):
    def wrapper(cls):
        MODEL_REGISTRY[model_type] = cls
        return cls
    return wrapper

def get_model_class(model_type):
    return MODEL_REGISTRY.get(model_type)
```

---

## core/base_model.py
```python
# 所有模型必须继承的基类
from abc import ABC, abstractmethod

class BaseModel(ABC):
    def __init__(self, model_params, factor_params, control_config):
        self.model_params = model_params
        self.factor_params = factor_params
        self.control_config = control_config  # 归一化/后处理执行标志

    @abstractmethod
    def train(self, train_data):
        pass

    @abstractmethod
    def predict(self, test_data):
        pass

    def should_normalize(self):
        return self.control_config.get("normalize", False)

    def should_postprocess(self):
        return self.control_config.get("postprocess", False)
```

---

## core/linear_regression.py
```python
# 示例:线性回归模型
import numpy as np
from sklearn.linear_model import LinearRegression
from core.base_model import BaseModel
from core.registry import register_model

@register_model("linear_regression")
class LinearRegressionModel(BaseModel):
    def __init__(self, model_params, factor_params, control_config):
        super().__init__(model_params, factor_params, control_config)
        self.model = LinearRegression(**self.model_params)

    def train(self, train_data):
        X, y = train_data.drop(columns=["target"]), train_data["target"]
        self.model.fit(X, y)

    def predict(self, test_data):
        return self.model.predict(test_data)
```

---

## preprocess/dataset_handler.py
```python
# 使用 openpyxl 读取所有数据集内容,返回 pandas DataFrame
import pandas as pd
from openpyxl import load_workbook
import io

def read_excel_as_dataframe(file):
    if isinstance(file, (str, bytes)):
        wb = load_workbook(filename=file, read_only=True)
    else:
        wb = load_workbook(filename=io.BytesIO(file.read()), read_only=True)
    sheet = wb.active
    data = [[cell.value for cell in row] for row in sheet.iter_rows()]
    headers, rows = data[0], data[1:]
    return pd.DataFrame(rows, columns=headers)

def merge_datasets(files):
    dfs = [read_excel_as_dataframe(f) for f in files]
    # 对齐列名顺序后合并
    columns = dfs[0].columns
    aligned_dfs = [df[columns] for df in dfs]
    merged = pd.concat(aligned_dfs, ignore_index=True).drop_duplicates()
    return merged
```

---

## preprocess/cleaner.py
```python
# 数据清洗:包括因子缺失剔除、字段映射、归一化
import pandas as pd

def clean_data(df, factor_config):
    for factor in factor_config:
        name = factor["name"]
        dtype = factor.get("type")
        if dtype == "number":
            min_val = factor.get("min")
            max_val = factor.get("max")
            enum = factor.get("enum")
            if min_val is not None:
                df = df[df[name] >= min_val]
            if max_val is not None:
                df = df[df[name] <= max_val]
            if enum:
                df = df[df[name].isin(enum)]
        elif dtype == "string":
            enum = factor.get("enum")
            if enum:
                df = df[df[name].isin(enum)]
    df = df.dropna(subset=[f["name"] for f in factor_config])
    return df

def normalize_data(df, factor_config):
    for factor in factor_config:
        if factor.get("normalize") and factor.get("type") == "number":
            col = factor["name"]
            min_val = df[col].min()
            max_val = df[col].max()
            df[col] = (df[col] - min_val) / (max_val - min_val)
    return df

def split_data(df, train_ratio=0.8):
    df = df.sample(frac=1).reset_index(drop=True)
    split = int(len(df) * train_ratio)
    return df[:split], df[split:]
```

---

## pipeline/trainer.py
```python
from core.registry import get_model_class
from preprocess.dataset_handler import merge_datasets
from preprocess.cleaner import clean_data, normalize_data, split_data

class TrainingPipeline:
    def __init__(self, model_type, model_params, factor_params, dataset_files, control_flags):
        self.model_type = model_type
        self.model_params = model_params
        self.factor_params = factor_params
        self.dataset_files = dataset_files
        self.control_flags = control_flags

    def run(self):
        print("[流程] 合并数据集...")
        df = merge_datasets(self.dataset_files)

        print("[流程] 清洗数据...")
        df = clean_data(df, self.factor_params)

        if self.control_flags.get("normalize"):
            print("[流程] 数据归一化...")
            df = normalize_data(df, self.factor_params)

        print("[流程] 划分训练集/测试集...")
        train_df, test_df = split_data(df)

        print("[流程] 加载模型并训练...")
        model_cls = get_model_class(self.model_type)
        if not model_cls:
            raise ValueError(f"不支持的模型类型: {self.model_type}")

        model = model_cls(self.model_params, self.factor_params, self.control_flags)
        model.train(train_df)

        print("[流程] 训练完成,执行预测...")
        preds = model.predict(test_df.drop(columns=["target"]))

        if model.should_postprocess():
            print("[流程] 执行后处理...")
            # 可以添加反归一化等逻辑

        print("[流程] 保存模型与结果...")
        # TODO: 保存模型到 S3、本地等

        return preds
```

---

## run_train.py(训练入口模拟)
```python
from pipeline.trainer import TrainingPipeline

# 模拟前端请求传参
model_type = "linear_regression"
model_params = {"fit_intercept": True}
factor_params = [
    {"name": "age", "type": "number", "normalize": True, "min": 10, "max": 80},
    {"name": "gender", "type": "string", "enum": ["male", "female"]}
]
dataset_files = ["./datasets/sample1.xlsx", "./datasets/sample2.xlsx"]
control_flags = {"normalize": True, "postprocess": False}

pipeline = TrainingPipeline(
    model_type=model_type,
    model_params=model_params,
    factor_params=factor_params,
    dataset_files=dataset_files,
    control_flags=control_flags
)
preds = pipeline.run()
print("预测结果:", preds)
```

---

## ✅ 总结
该框架支持:
- 各种模型注册与训练流程解耦
- 使用 openpyxl 进行 Excel 读取
- 基于因子配置的数据清洗、归一化与拆分
- 模型训练与预测接口标准统一
- 控制逻辑灵活,可扩展其他模型

📌 如需:
- S3 发布与模型持久化
- 使用 Celery 加入任务队列
- 日志实时回传(WebSocket 或 SSE)

可继续扩展,我可以为你添加相关模块。

当然可以!我们以一个线性 DNN 回归模型为例,使用 PyTorch 编写完整训练流程,并展示如何使用训练集(train)、验证集(val)和测试集(test)。

我们会涵盖以下内容:

  1. 数据拆分(train/val/test)
  2. 构建模型
  3. 模型训练(使用 train、val)
  4. 模型评估(使用 test)
  5. 每一步代码都有详细注释 ✅

✅ 环境依赖(请先安装)

pip install torch pandas scikit-learn

✅ 完整代码(使用 PyTorch 实现线性 DNN 回归)

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import numpy as np

# ======== 模拟数据准备 =========
# 假设我们有一个房价预测数据集
def generate_synthetic_data(n=1000):
    np.random.seed(42)
    X = np.random.rand(n, 3) * 100  # 三个特征:面积、卧室数、距离
    y = 0.3 * X[:, 0] + 0.5 * X[:, 1] - 0.2 * X[:, 2] + np.random.randn(n) * 2
    return pd.DataFrame(X, columns=["area", "bedrooms", "distance"]), pd.Series(y, name="price")

# 生成数据
X_df, y_series = generate_synthetic_data()

# ======== 数据预处理(归一化) =========
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X_df)
X_scaled_df = pd.DataFrame(X_scaled, columns=X_df.columns)

# ======== 数据集拆分 =========
X_trainval, X_test, y_trainval, y_test = train_test_split(X_scaled_df, y_series, test_size=0.2, random_state=42)
X_train, X_val, y_train, y_val = train_test_split(X_trainval, y_trainval, test_size=0.2, random_state=42)

# ======== PyTorch 数据集封装 =========
class RegressionDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X.values, dtype=torch.float32)
        self.y = torch.tensor(y.values, dtype=torch.float32).view(-1, 1)
    
    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# 构建 Dataloader
train_loader = DataLoader(RegressionDataset(X_train, y_train), batch_size=32, shuffle=True)
val_loader = DataLoader(RegressionDataset(X_val, y_val), batch_size=32)
test_loader = DataLoader(RegressionDataset(X_test, y_test), batch_size=32)

# ======== 构建 DNN 模型 =========
class LinearDNNRegressor(nn.Module):
    def __init__(self, input_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, 64),
            nn.ReLU(),
            nn.Linear(64, 32),
            nn.ReLU(),
            nn.Linear(32, 1)  # 输出为一个回归值
        )

    def forward(self, x):
        return self.net(x)

# 实例化模型、损失函数、优化器
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = LinearDNNRegressor(input_dim=X_train.shape[1]).to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# ======== 模型训练过程(使用 train 和 val)=========
num_epochs = 50
best_val_loss = float("inf")
best_model_state = None

for epoch in range(num_epochs):
    model.train()
    train_loss = 0
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)

        optimizer.zero_grad()
        y_pred = model(X_batch)
        loss = criterion(y_pred, y_batch)
        loss.backward()
        optimizer.step()
        train_loss += loss.item() * len(X_batch)

    train_loss /= len(train_loader.dataset)

    # 验证过程
    model.eval()
    val_loss = 0
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            X_batch, y_batch = X_batch.to(device), y_batch.to(device)
            y_pred = model(X_batch)
            loss = criterion(y_pred, y_batch)
            val_loss += loss.item() * len(X_batch)
    val_loss /= len(val_loader.dataset)

    # 打印日志
    print(f"[Epoch {epoch+1}] Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f}")

    # 保存最优模型
    if val_loss < best_val_loss:
        best_val_loss = val_loss
        best_model_state = model.state_dict()

# 恢复最优模型
model.load_state_dict(best_model_state)

# ======== 模型测试(使用 test)=========
model.eval()
test_loss = 0
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        y_pred = model(X_batch)
        loss = criterion(y_pred, y_batch)
        test_loss += loss.item() * len(X_batch)

test_loss /= len(test_loader.dataset)
print(f"Test Loss (MSE): {test_loss:.4f}")

✅ 每一步解释

数据部分:

  • generate_synthetic_data:生成三列特征和一个目标值的模拟数据。
  • train_test_split:将数据分为训练集、验证集和测试集。

模型训练:

  • 训练过程中每个 epoch:
    • 使用训练集(train_loader)进行梯度更新。
    • 使用验证集(val_loader)评估模型效果。
    • 如果验证损失更优,就保存当前模型参数。

模型评估:

  • 使用测试集(test_loader)在训练完成后一次性评估,报告最终模型在“未知数据”上的表现。

✅ 小结(训练 vs 验证 vs 测试)

| 阶段 | 目标 | 使用的数据 |
| --- | --- | --- |
| 训练 | 更新模型参数 | train_loader |
| 验证 | 提前停止或调参 | val_loader |
| 测试 | 模拟生产环境评估 | test_loader |

如果你后续要将这套训练结构封装成可配置的适配器模块(如你前面提到的训练服务),我可以帮你封装并加上日志、模型发布和预测流程的对接。

是否需要我将这段代码封装为训练服务类并适配你前面讨论的接口?

下面是一个动态生成神经网络结构(DNN)的实现示例,并支持:

  • 🧠 动态设置层结构(隐藏层数量、大小)
  • 🔢 自动识别类别特征并添加 Embedding 层
  • 📐 Embedding 的维度(嵌入维度)根据类别数量动态确定
  • 🧪 输入可为 pandas DataFrame
  • ✅ 结构清晰、适合集成到前面训练流程中

✅ 使用示例前的说明

我们考虑如下输入特征场景:

  • 连续特征(数值型):area, distance
  • 类别特征(分类型):city, material(字符串或 int)

✅ 动态神经网络完整代码(含注释)

import torch
import torch.nn as nn
import pandas as pd

class DynamicDNN(nn.Module):
    def __init__(self, df: pd.DataFrame, 
                 categorical_cols: list[str], 
                 continuous_cols: list[str],
                 hidden_layers: list[int] = [64, 32],
                 embedding_dim_rule=lambda n: min(50, (n + 1) // 2)):
        """
        :param df: 原始 DataFrame,用于统计 embedding 信息
        :param categorical_cols: 类别型列名
        :param continuous_cols: 数值型列名
        :param hidden_layers: 隐藏层的结构,例如 [64, 32]
        :param embedding_dim_rule: 一个函数,输入类别数,输出 embedding 维度
        """
        super().__init__()
        self.categorical_cols = categorical_cols
        self.continuous_cols = continuous_cols

        # ===== 构建 Embedding 层 =====
        self.embeddings = nn.ModuleDict()
        total_emb_dim = 0
        for col in categorical_cols:
            num_classes = df[col].nunique()
            emb_dim = embedding_dim_rule(num_classes)
            self.embeddings[col] = nn.Embedding(num_classes, emb_dim)
            total_emb_dim += emb_dim

        self.input_dim = total_emb_dim + len(continuous_cols)

        # ===== 构建隐藏层(全连接) =====
        layers = []
        prev_dim = self.input_dim
        for h_dim in hidden_layers:
            layers.append(nn.Linear(prev_dim, h_dim))
            layers.append(nn.ReLU())
            prev_dim = h_dim
        layers.append(nn.Linear(prev_dim, 1))  # 回归任务输出一维
        self.model = nn.Sequential(*layers)

    def forward(self, x_cat: dict, x_cont: torch.Tensor):
        """
        :param x_cat: dict,每个 key 是类别列名,value 是 LongTensor 的 column vector
        :param x_cont: FloatTensor,连续特征的 batch 输入
        """
        emb_list = []
        for col in self.categorical_cols:
            emb = self.embeddings[col](x_cat[col])
            emb_list.append(emb)

        x = torch.cat(emb_list + [x_cont], dim=1)
        return self.model(x)

✅ 数据准备示例

import numpy as np
from sklearn.preprocessing import LabelEncoder

# 模拟数据
df = pd.DataFrame({
    "area": np.random.rand(1000) * 100,
    "distance": np.random.rand(1000) * 10,
    "city": np.random.choice(["A", "B", "C", "D"], size=1000),
    "material": np.random.choice(["wood", "brick", "steel"], size=1000),
    "price": np.random.rand(1000) * 500
})

categorical_cols = ["city", "material"]
continuous_cols = ["area", "distance"]

# Label encode categorical columns
label_encoders = {}
for col in categorical_cols:
    le = LabelEncoder()
    df[col] = le.fit_transform(df[col])
    label_encoders[col] = le

# 转换为张量输入
x_cat_tensor = {col: torch.tensor(df[col].values, dtype=torch.long) for col in categorical_cols}
x_cont_tensor = torch.tensor(df[continuous_cols].values, dtype=torch.float32)
y_tensor = torch.tensor(df["price"].values, dtype=torch.float32).view(-1, 1)

# 构建模型
model = DynamicDNN(df, categorical_cols, continuous_cols)
pred = model(x_cat_tensor, x_cont_tensor)

print("预测输出形状:", pred.shape)  # [batch_size, 1]

✅ 输出说明

  • Embedding 层的维度是动态确定的,如 city 有 4 个类别,则默认嵌入维度为 (4 + 1) // 2 = 2
  • 模型结构由 hidden_layers 控制,可自由扩展
  • 支持多列 embedding,同步处理连续值与类别值
  • 最后输出 shape 为 [batch_size, 1],适用于回归

✅ 接入你现有框架的方法建议

  1. 将此 DynamicDNN 模块添加为模型适配器的实现类(如之前的 BaseModelAdapter 子类)。
  2. 在数据清洗过程中,自动识别类别列并传入 DynamicDNN
  3. 模型训练器调用 model(x_cat_dict, x_cont_tensor) 进行前向传播。
  4. 自动构建 x_cat_dict 与 x_cont_tensor(前面的 Dataset 封装也可以完成这一步)。

如果你希望我帮你封装成适配器模型、添加训练和预测模块、加入日志或任务队列支持,我可以继续帮你构建下一个模块。是否继续?

下面是将 DNN 回归模型封装成适配器形式,并添加完整的训练、验证、测试与预测模块的代码。你可以将这套代码作为平台中“模型适配器”的一部分集成进通用训练流程中。


📦 目录结构建议

ml_adapters/
├── __init__.py
├── base.py              # 基类定义
├── dnn_regression.py    # 本次实现的 DNN 模型适配器

🧩 base.py – 通用模型适配器基类

# ml_adapters/base.py
from abc import ABC, abstractmethod

class BaseModelAdapter(ABC):
    @abstractmethod
    def train(self, train_data, val_data, **kwargs):
        pass

    @abstractmethod
    def evaluate(self, test_data):
        pass

    @abstractmethod
    def predict(self, input_data):
        pass

    @abstractmethod
    def save(self, path):
        pass

    @abstractmethod
    def load(self, path):
        pass

🧠 dnn_regression.py – DNN 适配器实现

# ml_adapters/dnn_regression.py
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
import pandas as pd
import os

from .base import BaseModelAdapter


class DynamicDNN(nn.Module):
    def __init__(self, df: pd.DataFrame, categorical_cols, continuous_cols, hidden_layers=[64, 32]):
        super().__init__()
        self.categorical_cols = categorical_cols
        self.continuous_cols = continuous_cols

        self.embeddings = nn.ModuleDict()
        total_emb_dim = 0
        for col in categorical_cols:
            num_classes = df[col].nunique()
            emb_dim = min(50, (num_classes + 1) // 2)
            self.embeddings[col] = nn.Embedding(num_classes, emb_dim)
            total_emb_dim += emb_dim

        self.input_dim = total_emb_dim + len(continuous_cols)

        layers = []
        prev = self.input_dim
        for h in hidden_layers:
            layers.append(nn.Linear(prev, h))
            layers.append(nn.ReLU())
            prev = h
        layers.append(nn.Linear(prev, 1))
        self.model = nn.Sequential(*layers)

    def forward(self, x_cat_dict, x_cont):
        emb_list = [self.embeddings[k](x_cat_dict[k]) for k in self.categorical_cols]
        x = torch.cat(emb_list + [x_cont], dim=1)
        return self.model(x)


class DNNRegressionAdapter(BaseModelAdapter):
    def __init__(self, df, categorical_cols, continuous_cols, target_col, hidden_layers=[64, 32], device=None):
        self.df = df
        self.categorical_cols = categorical_cols
        self.continuous_cols = continuous_cols
        self.target_col = target_col
        self.hidden_layers = hidden_layers
        self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
        self.model = DynamicDNN(df, categorical_cols, continuous_cols, hidden_layers).to(self.device)

    def _prepare_loader(self, df, batch_size=32, shuffle=True):
        x_cat = {col: torch.tensor(df[col].values, dtype=torch.long).to(self.device) for col in self.categorical_cols}
        x_cont = torch.tensor(df[self.continuous_cols].values, dtype=torch.float32).to(self.device)
        y = torch.tensor(df[self.target_col].values, dtype=torch.float32).view(-1, 1).to(self.device)
        return DataLoader(TensorDataset(x_cont, y, *x_cat.values()), batch_size=batch_size, shuffle=shuffle)

    def _batch_to_input(self, batch):
        x_cont, y, *cat_vals = batch
        x_cat_dict = {col: val for col, val in zip(self.categorical_cols, cat_vals)}
        return x_cat_dict, x_cont, y

    def train(self, train_df, val_df=None, epochs=10, lr=0.001, batch_size=32):
        train_loader = self._prepare_loader(train_df, batch_size)
        val_loader = self._prepare_loader(val_df, batch_size, shuffle=False) if val_df is not None else None

        optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)
        criterion = nn.MSELoss()

        for epoch in range(epochs):
            self.model.train()
            total_loss = 0
            for batch in train_loader:
                x_cat, x_cont, y = self._batch_to_input(batch)
                pred = self.model(x_cat, x_cont)
                loss = criterion(pred, y)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                total_loss += loss.item()
            print(f"[Train] Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(train_loader):.4f}")

            if val_loader:
                self.evaluate(val_df, prefix="[Val]")

    def evaluate(self, test_df, prefix="[Test]"):
        test_loader = self._prepare_loader(test_df, shuffle=False)
        self.model.eval()
        total_loss = 0
        criterion = nn.MSELoss()
        with torch.no_grad():
            for batch in test_loader:
                x_cat, x_cont, y = self._batch_to_input(batch)
                pred = self.model(x_cat, x_cont)
                loss = criterion(pred, y)
                total_loss += loss.item()
        print(f"{prefix} MSE: {total_loss/len(test_loader):.4f}")

    def predict(self, input_df):
        self.model.eval()
        loader = self._prepare_loader(input_df, shuffle=False)
        predictions = []
        with torch.no_grad():
            for batch in loader:
                x_cat, x_cont, _ = self._batch_to_input(batch)
                pred = self.model(x_cat, x_cont)
                predictions.append(pred.cpu())
        return torch.cat(predictions).numpy()

    def save(self, path):
        torch.save(self.model.state_dict(), path)

    def load(self, path):
        self.model.load_state_dict(torch.load(path, map_location=self.device))

✅ 测试示例

import pandas as pd
from sklearn.model_selection import train_test_split
from ml_adapters.dnn_regression import DNNRegressionAdapter

# 生成测试数据
df = pd.DataFrame({
    "city": pd.Categorical(["A", "B", "A", "C", "B", "A"]).codes,
    "material": pd.Categorical(["wood", "steel", "brick", "wood", "steel", "brick"]).codes,
    "area": [50, 60, 70, 80, 90, 100],
    "distance": [3, 4, 5, 6, 7, 8],
    "price": [100, 120, 140, 160, 180, 200]
})

train_df, test_df = train_test_split(df, test_size=0.33)

model = DNNRegressionAdapter(
    df=train_df,
    categorical_cols=["city", "material"],
    continuous_cols=["area", "distance"],
    target_col="price"
)

model.train(train_df, test_df, epochs=10)
model.evaluate(test_df)

preds = model.predict(test_df)
print("预测结果:", preds)

# 模型保存 & 加载
model.save("dnn_model.pth")
model.load("dnn_model.pth")

✅ 支持集成任务流程

你可以把这个类注册到你的适配器工厂,例如:

ADAPTERS = {
    "dnn_regression": DNNRegressionAdapter,
    # future: "linear_regression": LinearRegressionAdapter,
}

是否需要我继续:

  • 将其接入训练主流程管理器
  • 添加日志记录与任务 ID 标识
  • 提供模型发布(如 S3 上传)与预测服务接口

需要我继续完善哪些部分?

是的,可以并且推荐将归一化和反归一化逻辑抽象出来,作为可插拔的归一化策略模块(Strategy Pattern),这样可以:

  • ✅ 支持多种归一化方法(MinMax、Z-score、Log 等);
  • ✅ 保证归一化与反归一化是一一对应的;
  • ✅ 保持训练模块解耦,只依赖统一接口;
  • ✅ 易于测试和扩展。

✅ 一、结构设计(归一化策略接口)

normalization/
├── __init__.py
├── base.py                  # 抽象基类
├── minmax.py                # MinMax归一化
├── zscore.py                # ZScore标准化
├── factory.py               # 创建策略的工厂方法

✅ 二、代码实现

base.py(抽象归一化接口)

from abc import ABC, abstractmethod
import pandas as pd

class Normalizer(ABC):
    @abstractmethod
    def fit(self, series: pd.Series):
        """计算归一化所需的参数"""
        pass

    @abstractmethod
    def transform(self, series: pd.Series) -> pd.Series:
        """执行归一化"""
        pass

    @abstractmethod
    def inverse_transform(self, series: pd.Series) -> pd.Series:
        """反归一化"""
        pass

    @abstractmethod
    def get_params(self) -> dict:
        """返回参数(用于持久化或调试)"""
        pass

minmax.py

from .base import Normalizer
import pandas as pd

class MinMaxNormalizer(Normalizer):
    def __init__(self):
        self.min_val = None
        self.max_val = None

    def fit(self, series: pd.Series):
        self.min_val = float(series.min())
        self.max_val = float(series.max())

    def transform(self, series: pd.Series) -> pd.Series:
        return (series - self.min_val) / (self.max_val - self.min_val + 1e-8)

    def inverse_transform(self, series: pd.Series) -> pd.Series:
        return series * (self.max_val - self.min_val) + self.min_val

    def get_params(self):
        return {"min": self.min_val, "max": self.max_val}

zscore.py

from .base import Normalizer
import pandas as pd

class ZScoreNormalizer(Normalizer):
    def __init__(self):
        self.mean = None
        self.std = None

    def fit(self, series: pd.Series):
        self.mean = float(series.mean())
        self.std = float(series.std())

    def transform(self, series: pd.Series) -> pd.Series:
        return (series - self.mean) / (self.std + 1e-8)

    def inverse_transform(self, series: pd.Series) -> pd.Series:
        return series * self.std + self.mean

    def get_params(self):
        return {"mean": self.mean, "std": self.std}

factory.py(策略工厂)

from .minmax import MinMaxNormalizer
from .zscore import ZScoreNormalizer

def get_normalizer(strategy: str):
    """
    获取归一化策略
    """
    strategy = strategy.lower()
    if strategy == "minmax":
        return MinMaxNormalizer()
    elif strategy == "zscore":
        return ZScoreNormalizer()
    else:
        raise ValueError(f"Unsupported normalization strategy: {strategy}")

✅ 三、在数据预处理中使用归一化策略

在 DataPreprocessor 中集成:

from typing import Dict

import pandas as pd

from normalization.factory import get_normalizer

class DataPreprocessor:
    def __init__(self, factor_configs: Dict[str, Dict]):
        self.factor_configs = factor_configs
        self.normalizers = {}

    def normalize(self, df: pd.DataFrame) -> pd.DataFrame:
        for col, cfg in self.factor_configs.items():
            if cfg.get("type") == "numerical" and cfg.get("normalize") and not cfg.get("encoded"):
                strategy = cfg.get("strategy", "minmax")
                normalizer = get_normalizer(strategy)
                normalizer.fit(df[col])
                df[col] = normalizer.transform(df[col])
                self.normalizers[col] = normalizer
        return df

    def inverse_normalize(self, df: pd.DataFrame) -> pd.DataFrame:
        for col, normalizer in self.normalizers.items():
            df[col] = normalizer.inverse_transform(df[col])
        return df

✅ 四、字段配置中的扩展参数示例

factor_configs = {
    "age": {
        "type": "numerical",
        "normalize": True,
        "strategy": "minmax",     # 也可以使用 "zscore"
        "encoded": False
    },
    "height": {
        "type": "numerical",
        "normalize": True,
        "strategy": "zscore",
        "encoded": False
    }
}
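
配合上面的 DataPreprocessor,训练前后的调用大致如下(示意,df 为已清洗的数据):

preprocessor = DataPreprocessor(factor_configs)
df_normalized = preprocessor.normalize(df)        # 训练前:按字段配置归一化

# 训练 / 预测完成后,如需还原为原始单位:
df_restored = preprocessor.inverse_normalize(df_normalized)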

✅ 五、优势总结

优点 描述
✅ 插拔式策略 可以快速添加自定义归一化方式
✅ 接口统一 所有归一化类都有 fit/transform/inverse_transform
✅ 模块解耦 训练器/预处理器不依赖具体实现细节
✅ 适配场景广 不同模型可要求不同字段归一化策略
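
举个例子,若要新增一种对数归一化,只需实现同一接口即可接入,无需改动训练器(示意,非正式实现):

# normalization/log.py(示意)
import numpy as np
import pandas as pd

from .base import Normalizer

class LogNormalizer(Normalizer):
    """log1p 归一化,适合右偏的非负数值字段"""

    def fit(self, series: pd.Series):
        pass  # log1p 变换不需要拟合参数

    def transform(self, series: pd.Series) -> pd.Series:
        return np.log1p(series)

    def inverse_transform(self, series: pd.Series) -> pd.Series:
        return np.expm1(series)

    def get_params(self) -> dict:
        return {}

之后在 factory.py 的 get_normalizer 中加一个 "log" 分支,字段配置里就可以写 "strategy": "log"。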

是否还需要我:

  • 增加自定义归一化方式(如对数归一化、最大绝对值归一化)?
  • 将归一化策略序列化保存,方便模型部署反归一化时加载使用?
  • 加入日志记录和调试模式?

我可以根据你的使用场景继续封装。

为了在模型部署时支持 反归一化操作,我们需要将每个字段使用的归一化策略和其参数进行 序列化保存(保存到文件或数据库),并在加载模型时一并还原这些归一化配置。


✅ 一、核心目标

  • 在训练阶段:
    • 每个字段的归一化策略(如 MinMax、ZScore)和对应参数被保存;
  • 在预测/部署阶段:
    • 从保存的配置中 加载归一化器对象,用于 inverse_transform 反归一化操作。

✅ 二、改造 Normalizer 接口(支持保存/加载)

base.py 增加 to_dict() / from_dict() 方法

from abc import ABC, abstractmethod
import pandas as pd

class Normalizer(ABC):
    @abstractmethod
    def fit(self, series: pd.Series):
        pass

    @abstractmethod
    def transform(self, series: pd.Series) -> pd.Series:
        pass

    @abstractmethod
    def inverse_transform(self, series: pd.Series) -> pd.Series:
        pass

    @abstractmethod
    def get_params(self) -> dict:
        pass

    @abstractmethod
    def to_dict(self) -> dict:
        """用于序列化保存"""
        pass

    @classmethod
    @abstractmethod
    def from_dict(cls, data: dict):
        """从序列化数据中还原归一化器"""
        pass

✅ 三、实现 MinMaxNormalizer 的保存与加载

minmax.py

from .base import Normalizer
import pandas as pd

class MinMaxNormalizer(Normalizer):
    def __init__(self, min_val=None, max_val=None):
        self.min_val = min_val
        self.max_val = max_val

    def fit(self, series: pd.Series):
        self.min_val = float(series.min())
        self.max_val = float(series.max())

    def transform(self, series: pd.Series) -> pd.Series:
        return (series - self.min_val) / (self.max_val - self.min_val + 1e-8)

    def inverse_transform(self, series: pd.Series) -> pd.Series:
        return series * (self.max_val - self.min_val) + self.min_val

    def get_params(self):
        return {"min": self.min_val, "max": self.max_val}

    def to_dict(self):
        return {
            "type": "minmax",
            "min_val": self.min_val,
            "max_val": self.max_val
        }

    @classmethod
    def from_dict(cls, data: dict):
        return cls(min_val=data["min_val"], max_val=data["max_val"])
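
ZScoreNormalizer 也需要同样补充构造参数与 to_dict / from_dict,否则下文的通用加载函数无法还原它(完整示意):

# zscore.py(补充序列化后的版本,示意)
from .base import Normalizer
import pandas as pd

class ZScoreNormalizer(Normalizer):
    def __init__(self, mean=None, std=None):
        self.mean = mean
        self.std = std

    def fit(self, series: pd.Series):
        self.mean = float(series.mean())
        self.std = float(series.std())

    def transform(self, series: pd.Series) -> pd.Series:
        return (series - self.mean) / (self.std + 1e-8)

    def inverse_transform(self, series: pd.Series) -> pd.Series:
        return series * self.std + self.mean

    def get_params(self):
        return {"mean": self.mean, "std": self.std}

    def to_dict(self):
        return {"type": "zscore", "mean": self.mean, "std": self.std}

    @classmethod
    def from_dict(cls, data: dict):
        return cls(mean=data["mean"], std=data["std"])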

✅ 四、通用加载函数

factory.py 扩展反序列化功能

from .minmax import MinMaxNormalizer
from .zscore import ZScoreNormalizer

NORMALIZER_CLASSES = {
    "minmax": MinMaxNormalizer,
    "zscore": ZScoreNormalizer
}

def get_normalizer(strategy: str):
    strategy = strategy.lower()
    if strategy in NORMALIZER_CLASSES:
        return NORMALIZER_CLASSES[strategy]()
    else:
        raise ValueError(f"Unsupported normalization strategy: {strategy}")

def load_normalizer(data: dict):
    strategy = data["type"]
    if strategy in NORMALIZER_CLASSES:
        return NORMALIZER_CLASSES[strategy].from_dict(data)
    else:
        raise ValueError(f"Unknown normalizer type in serialized data: {strategy}")

✅ 五、保存和加载示例(JSON 文件)

✅ 保存归一化器配置

import json

def save_normalizers(normalizers: dict, path: str):
    """
    保存归一化器配置为 JSON 文件
    """
    data = {col: normalizer.to_dict() for col, normalizer in normalizers.items()}
    with open(path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)

✅ 加载归一化器配置

import json

from normalization.factory import load_normalizer

def load_normalizers(path: str) -> dict:
    """
    加载归一化器配置
    """
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    return {col: load_normalizer(cfg) for col, cfg in data.items()}

✅ 六、在训练与预测阶段使用

训练结束时保存配置:

preprocessor = DataPreprocessor(factor_configs)
df = preprocessor.normalize(df)
save_normalizers(preprocessor.normalizers, "normalizers.json")

部署预测时加载配置并反归一化:

normalizers = load_normalizers("normalizers.json")
for col, normalizer in normalizers.items():
    df[col] = normalizer.inverse_transform(df[col])

✅ 七、总结优势

功能点 说明
✅ 归一化策略可选 每个字段独立选择合适的归一化策略
✅ 可反归一化 支持模型预测输出回转到原始单位
✅ 可持久化 可存为 JSON,适合模型部署与调试
✅ 模型解耦 模型本身只处理数值,不关心归一化策略

如果你需要,我也可以扩展支持:

  • 将归一化策略保存在数据库中;
  • normalizers.json 自动与模型文件打包;
  • 多模型共享的归一化缓存目录设计。

是否需要将它们集成进你当前的训练流程结构?

反归一化的使用位置取决于你的目标:是还原模型输出、计算评估指标,还是做可视化 / 生成结果报告。下面是详细说明:


✅ 一、反归一化应该使用在哪些场景?

📌 1. 预测结果输出前(部署或离线推理)

场景:你使用模型预测数据后,需要将模型输出还原为用户可理解的原始单位(如价格、温度、销售量等)。

  • 位置:模型预测结果之后,模型输出 → inverse_transform
  • 是否必须:✅必须,如果模型输出的是归一化后的值
# 模型预测
y_pred_norm = model.predict(X_test)

# 反归一化
y_pred = normalizer.inverse_transform(y_pred_norm)

📌 2. 评估指标计算前(MSE、MAE、R² 等)

场景:你希望评估指标的单位是“真实世界单位”,否则归一化数据下的 MAE、MSE 没有实际意义。

  • 位置:模型预测 + 标签都需反归一化后再计算指标
  • 是否必须:✅推荐(除非你就想看归一化单位下的误差)
# 反归一化
y_pred = normalizer.inverse_transform(y_pred_norm)
y_true = normalizer.inverse_transform(y_test)

# 再进行误差评估
mae = mean_absolute_error(y_true, y_pred)

📌 3. 训练集输出可视化(非必须)

场景:如果你在训练后想画图观察模型对训练集拟合得如何,通常你也会想看原始单位的输出。

  • 位置:仅用于可视化、报告生成等环节
  • 是否必须:❌可选

✅ 二、反归一化应该 不使用 的地方

🚫 1. 模型内部(例如 loss 计算)

  • 在模型训练过程中,所有的 loss 计算、反向传播都应在 归一化后的数据 上进行。
  • 不应在模型内部对输出 inverse_transform 后再计算 loss。
# 错误 ❌
loss = MSE(y_true, normalizer.inverse_transform(model(X)))

# 正确 ✅
loss = MSE(y_true_normalized, model(X))

✅ 三、你的情况说明

我现在是在模型训练后的外部对训练数据进行的反归一化

✅ 正确做法应是:

  1. 训练前归一化输入和输出
  2. 训练时使用归一化后的数据
  3. 训练后不对训练数据反归一化
    • 除非你要画图看模型效果,或输出预测结果
  4. 预测时模型输出进行反归一化

✅ 四、总结

  • 训练前数据:需要归一化,不需要反归一化(提升模型训练稳定性)
  • 模型内部:loss 计算基于归一化后的数据,不做反归一化
  • 模型输出:模型输出为归一化值,需要反归一化(提供用户真实单位的预测)
  • 评估指标:预测值与标签都反归一化后再计算(保证指标有实际意义)
  • 可视化:反归一化后再绘图(呈现原始值曲线)

是否需要我整合到你目前的模型训练和预测框架中?我可以直接帮你写一个完整的训练 + 预测 + 反归一化评估的示例。

下面是一个整合了训练、预测、反归一化、评估的完整流程代码示例,使用线性回归(或 DNN)模型为例,并且适配你的框架结构:


✅ 一、目录结构概览(推荐)

ml_platform/
├── normalizers/
│   └── base.py         # 归一化策略基础类
│   └── minmax.py       # MinMax归一化
├── models/
│   └── base_adapter.py # 模型适配器接口
│   └── linear.py       # 线性模型适配器
├── trainer/
│   └── trainer.py      # 核心训练与评估逻辑
├── utils/
│   └── io.py           # 模型保存与归一化参数保存
└── main.py             # 示例入口点

✅ 二、归一化组件(minmax.py)

# ml_platform/normalizers/minmax.py
import numpy as np
import json

class MinMaxNormalizer:
    def __init__(self):
        self.min = None
        self.max = None

    def fit(self, data: np.ndarray):
        self.min = data.min(axis=0)
        self.max = data.max(axis=0)

    def transform(self, data: np.ndarray) -> np.ndarray:
        return (data - self.min) / (self.max - self.min + 1e-8)

    def inverse_transform(self, data: np.ndarray) -> np.ndarray:
        return data * (self.max - self.min + 1e-8) + self.min

    def save(self, filepath):
        with open(filepath, 'w') as f:
            json.dump({'min': self.min.tolist(), 'max': self.max.tolist()}, f)

    def load(self, filepath):
        with open(filepath, 'r') as f:
            data = json.load(f)
            self.min = np.array(data['min'])
            self.max = np.array(data['max'])

✅ 三、模型适配器接口与线性实现(base_adapter.py, linear.py)

# ml_platform/models/base_adapter.py
class BaseModelAdapter:
    def train(self, X_train, y_train, X_val, y_val): ...
    def predict(self, X): ...
    def save(self, path): ...
    def load(self, path): ...

# ml_platform/models/linear.py
from sklearn.linear_model import LinearRegression
from .base_adapter import BaseModelAdapter
import joblib

class LinearModelAdapter(BaseModelAdapter):
    def __init__(self):
        self.model = LinearRegression()

    def train(self, X_train, y_train, X_val=None, y_val=None):
        self.model.fit(X_train, y_train)

    def predict(self, X):
        return self.model.predict(X)

    def save(self, path):
        joblib.dump(self.model, path)

    def load(self, path):
        self.model = joblib.load(path)

✅ 四、训练器实现(trainer.py)

# ml_platform/trainer/trainer.py
import numpy as np
from ml_platform.normalizers.minmax import MinMaxNormalizer

class ModelTrainer:
    def __init__(self, model_adapter, normalization_enabled=True):
        self.model = model_adapter
        self.normalizer = MinMaxNormalizer() if normalization_enabled else None

    def train(self, X_train, y_train, X_val=None, y_val=None):
        if self.normalizer:
            self.normalizer.fit(y_train)
            y_train = self.normalizer.transform(y_train)
            if y_val is not None:
                y_val = self.normalizer.transform(y_val)

        self.model.train(X_train, y_train, X_val, y_val)

    def predict(self, X):
        y_pred = self.model.predict(X)
        if self.normalizer:
            y_pred = self.normalizer.inverse_transform(y_pred)
        return y_pred

    def evaluate(self, X_test, y_test):
        y_pred = self.model.predict(X_test)
        if self.normalizer:
            y_pred = self.normalizer.inverse_transform(y_pred)
            y_test = self.normalizer.inverse_transform(y_test)

        mae = np.mean(np.abs(y_test - y_pred))
        mse = np.mean((y_test - y_pred) ** 2)
        return {"MAE": mae, "MSE": mse}

✅ 五、测试入口(main.py)

# ml_platform/main.py
import numpy as np
from ml_platform.models.linear import LinearModelAdapter
from ml_platform.trainer.trainer import ModelTrainer

# 1. 模拟数据
X = np.random.rand(100, 5)
y = X @ np.array([1.5, -2, 0.5, 3.0, 0.2]) + 0.1 * np.random.randn(100)

# 2. 划分数据集
train_size = int(len(X) * 0.7)
val_size = int(len(X) * 0.1)
X_train, X_val, X_test = X[:train_size], X[train_size:train_size+val_size], X[train_size+val_size:]
y_train, y_val, y_test = y[:train_size], y[train_size:train_size+val_size], y[train_size+val_size:]

# 3. 模型初始化与训练
model_adapter = LinearModelAdapter()
trainer = ModelTrainer(model_adapter)
trainer.train(X_train, y_train.reshape(-1, 1), X_val, y_val.reshape(-1, 1))

# 4. 模型预测与评估
pred = trainer.predict(X_test)
metrics = trainer.evaluate(X_test, y_test.reshape(-1, 1))

print("预测值:", pred[:5].flatten())
print("评估结果:", metrics)

✅ 六、重点说明

模块 说明
MinMaxNormalizer 实现了 fit, transform, inverse_transform, 可序列化
LinearModelAdapter 适配 sklearn 的线性回归模型,封装 train, predict, save/load
ModelTrainer 负责整体训练流程,适配归一化、反归一化逻辑与评估
main.py 模拟流程并展示整个训练、预测、评估过程
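
补充一点:上面的 MinMaxNormalizer 自带 save/load,可以把目标值的归一化参数随模型一起保存,部署侧加载后即可直接得到原始单位的预测(示意,文件路径仅为示例):

# 训练结束后:保存模型与目标归一化参数
model_adapter.save("./output/linear_model.pkl")
trainer.normalizer.save("./output/y_normalizer.json")

# 部署 / 预测侧:分别加载后复用
new_adapter = LinearModelAdapter()
new_adapter.load("./output/linear_model.pkl")

new_trainer = ModelTrainer(new_adapter)
new_trainer.normalizer.load("./output/y_normalizer.json")

y_pred = new_trainer.predict(X_test)   # predict 内部会自动反归一化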

是否还希望我添加:

  • 对归一化参数的保存/加载逻辑(部署使用)?
  • 支持流式训练日志回传给前端?
  • 接入任务队列(如 Celery)调用整个流程?

我们将归一化与反归一化模块作为流程节点 NormalizationNode,整合进现有的训练适配器结构中,并支持日志回传、策略保存、加载与切换归一化方法。以下是完整的模块集成方案。


✅ 模块结构说明

ml_pipeline/
│
├── base_node.py               # 流程节点基类
├── normalization_strategies.py  # 多种归一化策略定义
├── normalization_node.py     # 封装的归一化流程节点
├── training_flow.py          # 训练流程定义与任务运行管理
├── adapter_base.py           # 模型训练适配器基类
├── dnn_regression_adapter.py # 示例:DNN回归模型适配器
└── utils/
    └── logger.py             # 日志回传模块

1️⃣ base_node.py

# ml_pipeline/base_node.py
from abc import ABC, abstractmethod

class BaseNode(ABC):
    def __init__(self, name: str):
        self.name = name

    @abstractmethod
    def run(self, context: dict) -> dict:
        """执行当前节点的逻辑,接收并返回 context"""
        pass

2️⃣ normalization_strategies.py

# ml_pipeline/normalization_strategies.py
import pandas as pd
import joblib
from abc import ABC, abstractmethod

class NormalizationStrategy(ABC):
    @abstractmethod
    def fit(self, data: pd.DataFrame): pass

    @abstractmethod
    def transform(self, data: pd.DataFrame) -> pd.DataFrame: pass

    @abstractmethod
    def inverse_transform(self, data: pd.DataFrame) -> pd.DataFrame: pass

    def save(self, path: str):
        joblib.dump(self, path)

    def load(self, path: str):
        obj = joblib.load(path)
        self.__dict__.update(obj.__dict__)

class MinMaxNormalization(NormalizationStrategy):
    def __init__(self):
        self.min = None
        self.max = None

    def fit(self, data: pd.DataFrame):
        self.min = data.min()
        self.max = data.max()

    def transform(self, data: pd.DataFrame) -> pd.DataFrame:
        return (data - self.min) / (self.max - self.min + 1e-8)

    def inverse_transform(self, data: pd.DataFrame) -> pd.DataFrame:
        return data * (self.max - self.min + 1e-8) + self.min

class StandardNormalization(NormalizationStrategy):
    def __init__(self):
        self.mean = None
        self.std = None

    def fit(self, data: pd.DataFrame):
        self.mean = data.mean()
        self.std = data.std()

    def transform(self, data: pd.DataFrame) -> pd.DataFrame:
        return (data - self.mean) / (self.std + 1e-8)

    def inverse_transform(self, data: pd.DataFrame) -> pd.DataFrame:
        return data * (self.std + 1e-8) + self.mean

def get_normalization_strategy(name: str) -> NormalizationStrategy:
    if name == "minmax":
        return MinMaxNormalization()
    elif name == "standard":
        return StandardNormalization()
    else:
        raise ValueError(f"Unknown normalization strategy: {name}")

3️⃣ normalization_node.py

# ml_pipeline/normalization_node.py
from .base_node import BaseNode
from .normalization_strategies import get_normalization_strategy
from typing import Optional
import pandas as pd
import os

class NormalizationNode(BaseNode):
    def __init__(self, strategy_name: str, normalize_columns: Optional[list] = None, save_path: Optional[str] = None):
        super().__init__("NormalizationNode")
        self.strategy_name = strategy_name
        self.normalize_columns = normalize_columns
        self.save_path = save_path or f"./normalizers/{strategy_name}_normalizer.pkl"
        os.makedirs(os.path.dirname(self.save_path), exist_ok=True)

    def run(self, context: dict) -> dict:
        df = context["train_data"]
        strategy = get_normalization_strategy(self.strategy_name)
        columns = self.normalize_columns or df.select_dtypes(include=['float', 'int']).columns.tolist()
        strategy.fit(df[columns])
        df[columns] = strategy.transform(df[columns])
        context["train_data"] = df
        strategy.save(self.save_path)
        context["normalization_strategy"] = strategy
        context["normalize_columns"] = columns
        return context

4️⃣ 训练流程整合(training_flow.py)

# ml_pipeline/training_flow.py
from .normalization_node import NormalizationNode
from .base_node import BaseNode
from typing import List
import logging

logger = logging.getLogger(__name__)

class TrainingFlow:
    def __init__(self, nodes: List[BaseNode]):
        self.nodes = nodes

    def run(self, context: dict):
        for node in self.nodes:
            logger.info(f"Running node: {node.name}")
            context = node.run(context)
        return context

✅ 示例使用(run_training.py)

from ml_pipeline.training_flow import TrainingFlow
from ml_pipeline.normalization_node import NormalizationNode
from mynodes.data_cleaning_node import DataCleaningNode
from mynodes.train_model_node import TrainModelNode
import pandas as pd

# 加载训练数据
df = pd.read_csv("data.csv")

context = {
    "train_data": df,
    "model_type": "dnn_regression"
}

flow = TrainingFlow(nodes=[
    DataCleaningNode(),
    NormalizationNode(strategy_name="minmax"),
    TrainModelNode()
])

context = flow.run(context)

🔁 在模型推理时反归一化(例如预测)

# predict.py
from ml_pipeline.normalization_strategies import get_normalization_strategy

strategy = get_normalization_strategy("minmax")
strategy.load("./normalizers/minmax_normalizer.pkl")

# 假设 result 是预测输出
inverse_result = strategy.inverse_transform(result)

🔍 总结

步骤 功能 模块
初始化 归一化节点添加到流程 NormalizationNode
执行 归一化训练数据 run() 方法
保存 归一化策略持久化 joblib.dump()
预测 使用相同策略反归一化预测值 inverse_transform()

需要我进一步为你接入模型适配器结构(比如 DNNRegressionAdapter)并结合流程节点自动运行吗?我也可以帮你生成对应的训练日志流式返回逻辑。

下面是完整机制的实现,确保训练和预测阶段使用一致的归一化策略,包括:

  1. ✅ 流程节点封装(归一化、反归一化)
  2. ✅ 策略保存与加载
  3. ✅ 模型元信息记录归一化策略路径
  4. ✅ 示例代码集成与注释

✅ 1. 归一化策略基类 + 多策略实现

# normalization/strategies/base.py
import abc
import pandas as pd
import pickle

class BaseNormalizer(abc.ABC):
    def __init__(self, fields: list[str]):
        self.fields = fields

    @abc.abstractmethod
    def fit(self, df: pd.DataFrame): ...
    
    @abc.abstractmethod
    def transform(self, df: pd.DataFrame) -> pd.DataFrame: ...
    
    @abc.abstractmethod
    def inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame: ...
    
    def save(self, path: str):
        with open(path, "wb") as f:
            pickle.dump(self, f)
    
    @classmethod
    def load(cls, path: str) -> "BaseNormalizer":
        with open(path, "rb") as f:
            return pickle.load(f)

# normalization/strategies/minmax.py
import pandas as pd
from .base import BaseNormalizer

class MinMaxNormalizer(BaseNormalizer):
    def __init__(self, fields: list[str]):
        super().__init__(fields)
        self.min = {}
        self.max = {}

    def fit(self, df: pd.DataFrame):
        for field in self.fields:
            self.min[field] = df[field].min()
            self.max[field] = df[field].max()

    def transform(self, df: pd.DataFrame) -> pd.DataFrame:
        df = df.copy()
        for field in self.fields:
            df[field] = (df[field] - self.min[field]) / (self.max[field] - self.min[field])
        return df

    def inverse_transform(self, df: pd.DataFrame) -> pd.DataFrame:
        df = df.copy()
        for field in self.fields:
            df[field] = df[field] * (self.max[field] - self.min[field]) + self.min[field]
        return df

✅ 2. 归一化策略工厂

# normalization/factory.py
from .strategies.minmax import MinMaxNormalizer

def get_normalizer(name: str, fields: list[str]):
    if name == "minmax":
        return MinMaxNormalizer(fields)
    raise ValueError(f"Unsupported normalization strategy: {name}")

✅ 3. 封装流程节点(Node)

# pipeline/nodes/normalize.py
import pandas as pd

from normalization.factory import get_normalizer

class NormalizationNode:
    def __init__(self, strategy_name: str, fields: list[str], save_path: str):
        self.strategy_name = strategy_name
        self.fields = fields
        self.save_path = save_path

    def run(self, df: pd.DataFrame) -> pd.DataFrame:
        strategy = get_normalizer(self.strategy_name, self.fields)
        strategy.fit(df)
        df_transformed = strategy.transform(df)
        strategy.save(self.save_path)
        return df_transformed

# pipeline/nodes/inverse_normalize.py
import pandas as pd

from normalization.factory import get_normalizer

class InverseNormalizationNode:
    def __init__(self, strategy_name: str, fields: list[str], load_path: str):
        self.strategy_name = strategy_name
        self.fields = fields
        self.load_path = load_path

    def run(self, df: pd.DataFrame) -> pd.DataFrame:
        strategy = get_normalizer(self.strategy_name, self.fields)
        strategy = strategy.load(self.load_path)
        return strategy.inverse_transform(df)

✅ 4. 模型元信息保存格式(示例)

{
  "model_id": "model_xyz",
  "normalization": {
    "strategy": "minmax",
    "fields": ["age", "income"],
    "path": "./models/model_xyz/normalizer_minmax.pkl"
  }
}

可以保存在数据库字段、JSON配置或模型目录中。
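
例如,可以在模型目录里落一个 meta.json,预测服务启动时按它找到并加载归一化器(示意,路径与文件名仅为示例):

import json
from normalization.strategies.minmax import MinMaxNormalizer

# 训练结束后写入模型元信息
meta = {
    "model_id": "model_xyz",
    "normalization": {
        "strategy": "minmax",
        "fields": ["age", "income"],
        "path": "./models/model_xyz/normalizer_minmax.pkl",
    },
}
with open("./models/model_xyz/meta.json", "w", encoding="utf-8") as f:
    json.dump(meta, f, indent=2, ensure_ascii=False)

# 预测服务启动时读取元信息并加载归一化器
with open("./models/model_xyz/meta.json", "r", encoding="utf-8") as f:
    meta = json.load(f)

normalizer = MinMaxNormalizer.load(meta["normalization"]["path"])  # BaseNormalizer.load 是 classmethod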


✅ 5. 在训练流程中使用

df_raw = pd.read_csv(dataset_path)
fields_to_normalize = ["age", "income"]

# 步骤1:归一化
norm_node = NormalizationNode("minmax", fields_to_normalize, save_path="./models/model_xyz/normalizer_minmax.pkl")
df_normalized = norm_node.run(df_raw)

# 步骤2:训练模型...

✅ 6. 在预测流程中使用

df_input = pd.read_csv(input_path)
fields_to_normalize = ["age", "income"]

# 步骤1:归一化输入
strategy = get_normalizer("minmax", fields_to_normalize).load("./models/model_xyz/normalizer_minmax.pkl")
df_input_normalized = strategy.transform(df_input)

# 步骤2:预测
predictions = model.predict(df_input_normalized)

# 步骤3:反归一化
# 注意:目标字段(target)需要在训练阶段单独 fit 一个归一化器并保存,
# 这里假设其保存路径为 ./models/model_xyz/normalizer_target.pkl;
# 预测结果一般是数组,先包装成 DataFrame 再还原
df_pred = pd.DataFrame({"target": predictions})
inverse_node = InverseNormalizationNode("minmax", ["target"], "./models/model_xyz/normalizer_target.pkl")
df_output = inverse_node.run(df_pred)

✅ 下一步建议

你可以:

  • ✅ 将这些节点整合入你已有的 TrainingPipeline
  • ✅ 为预测流程增加相应的标准 Node
  • ✅ 添加数据库记录支持记录 normalizer_path

是否需要我为你:

  • 添加其他归一化策略(如 Z-Score、Robust)?
  • 将这些节点自动加入你的流程框架中并支持 YAML/JSON 配置方式加载?
  • 为训练/预测流程封装完整的 CLI 或 Django API 接口?

告诉我你的下一步目标,我帮你完善。

当然可以!你完全可以不使用任何第三方库,使用 Python 标准库 + 自定义结构 来实现一个模块化、流程化的模型训练系统。

下面我将为你提供一个不依赖任何第三方框架(如 Airflow、Prefect、Hydra 等)的训练流程设计方案,包含:

  • ✅ 核心流程引擎(支持链式 / 函数式)
  • ✅ 配置驱动支持(基于字典或 JSON)
  • ✅ 无第三方依赖
  • ✅ 简洁清晰、适合平台扩展

✅ 一、核心思路

  • 每个流程步骤是一个类,继承 BaseNode,实现 run(context) 方法。
  • context 是一个 Python 字典,传递数据。
  • Pipeline 类接收多个步骤,并依次调用它们的 run()

✅ 二、完整示例代码(无第三方依赖)

1️⃣ base_node.py — 基础流程节点定义

# base_node.py
from abc import ABC, abstractmethod

class BaseNode(ABC):
    def __init__(self, name: str):
        self.name = name

    @abstractmethod
    def run(self, context: dict) -> dict:
        """处理 context 并返回更新后的 context"""
        pass

2️⃣ 自定义几个节点(步骤模块)

# nodes.py
from base_node import BaseNode

class LoadDatasetNode(BaseNode):
    def __init__(self, paths: list[str]):
        super().__init__("LoadDataset")
        self.paths = paths

    def run(self, context: dict) -> dict:
        print(f"[{self.name}] 加载数据: {self.paths}")
        context["raw_data"] = f"Data from {self.paths}"
        return context


class CleanDataNode(BaseNode):
    def __init__(self, required_columns: list[str]):
        super().__init__("CleanData")
        self.required_columns = required_columns

    def run(self, context: dict) -> dict:
        print(f"[{self.name}] 清洗数据:保留 {self.required_columns}")
        context["clean_data"] = f"Cleaned({context['raw_data']})"
        return context


class TrainModelNode(BaseNode):
    def __init__(self, model_type: str, params: dict):
        super().__init__("TrainModel")
        self.model_type = model_type
        self.params = params

    def run(self, context: dict) -> dict:
        print(f"[{self.name}] 训练模型:{self.model_type} with {self.params}")
        context["trained_model"] = f"Model({self.model_type})"
        return context

3️⃣ pipeline.py — 流程管理器

# pipeline.py
from typing import List
from base_node import BaseNode

class Pipeline:
    def __init__(self):
        self.steps: List[BaseNode] = []

    def add(self, step: BaseNode):
        self.steps.append(step)
        return self  # 支持链式调用

    def run(self, context=None):
        context = context or {}
        for step in self.steps:
            print(f"\n➡️ 执行步骤:{step.name}")
            context = step.run(context)
        print("\n✅ 流程完成")
        return context

4️⃣ 使用示例:链式方式调用流程

# run_pipeline.py
from pipeline import Pipeline
from nodes import LoadDatasetNode, CleanDataNode, TrainModelNode

pipeline = (
    Pipeline()
    .add(LoadDatasetNode(paths=["data1.xlsx", "data2.xlsx"]))
    .add(CleanDataNode(required_columns=["age", "salary"]))
    .add(TrainModelNode(model_type="dnn", params={"layers": [64, 32]}))
)

final_context = pipeline.run()
print("\n最终输出上下文:", final_context)

✅ 三、支持 JSON/字典 配置执行

用字典定义:

pipeline_config = [
    {"type": "LoadDatasetNode", "params": {"paths": ["a.xlsx", "b.xlsx"]}},
    {"type": "CleanDataNode", "params": {"required_columns": ["x", "y"]}},
    {"type": "TrainModelNode", "params": {"model_type": "dnn", "params": {"layers": [64]}}}
]

加载执行(无需外部依赖):

# dynamic_pipeline.py
from nodes import LoadDatasetNode, CleanDataNode, TrainModelNode
from pipeline import Pipeline

NODE_MAP = {
    "LoadDatasetNode": LoadDatasetNode,
    "CleanDataNode": CleanDataNode,
    "TrainModelNode": TrainModelNode,
}

def load_pipeline_from_config(config: list[dict]) -> Pipeline:
    pipeline = Pipeline()
    for step in config:
        node_class = NODE_MAP[step["type"]]
        node = node_class(**step["params"])
        pipeline.add(node)
    return pipeline
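
使用方式(示意):

pipeline = load_pipeline_from_config(pipeline_config)
final_context = pipeline.run()
print("最终输出上下文:", final_context)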

✅ 四、特点总结

特性 说明
✅ 无依赖 全部使用 Python 标准库
🔌 模块解耦 每个步骤单独封装,便于复用与测试
📦 易平台集成 可嵌入 Django/Celery 任意任务中
📋 可配置 字典或 JSON 控制流程
🧩 可扩展 新节点只需继承 BaseNode 即可

✅ 是否继续?

我可以帮你:

  • 🔁 增加数据预处理、归一化、模型保存等 Node
  • 🧪 生成一个测试运行示例(训练 + 模型保存 + 反归一化)
  • ☁️ 对接 Celery 异步队列/日志流式输出
  • 🛠️ 整理为一个 Python 包结构,供你项目导入使用

你想要我下一步完善哪部分?是否希望我提供完整的项目结构并封装为模块包?

很好,我们来扩展你的无第三方依赖训练流程框架:


✅ 目标

  • 增加以下通用节点:
    • PreprocessNode(数据预处理,如缺失填充、类型转换等)
    • NormalizeNode(数值型归一化)
    • SaveModelNode(保存模型结构或权重信息)
  • 每个节点支持一个可扩展的持久化机制(persist 开关 + save_result() 挂钩)
    • 可选是否持久化结果(如保存至 S3、本地或数据库)
    • 暂不实现逻辑,仅设计通用接口与结构

✅ 修改 BaseNode:添加持久化挂钩

# base_node.py
from abc import ABC, abstractmethod

class BaseNode(ABC):
    def __init__(self, name: str, persist: bool = False):
        self.name = name
        self.persist = persist  # 是否启用持久化

    @abstractmethod
    def run(self, context: dict) -> dict:
        pass

    def save_result(self, key: str, value):
        """用于保存结果,可扩展到 S3、数据库或本地"""
        if self.persist:
            print(f"[{self.name}] 🔄 模拟持久化保存:{key} -> {value[:50]}...")
            # 真实实现中可调用 save_to_s3/save_to_db/save_to_disk 等

✅ 新增节点 1:预处理节点 PreprocessNode

# nodes.py
class PreprocessNode(BaseNode):
    def __init__(self, fields: list[str], fillna: float = 0.0, persist: bool = False):
        super().__init__("Preprocess", persist)
        self.fields = fields
        self.fillna = fillna

    def run(self, context: dict) -> dict:
        print(f"[{self.name}] 数据预处理:填充字段 {self.fields} 的缺失值为 {self.fillna}")
        data = context["clean_data"]
        processed = f"Preprocessed({data})"
        context["preprocessed_data"] = processed
        self.save_result("preprocessed_data", processed)
        return context

✅ 新增节点 2:归一化节点 NormalizeNode

class NormalizeNode(BaseNode):
    def __init__(self, fields: list[str], strategy: str = "minmax", persist: bool = False):
        super().__init__("Normalize", persist)
        self.fields = fields
        self.strategy = strategy

    def run(self, context: dict) -> dict:
        print(f"[{self.name}] 对字段 {self.fields} 使用策略 {self.strategy} 归一化")
        data = context.get("preprocessed_data") or context.get("clean_data")
        normalized = f"Normalized({data})"
        context["normalized_data"] = normalized
        self.save_result("normalized_data", normalized)
        return context

✅ 新增节点 3:模型保存节点 SaveModelNode

class SaveModelNode(BaseNode):
    def __init__(self, save_path: str, persist: bool = False):
        super().__init__("SaveModel", persist)
        self.save_path = save_path

    def run(self, context: dict) -> dict:
        model = context.get("trained_model")
        print(f"[{self.name}] 保存模型到路径 {self.save_path}")
        self.save_result("model", f"ModelFile({model})")
        context["model_path"] = self.save_path
        return context

✅ 使用示例(链式或函数式均可)

pipeline = (
    Pipeline()
    .add(LoadDatasetNode(paths=["a.xlsx", "b.xlsx"]))
    .add(CleanDataNode(required_columns=["age", "income"]))
    .add(PreprocessNode(fields=["age", "income"], fillna=0.0, persist=True))
    .add(NormalizeNode(fields=["age", "income"], persist=True))
    .add(TrainModelNode(model_type="dnn", params={"layers": [64, 32]}))
    .add(SaveModelNode(save_path="./model/dnn.pkl", persist=True))
)
context = pipeline.run()

✅ 持久化扩展建议

你可以通过策略类实现持久化支持:

class ResultSaver:
    def __init__(self, mode="local"):
        self.mode = mode  # 可为 'local', 's3', 'db'

    def save(self, key: str, value):
        if self.mode == "local":
            print(f"[Local] Save {key}")
        elif self.mode == "s3":
            print(f"[S3] Upload {key}")
        elif self.mode == "db":
            print(f"[DB] Insert {key}")

在每个 Node 中调用 ResultSaver().save(key, value) 即可轻松扩展。
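
一个可注入的写法示意如下(假设在 BaseNode 构造时传入 saver,各具体 Node 只需把该参数透传给 super().__init__ 即可切换存储后端):

# base_node.py(注入 ResultSaver 的示意)
from abc import ABC, abstractmethod

class BaseNode(ABC):
    def __init__(self, name: str, persist: bool = False, saver: ResultSaver = None):
        self.name = name
        self.persist = persist
        self.saver = saver or ResultSaver(mode="local")  # 默认本地模拟

    @abstractmethod
    def run(self, context: dict) -> dict:
        pass

    def save_result(self, key: str, value):
        if self.persist:
            self.saver.save(key, value)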


✅ 小结

模块 说明
PreprocessNode 执行字段的缺失填充等预处理操作
NormalizeNode 支持不同归一化策略(占位实现)
SaveModelNode 模拟模型文件保存
BaseNode.persist 控制是否保存节点中间数据
save_result() 模拟将结果写入存储系统,后续可注入实际逻辑

是否需要我为你:

  • 💾 添加 ResultSaver 可注入策略示例
  • ☁️ 整合 Django 中的 S3 存储(例如使用 boto3)作为 save_result() 的默认实现
  • 🧪 为每个 Node 写一份单元测试/模拟上下文测试

告诉我你希望优先推进哪一块,我立刻为你扩展。

当然可以,我将为你详细解释这个模型训练流程的整体设计逻辑,以及每个部分的代码结构和作用。我们使用的是自定义流程系统,完全基于 Python 标准库构建,具备:

  • 模块化(每一步都是一个 Node)
  • 解耦(数据通过上下文 context 流转)
  • 可扩展(支持模型、清洗、归一化等扩展)
  • 可持久化(支持保存中间结果至 S3/数据库等)

✅ 一、整体流程概览图

训练流程(Pipeline)

┌─────────────────────────────┐
│        Pipeline.run()       │
│ ┌─────┐   ┌──────┐   ┌─────┐ │
│ │Node1│→→→│Node2 │→→→│Node3│→→ ... │
│ └─────┘   └──────┘   └─────┘ │
└─────────────────────────────┘

每个 Node 执行 run(context),更新上下文并返回

✅ 二、核心结构说明

1. BaseNode: 所有流程步骤的基类

# base_node.py
from abc import ABC, abstractmethod

class BaseNode(ABC):
    def __init__(self, name: str, persist: bool = False):
        self.name = name
        self.persist = persist  # 是否持久化当前节点输出

    @abstractmethod
    def run(self, context: dict) -> dict:
        pass

    def save_result(self, key: str, value):
        """持久化保存结果(目前为打印模拟)"""
        if self.persist:
            print(f"[{self.name}] 模拟保存 {key}: {str(value)[:50]}...")

✅ 功能说明:

  • 提供统一的 run() 接口,所有流程节点必须实现。
  • 提供 save_result(),预留持久化能力。

2. Pipeline: 执行流程的调度器

# pipeline.py
class Pipeline:
    def __init__(self):
        self.steps = []

    def add(self, step: BaseNode):
        self.steps.append(step)
        return self  # 支持链式调用

    def run(self, context=None):
        context = context or {}
        for step in self.steps:
            print(f"➡️ 执行步骤:{step.name}")
            context = step.run(context)
        return context

✅ 功能说明:

  • 接收多个 Node 实例
  • 按顺序执行每个 run(context),将返回结果传给下一个 Node

✅ 三、每个 Node 的作用及代码逻辑

3.1 加载数据 LoadDatasetNode

class LoadDatasetNode(BaseNode):
    def __init__(self, paths):
        super().__init__("LoadDataset")
        self.paths = paths

    def run(self, context):
        print(f"[{self.name}] 加载数据:{self.paths}")
        context["raw_data"] = f"Data from {self.paths}"  # 实际应读取文件
        return context
  • 接收多个路径 → 模拟加载数据 → 保存为 raw_data

3.2 数据清洗 CleanDataNode

class CleanDataNode(BaseNode):
    def __init__(self, required_columns):
        super().__init__("CleanData")
        self.required_columns = required_columns

    def run(self, context):
        print(f"[{self.name}] 清洗字段:{self.required_columns}")
        data = context["raw_data"]
        cleaned = f"Cleaned({data})"
        context["clean_data"] = cleaned
        return context
  • 清洗只保留指定字段 → 存入 clean_data

3.3 数据预处理 PreprocessNode

class PreprocessNode(BaseNode):
    def __init__(self, fields, fillna=0.0, persist=False):
        super().__init__("Preprocess", persist)
        self.fields = fields
        self.fillna = fillna

    def run(self, context):
        print(f"[{self.name}] 缺失值填充:字段={self.fields}, 填充={self.fillna}")
        data = context["clean_data"]
        processed = f"Preprocessed({data})"
        context["preprocessed_data"] = processed
        self.save_result("preprocessed_data", processed)
        return context
  • 执行缺失填充、字段校验等预处理逻辑
  • 可选持久化保存(如到 S3)

3.4 数据归一化 NormalizeNode

class NormalizeNode(BaseNode):
    def __init__(self, fields, strategy="minmax", persist=False):
        super().__init__("Normalize", persist)
        self.fields = fields
        self.strategy = strategy

    def run(self, context):
        print(f"[{self.name}] 对字段 {self.fields} 使用 {self.strategy} 归一化")
        data = context.get("preprocessed_data") or context["clean_data"]
        normalized = f"Normalized({data})"
        context["normalized_data"] = normalized
        self.save_result("normalized_data", normalized)
        return context
  • 对数值型字段进行归一化
  • 将归一化后的结果保存在 normalized_data

3.5 模型训练 TrainModelNode

class TrainModelNode(BaseNode):
    def __init__(self, model_type, params):
        super().__init__("TrainModel")
        self.model_type = model_type
        self.params = params

    def run(self, context):
        print(f"[{self.name}] 训练模型类型={self.model_type} 参数={self.params}")
        model = f"TrainedModel({self.model_type})"
        context["trained_model"] = model
        return context
  • 根据模型类型和参数训练(此处为模拟)
  • 保存训练好的模型对象

3.6 模型保存 SaveModelNode

class SaveModelNode(BaseNode):
    def __init__(self, save_path, persist=False):
        super().__init__("SaveModel", persist)
        self.save_path = save_path

    def run(self, context):
        model = context["trained_model"]
        print(f"[{self.name}] 保存模型到 {self.save_path}")
        self.save_result("model", f"ModelFile({model})")
        context["model_path"] = self.save_path
        return context
  • 将模型保存路径保存到上下文(实际可调用 pickle/S3 保存,真实落盘写法见下方示意)
  • 使用 save_result() 预留真实保存接口
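
真实的本地落盘写法示意如下(假设 trained_model 可被 pickle 序列化):

import os
import pickle

class SaveModelNode(BaseNode):
    def __init__(self, save_path, persist=False):
        super().__init__("SaveModel", persist)
        self.save_path = save_path

    def run(self, context):
        model = context["trained_model"]
        os.makedirs(os.path.dirname(self.save_path) or ".", exist_ok=True)
        with open(self.save_path, "wb") as f:
            pickle.dump(model, f)  # 用真实写盘替换之前的模拟打印
        context["model_path"] = self.save_path
        self.save_result("model", self.save_path)
        return context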

✅ 四、运行入口脚本示例

# run_pipeline.py
from pipeline import Pipeline
from nodes import LoadDatasetNode, CleanDataNode, PreprocessNode, NormalizeNode, TrainModelNode, SaveModelNode

pipeline = (
    Pipeline()
    .add(LoadDatasetNode(paths=["data1.xlsx", "data2.xlsx"]))
    .add(CleanDataNode(required_columns=["age", "salary"]))
    .add(PreprocessNode(fields=["age", "salary"], fillna=0, persist=True))
    .add(NormalizeNode(fields=["age", "salary"], strategy="minmax", persist=True))
    .add(TrainModelNode(model_type="dnn", params={"layers": [64, 32]}))
    .add(SaveModelNode(save_path="./model/dnn.pkl", persist=True))
)

context = pipeline.run()
print("最终输出:", context)

✅ 总结逻辑流程图

context 结构
┌────────────┐
│ raw_data   │ ← LoadDatasetNode
│ clean_data │ ← CleanDataNode
│ preprocessed_data ← PreprocessNode
│ normalized_data   ← NormalizeNode
│ trained_model     ← TrainModelNode
│ model_path        ← SaveModelNode
└────────────┘

✅ 你下一步可以选择:

  1. 让我帮你将这个结构组织成完整的 Python 包目录
  2. 生成真实的数据处理与训练逻辑(使用 Pandas、PyTorch)
  3. 接入任务队列 Celery/Redis + 异步训练流程
  4. 接入 S3、PostgreSQL 进行持久化存储

告诉我你希望进一步完善哪个模块,我就继续为你实现。

这是一个非常重要也非常有价值的问题 —— 在训练流程中,每个节点(Node)之间确实可能存在数据依赖关系,我们需要用一套通用又清晰的方式来:

  • 规范每个节点的输入(依赖)和输出(产出)
  • 避免数据丢失或执行顺序出错
  • 支持灵活插拔、重排流程的能力

✅ 总体方案:节点的输入输出依赖管理

我们为每个 BaseNode 增加两个显式属性:

属性 说明
requires: list[str] 依赖的 context 键,即前置步骤必须产出的内容
produces: list[str] 本步骤输出的 context 键,用于后续步骤消费

✅ 1. 修改 BaseNode 支持依赖声明检查

from abc import ABC, abstractmethod

class BaseNode(ABC):
    def __init__(self, name: str, persist: bool = False):
        self.name = name
        self.persist = persist
        self.requires: list[str] = []    # 依赖的 context key
        self.produces: list[str] = []    # 本节点输出的 context key

    @abstractmethod
    def run(self, context: dict) -> dict:
        pass

    def save_result(self, key: str, value):
        if self.persist:
            print(f"[{self.name}] 模拟持久化:{key} -> {str(value)[:50]}")

    def check_requirements(self, context: dict):
        missing = [k for k in self.requires if k not in context]
        if missing:
            raise ValueError(f"[{self.name}] 缺少依赖数据: {missing}")

Pipeline.run() 中增加依赖检查:

def run(self, context=None):
    context = context or {}
    for step in self.steps:
        print(f"\n➡️ 执行步骤:{step.name}")
        step.check_requirements(context)
        context = step.run(context)
    return context

✅ 2. 示例:修改每个节点的实现

LoadDatasetNode

class LoadDatasetNode(BaseNode):
    def __init__(self, paths):
        super().__init__("LoadDataset")
        self.paths = paths
        self.produces = ["raw_data"]

    def run(self, context):
        data = f"Data from {self.paths}"
        context["raw_data"] = data
        return context

CleanDataNode

class CleanDataNode(BaseNode):
    def __init__(self, required_columns):
        super().__init__("CleanData")
        self.required_columns = required_columns
        self.requires = ["raw_data"]
        self.produces = ["clean_data"]

    def run(self, context):
        data = context["raw_data"]
        cleaned = f"Cleaned({data})"
        context["clean_data"] = cleaned
        return context

NormalizeNode

class NormalizeNode(BaseNode):
    def __init__(self, fields, strategy="minmax", persist=False):
        super().__init__("Normalize", persist)
        self.fields = fields
        self.strategy = strategy
        self.requires = ["preprocessed_data"]  # 或 ["clean_data"]
        self.produces = ["normalized_data"]

    def run(self, context):
        data = context.get("preprocessed_data") or context["clean_data"]
        normalized = f"Normalized({data})"
        context["normalized_data"] = normalized
        self.save_result("normalized_data", normalized)
        return context

✅ 3. 增强:自动检测节点顺序是否合理(可选)

Pipeline.run() 之前,做一个顺序合理性预检查

def validate_dependencies(self):
    available = set()
    for step in self.steps:
        missing = [key for key in step.requires if key not in available]
        if missing:
            raise ValueError(f"[{step.name}] 顺序错误,缺少依赖:{missing}")
        available.update(step.produces)

调用:

def run(self, context=None):
    self.validate_dependencies()
    ...

✅ 总结:设计收益

特性 实现
明确数据流依赖 每个节点 requires 明确依赖哪些键
防止乱序执行 Pipeline.validate_dependencies() 自动检查
保证流程健壮性 如果缺失依赖,提前抛错
易于维护 插拔节点时不用看内部实现,只看 requires 与 produces
易于文档生成 可自动输出流程图与依赖说明

✅ 示例:自动文档生成(可选)

你甚至可以自动打印出流程:

for step in pipeline.steps:
    print(f"🔧 {step.name}")
    print(f"   ➤ 输入依赖: {step.requires}")
    print(f"   ➤ 输出结果: {step.produces}")

✅ 下一步建议

我可以继续帮你:

  1. 🔧 提炼出 Node 的 schema 描述和注册机制
  2. 📊 生成流程图(例如输出为 Graphviz 或 Mermaid 图)
  3. 🧪 为每个节点设计单元测试和上下文模拟测试
  4. ☁️ 实现 save_result() 的持久化策略(如 S3、PostgreSQL)

你想优先处理哪一块?我可以马上开始。