模型适配器测试

Posted by Shallow Dreameron May 14, 2025

为了设计一个通用的 模型训练流程框架,能够适配多种模型类型(如线性回归、数据分析、大语言模型等),最好的方法是使用 “适配器模式(Adapter Pattern)” 来封装不同类型模型的差异,使它们统一实现一套训练和评估接口,便于调度、管理、日志记录、结果展示等。


🧩 一、核心设计理念

  • 统一接口:定义一个抽象基类 BaseModelAdapter,所有模型适配器继承它,实现统一的方法。
  • 适配器类:针对不同模型类型(如线性回归、大语言模型等)定义对应的适配器类。
  • 训练管理器:提供统一的模型调度、训练执行、日志收集、状态保存等功能。
  • 可扩展性强:新增模型类型时,只需新增适配器并注册即可,无需改动主流程。

📦 二、目录结构建议(Django + Python)

ai_platform/
├── adapters/                 # 所有模型适配器
│   ├── base.py              # BaseModelAdapter 抽象类
│   ├── linear.py            # 线性回归适配器
│   ├── analysis.py          # 数据分析适配器
│   └── llm.py               # 大语言模型适配器
├── services/
│   └── trainer.py           # 统一训练器/任务管理器
├── models/
│   └── task.py              # ModelTrainTask 等任务模型
└── utils/
    └── logger.py            # 日志工具

🧱 三、核心接口定义

BaseModelAdapter(抽象基类)

# adapters/base.py
from abc import ABC, abstractmethod

class BaseModelAdapter(ABC):
    def __init__(self, config, dataset):
        self.config = config      # 用户配置参数
        self.dataset = dataset    # 数据集对象或路径

    @abstractmethod
    def preprocess(self):
        """预处理数据"""
        pass

    @abstractmethod
    def train(self):
        """模型训练主逻辑"""
        pass

    @abstractmethod
    def evaluate(self):
        """模型评估逻辑"""
        pass

    @abstractmethod
    def save_model(self, output_path):
        """保存模型到文件/数据库"""
        pass

🧠 四、模型适配器示例

1️⃣ 线性回归适配器

# adapters/linear.py
from .base import BaseModelAdapter
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
import joblib

class LinearRegressionAdapter(BaseModelAdapter):
    def preprocess(self):
        self.X = self.dataset.drop(columns=['target'])
        self.y = self.dataset['target']

    def train(self):
        self.model = LinearRegression()
        self.model.fit(self.X, self.y)

    def evaluate(self):
        y_pred = self.model.predict(self.X)
        return {"mse": mean_squared_error(self.y, y_pred)}

    def save_model(self, output_path):
        joblib.dump(self.model, output_path)

2️⃣ 大语言模型适配器(如调用 OpenAI GPT)

# adapters/llm.py
from .base import BaseModelAdapter
import openai

class LLMAdapter(BaseModelAdapter):
    def preprocess(self):
        self.prompt = self.config.get("prompt", "")

    def train(self):
        # 通常大语言模型是 zero/few-shot,无需训练,只推理
        self.output = openai.ChatCompletion.create(
            model=self.config.get("model", "gpt-4"),
            messages=[{"role": "user", "content": self.prompt}],
            temperature=self.config.get("temperature", 0.7)
        )

    def evaluate(self):
        return {"output": self.output["choices"][0]["message"]["content"]}

    def save_model(self, output_path):
        # LLM 模型无法本地保存,仅记录 prompt & output
        with open(output_path, 'w') as f:
            f.write(self.output["choices"][0]["message"]["content"])

⚙️ 五、训练任务管理器(训练入口)

# services/trainer.py
from adapters.linear import LinearRegressionAdapter
from adapters.llm import LLMAdapter

# 注册适配器
ADAPTER_REGISTRY = {
    "linear_regression": LinearRegressionAdapter,
    "llm": LLMAdapter,
    # 可拓展更多模型
}

class ModelTrainer:
    def __init__(self, task_config, dataset):
        self.task_type = task_config["task_type"]
        self.adapter_class = ADAPTER_REGISTRY.get(self.task_type)

        if not self.adapter_class:
            raise ValueError(f"Unsupported model type: {self.task_type}")

        self.adapter = self.adapter_class(task_config, dataset)

    def run(self):
        self.adapter.preprocess()
        self.adapter.train()
        results = self.adapter.evaluate()
        self.adapter.save_model(self._output_path())
        return results

    def _output_path(self):
        return f"/tmp/models/{self.task_type}_result.pkl"

🔌 六、使用方式(Django 视图或 Celery 中调用)

from services.trainer import ModelTrainer
from datasets.utils import load_dataset  # 自定义函数:从数据库或 S3 载入数据

def train_model(task_id):
    task_config = {
        "task_type": "linear_regression",
        "learning_rate": 0.01,
        "prompt": "总结这段文本...",
    }
    dataset = load_dataset(task_id)
    trainer = ModelTrainer(task_config, dataset)
    result = trainer.run()
    return result

🧮 七、模型配置字段建议

