是的,除了使用单个数据表和一个表头字段对应表的方案,还有其他方式能够实现当前需求。可以根据不同的需求和复杂性选择更适合的方案。以下是几种不同的设计方案:
方案一:多表设计(范式化数据库设计)
这是典型的范式化设计,通过多个表来处理不同层次的数据以及表头映射关系。这种设计能更好地维护数据的完整性和查询性能,但复杂性相对较高。
表设计
- Category表(大类)
- 记录
xlsx
的第一层级——大类信息。
- 记录
- Subcategory表(小类)
- 记录第二层级——小类信息,外键关联
Category
。
- 记录第二层级——小类信息,外键关联
- Parameter表(参数)
- 记录第三层级——参数信息,外键关联
Subcategory
。
- 记录第三层级——参数信息,外键关联
- Data表(数据表)
- 存储实际的业务数据,与参数表关联,数据更分层且可拓展。
- FieldMapping表
- 保存
xlsx
表头与数据库字段的对应关系及校验规则。
- 保存
Django模型示例
from django.db import models
class Category(models.Model):
name = models.CharField(max_length=255, unique=True)
class Subcategory(models.Model):
name = models.CharField(max_length=255, blank=True, null=True)
category = models.ForeignKey(Category, on_delete=models.CASCADE)
class Parameter(models.Model):
name = models.CharField(max_length=255)
subcategory = models.ForeignKey(Subcategory, on_delete=models.CASCADE, null=True, blank=True)
class Data(models.Model):
parameter = models.ForeignKey(Parameter, on_delete=models.CASCADE)
value = models.CharField(max_length=255)
class FieldMapping(models.Model):
header_name = models.CharField(max_length=255, unique=True)
field_name = models.CharField(max_length=255)
validation_regex = models.CharField(max_length=500, blank=True, null=True)
data_type = models.CharField(max_length=50, choices=[('string', 'String'), ('integer', 'Integer'), ...])
required = models.BooleanField(default=False)
导入与查询
导入时,数据首先会分别插入到 Category
、Subcategory
和 Parameter
表中,最终存储到 Data
表中。查询时,前端通过传入大类和参数来查询数据。
优点
- 数据规范化,层次关系清晰,易于维护。
- 能更好地处理大规模查询和过滤需求。
缺点
- 设计复杂,需要多个表之间的关联,查询较复杂。
方案二:扁平化设计(字段独立存储)
这种设计将 xlsx
文件的层级信息“平铺”,每一层级(大类、小类、参数)作为数据表的字段独立存储。每一行数据存储时会将三层级信息平铺存入字段中,并通过映射表来控制表头和字段的对应关系。
表设计
- Data表
- 直接存储
xlsx
文件的三层表头字段以及对应的数据。
- 直接存储
- FieldMapping表
- 保存
xlsx
表头与数据库字段的对应关系。
- 保存
Django模型示例
from django.db import models
class Data(models.Model):
category = models.CharField(max_length=255)
subcategory = models.CharField(max_length=255, blank=True, null=True)
parameter = models.CharField(max_length=255)
value = models.CharField(max_length=255)
class FieldMapping(models.Model):
header_name = models.CharField(max_length=255, unique=True)
field_name = models.CharField(max_length=255)
validation_regex = models.CharField(max_length=500, blank=True, null=True)
data_type = models.CharField(max_length=50, choices=[('string', 'String'), ('integer', 'Integer'), ...])
required = models.BooleanField(default=False)
导入与查询
导入时,根据 FieldMapping
进行校验,然后将数据分层存储到 Data
表中。查询时,直接根据三层字段(category
、subcategory
、parameter
)进行过滤。
优点
- 表结构简单,易于理解和操作。
- 数据层次简单,查询时可以直接使用字段过滤。
缺点
- 数据冗余较大,重复的
category
和subcategory
信息在每行数据中都会存储。 - 如果未来层级发生变化,修改表结构的代价较高。
方案三:混合设计(JSON字段存储层次信息 + 映射表)
此方案是一个折衷设计,利用 JSON 字段来存储数据的层次信息,同时通过映射表来控制 xlsx
表头和数据表字段之间的关系。
表设计
- Data表
- 使用 JSON 字段存储层次化的表头和数据值。
- FieldMapping表
- 保存表头与 JSON 字段的映射关系及校验规则。
Django模型示例
from django.db import models
class Data(models.Model):
structured_data = models.JSONField()
class FieldMapping(models.Model):
header_name = models.CharField(max_length=255, unique=True)
json_path = models.CharField(max_length=255) # 记录在 JSON 中的路径
validation_regex = models.CharField(max_length=500, blank=True, null=True)
data_type = models.CharField(max_length=50, choices=[('string', 'String'), ('integer', 'Integer'), ...])
required = models.BooleanField(default=False)
导入与查询
导入时,将数据解析成 JSON 并根据映射表将数据结构化存储。查询时,可以使用 JSON 字段的路径(json_path
)来提取或过滤数据。
优点
- 数据结构化存储,同时支持灵活的表头扩展。
- JSON 字段方便存储动态和复杂结构的数据。
缺点
- JSON 字段的查询性能较差,复杂查询可能需要额外的处理。
- 需要对 JSON 的层次关系做出清晰的定义和规范。
结论
- 方案一(多表设计):适合数据量大且查询复杂的场景,规范化设计适合高性能查询。
- 方案二(扁平化设计):适合数据层次关系较简单,数据冗余可以接受的场景,简单易用。
- 方案三(混合设计):适合数据结构动态变化频繁且需要灵活扩展的场景,JSON 字段能提供更多的灵活性。
根据你的需求,可以权衡数据的复杂性、查询需求和扩展性来选择合适的设计方案。
方案1测试
在使用多表设计时,如果每一条数据是由 大类(Category
)、小类(Subcategory
)、参数(Parameter
) 的组合,并且每条数据还需要根据 人员(User
) 和 时间(Date
) 来区分,那么我们可以扩展现有的多表设计,将 人员 和 时间 作为数据的区分维度。
具体思路是:
- 保持原有的层次结构,即大类、小类、参数组合唯一确定一组属性。
- 增加用户和时间字段,以确保数据是按用户和时间维度来区分的。
表设计
- Category表(大类):记录大类信息。
- Subcategory表(小类):记录小类信息,外键关联
Category
表。 - Parameter表(参数):记录参数信息,外键关联
Subcategory
表。 - User表:记录用户信息(可选:可以直接使用 Django 自带的
User
表)。 - Data表:存储业务数据,并通过
category
、subcategory
、parameter
、user
和date
的组合来唯一确定一条数据。
Django 模型示例
from django.db import models
from django.contrib.auth.models import User
class Category(models.Model):
name = models.CharField(max_length=255, unique=True)
def __str__(self):
return self.name
class Subcategory(models.Model):
name = models.CharField(max_length=255, blank=True, null=True)
category = models.ForeignKey(Category, on_delete=models.CASCADE)
def __str__(self):
return f"{self.category.name} - {self.name}"
class Parameter(models.Model):
name = models.CharField(max_length=255)
subcategory = models.ForeignKey(Subcategory, on_delete=models.CASCADE)
def __str__(self):
return f"{self.subcategory.name} - {self.name}"
class Data(models.Model):
category = models.ForeignKey(Category, on_delete=models.CASCADE)
subcategory = models.ForeignKey(Subcategory, on_delete=models.CASCADE, null=True, blank=True)
parameter = models.ForeignKey(Parameter, on_delete=models.CASCADE)
# 数据的具体值
value = models.CharField(max_length=255)
# 用户信息,关联到 Django 自带的 User 模型
user = models.ForeignKey(User, on_delete=models.CASCADE)
# 数据的时间信息
date = models.DateField()
class Meta:
# category、subcategory、parameter、user 和 date 组合唯一确定一条数据
unique_together = ('category', 'subcategory', 'parameter', 'user', 'date')
def __str__(self):
return f"{self.category.name} - {self.subcategory.name} - {self.parameter.name} ({self.user.username} on {self.date})"
数据导入和重复数据判断
在导入数据时,首先需要判断是否已经存在相同的 category
、subcategory
、parameter
、user
和 date
组合的数据。如果存在,就可以更新这条数据;如果不存在,则创建新数据。
导入逻辑示例
from .models import Data, Category, Subcategory, Parameter, User
from django.core.exceptions import ObjectDoesNotExist
def import_data(category_name, subcategory_name, parameter_name, value, user_id, date):
try:
# 查找 Category
category = Category.objects.get(name=category_name)
# 查找 Subcategory,如果为空则使用 None
subcategory = Subcategory.objects.get(category=category, name=subcategory_name) if subcategory_name else None
# 查找 Parameter
parameter = Parameter.objects.get(subcategory=subcategory, name=parameter_name)
# 查找 User
user = User.objects.get(id=user_id)
# 查找是否已有相同组合的数据
data, created = Data.objects.get_or_create(
category=category,
subcategory=subcategory,
parameter=parameter,
user=user,
date=date,
defaults={'value': value} # 如果新建数据,设定默认值
)
if not created:
# 如果数据已存在,更新值
data.value = value
data.save()
return created
except (Category.DoesNotExist, Subcategory.DoesNotExist, Parameter.DoesNotExist, User.DoesNotExist):
print("对应的类别、参数或用户不存在")
return False
查询逻辑
查询数据时,可以通过 大类(Category
)、小类(Subcategory
)、参数(Parameter
)、用户(User
) 和 时间(Date
) 来过滤数据。
查询逻辑示例
from django.db.models import Q
def query_data(category_name, parameter_name, user_id, date_range=None):
"""
根据大类、参数、用户及时间范围查询数据
:param category_name: 大类名称
:param parameter_name: 参数名称
:param user_id: 用户 ID
:param date_range: 时间范围 (开始日期, 结束日期),可选
:return: 查询结果
"""
try:
category = Category.objects.get(name=category_name)
user = User.objects.get(id=user_id)
parameter = Parameter.objects.get(name=parameter_name)
query = Data.objects.filter(
category=category,
parameter=parameter,
user=user
)
if date_range:
query = query.filter(date__range=date_range)
return query
except (Category.DoesNotExist, Parameter.DoesNotExist, User.DoesNotExist):
return None
优化和扩展
- 索引优化:可以为
category
、subcategory
、parameter
、user
和date
字段添加索引,以提高查询性能。 - 数据扩展:可以进一步扩展
Data
表,增加更多的字段来记录其他相关信息,比如数据来源、状态等。 - 接口扩展:可以基于 Django REST Framework 编写 RESTful API 来实现增删改查接口。
总结
在这个设计中,数据的唯一性 通过大类、小类、参数、人员、时间组合来保证。这样既能体现数据的层次结构,又能确保每条数据具有时间和人员维度的区分,满足你的需求。
为了满足你的需求,其中 大类、小类 和 参数 的组合是唯一的,另外,人员 和 时间 的组合也是唯一的,这意味着每条记录的数据是由这两个维度共同确定的。
表设计
- Category 表(大类):存储大类信息。
- Subcategory 表(小类):存储小类信息,外键关联
Category
。 - Parameter 表(参数):存储参数信息,外键关联
Subcategory
。 - User 表(人员):存储用户信息(可以使用 Django 自带的
User
模型)。 - Data 表(数据):存储实际业务数据,外键关联 人员 和 时间,通过 大类、小类 和 参数 组合唯一确定一组属性,通过 人员 和 时间 来区分不同数据。
Django 模型设计
from django.db import models
from django.contrib.auth.models import User
class Category(models.Model):
name = models.CharField(max_length=255, unique=True)
def __str__(self):
return self.name
class Subcategory(models.Model):
name = models.CharField(max_length=255, blank=True, null=True)
category = models.ForeignKey(Category, on_delete=models.CASCADE)
def __str__(self):
return f"{self.category.name} - {self.name}"
class Parameter(models.Model):
name = models.CharField(max_length=255)
subcategory = models.ForeignKey(Subcategory, on_delete=models.CASCADE)
def __str__(self):
return f"{self.subcategory.name} - {self.name}"
class Data(models.Model):
category = models.ForeignKey(Category, on_delete=models.CASCADE)
subcategory = models.ForeignKey(Subcategory, on_delete=models.CASCADE, null=True, blank=True)
parameter = models.ForeignKey(Parameter, on_delete=models.CASCADE)
# 业务数据的具体值
value = models.CharField(max_length=255)
# 人员和时间
user = models.ForeignKey(User, on_delete=models.CASCADE)
date = models.DateField()
class Meta:
# category、subcategory、parameter 组合唯一,并且 user 和 date 组合唯一
unique_together = (('category', 'subcategory', 'parameter'), ('user', 'date'))
def __str__(self):
return f"{self.category.name} - {self.subcategory.name} - {self.parameter.name} ({self.user.username} on {self.date})"
重复数据判断及增删改查逻辑
在数据导入时,可以根据 大类、小类、参数组合 和 人员、时间组合 的唯一性来判断数据是否已经存在。
导入逻辑示例
from .models import Data, Category, Subcategory, Parameter, User
def import_data(category_name, subcategory_name, parameter_name, value, user_id, date):
try:
# 查找或创建 Category
category, _ = Category.objects.get_or_create(name=category_name)
# 查找或创建 Subcategory
subcategory, _ = Subcategory.objects.get_or_create(name=subcategory_name, category=category)
# 查找或创建 Parameter
parameter, _ = Parameter.objects.get_or_create(name=parameter_name, subcategory=subcategory)
# 查找 User
user = User.objects.get(id=user_id)
# 查找是否已存在相同的 category, subcategory, parameter, user, date 组合的数据
data, created = Data.objects.get_or_create(
category=category,
subcategory=subcategory,
parameter=parameter,
user=user,
date=date,
defaults={'value': value} # 如果新建数据,设定默认值
)
if not created:
# 如果数据已存在,更新值
data.value = value
data.save()
return created
except (Category.DoesNotExist, Subcategory.DoesNotExist, Parameter.DoesNotExist, User.DoesNotExist):
print("对应的类别、参数或用户不存在")
return False
增删改查 API
可以使用 Django REST Framework (DRF) 实现增删改查接口。
from rest_framework import viewsets
from .models import Data
from .serializers import DataSerializer
class DataViewSet(viewsets.ModelViewSet):
queryset = Data.objects.all()
serializer_class = DataSerializer
# serializers.py
from rest_framework import serializers
from .models import Data
class DataSerializer(serializers.ModelSerializer):
class Meta:
model = Data
fields = '__all__'
表头字段映射表
如果还需要表头字段映射,可以设计一个单独的表来存储 表头与字段的对应关系,以及字段的校验正则表达式:
class HeaderFieldMapping(models.Model):
xlsx_header = models.CharField(max_length=255) # 表头
model_field = models.CharField(max_length=255) # 对应数据表字段
validation_regex = models.CharField(max_length=255, blank=True, null=True) # 校验正则
def __str__(self):
return f"{self.xlsx_header} -> {self.model_field}"
这样你就可以在数据导入过程中,动态地根据 xlsx 文件的表头找到对应的数据库字段,并进行数据校验。
方案3测试
为了简化设计为只有一个数据表和一个表头字段对应表的方案,以下是具体的模型设计。
数据表 (Data
)
这个表存储了所有从 xlsx
文件导入的数据,不再细分为 Category
、Subcategory
和 Parameter
表。所有导入的数据将直接存储在一个表中,而具体的列对应关系将通过 FieldMapping
表来管理。
from django.db import models
class Data(models.Model):
# 动态字段存储的方式,使用 JSONField 来适应不同表头的数据
data = models.JSONField()
# 创建时间和更新时间
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
def __str__(self):
return f"Data ID: {self.id}"
表头字段对应表 (FieldMapping
)
这个表将 xlsx
文件的表头与 Data
表中 JSON 字段的具体键名(字段名)进行映射,并提供正则表达式校验规则和数据类型信息。
from django.db import models
class FieldMapping(models.Model):
HEADER_TYPES = [
('string', 'String'),
('integer', 'Integer'),
('float', 'Float'),
('date', 'Date'),
('boolean', 'Boolean'),
]
# 表头名称
header_name = models.CharField(max_length=255, unique=True, db_index=True)
# Data 表中的字段名称(用于 JSON key)
field_name = models.CharField(max_length=255)
# 校验正则表达式(可以为空)
validation_regex = models.CharField(max_length=500, blank=True, null=True)
# 字段数据类型
data_type = models.CharField(max_length=50, choices=HEADER_TYPES)
# 是否为必填字段
required = models.BooleanField(default=False)
def __str__(self):
return f"{self.header_name} -> {self.field_name}"
导入逻辑
在导入 xlsx
文件时,根据 FieldMapping
表来确定 xlsx
表头与 Data
表中 JSON 字段的对应关系,然后将数据插入或更新到 Data
表中。使用 JSON 字段来存储动态的数据列,方便处理不同格式的表头。
import re
from .models import FieldMapping, Data
def validate_and_import_xlsx_data(parsed_data):
"""
根据 FieldMapping 模型进行数据的校验和导入
:param parsed_data: 解析后的 xlsx 数据,每个字典代表一行数据
:return: 成功导入的记录数量
"""
success_count = 0
# 获取所有字段映射
field_mappings = FieldMapping.objects.all()
field_mapping_dict = {mapping.header_name: mapping for mapping in field_mappings}
for row in parsed_data:
valid = True
validated_data = {}
# 遍历每一列进行验证
for header_name, value in row.items():
mapping = field_mapping_dict.get(header_name)
if not mapping:
print(f"未知的表头字段: {header_name}")
continue
field_name = mapping.field_name
validation_regex = mapping.validation_regex
data_type = mapping.data_type
required = mapping.required
# 检查是否为必填字段
if required and not value:
print(f"缺少必填字段:{header_name}")
valid = False
break
# 根据正则表达式进行校验
if validation_regex and value:
if not re.match(validation_regex, str(value)):
print(f"字段 {header_name} 未通过校验")
valid = False
break
# 根据数据类型进行转换
if value:
try:
if data_type == 'integer':
value = int(value)
elif data_type == 'float':
value = float(value)
elif data_type == 'boolean':
value = bool(value)
elif data_type == 'date':
value = parse_date(value) # 假设有日期转换函数
except ValueError:
print(f"字段 {header_name} 无法转换为 {data_type}")
valid = False
break
# 如果通过校验,将数据添加到字典中
validated_data[field_name] = value
if valid:
# 插入到 Data 表中的 JSON 字段
Data.objects.create(data=validated_data)
success_count += 1
return success_count
查询逻辑
查询数据时,通过前端传入的参数在 data
字段中进行过滤,例如通过大类和参数的字段值进行过滤。
from django.db.models import Q
def query_data(category, parameter):
"""
根据前端传入的大类和参数查询数据
:param category: 大类名称
:param parameter: 参数名称
:return: 查询结果
"""
# 根据 JSON 字段进行查询
results = Data.objects.filter(
Q(data__category=category) & Q(data__parameter=parameter)
)
return results
总结
Data
表 使用 JSON 字段来存储所有数据行,每行数据根据表头字段映射到不同的 JSON 键名。FieldMapping
表 记录了xlsx
文件表头与数据库 JSON 字段的对应关系,以及字段的校验规则和数据类型。- 导入时 根据
FieldMapping
进行验证和字段映射,导入后将数据存入Data
表中。 - 查询时 根据 JSON 字段进行过滤,前端可以传入大类和参数等条件来查询数据。
这种设计简化了表结构,适应了表头动态变化和数据导入时的需求,同时也具有很好的扩展性。
以下是一个完整的方案,包含两个数据库设计和实现对应的增删改查接口。这个方案能够处理 xlsx 文件导入时的数据校验、字段映射以及多表关联的层次结构查询。
方案概要
我们将设计两张表:
BaseInfo
: 用于存储基础信息(如人员和创建时间)。Parameter
和Data
: 用于存储大类、小类、参数组合及其对应的值。FieldMapping
: 用于存储 xlsx 文件中的表头与数据库字段的对应关系,并为字段添加数据校验规则。
数据库模型设计
from django.db import models
# 基础信息表
class BaseInfo(models.Model):
user = models.ForeignKey('auth.User', on_delete=models.CASCADE, help_text="数据导入的人员")
creation_time = models.DateTimeField(help_text="数据导入时间")
class Meta:
unique_together = ['user', 'creation_time'] # 确保人员和创建时间唯一
verbose_name = '基础信息'
verbose_name_plural = '基础信息表'
def __str__(self):
return f"{self.user} - {self.creation_time}"
# 参数表:存储大类、小类和参数
class Parameter(models.Model):
category_name = models.CharField(max_length=255, help_text="大类名称")
subcategory_name = models.CharField(max_length=255, null=True, blank=True, help_text="小类名称(可以为空)")
parameter_name = models.CharField(max_length=255, help_text="参数名称")
class Meta:
unique_together = ['category_name', 'subcategory_name', 'parameter_name']
verbose_name = '参数信息'
verbose_name_plural = '参数信息表'
def __str__(self):
return f"{self.category_name} - {self.subcategory_name} - {self.parameter_name}"
# 数据表:关联基础信息和参数,并存储值
class Data(models.Model):
base_info = models.ForeignKey(BaseInfo, on_delete=models.CASCADE, help_text="关联基础信息")
parameter = models.ForeignKey(Parameter, on_delete=models.CASCADE, help_text="关联参数")
value = models.CharField(max_length=255, help_text="参数值")
class Meta:
verbose_name = '数据'
verbose_name_plural = '数据表'
def __str__(self):
return f"{self.base_info} - {self.parameter} - {self.value}"
# 字段映射表:存储xlsx表头与数据库字段的对应关系及校验规则
class FieldMapping(models.Model):
xlsx_header = models.CharField(max_length=255, unique=True, help_text="xlsx 文件中的表头")
db_field = models.CharField(max_length=255, help_text="数据库中的字段名")
validation_regex = models.CharField(max_length=255, null=True, blank=True, help_text="字段的校验正则表达式")
def __str__(self):
return f"{self.xlsx_header} -> {self.db_field}"
数据库模型说明
BaseInfo
: 用来存储每条数据的基础信息,主要是导入人员和时间。Parameter
: 存储大类、小类和参数的组合,每个组合是唯一的,定义了数据结构的层次。Data
: 每条数据与BaseInfo
和Parameter
相关联,存储实际的参数值。FieldMapping
: 将 xlsx 文件的表头映射到数据库的字段,并为字段提供正则校验。
数据导入和校验
当导入数据时,可以通过 FieldMapping
表来找到对应的数据库字段,并对值进行校验。每次数据导入时,根据导入人员和时间创建一条新的 BaseInfo
记录,随后插入数据。
数据导入函数示例
import re
def validate_and_import_xlsx_data(xlsx_data, user, creation_time):
"""
解析并导入 xlsx 数据,根据 FieldMapping 进行字段映射,并进行校验。
:param xlsx_data: 解析后的 xlsx 数据,格式为 [{header: value, ...}, ...]
:param user: 导入的用户
:param creation_time: 导入时间
"""
base_info = BaseInfo.objects.create(user=user, creation_time=creation_time)
for row in xlsx_data:
data_to_save = {}
for xlsx_header, value in row.items():
try:
# 查找对应的字段映射
field_mapping = FieldMapping.objects.get(xlsx_header=xlsx_header)
# 校验数据
if field_mapping.validation_regex:
if not re.match(field_mapping.validation_regex, str(value)):
raise ValueError(f"字段 '{xlsx_header}' 的值 '{value}' 不符合校验规则")
# 查找或创建 Parameter
category_name = row.get('category_name', None)
subcategory_name = row.get('subcategory_name', None)
parameter_name = row.get('parameter_name', None)
if not category_name or not parameter_name:
raise ValueError("大类和参数名称是必须的")
parameter, created = Parameter.objects.get_or_create(
category_name=category_name,
subcategory_name=subcategory_name,
parameter_name=parameter_name
)
# 将值保存到 Data 表中
Data.objects.create(
base_info=base_info,
parameter=parameter,
value=value
)
except FieldMapping.DoesNotExist:
raise ValueError(f"未找到表头 '{xlsx_header}' 的字段映射")
CRUD 操作
增加数据
通过 validate_and_import_xlsx_data
函数即可实现增量导入数据。在导入时,验证表头和字段映射关系,并将数据保存到 Data
表。
查询数据
查询某个用户和创建时间下的所有数据,并构建层级化的结构:
from collections import defaultdict
def query_hierarchical_data(user_id, creation_time):
"""
查询某个用户和创建时间下的数据,返回层级化的结构。
:param user_id: 人员 ID
:param creation_time: 创建时间
:return: 层级化的查询结果
"""
try:
# 查找基础信息记录
base_info = BaseInfo.objects.get(user_id=user_id, creation_time=creation_time)
# 查询该基础信息下的所有数据
data_records = Data.objects.filter(base_info=base_info).select_related('parameter')
# 使用 defaultdict 创建层级化的结构
result = defaultdict(lambda: defaultdict(lambda: {})) # 自动构建嵌套结构
# 遍历查询结果,按大类、小类、参数进行分组
for data_record in data_records:
category = data_record.parameter.category_name
subcategory = data_record.parameter.subcategory_name or "无" # 小类可能为空,用 '无' 表示
parameter = data_record.parameter.parameter_name
# 将值放入层级结构中
result[category][subcategory][parameter] = data_record.value
# 构建最终返回结果
final_result = {
"base_info": {
"user": base_info.user.username,
"creation_time": base_info.creation_time
},
"data": dict(result) # 将 defaultdict 转换为普通 dict
}
return final_result
except BaseInfo.DoesNotExist:
return None
更新数据
更新数据时,可以根据 user_id
和 creation_time
找到对应的 BaseInfo
,然后在 Data
表中找到对应的 Parameter
记录并更新。
def update_data(user_id, creation_time, parameter_id, new_value):
try:
base_info = BaseInfo.objects.get(user_id=user_id, creation_time=creation_time)
data = Data.objects.get(base_info=base_info, parameter_id=parameter_id)
data.value = new_value
data.save()
except BaseInfo.DoesNotExist:
raise ValueError("未找到对应的基础信息")
except Data.DoesNotExist:
raise ValueError("未找到对应的数据记录")
删除数据
根据 BaseInfo
和 Parameter
组合找到对应的数据并删除:
def delete_data(user_id, creation_time, parameter_id):
try:
base_info = BaseInfo.objects.get(user_id=user_id, creation_time=creation_time)
data = Data.objects.get(base_info=base_info, parameter_id=parameter_id)
data.delete()
except BaseInfo.DoesNotExist:
raise ValueError("未找到对应的基础信息")
except Data.DoesNotExist:
raise ValueError("未找到对应的数据记录")
字段映射增删改查
创建字段映射
def create_field_mapping(xlsx_header, db_field, validation_regex=None):
FieldMapping.objects.create(
xlsx_header=xlsx_header,
db_field=db_field,
validation_regex=validation_regex
)
更新字段映射
def update_field_mapping(xlsx_header, db_field=None, validation_regex=None):
field_mapping = FieldMapping.objects.get(x
lsx_header=xlsx_header)
if db_field:
field_mapping.db_field = db_field
if validation_regex:
field_mapping.validation_regex = validation_regex
field_mapping.save()
删除字段映射
def delete_field_mapping(xlsx_header):
FieldMapping.objects.filter(xlsx_header=xlsx_header).delete()
查询字段映射
def get_field_mappings():
return FieldMapping.objects.all()
该方案实现了基础数据和字段映射的设计,并提供了用于数据操作和查询的接口。数据校验通过正则表达式进行,查询时按照大类、小类和参数层次结构进行组织。
在当前设计中,每当插入一条数据时,由于每个参数都是单独存储在 Data
表中的,所以你在写入一条数据(即一个人员和时间的组合)时,可能会向 Data
表中插入上百条记录(每个参数一条数据)。当数据量非常大时,确实可能会产生以下问题:
- 写入性能问题: 每条数据都对应多个参数,每个参数写入时都涉及到数据库的
INSERT
操作。当数据量变大时,大量的INSERT
操作可能会影响性能。 - 查询性能问题: 查询时为了重构数据的层级结构,需要遍历大量的参数记录,尤其是当单条数据包含多个参数时,查询效率可能会下降。
- 数据存储开销: 由于每个参数都是一个独立记录,数据表的大小会迅速增长,尤其是当参数数量多、数据量大的情况下。
可行的优化方案
为了解决这些问题,可以考虑以下几种优化策略:
1. 批量插入
Django 默认会为每条记录执行单独的 INSERT
操作。通过使用 Django 的批量插入功能,可以将多条记录一次性插入数据库,减少 INSERT
操作的开销。
from django.db import transaction
def bulk_insert_data(base_info, parameter_values):
"""
批量插入数据
:param base_info: 对应的 BaseInfo 对象
:param parameter_values: 参数和值的列表,格式为 [{'parameter': Parameter, 'value': value}, ...]
"""
data_objects = []
for param_val in parameter_values:
data_objects.append(Data(base_info=base_info, parameter=param_val['parameter'], value=param_val['value']))
with transaction.atomic():
Data.objects.bulk_create(data_objects)
这样,参数的插入就会被合并为一次批量插入操作,从而提高性能。
2. 查询优化
当查询数据时,可以利用数据库的索引和查询优化技术来加快查询速度。例如,确保在 Data
表的 base_info_id
和 parameter_id
字段上加上索引,以便能够快速查询某条数据的所有参数。
class Data(models.Model):
base_info = models.ForeignKey(BaseInfo, on_delete=models.CASCADE, help_text="关联基础信息", db_index=True)
parameter = models.ForeignKey(Parameter, on_delete=models.CASCADE, help_text="关联参数", db_index=True)
value = models.CharField(max_length=255, help_text="参数值")
class Meta:
indexes = [
models.Index(fields=['base_info', 'parameter']),
]
verbose_name = '数据'
verbose_name_plural = '数据表'
3. 存储结构优化
目前的设计是将每个参数的值存储为单独的记录。如果参数数量非常多,且每条记录包含的参数固定,可以考虑将参数数据压缩成 JSON 格式,存储在一个字段中,而不是每个参数一条记录。
from django.contrib.postgres.fields import JSONField
class Data(models.Model):
base_info = models.ForeignKey(BaseInfo, on_delete=models.CASCADE, help_text="关联基础信息")
parameters = JSONField(help_text="存储所有参数及其值的JSON对象")
def __str__(self):
return f"{self.base_info} - {self.parameters}"
这种方法将所有参数合并存储在一个字段中,减少了表的行数,但会增加单条记录的存储大小。对于查询来说,PostgreSQL 的 JSON 查询功能可以高效地处理这种数据结构,特别是当你需要查询单个参数时,可以使用 ->>
操作符。
优点:
- 减少了
Data
表的记录条数,降低存储压力。 - 查询时可以直接获取所有参数,减少复杂的 JOIN 操作。
缺点:
- 每次插入和更新时需要重新构建整个 JSON 对象。
- 如果需要对单个参数进行复杂查询,性能可能不如单独存储参数的方式高效。
4. 分表存储
如果你的数据量非常大,还可以考虑将 Data
表按照某些规则进行分表存储(如按时间分表、按用户分表)。这样可以避免单表数据过大的问题,提高写入和查询的效率。
方案推荐
综合考虑查询和写入的复杂度与性能,我建议如下方案:
- 短期:使用批量插入和索引优化,以提升写入和查询性能。这在一定数据量下能够有效解决性能问题。
- 长期:如果数据量持续增长,考虑将参数数据存储为 JSON 格式,以减少表记录数量。查询时使用 PostgreSQL 的 JSON 查询功能进行优化。同时,可以考虑分表存储,以应对极端的数据量。
增删改查接口实现(优化批量插入)
def create_or_update_data(user, creation_time, parameter_data):
"""
批量创建或更新数据。
:param user: 导入用户
:param creation_time: 创建时间
:param parameter_data: 参数数据,格式为 [{category_name, subcategory_name, parameter_name, value}, ...]
"""
base_info, _ = BaseInfo.objects.get_or_create(user=user, creation_time=creation_time)
parameters_to_insert = []
for param in parameter_data:
category = param['category_name']
subcategory = param.get('subcategory_name', None)
parameter_name = param['parameter_name']
value = param['value']
# 获取或创建参数
parameter, _ = Parameter.objects.get_or_create(
category_name=category,
subcategory_name=subcategory,
parameter_name=parameter_name
)
# 创建 Data 对象准备批量插入
parameters_to_insert.append({
'parameter': parameter,
'value': value
})
# 批量插入数据
bulk_insert_data(base_info, parameters_to_insert)
通过这个设计,数据的插入、更新、删除和查询都能够在高效和可扩展性之间取得平衡。如果未来需要进一步优化,可以通过数据压缩、分表存储来进行扩展。
在你的需求中,如果使用 JSON 存储参数数据,并且每个大类中都有一个附件的参数,附件的文件名需要与参数数据一同处理。以下是一个可行的设计方案,涵盖了数据库模型、文件上传到 S3 的处理、以及相应的增删改查操作。
数据库设计
模型定义
from django.db import models
from django.contrib.postgres.fields import JSONField
class BaseInfo(models.Model):
user = models.CharField(max_length=255, help_text="导入用户")
creation_time = models.DateTimeField(help_text="创建时间")
class Meta:
unique_together = ('user', 'creation_time')
verbose_name = '基础信息'
verbose_name_plural = '基础信息表'
class Category(models.Model):
name = models.CharField(max_length=255, help_text="大类名称")
attachment = models.CharField(max_length=255, null=True, blank=True, help_text="附件文件名")
class Meta:
verbose_name = '大类'
verbose_name_plural = '大类表'
class Data(models.Model):
base_info = models.ForeignKey(BaseInfo, on_delete=models.CASCADE, help_text="关联基础信息")
parameters = JSONField(help_text="参数数据,以 JSON 格式存储")
class Meta:
verbose_name = '数据'
verbose_name_plural = '数据表'
文件上传到 S3
在处理文件上传时,可以使用 boto3
这个库来上传文件到 S3。首先,你需要安装该库:
pip install boto3
然后在你的视图或服务中实现文件上传逻辑:
import boto3
from django.conf import settings
import os
def upload_file_to_s3(file, bucket_name):
"""
上传文件到 S3。
:param file: 文件对象
:param bucket_name: S3 桶名
:return: 文件名
"""
s3 = boto3.client('s3',
aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY)
file_name = file.name
try:
s3.upload_fileobj(file, bucket_name, file_name)
except Exception as e:
print(f"上传失败: {e}")
return None
return file_name
完整的增删改查操作实现
创建或更新数据
from django.db import transaction
def create_or_update_data(user, creation_time, category_data, parameters):
"""
创建或更新数据。
:param user: 导入用户
:param creation_time: 创建时间
:param category_data: 大类数据,包含大类名称和附件
:param parameters: 参数数据,格式为 [{parameter_name: value}, ...]
"""
base_info, _ = BaseInfo.objects.get_or_create(user=user, creation_time=creation_time)
# 创建或更新大类
category, _ = Category.objects.get_or_create(name=category_data['name'])
# 更新附件文件名
if 'attachment' in category_data:
category.attachment = category_data['attachment']
category.save()
# 准备 JSON 参数数据
data_object = Data(base_info=base_info, parameters=parameters)
with transaction.atomic():
data_object.save()
查询数据
def get_data_by_user_and_time(user, creation_time):
"""
根据用户和时间查询数据,并返回层级结构
:param user: 导入用户
:param creation_time: 创建时间
:return: 返回层级结构的字典
"""
base_info = BaseInfo.objects.get(user=user, creation_time=creation_time)
data_records = Data.objects.filter(base_info=base_info)
result = {
"base_info": {
"user": user,
"creation_time": creation_time
},
"data": []
}
for record in data_records:
result["data"].append({
"parameters": record.parameters,
"category_attachment": record.base_info.category.attachment # 附件文件名
})
return result
删除数据
def delete_data_by_user_and_time(user, creation_time):
"""
根据用户和时间删除数据
:param user: 用户
:param creation_time: 创建时间
"""
base_info = BaseInfo.objects.get(user=user, creation_time=creation_time)
Data.objects.filter(base_info=base_info).delete()
base_info.delete() # 删除基础信息
更新数据
def update_data(user, creation_time, parameters):
"""
更新数据
:param user: 用户
:param creation_time: 创建时间
:param parameters: 参数数据
"""
base_info = BaseInfo.objects.get(user=user, creation_time=creation_time)
data_record = Data.objects.filter(base_info=base_info).first()
if data_record:
data_record.parameters.update(parameters)
data_record.save()
使用方案总结
- JSON 存储参数:使用 JSON 字段来灵活存储参数数据,方便后续扩展和修改。
- 处理附件:在大类模型中加入附件字段,使用 S3 存储附件,并通过
upload_file_to_s3
函数实现文件上传。 - 增删改查操作:提供基本的 CRUD 接口,支持用户根据基本信息进行数据操作。
- 事务管理:在数据创建和更新操作中使用事务,确保数据一致性。
性能考虑
- JSON 存储:适合存储灵活的参数数据,但要注意 JSON 字段的查询效率,尤其是在数据量较大时。
- 附件存储:将文件存储在 S3 中,可以减轻数据库负担,并提高文件访问的灵活性。
这个方案兼顾了灵活性与可扩展性,并支持文件附件的处理。希望这能够满足你的需求!如果还有其他问题,请随时告诉我。
为了让代码更具动态性并避免冗余,可以通过以下方法实现:
-
使用字典来映射大类到对应的模型:将大类和小类与模型的映射存储在字典中,以便根据类别动态获取模型类。
-
通过反射动态设置字段值:使用
setattr
函数根据字段名称动态设置属性值。 -
简化数据导入逻辑:利用循环和条件逻辑替代多个 if-elif 语句来处理数据存储。
以下是改进后的代码示例:
更新后的代码
import openpyxl
from django.core.exceptions import ValidationError
from .models import BaseInfo, CategoryA, CategoryB # 确保引入你的模型
def load_field_mappings():
"""从数据库加载字段对应关系。"""
mappings = FieldMapping.objects.all()
mapping_dict = {}
for mapping in mappings:
key = (mapping.category, mapping.subcategory, mapping.parameter)
mapping_dict[key] = mapping.db_field
return mapping_dict
def parse_xlsx(file):
"""解析 xlsx 文件并返回数据对象列表。"""
wb = openpyxl.load_workbook(file)
sheet = wb.active
data_list = []
header_row = [cell.value for cell in sheet[1]] # 第一行作为表头
field_mappings = load_field_mappings()
for row in sheet.iter_rows(min_row=2, values_only=True):
data_item = {
'user': row[0], # 用户名
'creation_time': row[1], # 创建时间
}
for col_idx in range(2, len(row)):
category = header_row[col_idx - 2] # 大类
subcategory = header_row[col_idx - 1] # 小类
parameter = header_row[col_idx] # 参数
key = (category, subcategory, parameter)
if key in field_mappings:
db_field = field_mappings[key]
data_item[db_field] = row[col_idx]
data_item['category'] = category
data_item['subcategory'] = subcategory
data_list.append(data_item)
return data_list
def import_data(file):
"""解析 xlsx 文件,校验数据并导入数据库。"""
data_list = parse_xlsx(file)
validate_data(data_list)
# 大类到模型的映射
category_model_map = {
'大类A': CategoryA,
'大类B': CategoryB,
# 可以继续添加更多的大类和相应的模型
}
for data_item in data_list:
base_info, _ = BaseInfo.objects.get_or_create(
user=data_item['user'],
creation_time=data_item['creation_time']
)
category = data_item['category']
model = category_model_map.get(category)
if model:
# 动态创建或更新对象
defaults = {k: v for k, v in data_item.items() if k not in ['user', 'creation_time', 'category', 'subcategory']}
defaults['subcategory'] = data_item['subcategory'] # 记录小类
model.objects.update_or_create(base_info=base_info, defaults=defaults)
def validate_data(data_list):
"""校验数据,确保数据格式正确且无重复。"""
seen = set()
for data_item in data_list:
key = (data_item['user'], data_item['creation_time'])
if key in seen:
raise ValidationError(f"重复数据: {key}")
seen.add(key)
关键改动
-
模型映射:使用字典
category_model_map
将大类名称映射到相应的模型类,这样可以轻松添加新大类。 -
动态字段设置:在导入数据时,使用字典推导式动态创建
defaults
字典,简化了字段处理的逻辑。 -
减少条件判断:通过直接使用映射和动态属性设置,减少了对多个 if-elif 语句的依赖,代码更简洁。
总结
这种动态化的实现方式不仅提升了代码的可维护性和可读性,也方便了后续的扩展。你可以轻松添加新的大类和相应的模型,而无需重复编写判断逻辑。如果有其他需求或问题,请随时告诉我!
下面是一个完整的实现,包括数据库模型、数据导入逻辑、增删改查接口以及根据参数筛选的接口代码。
数据库模型设计
# models.py
from django.db import models
class BaseInfo(models.Model):
user = models.CharField(max_length=100)
creation_time = models.DateTimeField()
class Meta:
unique_together = ('user', 'creation_time')
class Category(models.Model):
name = models.CharField(max_length=100, unique=True)
class SubCategory(models.Model):
name = models.CharField(max_length=100)
category = models.ForeignKey(Category, on_delete=models.CASCADE, related_name='subcategories')
class CategoryA(models.Model):
base_info = models.ForeignKey(BaseInfo, on_delete=models.CASCADE, related_name='category_a_entries')
sub_category = models.ForeignKey(SubCategory, on_delete=models.CASCADE, related_name='category_a_subcategories')
param_1 = models.CharField(max_length=100, blank=True, null=True)
param_2 = models.CharField(max_length=100, blank=True, null=True)
class CategoryB(models.Model):
base_info = models.ForeignKey(BaseInfo, on_delete=models.CASCADE, related_name='category_b_entries')
sub_category = models.ForeignKey(SubCategory, on_delete=models.CASCADE, related_name='category_b_subcategories')
param_1 = models.CharField(max_length=100, blank=True, null=True)
param_2 = models.CharField(max_length=100, blank=True, null=True)
数据导入逻辑
# utils.py
def import_data(file):
data_list = parse_xlsx(file) # 假设已有解析 XLSX 的方法
validate_data(data_list) # 假设已有数据校验的方法
for data_item in data_list:
base_info, _ = BaseInfo.objects.get_or_create(
user=data_item['user'],
creation_time=data_item['creation_time']
)
# 获取或创建小类
sub_category, _ = SubCategory.objects.get_or_create(name=data_item['sub_category_name'])
# 创建类A数据
CategoryA.objects.create(
base_info=base_info,
sub_category=sub_category,
param_1=data_item.get('param_1', None),
param_2=data_item.get('param_2', None),
)
# 创建类B数据
CategoryB.objects.create(
base_info=base_info,
sub_category=sub_category,
param_1=data_item.get('param_1', None),
param_2=data_item.get('param_2', None),
)
增删改查接口
# views.py
from rest_framework import viewsets, status
from rest_framework.response import Response
from .models import BaseInfo, CategoryA, CategoryB, SubCategory
from .serializers import BaseInfoSerializer, CategoryASerializer, CategoryBSerializer
class BaseInfoViewSet(viewsets.ModelViewSet):
queryset = BaseInfo.objects.all()
serializer_class = BaseInfoSerializer
class CategoryAViewSet(viewsets.ModelViewSet):
queryset = CategoryA.objects.all()
serializer_class = CategoryASerializer
class CategoryBViewSet(viewsets.ModelViewSet):
queryset = CategoryB.objects.all()
serializer_class = CategoryBSerializer
class RetrieveCompleteDataViewSet(viewsets.ViewSet):
def get(self, request, user, creation_time):
base_info = BaseInfo.objects.get(user=user, creation_time=creation_time)
response_data = {
"base_info": {
"user": base_info.user,
"creation_time": base_info.creation_time,
},
"category_a": [],
"category_b": [],
}
for entry in base_info.category_a_entries.all():
response_data["category_a"].append({
"sub_category": entry.sub_category.name,
"param_1": entry.param_1,
"param_2": entry.param_2,
})
for entry in base_info.category_b_entries.all():
response_data["category_b"].append({
"sub_category": entry.sub_category.name,
"param_1": entry.param_1,
"param_2": entry.param_2,
})
return Response(response_data)
class FilterByParametersViewSet(viewsets.ViewSet):
def post(self, request):
params = request.data.get('params', {})
results = []
# 根据参数筛选数据
category_a_entries = CategoryA.objects.filter(**params)
category_b_entries = CategoryB.objects.filter(**params)
for entry in category_a_entries:
results.append({
"base_info": entry.base_info.user,
"creation_time": entry.base_info.creation_time,
"sub_category": entry.sub_category.name,
"param_1": entry.param_1,
"param_2": entry.param_2,
})
for entry in category_b_entries:
results.append({
"base_info": entry.base_info.user,
"creation_time": entry.base_info.creation_time,
"sub_category": entry.sub_category.name,
"param_1": entry.param_1,
"param_2": entry.param_2,
})
return Response(results, status=status.HTTP_200_OK)
Serializer 示例
确保你有适当的序列化器来处理模型数据。
# serializers.py
from rest_framework import serializers
from .models import BaseInfo, CategoryA, CategoryB
class BaseInfoSerializer(serializers.ModelSerializer):
class Meta:
model = BaseInfo
fields = '__all__'
class CategoryASerializer(serializers.ModelSerializer):
class Meta:
model = CategoryA
fields = '__all__'
class CategoryBSerializer(serializers.ModelSerializer):
class Meta:
model = CategoryB
fields = '__all__'
总结
- 数据模型:每个大类、子类和参数字段都有清晰的结构。
- 数据导入:从 XLSX 文件中解析数据并写入数据库。
- 增删改查接口:提供完整的 CRUD 操作接口。
- 参数筛选接口:支持根据传入的参数进行数据筛选。
如果需要进一步的调整或具体功能,随时告诉我!
为了让代码更动态且减少重复,您可以将所有大类模型存储在一个列表中,然后通过循环来构建查询条件。这种方法可以让您在添加新的大类模型时只需修改列表,而无需重复代码。
优化后的代码实现
# views.py
from django.db.models import Q
from rest_framework import viewsets, status
from rest_framework.response import Response
def build_filter_conditions(params):
"""动态构建筛选条件"""
conditions = {}
for key, value in params.items():
if value: # 只添加非空条件
conditions[key] = value
return conditions
class RetrieveCompleteDataViewSet(viewsets.ViewSet):
def get(self, request, user, creation_time):
try:
base_info = BaseInfo.objects.get(user=user, creation_time=creation_time)
except BaseInfo.DoesNotExist:
return Response({"detail": "Data not found"}, status=status.HTTP_404_NOT_FOUND)
response_data = {
"base_info": {
"user": base_info.user,
"creation_time": base_info.creation_time,
},
"categories": {}
}
categories = Category.objects.prefetch_related('subcategories')
for category in categories:
category_name = category.name
response_data["categories"][category_name] = {}
for sub_category in category.subcategories.all():
sub_category_name = sub_category.name
response_data["categories"][category_name][sub_category_name] = {
"params": []
}
entries = (CategoryA.objects.filter(base_info=base_info, sub_category=sub_category) |
CategoryB.objects.filter(base_info=base_info, sub_category=sub_category))
for entry in entries:
response_data["categories"][category_name][sub_category_name]["params"].append({
"param_1": entry.param_1,
"param_2": entry.param_2,
})
return Response(response_data)
class FilterByParametersViewSet(viewsets.ViewSet):
def post(self, request):
params = request.data.get('params', {})
filter_conditions = build_filter_conditions(params)
results = {}
# 将所有分类模型放在一个列表中
category_models = [CategoryA, CategoryB, CategoryC] # 添加所有大类模型
# 使用 Q 对象动态构建查询条件
query = Q()
for model in category_models:
query |= Q(**filter_conditions)
# 根据动态条件筛选数据
entries = []
for model in category_models:
entries += list(model.objects.filter(query).select_related('sub_category', 'base_info'))
for entry in entries:
category_name = entry.sub_category.category.name
sub_category_name = entry.sub_category.name
if category_name not in results:
results[category_name] = {"sub_categories": {}}
if sub_category_name not in results[category_name]["sub_categories"]:
results[category_name]["sub_categories"][sub_category_name] = {"params": []}
results[category_name]["sub_categories"][sub_category_name]["params"].append({
"base_info_user": entry.base_info.user,
"creation_time": entry.base_info.creation_time,
"param_1": entry.param_1,
"param_2": entry.param_2,
})
return Response(results, status=status.HTTP_200_OK)
关键点
- 动态模型列表:将所有大类模型放入
category_models
列表中,这样可以方便地管理和扩展模型。 - 循环处理查询:通过循环处理每个模型,避免硬编码的查询条件。
- 结果整合:在查询时使用
+=
将所有模型的结果汇总到一个列表中,保持逻辑简洁。
总结
这种方式使得代码结构更清晰,并易于扩展和维护。如果您有更多需求或想法,请随时告诉我!
好的,下面是一个完整的 Django 应用示例,包含数据库设计、数据读入、检查、导入,以及增删改查功能,支持根据层级关系的参数进行查询。
数据库模型设计
- 基础信息表(
BaseInfo
) - 参数映射表(
ParameterMapping
) - 数据表(
DataEntry
)
from django.db import models
from django.contrib.postgres.fields import JSONField
class BaseInfo(models.Model):
user = models.CharField(max_length=255) # 写入人员
creation_time = models.DateTimeField(auto_now_add=True) # 创建时间
def __str__(self):
return f"{self.user} - {self.creation_time}"
class ParameterMapping(models.Model):
category = models.CharField(max_length=255) # 大类名称
subcategory = models.CharField(max_length=255, null=True, blank=True) # 小类名称
parameter = models.CharField(max_length=255) # 参数名称
regex_pattern = models.CharField(max_length=255, null=True, blank=True) # 校验正则
def __str__(self):
return f"{self.category} -> {self.subcategory} -> {self.parameter}"
class DataEntry(models.Model):
base_info = models.ForeignKey(BaseInfo, on_delete=models.CASCADE) # 关联基础信息
data = JSONField() # 存储数据,包括参数及其值
def __str__(self):
return f"Data Entry for {self.base_info.user} - {self.data}"
数据导入逻辑
数据导入命令
创建一个管理命令用于读取 xlsx
文件并导入数据。
import pandas as pd
import re
from django.core.management.base import BaseCommand
from your_app.models import DataEntry, ParameterMapping, BaseInfo
class Command(BaseCommand):
help = 'Import data from xlsx file'
def add_arguments(self, parser):
parser.add_argument('file_path', type=str)
def handle(self, *args, **kwargs):
file_path = kwargs['file_path']
df = pd.read_excel(file_path)
# 获取参数映射和正则
parameter_mappings = ParameterMapping.objects.all()
mapping_dict = {(pm.category, pm.subcategory, pm.parameter): pm.regex_pattern for pm in parameter_mappings}
for index, row in df.iterrows():
# 创建基础信息
base_info = BaseInfo.objects.create(user=row['user']) # 假设用户在某一列
# 遍历表头并构建数据
data = {}
for header in row.index[3:]: # 假设数据从第四列开始
category = row['大类']
subcategory = row['小类']
parameter = header
regex_pattern = mapping_dict.get((category, subcategory, parameter))
value = row[header]
if regex_pattern and not re.match(regex_pattern, str(value)):
self.stdout.write(self.style.ERROR(f"Invalid data for {parameter}: {value}"))
continue # 跳过不符合的值
# 赋值到数据字典中
data[parameter] = value
# 创建数据条目
DataEntry.objects.create(
base_info=base_info,
data=data
)
self.stdout.write(self.style.SUCCESS('Data imported successfully'))
增删改查接口
views.py
from rest_framework import viewsets
from rest_framework import status
from rest_framework.response import Response
from your_app.models import BaseInfo, ParameterMapping, DataEntry
from your_app.serializers import BaseInfoSerializer, ParameterMappingSerializer, DataEntrySerializer
class BaseInfoViewSet(viewsets.ModelViewSet):
queryset = BaseInfo.objects.all()
serializer_class = BaseInfoSerializer
class ParameterMappingViewSet(viewsets.ModelViewSet):
queryset = ParameterMapping.objects.all()
serializer_class = ParameterMappingSerializer
class DataEntryViewSet(viewsets.ModelViewSet):
queryset = DataEntry.objects.all()
serializer_class = DataEntrySerializer
def filter_queryset(self, queryset):
# 根据参数层级过滤数据
filter_conditions = self.request.query_params
for key, value in filter_conditions.items():
queryset = queryset.filter(data__contains={key: value})
return queryset
Serializers
serializers.py
from rest_framework import serializers
from your_app.models import BaseInfo, ParameterMapping, DataEntry
class BaseInfoSerializer(serializers.ModelSerializer):
class Meta:
model = BaseInfo
fields = '__all__'
class ParameterMappingSerializer(serializers.ModelSerializer):
class Meta:
model = ParameterMapping
fields = '__all__'
class DataEntrySerializer(serializers.ModelSerializer):
class Meta:
model = DataEntry
fields = '__all__'
URLs
urls.py
from django.urls import path, include
from rest_framework.routers import DefaultRouter
from your_app.views import BaseInfoViewSet, ParameterMappingViewSet, DataEntryViewSet
router = DefaultRouter()
router.register(r'baseinfo', BaseInfoViewSet)
router.register(r'parametermapping', ParameterMappingViewSet)
router.register(r'dataentry', DataEntryViewSet)
urlpatterns = [
path('', include(router.urls)),
]
使用示例
- 导入数据:
python manage.py import_data your_file.xlsx
- 增删改查 API:
- 获取所有数据:
GET /dataentry/
- 根据参数查询数据:
GET /dataentry/?parameter=value
- 创建数据:
POST /dataentry/
(请求体中包含数据) - 更新数据:
PUT /dataentry/<id>/
- 删除数据:
DELETE /dataentry/<id>/
- 获取所有数据:
总结
这个设计充分考虑了您提到的层级关系和数据的检验与导入逻辑,所有数据通过 JSON 存储,便于扩展。查询接口灵活,可以根据需要进行调整。如果有更多需求或调整,请告诉我!
以下是带有注释的完整代码,详细说明了每个部分的功能和实现逻辑。
1. views.py
from rest_framework import viewsets
from rest_framework import status
from rest_framework.response import Response
from your_app.models import BaseInfo, ParameterMapping, DataEntry
from your_app.serializers import BaseInfoSerializer, ParameterMappingSerializer, DataEntrySerializer
# 基础信息的视图集
class BaseInfoViewSet(viewsets.ModelViewSet):
queryset = BaseInfo.objects.all() # 查询所有基础信息
serializer_class = BaseInfoSerializer # 使用的序列化器
def list(self, request):
queryset = self.queryset # 获取所有基础信息
serializer = self.serializer_class(queryset, many=True) # 序列化
return Response(serializer.data) # 返回序列化的数据
def create(self, request):
serializer = self.serializer_class(data=request.data) # 创建序列化对象
if serializer.is_valid(): # 检查数据有效性
serializer.save() # 保存数据
return Response(serializer.data, status=status.HTTP_201_CREATED) # 返回成功响应
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) # 返回错误信息
def retrieve(self, request, pk=None):
base_info = self.get_object() # 获取特定基础信息
serializer = self.serializer_class(base_info) # 序列化
return Response(serializer.data) # 返回数据
def update(self, request, pk=None):
base_info = self.get_object() # 获取特定基础信息
serializer = self.serializer_class(base_info, data=request.data) # 更新序列化对象
if serializer.is_valid(): # 检查数据有效性
serializer.save() # 保存更新
return Response(serializer.data) # 返回更新后的数据
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) # 返回错误信息
def destroy(self, request, pk=None):
base_info = self.get_object() # 获取特定基础信息
base_info.delete() # 删除数据
return Response(status=status.HTTP_204_NO_CONTENT) # 返回204状态,表示成功删除
# 参数映射的视图集
class ParameterMappingViewSet(viewsets.ModelViewSet):
queryset = ParameterMapping.objects.all() # 查询所有参数映射
serializer_class = ParameterMappingSerializer # 使用的序列化器
def list(self, request):
queryset = self.queryset # 获取所有参数映射
serializer = self.serializer_class(queryset, many=True) # 序列化
return Response(serializer.data) # 返回序列化的数据
def create(self, request):
serializer = self.serializer_class(data=request.data) # 创建序列化对象
if serializer.is_valid(): # 检查数据有效性
serializer.save() # 保存数据
return Response(serializer.data, status=status.HTTP_201_CREATED) # 返回成功响应
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) # 返回错误信息
def retrieve(self, request, pk=None):
parameter_mapping = self.get_object() # 获取特定参数映射
serializer = self.serializer_class(parameter_mapping) # 序列化
return Response(serializer.data) # 返回数据
def update(self, request, pk=None):
parameter_mapping = self.get_object() # 获取特定参数映射
serializer = self.serializer_class(parameter_mapping, data=request.data) # 更新序列化对象
if serializer.is_valid(): # 检查数据有效性
serializer.save() # 保存更新
return Response(serializer.data) # 返回更新后的数据
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) # 返回错误信息
def destroy(self, request, pk=None):
parameter_mapping = self.get_object() # 获取特定参数映射
parameter_mapping.delete() # 删除数据
return Response(status=status.HTTP_204_NO_CONTENT) # 返回204状态,表示成功删除
# 数据条目的视图集
class DataEntryViewSet(viewsets.ModelViewSet):
queryset = DataEntry.objects.all() # 查询所有数据条目
serializer_class = DataEntrySerializer # 使用的序列化器
def list(self, request):
queryset = self.queryset # 获取所有数据条目
serializer = self.serializer_class(queryset, many=True) # 序列化
return Response(serializer.data) # 返回序列化的数据
def create(self, request):
serializer = self.serializer_class(data=request.data) # 创建序列化对象
if serializer.is_valid(): # 检查数据有效性
serializer.save() # 保存数据
return Response(serializer.data, status=status.HTTP_201_CREATED) # 返回成功响应
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) # 返回错误信息
def retrieve(self, request, pk=None):
data_entry = self.get_object() # 获取特定数据条目
serializer = self.serializer_class(data_entry) # 序列化
return Response(serializer.data) # 返回数据
def update(self, request, pk=None):
data_entry = self.get_object() # 获取特定数据条目
serializer = self.serializer_class(data_entry, data=request.data) # 更新序列化对象
if serializer.is_valid(): # 检查数据有效性
serializer.save() # 保存更新
return Response(serializer.data) # 返回更新后的数据
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) # 返回错误信息
def destroy(self, request, pk=None):
data_entry = self.get_object() # 获取特定数据条目
data_entry.delete() # 删除数据
return Response(status=status.HTTP_204_NO_CONTENT) # 返回204状态,表示成功删除
# 根据参数筛选数据
def filter_queryset(self, queryset):
filter_conditions = self.request.query_params # 获取查询参数
for key, value in filter_conditions.items(): # 遍历每个查询参数
queryset = queryset.filter(data__contains={key: value}) # 筛选数据
return queryset # 返回筛选后的数据
2. serializers.py
from rest_framework import serializers
from your_app.models import BaseInfo, ParameterMapping, DataEntry
# 基础信息的序列化器
class BaseInfoSerializer(serializers.ModelSerializer):
class Meta:
model = BaseInfo # 目标模型
fields = '__all__' # 包含所有字段
# 参数映射的序列化器
class ParameterMappingSerializer(serializers.ModelSerializer):
class Meta:
model = ParameterMapping # 目标模型
fields = '__all__' # 包含所有字段
# 数据条目的序列化器
class DataEntrySerializer(serializers.ModelSerializer):
class Meta:
model = DataEntry # 目标模型
fields = '__all__' # 包含所有字段
3. urls.py
from django.urls import path, include
from rest_framework.routers import DefaultRouter
from your_app.views import BaseInfoViewSet, ParameterMappingViewSet, DataEntryViewSet
# 创建路由器并注册视图集
router = DefaultRouter()
router.register(r'baseinfo', BaseInfoViewSet) # 注册基础信息视图
router.register(r'parametermapping', ParameterMappingViewSet) # 注册参数映射视图
router.register(r'dataentry', DataEntryViewSet) # 注册数据条目视图
urlpatterns = [
path('', include(router.urls)), # 包含所有注册的路由
]
4. 使用示例
- 获取所有基础信息:
GET /baseinfo/
- 创建基础信息:
POST /baseinfo/ { "user": "张三" }
- 获取所有参数映射:
GET /parametermapping/
- 创建参数映射:
POST /parametermapping/ { "category": "大类1", "subcategory": "小类1", "parameter": "参数1", "regex_pattern": "^[0-9]+$" }
- 获取所有数据条目:
GET /dataentry/
- 创建数据条目:
POST /dataentry/ { "base_info": 1, "data": { "参数1": "100", "参数2": "200" } }
- 根据参数查询数据:
GET /dataentry/?参数1=100&参数2=200
总结
以上代码注释详细,功能清晰,实现了数据的增删改查和基于参数的筛选接口。如果有进一步的需求或需要调整的地方,请随时告诉我!
好的,既然查询参数可能比较多,且您希望使用 POST
请求来进行查询,我们可以创建一个自定义的 POST
请求来接收 JSON 格式的查询参数。这样既能保持接口简洁,也能确保层级结构的查询可以灵活地处理。
修改查询接口支持 POST 请求
1. 修改 DataEntryViewSet
以支持 POST 请求查询
我们会创建一个自定义的 POST
方法 search
,接收查询参数并进行过滤。
from rest_framework.decorators import action
from rest_framework import viewsets, status
from rest_framework.response import Response
from your_app.models import BaseInfo, ParameterMapping, DataEntry
from your_app.serializers import DataEntrySerializer
class DataEntryViewSet(viewsets.ModelViewSet):
queryset = DataEntry.objects.all()
serializer_class = DataEntrySerializer
# 自定义 POST 查询接口
@action(detail=False, methods=['post'], url_path='search')
def search(self, request):
query_data = request.data # 获取 POST 请求中的查询参数
if not query_data:
return Response({"error": "No query data provided."}, status=status.HTTP_400_BAD_REQUEST)
# 构建查询条件
filter_conditions = self.build_query_conditions(query_data)
# 查询数据,使用 JSONField 查询
entries = DataEntry.objects.filter(data__contains=filter_conditions)
serializer = self.get_serializer(entries, many=True)
return Response(serializer.data, status=status.HTTP_200_OK)
# 解析并导入xlsx文件
def create(self, request):
# 处理文件上传
file = request.FILES.get('file')
if not file:
return Response({"error": "No file provided."}, status=status.HTTP_400_BAD_REQUEST)
# 解析xlsx文件
parse_and_import_xlsx(file)
return Response({"success": "Data imported successfully."}, status=status.HTTP_201_CREATED)
# 构建查询条件的辅助方法
def build_query_conditions(self, query_data):
filter_conditions = {}
for category, subcategories in query_data.items():
for subcategory, parameters in subcategories.items():
for parameter, value in parameters.items():
key = f"{category}:{subcategory}:{parameter}" # 构建映射键
filter_conditions[key] = value
return filter_conditions
2. 使用 POST
请求查询数据
该接口现在支持 POST
请求。前端可以向 /dataentry/search/
发送包含嵌套 JSON 查询参数的请求。
示例 POST 请求格式:
POST /dataentry/search/
{
"大类1": {
"小类1": {
"参数1": "值1",
"参数2": "值2"
},
"小类2": {
"参数1": "值3"
}
},
"大类2": {
"小类1": {
"参数1": "值4"
}
}
}
3. Django 路由
您可以在 urls.py
中设置路由:
from django.urls import path, include
from rest_framework.routers import DefaultRouter
from your_app.views import DataEntryViewSet
router = DefaultRouter()
router.register(r'dataentry', DataEntryViewSet)
urlpatterns = [
path('', include(router.urls)),
]
4. 数据检查和导入逻辑
为了保证导入时表头与数据库字段的映射关系一致,您可以扩展之前的数据检查和导入函数,确保在写入数据之前先检查表头与字段的对应关系,并根据表头信息完成校验。
示例:在 parse_and_import_xlsx
中加入检查逻辑
import openpyxl
def parse_and_import_xlsx(file):
workbook = openpyxl.load_workbook(file)
sheet = workbook.active
headers = sheet[1] # 假设第一行是表头
field_mapping = get_field_mapping_from_db() # 从数据库中获取字段映射关系
# 表头和字段映射的检查
for idx, cell in enumerate(headers):
header = cell.value
if header not in field_mapping:
raise ValueError(f"Header {header} does not match any known fields.")
# 数据解析与导入
for row in sheet.iter_rows(min_row=2, values_only=True): # 从第二行开始是数据
base_info = row[:3] # 第一列到第三列为基础信息(大类、小类、参数)
data = row[3:] # 后面的列为参数数据
# 构建 JSON 数据
data_entry = {
"大类": base_info[0],
"小类": base_info[1],
"参数": base_info[2],
"数据": {field_mapping[headers[i].value]: data[i-3] for i in range(3, len(headers))}
}
# 写入数据库
DataEntry.objects.create(data=data_entry)
def get_field_mapping_from_db():
# 从数据库中查询存储的表头与字段的映射关系
mappings = ParameterMapping.objects.all()
return {mapping.header: mapping.field for mapping in mappings}
5. 总结
这个方案实现了:
- 通过 POST 请求进行层级查询:允许前端传递复杂的嵌套查询条件,后端根据这些条件进行数据筛选。
- 动态数据导入:确保表头与数据库字段一一对应,防止解析时出现错误。
- 灵活的架构:能够轻松扩展不同的大类、小类和参数,且不会因大类数量增加导致代码臃肿。
- 数据检查和校验:在导入前对表头与字段映射进行检查,确保数据完整性和正确性。
请根据需求和实际项目情况微调逻辑。如果有进一步问题或需要优化的地方,请告诉我!
为处理不同长度的 data_name
,并确保它们可以在不同分类中嵌套,我们可以调整代码,使得它能够根据每个 data_name
动态构建层级结构,而不受限于固定的分类。这是更新后的实现:
更新后的完整代码
import openpyxl
from django.core.exceptions import ValidationError
from django.db import models
# 模型定义
class ParameterMapping(models.Model):
data_name = models.JSONField() # 存储层级关系 [大类, 小类, 参数]
field = models.CharField(max_length=100) # 对应数据库字段
reg_ex = models.CharField(max_length=255, null=True, blank=True) # 校验正则
class DataEntry(models.Model):
data = models.JSONField() # 存储完整的数据结构
# 获取字段映射关系
def get_field_mapping_from_db():
mappings = ParameterMapping.objects.all()
field_mapping = {}
for mapping in mappings:
field_mapping[tuple(mapping.data_name)] = mapping.field # 使用元组作为键
return field_mapping
# 校验数据
def validate_data(value, reg_ex):
import re
if reg_ex and not re.match(reg_ex, str(value)):
raise ValidationError(f"Value '{value}' does not match the required format.")
# 解析并导入 XLSX 文件
def parse_and_import_xlsx(file):
workbook = openpyxl.load_workbook(file)
sheet = workbook.active
headers = sheet[1] # 假设第一行是表头
field_mapping = get_field_mapping_from_db() # 获取字段映射关系
# 数据解析与导入
for row in sheet.iter_rows(min_row=2, values_only=True):
base_info = row[:3] # 第一列到第三列为基础信息
data = row[3:] # 后面的列为参数数据
# 构建 JSON 数据
data_entry = {
"基础信息": {
"大类": base_info[0],
"小类": base_info[1],
"参数": base_info[2]
},
"数据": {}
}
# 动态构建嵌套数据
for i, header in enumerate(headers[3:]):
header_key = header.value
if header_key in field_mapping.values():
# 查找该表头对应的 data_name
for keys, field in field_mapping.items():
if field == header_key:
current_data = data_entry["数据"]
# 动态创建层级结构
for key in keys:
if key not in current_data:
current_data[key] = {}
current_data = current_data[key]
# 存储对应的数据值
current_data['value'] = data[i]
# 校验数据
reg_ex = ParameterMapping.objects.get(field=header_key).reg_ex
validate_data(data[i], reg_ex)
# 写入数据库
DataEntry.objects.create(data=data_entry)
# 示例使用
# parse_and_import_xlsx("path_to_your_file.xlsx")
关键点说明
-
动态嵌套结构:通过遍历
data_name
,在data_entry["数据"]
中动态创建嵌套层级。这使得可以根据data_name
的长度和内容,灵活地生成多层结构。 -
不同分类支持:不论
data_name
的长度和具体层级,代码都能动态识别并构建相应的数据结构。这意味着不同长度的data_name
可以在不同分类中灵活应用。
示例数据结构
假设 data_name
为:
["大类1"]
["大类2", "小类1"]
["大类2", "小类2", "参数1"]
最终构建的 JSON 数据可能如下:
{
"基础信息": {
"大类": "大类2",
"小类": "小类2",
"参数": "参数1"
},
"数据": {
"大类1": {
"value": "数据值1"
},
"大类2": {
"小类1": {
"value": "数据值2"
},
"小类2": {
"参数1": {
"value": "数据值3"
}
}
}
}
}
备注
- 如果您还有其他要求或需进一步调整,请随时告诉我!
如果原数据格式中的元组长度是不定的,我们可以修改代码,使其根据元组的长度动态地构建嵌套层级。这样,代码可以适应不同的分类结构,将最后一层作为参数名称。
修改后的代码
def transform_data_format(data):
transformed_data = {}
for sheet_name, entries in data.items():
transformed_entries = [] # 存储转换后的条目
for entry in entries:
transformed_entry = {} # 用于构建每个条目
for keys, value in entry.items():
# 动态构建嵌套字典,逐层构建每一个层级
current_level = transformed_entry
for i, key in enumerate(keys):
# 如果是最后一个元素,则这是参数名,直接赋值
if i == len(keys) - 1:
current_level[key] = value
else:
# 如果当前层级不存在,则创建
if key not in current_level:
current_level[key] = {}
# 进入下一级
current_level = current_level[key]
# 将转换后的条目添加到结果列表
transformed_entries.append(transformed_entry)
transformed_data[sheet_name] = transformed_entries
return transformed_data
# 示例输入数据
data = {
"Sheet1": [
{("大类1", "小类1", "参数1"): "值1", ("大类1", "参数2"): "值2"},
{("大类1", "小类2", "参数1"): "值3"},
{("大类2", "参数3"): "值4"}
],
"Sheet2": [
{("大类3", "小类1", "参数1"): "值5"},
{("大类3", "小类2", "参数1"): "值6", ("大类3", "参数4"): "值7"}
]
}
# 调用转换函数
transformed_data = transform_data_format(data)
# 输出结果
import pprint
pprint.pprint(transformed_data)
代码说明
-
遍历每个条目:对每个条目构建一个新的嵌套字典结构
transformed_entry
。 - 动态构建嵌套结构:
- 使用
for i, key in enumerate(keys)
遍历元组keys
。 - 如果是最后一个元素,则将该元素视为参数名,并赋值为
value
。 - 如果不是最后一个元素,则检查当前层级是否存在该 key,如果不存在则创建空字典。
- 随后进入下一级以继续构建层级。
- 使用
- 合并结果:将每个转换后的条目添加到
transformed_entries
,最终汇总到transformed_data
。
示例输出
给定输入示例,输出结果将为:
{
'Sheet1': [
{
'大类1': {
'小类1': {
'参数1': '值1'
},
'参数2': '值2',
'小类2': {
'参数1': '值3'
}
},
'大类2': {
'参数3': '值4'
}
}
],
'Sheet2': [
{
'大类3': {
'小类1': {
'参数1': '值5'
},
'小类2': {
'参数1': '值6'
},
'参数4': '值7'
}
}
]
}
适用性
这个方案适用于元组长度不定的情况,并自动生成层级关系。在最后一个元素作为参数名的规则下,它能灵活处理不同长度的层级结构。
要将 build_query_conditions
方法优化成一个更加通用的函数,我们可以引入可配置的分隔符以及支持多层嵌套的结构,使其更灵活。以下是优化后的代码:
优化后的通用函数
def build_query_conditions(query_data, separator=":"):
"""
构建查询条件的辅助函数。
:param query_data: 嵌套字典,格式为 {大类: {小类: {参数: 值, ...}, ...}, ...}
:param separator: 用于连接键的分隔符,默认为 ":"
:return: 构建的查询条件字典
"""
filter_conditions = {}
def recursive_build(data, current_key=""):
for key, value in data.items():
if isinstance(value, dict):
# 如果值是字典,递归调用
new_key = f"{current_key}{separator}{key}" if current_key else key
recursive_build(value, new_key)
else:
# 否则,添加到查询条件中
new_key = f"{current_key}{separator}{key}" if current_key else key
filter_conditions[new_key] = value
recursive_build(query_data)
return filter_conditions
# 示例使用
query_data = {
"大类1": {
"小类1": {
"参数1": "值1",
"参数2": "值2"
},
"小类2": {
"参数3": "值3"
}
},
"大类2": {
"小类3": {
"参数4": "值4"
}
}
}
# 调用优化后的函数
result = build_query_conditions(query_data)
print(result)
代码说明
- 函数参数:
query_data
: 输入的嵌套字典。separator
: 用于连接键的分隔符,默认值为":"
。
- 递归函数:
recursive_build
:- 接受当前数据和当前键(默认为空字符串)。
- 遍历每个键值对。
- 如果值是字典,则递归调用,构建新的键。
- 如果值不是字典,则将构建的键和值添加到
filter_conditions
。
- 灵活性:
- 通过分隔符的配置,用户可以根据需要更改键的连接方式。
- 适应不同层级的嵌套结构,能处理任意深度的嵌套字典。
示例输出
对于给定的示例输入,输出将为:
{
'大类1:小类1:参数1': '值1',
'大类1:小类1:参数2': '值2',
'大类1:小类2:参数3': '值3',
'大类2:小类3:参数4': '值4'
}
这种优化使得函数在处理多层嵌套结构时更加通用和灵活。如果您有其他要求或需要进一步修改,请随时告诉我!
了解了你的需求:附件不仅出现在某个大类下,而是分布在多个大类、小类中的特定位置,并且每个小类的附件可以通过文件路径来标识。为了让代码更加灵活并适应多个大类、小类中包含附件的情况,我们需要调整逻辑,以便在数据结构中的任意位置找到附件并生成对应的绝对 URL。
方案设计
我们需要做到:
- 处理任意层级:数据结构可能有多个层级,其中某些小类下包含附件。
- 遍历整个数据结构:递归地查找附件信息,并生成绝对 URL。
- 适应多个大类和小类中的附件:支持多个大类下的多个小类都有附件。
优化方案
假设你有一个复杂的嵌套 JSON 结构,并且在每个小类(或某些小类)下都有附件数据。附件是一个列表,包含文件路径或文件名。在解析这个结构时,我们可以递归地遍历每个层级并生成绝对 URL。
实现代码
-
模型设计:假设我们存储的数据包含多个层级,附件是嵌套在 JSON 数据中的某些小类下。
-
生成附件 URL 的方法:遍历整个数据结构,找到包含附件的部分,生成绝对 URL。
from django.db import models
import json
class Data(models.Model):
name = models.CharField(max_length=255)
data = models.JSONField() # 存储嵌套的 JSON 数据,包含附件路径等信息
def get_absolute_url(self, file_path):
"""
返回存储在 S3 或本地的文件的绝对 URL。
假设我们有一个存储基础 URL,文件名可以拼接在这个基础上形成 URL。
"""
base_url = "https://s3.amazonaws.com/my-bucket/" # 假设文件存储在 S3 上
return base_url + file_path
def transform_data_with_urls(self):
"""
将 JSON 字段中的附件路径转换为绝对 URL
"""
def recursive_transform(data):
# 检查当前层的数据
if isinstance(data, dict):
# 递归遍历字典的键值对
for key, value in data.items():
# 如果是字典,递归处理
if isinstance(value, dict):
recursive_transform(value)
elif isinstance(value, list):
# 如果是附件列表,生成绝对 URL
if key == 'attachments': # 假设附件存储在'attachments'键中
for attachment in value:
if 'file_name' in attachment:
attachment['url'] = self.get_absolute_url(attachment['file_name'])
else:
continue
elif isinstance(data, list):
# 如果是列表,递归每个元素
for item in data:
recursive_transform(item)
# 调用递归方法处理 JSON 数据
recursive_transform(self.data)
return self.data
3. 数据存储结构示例
假设存储的 JSON 数据如下,其中附件可能存在于不同的小类中:
{
"大类1": {
"小类1": {
"参数1": "值1",
"attachments": [
{"file_name": "file1.pdf"},
{"file_name": "file2.pdf"}
]
},
"小类2": {
"参数2": "值2",
"attachments": [
{"file_name": "file3.pdf"}
]
}
},
"大类2": {
"小类3": {
"参数3": "值3",
"attachments": [
{"file_name": "file4.pdf"}
]
}
}
}
4. 序列化和视图代码
你可以使用 Django Rest Framework(DRF)来序列化这个数据,并确保返回的数据中包含绝对 URL。
4.1. 序列化器
from rest_framework import serializers
class DataSerializer(serializers.ModelSerializer):
class Meta:
model = Data
fields = ['name', 'data']
def to_representation(self, instance):
"""
重写 to_representation 方法,确保输出的数据包含附件的绝对 URL。
"""
data = super().to_representation(instance)
# 使用模型的 transform_data_with_urls 方法将附件路径转换为绝对 URL
data['data'] = instance.transform_data_with_urls()
return data
4.2. 视图代码(Django Rest Framework)
from rest_framework.views import APIView
from rest_framework.response import Response
class DataView(APIView):
def get(self, request, *args, **kwargs):
# 查询数据
data_instance = Data.objects.first() # 获取示例数据
# 使用序列化器将数据转换为包含 URL 的 JSON 数据
serializer = DataSerializer(data_instance)
# 返回包含 URL 的数据
return Response(serializer.data)
5. 示例输出
对于上面的 data
,输出将会是:
{
"name": "示例数据",
"data": {
"大类1": {
"小类1": {
"参数1": "值1",
"attachments": [
{"file_name": "file1.pdf", "url": "https://s3.amazonaws.com/my-bucket/file1.pdf"},
{"file_name": "file2.pdf", "url": "https://s3.amazonaws.com/my-bucket/file2.pdf"}
]
},
"小类2": {
"参数2": "值2",
"attachments": [
{"file_name": "file3.pdf", "url": "https://s3.amazonaws.com/my-bucket/file3.pdf"}
]
}
},
"大类2": {
"小类3": {
"参数3": "值3",
"attachments": [
{"file_name": "file4.pdf", "url": "https://s3.amazonaws.com/my-bucket/file4.pdf"}
]
}
}
}
}
6. 总结
- 递归处理附件:我们使用递归方法遍历 JSON 数据,在遇到附件信息时,生成文件的绝对 URL。
- 灵活的文件存储结构:附件可以位于任意小类下,并且每个附件都可以生成相应的 URL。
- DRF 序列化:通过重写 DRF 序列化器中的
to_representation
方法,确保返回的数据包含附件的绝对 URL。
这样,你可以在查询数据时,不仅能够得到嵌套的 JSON 数据结构,还能自动生成附件的绝对 URL,以便于前端使用。如果附件存储在不同的服务(如 S3、OSS 等),只需要调整 get_absolute_url
方法中的 URL 生成规则即可。
是的,如果数据量很大,处理大量的嵌套 JSON 数据和生成大量的文件 URL 时,可能会出现性能问题。具体的性能瓶颈可能出现在以下几个方面:
- 递归遍历 JSON 数据:
- 遍历整个嵌套数据结构时,随着数据层级的增加和数据量的增加,递归操作的开销也会逐渐增大。
- 如果数据结构复杂,且每个层级都包含附件,需要频繁地访问和修改大量的字典和列表。
- 生成 URL 的计算开销:
- 每次需要生成文件的绝对 URL 时,都需要进行文件路径拼接和其他计算操作,这在大数据量时会增加计算负担。
- 数据库查询性能:
- 如果数据非常庞大,单次数据库查询可能会带来很大的 I/O 负担,尤其是当你一次性加载大量数据时。
- 在高并发环境下,查询性能可能成为瓶颈。
- 序列化性能:
- 如果数据量过大,在 DRF 序列化时,可能会因为需要将大量嵌套数据格式化成 JSON 而导致序列化速度较慢。
如何优化?
- 数据分页加载:
- 如果数据量非常大,可以考虑在数据库层面做分页查询,避免一次性加载过多的数据。
- 在 DRF 中,可以使用
pagination
类来对数据进行分页加载。
- 批量操作和异步任务:
- 对于 URL 的生成操作,可以考虑将其放到后台异步任务中(例如使用 Celery),这样不会阻塞前端请求。
- 使用批量数据库操作(例如
bulk_create
)来减少数据库交互次数。
- 减少递归深度:
- 尽量减少递归调用的深度,或者使用显式的栈(迭代方式)来替代递归调用。
- 如果能预处理数据(例如在数据入库时预先处理附件生成 URL),可以减少查询时的计算负担。
- 优化 JSON 字段存储:
- 如果 JSON 字段非常大,可以考虑对 JSON 数据做压缩存储(例如使用
gzip
或lz4
等算法),并且只在需要时解压。
- 如果 JSON 字段非常大,可以考虑对 JSON 数据做压缩存储(例如使用
- 缓存机制:
- 使用缓存来存储已计算过的 URL,避免每次查询时都需要重新生成。例如,可以使用 Redis 缓存生成的 URL,尤其是对于那些被频繁访问的附件。
- 预计算附件 URL:
- 如果附件 URL 生成规则固定,可以考虑在数据写入时直接计算好 URL,并存储到数据库中,而不是每次查询时再生成。
具体优化建议
1. 使用分页查询优化数据库查询
在 DRF 中,可以通过分页来优化查询:
from rest_framework.pagination import PageNumberPagination
from rest_framework.views import APIView
from rest_framework.response import Response
class CustomPagination(PageNumberPagination):
page_size = 10 # 每页返回的数据数量
class DataView(APIView):
pagination_class = CustomPagination
def get(self, request, *args, **kwargs):
data_instances = Data.objects.all() # 假设查询所有数据
# 使用分页器对数据进行分页
paginator = CustomPagination()
paginated_data = paginator.paginate_queryset(data_instances, request)
# 序列化数据
serializer = DataSerializer(paginated_data, many=True)
# 返回分页数据
return paginator.get_paginated_response(serializer.data)
这样就可以避免一次性查询大量数据。
2. 使用缓存存储 URL
如果生成 URL 的操作开销较大,并且 URL 内容在短时间内不经常变化,可以使用缓存来存储生成的 URL。
import hashlib
from django.core.cache import cache
class Data(models.Model):
name = models.CharField(max_length=255)
data = models.JSONField()
def get_absolute_url(self, file_path):
# 使用缓存避免重复计算
cache_key = hashlib.md5(file_path.encode()).hexdigest()
cached_url = cache.get(cache_key)
if cached_url:
return cached_url
# 如果缓存中没有,计算绝对 URL
base_url = "https://s3.amazonaws.com/my-bucket/"
url = base_url + file_path
cache.set(cache_key, url, timeout=60 * 60 * 24) # 缓存 24 小时
return url
通过这种方式,每次生成 URL 时,首先检查缓存中是否存在,如果存在则直接返回缓存中的 URL。
3. 后台任务(Celery)处理 URL 生成
将生成文件 URL 的操作移到后台处理,避免在请求响应中占用过多时间。可以使用 Celery 来处理附件 URL 的生成任务。
from celery import shared_task
@shared_task
def generate_attachment_urls(data_instance_id):
data_instance = Data.objects.get(id=data_instance_id)
transformed_data = data_instance.transform_data_with_urls()
# 这里可以将转换后的数据存入数据库或缓存中
# 例如,更新数据实例的 URL 数据
data_instance.data = transformed_data
data_instance.save()
# 触发任务
generate_attachment_urls.delay(data_instance.id)
在前端请求时,你可以返回一个响应,告诉用户数据正在处理,并且稍后获取已处理的结果。
4. 预处理附件 URL
如果附件的文件 URL 是根据固定的规则生成的(例如基于文件名),你可以在数据导入时(或定期任务中)预计算并存储 URL。
class Data(models.Model):
name = models.CharField(max_length=255)
data = models.JSONField()
def save(self, *args, **kwargs):
# 在保存时计算所有附件 URL 并将其存储在 data 字段中
if 'attachments' in self.data:
for attachment in self.data['attachments']:
file_name = attachment.get('file_name', '')
if file_name:
attachment['url'] = self.get_absolute_url(file_name)
super().save(*args, **kwargs)
5. 使用更高效的 JSON 存储
对于大量嵌套的 JSON 数据,可以考虑使用 PostgreSQL 的 JSONB
类型字段,它比普通的 JSON
类型更加高效,支持更复杂的查询和索引。
总结
如果数据量较大,优化的关键是减少不必要的计算和查询,使用分页、缓存、后台任务等技术来提升性能。递归的操作可能会在大量数据时出现性能瓶颈,因此在生成 URL 的时候,可以考虑预计算或异步处理。
如果选择在保存数据时将绝对 URL直接计算并存储到数据库中,而不是每次查询时动态计算,将大大减轻查询和处理时的性能压力。这种方法会把计算和生成 URL 的开销转移到数据写入(或更新)阶段,这样在查询时就可以直接从数据库获取已经包含 URL 的数据。
优势:
- 查询效率提升:查询时不再需要每次生成 URL,直接从数据库获取数据即可。
- 避免重复计算:每次查询都不需要再次计算文件的 URL,减少了计算资源的消耗。
- 减少响应时间:前端直接获得已处理好的数据,提升用户体验。
解决方案:
- 在保存时计算附件 URL:在数据存储时,检查数据中的附件字段,并生成相应的绝对 URL,直接存储在数据表中。
- 更新数据时也计算 URL:如果数据发生变更,尤其是涉及到附件的字段时,应该再次计算并更新附件的 URL。
实现代码
1. 数据模型
我们在 Data
模型中创建一个方法来处理附件 URL 的生成,在数据保存时调用该方法。
from django.db import models
class Data(models.Model):
name = models.CharField(max_length=255)
data = models.JSONField() # 存储嵌套 JSON 数据
created_at = models.DateTimeField(auto_now_add=True)
def get_absolute_url(self, file_path):
"""
假设我们使用 S3 存储文件,生成绝对 URL
"""
base_url = "https://s3.amazonaws.com/my-bucket/"
return base_url + file_path
def save_attachments_with_urls(self):
"""
在数据保存时处理所有附件路径,生成绝对 URL 并更新数据。
"""
def recursive_transform(data):
"""
递归遍历 JSON 数据,查找并生成附件的绝对 URL。
"""
if isinstance(data, dict):
for key, value in data.items():
if isinstance(value, dict):
recursive_transform(value)
elif isinstance(value, list):
if key == 'attachments': # 假设附件是一个列表,存在于 'attachments' 键中
for attachment in value:
if 'file_name' in attachment:
file_name = attachment['file_name']
attachment['url'] = self.get_absolute_url(file_name)
else:
continue
elif isinstance(data, list):
for item in data:
recursive_transform(item)
# 执行递归转换,处理 data 字段中的附件路径
recursive_transform(self.data)
def save(self, *args, **kwargs):
# 在保存数据前,处理附件并添加 URL
self.save_attachments_with_urls()
super().save(*args, **kwargs)
2. 数据的保存
在调用 save
方法时,系统会自动检查 data
字段中的附件数据,并为每个附件生成对应的绝对 URL,然后保存到数据库中。
# 示例:保存一个新的数据实例
data_instance = Data(
name="示例数据",
data={
"大类1": {
"小类1": {
"参数1": "值1",
"attachments": [
{"file_name": "file1.pdf"},
{"file_name": "file2.pdf"}
]
}
}
}
)
data_instance.save() # 保存时会自动生成附件 URL
3. 数据序列化
在 DRF 中,创建一个序列化器来展示保存的已处理数据。
from rest_framework import serializers
class DataSerializer(serializers.ModelSerializer):
class Meta:
model = Data
fields = ['name', 'data', 'created_at']
def to_representation(self, instance):
"""
重写 `to_representation` 方法,返回处理后的数据。
"""
data = super().to_representation(instance)
return data
4. 查询接口
from rest_framework.views import APIView
from rest_framework.response import Response
class DataView(APIView):
def get(self, request, *args, **kwargs):
# 获取数据实例
data_instance = Data.objects.first() # 示例:获取第一条数据
# 使用序列化器将数据转化为 JSON 格式
serializer = DataSerializer(data_instance)
# 返回数据
return Response(serializer.data)
5. 查询结果
对于上述保存的数据,查询接口返回的数据将包含附件的绝对 URL:
{
"name": "示例数据",
"data": {
"大类1": {
"小类1": {
"参数1": "值1",
"attachments": [
{"file_name": "file1.pdf", "url": "https://s3.amazonaws.com/my-bucket/file1.pdf"},
{"file_name": "file2.pdf", "url": "https://s3.amazonaws.com/my-bucket/file2.pdf"}
]
}
}
},
"created_at": "2024-10-28T12:00:00Z"
}
6. 总结
- 在数据保存时生成附件 URL:通过
save_attachments_with_urls
方法,我们在保存数据时预先生成了每个附件的 URL,避免每次查询时都计算 URL。 - 递归处理:数据中的附件路径(如果存在)会被递归地处理,适应多层嵌套结构。
- 性能优化:在保存数据时进行附件 URL 的处理,查询时直接使用已存储的结果,避免了查询时重复计算。
注意事项:
- 附件 URL 存储的文件路径:你可以根据具体情况定制
get_absolute_url
方法中的路径生成规则(例如,文件存储在 S3、OSS 或本地文件系统等)。 - 数据更新:如果数据中的附件字段发生变动(例如文件名变了),需要再次调用
save_attachments_with_urls
方法更新 URL。
这种方法非常适合数据频繁查询且不需要动态生成附件 URL 的场景,避免了每次查询时的计算开销。
在使用 bulk_create
批量创建数据时,由于它是一次性将多个对象插入到数据库中,因此无法像普通的 save()
方法一样对每个对象进行预处理。在 bulk_create
中,通常无法在插入时执行像计算附件 URL 这样的操作。
问题
当你使用 bulk_create
时,由于 bulk_create
会跳过模型的 save()
方法,所以不能在保存数据时直接触发附件 URL 的计算。
解决方案
-
提前处理数据:在调用
bulk_create
之前,先预处理所有数据,确保每个对象的数据已经包含了附件的绝对 URL。你可以在插入之前先遍历所有数据,将附件的 URL 填充到数据中。 -
通过批量更新:如果数据已经存在,并且只是需要更新附件 URL,使用
bulk_update
来批量更新 URL 字段。
具体实现
1. 处理数据的函数
首先,定义一个函数,在将数据传递给 bulk_create
之前,处理所有附件的 URL。
def process_attachments_with_urls(data_list):
"""
批量处理数据中的附件,并生成绝对 URL。
:param data_list: 包含多个 Data 实例的列表
"""
for data_instance in data_list:
def recursive_transform(data):
"""递归遍历数据,查找并生成附件 URL"""
if isinstance(data, dict):
for key, value in data.items():
if isinstance(value, dict):
recursive_transform(value)
elif isinstance(value, list):
if key == 'attachments': # 假设附件字段为 'attachments'
for attachment in value:
if 'file_name' in attachment:
file_name = attachment['file_name']
attachment['url'] = data_instance.get_absolute_url(file_name)
else:
continue
elif isinstance(data, list):
for item in data:
recursive_transform(item)
# 处理 data 字段中的附件
recursive_transform(data_instance.data)
2. 批量创建数据
在准备好数据之后,使用 bulk_create
批量创建数据。
from django.db import models
class Data(models.Model):
name = models.CharField(max_length=255)
data = models.JSONField() # 存储嵌套 JSON 数据
created_at = models.DateTimeField(auto_now_add=True)
def get_absolute_url(self, file_path):
"""
假设我们使用 S3 存储文件,生成绝对 URL
"""
base_url = "https://s3.amazonaws.com/my-bucket/"
return base_url + file_path
# 假设我们有多个 Data 实例的数据
data_instances = [
Data(name="示例数据1", data={
"大类1": {
"小类1": {
"参数1": "值1",
"attachments": [
{"file_name": "file1.pdf"},
{"file_name": "file2.pdf"}
]
}
}
}),
Data(name="示例数据2", data={
"大类2": {
"小类2": {
"参数2": "值2",
"attachments": [
{"file_name": "file3.pdf"},
{"file_name": "file4.pdf"}
]
}
}
}),
]
# 处理数据中的附件并生成 URL
process_attachments_with_urls(data_instances)
# 批量创建数据
Data.objects.bulk_create(data_instances)
3. 使用 bulk_update
更新数据中的附件 URL(可选)
如果你已经使用 bulk_create
创建了数据,并且后续某些附件 URL 需要更新,你可以使用 bulk_update
来批量更新这些 URL。比如,当附件的文件路径变化时,你可以通过以下方式更新:
# 假设 data_instances 是要更新的数据
data_instances = Data.objects.all()
# 处理数据中的附件并生成新的 URL
process_attachments_with_urls(data_instances)
# 批量更新数据中的附件 URL
Data.objects.bulk_update(data_instances, ['data']) # 更新 data 字段
4. 优化考虑
- 处理大数据量:如果你需要批量处理非常大的数据集,
bulk_create
和bulk_update
在性能上会比逐条save()
快很多,但依然需要确保数据处理部分不会导致内存溢出。如果数据量非常大,可以考虑对数据进行分页或分批处理。 - 减少数据库 I/O:
bulk_create
和bulk_update
都能够减少数据库 I/O,因为它们会在一次请求中执行所有的插入或更新操作。
总结
- 提前处理数据:在调用
bulk_create
前,确保所有的附件 URL 已经被计算并存储到数据实例中。 - 使用递归方法:递归遍历数据中的 JSON 字段,确保每个附件字段都能正确生成 URL。
- 性能考虑:在数据量大时,考虑分页或分批处理,以免造成内存占用过多。
通过这种方式,你能够在 bulk_create
过程中处理复杂的数据逻辑,并保证数据正确性,同时提升数据插入的性能。
如果附件和数据一起从前端传递到后端进行保存,通常会有两种情况:一种是附件作为文件上传,另一种是附件作为文件名或路径传递。在这两种情况下,我们需要分别处理文件上传和路径传递。
场景说明:
- 附件作为文件上传:前端将文件上传到后端,后端将其存储(例如存储到 S3 或本地文件系统),并将文件的 URL 存储到数据库。
- 附件作为文件名或路径:前端直接传递文件名或文件路径,后端根据文件存储位置生成 URL 并保存。
下面我们提供两种场景的实现方法,并展示前后端代码。
1. 附件作为文件上传(上传文件)
后端实现
假设我们使用 Django 的 FileField
或 ImageField
来存储附件,前端会通过 multipart/form-data
格式上传文件。
1.1. 数据模型
在 Django 中,使用 FileField
或 ImageField
来处理附件上传,文件会保存在服务器或云存储中。
from django.db import models
class Data(models.Model):
name = models.CharField(max_length=255)
data = models.JSONField() # 存储嵌套的 JSON 数据
created_at = models.DateTimeField(auto_now_add=True)
# 附件字段,可以处理单个文件
attachments = models.JSONField(default=list) # 附件可以以列表形式存储多个文件的路径/URL
def save(self, *args, **kwargs):
# 保存附件 URL
self.save_attachments_with_urls()
super().save(*args, **kwargs)
def save_attachments_with_urls(self):
"""
处理附件,生成绝对 URL
"""
base_url = "https://s3.amazonaws.com/my-bucket/"
for attachment in self.attachments:
file_name = attachment.get("file_name")
if file_name:
# 假设附件是存储在 S3 上,可以通过文件路径生成 URL
attachment["url"] = f"{base_url}{file_name}"
1.2. 数据接收与文件上传视图
在接收数据时,使用 Django Rest Framework 来处理文件上传。我们需要处理 multipart/form-data
格式的数据,并提取文件和其他字段。
from rest_framework import serializers
from rest_framework.parsers import MultiPartParser, FormParser
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from .models import Data
class DataSerializer(serializers.ModelSerializer):
class Meta:
model = Data
fields = ['name', 'data', 'attachments'] # 包含附件字段
class DataView(APIView):
parser_classes = [MultiPartParser, FormParser]
def post(self, request, *args, **kwargs):
# 获取 JSON 数据和文件
data = request.data # 包含文件的字段将自动解析
files = request.FILES # 文件字段
# 构建数据字典,将文件转移到合适的地方
attachment_list = []
for file_key, file in files.items():
attachment_list.append({"file_name": file.name, "file": file})
# 将附件添加到 data 字段
data["attachments"] = attachment_list
# 反序列化并保存数据
serializer = DataSerializer(data=data)
if serializer.is_valid():
serializer.save()
return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
前端实现
前端使用 FormData
来处理文件上传,并通过 POST
请求发送到后端。
1.3. 前端代码(JavaScript)
function submitForm() {
let formData = new FormData();
// 将其他数据(如大类、小类、参数等)作为 JSON 传递
let data = {
name: "示例数据",
data: {
"大类1": {
"小类1": {
"参数1": "值1"
}
}
}
};
// 添加数据到 FormData
formData.append('data', JSON.stringify(data));
// 添加附件文件
let fileInput = document.getElementById('fileInput'); // 假设文件输入框的 ID 为 fileInput
for (let i = 0; i < fileInput.files.length; i++) {
formData.append('attachments', fileInput.files[i]);
}
// 发送 POST 请求
fetch('/api/data/', {
method: 'POST',
body: formData,
})
.then(response => response.json())
.then(data => {
console.log('成功:', data);
})
.catch((error) => {
console.error('错误:', error);
});
}
HTML 代码:
<form onsubmit="submitForm(); return false;">
<input type="file" id="fileInput" multiple>
<button type="submit">提交</button>
</form>
2. 附件作为文件名或路径传递(传递文件路径)
在这种情况下,前端直接传递文件路径或文件名,后端根据存储位置生成完整的 URL。
后端实现
2.1. 数据模型
与上面类似,但我们不再处理文件上传,而是直接接收文件路径。
class Data(models.Model):
name = models.CharField(max_length=255)
data = models.JSONField() # 存储嵌套的 JSON 数据
created_at = models.DateTimeField(auto_now_add=True)
attachments = models.JSONField(default=list) # 附件字段(存储文件路径)
def save(self, *args, **kwargs):
self.save_attachments_with_urls()
super().save(*args, **kwargs)
def save_attachments_with_urls(self):
"""
处理附件路径,生成绝对 URL
"""
base_url = "https://s3.amazonaws.com/my-bucket/"
for attachment in self.attachments:
file_name = attachment.get("file_name")
if file_name:
# 如果前端传递的是文件路径,生成 URL
attachment["url"] = f"{base_url}{file_name}"
2.2. 数据接收与处理视图
class DataView(APIView):
def post(self, request, *args, **kwargs):
data = request.data # 获取请求中的数据
attachments = data.get("attachments", [])
# 如果附件是文件路径(而非文件),处理它们并生成 URL
for attachment in attachments:
file_name = attachment.get("file_name")
if file_name:
attachment["url"] = f"https://s3.amazonaws.com/my-bucket/{file_name}"
# 将附件添加到数据
data["attachments"] = attachments
# 反序列化并保存数据
serializer = DataSerializer(data=data)
if serializer.is_valid():
serializer.save()
return Response(serializer.data, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
前端实现
前端直接传递文件名或路径,无需上传文件。
function submitForm() {
let data = {
name: "示例数据",
data: {
"大类1": {
"小类1": {
"参数1": "值1"
}
}
},
attachments: [
{ "file_name": "file1.pdf" },
{ "file_name": "file2.pdf" }
]
};
fetch('/api/data/', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(data)
})
.then(response => response.json())
.then(data => {
console.log('成功:', data);
})
.catch((error) => {
console.error('错误:', error);
});
}
总结
1. 附件作为文件上传
- 前端:使用
FormData
上传文件,其他数据作为 JSON 传递。 - 后端:使用 Django 的
FileField
处理文件上传,生成文件的绝对 URL 后存储。
2. 附件作为文件名或路径传递
- 前端:直接传递文件名或路径。
- 后端:根据传递的路径生成文件的绝对 URL,并将其保存在数据库中。
通过这种方式,无论是上传文件还是传递文件路径,数据和附件都可以顺利处理,并且附件的 URL 会被存储到数据库中供后续查询。
理解了你的需求,你想把前端传递的数据(如附件文件名)处理成符合层级结构的格式,并存储到数据库中。你已经有了数据结构,其中附件的值是文件名的数组,而文件名需要上传到 S3 并存储到附件表中,最后在保存到数据库时,将附件文件的 S3 存储路径存储到对应的大类、小类、参数数据中。
解决方案概述:
- 解析前端数据:你传递的结构数据是按层级组织的,包括大类、小类、参数、附件。
- 处理附件文件:需要根据文件名从上传的文件中找到对应的文件,上传到 S3,并返回 S3 键。
- 保存数据:将上传后的文件 S3 键存储在附件模型中,最终将数据按照层级结构存储,并在每个参数下插入附件的序列化对象。
代码实现:
1. 数据库模型
你已经有了一个 Attachment
模型存储附件,附件表的字段可能包含 file_name
, s3objectkey
和 file_url
(URL 地址),如:
class Attachment(models.Model):
file_name = models.CharField(max_length=255)
s3objectkey = models.CharField(max_length=255) # S3 object key
file_url = models.URLField() # 完整的文件 URL
def get_absolute_url(self):
return self.file_url
2. 上传文件到 S3
假设你有一个 W3Storage
类来处理文件上传到 S3,并获取 s3objectkey
。
class W3Storage:
def putobject(self, file):
# 处理文件上传到 S3
# 假设上传成功后返回 S3 object key
return f"s3://bucket/{file.name}"
def get_absolute_url(self, s3objectkey):
# 根据 s3objectkey 生成文件的完整 URL
return f"https://example.com/{s3objectkey}"
3. 处理数据与附件上传
接下来,你需要一个方法来处理数据并上传文件,下面是一个完整的序列化器实现:
class BulkDataItemSerializer(serializers.ListSerializer):
"""
序列化器:批量处理数据项,包括文件的上传和层级数据的创建。
"""
child = DataItemSerializer()
def create(self, validated_data):
"""
批量创建 DataItem 并处理文件上传。
"""
storage = W3Storage() # 初始化 S3 存储类
new_items = [] # 存储创建的 DataItem 对象
for item_data in validated_data:
data_content = item_data.get('data_content', {})
# 处理附件字段
if 'attachments' in data_content:
attachments = data_content['attachments']
processed_attachments = {}
# 处理每个附件项
for attachment_key, file_names in attachments.items():
# file_names 是一个列表,可能是文件名的列表
s3_keys = []
for file_name in file_names:
# 从 request.FILES 获取文件对象
matched_file = self.context['files'].get(file_name)
if matched_file:
# 上传文件并获取 S3 键
s3objectkey = storage.putobject(matched_file)
# 获取文件的完整 URL
file_url = storage.get_absolute_url(s3objectkey)
# 创建附件并保存到数据库
attachment = Attachment.objects.create(
file_name=file_name,
s3objectkey=s3objectkey,
file_url=file_url
)
# 保存附件的 S3 键
s3_keys.append(attachment.get_absolute_url())
# 将上传后的文件的 URL 更新到 data_content 字段
processed_attachments[attachment_key] = s3_keys
# 更新 data_content 中的附件信息
data_content['attachments'] = processed_attachments
item_data['data_content'] = data_content
# 创建新的 DataItem 对象
new_item = DataItem.objects.create(**item_data)
new_items.append(new_item)
return new_items
4. 解析层级数据并存储到数据库
你的前端数据结构看起来是这样的:
{
"大类1": {
"小类1": {
"参数1": "值1",
"附件": ["文件名1", "文件名2"]
},
"小类2": {
"参数2": "值2",
"附件": ["文件名3"]
}
},
"大类2": {
"小类3": {
"参数3": "值3"
}
}
}
在这个数据中,每个大类下的小类,参数字段下可能会包含 附件
字段,这个字段是文件名的数组。
5. 前端文件上传
前端传递的数据格式应该是这样:
{
"data_content": {
"大类1": {
"小类1": {
"参数1": "值1",
"附件": ["file1.jpg", "file2.jpg"]
},
"小类2": {
"参数2": "值2",
"附件": ["file3.jpg"]
}
}
},
"files": {
"file1.jpg": "file1_object",
"file2.jpg": "file2_object",
"file3.jpg": "file3_object"
}
}
其中 files
是前端上传的文件对象集合,文件名是键,文件内容是值(假设为文件对象)。
6. 完整的代码和注释
class BulkDataItemSerializer(serializers.ListSerializer):
"""
列表序列化器:批量处理数据项,包括文件的上传和层级数据的创建。
"""
child = DataItemSerializer()
def create(self, validated_data):
"""
批量创建 DataItem 并处理文件上传。
"""
storage = W3Storage() # 初始化 S3 存储类
new_items = [] # 存储创建的 DataItem 对象
# 遍历每条数据
for item_data in validated_data:
data_content = item_data.get('data_content', {})
# 处理附件字段
if 'attachments' in data_content:
attachments = data_content['attachments']
processed_attachments = {}
# 处理每个附件项
for attachment_key, file_names in attachments.items():
s3_keys = [] # 用于保存上传文件的 S3 键
# 遍历附件文件名
for file_name in file_names:
matched_file = self.context['files'].get(file_name)
if matched_file:
# 上传文件到 S3
s3objectkey = storage.putobject(matched_file)
file_url = storage.get_absolute_url(s3objectkey)
# 创建附件对象并保存
attachment = Attachment.objects.create(
file_name=file_name,
s3objectkey=s3objectkey,
file_url=file_url
)
# 保存文件的 S3 URL
s3_keys.append(attachment.get_absolute_url())
# 更新 data_content 字段中的附件信息
processed_attachments[attachment_key] = s3_keys
data_content['attachments'] = processed_attachments
item_data['data_content'] = data_content
# 创建新的 DataItem 并保存
new_item = DataItem.objects.create(**item_data)
new_items.append(new_item)
return new_items
关键步骤:
- 遍历数据:首先遍历每条数据,根据大类、小类、参数的层级结构存储数据。
- 处理附件:对于每条数据中的附件字段,判断它是单个文件还是多个文件,然后逐个文件上传到 S3。
- 上传文件到 S3:使用
W3Storage.putobject
方法上传文件并获取文件的 S3 键,然后在附件表中存储文件的s3objectkey
和file_url
。 - 保存数据到数据库:处理完附件后,更新数据中的附件字段,最后创建并保存数据项(
DataItem
)到数据库。
结果:
前端的数据结构会按照层级顺序存储,并且每个附件字段会被替换成文件的 S3 URL。你可以在数据库中看到每个大类、小类、参数的数据,以及附件的 URL 信息。
这个方案通过添加 hash
和 引用次数
字段来优化附件管理,避免重复上传和存储同样的文件,同时能有效管理附件的生命周期。以下是完整的实现思路及代码,包括数据库设计、文件上传、数据写入逻辑,以及删除或修改操作。
数据库设计
附件模型 (Attachment)
from django.db import models
class Attachment(models.Model):
file_name = models.CharField(max_length=255) # 原始文件名
s3_object_key = models.CharField(max_length=512) # S3 object key
file_url = models.URLField() # S3 文件绝对 URL
file_hash = models.CharField(max_length=64, unique=True) # 文件的唯一 Hash 值 (MD5/SHA256)
reference_count = models.PositiveIntegerField(default=1) # 引用次数
def delete_s3_file(self):
"""
删除 S3 中存储的文件。
"""
from w3storage import W3Storage
storage = W3Storage()
storage.delete_object(self.s3_object_key)
def delete(self, *args, **kwargs):
"""
重写 delete 方法,确保删除时同步删除 S3 文件。
"""
self.delete_s3_file()
super().delete(*args, **kwargs)
文件处理逻辑
文件上传时的哈希计算
在上传文件到 S3 之前,先计算文件的哈希值 (hash
),以检测是否有重复的文件。
import hashlib
def calculate_file_hash(file):
"""
计算文件的哈希值 (SHA256)。
:param file: 文件对象
:return: 文件的 SHA256 哈希值
"""
sha256 = hashlib.sha256()
for chunk in file.chunks():
sha256.update(chunk)
return sha256.hexdigest()
数据写入逻辑
在数据写入时,检查 Attachment
表中是否已经存在相同的哈希值。如果存在,直接更新数据中的附件信息;否则,上传文件到 S3 并创建新的附件记录。
from .models import Attachment
from w3storage import W3Storage
def handle_attachments(data, files):
"""
处理数据中的附件。
:param data: 数据结构,包含附件字段
:param files: 上传的文件字典
:return: 处理后的数据结构
"""
storage = W3Storage()
def process_attachment(file):
# 计算文件哈希
file_hash = calculate_file_hash(file)
# 检查是否已存在相同文件
attachment, created = Attachment.objects.get_or_create(file_hash=file_hash, defaults={
'file_name': file.name,
's3_object_key': storage.putobject(file.name, file),
'file_url': storage.get_absolute_url(file.name),
'reference_count': 1,
})
if not created:
# 如果文件已存在,增加引用次数
attachment.reference_count += 1
attachment.save()
return {
'file_name': attachment.file_name,
'file_url': attachment.file_url,
'hash': attachment.file_hash,
}
def traverse_and_process(data):
# 递归处理层级数据结构中的附件
for key, value in data.items():
if isinstance(value, dict):
traverse_and_process(value)
elif key == "附件":
processed_attachments = []
for file_name in value:
if file_name in files:
processed_attachments.append(process_attachment(files[file_name]))
data[key] = processed_attachments
traverse_and_process(data)
return data
示例调用
files = {
"file1.jpg": uploaded_file1,
"file2.jpg": uploaded_file2,
}
data = {
"大类1": {
"小类1": {
"参数1": "值1",
"附件": ["file1.jpg", "file2.jpg"]
},
"小类2": {
"参数2": "值2",
"附件": ["file1.jpg"] # 重复文件
}
}
}
processed_data = handle_attachments(data, files)
修改和删除逻辑
修改引用次数
在数据被修改时,对附件的引用次数进行更新。如果引用次数为 0,则删除文件记录以及对应的 S3 文件。
def update_attachment_references(file_hashes_to_remove):
"""
更新附件的引用次数。
:param file_hashes_to_remove: 要减少引用次数的文件哈希列表
"""
for file_hash in file_hashes_to_remove:
try:
attachment = Attachment.objects.get(file_hash=file_hash)
attachment.reference_count -= 1
if attachment.reference_count <= 0:
attachment.delete() # 自动删除 S3 文件
else:
attachment.save()
except Attachment.DoesNotExist:
continue
序列化器中的处理
如果使用 DRF
的序列化器,可以在 save
方法中调用上述逻辑。
from rest_framework import serializers
class NestedDataSerializer(serializers.Serializer):
data = serializers.JSONField()
files = serializers.DictField(child=serializers.FileField())
def save(self, **kwargs):
data = self.validated_data['data']
files = self.validated_data['files']
processed_data = handle_attachments(data, files)
# 存储处理后的数据到数据库
# Your model saving logic here
return processed_data
删除操作示例
file_hashes_to_remove = ["existing_file_hash1", "existing_file_hash2"]
update_attachment_references(file_hashes_to_remove)
整体逻辑总结
- 哈希计算:使用文件的哈希值避免重复上传。
- 引用计数管理:每次新增文件时增加引用次数,修改或删除时减少引用次数。
- S3 文件删除:当引用次数为 0 时,删除附件记录和对应的 S3 文件。
- 数据与附件关联:通过
hash
和引用逻辑简化数据和文件的绑定关系。
在修改操作时,需要先对原有的附件数据进行处理:减少引用次数,并在引用为 0 时删除对应记录;然后处理新的附件数据,增加引用次数或创建新的记录。以下是完整的实现方案和代码。
修改操作逻辑
def update_attachments(existing_data, updated_data, files):
"""
更新附件的引用计数,同时更新数据中的附件信息。
:param existing_data: 修改前的数据,包含附件信息
:param updated_data: 修改后的数据,包含附件信息
:param files: 上传的文件字典
:return: 处理后的新数据
"""
def get_attachments(data):
"""
提取数据中的所有附件哈希列表。
:param data: 数据结构
:return: 附件哈希列表
"""
attachments = []
def traverse(data):
for key, value in data.items():
if isinstance(value, dict):
traverse(value)
elif key == "附件" and isinstance(value, list):
for item in value:
if isinstance(item, dict) and 'hash' in item:
attachments.append(item['hash'])
traverse(data)
return attachments
# 1. 提取现有数据的附件哈希
existing_hashes = set(get_attachments(existing_data))
# 2. 使用新数据处理附件,获取新的数据
updated_data = handle_attachments(updated_data, files)
# 3. 提取新数据的附件哈希
updated_hashes = set(get_attachments(updated_data))
# 4. 计算需要减少引用的附件和新增引用的附件
hashes_to_remove = existing_hashes - updated_hashes
hashes_to_add = updated_hashes - existing_hashes
# 5. 更新引用计数
update_attachment_references(hashes_to_remove)
for hash_to_add in hashes_to_add:
try:
attachment = Attachment.objects.get(file_hash=hash_to_add)
attachment.reference_count += 1
attachment.save()
except Attachment.DoesNotExist:
pass # 这不应该发生,所有新附件应已创建
return updated_data
修改操作调用示例
existing_data = {
"大类1": {
"小类1": {
"参数1": "值1",
"附件": [
{"file_name": "file1.jpg", "file_url": "url1", "hash": "hash1"},
{"file_name": "file2.jpg", "file_url": "url2", "hash": "hash2"}
]
}
}
}
updated_data = {
"大类1": {
"小类1": {
"参数1": "新值1",
"附件": [
"file2.jpg", # 仍然是旧文件
"file3.jpg" # 新增文件
]
}
}
}
files = {
"file3.jpg": uploaded_file3
}
new_data = update_attachments(existing_data, updated_data, files)
修改逻辑步骤
- 提取原有数据中的附件:通过遍历原有数据,获取所有的附件哈希。
- 更新新数据的附件信息:
- 处理新数据中的附件,将文件上传到 S3,并生成附件记录。
- 提取新数据的附件哈希列表。
- 对比引用差异:
- 减少引用次数:对
hashes_to_remove
中的附件,减少引用,引用为 0 时删除记录。 - 增加引用次数:对
hashes_to_add
中的附件,增加引用次数。
- 减少引用次数:对
- 返回更新后的数据:返回处理完毕的新数据结构。
删除逻辑的改进
删除数据时同样需要减少附件的引用次数,如果引用次数为 0,则删除附件记录和 S3 文件。
def delete_attachments(data):
"""
删除数据中的附件引用。
:param data: 数据结构
"""
def get_attachments(data):
attachments = []
def traverse(data):
for key, value in data.items():
if isinstance(value, dict):
traverse(value)
elif key == "附件" and isinstance(value, list):
for item in value:
if isinstance(item, dict) and 'hash' in item:
attachments.append(item['hash'])
traverse(data)
return attachments
hashes_to_remove = set(get_attachments(data))
update_attachment_references(hashes_to_remove)
调用示例:
data_to_delete = {
"大类1": {
"小类1": {
"参数1": "值1",
"附件": [
{"file_name": "file1.jpg", "file_url": "url1", "hash": "hash1"}
]
}
}
}
delete_attachments(data_to_delete)
优化点
- 引用管理:通过
hash
对附件引用进行唯一标识,避免重复存储和上传。 - 批量处理:
bulk_update
和bulk_create
可优化数据库操作。 - 并发支持:在高并发场景下,需要为
Attachment
表的reference_count
字段增加乐观锁机制(如F
表达式)。
这样设计和实现,可以确保附件在数据修改、删除时正确引用和释放,同时减少存储和网络传输的冗余。
// 统计rowspan并打平数据
function calculateRowspansAndFlatten(data) {
// 计算rowspan的数据
const dataWithRowspans = JSON.parse(JSON.stringify(data)); // 深拷贝以不影响原数据
addRowspansToData(dataWithRowspans);
// 打平数据
const flattenedData = flattenData(data);
return {
dataWithRowspans,
flattenedData
};
}
// 计算并添加rowspan到原数据
function addRowspansToData(data) {
for (const category in data) {
if (!data.hasOwnProperty(category)) continue;
let categoryRowspan = 0;
const subcategories = data[category];
for (const subcategory in subcategories) {
if (!subcategories.hasOwnProperty(subcategory)) continue;
// 计算小类的rowspan - 只计算参数的数量
const subcategoryRowspan = Object.keys(subcategories[subcategory]).length;
categoryRowspan += subcategoryRowspan;
// 添加rowspan到小类
subcategories[subcategory]._rowspan = subcategoryRowspan;
}
// 添加rowspan到大类
data[category]._rowspan = categoryRowspan;
}
}
// 打平数据并保留层级信息
function flattenData(data) {
const flattened = [];
// 处理参数和附件
const processParams = (category, subcategory, params) => {
for (const param in params) {
if (!params.hasOwnProperty(param)) continue;
const value = params[param];
if (param === "附件" && Array.isArray(value)) {
value.forEach(attachment => {
flattened.push({
level1: category,
level2: subcategory,
level3: param,
value: attachment
});
});
} else {
flattened.push({
level1: category,
level2: subcategory,
level3: param,
value: value
});
}
}
};
// 遍历数据结构
for (const category in data) {
if (!data.hasOwnProperty(category)) continue;
for (const subcategory in data[category]) {
if (!data[category].hasOwnProperty(subcategory)) continue;
processParams(category, subcategory, data[category][subcategory]);
}
}
return flattened;
}
// 处理数据
const { dataWithRowspans, flattenedData } = calculateRowspansAndFlatten(dataToDelete);