为了设计一个通用的 模型训练流程框架,能够适配多种模型类型(如线性回归、数据分析、大语言模型等),最好的方法是使用 “适配器模式(Adapter Pattern)” 来封装不同类型模型的差异,使它们统一实现一套训练和评估接口,便于调度、管理、日志记录、结果展示等。
🧩 一、核心设计理念
- 统一接口:定义一个抽象基类
BaseModelAdapter
,所有模型适配器继承它,实现统一的方法。 - 适配器类:针对不同模型类型(如线性回归、大语言模型等)定义对应的适配器类。
- 训练管理器:提供统一的模型调度、训练执行、日志收集、状态保存等功能。
- 可扩展性强:新增模型类型时,只需新增适配器并注册即可,无需改动主流程。
📦 二、目录结构建议(Django + Python)
ai_platform/
├── adapters/ # 所有模型适配器
│ ├── base.py # BaseModelAdapter 抽象类
│ ├── linear.py # 线性回归适配器
│ ├── analysis.py # 数据分析适配器
│ └── llm.py # 大语言模型适配器
├── services/
│ └── trainer.py # 统一训练器/任务管理器
├── models/
│ └── task.py # ModelTrainTask 等任务模型
└── utils/
└── logger.py # 日志工具
🧱 三、核心接口定义
✅ BaseModelAdapter
(抽象基类)
# adapters/base.py
from abc import ABC, abstractmethod
class BaseModelAdapter(ABC):
def __init__(self, config, dataset):
self.config = config # 用户配置参数
self.dataset = dataset # 数据集对象或路径
@abstractmethod
def preprocess(self):
"""预处理数据"""
pass
@abstractmethod
def train(self):
"""模型训练主逻辑"""
pass
@abstractmethod
def evaluate(self):
"""模型评估逻辑"""
pass
@abstractmethod
def save_model(self, output_path):
"""保存模型到文件/数据库"""
pass
🧠 四、模型适配器示例
1️⃣ 线性回归适配器
# adapters/linear.py
from .base import BaseModelAdapter
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
import joblib
class LinearRegressionAdapter(BaseModelAdapter):
def preprocess(self):
self.X = self.dataset.drop(columns=['target'])
self.y = self.dataset['target']
def train(self):
self.model = LinearRegression()
self.model.fit(self.X, self.y)
def evaluate(self):
y_pred = self.model.predict(self.X)
return {"mse": mean_squared_error(self.y, y_pred)}
def save_model(self, output_path):
joblib.dump(self.model, output_path)
2️⃣ 大语言模型适配器(如调用 OpenAI GPT)
# adapters/llm.py
from .base import BaseModelAdapter
import openai
class LLMAdapter(BaseModelAdapter):
def preprocess(self):
self.prompt = self.config.get("prompt", "")
def train(self):
# 通常大语言模型是 zero/few-shot,无需训练,只推理
self.output = openai.ChatCompletion.create(
model=self.config.get("model", "gpt-4"),
messages=[{"role": "user", "content": self.prompt}],
temperature=self.config.get("temperature", 0.7)
)
def evaluate(self):
return {"output": self.output["choices"][0]["message"]["content"]}
def save_model(self, output_path):
# LLM 模型无法本地保存,仅记录 prompt & output
with open(output_path, 'w') as f:
f.write(self.output["choices"][0]["message"]["content"])
⚙️ 五、训练任务管理器(训练入口)
# services/trainer.py
from adapters.linear import LinearRegressionAdapter
from adapters.llm import LLMAdapter
# 注册适配器
ADAPTER_REGISTRY = {
"linear_regression": LinearRegressionAdapter,
"llm": LLMAdapter,
# 可拓展更多模型
}
class ModelTrainer:
def __init__(self, task_config, dataset):
self.task_type = task_config["task_type"]
self.adapter_class = ADAPTER_REGISTRY.get(self.task_type)
if not self.adapter_class:
raise ValueError(f"Unsupported model type: {self.task_type}")
self.adapter = self.adapter_class(task_config, dataset)
def run(self):
self.adapter.preprocess()
self.adapter.train()
results = self.adapter.evaluate()
self.adapter.save_model(self._output_path())
return results
def _output_path(self):
return f"/tmp/models/{self.task_type}_result.pkl"
🔌 六、使用方式(Django 视图或 Celery 中调用)
from services.trainer import ModelTrainer
from datasets.utils import load_dataset # 自定义函数:从数据库或 S3 载入数据
def train_model(task_id):
task_config = {
"task_type": "linear_regression",
"learning_rate": 0.01,
"prompt": "总结这段文本...",
}
dataset = load_dataset(task_id)
trainer = ModelTrainer(task_config, dataset)
result = trainer.run()
return result
🧮 七、模型配置字段建议
字段名 | 含义 | 模型类型 |
---|---|---|
task_type |
模型任务类型(如 llm , linear_regression ) |
所有 |
learning_rate |
学习率 | ML/DL 模型 |
prompt |
输入提示文本 | LLM |
temperature |
控制 LLM 输出的随机性 | LLM |
max_iterations |
最大训练轮数 | ML 模型 |
target_column |
标签列名 | ML 模型 |
model_name |
预设模型名(如 gpt-4 , glm 等) |
LLM |
✅ 八、优势总结
- 解耦强:每种模型适配器独立实现,不会相互影响。
- 易扩展:新增模型只需添加一个类并注册。
- 接口统一:主训练逻辑和调用方式不变,便于统一日志、状态管理、资源调度。
- 适合平台化:非常适用于 AI 平台、AutoML 系统、训练平台构建。
如果你有特定模型、特定输入/输出需求(比如图像、多分类、时序预测等),我可以帮你进一步扩展对应的适配器类。
需要我继续帮你完善数据库模型设计、Celery 任务封装、前端对接建议吗?
了解了,我将基于你的要求重新设计并精简训练与预测流程:
- 从数据库中的模型模板表加载指定模型类型的默认参数;
- 与前端传入的模型参数进行合并(前端值优先);
- 使用适配器模式完成清洗、预处理、训练、保存、发布、预测的全流程;
- 所有结构解耦、接口清晰、逻辑易读可维护。
我会输出关键模块代码和说明,确保你能直接集成使用。请稍等,我整理完就告诉你。
后端模型训练与预测流程设计
架构概述
数据管道是一种自动化的数据处理工具,它可以从多个源获取数据,对其进行清洗、转换和整合,然后将其提供给机器学习模型进行训练。在本方案中,流程包括:
- 数据清洗:合并多个 CSV 数据集、按因子筛选字段并去重;
- 数据预处理:对标记为归一化的数值列进行标准化;
- 模型训练:根据模型类型选择对应的适配器进行训练;
- 模型保存与发布:训练结束后将模型保存到本地文件,用户可选择将其上传到 S3;
- 模型预测:其他用户提供输入字段,根据已发布模型的 ID 和类型加载模型并推理。
数据清洗与预处理模块
首先进行数据清洗:读取并合并前端传入的多个 CSV 字符串,按因子列表筛选所需字段,最后去除重复行。完成数据清洗后,对需要归一化的数值列进行处理,实现基本的标准化。以下是示例代码结构:
import pandas as pd
from typing import List
import io
class DataCleaner:
@staticmethod
def merge_datasets(csv_contents: List[str]) -> pd.DataFrame:
"""
合并多个 CSV 文本数据为一个 DataFrame
"""
df_list = []
for csv in csv_contents:
df_list.append(pd.read_csv(io.StringIO(csv)))
df = pd.concat(df_list, ignore_index=True)
return df
@staticmethod
def filter_fields(df: pd.DataFrame, field_names: List[str]) -> pd.DataFrame:
"""
根据字段名筛选列
"""
return df[field_names]
@staticmethod
def drop_duplicates(df: pd.DataFrame) -> pd.DataFrame:
"""
去除重复行
"""
return df.drop_duplicates()
import pandas as pd
from typing import Dict
class DataPreprocessor:
@staticmethod
def normalize(df: pd.DataFrame, factors: List[Dict]) -> pd.DataFrame:
"""
对需要归一化的数值列进行标准化处理
"""
for factor in factors:
name = factor['name']
if factor.get('normalize') and pd.api.types.is_numeric_dtype(df[name]):
# 使用均值-标准差标准化
df[name] = (df[name] - df[name].mean()) / df[name].std()
return df
模型适配器接口与实现
使用适配器模式设计统一的模型接口。定义一个 BaseModelAdapter
抽象类,规定 train
、predict
、save
、load
等方法,所有模型类型的适配器都继承该接口。例如:
import pandas as pd
from typing import Dict, Any
import joblib
from abc import ABC, abstractmethod
class BaseModelAdapter(ABC):
@abstractmethod
def train(self, X: pd.DataFrame, y: pd.Series, params: Dict):
pass
@abstractmethod
def predict(self, X: pd.DataFrame) -> Any:
pass
def save(self, filepath: str):
"""
将训练好的模型保存到文件
"""
joblib.dump(self.model, filepath)
@classmethod
def load(cls, filepath: str):
"""
从文件加载模型
"""
adapter = cls()
adapter.model = joblib.load(filepath)
return adapter
from sklearn.linear_model import LinearRegression
from xgboost import XGBRegressor
class LinearRegressionAdapter(BaseModelAdapter):
def __init__(self):
self.model = LinearRegression()
def train(self, X, y, params):
# 初始化模型时应用参数(如正则化系数等)
self.model = LinearRegression(**params)
self.model.fit(X, y)
def predict(self, X):
return self.model.predict(X)
class XGBoostAdapter(BaseModelAdapter):
def __init__(self):
self.model = XGBRegressor()
def train(self, X, y, params):
self.model = XGBRegressor(**params)
self.model.fit(X, y)
def predict(self, X):
return self.model.predict(X)
# 若支持大语言模型,可实现对应适配器
class LLMAdapter(BaseModelAdapter):
def __init__(self):
self.model = None # 初始化大语言模型(例如加载预训练模型)
def train(self, X, y, params):
# 对于 LLM 可执行微调或其它训练逻辑
pass
def predict(self, X):
# 使用大语言模型进行推理
return self.model.generate(X)
训练调度与参数管理
训练模块负责协调整个训练流程。首先,根据前端传入的模型类型,从模板表加载默认参数,然后与前端给定的参数合并(前端优先)。示例代码如下:
class ModelTemplateRepo:
"""
模型模板表,存储每种模型类型的默认参数
"""
TEMPLATE = {
"linear_regression": {"fit_intercept": True, "normalize": False},
"xgboost": {"max_depth": 5, "learning_rate": 0.1},
# 可以添加其他模型类型及其默认参数
}
@staticmethod
def get_default_params(model_type: str) -> Dict:
return ModelTemplateRepo.TEMPLATE.get(model_type, {})
# 适配器注册表:根据模型类型获取对应的 Adapter 类
ADAPTER_REGISTRY = {
"linear_regression": LinearRegressionAdapter,
"xgboost": XGBoostAdapter,
"llm": LLMAdapter
}
class TrainingScheduler:
def __init__(self):
pass
def train_model(self, factors: List[Dict], model_type: str,
user_params: Dict, csv_contents: List[str]) -> str:
# 数据清洗
df = DataCleaner.merge_datasets(csv_contents)
df = DataCleaner.filter_fields(df, [f['name'] for f in factors])
df = DataCleaner.drop_duplicates(df)
# 数据预处理
df = DataPreprocessor.normalize(df, factors)
# 假设最后一个字段为目标变量
X = df[[f['name'] for f in factors if f['name'] != factors[-1]['name']]]
y = df[factors[-1]['name']]
# 参数合并(模板默认参数 + 用户参数)
default_params = ModelTemplateRepo.get_default_params(model_type)
params = {**default_params, **user_params}
# 训练模型
adapter_cls = ADAPTER_REGISTRY.get(model_type)
adapter = adapter_cls()
adapter.train(X, y, params)
# 保存模型到本地
model_path = f"{model_type}_model.pkl"
adapter.save(model_path)
return model_path # 返回模型文件路径或 ID
模型保存与发布
模型训练结束后,模型文件保存在本地。用户确认后,可将模型上传至 S3 等外部存储,便于后续调用。例如:
import boto3
class ModelRegistry:
@staticmethod
def upload_to_s3(local_path: str, bucket_name: str, model_id: str):
"""
将本地模型文件上传到 S3
"""
s3 = boto3.client('s3')
s3.upload_file(local_path, bucket_name, f"{model_id}.pkl")
模型预测模块
预测模块根据传入的已发布模型 ID 和类型加载模型,并对新数据进行预测:
from pandas import DataFrame
class Predictor:
def __init__(self, adapter_registry):
self.adapter_registry = adapter_registry
def predict(self, model_type: str, model_path: str, input_data: DataFrame):
"""
加载指定模型并进行预测
"""
adapter_cls = self.adapter_registry.get(model_type)
adapter = adapter_cls.load(model_path)
return adapter.predict(input_data)
# 使用示例
adapter_registry = ADAPTER_REGISTRY
predictor = Predictor(adapter_registry)
# 其他用户传入的预测数据示例
# input_df = pd.DataFrame([...])
# preds = predictor.predict("linear_regression", "linear_regression_model.pkl", input_df)
以上代码结构实现了从数据清洗到模型训练再到预测的完整流程。各模块设计通用简洁,可根据需要扩展更多模型类型和功能。
参考文献: 数据管道核心概念;多源数据合并去重;适配器模式介绍。
明白了。我将为你设计一个完整的后端框架,使用适配器模式来支持多种模型训练与预测流程,包含数据清洗、预处理、模型训练、模型发布(S3)、预测参数传递等功能,确保结构解耦、清晰可扩展。
我会在完成后提供各个模块的代码结构与逻辑说明,供你直接集成。请稍等,我马上整理完毕。
后端模型训练与预测框架设计
系统架构概览
- 数据处理模块:负责合并多个数据集(CSV 内容)、按因子参数选取特征列、去重,并根据配置对数值特征进行归一化等预处理。
- 模型适配器接口:定义统一的训练与预测接口(
train
、predict
等方法),各模型类型通过实现该接口适配器来解耦具体算法。 - 示例适配器:针对线性回归、逻辑回归、XGBoost、预训练大语言模型等不同模型,实现具体的适配器类,负责调用相应框架训练/推理并保存模型。
- 训练调度器:负责接收前端请求(因子参数、模型参数、数据文件内容),依次调用数据处理、选择适配器训练,并调用模型注册与发布逻辑进行保存和标记。
- 模型注册与发布模块:管理训练后的模型元数据(如模型 ID、本地路径、状态),在用户确认发布后通过 boto3 上传至 S3,并更新模型发布状态。
- 预测服务:允许其他用户调用已发布模型进行预测,动态接收输入参数,加载模型并输出预测结果。
各模块通过接口解耦,例如训练调度器只依赖于数据处理和适配器接口,不直接依赖具体模型库;新增模型类型时,只需新增对应的适配器类并注册到工厂即可,无需修改核心流程。
模型适配器接口
采用适配器模式为各模型类型定义统一接口,以便训练调度器和预测服务可以透明调用。下面给出一个抽象基类 ModelAdapter
,定义统一的训练、预测、模型保存/加载接口:
from abc import ABC, abstractmethod
class ModelAdapter(ABC):
"""
模型适配器基类,定义统一的训练和预测接口。
"""
def __init__(self, model_params: dict):
"""
初始化适配器时,可传入模型配置参数。
"""
self.model_params = model_params
self.model = None # 在训练后保存模型实例
@abstractmethod
def train(self, X, y):
"""
训练模型。X 为特征数据集(DataFrame 或 ndarray),y 为目标标签。
"""
pass
@abstractmethod
def predict(self, X):
"""
使用训练好的模型进行预测。X 为输入特征,可为单条或多条数据。
返回预测结果。
"""
pass
@abstractmethod
def save_model(self, file_path: str):
"""
将训练好的模型保存到指定文件路径。
"""
pass
@abstractmethod
def load_model(self, file_path: str):
"""
从指定路径加载模型,用于预测时使用。
"""
pass
- 解耦设计:训练调度器和预测服务通过该接口与具体模型适配器交互,不需要知道模型细节。新增模型类型时,只需实现该接口并注册,无需修改已有代码。
- 动态扩展:可通过工厂模式或注册表维护模型类型与适配器类的映射,如
"linear_regression" -> LinearRegressionAdapter
,方便动态选择适配器。 - 统一调用:统一的
train/predict
接口,参数为特征数据和输入值,支持动态参数输入。
示例适配器
以下提供两个示例适配器,一个用于线性回归,一个用于预训练大语言模型(LLM),以展示不同模型类型的适配器实现思路。
- 线性回归适配器(使用 scikit-learn):
from sklearn.linear_model import LinearRegression
import joblib
class LinearRegressionAdapter(ModelAdapter):
def train(self, X, y):
self.model = LinearRegression(**self.model_params.get('params', {}))
self.model.fit(X, y)
return self.model
def predict(self, X):
if self.model is None:
raise RuntimeError("模型尚未训练或加载")
return self.model.predict(X)
def save_model(self, file_path: str):
if self.model is None:
raise RuntimeError("无模型可保存")
joblib.dump(self.model, file_path)
def load_model(self, file_path: str):
self.model = joblib.load(file_path)
return self.model
- LLM(大语言模型)适配器(使用 Hugging Face Transformers):
from transformers import AutoModelForSequenceClassification, AutoTokenizer, pipeline
import torch
class LLMAdapter(ModelAdapter):
def train(self, train_texts, train_labels):
"""
简要示例:在分类任务下,用预训练模型进行微调。
实际实现可使用 Trainer 或自定义训练循环。
"""
model_name = self.model_params.get('model_name', 'bert-base-uncased')
num_labels = self.model_params.get('num_labels', 2)
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=num_labels)
# 这里可进行微调(代码简略,实际可用 Trainer)
self.tokenizer = tokenizer
self.model = model
return model
def predict(self, texts):
"""
使用模型进行文本分类预测。texts 可以是单个字符串或字符串列表。
"""
if self.model is None:
raise RuntimeError("模型尚未训练或加载")
# 使用pipeline简化推理
classifier = pipeline("text-classification", model=self.model, tokenizer=self.tokenizer)
results = classifier(texts)
return results
def save_model(self, dir_path: str):
"""
保存整个模型及tokenizer到文件夹。
"""
if self.model is None:
raise RuntimeError("无模型可保存")
self.model.save_pretrained(dir_path)
self.tokenizer.save_pretrained(dir_path)
def load_model(self, dir_path: str):
"""
从文件夹加载模型及tokenizer。
"""
model_name = self.model_params.get('model_name', 'bert-base-uncased')
self.tokenizer = AutoTokenizer.from_pretrained(dir_path)
self.model = AutoModelForSequenceClassification.from_pretrained(dir_path)
return self.model
这两个示例展示了:LinearRegressionAdapter
负责调用 sklearn 接口训练、预测,并用 joblib
保存模型;LLMAdapter
则使用 Hugging Face 进行文本分类示例,展示对微调和推理的包装。其它模型类型(如逻辑回归、XGBoost 等)也可类似实现各自的适配器类,复用相同的统一接口。
数据处理模块
数据处理模块负责从前端接收的数据文件内容开始,进行清洗和预处理,最终生成可供模型训练的特征矩阵和标签。设计中将其解耦为单独的类,比如 DataProcessor
。主要功能包括:
- 数据清洗:合并前端传来的多个 CSV 文件内容(字符串形式),根据因子列表提取对应列,并去除重复行。
- 归一化等预处理:遍历因子配置,如果某列标记需要归一化且为数值类型,则对该列进行 Min-Max 归一化或其他方式转换。
下面给出一个示例实现:
import pandas as pd
from io import StringIO
from sklearn.preprocessing import MinMaxScaler
class DataProcessor:
def clean(self, file_contents: list, factors: list) -> pd.DataFrame:
"""
合并多个 CSV 内容,提取因子相关的列,并去除重复值。
- file_contents: 包含多个 CSV 格式字符串的列表
- factors: 因子配置列表,每个因子包含 'name' (列名) 和其他属性
"""
# 读取并合并所有 CSV 内容
dfs = [pd.read_csv(StringIO(content)) for content in file_contents]
df = pd.concat(dfs, ignore_index=True)
# 提取因子指定的列
selected_cols = [f['name'] for f in factors]
df = df.loc[:, selected_cols]
# 去除完全重复的行
df = df.drop_duplicates().reset_index(drop=True)
return df
def preprocess(self, df: pd.DataFrame, factors: list) -> pd.DataFrame:
"""
对数值列进行归一化处理(如配置了 normalize=True)。
- df: 上一步清洗后的 DataFrame
- factors: 因子配置,包含类型和是否归一化的标志
"""
for f in factors:
col = f['name']
if f.get('type') == 'numeric' and f.get('normalize', False):
scaler = MinMaxScaler()
df[col] = scaler.fit_transform(df[[col]])
return df
- 解耦设计:
TrainingScheduler
会调用DataProcessor
,而不会关心具体的清洗逻辑细节。若需要增加新预处理方法(如标准化、缺失值填充等),只要在DataProcessor
中扩展即可,不影响其他模块。 - 数据读取:使用
StringIO
读取 CSV 内容字符串,实现“文件内容”而非路径传递的读取方式。 - 去重和筛选:直接使用
pandas
的drop_duplicates()
以及列选择功能实现。
训练调度器
训练调度器(TrainingScheduler
)负责协调整个训练流程,将前端传入的参数和数据分发给各子模块,并管理模型训练与保存的流程。主要逻辑包括:
- 接收前端参数:因子配置列表
factor_params
、模型配置model_params
、以及一个或多个数据文件内容data_contents
。 - 调用数据处理模块:先进行数据清洗 (
clean
),再进行预处理 (preprocess
)。 - 选择模型适配器:根据
model_params['type']
,从工厂或注册表中获取对应的适配器实例。 - 调用适配器训练:传入特征矩阵和标签进行训练,并保存模型到本地临时路径。
- 注册模型元数据:将模型保存路径、参数等信息注册到模型注册表,并生成模型 ID 。
- 等待发布:训练完成后模型先处于“已训练未发布”状态,待用户确认后上传到 S3。
下面是一个示例实现:
import uuid
import os
class TrainingScheduler:
def __init__(self, data_processor, adapter_factory, model_registry):
self.data_processor = data_processor
self.adapter_factory = adapter_factory
self.model_registry = model_registry
def schedule(self, factor_params: list, model_params: dict, data_contents: list) -> str:
"""
调度训练流程,返回模型ID(唯一标识)。
"""
# 1. 数据清洗和预处理
df_clean = self.data_processor.clean(data_contents, factor_params)
df_processed = self.data_processor.preprocess(df_clean, factor_params)
# 提取特征矩阵X和标签y
target_col = model_params['target'] # 假设模型参数里指定了目标列
feature_cols = [f['name'] for f in factor_params if f['name'] != target_col]
X = df_processed[feature_cols]
y = df_processed[target_col]
# 2. 获取对应的模型适配器并训练
model_type = model_params['type']
adapter = self.adapter_factory.get_adapter(model_type, model_params)
adapter.train(X, y)
# 3. 模型保存(本地)
model_id = str(uuid.uuid4())
local_dir = f"/tmp/model_{model_id}"
os.makedirs(local_dir, exist_ok=True)
adapter.save_model(local_dir) # 保存模型或模型文件夹
# 4. 注册模型信息
self.model_registry.register(model_id, {
'type': model_type,
'local_path': local_dir,
'params': model_params,
'published': False
})
return model_id
def publish_model(self, model_id: str):
"""
用户确认发布后调用此方法,将模型上传到 S3 并更新状态。
"""
record = self.model_registry.get(model_id)
if not record:
raise ValueError("模型ID不存在")
if record['published']:
return # 已发布无需重复
local_path = record['local_path']
# 构造 S3 key,可根据需求使用模型ID等信息命名
s3_key = f"models/{model_id}.zip"
uploader = S3Uploader(bucket_name="your-bucket-name")
# 假设模型保存为一个目录,这里先压缩为zip上传,示例代码
zip_path = f"{local_path}.zip"
os.system(f"zip -r {zip_path} {local_path}")
uploader.upload_file(zip_path, s3_key)
# 更新发布状态
record['published'] = True
record['s3_key'] = s3_key
self.model_registry.update(model_id, record)
- 模型ID:生成唯一
model_id
(如使用uuid
),便于后续引用和状态管理。 - 本地保存:先将模型保存到后端本地路径(或临时目录),再在发布时统一上传。
- 待发布状态:训练完成后,模型在注册表中标记为未发布状态,等待用户确认再触发 S3 上传。
- S3 上传:示例中用
S3Uploader
(见下文)进行上传,这里假设模型先压缩后上传到指定 bucket。实际可根据模型类型选择不同的打包方式。
模型注册与发布
ModelRegistry
负责维护所有模型的元数据,包括状态(已训练/已发布)、本地路径、S3 路径等信息。下面是一个简单的注册表设计:
class ModelRegistry:
def __init__(self):
# 存储模型元数据,key 为 model_id
self._store = {}
def register(self, model_id: str, metadata: dict):
"""
注册新模型信息。
"""
self._store[model_id] = metadata
def update(self, model_id: str, metadata: dict):
"""
更新模型元数据(如发布状态、S3路径等)。
"""
if model_id in self._store:
self._store[model_id].update(metadata)
else:
raise KeyError("模型ID未找到")
def get(self, model_id: str) -> dict:
"""
获取模型元数据。
"""
return self._store.get(model_id)
- 元数据管理:
metadata
包含了模型类型、参数、本地存储路径、是否发布、S3 存储键等信息。实际系统中可以结合数据库或持久化存储,此处简化为内存字典。 - 发布逻辑:当调用
TrainingScheduler.publish_model(model_id)
时,会上传模型至 S3,并更新metadata['published'] = True
及相应的 S3 存储路径。这样预测服务可依据published
状态决定是否允许预测调用。 - 后续扩展:可增加版本控制、访问权限、模型描述等字段,使模型注册表更加完善。
S3 上传工具
使用 boto3
客户端实现模型文件上传到 S3。示例工具类如下:
import boto3
class S3Uploader:
def __init__(self, bucket_name: str):
self.s3 = boto3.client('s3')
self.bucket = bucket_name
def upload_file(self, file_path: str, key: str):
"""
上传本地文件到指定 S3 bucket 的 key 路径。
"""
self.s3.upload_file(file_path, self.bucket, key)
- 在生产环境中,需要配置 AWS 的访问密钥或使用 IAM 角色进行认证。
file_path
支持本地路径,key
定义了存储在 S3 上的文件名(可包含目录)。- 上传完成后,注册表中保存对应的 S3 路径信息,以便后续加载使用。
模型预测服务
预测服务允许其他用户指定已发布的模型 ID 并输入特征进行预测。主要步骤:
- 加载模型:根据模型 ID 从注册表获取模型元数据,检查是否已发布。若模型文件尚在本地,可直接
load_model
;若仅在 S3 上,则先下载(或通过load_model
支持直接S3 URL)。 - 动态输入:接收前端传入的预测参数,通常为键值对形式的特征值。在使用时需要将其转换为模型适配器能接受的格式(如 DataFrame 或矩阵)。
- 调用预测:使用适配器的
predict
方法进行预测,并返回结果给前端。
示例实现如下:
class PredictionService:
def __init__(self, adapter_factory, model_registry):
self.adapter_factory = adapter_factory
self.model_registry = model_registry
def predict(self, model_id: str, input_data: dict):
"""
使用已发布模型进行预测。
- model_id: 注册表中的模型ID
- input_data: 包含特征名称和值的字典(动态参数输入)
"""
record = self.model_registry.get(model_id)
if not record or not record.get('published', False):
raise ValueError("模型未发布或不存在")
model_type = record['type']
model_params = record['params']
# 获取对应适配器
adapter = self.adapter_factory.get_adapter(model_type, model_params)
# 若需要,从 S3 下载模型,这里假设本地已有模型目录,否则实现下载逻辑
local_path = record['local_path']
adapter.load_model(local_path)
# 将输入数据转换为 DataFrame (或适配器支持的格式)
import pandas as pd
input_df = pd.DataFrame([input_data])
# 预测
predictions = adapter.predict(input_df)
return predictions
- 动态参数:前端可传入任意需要的特征字段和值,服务端将其包装为单行的
DataFrame
进行预测。这样predict
接口无需固定输入维度。 - 模型加载:假设模型已经上传并下载至本地(或本地保留训练结果),
load_model
负责加载模型实例。若模型仅在 S3,可在此处加下载代码(如boto3.download_file
)后再加载。 - 返回结果:将模型输出直接返回给前端。对于分类任务可能需要映射类别标签,回归则返回数值。
代码调用示例
下面给出一个简单的流程示例,说明如何使用上述组件完成训练和预测:
# 假设前端传递:
factor_params = [
{'name': 'age', 'type': 'numeric', 'normalize': True},
{'name': 'salary', 'type': 'numeric', 'normalize': True},
{'name': 'gender', 'type': 'categorical', 'normalize': False},
{'name': 'income', 'type': 'numeric', 'normalize': False}, # 目标列
]
model_params = {
'type': 'linear_regression',
'target': 'income',
'params': {} # 线性回归超参数
}
data_contents = [csv_string1, csv_string2] # CSV文件内容字符串列表
# 初始化组件
data_processor = DataProcessor()
adapter_factory = ModelAdapterFactory() # 假设工厂已实现类型->适配器映射
model_registry = ModelRegistry()
scheduler = TrainingScheduler(data_processor, adapter_factory, model_registry)
# 调度训练
model_id = scheduler.schedule(factor_params, model_params, data_contents)
print(f"模型已训练完成,ID = {model_id},等待发布。")
# 用户确认发布后调用
scheduler.publish_model(model_id)
print("模型已发布至 S3。")
# 预测示例
prediction_service = PredictionService(adapter_factory, model_registry)
input_features = {'age': 30, 'salary': 60000, 'gender': 'Male'}
result = prediction_service.predict(model_id, input_features)
print("预测结果:", result)
以上示例演示了整个流程:前端传入数据后,训练调度器完成数据处理和模型训练;训练完成后,管理员确认发布模型;其他用户再调用预测服务,通过模型 ID 和动态输入参数获取预测结果。整个过程中的各模块职责清晰,使用适配器模式保证了对不同模型类型的一致调用接口,并通过工厂/注册表支持灵活扩展新的模型类型。
下面是一个高度解耦、可扩展的AI 模型训练与预测通用流程实现,采用适配器模式来支持多种模型(线性回归、XGBoost、大语言模型等),并围绕你的需求(数据清洗、预处理、模型训练、保存、发布、预测)进行模块化拆分。
🧩 1. 目录结构建议
ai_platform/
├── adapters/ # 各类模型适配器
│ ├── base.py # 模型适配器接口
│ ├── linear_regression.py
│ ├── xgboost_model.py
│ └── llm_model.py
├── core/
│ ├── cleaner.py # 数据清洗
│ ├── preprocessor.py # 数据预处理
│ ├── scheduler.py # 训练调度器
│ ├── registry.py # 模型保存和发布
│ └── predictor.py # 预测模块
├── config/
│ └── registry.py # 模型类型注册表
└── main.py # 训练/预测入口
🧠 2. 适配器接口定义(adapters/base.py
)
from abc import ABC, abstractmethod
import pandas as pd
from typing import Dict, Any
class BaseModelAdapter(ABC):
@abstractmethod
def train(self, X: pd.DataFrame, y: pd.Series, params: Dict[str, Any]):
pass
@abstractmethod
def predict(self, X: pd.DataFrame) -> Any:
pass
@abstractmethod
def save(self, path: str):
pass
@abstractmethod
def load(self, path: str):
pass
📈 3. 示例模型适配器:线性回归(adapters/linear_regression.py
)
from sklearn.linear_model import LinearRegression
from .base import BaseModelAdapter
import joblib
class LinearRegressionAdapter(BaseModelAdapter):
def __init__(self):
self.model = None
def train(self, X, y, params):
self.model = LinearRegression(**params)
self.model.fit(X, y)
def predict(self, X):
return self.model.predict(X)
def save(self, path):
joblib.dump(self.model, path)
def load(self, path):
self.model = joblib.load(path)
其他模型如 XGBoost 和 LLM 同理。
🧼 4. 数据清洗和预处理模块(core/cleaner.py
, core/preprocessor.py
)
# cleaner.py
import pandas as pd
import io
from typing import List
class DataCleaner:
@staticmethod
def clean(csv_contents: List[str], factor_names: List[str]) -> pd.DataFrame:
dfs = [pd.read_csv(io.StringIO(c)) for c in csv_contents]
df = pd.concat(dfs).drop_duplicates()
return df[factor_names]
# preprocessor.py
class DataPreprocessor:
@staticmethod
def normalize(df: pd.DataFrame, factors: List[Dict]):
for factor in factors:
name = factor["name"]
if factor.get("normalize") and pd.api.types.is_numeric_dtype(df[name]):
df[name] = (df[name] - df[name].mean()) / df[name].std()
return df
🧠 5. 模型调度器:训练执行器(core/scheduler.py
)
from config.registry import ADAPTER_REGISTRY
from .cleaner import DataCleaner
from .preprocessor import DataPreprocessor
import pandas as pd
import os
class TrainingScheduler:
def train(self, model_type: str, factors: List[Dict], model_params: Dict, csv_contents: List[str]) -> str:
# 1. 清洗数据
field_names = [f["name"] for f in factors]
df = DataCleaner.clean(csv_contents, field_names)
# 2. 预处理
df = DataPreprocessor.normalize(df, factors)
# 3. 生成 X/y
target = field_names[-1]
X = df.drop(columns=[target])
y = df[target]
# 4. 获取适配器
adapter_cls = ADAPTER_REGISTRY[model_type]
model = adapter_cls()
model.train(X, y, model_params)
# 5. 保存模型
path = f"models/{model_type}_{hash(tuple(field_names))}.pkl"
os.makedirs("models", exist_ok=True)
model.save(path)
return path
☁️ 6. 发布模块(core/registry.py
)
import boto3
class ModelPublisher:
def upload_to_s3(self, local_path: str, model_key: str, bucket: str):
s3 = boto3.client("s3")
s3.upload_file(local_path, bucket, model_key)
🔍 7. 推理模块(core/predictor.py
)
from config.registry import ADAPTER_REGISTRY
class Predictor:
def predict(self, model_type: str, model_path: str, input_df):
adapter_cls = ADAPTER_REGISTRY[model_type]
model = adapter_cls()
model.load(model_path)
return model.predict(input_df)
🧩 8. 模型注册表(config/registry.py
)
from adapters.linear_regression import LinearRegressionAdapter
from adapters.xgboost_model import XGBoostAdapter
from adapters.llm_model import LLMAdapter
ADAPTER_REGISTRY = {
"linear_regression": LinearRegressionAdapter,
"xgboost": XGBoostAdapter,
"llm": LLMAdapter
}
🚀 9. 主调用流程(main.py
示例)
from core.scheduler import TrainingScheduler
from core.predictor import Predictor
# 模拟前端输入
factors = [
{"name": "feature1", "normalize": True},
{"name": "feature2", "normalize": False},
{"name": "target", "normalize": False}
]
model_type = "linear_regression"
model_params = {"fit_intercept": True}
csvs = [open("data1.csv").read(), open("data2.csv").read()]
# 训练流程
scheduler = TrainingScheduler()
model_path = scheduler.train(model_type, factors, model_params, csvs)
# 推理流程
predictor = Predictor()
import pandas as pd
input_data = pd.DataFrame([{"feature1": 1.2, "feature2": 3.4}])
pred = predictor.predict(model_type, model_path, input_data)
print(pred)
✅ 总结:核心解耦点
模块 | 功能 | 解耦说明 |
---|---|---|
BaseModelAdapter |
模型标准接口 | 适配不同模型训练与推理逻辑 |
TrainingScheduler |
调度训练流程 | 独立于具体模型与数据格式 |
DataCleaner + DataPreprocessor |
数据处理模块 | 不依赖模型逻辑,按因子驱动 |
ADAPTER_REGISTRY |
适配器注册表 | 模型扩展零侵入 |
Predictor |
推理统一入口 | 封装模型加载和推理逻辑 |
如你需要,我可以进一步补充:
- FastAPI 接口封装;
- S3 接口本地 mock 测试;
- 对接任务队列(如 Celery)执行异步训练。
是否需要我继续封装这些模块?
以下是完整的多数据集清洗模块 MultiDatasetProcessor
的实现、说明和测试示例。
✅ 模块功能概览
MultiDatasetProcessor
主要实现以下步骤:
- 加载多个数据文件(支持 CSV 和 Excel)
- 合并数据集并去除重复值
- 根据“因子配置”自动清洗和字段映射
- 推断字段类型(自动识别数值/字符型)
- 支持数值归一化
- 训练集/测试集按比例划分
✅ 使用方式
示例数据:dataset1.csv
, dataset2.xlsx
示例内容(字段一致):
年龄 | 性别 | 工资 |
---|---|---|
25 | 男 | 7000 |
32 | 女 | 8000 |
45 | 男 | 10000 |
示例因子配置
factor_config = {
"features": [
{"source": "年龄", "alias": "age", "min": 18, "max": 65},
{"source": "性别", "alias": "gender", "enum": ["男", "女"]},
{"source": "工资", "alias": "salary"}
],
"normalize": True # 归一化所有数值型字段
}
✅ 测试代码示例
from processor import MultiDatasetProcessor
filepaths = ["dataset1.csv", "dataset2.xlsx"]
# 实例化处理器
processor = MultiDatasetProcessor(filepaths=filepaths, factor_config=factor_config)
# 执行清洗
cleaned_df = processor.clean()
print("清洗后的数据:")
print(cleaned_df.head())
# 数据拆分
X_train, X_test, y_train, y_test = processor.split(target_column="salary")
print("\n训练特征集:")
print(X_train.head())
print("\n测试目标集:")
print(y_test.head())
✅ 推荐的封装方式(可选)
你可以将这个模块打包为:
- Django app 的工具模块(放入
utils/
或services/
中) - 独立 pip 包用于多项目复用
- 搭配 Celery 后台任务异步执行
- 提供 API 接口(如上传数据后清洗和返回版本 ID)
如果你需要我帮你进一步封装为 API 或任务队列任务,或者实现日志记录等功能,请告诉我,我可以继续为你补充后端结构与接口设计。
非常好,这种组合方式在 AI 平台中非常常见:
- 用
openpyxl
读取 Excel 文件(尤其支持.xlsx
,避免pandas.read_excel
在部分格式或合并单元格下表现不佳) - 然后使用
pandas
进行 高效的数据清洗与分析
✅ 使用 openpyxl
+ pandas
读取 Excel 并清洗数据的推荐流程
📦 安装依赖(如未安装)
pip install openpyxl pandas
🧠 示例:读取 Excel、清洗并归一化字段
import openpyxl
import pandas as pd
from sklearn.preprocessing import StandardScaler
def read_excel_with_openpyxl(path: str, sheet_name: str = None) -> pd.DataFrame:
"""
使用 openpyxl 读取 Excel 的指定 sheet,然后转换为 pandas DataFrame
:param path: Excel 文件路径
:param sheet_name: 表名,默认读取第一个
:return: DataFrame
"""
wb = openpyxl.load_workbook(path, data_only=True)
sheet = wb[sheet_name] if sheet_name else wb.active
# 读取表头
headers = [cell.value for cell in next(sheet.iter_rows(min_row=1, max_row=1))]
# 读取数据
data = []
for row in sheet.iter_rows(min_row=2, values_only=True):
data.append(row)
df = pd.DataFrame(data, columns=headers)
return df
def clean_and_normalize(df: pd.DataFrame, factor_config: dict) -> pd.DataFrame:
"""
根据因子配置清洗字段并归一化(仅数值型)
:param df: 原始 DataFrame
:param factor_config: 因子配置
:return: 清洗后的 DataFrame
"""
result = {}
scaler = StandardScaler()
for feature in factor_config["features"]:
source = feature["source"]
alias = feature["alias"]
col = df[source].copy()
# 字段类型推断
if pd.api.types.is_numeric_dtype(col):
col = col.astype(float)
# 处理 min / max / enum 规则
if "min" in feature:
col = col[col >= feature["min"]]
if "max" in feature:
col = col[col <= feature["max"]]
if "enum" in feature:
col = col[col.isin(feature["enum"])]
else:
if "enum" in feature:
col = col[col.isin(feature["enum"])]
result[alias] = col
cleaned_df = pd.DataFrame(result).dropna()
# 如果开启归一化,只对数值型字段进行
if factor_config.get("normalize"):
numeric_cols = [f["alias"] for f in factor_config["features"]
if pd.api.types.is_numeric_dtype(cleaned_df[f["alias"]])]
cleaned_df[numeric_cols] = scaler.fit_transform(cleaned_df[numeric_cols])
return cleaned_df
✅ 示例因子配置 & 使用
factor_config = {
"features": [
{"source": "年龄", "alias": "age", "min": 18, "max": 100},
{"source": "性别", "alias": "gender", "enum": ["男", "女"]}
],
"normalize": True
}
df_raw = read_excel_with_openpyxl("数据样本.xlsx")
df_cleaned = clean_and_normalize(df_raw, factor_config)
🔁 数据后续处理(可继续进行训练/分割)
from sklearn.model_selection import train_test_split
X = df_cleaned.drop(columns=["target_column"])
y = df_cleaned["target_column"]
X_train, X_test, y_train, y_test = train_test_split(X, y, train_size=0.8)
✅ 优势总结
步骤 | 工具 | 优点 |
---|---|---|
读取 Excel | openpyxl |
更好控制单元格、公式、合并格式 |
转换为 DataFrame | pandas |
高效处理、清洗、分析 |
类型处理 | pandas + 类型推断 |
自动适配字符/数值/空值 |
归一化 | sklearn.preprocessing.StandardScaler |
统一归一化流程 |
如果你希望把这部分做成一个统一封装模块、或提供支持 API 调用或任务队列集成的形式,我可以继续为你抽象成服务或独立包。是否需要?
下面是一个设计良好、注释清晰、支持字段类型自动推断、可打包成独立服务的 数据清洗与预处理模块,可在你的 AI 平台中复用。
✅ 功能概览
- 接收多个数据集(CSV/
DataFrame
),合并去重; - 使用因子配置进行字段映射、清洗(包括数值范围、枚举等);
- 自动推断字段类型(字符型 / 数值型);
- 可选归一化;
- 训练集 / 测试集划分;
- 可打包为独立模块,便于复用。
🗂️ 目录结构建议(可放入平台工具包)
your_project/
├── utils/
│ ├── __init__.py
│ └── data_cleaner.py # ← 本模块
🧠 data_cleaner.py
代码实现(含详细注释)
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from pandas.api.types import is_numeric_dtype, is_string_dtype
from typing import List, Dict, Tuple, Union, Optional
class DataCleaner:
"""
数据清洗与预处理模块,可自动推断字段类型,并执行清洗、归一化和数据分割。
"""
def __init__(self, factor_config: Dict, train_ratio: float = 0.8):
"""
:param factor_config: 因子配置,定义特征、目标字段、归一化规则等
:param train_ratio: 训练集占比 (默认 80%)
"""
self.factor_config = factor_config
self.train_ratio = train_ratio
self.scaler = StandardScaler() if factor_config.get("normalize") else None
def _load_csvs(self, csv_contents: List[Union[str, pd.DataFrame]]) -> pd.DataFrame:
"""
读取 CSV 字符串或 DataFrame 列表,合并并去重
"""
dfs = []
for content in csv_contents:
if isinstance(content, str):
df = pd.read_csv(pd.compat.StringIO(content))
elif isinstance(content, pd.DataFrame):
df = content.copy()
else:
raise ValueError("CSV 内容必须是字符串或 DataFrame")
dfs.append(df)
merged = pd.concat(dfs, ignore_index=True).drop_duplicates()
return merged
def _infer_type(self, series: pd.Series) -> str:
"""
推断字段类型:返回 'numeric' 或 'categorical'
"""
if is_numeric_dtype(series):
return "numeric"
elif is_string_dtype(series):
return "categorical"
else:
# 可拓展支持 datetime 等
return "categorical"
def _clean_feature(self, df: pd.DataFrame, feature: Dict) -> pd.Series:
"""
映射、清洗单个特征字段,根据类型推断或配置执行值校验
"""
source_col = feature["source"]
alias_col = feature["alias"]
col = df[source_col].copy()
# 推断字段类型或读取配置
dtype = feature.get("type") or self._infer_type(col)
# 数值型校验
if dtype == "numeric":
if "min" in feature:
col = col[col >= feature["min"]]
if "max" in feature:
col = col[col <= feature["max"]]
if "enum" in feature:
col = col[col.isin(feature["enum"])]
# 枚举型校验
elif dtype == "categorical":
if "enum" in feature:
col = col[col.isin(feature["enum"])]
else:
raise ValueError(f"不支持的数据类型: {dtype}")
return col.rename(alias_col)
def _clean_target(self, df: pd.DataFrame) -> pd.Series:
"""
清洗目标字段(一般无需枚举限制)
"""
target = self.factor_config["target"]
target_col = df[target["source"]].copy()
return target_col.rename(target["alias"])
def clean(self, csv_contents: List[Union[str, pd.DataFrame]]) -> Tuple[pd.DataFrame, pd.Series]:
"""
合并、映射并清洗特征 + 目标列,返回 X 和 y
"""
df = self._load_csvs(csv_contents)
# 清洗所有特征列
feature_series = []
for feature in self.factor_config["features"]:
if feature["source"] not in df.columns:
raise KeyError(f"缺少字段: {feature['source']}")
cleaned = self._clean_feature(df, feature)
feature_series.append(cleaned)
# 合并为特征 DataFrame
X = pd.concat(feature_series, axis=1)
# 清洗目标字段
y = self._clean_target(df)
# 合并后统一去除 NA(防止对齐失败)
data = pd.concat([X, y], axis=1).dropna()
X_cleaned = data.drop(columns=[y.name])
y_cleaned = data[y.name]
# 执行归一化(仅对数值型列)
if self.scaler:
numeric_cols = [f["alias"] for f in self.factor_config["features"]
if f.get("type") == "numeric" or self._infer_type(df[f["source"]]) == "numeric"]
X_cleaned[numeric_cols] = self.scaler.fit_transform(X_cleaned[numeric_cols])
return X_cleaned, y_cleaned
def split(self, X: pd.DataFrame, y: pd.Series) -> Tuple:
"""
拆分训练集 / 测试集
"""
return train_test_split(X, y, train_size=self.train_ratio, shuffle=True, random_state=42)
def process(self, csv_contents: List[Union[str, pd.DataFrame]]) -> Tuple:
"""
一体化处理:清洗 + 拆分
"""
X, y = self.clean(csv_contents)
return self.split(X, y)
✅ 使用方式(其他模块可直接引用)
from utils.data_cleaner import DataCleaner
# 假设来自前端的因子配置:
factor_config = {
"features": [
{"alias": "Age", "source": "年龄", "min": 18, "max": 100},
{"alias": "Gender", "source": "性别", "enum": ["男", "女"]}
],
"target": {"alias": "收入", "source": "年收入"},
"normalize": True
}
cleaner = DataCleaner(factor_config, train_ratio=0.8)
X_train, X_test, y_train, y_test = cleaner.process([df1, df2])
☑️ 模块复用建议
你可以将 DataCleaner
放入平台的:
- ✅ 模型训练入口前的数据清洗阶段;
- ✅ 数据分析模块前的特征筛选阶段;
- ✅ 自动建模、调参任务的统一预处理接口;
- ✅ 接入任务队列中,实现异步数据预处理。
如需:
- 字段名映射持久化(保存历史映射方案)
- 字段类型智能标注系统(由样本自动识别配置)
- 将模块转为 API 微服务
我可以继续为你构建。是否现在就打包成一个 FastAPI 服务或任务任务模块?
好的,我们将升级数据清洗模块以支持以下能力:
✅ 功能需求(扩展版)
- 字段映射支持:字段名可能与数据表中的不一致,支持统一字段映射。
- 字段类型判断与清洗规则:
- 数值型字段:支持
min
/max
限制、枚举值限制。 - 字符型字段:支持枚举值(白名单)校验。
- 数值型字段:支持
- 字段缺失与异常值处理:非法值、缺失值直接过滤或日志记录。
- 统一字段名处理后再做归一化、切分等操作。
🧩 因子配置结构(示例)
{
"features": [
{
"alias": "Age", // 前端字段名(统一名)
"source": "age_years", // 原数据字段名
"type": "numeric",
"min": 18,
"max": 100
},
{
"alias": "Gender",
"source": "sex",
"type": "categorical",
"enum": ["Male", "Female"]
}
],
"target": {
"alias": "bmi",
"source": "body_mass_index"
},
"normalize": true
}
🧩 模块设计(简化核心逻辑)
文件路径:core/data/processor.py
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from typing import List, Dict, Tuple, Union
class DataProcessorV2:
def __init__(self, factor_config: Dict, train_ratio: float = 0.8):
self.feature_config = factor_config["features"]
self.target_config = factor_config["target"]
self.normalize = factor_config.get("normalize", False)
self.train_ratio = train_ratio
def load_and_merge(self, csv_contents: List[Union[str, pd.DataFrame]]) -> pd.DataFrame:
dfs = []
for item in csv_contents:
if isinstance(item, str):
df = pd.read_csv(pd.compat.StringIO(item))
elif isinstance(item, pd.DataFrame):
df = item
else:
raise ValueError("Unsupported csv content type")
dfs.append(df)
merged = pd.concat(dfs, ignore_index=True)
return merged.drop_duplicates()
def map_and_clean(self, df: pd.DataFrame) -> Tuple[pd.DataFrame, pd.Series]:
cleaned_df = pd.DataFrame()
for feature in self.feature_config:
src = feature["source"]
dst = feature["alias"]
dtype = feature["type"]
if src not in df.columns:
raise ValueError(f"Missing feature column: {src}")
col = df[src].copy()
if dtype == "numeric":
if "min" in feature:
col = col[col >= feature["min"]]
if "max" in feature:
col = col[col <= feature["max"]]
if "enum" in feature:
col = col[col.isin(feature["enum"])]
elif dtype == "categorical":
if "enum" in feature:
col = col[col.isin(feature["enum"])]
else:
raise ValueError(f"Unsupported type {dtype} for feature {dst}")
cleaned_df[dst] = col
# 处理 target
target_src = self.target_config["source"]
target_dst = self.target_config["alias"]
if target_src not in df.columns:
raise ValueError("Missing target column")
target_col = df[target_src]
cleaned_df[target_dst] = target_col
# 去除含有 NA 的行
cleaned_df.dropna(inplace=True)
# 提取 X 和 y
X = cleaned_df[[f["alias"] for f in self.feature_config]]
y = cleaned_df[target_dst]
# 归一化
if self.normalize:
numeric_features = [f["alias"] for f in self.feature_config if f["type"] == "numeric"]
scaler = StandardScaler()
X[numeric_features] = scaler.fit_transform(X[numeric_features])
return X, y
def split_data(self, X: pd.DataFrame, y: pd.Series) -> Tuple:
return train_test_split(X, y, train_size=self.train_ratio, shuffle=True, random_state=42)
def process(self, csv_contents: List[Union[str, pd.DataFrame]]) -> Tuple:
merged_df = self.load_and_merge(csv_contents)
X, y = self.map_and_clean(merged_df)
return self.split_data(X, y)
✅ 使用示例
from core.data.processor import DataProcessorV2
factor_config = {
"features": [
{"alias": "Age", "source": "age", "type": "numeric", "min": 18, "max": 80},
{"alias": "Sex", "source": "gender", "type": "categorical", "enum": ["Male", "Female"]}
],
"target": {"alias": "BMI", "source": "bmi"},
"normalize": True
}
processor = DataProcessorV2(factor_config=factor_config, train_ratio=0.75)
X_train, X_test, y_train, y_test = processor.process([df1, df2])
✅ 扩展建议(后续可加)
- 支持字段类型自动推断(
pandas.api.types
) - 缺失值填充策略(均值、众数、自定义)
- 日志记录每步过滤/丢弃的数量
- 报错集中收集,反馈至任务系统
需要我帮你补充到任务队列流程或加入日志记录模块吗?我也可以帮你写成独立服务或包,供平台多个模块复用。
以下是完整的代码说明、注释及示例测试方法,封装为一个通用的数据清洗模块,支持平台复用。
✅ 模块功能概述
这个模块 MultiDatasetProcessor
用于处理多个 Excel 或 CSV 数据集,具有以下功能:
- 读取多文件(支持路径或内存文件对象)
- 对列名排序和标准化,确保结构一致
- 合并多个数据集并去重
- 根据因子配置执行字段映射、最大值/最小值/枚举过滤
- 归一化数值型字段(可选)
- 根据目标字段切分训练/测试数据
✅ 使用方式示例(测试代码)
from io import BytesIO
# 模拟因子配置
factor_config = {
"features": [
{
"source": "Age", # 原字段名
"alias": "age", # 目标字段名
"min": 18,
"max": 60
},
{
"source": "Gender",
"alias": "gender",
"enum": ["Male", "Female"]
},
{
"source": "Income",
"alias": "income",
"min": 3000
}
],
"normalize": True # 是否进行归一化
}
# 模拟多个数据文件路径或文件对象(可替换为实际上传文件对象)
files = [
"dataset1.csv",
"dataset2.xlsx",
]
# 初始化处理器
processor = MultiDatasetProcessor(files=files, factor_config=factor_config)
# 清洗数据
cleaned_df = processor.clean()
# 打印清洗结果
print("Cleaned Data:\n", cleaned_df.head())
# 分割训练集和测试集
X_train, X_test, y_train, y_test = processor.split(target_column="income")
print("Train shape:", X_train.shape, y_train.shape)
print("Test shape:", X_test.shape, y_test.shape)
✅ 模块复用建议
你可以将该模块保存为 Python 包或微服务(如 Flask FastAPI):
封装为包
创建目录结构:
my_data_cleaner/
├── __init__.py
├── processor.py
processor.py
中存放上面的类。
接入任务队列(Celery 示例)
# tasks.py
from celery import shared_task
from my_data_cleaner.processor import MultiDatasetProcessor
@shared_task
def run_data_cleaning(files, factor_config):
processor = MultiDatasetProcessor(files, factor_config)
cleaned = processor.clean()
return cleaned.to_json()
✅ 小结
功能项 | 是否支持 |
---|---|
多数据集列名不一致 | ✅ 列标准化并按名称排序 |
数据清洗 | ✅ 根据因子配置(数值/字符) |
归一化 | ✅ 针对数值字段 |
类型推断 | ✅ 自动识别数值或字符型 |
文件来源支持 | ✅ CSV, Excel, 路径/内存对象 |
模块复用 | ✅ 可作为包/服务/任务队列调用 |
如果你需要我将此模块整理为 Python 包、FastAPI 服务或 Celery 工具,请告诉我,我可以继续帮助你封装部署。
以下是修改后的完整 Python 模块,已将文件读取逻辑更新为使用 openpyxl
来处理 Excel 文件,并辅以 pandas
进行数据操作,支持 CSV、Excel 文件路径或 BytesIO
文件对象的多数据集读取、清洗与切分。
✅ 模块说明:MultiDatasetProcessor
📌 功能概览
- 支持多个 CSV / Excel 文件
- 使用
openpyxl
读取.xlsx
文件,确保兼容多格式 - 对字段进行统一对齐、清洗、字段映射
- 支持数值范围/枚举/归一化等因子规则
- 支持训练/测试集切分
✅ 使用示例
from io import BytesIO
factor_config = {
"features": [
{
"source": "年龄",
"alias": "age",
"min": 18,
"max": 60
},
{
"source": "性别",
"alias": "gender",
"enum": ["男", "女"]
},
{
"source": "收入",
"alias": "income",
"min": 3000
}
],
"normalize": True
}
files = ["dataset1.csv", "dataset2.xlsx"]
processor = MultiDatasetProcessor(files=files, factor_config=factor_config)
cleaned = processor.clean()
print(cleaned.head())
X_train, X_test, y_train, y_test = processor.split(target_column="income")
print("Train:", X_train.shape, y_train.shape)
print("Test:", X_test.shape, y_test.shape)
✅ 核心代码功能解释(简要)
函数 | 说明 |
---|---|
_read_excel_with_openpyxl |
使用 openpyxl 读取 Excel,获取表头与数据,转为 pandas.DataFrame |
_load_single_file |
识别文件类型,决定使用 pandas 还是 openpyxl 加载 |
_align_columns |
标准化列名并按字母排序,用于列对齐 |
_load_and_merge |
逐个读取数据集并合并去重 |
clean |
按照因子配置进行字段映射、筛选、归一化等处理 |
split |
按照设定比例对清洗后的数据进行训练/测试拆分 |
如你希望将该模块封装为 API 服务或 Celery 任务队列支持组件,也可以继续进行集成设计。如果你想要我生成完整 FastAPI 或 Flask 服务结构,请告诉我。
下面是更新后的 MultiDatasetProcessor
,实现了 所有数据集文件只用 openpyxl
(Excel)和 Python 标准库的 csv
(CSV) 来读取,再交由 pandas
统一封装成 DataFrame
进行后续处理。这样就绕过了有时 pandas.read_csv
可能出现的问题。
import os
import csv
from io import TextIOWrapper, BytesIO
from typing import List, Union
import pandas as pd
from openpyxl import load_workbook
class MultiDatasetProcessor:
"""
多数据集处理器,只使用 openpyxl 读取 Excel,使用 csv 库读取 CSV,
并基于因子配置执行清洗、归一化和训练/测试拆分。
"""
def __init__(
self,
files: List[Union[str, BytesIO]],
factor_config: dict,
sheet_name: str = None
):
"""
:param files: 文件路径或文件对象列表(.csv 或 .xlsx/.xls)
:param factor_config: 因子配置,见示例
:param sheet_name: Excel 的 sheet 名称(可选)
"""
self.files = files
self.factor_config = factor_config
self.sheet_name = sheet_name
# 读取并合并所有文件
self.df = self._load_and_merge()
self.cleaned_df = None
def _read_excel_with_openpyxl(self, file: Union[str, BytesIO]) -> pd.DataFrame:
"""
使用 openpyxl 读取 Excel 文件,将其转成 pandas.DataFrame。
"""
# 加载工作簿
wb = load_workbook(filename=file, data_only=True)
# 选择指定 sheet 或第一个 sheet
ws = wb[self.sheet_name or wb.sheetnames[0]]
data = list(ws.values)
# 第一行作为表头
headers = [str(h).strip() if h is not None else "" for h in data[0]]
# 后续行作为数据
values = data[1:]
return pd.DataFrame(values, columns=headers)
def _read_csv_with_stdlib(self, file: Union[str, BytesIO]) -> pd.DataFrame:
"""
使用 Python 标准库 csv 读取 CSV 文件,将其转成 pandas.DataFrame。
"""
# 打开文件路径或文件对象
if isinstance(file, str):
f = open(file, newline='', encoding='utf-8')
else:
# BytesIO -> 文本流
f = TextIOWrapper(file, encoding='utf-8')
reader = csv.reader(f)
rows = list(reader)
f.close()
if not rows:
return pd.DataFrame()
# 第一行是表头
headers = [h.strip() for h in rows[0]]
# 剩余行是数据
values = rows[1:]
return pd.DataFrame(values, columns=headers)
def _load_single_file(self, file: Union[str, BytesIO]) -> pd.DataFrame:
"""
判断文件类型并调用相应的读取函数。
"""
# 识别扩展名
if isinstance(file, str):
ext = os.path.splitext(file)[1].lower()
else:
# 如果是 BytesIO 对象,默认当成 Excel
ext = '.xlsx'
if ext == '.csv':
return self._read_csv_with_stdlib(file)
elif ext in ('.xls', '.xlsx'):
return self._read_excel_with_openpyxl(file)
else:
raise ValueError(f"不支持的文件类型: {ext}")
def _align_columns(self, df: pd.DataFrame) -> pd.DataFrame:
"""
标准化列名(去除首尾空格)并按字母排序,使多个 DataFrame 列顺序一致。
"""
df.columns = [col.strip() for col in df.columns]
return df.reindex(sorted(df.columns), axis=1)
def _load_and_merge(self) -> pd.DataFrame:
"""
逐个加载所有文件,列对齐后合并,并去重。
"""
aligned = []
for file in self.files:
df = self._load_single_file(file)
df = self._align_columns(df)
aligned.append(df)
combined = pd.concat(aligned, ignore_index=True)
return combined.drop_duplicates()
def _infer_column_type(self, col: pd.Series) -> str:
"""
自动判断字段类型:数值型或字符串型。
"""
if pd.api.types.is_numeric_dtype(col):
return "numeric"
return "string"
def clean(self) -> pd.DataFrame:
"""
按因子配置清洗数据:
1) 字段映射(source -> alias)
2) 数值型字段根据 min/max/enum 过滤
3) 字符型字段根据 enum 过滤
4) 删除任何 alias 列中含 NA 的行
5) 对数值型字段执行标准化(如 normalize=True)
"""
result = {}
for feature in self.factor_config["features"]:
src = feature["source"]
alias = feature["alias"]
if src not in self.df.columns:
continue
col = self.df[src].copy()
col_type = self._infer_column_type(col)
# 数值型清洗
if col_type == "numeric":
col = pd.to_numeric(col, errors="coerce")
if "min" in feature:
col = col.where(col >= feature["min"])
if "max" in feature:
col = col.where(col <= feature["max"])
if "enum" in feature:
col = col.where(col.isin(feature["enum"]))
# 字符型清洗
else:
col = col.astype(str).str.strip()
if "enum" in feature:
col = col.where(col.isin(feature["enum"]))
result[alias] = col
# 构造清洗后的 DataFrame,删除任何含 NA 的行
cleaned = pd.DataFrame(result).dropna()
# 归一化数值型字段
if self.factor_config.get("normalize"):
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
num_cols = [
f["alias"] for f in self.factor_config["features"]
if f["alias"] in cleaned.columns and pd.api.types.is_numeric_dtype(cleaned[f["alias"]])
]
if num_cols:
cleaned[num_cols] = scaler.fit_transform(cleaned[num_cols])
self.cleaned_df = cleaned
return cleaned
def split(self, target_column: str, train_ratio: float = 0.8):
"""
将清洗后的数据拆分为训练集和测试集:
:param target_column: 目标字段(alias 名称)
:param train_ratio: 训练集比例
:return: X_train, X_test, y_train, y_test
"""
from sklearn.model_selection import train_test_split
if self.cleaned_df is None:
raise RuntimeError("请先调用 `.clean()` 方法")
X = self.cleaned_df.drop(columns=[target_column])
y = self.cleaned_df[target_column]
return train_test_split(X, y, train_size=train_ratio, random_state=42)
核心行-by-行解释
load_workbook
、csv.reader
- 用
openpyxl.load_workbook
读取 Excel,获取所有单元格值。 - 用 Python 标准库
csv.reader
读取 CSV,不依赖pandas.read_csv
。
- 用
_align_columns
df.columns = [col.strip() …]
:去除列名前后空格。df.reindex(sorted(df.columns), axis=1)
:按字母顺序排列列,保证不同文件列顺序一致。
- 合并与去重
pd.concat(aligned, ignore_index=True)
:合并所有 DataFrame。.drop_duplicates()
:去除重复行。
- 字段清洗
pd.to_numeric(..., errors="coerce")
:将列转为数值,无法转换的变为 NaN。col.where(cond)
:只有满足条件的行保留,否则置为 NaN。- 对字符串列做
.str.strip()
并用.isin()
过滤枚举值。
- 删除缺失
pd.DataFrame(result).dropna()
:对所有 alias 列,任何包含 NaN 的行都会被删除,确保每条数据完整。
- 归一化
- 使用
StandardScaler
对所有数值型字段做fit_transform
。
- 使用
- 拆分数据
train_test_split
:按train_ratio
比例随机切分训练/测试集。
测试示例
from io import BytesIO
# 示例因子配置
factor_config = {
"features": [
{"source": "Age", "alias": "age", "min": 18, "max": 60},
{"source": "Gender", "alias": "gender", "enum": ["Male", "Female"]},
{"source": "Income", "alias": "income", "min": 3000}
],
"normalize": True
}
# 假设 dataset1.xlsx, dataset2.xlsx 放在当前目录
files = ["dataset1.xlsx", "dataset2.xlsx"]
processor = MultiDatasetProcessor(files, factor_config)
# 清洗数据
cleaned_df = processor.clean()
print("Cleaned DataFrame:\n", cleaned_df.head())
# 拆分训练和测试集
X_train, X_test, y_train, y_test = processor.split(target_column="income")
print(f"Train X shape: {X_train.shape}, y shape: {y_train.shape}")
print(f"Test X shape: {X_test.shape}, y shape: {y_test.shape}")
这样,所有的文件读取都仅依赖 openpyxl
(Excel)和标准库 csv
(CSV),避免了 pandas.read_csv
的偶发问题,同时保留了强大的 DataFrame 操作能力。
对,用 openpyxl 只支持 Excel 格式(.xls/.xlsx),它无法读取 CSV。要处理 CSV,你需要用 Python 的 csv 模块或 pandas.read_csv
(如果它稳定)来读取。下面给你一个简单示例,演示如何统一「openpyxl + csv 模块」来读取 Excel 和 CSV 文件,并拼成 DataFrame。
import os
import csv
from io import TextIOWrapper, BytesIO
import pandas as pd
from openpyxl import load_workbook
def read_file_to_df(file: Union[str, BytesIO], sheet_name: str = None) -> pd.DataFrame:
"""
根据扩展名决定读取方式:
- Excel (.xls/.xlsx) 用 openpyxl
- CSV (.csv) 用 Python csv 模块
返回 pandas.DataFrame
"""
# 判断扩展名
if isinstance(file, str):
ext = os.path.splitext(file)[1].lower()
else:
# 文件对象,假设是 CSV or Excel,根据场景决定
# 这里如果是 BytesIO 且想读 CSV,可传入一个标志
raise ValueError("请传入文件路径以识别类型")
if ext in (".xls", ".xlsx"):
# --- EXCEL: openpyxl 读取 ---
wb = load_workbook(filename=file, data_only=True)
ws = wb[sheet_name or wb.sheetnames[0]]
rows = list(ws.values)
headers = [str(h).strip() if h is not None else "" for h in rows[0]]
values = rows[1:]
return pd.DataFrame(values, columns=headers)
elif ext == ".csv":
# --- CSV: csv 模块 读取 ---
with open(file, newline="", encoding="utf-8") as f:
reader = csv.reader(f)
rows = list(reader)
if not rows:
return pd.DataFrame()
headers = [h.strip() for h in rows[0]]
values = rows[1:]
return pd.DataFrame(values, columns=headers)
else:
raise ValueError(f"不支持的文件类型: {ext}")
# 使用示例
df_excel = read_file_to_df("data.xlsx")
df_csv = read_file_to_df("data.csv")
要点总结
- openpyxl 只管 Excel(.xlsx/.xls);
- CSV 必须用 Python 内置的 csv 模块或其他库来读;
- 读完后都转换成 pandas.DataFrame,再做后续清洗、归一化、切分等操作。
这样就能避开 pandas.read_csv
在某些复杂 CSV 上的兼容问题,同时整合 openpyxl
的 Excel 读取优点。
下面是将你提出的多字段数据清洗规则(最大/最小值 + 步长 + 容差 + 值替换)逻辑,适配集成到之前讨论过的统一数据清洗模块中的实现。此版本基于你之前的需求整合:
- 支持多个数据集文件(CSV 或 XLSX)
- 使用
openpyxl
读取 XLSX,pandas
读取 CSV(可替换) - 所有字段列名一致但顺序可能不同,支持字段对齐
- 清洗因子字段为空的记录
- 每个字段可配置清洗规则,包括数值限定、值对齐与剔除
✅ 总体结构
data_cleaning/
├── __init__.py
├── cleaner.py # 主清洗逻辑
├── file_reader.py # XLSX/CSV 文件读取
├── utils.py # 工具函数
✅ 1. file_reader.py
:读取并对齐字段
import pandas as pd
from openpyxl import load_workbook
from pathlib import Path
from typing import List, Union
import io
def read_excel_file(file: Union[str, Path, io.BytesIO]) -> pd.DataFrame:
"""使用 openpyxl 读取 xlsx 文件为 DataFrame"""
wb = load_workbook(file)
ws = wb.active
data = list(ws.iter_rows(values_only=True))
header, *rows = data
return pd.DataFrame(rows, columns=header)
def read_dataset_files(file_list: List[Union[str, Path, io.BytesIO]]) -> pd.DataFrame:
"""
读取并合并多个数据集文件(只支持相同结构但列顺序可不同)
"""
dfs = []
for file in file_list:
if isinstance(file, (str, Path)) and str(file).lower().endswith(".xlsx"):
df = read_excel_file(file)
elif isinstance(file, (str, Path)) and str(file).lower().endswith(".csv"):
df = pd.read_csv(file)
else:
raise ValueError(f"不支持的文件格式或类型: {file}")
dfs.append(df)
# 对字段列统一(按照第一个为基准)
base_columns = dfs[0].columns
for df in dfs[1:]:
df.columns = base_columns # 保证一致性
combined_df = pd.concat(dfs, ignore_index=True).drop_duplicates()
return combined_df
✅ 2. utils.py
:筛选值生成和字段清洗
import numpy as np
import pandas as pd
def generate_filter_values(min_val: float, max_val: float, step: float) -> np.ndarray:
"""根据 min/max/step 生成筛选值数组"""
return np.arange(min_val, max_val + step, step)
def apply_filter_to_series(series: pd.Series, filters: np.ndarray, tolerance: float) -> pd.Series:
"""将字段值替换为最接近的筛选值,超出容差则替换为 NaN"""
cleaned = []
for val in series:
if pd.isnull(val):
cleaned.append(np.nan)
continue
matched = False
for f in filters:
if abs(val - f) <= tolerance:
cleaned.append(f)
matched = True
break
if not matched:
cleaned.append(np.nan)
return pd.Series(cleaned, index=series.index)
✅ 3. cleaner.py
:清洗主逻辑入口
import pandas as pd
from typing import List, Union, Dict
from .file_reader import read_dataset_files
from .utils import generate_filter_values, apply_filter_to_series
def clean_datasets(
files: List[Union[str, bytes]],
factor_config: Dict[str, dict],
required_factors: List[str]
) -> pd.DataFrame:
"""
统一数据清洗主函数
参数:
- files: 文件路径或 file-like 对象列表(csv/xlsx)
- factor_config: 各因子字段的清洗配置,如 min/max/step/tolerance
- required_factors: 必须存在的因子字段(非空)
返回:
- 清洗后的 DataFrame
"""
df = read_dataset_files(files)
# 清除因子缺失的记录
df.dropna(subset=required_factors, inplace=True)
# 应用每个字段的清洗规则
for factor, config in factor_config.items():
if factor not in df.columns:
raise ValueError(f"字段 '{factor}' 不存在")
# 范围过滤
df = df[(df[factor] >= config["min"]) & (df[factor] <= config["max"])]
# 值替换
filters = generate_filter_values(config["min"], config["max"], config["step"])
df[factor] = apply_filter_to_series(df[factor], filters, config["tolerance"])
# 剔除无法匹配的 NaN
df.dropna(subset=factor_config.keys(), inplace=True)
return df.reset_index(drop=True)
✅ 示例用法(测试用)
if __name__ == "__main__":
from pathlib import Path
cleaning_rules = {
"temperature": {"min": 6, "max": 10, "step": 1, "tolerance": 0.2},
"humidity": {"min": 20, "max": 30, "step": 2, "tolerance": 0.3}
}
required_fields = ["temperature", "humidity"]
files = [
Path("data/file1.xlsx"),
Path("data/file2.xlsx")
]
cleaned = clean_datasets(files, cleaning_rules, required_fields)
print(cleaned)
✅ 总结说明
模块 | 功能 | 可复用性 |
---|---|---|
file_reader.py |
支持不同格式读取合并 | ✅ |
utils.py |
通用工具函数(间隔值生成与筛选) | ✅ |
cleaner.py |
清洗总控函数,支持多个字段、字段缺失判断 | ✅ |
是否需要我将它封装为 pip 安装包格式,或支持 Celery 任务封装用于异步处理清洗?