字段名 含义 模型类型
task_type 模型任务类型(如 llm, linear_regression 所有
learning_rate 学习率 ML/DL 模型
prompt 输入提示文本 LLM
temperature 控制 LLM 输出的随机性 LLM
max_iterations 最大训练轮数 ML 模型
target_column 标签列名 ML 模型
model_name 预设模型名(如 gpt-4, glm 等) LLM

✅ 八、优势总结

  • 解耦强:每种模型适配器独立实现,不会相互影响。
  • 易扩展:新增模型只需添加一个类并注册。
  • 接口统一:主训练逻辑和调用方式不变,便于统一日志、状态管理、资源调度。
  • 适合平台化:非常适用于 AI 平台、AutoML 系统、训练平台构建。

如果你有特定模型、特定输入/输出需求(比如图像、多分类、时序预测等),我可以帮你进一步扩展对应的适配器类。

需要我继续帮你完善数据库模型设计、Celery 任务封装、前端对接建议吗?

了解了,我将基于你的要求重新设计并精简训练与预测流程:

  • 从数据库中的模型模板表加载指定模型类型的默认参数;
  • 与前端传入的模型参数进行合并(前端值优先);
  • 使用适配器模式完成清洗、预处理、训练、保存、发布、预测的全流程;
  • 所有结构解耦、接口清晰、逻辑易读可维护。

我会输出关键模块代码和说明,确保你能直接集成使用。请稍等,我整理完就告诉你。

后端模型训练与预测流程设计

架构概述

数据管道是一种自动化的数据处理工具,它可以从多个源获取数据,对其进行清洗、转换和整合,然后将其提供给机器学习模型进行训练。在本方案中,流程包括:

  • 数据清洗:合并多个 CSV 数据集、按因子筛选字段并去重;
  • 数据预处理:对标记为归一化的数值列进行标准化;
  • 模型训练:根据模型类型选择对应的适配器进行训练;
  • 模型保存与发布:训练结束后将模型保存到本地文件,用户可选择将其上传到 S3;
  • 模型预测:其他用户提供输入字段,根据已发布模型的 ID 和类型加载模型并推理。

数据清洗与预处理模块

首先进行数据清洗:读取并合并前端传入的多个 CSV 字符串,按因子列表筛选所需字段,最后去除重复行。完成数据清洗后,对需要归一化的数值列进行处理,实现基本的标准化。以下是示例代码结构:

import pandas as pd
from typing import List
import io

class DataCleaner:
    @staticmethod
    def merge_datasets(csv_contents: List[str]) -> pd.DataFrame:
        """
        合并多个 CSV 文本数据为一个 DataFrame
        """
        df_list = []
        for csv in csv_contents:
            df_list.append(pd.read_csv(io.StringIO(csv)))
        df = pd.concat(df_list, ignore_index=True)
        return df

    @staticmethod
    def filter_fields(df: pd.DataFrame, field_names: List[str]) -> pd.DataFrame:
        """
        根据字段名筛选列
        """
        return df[field_names]

    @staticmethod
    def drop_duplicates(df: pd.DataFrame) -> pd.DataFrame:
        """
        去除重复行
        """
        return df.drop_duplicates()
import pandas as pd
from typing import Dict

class DataPreprocessor:
    @staticmethod
    def normalize(df: pd.DataFrame, factors: List[Dict]) -> pd.DataFrame:
        """
        对需要归一化的数值列进行标准化处理
        """
        for factor in factors:
            name = factor['name']
            if factor.get('normalize') and pd.api.types.is_numeric_dtype(df[name]):
                # 使用均值-标准差标准化
                df[name] = (df[name] - df[name].mean()) / df[name].std()
        return df

模型适配器接口与实现

使用适配器模式设计统一的模型接口。定义一个 BaseModelAdapter 抽象类,规定 trainpredictsaveload 等方法,所有模型类型的适配器都继承该接口。例如:

import pandas as pd
from typing import Dict, Any
import joblib
from abc import ABC, abstractmethod

class BaseModelAdapter(ABC):
    @abstractmethod
    def train(self, X: pd.DataFrame, y: pd.Series, params: Dict):
        pass

    @abstractmethod
    def predict(self, X: pd.DataFrame) -> Any:
        pass

    def save(self, filepath: str):
        """
        将训练好的模型保存到文件
        """
        joblib.dump(self.model, filepath)

    @classmethod
    def load(cls, filepath: str):
        """
        从文件加载模型
        """
        adapter = cls()
        adapter.model = joblib.load(filepath)
        return adapter
from sklearn.linear_model import LinearRegression
from xgboost import XGBRegressor

class LinearRegressionAdapter(BaseModelAdapter):
    def __init__(self):
        self.model = LinearRegression()

    def train(self, X, y, params):
        # 初始化模型时应用参数(如正则化系数等)
        self.model = LinearRegression(**params)
        self.model.fit(X, y)

    def predict(self, X):
        return self.model.predict(X)

class XGBoostAdapter(BaseModelAdapter):
    def __init__(self):
        self.model = XGBRegressor()

    def train(self, X, y, params):
        self.model = XGBRegressor(**params)
        self.model.fit(X, y)

    def predict(self, X):
        return self.model.predict(X)

# 若支持大语言模型,可实现对应适配器
class LLMAdapter(BaseModelAdapter):
    def __init__(self):
        self.model = None  # 初始化大语言模型(例如加载预训练模型)

    def train(self, X, y, params):
        # 对于 LLM 可执行微调或其它训练逻辑
        pass

    def predict(self, X):
        # 使用大语言模型进行推理
        return self.model.generate(X)

训练调度与参数管理

训练模块负责协调整个训练流程。首先,根据前端传入的模型类型,从模板表加载默认参数,然后与前端给定的参数合并(前端优先)。示例代码如下:

class ModelTemplateRepo:
    """
    模型模板表,存储每种模型类型的默认参数
    """
    TEMPLATE = {
        "linear_regression": {"fit_intercept": True, "normalize": False},
        "xgboost": {"max_depth": 5, "learning_rate": 0.1},
        # 可以添加其他模型类型及其默认参数
    }

    @staticmethod
    def get_default_params(model_type: str) -> Dict:
        return ModelTemplateRepo.TEMPLATE.get(model_type, {})

# 适配器注册表:根据模型类型获取对应的 Adapter 类
ADAPTER_REGISTRY = {
    "linear_regression": LinearRegressionAdapter,
    "xgboost": XGBoostAdapter,
    "llm": LLMAdapter
}

class TrainingScheduler:
    def __init__(self):
        pass

    def train_model(self, factors: List[Dict], model_type: str,
                    user_params: Dict, csv_contents: List[str]) -> str:
        # 数据清洗
        df = DataCleaner.merge_datasets(csv_contents)
        df = DataCleaner.filter_fields(df, [f['name'] for f in factors])
        df = DataCleaner.drop_duplicates(df)
        # 数据预处理
        df = DataPreprocessor.normalize(df, factors)
        # 假设最后一个字段为目标变量
        X = df[[f['name'] for f in factors if f['name'] != factors[-1]['name']]]
        y = df[factors[-1]['name']]
        # 参数合并(模板默认参数 + 用户参数)
        default_params = ModelTemplateRepo.get_default_params(model_type)
        params = {**default_params, **user_params}
        # 训练模型
        adapter_cls = ADAPTER_REGISTRY.get(model_type)
        adapter = adapter_cls()
        adapter.train(X, y, params)
        # 保存模型到本地
        model_path = f"{model_type}_model.pkl"
        adapter.save(model_path)
        return model_path  # 返回模型文件路径或 ID

模型保存与发布

模型训练结束后,模型文件保存在本地。用户确认后,可将模型上传至 S3 等外部存储,便于后续调用。例如:

import boto3

class ModelRegistry:
    @staticmethod
    def upload_to_s3(local_path: str, bucket_name: str, model_id: str):
        """
        将本地模型文件上传到 S3
        """
        s3 = boto3.client('s3')
        s3.upload_file(local_path, bucket_name, f"{model_id}.pkl")

模型预测模块

预测模块根据传入的已发布模型 ID 和类型加载模型,并对新数据进行预测:

from pandas import DataFrame

class Predictor:
    def __init__(self, adapter_registry):
        self.adapter_registry = adapter_registry

    def predict(self, model_type: str, model_path: str, input_data: DataFrame):
        """
        加载指定模型并进行预测
        """
        adapter_cls = self.adapter_registry.get(model_type)
        adapter = adapter_cls.load(model_path)
        return adapter.predict(input_data)

# 使用示例
adapter_registry = ADAPTER_REGISTRY
predictor = Predictor(adapter_registry)
# 其他用户传入的预测数据示例
# input_df = pd.DataFrame([...])
# preds = predictor.predict("linear_regression", "linear_regression_model.pkl", input_df)

以上代码结构实现了从数据清洗到模型训练再到预测的完整流程。各模块设计通用简洁,可根据需要扩展更多模型类型和功能。

参考文献: 数据管道核心概念;多源数据合并去重;适配器模式介绍。

明白了。我将为你设计一个完整的后端框架,使用适配器模式来支持多种模型训练与预测流程,包含数据清洗、预处理、模型训练、模型发布(S3)、预测参数传递等功能,确保结构解耦、清晰可扩展。

我会在完成后提供各个模块的代码结构与逻辑说明,供你直接集成。请稍等,我马上整理完毕。

后端模型训练与预测框架设计

系统架构概览

  • 数据处理模块:负责合并多个数据集(CSV 内容)、按因子参数选取特征列、去重,并根据配置对数值特征进行归一化等预处理。
  • 模型适配器接口:定义统一的训练与预测接口(trainpredict 等方法),各模型类型通过实现该接口适配器来解耦具体算法。
  • 示例适配器:针对线性回归、逻辑回归、XGBoost、预训练大语言模型等不同模型,实现具体的适配器类,负责调用相应框架训练/推理并保存模型。
  • 训练调度器:负责接收前端请求(因子参数、模型参数、数据文件内容),依次调用数据处理、选择适配器训练,并调用模型注册与发布逻辑进行保存和标记。
  • 模型注册与发布模块:管理训练后的模型元数据(如模型 ID、本地路径、状态),在用户确认发布后通过 boto3 上传至 S3,并更新模型发布状态。
  • 预测服务:允许其他用户调用已发布模型进行预测,动态接收输入参数,加载模型并输出预测结果。

各模块通过接口解耦,例如训练调度器只依赖于数据处理和适配器接口,不直接依赖具体模型库;新增模型类型时,只需新增对应的适配器类并注册到工厂即可,无需修改核心流程。

模型适配器接口

采用适配器模式为各模型类型定义统一接口,以便训练调度器和预测服务可以透明调用。下面给出一个抽象基类 ModelAdapter,定义统一的训练、预测、模型保存/加载接口:

from abc import ABC, abstractmethod

class ModelAdapter(ABC):
    """
    模型适配器基类,定义统一的训练和预测接口。
    """

    def __init__(self, model_params: dict):
        """
        初始化适配器时,可传入模型配置参数。
        """
        self.model_params = model_params
        self.model = None  # 在训练后保存模型实例

    @abstractmethod
    def train(self, X, y):
        """
        训练模型。X 为特征数据集(DataFrame 或 ndarray),y 为目标标签。
        """
        pass

    @abstractmethod
    def predict(self, X):
        """
        使用训练好的模型进行预测。X 为输入特征,可为单条或多条数据。
        返回预测结果。
        """
        pass

    @abstractmethod
    def save_model(self, file_path: str):
        """
        将训练好的模型保存到指定文件路径。
        """
        pass

    @abstractmethod
    def load_model(self, file_path: str):
        """
        从指定路径加载模型,用于预测时使用。
        """
        pass
  • 解耦设计:训练调度器和预测服务通过该接口与具体模型适配器交互,不需要知道模型细节。新增模型类型时,只需实现该接口并注册,无需修改已有代码。
  • 动态扩展:可通过工厂模式或注册表维护模型类型与适配器类的映射,如 "linear_regression" -> LinearRegressionAdapter,方便动态选择适配器。
  • 统一调用:统一的 train/predict 接口,参数为特征数据和输入值,支持动态参数输入。

示例适配器

以下提供两个示例适配器,一个用于线性回归,一个用于预训练大语言模型(LLM),以展示不同模型类型的适配器实现思路。

  • 线性回归适配器(使用 scikit-learn):
from sklearn.linear_model import LinearRegression
import joblib

class LinearRegressionAdapter(ModelAdapter):
    def train(self, X, y):
        self.model = LinearRegression(**self.model_params.get('params', {}))
        self.model.fit(X, y)
        return self.model

    def predict(self, X):
        if self.model is None:
            raise RuntimeError("模型尚未训练或加载")
        return self.model.predict(X)

    def save_model(self, file_path: str):
        if self.model is None:
            raise RuntimeError("无模型可保存")
        joblib.dump(self.model, file_path)

    def load_model(self, file_path: str):
        self.model = joblib.load(file_path)
        return self.model
  • LLM(大语言模型)适配器(使用 Hugging Face Transformers):
from transformers import AutoModelForSequenceClassification, AutoTokenizer, pipeline
import torch

class LLMAdapter(ModelAdapter):
    def train(self, train_texts, train_labels):
        """
        简要示例:在分类任务下,用预训练模型进行微调。
        实际实现可使用 Trainer 或自定义训练循环。
        """
        model_name = self.model_params.get('model_name', 'bert-base-uncased')
        num_labels = self.model_params.get('num_labels', 2)
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=num_labels)
        # 这里可进行微调(代码简略,实际可用 Trainer)
        self.tokenizer = tokenizer
        self.model = model
        return model

    def predict(self, texts):
        """
        使用模型进行文本分类预测。texts 可以是单个字符串或字符串列表。
        """
        if self.model is None:
            raise RuntimeError("模型尚未训练或加载")
        # 使用pipeline简化推理
        classifier = pipeline("text-classification", model=self.model, tokenizer=self.tokenizer)
        results = classifier(texts)
        return results

    def save_model(self, dir_path: str):
        """
        保存整个模型及tokenizer到文件夹。
        """
        if self.model is None:
            raise RuntimeError("无模型可保存")
        self.model.save_pretrained(dir_path)
        self.tokenizer.save_pretrained(dir_path)

    def load_model(self, dir_path: str):
        """
        从文件夹加载模型及tokenizer。
        """
        model_name = self.model_params.get('model_name', 'bert-base-uncased')
        self.tokenizer = AutoTokenizer.from_pretrained(dir_path)
        self.model = AutoModelForSequenceClassification.from_pretrained(dir_path)
        return self.model

这两个示例展示了:LinearRegressionAdapter 负责调用 sklearn 接口训练、预测,并用 joblib 保存模型;LLMAdapter 则使用 Hugging Face 进行文本分类示例,展示对微调和推理的包装。其它模型类型(如逻辑回归、XGBoost 等)也可类似实现各自的适配器类,复用相同的统一接口。

数据处理模块

数据处理模块负责从前端接收的数据文件内容开始,进行清洗和预处理,最终生成可供模型训练的特征矩阵和标签。设计中将其解耦为单独的类,比如 DataProcessor。主要功能包括:

  1. 数据清洗:合并前端传来的多个 CSV 文件内容(字符串形式),根据因子列表提取对应列,并去除重复行。
  2. 归一化等预处理:遍历因子配置,如果某列标记需要归一化且为数值类型,则对该列进行 Min-Max 归一化或其他方式转换。

下面给出一个示例实现:

import pandas as pd
from io import StringIO
from sklearn.preprocessing import MinMaxScaler

class DataProcessor:
    def clean(self, file_contents: list, factors: list) -> pd.DataFrame:
        """
        合并多个 CSV 内容,提取因子相关的列,并去除重复值。
        - file_contents: 包含多个 CSV 格式字符串的列表
        - factors: 因子配置列表,每个因子包含 'name' (列名) 和其他属性
        """
        # 读取并合并所有 CSV 内容
        dfs = [pd.read_csv(StringIO(content)) for content in file_contents]
        df = pd.concat(dfs, ignore_index=True)

        # 提取因子指定的列
        selected_cols = [f['name'] for f in factors]
        df = df.loc[:, selected_cols]

        # 去除完全重复的行
        df = df.drop_duplicates().reset_index(drop=True)
        return df

    def preprocess(self, df: pd.DataFrame, factors: list) -> pd.DataFrame:
        """
        对数值列进行归一化处理(如配置了 normalize=True)。
        - df: 上一步清洗后的 DataFrame
        - factors: 因子配置,包含类型和是否归一化的标志
        """
        for f in factors:
            col = f['name']
            if f.get('type') == 'numeric' and f.get('normalize', False):
                scaler = MinMaxScaler()
                df[col] = scaler.fit_transform(df[[col]])
        return df
  • 解耦设计TrainingScheduler 会调用 DataProcessor,而不会关心具体的清洗逻辑细节。若需要增加新预处理方法(如标准化、缺失值填充等),只要在 DataProcessor 中扩展即可,不影响其他模块。
  • 数据读取:使用 StringIO 读取 CSV 内容字符串,实现“文件内容”而非路径传递的读取方式。
  • 去重和筛选:直接使用 pandasdrop_duplicates() 以及列选择功能实现。

训练调度器

训练调度器(TrainingScheduler)负责协调整个训练流程,将前端传入的参数和数据分发给各子模块,并管理模型训练与保存的流程。主要逻辑包括:

  • 接收前端参数:因子配置列表 factor_params、模型配置 model_params、以及一个或多个数据文件内容 data_contents
  • 调用数据处理模块:先进行数据清洗 (clean),再进行预处理 (preprocess)。
  • 选择模型适配器:根据 model_params['type'],从工厂或注册表中获取对应的适配器实例。
  • 调用适配器训练:传入特征矩阵和标签进行训练,并保存模型到本地临时路径。
  • 注册模型元数据:将模型保存路径、参数等信息注册到模型注册表,并生成模型 ID 。
  • 等待发布:训练完成后模型先处于“已训练未发布”状态,待用户确认后上传到 S3。

下面是一个示例实现:

import uuid
import os

class TrainingScheduler:
    def __init__(self, data_processor, adapter_factory, model_registry):
        self.data_processor = data_processor
        self.adapter_factory = adapter_factory
        self.model_registry = model_registry

    def schedule(self, factor_params: list, model_params: dict, data_contents: list) -> str:
        """
        调度训练流程,返回模型ID(唯一标识)。
        """
        # 1. 数据清洗和预处理
        df_clean = self.data_processor.clean(data_contents, factor_params)
        df_processed = self.data_processor.preprocess(df_clean, factor_params)

        # 提取特征矩阵X和标签y
        target_col = model_params['target']  # 假设模型参数里指定了目标列
        feature_cols = [f['name'] for f in factor_params if f['name'] != target_col]
        X = df_processed[feature_cols]
        y = df_processed[target_col]

        # 2. 获取对应的模型适配器并训练
        model_type = model_params['type']
        adapter = self.adapter_factory.get_adapter(model_type, model_params)
        adapter.train(X, y)

        # 3. 模型保存(本地)
        model_id = str(uuid.uuid4())
        local_dir = f"/tmp/model_{model_id}"
        os.makedirs(local_dir, exist_ok=True)
        adapter.save_model(local_dir)  # 保存模型或模型文件夹

        # 4. 注册模型信息
        self.model_registry.register(model_id, {
            'type': model_type,
            'local_path': local_dir,
            'params': model_params,
            'published': False
        })
        return model_id

    def publish_model(self, model_id: str):
        """
        用户确认发布后调用此方法,将模型上传到 S3 并更新状态。
        """
        record = self.model_registry.get(model_id)
        if not record:
            raise ValueError("模型ID不存在")
        if record['published']:
            return  # 已发布无需重复
        local_path = record['local_path']
        # 构造 S3 key,可根据需求使用模型ID等信息命名
        s3_key = f"models/{model_id}.zip"
        uploader = S3Uploader(bucket_name="your-bucket-name")
        # 假设模型保存为一个目录,这里先压缩为zip上传,示例代码
        zip_path = f"{local_path}.zip"
        os.system(f"zip -r {zip_path} {local_path}")
        uploader.upload_file(zip_path, s3_key)
        # 更新发布状态
        record['published'] = True
        record['s3_key'] = s3_key
        self.model_registry.update(model_id, record)
  • 模型ID:生成唯一 model_id(如使用 uuid),便于后续引用和状态管理。
  • 本地保存:先将模型保存到后端本地路径(或临时目录),再在发布时统一上传。
  • 待发布状态:训练完成后,模型在注册表中标记为未发布状态,等待用户确认再触发 S3 上传。
  • S3 上传:示例中用 S3Uploader(见下文)进行上传,这里假设模型先压缩后上传到指定 bucket。实际可根据模型类型选择不同的打包方式。

模型注册与发布

ModelRegistry 负责维护所有模型的元数据,包括状态(已训练/已发布)、本地路径、S3 路径等信息。下面是一个简单的注册表设计:

class ModelRegistry:
    def __init__(self):
        # 存储模型元数据,key 为 model_id
        self._store = {}

    def register(self, model_id: str, metadata: dict):
        """
        注册新模型信息。
        """
        self._store[model_id] = metadata

    def update(self, model_id: str, metadata: dict):
        """
        更新模型元数据(如发布状态、S3路径等)。
        """
        if model_id in self._store:
            self._store[model_id].update(metadata)
        else:
            raise KeyError("模型ID未找到")

    def get(self, model_id: str) -> dict:
        """
        获取模型元数据。
        """
        return self._store.get(model_id)
  • 元数据管理metadata 包含了模型类型、参数、本地存储路径、是否发布、S3 存储键等信息。实际系统中可以结合数据库或持久化存储,此处简化为内存字典。
  • 发布逻辑:当调用 TrainingScheduler.publish_model(model_id) 时,会上传模型至 S3,并更新 metadata['published'] = True 及相应的 S3 存储路径。这样预测服务可依据 published 状态决定是否允许预测调用。
  • 后续扩展:可增加版本控制、访问权限、模型描述等字段,使模型注册表更加完善。

S3 上传工具

使用 boto3 客户端实现模型文件上传到 S3。示例工具类如下:

import boto3

class S3Uploader:
    def __init__(self, bucket_name: str):
        self.s3 = boto3.client('s3')
        self.bucket = bucket_name

    def upload_file(self, file_path: str, key: str):
        """
        上传本地文件到指定 S3 bucket 的 key 路径。
        """
        self.s3.upload_file(file_path, self.bucket, key)
  • 在生产环境中,需要配置 AWS 的访问密钥或使用 IAM 角色进行认证。
  • file_path 支持本地路径,key 定义了存储在 S3 上的文件名(可包含目录)。
  • 上传完成后,注册表中保存对应的 S3 路径信息,以便后续加载使用。

模型预测服务

预测服务允许其他用户指定已发布的模型 ID 并输入特征进行预测。主要步骤:

  1. 加载模型:根据模型 ID 从注册表获取模型元数据,检查是否已发布。若模型文件尚在本地,可直接 load_model;若仅在 S3 上,则先下载(或通过 load_model 支持直接S3 URL)。
  2. 动态输入:接收前端传入的预测参数,通常为键值对形式的特征值。在使用时需要将其转换为模型适配器能接受的格式(如 DataFrame 或矩阵)。
  3. 调用预测:使用适配器的 predict 方法进行预测,并返回结果给前端。

示例实现如下:

class PredictionService:
    def __init__(self, adapter_factory, model_registry):
        self.adapter_factory = adapter_factory
        self.model_registry = model_registry

    def predict(self, model_id: str, input_data: dict):
        """
        使用已发布模型进行预测。
        - model_id: 注册表中的模型ID
        - input_data: 包含特征名称和值的字典(动态参数输入)
        """
        record = self.model_registry.get(model_id)
        if not record or not record.get('published', False):
            raise ValueError("模型未发布或不存在")

        model_type = record['type']
        model_params = record['params']
        # 获取对应适配器
        adapter = self.adapter_factory.get_adapter(model_type, model_params)

        # 若需要,从 S3 下载模型,这里假设本地已有模型目录,否则实现下载逻辑
        local_path = record['local_path']
        adapter.load_model(local_path)

        # 将输入数据转换为 DataFrame (或适配器支持的格式)
        import pandas as pd
        input_df = pd.DataFrame([input_data])

        # 预测
        predictions = adapter.predict(input_df)
        return predictions
  • 动态参数:前端可传入任意需要的特征字段和值,服务端将其包装为单行的 DataFrame 进行预测。这样 predict 接口无需固定输入维度。
  • 模型加载:假设模型已经上传并下载至本地(或本地保留训练结果),load_model 负责加载模型实例。若模型仅在 S3,可在此处加下载代码(如 boto3.download_file)后再加载。
  • 返回结果:将模型输出直接返回给前端。对于分类任务可能需要映射类别标签,回归则返回数值。

代码调用示例

下面给出一个简单的流程示例,说明如何使用上述组件完成训练和预测:

# 假设前端传递:
factor_params = [
    {'name': 'age', 'type': 'numeric', 'normalize': True},
    {'name': 'salary', 'type': 'numeric', 'normalize': True},
    {'name': 'gender', 'type': 'categorical', 'normalize': False},
    {'name': 'income', 'type': 'numeric', 'normalize': False},  # 目标列
]
model_params = {
    'type': 'linear_regression',
    'target': 'income',
    'params': {}  # 线性回归超参数
}
data_contents = [csv_string1, csv_string2]  # CSV文件内容字符串列表

# 初始化组件
data_processor = DataProcessor()
adapter_factory = ModelAdapterFactory()  # 假设工厂已实现类型->适配器映射
model_registry = ModelRegistry()
scheduler = TrainingScheduler(data_processor, adapter_factory, model_registry)

# 调度训练
model_id = scheduler.schedule(factor_params, model_params, data_contents)
print(f"模型已训练完成,ID = {model_id},等待发布。")

# 用户确认发布后调用
scheduler.publish_model(model_id)
print("模型已发布至 S3。")

# 预测示例
prediction_service = PredictionService(adapter_factory, model_registry)
input_features = {'age': 30, 'salary': 60000, 'gender': 'Male'}
result = prediction_service.predict(model_id, input_features)
print("预测结果:", result)

以上示例演示了整个流程:前端传入数据后,训练调度器完成数据处理和模型训练;训练完成后,管理员确认发布模型;其他用户再调用预测服务,通过模型 ID 和动态输入参数获取预测结果。整个过程中的各模块职责清晰,使用适配器模式保证了对不同模型类型的一致调用接口,并通过工厂/注册表支持灵活扩展新的模型类型。

下面是一个高度解耦、可扩展AI 模型训练与预测通用流程实现,采用适配器模式来支持多种模型(线性回归、XGBoost、大语言模型等),并围绕你的需求(数据清洗、预处理、模型训练、保存、发布、预测)进行模块化拆分。


🧩 1. 目录结构建议

ai_platform/
├── adapters/                 # 各类模型适配器
│   ├── base.py              # 模型适配器接口
│   ├── linear_regression.py
│   ├── xgboost_model.py
│   └── llm_model.py
├── core/
│   ├── cleaner.py           # 数据清洗
│   ├── preprocessor.py      # 数据预处理
│   ├── scheduler.py         # 训练调度器
│   ├── registry.py          # 模型保存和发布
│   └── predictor.py         # 预测模块
├── config/
│   └── registry.py          # 模型类型注册表
└── main.py                  # 训练/预测入口

🧠 2. 适配器接口定义(adapters/base.py

from abc import ABC, abstractmethod
import pandas as pd
from typing import Dict, Any

class BaseModelAdapter(ABC):
    @abstractmethod
    def train(self, X: pd.DataFrame, y: pd.Series, params: Dict[str, Any]):
        pass

    @abstractmethod
    def predict(self, X: pd.DataFrame) -> Any:
        pass

    @abstractmethod
    def save(self, path: str):
        pass

    @abstractmethod
    def load(self, path: str):
        pass

📈 3. 示例模型适配器:线性回归(adapters/linear_regression.py

from sklearn.linear_model import LinearRegression
from .base import BaseModelAdapter
import joblib

class LinearRegressionAdapter(BaseModelAdapter):
    def __init__(self):
        self.model = None

    def train(self, X, y, params):
        self.model = LinearRegression(**params)
        self.model.fit(X, y)

    def predict(self, X):
        return self.model.predict(X)

    def save(self, path):
        joblib.dump(self.model, path)

    def load(self, path):
        self.model = joblib.load(path)

其他模型如 XGBoost 和 LLM 同理。


🧼 4. 数据清洗和预处理模块(core/cleaner.py, core/preprocessor.py

# cleaner.py
import pandas as pd
import io
from typing import List

class DataCleaner:
    @staticmethod
    def clean(csv_contents: List[str], factor_names: List[str]) -> pd.DataFrame:
        dfs = [pd.read_csv(io.StringIO(c)) for c in csv_contents]
        df = pd.concat(dfs).drop_duplicates()
        return df[factor_names]
# preprocessor.py
class DataPreprocessor:
    @staticmethod
    def normalize(df: pd.DataFrame, factors: List[Dict]):
        for factor in factors:
            name = factor["name"]
            if factor.get("normalize") and pd.api.types.is_numeric_dtype(df[name]):
                df[name] = (df[name] - df[name].mean()) / df[name].std()
        return df

🧠 5. 模型调度器:训练执行器(core/scheduler.py

from config.registry import ADAPTER_REGISTRY
from .cleaner import DataCleaner
from .preprocessor import DataPreprocessor
import pandas as pd
import os

class TrainingScheduler:
    def train(self, model_type: str, factors: List[Dict], model_params: Dict, csv_contents: List[str]) -> str:
        # 1. 清洗数据
        field_names = [f["name"] for f in factors]
        df = DataCleaner.clean(csv_contents, field_names)

        # 2. 预处理
        df = DataPreprocessor.normalize(df, factors)

        # 3. 生成 X/y
        target = field_names[-1]
        X = df.drop(columns=[target])
        y = df[target]

        # 4. 获取适配器
        adapter_cls = ADAPTER_REGISTRY[model_type]
        model = adapter_cls()
        model.train(X, y, model_params)

        # 5. 保存模型
        path = f"models/{model_type}_{hash(tuple(field_names))}.pkl"
        os.makedirs("models", exist_ok=True)
        model.save(path)

        return path

☁️ 6. 发布模块(core/registry.py

import boto3

class ModelPublisher:
    def upload_to_s3(self, local_path: str, model_key: str, bucket: str):
        s3 = boto3.client("s3")
        s3.upload_file(local_path, bucket, model_key)

🔍 7. 推理模块(core/predictor.py

from config.registry import ADAPTER_REGISTRY

class Predictor:
    def predict(self, model_type: str, model_path: str, input_df):
        adapter_cls = ADAPTER_REGISTRY[model_type]
        model = adapter_cls()
        model.load(model_path)
        return model.predict(input_df)

🧩 8. 模型注册表(config/registry.py

from adapters.linear_regression import LinearRegressionAdapter
from adapters.xgboost_model import XGBoostAdapter
from adapters.llm_model import LLMAdapter

ADAPTER_REGISTRY = {
    "linear_regression": LinearRegressionAdapter,
    "xgboost": XGBoostAdapter,
    "llm": LLMAdapter
}

🚀 9. 主调用流程(main.py 示例)

from core.scheduler import TrainingScheduler
from core.predictor import Predictor

# 模拟前端输入
factors = [
    {"name": "feature1", "normalize": True},
    {"name": "feature2", "normalize": False},
    {"name": "target", "normalize": False}
]
model_type = "linear_regression"
model_params = {"fit_intercept": True}
csvs = [open("data1.csv").read(), open("data2.csv").read()]

# 训练流程
scheduler = TrainingScheduler()
model_path = scheduler.train(model_type, factors, model_params, csvs)

# 推理流程
predictor = Predictor()
import pandas as pd
input_data = pd.DataFrame([{"feature1": 1.2, "feature2": 3.4}])
pred = predictor.predict(model_type, model_path, input_data)
print(pred)

✅ 总结:核心解耦点

模块 功能 解耦说明
BaseModelAdapter 模型标准接口 适配不同模型训练与推理逻辑
TrainingScheduler 调度训练流程 独立于具体模型与数据格式
DataCleaner + DataPreprocessor 数据处理模块 不依赖模型逻辑,按因子驱动
ADAPTER_REGISTRY 适配器注册表 模型扩展零侵入
Predictor 推理统一入口 封装模型加载和推理逻辑

如你需要,我可以进一步补充:

  • FastAPI 接口封装;
  • S3 接口本地 mock 测试;
  • 对接任务队列(如 Celery)执行异步训练。

是否需要我继续封装这些模块?

以下是完整的多数据集清洗模块 MultiDatasetProcessor 的实现、说明和测试示例。


✅ 模块功能概览

MultiDatasetProcessor 主要实现以下步骤:

  1. 加载多个数据文件(支持 CSV 和 Excel)
  2. 合并数据集并去除重复值
  3. 根据“因子配置”自动清洗和字段映射
  4. 推断字段类型(自动识别数值/字符型)
  5. 支持数值归一化
  6. 训练集/测试集按比例划分

✅ 使用方式

示例数据:dataset1.csv, dataset2.xlsx

示例内容(字段一致):

年龄 性别 工资
25 7000
32 8000
45 10000

示例因子配置

factor_config = {
    "features": [
        {"source": "年龄", "alias": "age", "min": 18, "max": 65},
        {"source": "性别", "alias": "gender", "enum": ["男", "女"]},
        {"source": "工资", "alias": "salary"}
    ],
    "normalize": True  # 归一化所有数值型字段
}

✅ 测试代码示例

from processor import MultiDatasetProcessor

filepaths = ["dataset1.csv", "dataset2.xlsx"]

# 实例化处理器
processor = MultiDatasetProcessor(filepaths=filepaths, factor_config=factor_config)

# 执行清洗
cleaned_df = processor.clean()
print("清洗后的数据:")
print(cleaned_df.head())

# 数据拆分
X_train, X_test, y_train, y_test = processor.split(target_column="salary")

print("\n训练特征集:")
print(X_train.head())

print("\n测试目标集:")
print(y_test.head())

✅ 推荐的封装方式(可选)

你可以将这个模块打包为:

  • Django app 的工具模块(放入 utils/services/ 中)
  • 独立 pip 包用于多项目复用
  • 搭配 Celery 后台任务异步执行
  • 提供 API 接口(如上传数据后清洗和返回版本 ID)

如果你需要我帮你进一步封装为 API 或任务队列任务,或者实现日志记录等功能,请告诉我,我可以继续为你补充后端结构与接口设计。

非常好,这种组合方式在 AI 平台中非常常见:

  • openpyxl 读取 Excel 文件(尤其支持 .xlsx,避免 pandas.read_excel 在部分格式或合并单元格下表现不佳)
  • 然后使用 pandas 进行 高效的数据清洗与分析

✅ 使用 openpyxl + pandas 读取 Excel 并清洗数据的推荐流程

📦 安装依赖(如未安装)

pip install openpyxl pandas

🧠 示例:读取 Excel、清洗并归一化字段

import openpyxl
import pandas as pd
from sklearn.preprocessing import StandardScaler


def read_excel_with_openpyxl(path: str, sheet_name: str = None) -> pd.DataFrame:
    """
    使用 openpyxl 读取 Excel 的指定 sheet,然后转换为 pandas DataFrame
    :param path: Excel 文件路径
    :param sheet_name: 表名,默认读取第一个
    :return: DataFrame
    """
    wb = openpyxl.load_workbook(path, data_only=True)
    sheet = wb[sheet_name] if sheet_name else wb.active

    # 读取表头
    headers = [cell.value for cell in next(sheet.iter_rows(min_row=1, max_row=1))]

    # 读取数据
    data = []
    for row in sheet.iter_rows(min_row=2, values_only=True):
        data.append(row)

    df = pd.DataFrame(data, columns=headers)
    return df


def clean_and_normalize(df: pd.DataFrame, factor_config: dict) -> pd.DataFrame:
    """
    根据因子配置清洗字段并归一化(仅数值型)
    :param df: 原始 DataFrame
    :param factor_config: 因子配置
    :return: 清洗后的 DataFrame
    """
    result = {}
    scaler = StandardScaler()

    for feature in factor_config["features"]:
        source = feature["source"]
        alias = feature["alias"]
        col = df[source].copy()

        # 字段类型推断
        if pd.api.types.is_numeric_dtype(col):
            col = col.astype(float)

            # 处理 min / max / enum 规则
            if "min" in feature:
                col = col[col >= feature["min"]]
            if "max" in feature:
                col = col[col <= feature["max"]]
            if "enum" in feature:
                col = col[col.isin(feature["enum"])]
        else:
            if "enum" in feature:
                col = col[col.isin(feature["enum"])]

        result[alias] = col

    cleaned_df = pd.DataFrame(result).dropna()

    # 如果开启归一化,只对数值型字段进行
    if factor_config.get("normalize"):
        numeric_cols = [f["alias"] for f in factor_config["features"]
                        if pd.api.types.is_numeric_dtype(cleaned_df[f["alias"]])]
        cleaned_df[numeric_cols] = scaler.fit_transform(cleaned_df[numeric_cols])

    return cleaned_df

✅ 示例因子配置 & 使用

factor_config = {
    "features": [
        {"source": "年龄", "alias": "age", "min": 18, "max": 100},
        {"source": "性别", "alias": "gender", "enum": ["男", "女"]}
    ],
    "normalize": True
}

df_raw = read_excel_with_openpyxl("数据样本.xlsx")
df_cleaned = clean_and_normalize(df_raw, factor_config)

🔁 数据后续处理(可继续进行训练/分割)

from sklearn.model_selection import train_test_split

X = df_cleaned.drop(columns=["target_column"])
y = df_cleaned["target_column"]
X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=0.8)

✅ 优势总结

步骤 工具 优点
读取 Excel openpyxl 更好控制单元格、公式、合并格式
转换为 DataFrame pandas 高效处理、清洗、分析
类型处理 pandas + 类型推断 自动适配字符/数值/空值
归一化 sklearn.preprocessing.StandardScaler 统一归一化流程

如果你希望把这部分做成一个统一封装模块、或提供支持 API 调用或任务队列集成的形式,我可以继续为你抽象成服务或独立包。是否需要?

下面是一个设计良好、注释清晰、支持字段类型自动推断、可打包成独立服务的 数据清洗与预处理模块,可在你的 AI 平台中复用。


✅ 功能概览

  • 接收多个数据集(CSV/DataFrame),合并去重;
  • 使用因子配置进行字段映射、清洗(包括数值范围、枚举等);
  • 自动推断字段类型(字符型 / 数值型);
  • 可选归一化;
  • 训练集 / 测试集划分;
  • 可打包为独立模块,便于复用。

🗂️ 目录结构建议(可放入平台工具包)

your_project/
├── utils/
│   ├── __init__.py
│   └── data_cleaner.py  # ← 本模块

🧠 data_cleaner.py 代码实现(含详细注释)

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from pandas.api.types import is_numeric_dtype, is_string_dtype
from typing import List, Dict, Tuple, Union, Optional


class DataCleaner:
    """
    数据清洗与预处理模块,可自动推断字段类型,并执行清洗、归一化和数据分割。
    """

    def __init__(self, factor_config: Dict, train_ratio: float = 0.8):
        """
        :param factor_config: 因子配置,定义特征、目标字段、归一化规则等
        :param train_ratio: 训练集占比 (默认 80%)
        """
        self.factor_config = factor_config
        self.train_ratio = train_ratio
        self.scaler = StandardScaler() if factor_config.get("normalize") else None

    def _load_csvs(self, csv_contents: List[Union[str, pd.DataFrame]]) -> pd.DataFrame:
        """
        读取 CSV 字符串或 DataFrame 列表,合并并去重
        """
        dfs = []
        for content in csv_contents:
            if isinstance(content, str):
                df = pd.read_csv(pd.compat.StringIO(content))
            elif isinstance(content, pd.DataFrame):
                df = content.copy()
            else:
                raise ValueError("CSV 内容必须是字符串或 DataFrame")
            dfs.append(df)

        merged = pd.concat(dfs, ignore_index=True).drop_duplicates()
        return merged

    def _infer_type(self, series: pd.Series) -> str:
        """
        推断字段类型:返回 'numeric' 或 'categorical'
        """
        if is_numeric_dtype(series):
            return "numeric"
        elif is_string_dtype(series):
            return "categorical"
        else:
            # 可拓展支持 datetime 等
            return "categorical"

    def _clean_feature(self, df: pd.DataFrame, feature: Dict) -> pd.Series:
        """
        映射、清洗单个特征字段,根据类型推断或配置执行值校验
        """
        source_col = feature["source"]
        alias_col = feature["alias"]
        col = df[source_col].copy()

        # 推断字段类型或读取配置
        dtype = feature.get("type") or self._infer_type(col)

        # 数值型校验
        if dtype == "numeric":
            if "min" in feature:
                col = col[col >= feature["min"]]
            if "max" in feature:
                col = col[col <= feature["max"]]
            if "enum" in feature:
                col = col[col.isin(feature["enum"])]

        # 枚举型校验
        elif dtype == "categorical":
            if "enum" in feature:
                col = col[col.isin(feature["enum"])]

        else:
            raise ValueError(f"不支持的数据类型: {dtype}")

        return col.rename(alias_col)

    def _clean_target(self, df: pd.DataFrame) -> pd.Series:
        """
        清洗目标字段(一般无需枚举限制)
        """
        target = self.factor_config["target"]
        target_col = df[target["source"]].copy()
        return target_col.rename(target["alias"])

    def clean(self, csv_contents: List[Union[str, pd.DataFrame]]) -> Tuple[pd.DataFrame, pd.Series]:
        """
        合并、映射并清洗特征 + 目标列,返回 X 和 y
        """
        df = self._load_csvs(csv_contents)

        # 清洗所有特征列
        feature_series = []
        for feature in self.factor_config["features"]:
            if feature["source"] not in df.columns:
                raise KeyError(f"缺少字段: {feature['source']}")
            cleaned = self._clean_feature(df, feature)
            feature_series.append(cleaned)

        # 合并为特征 DataFrame
        X = pd.concat(feature_series, axis=1)

        # 清洗目标字段
        y = self._clean_target(df)

        # 合并后统一去除 NA(防止对齐失败)
        data = pd.concat([X, y], axis=1).dropna()
        X_cleaned = data.drop(columns=[y.name])
        y_cleaned = data[y.name]

        # 执行归一化(仅对数值型列)
        if self.scaler:
            numeric_cols = [f["alias"] for f in self.factor_config["features"]
                            if f.get("type") == "numeric" or self._infer_type(df[f["source"]]) == "numeric"]
            X_cleaned[numeric_cols] = self.scaler.fit_transform(X_cleaned[numeric_cols])

        return X_cleaned, y_cleaned

    def split(self, X: pd.DataFrame, y: pd.Series) -> Tuple:
        """
        拆分训练集 / 测试集
        """
        return train_test_split(X, y, train_size=self.train_ratio, shuffle=True, random_state=42)

    def process(self, csv_contents: List[Union[str, pd.DataFrame]]) -> Tuple:
        """
        一体化处理:清洗 + 拆分
        """
        X, y = self.clean(csv_contents)
        return self.split(X, y)

✅ 使用方式(其他模块可直接引用)

from utils.data_cleaner import DataCleaner

# 假设来自前端的因子配置:
factor_config = {
    "features": [
        {"alias": "Age", "source": "年龄", "min": 18, "max": 100},
        {"alias": "Gender", "source": "性别", "enum": ["男", "女"]}
    ],
    "target": {"alias": "收入", "source": "年收入"},
    "normalize": True
}

cleaner = DataCleaner(factor_config, train_ratio=0.8)
X_train, X_test, y_train, y_test = cleaner.process([df1, df2])

☑️ 模块复用建议

你可以将 DataCleaner 放入平台的:

  • ✅ 模型训练入口前的数据清洗阶段;
  • ✅ 数据分析模块前的特征筛选阶段;
  • ✅ 自动建模、调参任务的统一预处理接口;
  • ✅ 接入任务队列中,实现异步数据预处理。

如需:

  • 字段名映射持久化(保存历史映射方案)
  • 字段类型智能标注系统(由样本自动识别配置)
  • 将模块转为 API 微服务

我可以继续为你构建。是否现在就打包成一个 FastAPI 服务或任务任务模块?

好的,我们将升级数据清洗模块以支持以下能力:


✅ 功能需求(扩展版)

  1. 字段映射支持:字段名可能与数据表中的不一致,支持统一字段映射。
  2. 字段类型判断与清洗规则
    • 数值型字段:支持 min / max 限制、枚举值限制。
    • 字符型字段:支持枚举值(白名单)校验。
  3. 字段缺失与异常值处理:非法值、缺失值直接过滤或日志记录。
  4. 统一字段名处理后再做归一化、切分等操作。

🧩 因子配置结构(示例)

{
  "features": [
    {
      "alias": "Age",          // 前端字段名(统一名)
      "source": "age_years",   // 原数据字段名
      "type": "numeric",
      "min": 18,
      "max": 100
    },
    {
      "alias": "Gender",
      "source": "sex",
      "type": "categorical",
      "enum": ["Male", "Female"]
    }
  ],
  "target": {
    "alias": "bmi",
    "source": "body_mass_index"
  },
  "normalize": true
}

🧩 模块设计(简化核心逻辑)

文件路径:core/data/processor.py

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from typing import List, Dict, Tuple, Union


class DataProcessorV2:
    def __init__(self, factor_config: Dict, train_ratio: float = 0.8):
        self.feature_config = factor_config["features"]
        self.target_config = factor_config["target"]
        self.normalize = factor_config.get("normalize", False)
        self.train_ratio = train_ratio

    def load_and_merge(self, csv_contents: List[Union[str, pd.DataFrame]]) -> pd.DataFrame:
        dfs = []
        for item in csv_contents:
            if isinstance(item, str):
                df = pd.read_csv(pd.compat.StringIO(item))
            elif isinstance(item, pd.DataFrame):
                df = item
            else:
                raise ValueError("Unsupported csv content type")
            dfs.append(df)
        merged = pd.concat(dfs, ignore_index=True)
        return merged.drop_duplicates()

    def map_and_clean(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.Series]:
        cleaned_df = pd.DataFrame()

        for feature in self.feature_config:
            src = feature["source"]
            dst = feature["alias"]
            dtype = feature["type"]

            if src not in df.columns:
                raise ValueError(f"Missing feature column: {src}")

            col = df[src].copy()

            if dtype == "numeric":
                if "min" in feature:
                    col = col[col >= feature["min"]]
                if "max" in feature:
                    col = col[col <= feature["max"]]
                if "enum" in feature:
                    col = col[col.isin(feature["enum"])]
            elif dtype == "categorical":
                if "enum" in feature:
                    col = col[col.isin(feature["enum"])]
            else:
                raise ValueError(f"Unsupported type {dtype} for feature {dst}")

            cleaned_df[dst] = col

        # 处理 target
        target_src = self.target_config["source"]
        target_dst = self.target_config["alias"]

        if target_src not in df.columns:
            raise ValueError("Missing target column")

        target_col = df[target_src]
        cleaned_df[target_dst] = target_col

        # 去除含有 NA 的行
        cleaned_df.dropna(inplace=True)

        # 提取 X 和 y
        X = cleaned_df[[f["alias"] for f in self.feature_config]]
        y = cleaned_df[target_dst]

        # 归一化
        if self.normalize:
            numeric_features = [f["alias"] for f in self.feature_config if f["type"] == "numeric"]
            scaler = StandardScaler()
            X[numeric_features] = scaler.fit_transform(X[numeric_features])

        return X, y

    def split_data(self, X: pd.DataFrame, y: pd.Series) -> Tuple:
        return train_test_split(X, y, train_size=self.train_ratio, shuffle=True, random_state=42)

    def process(self, csv_contents: List[Union[str, pd.DataFrame]]) -> Tuple:
        merged_df = self.load_and_merge(csv_contents)
        X, y = self.map_and_clean(merged_df)
        return self.split_data(X, y)

✅ 使用示例

from core.data.processor import DataProcessorV2

factor_config = {
    "features": [
        {"alias": "Age", "source": "age", "type": "numeric", "min": 18, "max": 80},
        {"alias": "Sex", "source": "gender", "type": "categorical", "enum": ["Male", "Female"]}
    ],
    "target": {"alias": "BMI", "source": "bmi"},
    "normalize": True
}

processor = DataProcessorV2(factor_config=factor_config, train_ratio=0.75)
X_train, X_test, y_train, y_test = processor.process([df1, df2])

✅ 扩展建议(后续可加)

  • 支持字段类型自动推断(pandas.api.types
  • 缺失值填充策略(均值、众数、自定义)
  • 日志记录每步过滤/丢弃的数量
  • 报错集中收集,反馈至任务系统

需要我帮你补充到任务队列流程或加入日志记录模块吗?我也可以帮你写成独立服务或包,供平台多个模块复用。

以下是完整的代码说明、注释及示例测试方法,封装为一个通用的数据清洗模块,支持平台复用。


✅ 模块功能概述

这个模块 MultiDatasetProcessor 用于处理多个 Excel 或 CSV 数据集,具有以下功能:

  1. 读取多文件(支持路径或内存文件对象)
  2. 对列名排序和标准化,确保结构一致
  3. 合并多个数据集并去重
  4. 根据因子配置执行字段映射、最大值/最小值/枚举过滤
  5. 归一化数值型字段(可选)
  6. 根据目标字段切分训练/测试数据

✅ 使用方式示例(测试代码)

from io import BytesIO

# 模拟因子配置
factor_config = {
    "features": [
        {
            "source": "Age",        # 原字段名
            "alias": "age",         # 目标字段名
            "min": 18,
            "max": 60
        },
        {
            "source": "Gender",
            "alias": "gender",
            "enum": ["Male", "Female"]
        },
        {
            "source": "Income",
            "alias": "income",
            "min": 3000
        }
    ],
    "normalize": True  # 是否进行归一化
}

# 模拟多个数据文件路径或文件对象(可替换为实际上传文件对象)
files = [
    "dataset1.csv",
    "dataset2.xlsx",
]

# 初始化处理器
processor = MultiDatasetProcessor(files=files, factor_config=factor_config)

# 清洗数据
cleaned_df = processor.clean()

# 打印清洗结果
print("Cleaned Data:\n", cleaned_df.head())

# 分割训练集和测试集
X_train, X_test, y_train, y_test = processor.split(target_column="income")

print("Train shape:", X_train.shape, y_train.shape)
print("Test shape:", X_test.shape, y_test.shape)

✅ 模块复用建议

你可以将该模块保存为 Python 包或微服务(如 Flask FastAPI):

封装为包

创建目录结构:

my_data_cleaner/
├── __init__.py
├── processor.py

processor.py 中存放上面的类。

接入任务队列(Celery 示例)

# tasks.py
from celery import shared_task
from my_data_cleaner.processor import MultiDatasetProcessor

@shared_task
def run_data_cleaning(files, factor_config):
    processor = MultiDatasetProcessor(files, factor_config)
    cleaned = processor.clean()
    return cleaned.to_json()

✅ 小结

功能项 是否支持
多数据集列名不一致 ✅ 列标准化并按名称排序
数据清洗 ✅ 根据因子配置(数值/字符)
归一化 ✅ 针对数值字段
类型推断 ✅ 自动识别数值或字符型
文件来源支持 ✅ CSV, Excel, 路径/内存对象
模块复用 ✅ 可作为包/服务/任务队列调用

如果你需要我将此模块整理为 Python 包、FastAPI 服务或 Celery 工具,请告诉我,我可以继续帮助你封装部署。

以下是修改后的完整 Python 模块,已将文件读取逻辑更新为使用 openpyxl 来处理 Excel 文件,并辅以 pandas 进行数据操作,支持 CSV、Excel 文件路径或 BytesIO 文件对象的多数据集读取、清洗与切分。


✅ 模块说明:MultiDatasetProcessor

📌 功能概览

  • 支持多个 CSV / Excel 文件
  • 使用 openpyxl 读取 .xlsx 文件,确保兼容多格式
  • 对字段进行统一对齐、清洗、字段映射
  • 支持数值范围/枚举/归一化等因子规则
  • 支持训练/测试集切分

✅ 使用示例

from io import BytesIO

factor_config = {
    "features": [
        {
            "source": "年龄",
            "alias": "age",
            "min": 18,
            "max": 60
        },
        {
            "source": "性别",
            "alias": "gender",
            "enum": ["男", "女"]
        },
        {
            "source": "收入",
            "alias": "income",
            "min": 3000
        }
    ],
    "normalize": True
}

files = ["dataset1.csv", "dataset2.xlsx"]

processor = MultiDatasetProcessor(files=files, factor_config=factor_config)

cleaned = processor.clean()
print(cleaned.head())

X_train, X_test, y_train, y_test = processor.split(target_column="income")
print("Train:", X_train.shape, y_train.shape)
print("Test:", X_test.shape, y_test.shape)

✅ 核心代码功能解释(简要)

函数 说明
_read_excel_with_openpyxl 使用 openpyxl 读取 Excel,获取表头与数据,转为 pandas.DataFrame
_load_single_file 识别文件类型,决定使用 pandas 还是 openpyxl 加载
_align_columns 标准化列名并按字母排序,用于列对齐
_load_and_merge 逐个读取数据集并合并去重
clean 按照因子配置进行字段映射、筛选、归一化等处理
split 按照设定比例对清洗后的数据进行训练/测试拆分

如你希望将该模块封装为 API 服务或 Celery 任务队列支持组件,也可以继续进行集成设计。如果你想要我生成完整 FastAPI 或 Flask 服务结构,请告诉我。

下面是更新后的 MultiDatasetProcessor,实现了 所有数据集文件只用 openpyxl(Excel)和 Python 标准库的 csv(CSV) 来读取,再交由 pandas 统一封装成 DataFrame 进行后续处理。这样就绕过了有时 pandas.read_csv 可能出现的问题。


import os
import csv
from io import TextIOWrapper, BytesIO
from typing import List, Union

import pandas as pd
from openpyxl import load_workbook


class MultiDatasetProcessor:
    """
    多数据集处理器,只使用 openpyxl 读取 Excel,使用 csv 库读取 CSV,
    并基于因子配置执行清洗、归一化和训练/测试拆分。
    """

    def __init__(
        self,
        files: List[Union[str, BytesIO]],
        factor_config: dict,
        sheet_name: str = None
    ):
        """
        :param files: 文件路径或文件对象列表(.csv 或 .xlsx/.xls)
        :param factor_config: 因子配置,见示例
        :param sheet_name: Excel 的 sheet 名称(可选)
        """
        self.files = files
        self.factor_config = factor_config
        self.sheet_name = sheet_name
        # 读取并合并所有文件
        self.df = self._load_and_merge()
        self.cleaned_df = None

    def _read_excel_with_openpyxl(self, file: Union[str, BytesIO]) -> pd.DataFrame:
        """
        使用 openpyxl 读取 Excel 文件,将其转成 pandas.DataFrame。
        """
        # 加载工作簿
        wb = load_workbook(filename=file, data_only=True)
        # 选择指定 sheet 或第一个 sheet
        ws = wb[self.sheet_name or wb.sheetnames[0]]
        data = list(ws.values)
        # 第一行作为表头
        headers = [str(h).strip() if h is not None else "" for h in data[0]]
        # 后续行作为数据
        values = data[1:]
        return pd.DataFrame(values, columns=headers)

    def _read_csv_with_stdlib(self, file: Union[str, BytesIO]) -> pd.DataFrame:
        """
        使用 Python 标准库 csv 读取 CSV 文件,将其转成 pandas.DataFrame。
        """
        # 打开文件路径或文件对象
        if isinstance(file, str):
            f = open(file, newline='', encoding='utf-8')
        else:
            # BytesIO -> 文本流
            f = TextIOWrapper(file, encoding='utf-8')
        reader = csv.reader(f)
        rows = list(reader)
        f.close()

        if not rows:
            return pd.DataFrame()

        # 第一行是表头
        headers = [h.strip() for h in rows[0]]
        # 剩余行是数据
        values = rows[1:]
        return pd.DataFrame(values, columns=headers)

    def _load_single_file(self, file: Union[str, BytesIO]) -> pd.DataFrame:
        """
        判断文件类型并调用相应的读取函数。
        """
        # 识别扩展名
        if isinstance(file, str):
            ext = os.path.splitext(file)[1].lower()
        else:
            # 如果是 BytesIO 对象,默认当成 Excel
            ext = '.xlsx'

        if ext == '.csv':
            return self._read_csv_with_stdlib(file)
        elif ext in ('.xls', '.xlsx'):
            return self._read_excel_with_openpyxl(file)
        else:
            raise ValueError(f"不支持的文件类型: {ext}")

    def _align_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        标准化列名(去除首尾空格)并按字母排序,使多个 DataFrame 列顺序一致。
        """
        df.columns = [col.strip() for col in df.columns]
        return df.reindex(sorted(df.columns), axis=1)

    def _load_and_merge(self) -> pd.DataFrame:
        """
        逐个加载所有文件,列对齐后合并,并去重。
        """
        aligned = []
        for file in self.files:
            df = self._load_single_file(file)
            df = self._align_columns(df)
            aligned.append(df)

        combined = pd.concat(aligned, ignore_index=True)
        return combined.drop_duplicates()

    def _infer_column_type(self, col: pd.Series) -> str:
        """
        自动判断字段类型:数值型或字符串型。
        """
        if pd.api.types.is_numeric_dtype(col):
            return "numeric"
        return "string"

    def clean(self) -> pd.DataFrame:
        """
        按因子配置清洗数据:
        1) 字段映射(source -> alias)
        2) 数值型字段根据 min/max/enum 过滤
        3) 字符型字段根据 enum 过滤
        4) 删除任何 alias 列中含 NA 的行
        5) 对数值型字段执行标准化(如 normalize=True)
        """
        result = {}
        for feature in self.factor_config["features"]:
            src = feature["source"]
            alias = feature["alias"]
            if src not in self.df.columns:
                continue

            col = self.df[src].copy()
            col_type = self._infer_column_type(col)

            # 数值型清洗
            if col_type == "numeric":
                col = pd.to_numeric(col, errors="coerce")
                if "min" in feature:
                    col = col.where(col >= feature["min"])
                if "max" in feature:
                    col = col.where(col <= feature["max"])
                if "enum" in feature:
                    col = col.where(col.isin(feature["enum"]))
            # 字符型清洗
            else:
                col = col.astype(str).str.strip()
                if "enum" in feature:
                    col = col.where(col.isin(feature["enum"]))

            result[alias] = col

        # 构造清洗后的 DataFrame,删除任何含 NA 的行
        cleaned = pd.DataFrame(result).dropna()

        # 归一化数值型字段
        if self.factor_config.get("normalize"):
            from sklearn.preprocessing import StandardScaler
            scaler = StandardScaler()
            num_cols = [
                f["alias"] for f in self.factor_config["features"]
                if f["alias"] in cleaned.columns and pd.api.types.is_numeric_dtype(cleaned[f["alias"]])
            ]
            if num_cols:
                cleaned[num_cols] = scaler.fit_transform(cleaned[num_cols])

        self.cleaned_df = cleaned
        return cleaned

    def split(self, target_column: str, train_ratio: float = 0.8):
        """
        将清洗后的数据拆分为训练集和测试集:
        :param target_column: 目标字段(alias 名称)
        :param train_ratio: 训练集比例
        :return: X_train, X_test, y_train, y_test
        """
        from sklearn.model_selection import train_test_split

        if self.cleaned_df is None:
            raise RuntimeError("请先调用 `.clean()` 方法")

        X = self.cleaned_df.drop(columns=[target_column])
        y = self.cleaned_df[target_column]
        return train_test_split(X, y, train_size=train_ratio, random_state=42)

核心行-by-行解释

  1. load_workbookcsv.reader
    • openpyxl.load_workbook 读取 Excel,获取所有单元格值。
    • 用 Python 标准库 csv.reader 读取 CSV,不依赖 pandas.read_csv
  2. _align_columns
    • df.columns = [col.strip() …]:去除列名前后空格。
    • df.reindex(sorted(df.columns), axis=1):按字母顺序排列列,保证不同文件列顺序一致。
  3. 合并与去重
    • pd.concat(aligned, ignore_index=True):合并所有 DataFrame。
    • .drop_duplicates():去除重复行。
  4. 字段清洗
    • pd.to_numeric(..., errors="coerce"):将列转为数值,无法转换的变为 NaN。
    • col.where(cond):只有满足条件的行保留,否则置为 NaN。
    • 对字符串列做 .str.strip() 并用 .isin() 过滤枚举值。
  5. 删除缺失
    • pd.DataFrame(result).dropna():对所有 alias 列,任何包含 NaN 的行都会被删除,确保每条数据完整。
  6. 归一化
    • 使用 StandardScaler 对所有数值型字段做 fit_transform
  7. 拆分数据
    • train_test_split:按 train_ratio 比例随机切分训练/测试集。

测试示例

from io import BytesIO

# 示例因子配置
factor_config = {
    "features": [
        {"source": "Age", "alias": "age", "min": 18, "max": 60},
        {"source": "Gender", "alias": "gender", "enum": ["Male", "Female"]},
        {"source": "Income", "alias": "income", "min": 3000}
    ],
    "normalize": True
}

# 假设 dataset1.xlsx, dataset2.xlsx 放在当前目录
files = ["dataset1.xlsx", "dataset2.xlsx"]

processor = MultiDatasetProcessor(files, factor_config)
# 清洗数据
cleaned_df = processor.clean()
print("Cleaned DataFrame:\n", cleaned_df.head())

# 拆分训练和测试集
X_train, X_test, y_train, y_test = processor.split(target_column="income")
print(f"Train X shape: {X_train.shape}, y shape: {y_train.shape}")
print(f"Test X shape: {X_test.shape}, y shape: {y_test.shape}")

这样,所有的文件读取都仅依赖 openpyxl(Excel)和标准库 csv(CSV),避免了 pandas.read_csv 的偶发问题,同时保留了强大的 DataFrame 操作能力。

对,用 openpyxl 只支持 Excel 格式(.xls/.xlsx),它无法读取 CSV。要处理 CSV,你需要用 Python 的 csv 模块或 pandas.read_csv(如果它稳定)来读取。下面给你一个简单示例,演示如何统一「openpyxl + csv 模块」来读取 Excel 和 CSV 文件,并拼成 DataFrame。

import os
import csv
from io import TextIOWrapper, BytesIO
import pandas as pd
from openpyxl import load_workbook

def read_file_to_df(file: Union[str, BytesIO], sheet_name: str = None) -> pd.DataFrame:
    """
    根据扩展名决定读取方式:
    - Excel (.xls/.xlsx) 用 openpyxl
    - CSV (.csv) 用 Python csv 模块
    返回 pandas.DataFrame
    """
    # 判断扩展名
    if isinstance(file, str):
        ext = os.path.splitext(file)[1].lower()
    else:
        # 文件对象,假设是 CSV or Excel,根据场景决定
        # 这里如果是 BytesIO 且想读 CSV,可传入一个标志
        raise ValueError("请传入文件路径以识别类型")

    if ext in (".xls", ".xlsx"):
        # --- EXCEL: openpyxl 读取 ---
        wb = load_workbook(filename=file, data_only=True)
        ws = wb[sheet_name or wb.sheetnames[0]]
        rows = list(ws.values)
        headers = [str(h).strip() if h is not None else "" for h in rows[0]]
        values = rows[1:]
        return pd.DataFrame(values, columns=headers)

    elif ext == ".csv":
        # --- CSV: csv 模块 读取 ---
        with open(file, newline="", encoding="utf-8") as f:
            reader = csv.reader(f)
            rows = list(reader)
        if not rows:
            return pd.DataFrame()
        headers = [h.strip() for h in rows[0]]
        values = rows[1:]
        return pd.DataFrame(values, columns=headers)

    else:
        raise ValueError(f"不支持的文件类型: {ext}")

# 使用示例
df_excel = read_file_to_df("data.xlsx")
df_csv   = read_file_to_df("data.csv")

要点总结

  1. openpyxl 只管 Excel(.xlsx/.xls);
  2. CSV 必须用 Python 内置的 csv 模块或其他库来读;
  3. 读完后都转换成 pandas.DataFrame,再做后续清洗、归一化、切分等操作。

这样就能避开 pandas.read_csv 在某些复杂 CSV 上的兼容问题,同时整合 openpyxl 的 Excel 读取优点。

下面是将你提出的多字段数据清洗规则(最大/最小值 + 步长 + 容差 + 值替换)逻辑,适配集成到之前讨论过的统一数据清洗模块中的实现。此版本基于你之前的需求整合:

  • 支持多个数据集文件(CSV 或 XLSX)
  • 使用 openpyxl 读取 XLSX,pandas 读取 CSV(可替换)
  • 所有字段列名一致但顺序可能不同,支持字段对齐
  • 清洗因子字段为空的记录
  • 每个字段可配置清洗规则,包括数值限定、值对齐与剔除

✅ 总体结构

data_cleaning/
├── __init__.py
├── cleaner.py        # 主清洗逻辑
├── file_reader.py    # XLSX/CSV 文件读取
├── utils.py          # 工具函数

✅ 1. file_reader.py:读取并对齐字段

import pandas as pd
from openpyxl import load_workbook
from pathlib import Path
from typing import List, Union
import io


def read_excel_file(file: Union[str, Path, io.BytesIO]) -> pd.DataFrame:
    """使用 openpyxl 读取 xlsx 文件为 DataFrame"""
    wb = load_workbook(file)
    ws = wb.active
    data = list(ws.iter_rows(values_only=True))
    header, *rows = data
    return pd.DataFrame(rows, columns=header)


def read_dataset_files(file_list: List[Union[str, Path, io.BytesIO]]) -> pd.DataFrame:
    """
    读取并合并多个数据集文件(只支持相同结构但列顺序可不同)
    """
    dfs = []
    for file in file_list:
        if isinstance(file, (str, Path)) and str(file).lower().endswith(".xlsx"):
            df = read_excel_file(file)
        elif isinstance(file, (str, Path)) and str(file).lower().endswith(".csv"):
            df = pd.read_csv(file)
        else:
            raise ValueError(f"不支持的文件格式或类型: {file}")
        dfs.append(df)

    # 对字段列统一(按照第一个为基准)
    base_columns = dfs[0].columns
    for df in dfs[1:]:
        df.columns = base_columns  # 保证一致性

    combined_df = pd.concat(dfs, ignore_index=True).drop_duplicates()
    return combined_df

✅ 2. utils.py:筛选值生成和字段清洗

import numpy as np
import pandas as pd


def generate_filter_values(min_val: float, max_val: float, step: float) -> np.ndarray:
    """根据 min/max/step 生成筛选值数组"""
    return np.arange(min_val, max_val + step, step)


def apply_filter_to_series(series: pd.Series, filters: np.ndarray, tolerance: float) -> pd.Series:
    """将字段值替换为最接近的筛选值,超出容差则替换为 NaN"""
    cleaned = []
    for val in series:
        if pd.isnull(val):
            cleaned.append(np.nan)
            continue
        matched = False
        for f in filters:
            if abs(val - f) <= tolerance:
                cleaned.append(f)
                matched = True
                break
        if not matched:
            cleaned.append(np.nan)
    return pd.Series(cleaned, index=series.index)

✅ 3. cleaner.py:清洗主逻辑入口

import pandas as pd
from typing import List, Union, Dict
from .file_reader import read_dataset_files
from .utils import generate_filter_values, apply_filter_to_series


def clean_datasets(
    files: List[Union[str, bytes]],
    factor_config: Dict[str, dict],
    required_factors: List[str]
) -> pd.DataFrame:
    """
    统一数据清洗主函数

    参数:
    - files: 文件路径或 file-like 对象列表(csv/xlsx)
    - factor_config: 各因子字段的清洗配置,如 min/max/step/tolerance
    - required_factors: 必须存在的因子字段(非空)

    返回:
    - 清洗后的 DataFrame
    """
    df = read_dataset_files(files)

    # 清除因子缺失的记录
    df.dropna(subset=required_factors, inplace=True)

    # 应用每个字段的清洗规则
    for factor, config in factor_config.items():
        if factor not in df.columns:
            raise ValueError(f"字段 '{factor}' 不存在")

        # 范围过滤
        df = df[(df[factor] >= config["min"]) & (df[factor] <= config["max"])]

        # 值替换
        filters = generate_filter_values(config["min"], config["max"], config["step"])
        df[factor] = apply_filter_to_series(df[factor], filters, config["tolerance"])

    # 剔除无法匹配的 NaN
    df.dropna(subset=factor_config.keys(), inplace=True)
    return df.reset_index(drop=True)

✅ 示例用法(测试用)

if __name__ == "__main__":
    from pathlib import Path

    cleaning_rules = {
        "temperature": {"min": 6, "max": 10, "step": 1, "tolerance": 0.2},
        "humidity": {"min": 20, "max": 30, "step": 2, "tolerance": 0.3}
    }

    required_fields = ["temperature", "humidity"]

    files = [
        Path("data/file1.xlsx"),
        Path("data/file2.xlsx")
    ]

    cleaned = clean_datasets(files, cleaning_rules, required_fields)
    print(cleaned)

✅ 总结说明

模块 功能 可复用性
file_reader.py 支持不同格式读取合并
utils.py 通用工具函数(间隔值生成与筛选)
cleaner.py 清洗总控函数,支持多个字段、字段缺失判断

是否需要我将它封装为 pip 安装包格式,或支持 Celery 任务封装用于异步处理清洗?