三层表头设计数据库方案

Posted by Shallow Dreameron October 16, 2024

是的,除了使用单个数据表和一个表头字段对应表的方案,还有其他方式能够实现当前需求。可以根据不同的需求和复杂性选择更适合的方案。以下是几种不同的设计方案:

方案一:多表设计(范式化数据库设计)

这是典型的范式化设计,通过多个表来处理不同层次的数据以及表头映射关系。这种设计能更好地维护数据的完整性和查询性能,但复杂性相对较高。

表设计

  1. Category表(大类)
    • 记录 xlsx 的第一层级——大类信息。
  2. Subcategory表(小类)
    • 记录第二层级——小类信息,外键关联 Category
  3. Parameter表(参数)
    • 记录第三层级——参数信息,外键关联 Subcategory
  4. Data表(数据表)
    • 存储实际的业务数据,与参数表关联,数据更分层且可拓展。
  5. FieldMapping表
    • 保存 xlsx 表头与数据库字段的对应关系及校验规则。

Django模型示例

from django.db import models

class Category(models.Model):
    name = models.CharField(max_length=255, unique=True)

class Subcategory(models.Model):
    name = models.CharField(max_length=255, blank=True, null=True)
    category = models.ForeignKey(Category, on_delete=models.CASCADE)

class Parameter(models.Model):
    name = models.CharField(max_length=255)
    subcategory = models.ForeignKey(Subcategory, on_delete=models.CASCADE, null=True, blank=True)

class Data(models.Model):
    parameter = models.ForeignKey(Parameter, on_delete=models.CASCADE)
    value = models.CharField(max_length=255)

class FieldMapping(models.Model):
    header_name = models.CharField(max_length=255, unique=True)
    field_name = models.CharField(max_length=255)
    validation_regex = models.CharField(max_length=500, blank=True, null=True)
    data_type = models.CharField(max_length=50, choices=[('string', 'String'), ('integer', 'Integer'), ...])
    required = models.BooleanField(default=False)

导入与查询

导入时,数据首先会分别插入到 CategorySubcategoryParameter 表中,最终存储到 Data 表中。查询时,前端通过传入大类和参数来查询数据。

优点
  • 数据规范化,层次关系清晰,易于维护。
  • 能更好地处理大规模查询和过滤需求。
缺点
  • 设计复杂,需要多个表之间的关联,查询较复杂。

方案二:扁平化设计(字段独立存储)

这种设计将 xlsx 文件的层级信息“平铺”,每一层级(大类、小类、参数)作为数据表的字段独立存储。每一行数据存储时会将三层级信息平铺存入字段中,并通过映射表来控制表头和字段的对应关系。

表设计

  1. Data表
    • 直接存储 xlsx 文件的三层表头字段以及对应的数据。
  2. FieldMapping表
    • 保存 xlsx 表头与数据库字段的对应关系。

Django模型示例

from django.db import models

class Data(models.Model):
    category = models.CharField(max_length=255)
    subcategory = models.CharField(max_length=255, blank=True, null=True)
    parameter = models.CharField(max_length=255)
    value = models.CharField(max_length=255)

class FieldMapping(models.Model):
    header_name = models.CharField(max_length=255, unique=True)
    field_name = models.CharField(max_length=255)
    validation_regex = models.CharField(max_length=500, blank=True, null=True)
    data_type = models.CharField(max_length=50, choices=[('string', 'String'), ('integer', 'Integer'), ...])
    required = models.BooleanField(default=False)

导入与查询

导入时,根据 FieldMapping 进行校验,然后将数据分层存储到 Data 表中。查询时,直接根据三层字段(categorysubcategoryparameter)进行过滤。

优点
  • 表结构简单,易于理解和操作。
  • 数据层次简单,查询时可以直接使用字段过滤。
缺点
  • 数据冗余较大,重复的 categorysubcategory 信息在每行数据中都会存储。
  • 如果未来层级发生变化,修改表结构的代价较高。

方案三:混合设计(JSON字段存储层次信息 + 映射表)

此方案是一个折衷设计,利用 JSON 字段来存储数据的层次信息,同时通过映射表来控制 xlsx 表头和数据表字段之间的关系。

表设计

  1. Data表
    • 使用 JSON 字段存储层次化的表头和数据值。
  2. FieldMapping表
    • 保存表头与 JSON 字段的映射关系及校验规则。

Django模型示例

from django.db import models

class Data(models.Model):
    structured_data = models.JSONField()

class FieldMapping(models.Model):
    header_name = models.CharField(max_length=255, unique=True)
    json_path = models.CharField(max_length=255)  # 记录在 JSON 中的路径
    validation_regex = models.CharField(max_length=500, blank=True, null=True)
    data_type = models.CharField(max_length=50, choices=[('string', 'String'), ('integer', 'Integer'), ...])
    required = models.BooleanField(default=False)

导入与查询

导入时,将数据解析成 JSON 并根据映射表将数据结构化存储。查询时,可以使用 JSON 字段的路径(json_path)来提取或过滤数据。

优点
  • 数据结构化存储,同时支持灵活的表头扩展。
  • JSON 字段方便存储动态和复杂结构的数据。
缺点
  • JSON 字段的查询性能较差,复杂查询可能需要额外的处理。
  • 需要对 JSON 的层次关系做出清晰的定义和规范。

结论

  • 方案一(多表设计):适合数据量大且查询复杂的场景,规范化设计适合高性能查询。
  • 方案二(扁平化设计):适合数据层次关系较简单,数据冗余可以接受的场景,简单易用。
  • 方案三(混合设计):适合数据结构动态变化频繁且需要灵活扩展的场景,JSON 字段能提供更多的灵活性。

根据你的需求,可以权衡数据的复杂性、查询需求和扩展性来选择合适的设计方案。


方案1测试

在使用多表设计时,如果每一条数据是由 大类(Category小类(Subcategory参数(Parameter 的组合,并且每条数据还需要根据 人员(User时间(Date 来区分,那么我们可以扩展现有的多表设计,将 人员时间 作为数据的区分维度。

具体思路是:

  1. 保持原有的层次结构,即大类、小类、参数组合唯一确定一组属性。
  2. 增加用户和时间字段,以确保数据是按用户和时间维度来区分的。

表设计

  1. Category表(大类):记录大类信息。
  2. Subcategory表(小类):记录小类信息,外键关联 Category 表。
  3. Parameter表(参数):记录参数信息,外键关联 Subcategory 表。
  4. User表:记录用户信息(可选:可以直接使用 Django 自带的 User 表)。
  5. Data表:存储业务数据,并通过 categorysubcategoryparameteruserdate 的组合来唯一确定一条数据。

Django 模型示例

from django.db import models
from django.contrib.auth.models import User

class Category(models.Model):
    name = models.CharField(max_length=255, unique=True)
    def __str__(self):
        return self.name

class Subcategory(models.Model):
    name = models.CharField(max_length=255, blank=True, null=True)
    category = models.ForeignKey(Category, on_delete=models.CASCADE)
    def __str__(self):
        return f"{self.category.name} - {self.name}"

class Parameter(models.Model):
    name = models.CharField(max_length=255)
    subcategory = models.ForeignKey(Subcategory, on_delete=models.CASCADE)
    def __str__(self):
        return f"{self.subcategory.name} - {self.name}"

class Data(models.Model):
    category = models.ForeignKey(Category, on_delete=models.CASCADE)
    subcategory = models.ForeignKey(Subcategory, on_delete=models.CASCADE, null=True, blank=True)
    parameter = models.ForeignKey(Parameter, on_delete=models.CASCADE)

    # 数据的具体值
    value = models.CharField(max_length=255)

    # 用户信息,关联到 Django 自带的 User 模型
    user = models.ForeignKey(User, on_delete=models.CASCADE)

    # 数据的时间信息
    date = models.DateField()

    class Meta:
        # category、subcategory、parameter、user 和 date 组合唯一确定一条数据
        unique_together = ('category', 'subcategory', 'parameter', 'user', 'date')

    def __str__(self):
        return f"{self.category.name} - {self.subcategory.name} - {self.parameter.name} ({self.user.username} on {self.date})"

数据导入和重复数据判断

在导入数据时,首先需要判断是否已经存在相同的 categorysubcategoryparameteruserdate 组合的数据。如果存在,就可以更新这条数据;如果不存在,则创建新数据。

导入逻辑示例

from .models import Data, Category, Subcategory, Parameter, User
from django.core.exceptions import ObjectDoesNotExist

def import_data(category_name, subcategory_name, parameter_name, value, user_id, date):
    try:
        # 查找 Category
        category = Category.objects.get(name=category_name)
        
        # 查找 Subcategory,如果为空则使用 None
        subcategory = Subcategory.objects.get(category=category, name=subcategory_name) if subcategory_name else None
        
        # 查找 Parameter
        parameter = Parameter.objects.get(subcategory=subcategory, name=parameter_name)
        
        # 查找 User
        user = User.objects.get(id=user_id)
        
        # 查找是否已有相同组合的数据
        data, created = Data.objects.get_or_create(
            category=category,
            subcategory=subcategory,
            parameter=parameter,
            user=user,
            date=date,
            defaults={'value': value}  # 如果新建数据,设定默认值
        )
        
        if not created:
            # 如果数据已存在,更新值
            data.value = value
            data.save()
        return created
    except (Category.DoesNotExist, Subcategory.DoesNotExist, Parameter.DoesNotExist, User.DoesNotExist):
        print("对应的类别、参数或用户不存在")
        return False

查询逻辑

查询数据时,可以通过 大类(Category小类(Subcategory参数(Parameter用户(User时间(Date 来过滤数据。

查询逻辑示例

from django.db.models import Q

def query_data(category_name, parameter_name, user_id, date_range=None):
    """
    根据大类、参数、用户及时间范围查询数据
    :param category_name: 大类名称
    :param parameter_name: 参数名称
    :param user_id: 用户 ID
    :param date_range: 时间范围 (开始日期, 结束日期),可选
    :return: 查询结果
    """
    try:
        category = Category.objects.get(name=category_name)
        user = User.objects.get(id=user_id)
        parameter = Parameter.objects.get(name=parameter_name)

        query = Data.objects.filter(
            category=category,
            parameter=parameter,
            user=user
        )

        if date_range:
            query = query.filter(date__range=date_range)

        return query
    except (Category.DoesNotExist, Parameter.DoesNotExist, User.DoesNotExist):
        return None

优化和扩展

  1. 索引优化:可以为 categorysubcategoryparameteruserdate 字段添加索引,以提高查询性能。
  2. 数据扩展:可以进一步扩展 Data 表,增加更多的字段来记录其他相关信息,比如数据来源、状态等。
  3. 接口扩展:可以基于 Django REST Framework 编写 RESTful API 来实现增删改查接口。

总结

在这个设计中,数据的唯一性 通过大类、小类、参数、人员、时间组合来保证。这样既能体现数据的层次结构,又能确保每条数据具有时间和人员维度的区分,满足你的需求。


为了满足你的需求,其中 大类小类参数 的组合是唯一的,另外,人员时间 的组合也是唯一的,这意味着每条记录的数据是由这两个维度共同确定的。

表设计

  1. Category 表(大类):存储大类信息。
  2. Subcategory 表(小类):存储小类信息,外键关联 Category
  3. Parameter 表(参数):存储参数信息,外键关联 Subcategory
  4. User 表(人员):存储用户信息(可以使用 Django 自带的 User 模型)。
  5. Data 表(数据):存储实际业务数据,外键关联 人员时间,通过 大类小类参数 组合唯一确定一组属性,通过 人员时间 来区分不同数据。

Django 模型设计

from django.db import models
from django.contrib.auth.models import User

class Category(models.Model):
    name = models.CharField(max_length=255, unique=True)
    def __str__(self):
        return self.name

class Subcategory(models.Model):
    name = models.CharField(max_length=255, blank=True, null=True)
    category = models.ForeignKey(Category, on_delete=models.CASCADE)
    def __str__(self):
        return f"{self.category.name} - {self.name}"

class Parameter(models.Model):
    name = models.CharField(max_length=255)
    subcategory = models.ForeignKey(Subcategory, on_delete=models.CASCADE)
    def __str__(self):
        return f"{self.subcategory.name} - {self.name}"

class Data(models.Model):
    category = models.ForeignKey(Category, on_delete=models.CASCADE)
    subcategory = models.ForeignKey(Subcategory, on_delete=models.CASCADE, null=True, blank=True)
    parameter = models.ForeignKey(Parameter, on_delete=models.CASCADE)
    
    # 业务数据的具体值
    value = models.CharField(max_length=255)

    # 人员和时间
    user = models.ForeignKey(User, on_delete=models.CASCADE)
    date = models.DateField()

    class Meta:
        # category、subcategory、parameter 组合唯一,并且 user 和 date 组合唯一
        unique_together = (('category', 'subcategory', 'parameter'), ('user', 'date'))

    def __str__(self):
        return f"{self.category.name} - {self.subcategory.name} - {self.parameter.name} ({self.user.username} on {self.date})"

重复数据判断及增删改查逻辑

在数据导入时,可以根据 大类、小类、参数组合人员、时间组合 的唯一性来判断数据是否已经存在。

导入逻辑示例

from .models import Data, Category, Subcategory, Parameter, User

def import_data(category_name, subcategory_name, parameter_name, value, user_id, date):
    try:
        # 查找或创建 Category
        category, _ = Category.objects.get_or_create(name=category_name)
        
        # 查找或创建 Subcategory
        subcategory, _ = Subcategory.objects.get_or_create(name=subcategory_name, category=category)
        
        # 查找或创建 Parameter
        parameter, _ = Parameter.objects.get_or_create(name=parameter_name, subcategory=subcategory)
        
        # 查找 User
        user = User.objects.get(id=user_id)
        
        # 查找是否已存在相同的 category, subcategory, parameter, user, date 组合的数据
        data, created = Data.objects.get_or_create(
            category=category,
            subcategory=subcategory,
            parameter=parameter,
            user=user,
            date=date,
            defaults={'value': value}  # 如果新建数据,设定默认值
        )
        
        if not created:
            # 如果数据已存在,更新值
            data.value = value
            data.save()
        return created
    except (Category.DoesNotExist, Subcategory.DoesNotExist, Parameter.DoesNotExist, User.DoesNotExist):
        print("对应的类别、参数或用户不存在")
        return False

增删改查 API

可以使用 Django REST Framework (DRF) 实现增删改查接口。

from rest_framework import viewsets
from .models import Data
from .serializers import DataSerializer

class DataViewSet(viewsets.ModelViewSet):
    queryset = Data.objects.all()
    serializer_class = DataSerializer

# serializers.py
from rest_framework import serializers
from .models import Data

class DataSerializer(serializers.ModelSerializer):
    class Meta:
        model = Data
        fields = '__all__'

表头字段映射表

如果还需要表头字段映射,可以设计一个单独的表来存储 表头与字段的对应关系,以及字段的校验正则表达式:

class HeaderFieldMapping(models.Model):
    xlsx_header = models.CharField(max_length=255)  # 表头
    model_field = models.CharField(max_length=255)  # 对应数据表字段
    validation_regex = models.CharField(max_length=255, blank=True, null=True)  # 校验正则

    def __str__(self):
        return f"{self.xlsx_header} -> {self.model_field}"

这样你就可以在数据导入过程中,动态地根据 xlsx 文件的表头找到对应的数据库字段,并进行数据校验。


方案3测试

为了简化设计为只有一个数据表和一个表头字段对应表的方案,以下是具体的模型设计。

数据表 (Data)

这个表存储了所有从 xlsx 文件导入的数据,不再细分为 CategorySubcategoryParameter 表。所有导入的数据将直接存储在一个表中,而具体的列对应关系将通过 FieldMapping 表来管理。

from django.db import models

class Data(models.Model):
    # 动态字段存储的方式,使用 JSONField 来适应不同表头的数据
    data = models.JSONField()

    # 创建时间和更新时间
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    def __str__(self):
        return f"Data ID: {self.id}"

表头字段对应表 (FieldMapping)

这个表将 xlsx 文件的表头与 Data 表中 JSON 字段的具体键名(字段名)进行映射,并提供正则表达式校验规则和数据类型信息。

from django.db import models

class FieldMapping(models.Model):
    HEADER_TYPES = [
        ('string', 'String'),
        ('integer', 'Integer'),
        ('float', 'Float'),
        ('date', 'Date'),
        ('boolean', 'Boolean'),
    ]

    # 表头名称
    header_name = models.CharField(max_length=255, unique=True, db_index=True)

    # Data 表中的字段名称(用于 JSON key)
    field_name = models.CharField(max_length=255)

    # 校验正则表达式(可以为空)
    validation_regex = models.CharField(max_length=500, blank=True, null=True)

    # 字段数据类型
    data_type = models.CharField(max_length=50, choices=HEADER_TYPES)

    # 是否为必填字段
    required = models.BooleanField(default=False)

    def __str__(self):
        return f"{self.header_name} -> {self.field_name}"

导入逻辑

在导入 xlsx 文件时,根据 FieldMapping 表来确定 xlsx 表头与 Data 表中 JSON 字段的对应关系,然后将数据插入或更新到 Data 表中。使用 JSON 字段来存储动态的数据列,方便处理不同格式的表头。

import re
from .models import FieldMapping, Data

def validate_and_import_xlsx_data(parsed_data):
    """
    根据 FieldMapping 模型进行数据的校验和导入
    :param parsed_data: 解析后的 xlsx 数据,每个字典代表一行数据
    :return: 成功导入的记录数量
    """
    success_count = 0

    # 获取所有字段映射
    field_mappings = FieldMapping.objects.all()
    field_mapping_dict = {mapping.header_name: mapping for mapping in field_mappings}

    for row in parsed_data:
        valid = True
        validated_data = {}

        # 遍历每一列进行验证
        for header_name, value in row.items():
            mapping = field_mapping_dict.get(header_name)

            if not mapping:
                print(f"未知的表头字段: {header_name}")
                continue

            field_name = mapping.field_name
            validation_regex = mapping.validation_regex
            data_type = mapping.data_type
            required = mapping.required

            # 检查是否为必填字段
            if required and not value:
                print(f"缺少必填字段:{header_name}")
                valid = False
                break

            # 根据正则表达式进行校验
            if validation_regex and value:
                if not re.match(validation_regex, str(value)):
                    print(f"字段 {header_name} 未通过校验")
                    valid = False
                    break

            # 根据数据类型进行转换
            if value:
                try:
                    if data_type == 'integer':
                        value = int(value)
                    elif data_type == 'float':
                        value = float(value)
                    elif data_type == 'boolean':
                        value = bool(value)
                    elif data_type == 'date':
                        value = parse_date(value)  # 假设有日期转换函数
                except ValueError:
                    print(f"字段 {header_name} 无法转换为 {data_type}")
                    valid = False
                    break

            # 如果通过校验,将数据添加到字典中
            validated_data[field_name] = value

        if valid:
            # 插入到 Data 表中的 JSON 字段
            Data.objects.create(data=validated_data)
            success_count += 1

    return success_count

查询逻辑

查询数据时,通过前端传入的参数在 data 字段中进行过滤,例如通过大类和参数的字段值进行过滤。

from django.db.models import Q

def query_data(category, parameter):
    """
    根据前端传入的大类和参数查询数据
    :param category: 大类名称
    :param parameter: 参数名称
    :return: 查询结果
    """
    # 根据 JSON 字段进行查询
    results = Data.objects.filter(
        Q(data__category=category) & Q(data__parameter=parameter)
    )
    return results

总结

  • Data 使用 JSON 字段来存储所有数据行,每行数据根据表头字段映射到不同的 JSON 键名。
  • FieldMapping 记录了 xlsx 文件表头与数据库 JSON 字段的对应关系,以及字段的校验规则和数据类型。
  • 导入时 根据 FieldMapping 进行验证和字段映射,导入后将数据存入 Data 表中。
  • 查询时 根据 JSON 字段进行过滤,前端可以传入大类和参数等条件来查询数据。

这种设计简化了表结构,适应了表头动态变化和数据导入时的需求,同时也具有很好的扩展性。


以下是一个完整的方案,包含两个数据库设计和实现对应的增删改查接口。这个方案能够处理 xlsx 文件导入时的数据校验、字段映射以及多表关联的层次结构查询。

方案概要

我们将设计两张表:

  1. BaseInfo: 用于存储基础信息(如人员和创建时间)。
  2. ParameterData: 用于存储大类、小类、参数组合及其对应的值。
  3. FieldMapping: 用于存储 xlsx 文件中的表头与数据库字段的对应关系,并为字段添加数据校验规则。

数据库模型设计

from django.db import models

# 基础信息表
class BaseInfo(models.Model):
    user = models.ForeignKey('auth.User', on_delete=models.CASCADE, help_text="数据导入的人员")
    creation_time = models.DateTimeField(help_text="数据导入时间")

    class Meta:
        unique_together = ['user', 'creation_time']  # 确保人员和创建时间唯一
        verbose_name = '基础信息'
        verbose_name_plural = '基础信息表'

    def __str__(self):
        return f"{self.user} - {self.creation_time}"

# 参数表:存储大类、小类和参数
class Parameter(models.Model):
    category_name = models.CharField(max_length=255, help_text="大类名称")
    subcategory_name = models.CharField(max_length=255, null=True, blank=True, help_text="小类名称(可以为空)")
    parameter_name = models.CharField(max_length=255, help_text="参数名称")

    class Meta:
        unique_together = ['category_name', 'subcategory_name', 'parameter_name']
        verbose_name = '参数信息'
        verbose_name_plural = '参数信息表'

    def __str__(self):
        return f"{self.category_name} - {self.subcategory_name} - {self.parameter_name}"

# 数据表:关联基础信息和参数,并存储值
class Data(models.Model):
    base_info = models.ForeignKey(BaseInfo, on_delete=models.CASCADE, help_text="关联基础信息")
    parameter = models.ForeignKey(Parameter, on_delete=models.CASCADE, help_text="关联参数")
    value = models.CharField(max_length=255, help_text="参数值")

    class Meta:
        verbose_name = '数据'
        verbose_name_plural = '数据表'

    def __str__(self):
        return f"{self.base_info} - {self.parameter} - {self.value}"

# 字段映射表:存储xlsx表头与数据库字段的对应关系及校验规则
class FieldMapping(models.Model):
    xlsx_header = models.CharField(max_length=255, unique=True, help_text="xlsx 文件中的表头")
    db_field = models.CharField(max_length=255, help_text="数据库中的字段名")
    validation_regex = models.CharField(max_length=255, null=True, blank=True, help_text="字段的校验正则表达式")

    def __str__(self):
        return f"{self.xlsx_header} -> {self.db_field}"

数据库模型说明

  1. BaseInfo: 用来存储每条数据的基础信息,主要是导入人员和时间。
  2. Parameter: 存储大类、小类和参数的组合,每个组合是唯一的,定义了数据结构的层次。
  3. Data: 每条数据与 BaseInfoParameter 相关联,存储实际的参数值。
  4. FieldMapping: 将 xlsx 文件的表头映射到数据库的字段,并为字段提供正则校验。

数据导入和校验

当导入数据时,可以通过 FieldMapping 表来找到对应的数据库字段,并对值进行校验。每次数据导入时,根据导入人员和时间创建一条新的 BaseInfo 记录,随后插入数据。

数据导入函数示例

import re

def validate_and_import_xlsx_data(xlsx_data, user, creation_time):
    """
    解析并导入 xlsx 数据,根据 FieldMapping 进行字段映射,并进行校验。
    :param xlsx_data: 解析后的 xlsx 数据,格式为 [{header: value, ...}, ...]
    :param user: 导入的用户
    :param creation_time: 导入时间
    """
    base_info = BaseInfo.objects.create(user=user, creation_time=creation_time)

    for row in xlsx_data:
        data_to_save = {}

        for xlsx_header, value in row.items():
            try:
                # 查找对应的字段映射
                field_mapping = FieldMapping.objects.get(xlsx_header=xlsx_header)

                # 校验数据
                if field_mapping.validation_regex:
                    if not re.match(field_mapping.validation_regex, str(value)):
                        raise ValueError(f"字段 '{xlsx_header}' 的值 '{value}' 不符合校验规则")
                
                # 查找或创建 Parameter
                category_name = row.get('category_name', None)
                subcategory_name = row.get('subcategory_name', None)
                parameter_name = row.get('parameter_name', None)

                if not category_name or not parameter_name:
                    raise ValueError("大类和参数名称是必须的")

                parameter, created = Parameter.objects.get_or_create(
                    category_name=category_name,
                    subcategory_name=subcategory_name,
                    parameter_name=parameter_name
                )

                # 将值保存到 Data 表中
                Data.objects.create(
                    base_info=base_info,
                    parameter=parameter,
                    value=value
                )

            except FieldMapping.DoesNotExist:
                raise ValueError(f"未找到表头 '{xlsx_header}' 的字段映射")

CRUD 操作

增加数据

通过 validate_and_import_xlsx_data 函数即可实现增量导入数据。在导入时,验证表头和字段映射关系,并将数据保存到 Data 表。

查询数据

查询某个用户和创建时间下的所有数据,并构建层级化的结构:

from collections import defaultdict

def query_hierarchical_data(user_id, creation_time):
    """
    查询某个用户和创建时间下的数据,返回层级化的结构。
    :param user_id: 人员 ID
    :param creation_time: 创建时间
    :return: 层级化的查询结果
    """
    try:
        # 查找基础信息记录
        base_info = BaseInfo.objects.get(user_id=user_id, creation_time=creation_time)

        # 查询该基础信息下的所有数据
        data_records = Data.objects.filter(base_info=base_info).select_related('parameter')

        # 使用 defaultdict 创建层级化的结构
        result = defaultdict(lambda: defaultdict(lambda: {}))  # 自动构建嵌套结构

        # 遍历查询结果,按大类、小类、参数进行分组
        for data_record in data_records:
            category = data_record.parameter.category_name
            subcategory = data_record.parameter.subcategory_name or "无"  # 小类可能为空,用 '无' 表示
            parameter = data_record.parameter.parameter_name

            # 将值放入层级结构中
            result[category][subcategory][parameter] = data_record.value

        # 构建最终返回结果
        final_result = {
            "base_info": {
                "user": base_info.user.username,
                "creation_time": base_info.creation_time
            },
            "data": dict(result)  # 将 defaultdict 转换为普通 dict
        }

        return final_result
    except BaseInfo.DoesNotExist:
        return None

更新数据

更新数据时,可以根据 user_idcreation_time 找到对应的 BaseInfo,然后在 Data 表中找到对应的 Parameter 记录并更新。

def update_data(user_id, creation_time, parameter_id, new_value):
    try:
        base_info = BaseInfo.objects.get(user_id=user_id, creation_time=creation_time)
        data = Data.objects.get(base_info=base_info, parameter_id=parameter_id)
        data.value = new_value
        data.save()
    except BaseInfo.DoesNotExist:
        raise ValueError("未找到对应的基础信息")
    except Data.DoesNotExist:
        raise ValueError("未找到对应的数据记录")

删除数据

根据 BaseInfoParameter 组合找到对应的数据并删除:

def delete_data(user_id, creation_time, parameter_id):
    try:
        base_info = BaseInfo.objects.get(user_id=user_id, creation_time=creation_time)
        data = Data.objects.get(base_info=base_info, parameter_id=parameter_id)
        data.delete()
    except BaseInfo.DoesNotExist:
        raise ValueError("未找到对应的基础信息")
    except Data.DoesNotExist:
        raise ValueError("未找到对应的数据记录")

字段映射增删改查

创建字段映射

def create_field_mapping(xlsx_header, db_field, validation_regex=None):
    FieldMapping.objects.create(
        xlsx_header=xlsx_header,
        db_field=db_field,
        validation_regex=validation_regex
    )

更新字段映射

def update_field_mapping(xlsx_header, db_field=None, validation_regex=None):
    field_mapping = FieldMapping.objects.get(x

lsx_header=xlsx_header)
    if db_field:
        field_mapping.db_field = db_field
    if validation_regex:
        field_mapping.validation_regex = validation_regex
    field_mapping.save()

删除字段映射

def delete_field_mapping(xlsx_header):
    FieldMapping.objects.filter(xlsx_header=xlsx_header).delete()

查询字段映射

def get_field_mappings():
    return FieldMapping.objects.all()

该方案实现了基础数据和字段映射的设计,并提供了用于数据操作和查询的接口。数据校验通过正则表达式进行,查询时按照大类、小类和参数层次结构进行组织。


在当前设计中,每当插入一条数据时,由于每个参数都是单独存储在 Data 表中的,所以你在写入一条数据(即一个人员和时间的组合)时,可能会向 Data 表中插入上百条记录(每个参数一条数据)。当数据量非常大时,确实可能会产生以下问题:

  1. 写入性能问题: 每条数据都对应多个参数,每个参数写入时都涉及到数据库的 INSERT 操作。当数据量变大时,大量的 INSERT 操作可能会影响性能。
  2. 查询性能问题: 查询时为了重构数据的层级结构,需要遍历大量的参数记录,尤其是当单条数据包含多个参数时,查询效率可能会下降。
  3. 数据存储开销: 由于每个参数都是一个独立记录,数据表的大小会迅速增长,尤其是当参数数量多、数据量大的情况下。

可行的优化方案

为了解决这些问题,可以考虑以下几种优化策略:

1. 批量插入

Django 默认会为每条记录执行单独的 INSERT 操作。通过使用 Django 的批量插入功能,可以将多条记录一次性插入数据库,减少 INSERT 操作的开销。

from django.db import transaction

def bulk_insert_data(base_info, parameter_values):
    """
    批量插入数据
    :param base_info: 对应的 BaseInfo 对象
    :param parameter_values: 参数和值的列表,格式为 [{'parameter': Parameter, 'value': value}, ...]
    """
    data_objects = []
    for param_val in parameter_values:
        data_objects.append(Data(base_info=base_info, parameter=param_val['parameter'], value=param_val['value']))

    with transaction.atomic():
        Data.objects.bulk_create(data_objects)

这样,参数的插入就会被合并为一次批量插入操作,从而提高性能。

2. 查询优化

当查询数据时,可以利用数据库的索引和查询优化技术来加快查询速度。例如,确保在 Data 表的 base_info_idparameter_id 字段上加上索引,以便能够快速查询某条数据的所有参数。

class Data(models.Model):
    base_info = models.ForeignKey(BaseInfo, on_delete=models.CASCADE, help_text="关联基础信息", db_index=True)
    parameter = models.ForeignKey(Parameter, on_delete=models.CASCADE, help_text="关联参数", db_index=True)
    value = models.CharField(max_length=255, help_text="参数值")

    class Meta:
        indexes = [
            models.Index(fields=['base_info', 'parameter']),
        ]
        verbose_name = '数据'
        verbose_name_plural = '数据表'

3. 存储结构优化

目前的设计是将每个参数的值存储为单独的记录。如果参数数量非常多,且每条记录包含的参数固定,可以考虑将参数数据压缩成 JSON 格式,存储在一个字段中,而不是每个参数一条记录。

from django.contrib.postgres.fields import JSONField

class Data(models.Model):
    base_info = models.ForeignKey(BaseInfo, on_delete=models.CASCADE, help_text="关联基础信息")
    parameters = JSONField(help_text="存储所有参数及其值的JSON对象")

    def __str__(self):
        return f"{self.base_info} - {self.parameters}"

这种方法将所有参数合并存储在一个字段中,减少了表的行数,但会增加单条记录的存储大小。对于查询来说,PostgreSQL 的 JSON 查询功能可以高效地处理这种数据结构,特别是当你需要查询单个参数时,可以使用 ->> 操作符。

优点:

  • 减少了 Data 表的记录条数,降低存储压力。
  • 查询时可以直接获取所有参数,减少复杂的 JOIN 操作。

缺点:

  • 每次插入和更新时需要重新构建整个 JSON 对象。
  • 如果需要对单个参数进行复杂查询,性能可能不如单独存储参数的方式高效。

4. 分表存储

如果你的数据量非常大,还可以考虑将 Data 表按照某些规则进行分表存储(如按时间分表、按用户分表)。这样可以避免单表数据过大的问题,提高写入和查询的效率。


方案推荐

综合考虑查询和写入的复杂度与性能,我建议如下方案:

  1. 短期:使用批量插入和索引优化,以提升写入和查询性能。这在一定数据量下能够有效解决性能问题。
  2. 长期:如果数据量持续增长,考虑将参数数据存储为 JSON 格式,以减少表记录数量。查询时使用 PostgreSQL 的 JSON 查询功能进行优化。同时,可以考虑分表存储,以应对极端的数据量。

增删改查接口实现(优化批量插入)

def create_or_update_data(user, creation_time, parameter_data):
    """
    批量创建或更新数据。
    :param user: 导入用户
    :param creation_time: 创建时间
    :param parameter_data: 参数数据,格式为 [{category_name, subcategory_name, parameter_name, value}, ...]
    """
    base_info, _ = BaseInfo.objects.get_or_create(user=user, creation_time=creation_time)

    parameters_to_insert = []
    for param in parameter_data:
        category = param['category_name']
        subcategory = param.get('subcategory_name', None)
        parameter_name = param['parameter_name']
        value = param['value']

        # 获取或创建参数
        parameter, _ = Parameter.objects.get_or_create(
            category_name=category,
            subcategory_name=subcategory,
            parameter_name=parameter_name
        )

        # 创建 Data 对象准备批量插入
        parameters_to_insert.append({
            'parameter': parameter,
            'value': value
        })

    # 批量插入数据
    bulk_insert_data(base_info, parameters_to_insert)

通过这个设计,数据的插入、更新、删除和查询都能够在高效和可扩展性之间取得平衡。如果未来需要进一步优化,可以通过数据压缩、分表存储来进行扩展。


在你的需求中,如果使用 JSON 存储参数数据,并且每个大类中都有一个附件的参数,附件的文件名需要与参数数据一同处理。以下是一个可行的设计方案,涵盖了数据库模型、文件上传到 S3 的处理、以及相应的增删改查操作。

数据库设计

模型定义

from django.db import models
from django.contrib.postgres.fields import JSONField

class BaseInfo(models.Model):
    user = models.CharField(max_length=255, help_text="导入用户")
    creation_time = models.DateTimeField(help_text="创建时间")

    class Meta:
        unique_together = ('user', 'creation_time')
        verbose_name = '基础信息'
        verbose_name_plural = '基础信息表'

class Category(models.Model):
    name = models.CharField(max_length=255, help_text="大类名称")
    attachment = models.CharField(max_length=255, null=True, blank=True, help_text="附件文件名")

    class Meta:
        verbose_name = '大类'
        verbose_name_plural = '大类表'

class Data(models.Model):
    base_info = models.ForeignKey(BaseInfo, on_delete=models.CASCADE, help_text="关联基础信息")
    parameters = JSONField(help_text="参数数据,以 JSON 格式存储")

    class Meta:
        verbose_name = '数据'
        verbose_name_plural = '数据表'

文件上传到 S3

在处理文件上传时,可以使用 boto3 这个库来上传文件到 S3。首先,你需要安装该库:

pip install boto3

然后在你的视图或服务中实现文件上传逻辑:

import boto3
from django.conf import settings
import os

def upload_file_to_s3(file, bucket_name):
    """
    上传文件到 S3。
    :param file: 文件对象
    :param bucket_name: S3 桶名
    :return: 文件名
    """
    s3 = boto3.client('s3', 
                       aws_access_key_id=settings.AWS_ACCESS_KEY_ID, 
                       aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY)

    file_name = file.name
    try:
        s3.upload_fileobj(file, bucket_name, file_name)
    except Exception as e:
        print(f"上传失败: {e}")
        return None

    return file_name

完整的增删改查操作实现

创建或更新数据

from django.db import transaction

def create_or_update_data(user, creation_time, category_data, parameters):
    """
    创建或更新数据。
    :param user: 导入用户
    :param creation_time: 创建时间
    :param category_data: 大类数据,包含大类名称和附件
    :param parameters: 参数数据,格式为 [{parameter_name: value}, ...]
    """
    base_info, _ = BaseInfo.objects.get_or_create(user=user, creation_time=creation_time)

    # 创建或更新大类
    category, _ = Category.objects.get_or_create(name=category_data['name'])
    
    # 更新附件文件名
    if 'attachment' in category_data:
        category.attachment = category_data['attachment']
        category.save()

    # 准备 JSON 参数数据
    data_object = Data(base_info=base_info, parameters=parameters)

    with transaction.atomic():
        data_object.save()

查询数据

def get_data_by_user_and_time(user, creation_time):
    """
    根据用户和时间查询数据,并返回层级结构
    :param user: 导入用户
    :param creation_time: 创建时间
    :return: 返回层级结构的字典
    """
    base_info = BaseInfo.objects.get(user=user, creation_time=creation_time)
    data_records = Data.objects.filter(base_info=base_info)

    result = {
        "base_info": {
            "user": user,
            "creation_time": creation_time
        },
        "data": []
    }

    for record in data_records:
        result["data"].append({
            "parameters": record.parameters,
            "category_attachment": record.base_info.category.attachment  # 附件文件名
        })

    return result

删除数据

def delete_data_by_user_and_time(user, creation_time):
    """
    根据用户和时间删除数据
    :param user: 用户
    :param creation_time: 创建时间
    """
    base_info = BaseInfo.objects.get(user=user, creation_time=creation_time)
    Data.objects.filter(base_info=base_info).delete()
    base_info.delete()  # 删除基础信息

更新数据

def update_data(user, creation_time, parameters):
    """
    更新数据
    :param user: 用户
    :param creation_time: 创建时间
    :param parameters: 参数数据
    """
    base_info = BaseInfo.objects.get(user=user, creation_time=creation_time)
    data_record = Data.objects.filter(base_info=base_info).first()
    if data_record:
        data_record.parameters.update(parameters)
        data_record.save()

使用方案总结

  1. JSON 存储参数:使用 JSON 字段来灵活存储参数数据,方便后续扩展和修改。
  2. 处理附件:在大类模型中加入附件字段,使用 S3 存储附件,并通过 upload_file_to_s3 函数实现文件上传。
  3. 增删改查操作:提供基本的 CRUD 接口,支持用户根据基本信息进行数据操作。
  4. 事务管理:在数据创建和更新操作中使用事务,确保数据一致性。

性能考虑

  • JSON 存储:适合存储灵活的参数数据,但要注意 JSON 字段的查询效率,尤其是在数据量较大时。
  • 附件存储:将文件存储在 S3 中,可以减轻数据库负担,并提高文件访问的灵活性。

这个方案兼顾了灵活性与可扩展性,并支持文件附件的处理。希望这能够满足你的需求!如果还有其他问题,请随时告诉我。


为了让代码更具动态性并避免冗余,可以通过以下方法实现:

  1. 使用字典来映射大类到对应的模型:将大类和小类与模型的映射存储在字典中,以便根据类别动态获取模型类。

  2. 通过反射动态设置字段值:使用 setattr 函数根据字段名称动态设置属性值。

  3. 简化数据导入逻辑:利用循环和条件逻辑替代多个 if-elif 语句来处理数据存储。

以下是改进后的代码示例:

更新后的代码

import openpyxl
from django.core.exceptions import ValidationError
from .models import BaseInfo, CategoryA, CategoryB  # 确保引入你的模型

def load_field_mappings():
    """从数据库加载字段对应关系。"""
    mappings = FieldMapping.objects.all()
    mapping_dict = {}
    for mapping in mappings:
        key = (mapping.category, mapping.subcategory, mapping.parameter)
        mapping_dict[key] = mapping.db_field
    return mapping_dict

def parse_xlsx(file):
    """解析 xlsx 文件并返回数据对象列表。"""
    wb = openpyxl.load_workbook(file)
    sheet = wb.active
    data_list = []
    header_row = [cell.value for cell in sheet[1]]  # 第一行作为表头
    field_mappings = load_field_mappings()

    for row in sheet.iter_rows(min_row=2, values_only=True):
        data_item = {
            'user': row[0],  # 用户名
            'creation_time': row[1],  # 创建时间
        }

        for col_idx in range(2, len(row)):
            category = header_row[col_idx - 2]  # 大类
            subcategory = header_row[col_idx - 1]  # 小类
            parameter = header_row[col_idx]  # 参数
            key = (category, subcategory, parameter)

            if key in field_mappings:
                db_field = field_mappings[key]
                data_item[db_field] = row[col_idx]
                data_item['category'] = category
                data_item['subcategory'] = subcategory

        data_list.append(data_item)

    return data_list

def import_data(file):
    """解析 xlsx 文件,校验数据并导入数据库。"""
    data_list = parse_xlsx(file)
    validate_data(data_list)

    # 大类到模型的映射
    category_model_map = {
        '大类A': CategoryA,
        '大类B': CategoryB,
        # 可以继续添加更多的大类和相应的模型
    }

    for data_item in data_list:
        base_info, _ = BaseInfo.objects.get_or_create(
            user=data_item['user'], 
            creation_time=data_item['creation_time']
        )

        category = data_item['category']
        model = category_model_map.get(category)

        if model:
            # 动态创建或更新对象
            defaults = {k: v for k, v in data_item.items() if k not in ['user', 'creation_time', 'category', 'subcategory']}
            defaults['subcategory'] = data_item['subcategory']  # 记录小类
            model.objects.update_or_create(base_info=base_info, defaults=defaults)

def validate_data(data_list):
    """校验数据,确保数据格式正确且无重复。"""
    seen = set()
    for data_item in data_list:
        key = (data_item['user'], data_item['creation_time'])
        if key in seen:
            raise ValidationError(f"重复数据: {key}")
        seen.add(key)

关键改动

  1. 模型映射:使用字典 category_model_map 将大类名称映射到相应的模型类,这样可以轻松添加新大类。

  2. 动态字段设置:在导入数据时,使用字典推导式动态创建 defaults 字典,简化了字段处理的逻辑。

  3. 减少条件判断:通过直接使用映射和动态属性设置,减少了对多个 if-elif 语句的依赖,代码更简洁。

总结

这种动态化的实现方式不仅提升了代码的可维护性和可读性,也方便了后续的扩展。你可以轻松添加新的大类和相应的模型,而无需重复编写判断逻辑。如果有其他需求或问题,请随时告诉我!


下面是一个完整的实现,包括数据库模型、数据导入逻辑、增删改查接口以及根据参数筛选的接口代码。

数据库模型设计

# models.py
from django.db import models

class BaseInfo(models.Model):
    user = models.CharField(max_length=100)
    creation_time = models.DateTimeField()

    class Meta:
        unique_together = ('user', 'creation_time')

class Category(models.Model):
    name = models.CharField(max_length=100, unique=True)

class SubCategory(models.Model):
    name = models.CharField(max_length=100)
    category = models.ForeignKey(Category, on_delete=models.CASCADE, related_name='subcategories')

class CategoryA(models.Model):
    base_info = models.ForeignKey(BaseInfo, on_delete=models.CASCADE, related_name='category_a_entries')
    sub_category = models.ForeignKey(SubCategory, on_delete=models.CASCADE, related_name='category_a_subcategories')
    param_1 = models.CharField(max_length=100, blank=True, null=True)
    param_2 = models.CharField(max_length=100, blank=True, null=True)

class CategoryB(models.Model):
    base_info = models.ForeignKey(BaseInfo, on_delete=models.CASCADE, related_name='category_b_entries')
    sub_category = models.ForeignKey(SubCategory, on_delete=models.CASCADE, related_name='category_b_subcategories')
    param_1 = models.CharField(max_length=100, blank=True, null=True)
    param_2 = models.CharField(max_length=100, blank=True, null=True)

数据导入逻辑

# utils.py
def import_data(file):
    data_list = parse_xlsx(file)  # 假设已有解析 XLSX 的方法
    validate_data(data_list)  # 假设已有数据校验的方法

    for data_item in data_list:
        base_info, _ = BaseInfo.objects.get_or_create(
            user=data_item['user'],
            creation_time=data_item['creation_time']
        )

        # 获取或创建小类
        sub_category, _ = SubCategory.objects.get_or_create(name=data_item['sub_category_name'])

        # 创建类A数据
        CategoryA.objects.create(
            base_info=base_info,
            sub_category=sub_category,
            param_1=data_item.get('param_1', None),
            param_2=data_item.get('param_2', None),
        )

        # 创建类B数据
        CategoryB.objects.create(
            base_info=base_info,
            sub_category=sub_category,
            param_1=data_item.get('param_1', None),
            param_2=data_item.get('param_2', None),
        )

增删改查接口

# views.py
from rest_framework import viewsets, status
from rest_framework.response import Response
from .models import BaseInfo, CategoryA, CategoryB, SubCategory
from .serializers import BaseInfoSerializer, CategoryASerializer, CategoryBSerializer

class BaseInfoViewSet(viewsets.ModelViewSet):
    queryset = BaseInfo.objects.all()
    serializer_class = BaseInfoSerializer

class CategoryAViewSet(viewsets.ModelViewSet):
    queryset = CategoryA.objects.all()
    serializer_class = CategoryASerializer

class CategoryBViewSet(viewsets.ModelViewSet):
    queryset = CategoryB.objects.all()
    serializer_class = CategoryBSerializer

class RetrieveCompleteDataViewSet(viewsets.ViewSet):
    def get(self, request, user, creation_time):
        base_info = BaseInfo.objects.get(user=user, creation_time=creation_time)
        
        response_data = {
            "base_info": {
                "user": base_info.user,
                "creation_time": base_info.creation_time,
            },
            "category_a": [],
            "category_b": [],
        }
        
        for entry in base_info.category_a_entries.all():
            response_data["category_a"].append({
                "sub_category": entry.sub_category.name,
                "param_1": entry.param_1,
                "param_2": entry.param_2,
            })

        for entry in base_info.category_b_entries.all():
            response_data["category_b"].append({
                "sub_category": entry.sub_category.name,
                "param_1": entry.param_1,
                "param_2": entry.param_2,
            })

        return Response(response_data)

class FilterByParametersViewSet(viewsets.ViewSet):
    def post(self, request):
        params = request.data.get('params', {})
        results = []

        # 根据参数筛选数据
        category_a_entries = CategoryA.objects.filter(**params)
        category_b_entries = CategoryB.objects.filter(**params)

        for entry in category_a_entries:
            results.append({
                "base_info": entry.base_info.user,
                "creation_time": entry.base_info.creation_time,
                "sub_category": entry.sub_category.name,
                "param_1": entry.param_1,
                "param_2": entry.param_2,
            })

        for entry in category_b_entries:
            results.append({
                "base_info": entry.base_info.user,
                "creation_time": entry.base_info.creation_time,
                "sub_category": entry.sub_category.name,
                "param_1": entry.param_1,
                "param_2": entry.param_2,
            })

        return Response(results, status=status.HTTP_200_OK)

Serializer 示例

确保你有适当的序列化器来处理模型数据。

# serializers.py
from rest_framework import serializers
from .models import BaseInfo, CategoryA, CategoryB

class BaseInfoSerializer(serializers.ModelSerializer):
    class Meta:
        model = BaseInfo
        fields = '__all__'

class CategoryASerializer(serializers.ModelSerializer):
    class Meta:
        model = CategoryA
        fields = '__all__'

class CategoryBSerializer(serializers.ModelSerializer):
    class Meta:
        model = CategoryB
        fields = '__all__'

总结

  1. 数据模型:每个大类、子类和参数字段都有清晰的结构。
  2. 数据导入:从 XLSX 文件中解析数据并写入数据库。
  3. 增删改查接口:提供完整的 CRUD 操作接口。
  4. 参数筛选接口:支持根据传入的参数进行数据筛选。

如果需要进一步的调整或具体功能,随时告诉我!


为了让代码更动态且减少重复,您可以将所有大类模型存储在一个列表中,然后通过循环来构建查询条件。这种方法可以让您在添加新的大类模型时只需修改列表,而无需重复代码。

优化后的代码实现

# views.py
from django.db.models import Q
from rest_framework import viewsets, status
from rest_framework.response import Response

def build_filter_conditions(params):
    """动态构建筛选条件"""
    conditions = {}
    for key, value in params.items():
        if value:  # 只添加非空条件
            conditions[key] = value
    return conditions

class RetrieveCompleteDataViewSet(viewsets.ViewSet):
    def get(self, request, user, creation_time):
        try:
            base_info = BaseInfo.objects.get(user=user, creation_time=creation_time)
        except BaseInfo.DoesNotExist:
            return Response({"detail": "Data not found"}, status=status.HTTP_404_NOT_FOUND)

        response_data = {
            "base_info": {
                "user": base_info.user,
                "creation_time": base_info.creation_time,
            },
            "categories": {}
        }

        categories = Category.objects.prefetch_related('subcategories')

        for category in categories:
            category_name = category.name
            response_data["categories"][category_name] = {}

            for sub_category in category.subcategories.all():
                sub_category_name = sub_category.name
                response_data["categories"][category_name][sub_category_name] = {
                    "params": []
                }

                entries = (CategoryA.objects.filter(base_info=base_info, sub_category=sub_category) |
                           CategoryB.objects.filter(base_info=base_info, sub_category=sub_category))

                for entry in entries:
                    response_data["categories"][category_name][sub_category_name]["params"].append({
                        "param_1": entry.param_1,
                        "param_2": entry.param_2,
                    })

        return Response(response_data)

class FilterByParametersViewSet(viewsets.ViewSet):
    def post(self, request):
        params = request.data.get('params', {})
        filter_conditions = build_filter_conditions(params)
        results = {}

        # 将所有分类模型放在一个列表中
        category_models = [CategoryA, CategoryB, CategoryC]  # 添加所有大类模型

        # 使用 Q 对象动态构建查询条件
        query = Q()
        for model in category_models:
            query |= Q(**filter_conditions)

        # 根据动态条件筛选数据
        entries = []
        for model in category_models:
            entries += list(model.objects.filter(query).select_related('sub_category', 'base_info'))

        for entry in entries:
            category_name = entry.sub_category.category.name
            sub_category_name = entry.sub_category.name

            if category_name not in results:
                results[category_name] = {"sub_categories": {}}

            if sub_category_name not in results[category_name]["sub_categories"]:
                results[category_name]["sub_categories"][sub_category_name] = {"params": []}

            results[category_name]["sub_categories"][sub_category_name]["params"].append({
                "base_info_user": entry.base_info.user,
                "creation_time": entry.base_info.creation_time,
                "param_1": entry.param_1,
                "param_2": entry.param_2,
            })

        return Response(results, status=status.HTTP_200_OK)

关键点

  1. 动态模型列表:将所有大类模型放入 category_models 列表中,这样可以方便地管理和扩展模型。
  2. 循环处理查询:通过循环处理每个模型,避免硬编码的查询条件。
  3. 结果整合:在查询时使用 += 将所有模型的结果汇总到一个列表中,保持逻辑简洁。

总结

这种方式使得代码结构更清晰,并易于扩展和维护。如果您有更多需求或想法,请随时告诉我!


好的,下面是一个完整的 Django 应用示例,包含数据库设计、数据读入、检查、导入,以及增删改查功能,支持根据层级关系的参数进行查询。

数据库模型设计

  1. 基础信息表(BaseInfo
  2. 参数映射表(ParameterMapping
  3. 数据表(DataEntry
from django.db import models
from django.contrib.postgres.fields import JSONField

class BaseInfo(models.Model):
    user = models.CharField(max_length=255)  # 写入人员
    creation_time = models.DateTimeField(auto_now_add=True)  # 创建时间

    def __str__(self):
        return f"{self.user} - {self.creation_time}"


class ParameterMapping(models.Model):
    category = models.CharField(max_length=255)  # 大类名称
    subcategory = models.CharField(max_length=255, null=True, blank=True)  # 小类名称
    parameter = models.CharField(max_length=255)  # 参数名称
    regex_pattern = models.CharField(max_length=255, null=True, blank=True)  # 校验正则

    def __str__(self):
        return f"{self.category} -> {self.subcategory} -> {self.parameter}"


class DataEntry(models.Model):
    base_info = models.ForeignKey(BaseInfo, on_delete=models.CASCADE)  # 关联基础信息
    data = JSONField()  # 存储数据,包括参数及其值

    def __str__(self):
        return f"Data Entry for {self.base_info.user} - {self.data}"

数据导入逻辑

数据导入命令

创建一个管理命令用于读取 xlsx 文件并导入数据。

import pandas as pd
import re
from django.core.management.base import BaseCommand
from your_app.models import DataEntry, ParameterMapping, BaseInfo

class Command(BaseCommand):
    help = 'Import data from xlsx file'

    def add_arguments(self, parser):
        parser.add_argument('file_path', type=str)

    def handle(self, *args, **kwargs):
        file_path = kwargs['file_path']
        df = pd.read_excel(file_path)

        # 获取参数映射和正则
        parameter_mappings = ParameterMapping.objects.all()
        mapping_dict = {(pm.category, pm.subcategory, pm.parameter): pm.regex_pattern for pm in parameter_mappings}

        for index, row in df.iterrows():
            # 创建基础信息
            base_info = BaseInfo.objects.create(user=row['user'])  # 假设用户在某一列

            # 遍历表头并构建数据
            data = {}
            for header in row.index[3:]:  # 假设数据从第四列开始
                category = row['大类']
                subcategory = row['小类']
                parameter = header
                
                regex_pattern = mapping_dict.get((category, subcategory, parameter))

                value = row[header]
                if regex_pattern and not re.match(regex_pattern, str(value)):
                    self.stdout.write(self.style.ERROR(f"Invalid data for {parameter}: {value}"))
                    continue  # 跳过不符合的值

                # 赋值到数据字典中
                data[parameter] = value

            # 创建数据条目
            DataEntry.objects.create(
                base_info=base_info,
                data=data
            )
        self.stdout.write(self.style.SUCCESS('Data imported successfully'))

增删改查接口

views.py

from rest_framework import viewsets
from rest_framework import status
from rest_framework.response import Response
from your_app.models import BaseInfo, ParameterMapping, DataEntry
from your_app.serializers import BaseInfoSerializer, ParameterMappingSerializer, DataEntrySerializer

class BaseInfoViewSet(viewsets.ModelViewSet):
    queryset = BaseInfo.objects.all()
    serializer_class = BaseInfoSerializer


class ParameterMappingViewSet(viewsets.ModelViewSet):
    queryset = ParameterMapping.objects.all()
    serializer_class = ParameterMappingSerializer


class DataEntryViewSet(viewsets.ModelViewSet):
    queryset = DataEntry.objects.all()
    serializer_class = DataEntrySerializer

    def filter_queryset(self, queryset):
        # 根据参数层级过滤数据
        filter_conditions = self.request.query_params
        for key, value in filter_conditions.items():
            queryset = queryset.filter(data__contains={key: value})
        return queryset

Serializers

serializers.py

from rest_framework import serializers
from your_app.models import BaseInfo, ParameterMapping, DataEntry

class BaseInfoSerializer(serializers.ModelSerializer):
    class Meta:
        model = BaseInfo
        fields = '__all__'


class ParameterMappingSerializer(serializers.ModelSerializer):
    class Meta:
        model = ParameterMapping
        fields = '__all__'


class DataEntrySerializer(serializers.ModelSerializer):
    class Meta:
        model = DataEntry
        fields = '__all__'

URLs

urls.py

from django.urls import path, include
from rest_framework.routers import DefaultRouter
from your_app.views import BaseInfoViewSet, ParameterMappingViewSet, DataEntryViewSet

router = DefaultRouter()
router.register(r'baseinfo', BaseInfoViewSet)
router.register(r'parametermapping', ParameterMappingViewSet)
router.register(r'dataentry', DataEntryViewSet)

urlpatterns = [
    path('', include(router.urls)),
]

使用示例

  1. 导入数据
    python manage.py import_data your_file.xlsx
    
  2. 增删改查 API
    • 获取所有数据:GET /dataentry/
    • 根据参数查询数据:GET /dataentry/?parameter=value
    • 创建数据:POST /dataentry/(请求体中包含数据)
    • 更新数据:PUT /dataentry/<id>/
    • 删除数据:DELETE /dataentry/<id>/

总结

这个设计充分考虑了您提到的层级关系和数据的检验与导入逻辑,所有数据通过 JSON 存储,便于扩展。查询接口灵活,可以根据需要进行调整。如果有更多需求或调整,请告诉我!


以下是带有注释的完整代码,详细说明了每个部分的功能和实现逻辑。

1. views.py

from rest_framework import viewsets
from rest_framework import status
from rest_framework.response import Response
from your_app.models import BaseInfo, ParameterMapping, DataEntry
from your_app.serializers import BaseInfoSerializer, ParameterMappingSerializer, DataEntrySerializer

# 基础信息的视图集
class BaseInfoViewSet(viewsets.ModelViewSet):
    queryset = BaseInfo.objects.all()  # 查询所有基础信息
    serializer_class = BaseInfoSerializer  # 使用的序列化器

    def list(self, request):
        queryset = self.queryset  # 获取所有基础信息
        serializer = self.serializer_class(queryset, many=True)  # 序列化
        return Response(serializer.data)  # 返回序列化的数据

    def create(self, request):
        serializer = self.serializer_class(data=request.data)  # 创建序列化对象
        if serializer.is_valid():  # 检查数据有效性
            serializer.save()  # 保存数据
            return Response(serializer.data, status=status.HTTP_201_CREATED)  # 返回成功响应
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)  # 返回错误信息

    def retrieve(self, request, pk=None):
        base_info = self.get_object()  # 获取特定基础信息
        serializer = self.serializer_class(base_info)  # 序列化
        return Response(serializer.data)  # 返回数据

    def update(self, request, pk=None):
        base_info = self.get_object()  # 获取特定基础信息
        serializer = self.serializer_class(base_info, data=request.data)  # 更新序列化对象
        if serializer.is_valid():  # 检查数据有效性
            serializer.save()  # 保存更新
            return Response(serializer.data)  # 返回更新后的数据
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)  # 返回错误信息

    def destroy(self, request, pk=None):
        base_info = self.get_object()  # 获取特定基础信息
        base_info.delete()  # 删除数据
        return Response(status=status.HTTP_204_NO_CONTENT)  # 返回204状态,表示成功删除


# 参数映射的视图集
class ParameterMappingViewSet(viewsets.ModelViewSet):
    queryset = ParameterMapping.objects.all()  # 查询所有参数映射
    serializer_class = ParameterMappingSerializer  # 使用的序列化器

    def list(self, request):
        queryset = self.queryset  # 获取所有参数映射
        serializer = self.serializer_class(queryset, many=True)  # 序列化
        return Response(serializer.data)  # 返回序列化的数据

    def create(self, request):
        serializer = self.serializer_class(data=request.data)  # 创建序列化对象
        if serializer.is_valid():  # 检查数据有效性
            serializer.save()  # 保存数据
            return Response(serializer.data, status=status.HTTP_201_CREATED)  # 返回成功响应
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)  # 返回错误信息

    def retrieve(self, request, pk=None):
        parameter_mapping = self.get_object()  # 获取特定参数映射
        serializer = self.serializer_class(parameter_mapping)  # 序列化
        return Response(serializer.data)  # 返回数据

    def update(self, request, pk=None):
        parameter_mapping = self.get_object()  # 获取特定参数映射
        serializer = self.serializer_class(parameter_mapping, data=request.data)  # 更新序列化对象
        if serializer.is_valid():  # 检查数据有效性
            serializer.save()  # 保存更新
            return Response(serializer.data)  # 返回更新后的数据
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)  # 返回错误信息

    def destroy(self, request, pk=None):
        parameter_mapping = self.get_object()  # 获取特定参数映射
        parameter_mapping.delete()  # 删除数据
        return Response(status=status.HTTP_204_NO_CONTENT)  # 返回204状态,表示成功删除


# 数据条目的视图集
class DataEntryViewSet(viewsets.ModelViewSet):
    queryset = DataEntry.objects.all()  # 查询所有数据条目
    serializer_class = DataEntrySerializer  # 使用的序列化器

    def list(self, request):
        queryset = self.queryset  # 获取所有数据条目
        serializer = self.serializer_class(queryset, many=True)  # 序列化
        return Response(serializer.data)  # 返回序列化的数据

    def create(self, request):
        serializer = self.serializer_class(data=request.data)  # 创建序列化对象
        if serializer.is_valid():  # 检查数据有效性
            serializer.save()  # 保存数据
            return Response(serializer.data, status=status.HTTP_201_CREATED)  # 返回成功响应
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)  # 返回错误信息

    def retrieve(self, request, pk=None):
        data_entry = self.get_object()  # 获取特定数据条目
        serializer = self.serializer_class(data_entry)  # 序列化
        return Response(serializer.data)  # 返回数据

    def update(self, request, pk=None):
        data_entry = self.get_object()  # 获取特定数据条目
        serializer = self.serializer_class(data_entry, data=request.data)  # 更新序列化对象
        if serializer.is_valid():  # 检查数据有效性
            serializer.save()  # 保存更新
            return Response(serializer.data)  # 返回更新后的数据
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)  # 返回错误信息

    def destroy(self, request, pk=None):
        data_entry = self.get_object()  # 获取特定数据条目
        data_entry.delete()  # 删除数据
        return Response(status=status.HTTP_204_NO_CONTENT)  # 返回204状态,表示成功删除

    # 根据参数筛选数据
    def filter_queryset(self, queryset):
        filter_conditions = self.request.query_params  # 获取查询参数
        for key, value in filter_conditions.items():  # 遍历每个查询参数
            queryset = queryset.filter(data__contains={key: value})  # 筛选数据
        return queryset  # 返回筛选后的数据

2. serializers.py

from rest_framework import serializers
from your_app.models import BaseInfo, ParameterMapping, DataEntry

# 基础信息的序列化器
class BaseInfoSerializer(serializers.ModelSerializer):
    class Meta:
        model = BaseInfo  # 目标模型
        fields = '__all__'  # 包含所有字段


# 参数映射的序列化器
class ParameterMappingSerializer(serializers.ModelSerializer):
    class Meta:
        model = ParameterMapping  # 目标模型
        fields = '__all__'  # 包含所有字段


# 数据条目的序列化器
class DataEntrySerializer(serializers.ModelSerializer):
    class Meta:
        model = DataEntry  # 目标模型
        fields = '__all__'  # 包含所有字段

3. urls.py

from django.urls import path, include
from rest_framework.routers import DefaultRouter
from your_app.views import BaseInfoViewSet, ParameterMappingViewSet, DataEntryViewSet

# 创建路由器并注册视图集
router = DefaultRouter()
router.register(r'baseinfo', BaseInfoViewSet)  # 注册基础信息视图
router.register(r'parametermapping', ParameterMappingViewSet)  # 注册参数映射视图
router.register(r'dataentry', DataEntryViewSet)  # 注册数据条目视图

urlpatterns = [
    path('', include(router.urls)),  # 包含所有注册的路由
]

4. 使用示例

  • 获取所有基础信息
    GET /baseinfo/
    
  • 创建基础信息
    POST /baseinfo/
    {
        "user": "张三"
    }
    
  • 获取所有参数映射
    GET /parametermapping/
    
  • 创建参数映射
    POST /parametermapping/
    {
        "category": "大类1",
        "subcategory": "小类1",
        "parameter": "参数1",
        "regex_pattern": "^[0-9]+$"
    }
    
  • 获取所有数据条目
    GET /dataentry/
    
  • 创建数据条目
    POST /dataentry/
    {
        "base_info": 1,
        "data": {
            "参数1": "100",
            "参数2": "200"
        }
    }
    
  • 根据参数查询数据
    GET /dataentry/?参数1=100&参数2=200
    

总结

以上代码注释详细,功能清晰,实现了数据的增删改查和基于参数的筛选接口。如果有进一步的需求或需要调整的地方,请随时告诉我!


好的,既然查询参数可能比较多,且您希望使用 POST 请求来进行查询,我们可以创建一个自定义的 POST 请求来接收 JSON 格式的查询参数。这样既能保持接口简洁,也能确保层级结构的查询可以灵活地处理。

修改查询接口支持 POST 请求

1. 修改 DataEntryViewSet 以支持 POST 请求查询

我们会创建一个自定义的 POST 方法 search,接收查询参数并进行过滤。

from rest_framework.decorators import action
from rest_framework import viewsets, status
from rest_framework.response import Response
from your_app.models import BaseInfo, ParameterMapping, DataEntry
from your_app.serializers import DataEntrySerializer

class DataEntryViewSet(viewsets.ModelViewSet):
    queryset = DataEntry.objects.all()
    serializer_class = DataEntrySerializer

    # 自定义 POST 查询接口
    @action(detail=False, methods=['post'], url_path='search')
    def search(self, request):
        query_data = request.data  # 获取 POST 请求中的查询参数
        
        if not query_data:
            return Response({"error": "No query data provided."}, status=status.HTTP_400_BAD_REQUEST)

        # 构建查询条件
        filter_conditions = self.build_query_conditions(query_data)

        # 查询数据,使用 JSONField 查询
        entries = DataEntry.objects.filter(data__contains=filter_conditions)
        serializer = self.get_serializer(entries, many=True)
        return Response(serializer.data, status=status.HTTP_200_OK)

    # 解析并导入xlsx文件
    def create(self, request):
        # 处理文件上传
        file = request.FILES.get('file')
        if not file:
            return Response({"error": "No file provided."}, status=status.HTTP_400_BAD_REQUEST)

        # 解析xlsx文件
        parse_and_import_xlsx(file)

        return Response({"success": "Data imported successfully."}, status=status.HTTP_201_CREATED)

    # 构建查询条件的辅助方法
    def build_query_conditions(self, query_data):
        filter_conditions = {}
        for category, subcategories in query_data.items():
            for subcategory, parameters in subcategories.items():
                for parameter, value in parameters.items():
                    key = f"{category}:{subcategory}:{parameter}"  # 构建映射键
                    filter_conditions[key] = value
        return filter_conditions

2. 使用 POST 请求查询数据

该接口现在支持 POST 请求。前端可以向 /dataentry/search/ 发送包含嵌套 JSON 查询参数的请求。

示例 POST 请求格式:

POST /dataentry/search/

{
    "大类1": {
        "小类1": {
            "参数1": "值1",
            "参数2": "值2"
        },
        "小类2": {
            "参数1": "值3"
        }
    },
    "大类2": {
        "小类1": {
            "参数1": "值4"
        }
    }
}

3. Django 路由

您可以在 urls.py 中设置路由:

from django.urls import path, include
from rest_framework.routers import DefaultRouter
from your_app.views import DataEntryViewSet

router = DefaultRouter()
router.register(r'dataentry', DataEntryViewSet)

urlpatterns = [
    path('', include(router.urls)),
]

4. 数据检查和导入逻辑

为了保证导入时表头与数据库字段的映射关系一致,您可以扩展之前的数据检查和导入函数,确保在写入数据之前先检查表头与字段的对应关系,并根据表头信息完成校验。

示例:在 parse_and_import_xlsx 中加入检查逻辑

import openpyxl

def parse_and_import_xlsx(file):
    workbook = openpyxl.load_workbook(file)
    sheet = workbook.active

    headers = sheet[1]  # 假设第一行是表头
    field_mapping = get_field_mapping_from_db()  # 从数据库中获取字段映射关系

    # 表头和字段映射的检查
    for idx, cell in enumerate(headers):
        header = cell.value
        if header not in field_mapping:
            raise ValueError(f"Header {header} does not match any known fields.")
    
    # 数据解析与导入
    for row in sheet.iter_rows(min_row=2, values_only=True):  # 从第二行开始是数据
        base_info = row[:3]  # 第一列到第三列为基础信息(大类、小类、参数)
        data = row[3:]       # 后面的列为参数数据
        
        # 构建 JSON 数据
        data_entry = {
            "大类": base_info[0],
            "小类": base_info[1],
            "参数": base_info[2],
            "数据": {field_mapping[headers[i].value]: data[i-3] for i in range(3, len(headers))}
        }

        # 写入数据库
        DataEntry.objects.create(data=data_entry)

def get_field_mapping_from_db():
    # 从数据库中查询存储的表头与字段的映射关系
    mappings = ParameterMapping.objects.all()
    return {mapping.header: mapping.field for mapping in mappings}

5. 总结

这个方案实现了:

  1. 通过 POST 请求进行层级查询:允许前端传递复杂的嵌套查询条件,后端根据这些条件进行数据筛选。
  2. 动态数据导入:确保表头与数据库字段一一对应,防止解析时出现错误。
  3. 灵活的架构:能够轻松扩展不同的大类、小类和参数,且不会因大类数量增加导致代码臃肿。
  4. 数据检查和校验:在导入前对表头与字段映射进行检查,确保数据完整性和正确性。

请根据需求和实际项目情况微调逻辑。如果有进一步问题或需要优化的地方,请告诉我!


为处理不同长度的 data_name,并确保它们可以在不同分类中嵌套,我们可以调整代码,使得它能够根据每个 data_name 动态构建层级结构,而不受限于固定的分类。这是更新后的实现:

更新后的完整代码

import openpyxl
from django.core.exceptions import ValidationError
from django.db import models

# 模型定义
class ParameterMapping(models.Model):
    data_name = models.JSONField()  # 存储层级关系 [大类, 小类, 参数]
    field = models.CharField(max_length=100)  # 对应数据库字段
    reg_ex = models.CharField(max_length=255, null=True, blank=True)  # 校验正则

class DataEntry(models.Model):
    data = models.JSONField()  # 存储完整的数据结构

# 获取字段映射关系
def get_field_mapping_from_db():
    mappings = ParameterMapping.objects.all()
    field_mapping = {}
    for mapping in mappings:
        field_mapping[tuple(mapping.data_name)] = mapping.field  # 使用元组作为键
    return field_mapping

# 校验数据
def validate_data(value, reg_ex):
    import re
    if reg_ex and not re.match(reg_ex, str(value)):
        raise ValidationError(f"Value '{value}' does not match the required format.")

# 解析并导入 XLSX 文件
def parse_and_import_xlsx(file):
    workbook = openpyxl.load_workbook(file)
    sheet = workbook.active

    headers = sheet[1]  # 假设第一行是表头
    field_mapping = get_field_mapping_from_db()  # 获取字段映射关系

    # 数据解析与导入
    for row in sheet.iter_rows(min_row=2, values_only=True):
        base_info = row[:3]  # 第一列到第三列为基础信息
        data = row[3:]       # 后面的列为参数数据
        
        # 构建 JSON 数据
        data_entry = {
            "基础信息": {
                "大类": base_info[0],
                "小类": base_info[1],
                "参数": base_info[2]
            },
            "数据": {}
        }

        # 动态构建嵌套数据
        for i, header in enumerate(headers[3:]):
            header_key = header.value
            if header_key in field_mapping.values():
                # 查找该表头对应的 data_name
                for keys, field in field_mapping.items():
                    if field == header_key:
                        current_data = data_entry["数据"]
                        
                        # 动态创建层级结构
                        for key in keys:
                            if key not in current_data:
                                current_data[key] = {}
                            current_data = current_data[key]

                        # 存储对应的数据值
                        current_data['value'] = data[i]

                        # 校验数据
                        reg_ex = ParameterMapping.objects.get(field=header_key).reg_ex
                        validate_data(data[i], reg_ex)

        # 写入数据库
        DataEntry.objects.create(data=data_entry)

# 示例使用
# parse_and_import_xlsx("path_to_your_file.xlsx")

关键点说明

  1. 动态嵌套结构:通过遍历 data_name,在 data_entry["数据"] 中动态创建嵌套层级。这使得可以根据 data_name 的长度和内容,灵活地生成多层结构。

  2. 不同分类支持:不论 data_name 的长度和具体层级,代码都能动态识别并构建相应的数据结构。这意味着不同长度的 data_name 可以在不同分类中灵活应用。

示例数据结构

假设 data_name 为:

  • ["大类1"]
  • ["大类2", "小类1"]
  • ["大类2", "小类2", "参数1"]

最终构建的 JSON 数据可能如下:

{
  "基础信息": {
    "大类": "大类2",
    "小类": "小类2",
    "参数": "参数1"
  },
  "数据": {
    "大类1": {
      "value": "数据值1"
    },
    "大类2": {
      "小类1": {
        "value": "数据值2"
      },
      "小类2": {
        "参数1": {
          "value": "数据值3"
        }
      }
    }
  }
}

备注

  • 如果您还有其他要求或需进一步调整,请随时告诉我!

如果原数据格式中的元组长度是不定的,我们可以修改代码,使其根据元组的长度动态地构建嵌套层级。这样,代码可以适应不同的分类结构,将最后一层作为参数名称。

修改后的代码

def transform_data_format(data):
    transformed_data = {}

    for sheet_name, entries in data.items():
        transformed_entries = []  # 存储转换后的条目

        for entry in entries:
            transformed_entry = {}  # 用于构建每个条目
            
            for keys, value in entry.items():
                # 动态构建嵌套字典,逐层构建每一个层级
                current_level = transformed_entry

                for i, key in enumerate(keys):
                    # 如果是最后一个元素,则这是参数名,直接赋值
                    if i == len(keys) - 1:
                        current_level[key] = value
                    else:
                        # 如果当前层级不存在,则创建
                        if key not in current_level:
                            current_level[key] = {}
                        # 进入下一级
                        current_level = current_level[key]

            # 将转换后的条目添加到结果列表
            transformed_entries.append(transformed_entry)

        transformed_data[sheet_name] = transformed_entries

    return transformed_data

# 示例输入数据
data = {
    "Sheet1": [
        {("大类1", "小类1", "参数1"): "值1", ("大类1", "参数2"): "值2"},
        {("大类1", "小类2", "参数1"): "值3"},
        {("大类2", "参数3"): "值4"}
    ],
    "Sheet2": [
        {("大类3", "小类1", "参数1"): "值5"},
        {("大类3", "小类2", "参数1"): "值6", ("大类3", "参数4"): "值7"}
    ]
}

# 调用转换函数
transformed_data = transform_data_format(data)

# 输出结果
import pprint
pprint.pprint(transformed_data)

代码说明

  1. 遍历每个条目:对每个条目构建一个新的嵌套字典结构 transformed_entry

  2. 动态构建嵌套结构
    • 使用 for i, key in enumerate(keys) 遍历元组 keys
    • 如果是最后一个元素,则将该元素视为参数名,并赋值为 value
    • 如果不是最后一个元素,则检查当前层级是否存在该 key,如果不存在则创建空字典。
    • 随后进入下一级以继续构建层级。
  3. 合并结果:将每个转换后的条目添加到 transformed_entries,最终汇总到 transformed_data

示例输出

给定输入示例,输出结果将为:

{
    'Sheet1': [
        {
            '大类1': {
                '小类1': {
                    '参数1': '值1'
                },
                '参数2': '值2',
                '小类2': {
                    '参数1': '值3'
                }
            },
            '大类2': {
                '参数3': '值4'
            }
        }
    ],
    'Sheet2': [
        {
            '大类3': {
                '小类1': {
                    '参数1': '值5'
                },
                '小类2': {
                    '参数1': '值6'
                },
                '参数4': '值7'
            }
        }
    ]
}

适用性

这个方案适用于元组长度不定的情况,并自动生成层级关系。在最后一个元素作为参数名的规则下,它能灵活处理不同长度的层级结构。


要将 build_query_conditions 方法优化成一个更加通用的函数,我们可以引入可配置的分隔符以及支持多层嵌套的结构,使其更灵活。以下是优化后的代码:

优化后的通用函数

def build_query_conditions(query_data, separator=":"):
    """
    构建查询条件的辅助函数。
    
    :param query_data: 嵌套字典,格式为 {大类: {小类: {参数: 值, ...}, ...}, ...}
    :param separator: 用于连接键的分隔符,默认为 ":"
    :return: 构建的查询条件字典
    """
    filter_conditions = {}

    def recursive_build(data, current_key=""):
        for key, value in data.items():
            if isinstance(value, dict):
                # 如果值是字典,递归调用
                new_key = f"{current_key}{separator}{key}" if current_key else key
                recursive_build(value, new_key)
            else:
                # 否则,添加到查询条件中
                new_key = f"{current_key}{separator}{key}" if current_key else key
                filter_conditions[new_key] = value

    recursive_build(query_data)
    return filter_conditions

# 示例使用
query_data = {
    "大类1": {
        "小类1": {
            "参数1": "值1",
            "参数2": "值2"
        },
        "小类2": {
            "参数3": "值3"
        }
    },
    "大类2": {
        "小类3": {
            "参数4": "值4"
        }
    }
}

# 调用优化后的函数
result = build_query_conditions(query_data)
print(result)

代码说明

  1. 函数参数
    • query_data: 输入的嵌套字典。
    • separator: 用于连接键的分隔符,默认值为 ":"
  2. 递归函数recursive_build
    • 接受当前数据和当前键(默认为空字符串)。
    • 遍历每个键值对。
    • 如果值是字典,则递归调用,构建新的键。
    • 如果值不是字典,则将构建的键和值添加到 filter_conditions
  3. 灵活性
    • 通过分隔符的配置,用户可以根据需要更改键的连接方式。
    • 适应不同层级的嵌套结构,能处理任意深度的嵌套字典。

示例输出

对于给定的示例输入,输出将为:

{
    '大类1:小类1:参数1': '值1',
    '大类1:小类1:参数2': '值2',
    '大类1:小类2:参数3': '值3',
    '大类2:小类3:参数4': '值4'
}

这种优化使得函数在处理多层嵌套结构时更加通用和灵活。如果您有其他要求或需要进一步修改,请随时告诉我!


了解了你的需求:附件不仅出现在某个大类下,而是分布在多个大类、小类中的特定位置,并且每个小类的附件可以通过文件路径来标识。为了让代码更加灵活并适应多个大类、小类中包含附件的情况,我们需要调整逻辑,以便在数据结构中的任意位置找到附件并生成对应的绝对 URL。

方案设计

我们需要做到:

  1. 处理任意层级:数据结构可能有多个层级,其中某些小类下包含附件。
  2. 遍历整个数据结构:递归地查找附件信息,并生成绝对 URL。
  3. 适应多个大类和小类中的附件:支持多个大类下的多个小类都有附件。

优化方案

假设你有一个复杂的嵌套 JSON 结构,并且在每个小类(或某些小类)下都有附件数据。附件是一个列表,包含文件路径或文件名。在解析这个结构时,我们可以递归地遍历每个层级并生成绝对 URL。

实现代码

  1. 模型设计:假设我们存储的数据包含多个层级,附件是嵌套在 JSON 数据中的某些小类下。

  2. 生成附件 URL 的方法:遍历整个数据结构,找到包含附件的部分,生成绝对 URL。

from django.db import models
import json

class Data(models.Model):
    name = models.CharField(max_length=255)
    data = models.JSONField()  # 存储嵌套的 JSON 数据,包含附件路径等信息

    def get_absolute_url(self, file_path):
        """
        返回存储在 S3 或本地的文件的绝对 URL。
        假设我们有一个存储基础 URL,文件名可以拼接在这个基础上形成 URL。
        """
        base_url = "https://s3.amazonaws.com/my-bucket/"  # 假设文件存储在 S3 上
        return base_url + file_path

    def transform_data_with_urls(self):
        """
        将 JSON 字段中的附件路径转换为绝对 URL
        """
        def recursive_transform(data):
            # 检查当前层的数据
            if isinstance(data, dict):
                # 递归遍历字典的键值对
                for key, value in data.items():
                    # 如果是字典,递归处理
                    if isinstance(value, dict):
                        recursive_transform(value)
                    elif isinstance(value, list):
                        # 如果是附件列表,生成绝对 URL
                        if key == 'attachments':  # 假设附件存储在'attachments'键中
                            for attachment in value:
                                if 'file_name' in attachment:
                                    attachment['url'] = self.get_absolute_url(attachment['file_name'])
                    else:
                        continue
            elif isinstance(data, list):
                # 如果是列表,递归每个元素
                for item in data:
                    recursive_transform(item)

        # 调用递归方法处理 JSON 数据
        recursive_transform(self.data)
        return self.data

3. 数据存储结构示例

假设存储的 JSON 数据如下,其中附件可能存在于不同的小类中:

{
  "大类1": {
    "小类1": {
      "参数1": "值1",
      "attachments": [
        {"file_name": "file1.pdf"},
        {"file_name": "file2.pdf"}
      ]
    },
    "小类2": {
      "参数2": "值2",
      "attachments": [
        {"file_name": "file3.pdf"}
      ]
    }
  },
  "大类2": {
    "小类3": {
      "参数3": "值3",
      "attachments": [
        {"file_name": "file4.pdf"}
      ]
    }
  }
}

4. 序列化和视图代码

你可以使用 Django Rest Framework(DRF)来序列化这个数据,并确保返回的数据中包含绝对 URL。

4.1. 序列化器

from rest_framework import serializers

class DataSerializer(serializers.ModelSerializer):
    class Meta:
        model = Data
        fields = ['name', 'data']

    def to_representation(self, instance):
        """
        重写 to_representation 方法,确保输出的数据包含附件的绝对 URL。
        """
        data = super().to_representation(instance)
        # 使用模型的 transform_data_with_urls 方法将附件路径转换为绝对 URL
        data['data'] = instance.transform_data_with_urls()
        return data

4.2. 视图代码(Django Rest Framework)

from rest_framework.views import APIView
from rest_framework.response import Response

class DataView(APIView):
    def get(self, request, *args, **kwargs):
        # 查询数据
        data_instance = Data.objects.first()  # 获取示例数据

        # 使用序列化器将数据转换为包含 URL 的 JSON 数据
        serializer = DataSerializer(data_instance)

        # 返回包含 URL 的数据
        return Response(serializer.data)

5. 示例输出

对于上面的 data,输出将会是:

{
    "name": "示例数据",
    "data": {
        "大类1": {
            "小类1": {
                "参数1": "值1",
                "attachments": [
                    {"file_name": "file1.pdf", "url": "https://s3.amazonaws.com/my-bucket/file1.pdf"},
                    {"file_name": "file2.pdf", "url": "https://s3.amazonaws.com/my-bucket/file2.pdf"}
                ]
            },
            "小类2": {
                "参数2": "值2",
                "attachments": [
                    {"file_name": "file3.pdf", "url": "https://s3.amazonaws.com/my-bucket/file3.pdf"}
                ]
            }
        },
        "大类2": {
            "小类3": {
                "参数3": "值3",
                "attachments": [
                    {"file_name": "file4.pdf", "url": "https://s3.amazonaws.com/my-bucket/file4.pdf"}
                ]
            }
        }
    }
}

6. 总结

  1. 递归处理附件:我们使用递归方法遍历 JSON 数据,在遇到附件信息时,生成文件的绝对 URL。
  2. 灵活的文件存储结构:附件可以位于任意小类下,并且每个附件都可以生成相应的 URL。
  3. DRF 序列化:通过重写 DRF 序列化器中的 to_representation 方法,确保返回的数据包含附件的绝对 URL。

这样,你可以在查询数据时,不仅能够得到嵌套的 JSON 数据结构,还能自动生成附件的绝对 URL,以便于前端使用。如果附件存储在不同的服务(如 S3、OSS 等),只需要调整 get_absolute_url 方法中的 URL 生成规则即可。


是的,如果数据量很大,处理大量的嵌套 JSON 数据和生成大量的文件 URL 时,可能会出现性能问题。具体的性能瓶颈可能出现在以下几个方面:

  1. 递归遍历 JSON 数据
    • 遍历整个嵌套数据结构时,随着数据层级的增加和数据量的增加,递归操作的开销也会逐渐增大。
    • 如果数据结构复杂,且每个层级都包含附件,需要频繁地访问和修改大量的字典和列表。
  2. 生成 URL 的计算开销
    • 每次需要生成文件的绝对 URL 时,都需要进行文件路径拼接和其他计算操作,这在大数据量时会增加计算负担。
  3. 数据库查询性能
    • 如果数据非常庞大,单次数据库查询可能会带来很大的 I/O 负担,尤其是当你一次性加载大量数据时。
    • 在高并发环境下,查询性能可能成为瓶颈。
  4. 序列化性能
    • 如果数据量过大,在 DRF 序列化时,可能会因为需要将大量嵌套数据格式化成 JSON 而导致序列化速度较慢。

如何优化?

  1. 数据分页加载
    • 如果数据量非常大,可以考虑在数据库层面做分页查询,避免一次性加载过多的数据。
    • 在 DRF 中,可以使用 pagination 类来对数据进行分页加载。
  2. 批量操作和异步任务
    • 对于 URL 的生成操作,可以考虑将其放到后台异步任务中(例如使用 Celery),这样不会阻塞前端请求。
    • 使用批量数据库操作(例如 bulk_create)来减少数据库交互次数。
  3. 减少递归深度
    • 尽量减少递归调用的深度,或者使用显式的栈(迭代方式)来替代递归调用。
    • 如果能预处理数据(例如在数据入库时预先处理附件生成 URL),可以减少查询时的计算负担。
  4. 优化 JSON 字段存储
    • 如果 JSON 字段非常大,可以考虑对 JSON 数据做压缩存储(例如使用 gziplz4 等算法),并且只在需要时解压。
  5. 缓存机制
    • 使用缓存来存储已计算过的 URL,避免每次查询时都需要重新生成。例如,可以使用 Redis 缓存生成的 URL,尤其是对于那些被频繁访问的附件。
  6. 预计算附件 URL
    • 如果附件 URL 生成规则固定,可以考虑在数据写入时直接计算好 URL,并存储到数据库中,而不是每次查询时再生成。

具体优化建议

1. 使用分页查询优化数据库查询

在 DRF 中,可以通过分页来优化查询:

from rest_framework.pagination import PageNumberPagination
from rest_framework.views import APIView
from rest_framework.response import Response

class CustomPagination(PageNumberPagination):
    page_size = 10  # 每页返回的数据数量

class DataView(APIView):
    pagination_class = CustomPagination

    def get(self, request, *args, **kwargs):
        data_instances = Data.objects.all()  # 假设查询所有数据

        # 使用分页器对数据进行分页
        paginator = CustomPagination()
        paginated_data = paginator.paginate_queryset(data_instances, request)

        # 序列化数据
        serializer = DataSerializer(paginated_data, many=True)
        
        # 返回分页数据
        return paginator.get_paginated_response(serializer.data)

这样就可以避免一次性查询大量数据。

2. 使用缓存存储 URL

如果生成 URL 的操作开销较大,并且 URL 内容在短时间内不经常变化,可以使用缓存来存储生成的 URL。

import hashlib
from django.core.cache import cache

class Data(models.Model):
    name = models.CharField(max_length=255)
    data = models.JSONField()

    def get_absolute_url(self, file_path):
        # 使用缓存避免重复计算
        cache_key = hashlib.md5(file_path.encode()).hexdigest()
        cached_url = cache.get(cache_key)

        if cached_url:
            return cached_url

        # 如果缓存中没有,计算绝对 URL
        base_url = "https://s3.amazonaws.com/my-bucket/"
        url = base_url + file_path
        cache.set(cache_key, url, timeout=60 * 60 * 24)  # 缓存 24 小时
        return url

通过这种方式,每次生成 URL 时,首先检查缓存中是否存在,如果存在则直接返回缓存中的 URL。

3. 后台任务(Celery)处理 URL 生成

将生成文件 URL 的操作移到后台处理,避免在请求响应中占用过多时间。可以使用 Celery 来处理附件 URL 的生成任务。

from celery import shared_task

@shared_task
def generate_attachment_urls(data_instance_id):
    data_instance = Data.objects.get(id=data_instance_id)
    transformed_data = data_instance.transform_data_with_urls()
    
    # 这里可以将转换后的数据存入数据库或缓存中
    # 例如,更新数据实例的 URL 数据
    data_instance.data = transformed_data
    data_instance.save()

# 触发任务
generate_attachment_urls.delay(data_instance.id)

在前端请求时,你可以返回一个响应,告诉用户数据正在处理,并且稍后获取已处理的结果。

4. 预处理附件 URL

如果附件的文件 URL 是根据固定的规则生成的(例如基于文件名),你可以在数据导入时(或定期任务中)预计算并存储 URL。

class Data(models.Model):
    name = models.CharField(max_length=255)
    data = models.JSONField()

    def save(self, *args, **kwargs):
        # 在保存时计算所有附件 URL 并将其存储在 data 字段中
        if 'attachments' in self.data:
            for attachment in self.data['attachments']:
                file_name = attachment.get('file_name', '')
                if file_name:
                    attachment['url'] = self.get_absolute_url(file_name)
        super().save(*args, **kwargs)

5. 使用更高效的 JSON 存储

对于大量嵌套的 JSON 数据,可以考虑使用 PostgreSQLJSONB 类型字段,它比普通的 JSON 类型更加高效,支持更复杂的查询和索引。

总结

如果数据量较大,优化的关键是减少不必要的计算和查询,使用分页、缓存、后台任务等技术来提升性能。递归的操作可能会在大量数据时出现性能瓶颈,因此在生成 URL 的时候,可以考虑预计算或异步处理。


如果选择在保存数据时将绝对 URL直接计算并存储到数据库中,而不是每次查询时动态计算,将大大减轻查询和处理时的性能压力。这种方法会把计算和生成 URL 的开销转移到数据写入(或更新)阶段,这样在查询时就可以直接从数据库获取已经包含 URL 的数据。

优势:

  1. 查询效率提升:查询时不再需要每次生成 URL,直接从数据库获取数据即可。
  2. 避免重复计算:每次查询都不需要再次计算文件的 URL,减少了计算资源的消耗。
  3. 减少响应时间:前端直接获得已处理好的数据,提升用户体验。

解决方案:

  1. 在保存时计算附件 URL:在数据存储时,检查数据中的附件字段,并生成相应的绝对 URL,直接存储在数据表中。
  2. 更新数据时也计算 URL:如果数据发生变更,尤其是涉及到附件的字段时,应该再次计算并更新附件的 URL。

实现代码

1. 数据模型

我们在 Data 模型中创建一个方法来处理附件 URL 的生成,在数据保存时调用该方法。

from django.db import models

class Data(models.Model):
    name = models.CharField(max_length=255)
    data = models.JSONField()  # 存储嵌套 JSON 数据
    created_at = models.DateTimeField(auto_now_add=True)

    def get_absolute_url(self, file_path):
        """
        假设我们使用 S3 存储文件,生成绝对 URL
        """
        base_url = "https://s3.amazonaws.com/my-bucket/"
        return base_url + file_path

    def save_attachments_with_urls(self):
        """
        在数据保存时处理所有附件路径,生成绝对 URL 并更新数据。
        """
        def recursive_transform(data):
            """
            递归遍历 JSON 数据,查找并生成附件的绝对 URL。
            """
            if isinstance(data, dict):
                for key, value in data.items():
                    if isinstance(value, dict):
                        recursive_transform(value)
                    elif isinstance(value, list):
                        if key == 'attachments':  # 假设附件是一个列表,存在于 'attachments' 键中
                            for attachment in value:
                                if 'file_name' in attachment:
                                    file_name = attachment['file_name']
                                    attachment['url'] = self.get_absolute_url(file_name)
                    else:
                        continue
            elif isinstance(data, list):
                for item in data:
                    recursive_transform(item)

        # 执行递归转换,处理 data 字段中的附件路径
        recursive_transform(self.data)

    def save(self, *args, **kwargs):
        # 在保存数据前,处理附件并添加 URL
        self.save_attachments_with_urls()
        super().save(*args, **kwargs)

2. 数据的保存

在调用 save 方法时,系统会自动检查 data 字段中的附件数据,并为每个附件生成对应的绝对 URL,然后保存到数据库中。

# 示例:保存一个新的数据实例
data_instance = Data(
    name="示例数据",
    data={
        "大类1": {
            "小类1": {
                "参数1": "值1",
                "attachments": [
                    {"file_name": "file1.pdf"},
                    {"file_name": "file2.pdf"}
                ]
            }
        }
    }
)

data_instance.save()  # 保存时会自动生成附件 URL

3. 数据序列化

在 DRF 中,创建一个序列化器来展示保存的已处理数据。

from rest_framework import serializers

class DataSerializer(serializers.ModelSerializer):
    class Meta:
        model = Data
        fields = ['name', 'data', 'created_at']

    def to_representation(self, instance):
        """
        重写 `to_representation` 方法,返回处理后的数据。
        """
        data = super().to_representation(instance)
        return data

4. 查询接口

from rest_framework.views import APIView
from rest_framework.response import Response

class DataView(APIView):
    def get(self, request, *args, **kwargs):
        # 获取数据实例
        data_instance = Data.objects.first()  # 示例:获取第一条数据

        # 使用序列化器将数据转化为 JSON 格式
        serializer = DataSerializer(data_instance)

        # 返回数据
        return Response(serializer.data)

5. 查询结果

对于上述保存的数据,查询接口返回的数据将包含附件的绝对 URL:

{
    "name": "示例数据",
    "data": {
        "大类1": {
            "小类1": {
                "参数1": "值1",
                "attachments": [
                    {"file_name": "file1.pdf", "url": "https://s3.amazonaws.com/my-bucket/file1.pdf"},
                    {"file_name": "file2.pdf", "url": "https://s3.amazonaws.com/my-bucket/file2.pdf"}
                ]
            }
        }
    },
    "created_at": "2024-10-28T12:00:00Z"
}

6. 总结

  1. 在数据保存时生成附件 URL:通过 save_attachments_with_urls 方法,我们在保存数据时预先生成了每个附件的 URL,避免每次查询时都计算 URL。
  2. 递归处理:数据中的附件路径(如果存在)会被递归地处理,适应多层嵌套结构。
  3. 性能优化:在保存数据时进行附件 URL 的处理,查询时直接使用已存储的结果,避免了查询时重复计算。

注意事项:

  • 附件 URL 存储的文件路径:你可以根据具体情况定制 get_absolute_url 方法中的路径生成规则(例如,文件存储在 S3、OSS 或本地文件系统等)。
  • 数据更新:如果数据中的附件字段发生变动(例如文件名变了),需要再次调用 save_attachments_with_urls 方法更新 URL。

这种方法非常适合数据频繁查询且不需要动态生成附件 URL 的场景,避免了每次查询时的计算开销。


在使用 bulk_create 批量创建数据时,由于它是一次性将多个对象插入到数据库中,因此无法像普通的 save() 方法一样对每个对象进行预处理。在 bulk_create 中,通常无法在插入时执行像计算附件 URL 这样的操作。

问题

当你使用 bulk_create 时,由于 bulk_create 会跳过模型的 save() 方法,所以不能在保存数据时直接触发附件 URL 的计算。

解决方案

  1. 提前处理数据:在调用 bulk_create 之前,先预处理所有数据,确保每个对象的数据已经包含了附件的绝对 URL。你可以在插入之前先遍历所有数据,将附件的 URL 填充到数据中。

  2. 通过批量更新:如果数据已经存在,并且只是需要更新附件 URL,使用 bulk_update 来批量更新 URL 字段。

具体实现

1. 处理数据的函数

首先,定义一个函数,在将数据传递给 bulk_create 之前,处理所有附件的 URL。

def process_attachments_with_urls(data_list):
    """
    批量处理数据中的附件,并生成绝对 URL。
    :param data_list: 包含多个 Data 实例的列表
    """
    for data_instance in data_list:
        def recursive_transform(data):
            """递归遍历数据,查找并生成附件 URL"""
            if isinstance(data, dict):
                for key, value in data.items():
                    if isinstance(value, dict):
                        recursive_transform(value)
                    elif isinstance(value, list):
                        if key == 'attachments':  # 假设附件字段为 'attachments'
                            for attachment in value:
                                if 'file_name' in attachment:
                                    file_name = attachment['file_name']
                                    attachment['url'] = data_instance.get_absolute_url(file_name)
                    else:
                        continue
            elif isinstance(data, list):
                for item in data:
                    recursive_transform(item)

        # 处理 data 字段中的附件
        recursive_transform(data_instance.data)

2. 批量创建数据

在准备好数据之后,使用 bulk_create 批量创建数据。

from django.db import models

class Data(models.Model):
    name = models.CharField(max_length=255)
    data = models.JSONField()  # 存储嵌套 JSON 数据
    created_at = models.DateTimeField(auto_now_add=True)

    def get_absolute_url(self, file_path):
        """
        假设我们使用 S3 存储文件,生成绝对 URL
        """
        base_url = "https://s3.amazonaws.com/my-bucket/"
        return base_url + file_path


# 假设我们有多个 Data 实例的数据
data_instances = [
    Data(name="示例数据1", data={
        "大类1": {
            "小类1": {
                "参数1": "值1",
                "attachments": [
                    {"file_name": "file1.pdf"},
                    {"file_name": "file2.pdf"}
                ]
            }
        }
    }),
    Data(name="示例数据2", data={
        "大类2": {
            "小类2": {
                "参数2": "值2",
                "attachments": [
                    {"file_name": "file3.pdf"},
                    {"file_name": "file4.pdf"}
                ]
            }
        }
    }),
]

# 处理数据中的附件并生成 URL
process_attachments_with_urls(data_instances)

# 批量创建数据
Data.objects.bulk_create(data_instances)

3. 使用 bulk_update 更新数据中的附件 URL(可选)

如果你已经使用 bulk_create 创建了数据,并且后续某些附件 URL 需要更新,你可以使用 bulk_update 来批量更新这些 URL。比如,当附件的文件路径变化时,你可以通过以下方式更新:

# 假设 data_instances 是要更新的数据
data_instances = Data.objects.all()

# 处理数据中的附件并生成新的 URL
process_attachments_with_urls(data_instances)

# 批量更新数据中的附件 URL
Data.objects.bulk_update(data_instances, ['data'])  # 更新 data 字段

4. 优化考虑

  • 处理大数据量:如果你需要批量处理非常大的数据集,bulk_createbulk_update 在性能上会比逐条 save() 快很多,但依然需要确保数据处理部分不会导致内存溢出。如果数据量非常大,可以考虑对数据进行分页或分批处理。
  • 减少数据库 I/Obulk_createbulk_update 都能够减少数据库 I/O,因为它们会在一次请求中执行所有的插入或更新操作。

总结

  1. 提前处理数据:在调用 bulk_create 前,确保所有的附件 URL 已经被计算并存储到数据实例中。
  2. 使用递归方法:递归遍历数据中的 JSON 字段,确保每个附件字段都能正确生成 URL。
  3. 性能考虑:在数据量大时,考虑分页或分批处理,以免造成内存占用过多。

通过这种方式,你能够在 bulk_create 过程中处理复杂的数据逻辑,并保证数据正确性,同时提升数据插入的性能。


如果附件和数据一起从前端传递到后端进行保存,通常会有两种情况:一种是附件作为文件上传,另一种是附件作为文件名或路径传递。在这两种情况下,我们需要分别处理文件上传和路径传递。

场景说明:

  1. 附件作为文件上传:前端将文件上传到后端,后端将其存储(例如存储到 S3 或本地文件系统),并将文件的 URL 存储到数据库。
  2. 附件作为文件名或路径:前端直接传递文件名或文件路径,后端根据文件存储位置生成 URL 并保存。

下面我们提供两种场景的实现方法,并展示前后端代码。


1. 附件作为文件上传(上传文件)

后端实现

假设我们使用 Django 的 FileFieldImageField 来存储附件,前端会通过 multipart/form-data 格式上传文件。

1.1. 数据模型

在 Django 中,使用 FileFieldImageField 来处理附件上传,文件会保存在服务器或云存储中。

from django.db import models

class Data(models.Model):
    name = models.CharField(max_length=255)
    data = models.JSONField()  # 存储嵌套的 JSON 数据
    created_at = models.DateTimeField(auto_now_add=True)
    
    # 附件字段,可以处理单个文件
    attachments = models.JSONField(default=list)  # 附件可以以列表形式存储多个文件的路径/URL
    
    def save(self, *args, **kwargs):
        # 保存附件 URL
        self.save_attachments_with_urls()
        super().save(*args, **kwargs)
    
    def save_attachments_with_urls(self):
        """
        处理附件,生成绝对 URL
        """
        base_url = "https://s3.amazonaws.com/my-bucket/"
        for attachment in self.attachments:
            file_name = attachment.get("file_name")
            if file_name:
                # 假设附件是存储在 S3 上,可以通过文件路径生成 URL
                attachment["url"] = f"{base_url}{file_name}"

1.2. 数据接收与文件上传视图

在接收数据时,使用 Django Rest Framework 来处理文件上传。我们需要处理 multipart/form-data 格式的数据,并提取文件和其他字段。

from rest_framework import serializers
from rest_framework.parsers import MultiPartParser, FormParser
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from .models import Data

class DataSerializer(serializers.ModelSerializer):
    class Meta:
        model = Data
        fields = ['name', 'data', 'attachments']  # 包含附件字段

class DataView(APIView):
    parser_classes = [MultiPartParser, FormParser]

    def post(self, request, *args, **kwargs):
        # 获取 JSON 数据和文件
        data = request.data  # 包含文件的字段将自动解析
        files = request.FILES  # 文件字段
        
        # 构建数据字典,将文件转移到合适的地方
        attachment_list = []
        for file_key, file in files.items():
            attachment_list.append({"file_name": file.name, "file": file})
        
        # 将附件添加到 data 字段
        data["attachments"] = attachment_list
        
        # 反序列化并保存数据
        serializer = DataSerializer(data=data)
        if serializer.is_valid():
            serializer.save()
            return Response(serializer.data, status=status.HTTP_201_CREATED)
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

前端实现

前端使用 FormData 来处理文件上传,并通过 POST 请求发送到后端。

1.3. 前端代码(JavaScript)

function submitForm() {
    let formData = new FormData();

    // 将其他数据(如大类、小类、参数等)作为 JSON 传递
    let data = {
        name: "示例数据",
        data: {
            "大类1": {
                "小类1": {
                    "参数1": "值1"
                }
            }
        }
    };

    // 添加数据到 FormData
    formData.append('data', JSON.stringify(data));

    // 添加附件文件
    let fileInput = document.getElementById('fileInput');  // 假设文件输入框的 ID 为 fileInput
    for (let i = 0; i < fileInput.files.length; i++) {
        formData.append('attachments', fileInput.files[i]);
    }

    // 发送 POST 请求
    fetch('/api/data/', {
        method: 'POST',
        body: formData,
    })
    .then(response => response.json())
    .then(data => {
        console.log('成功:', data);
    })
    .catch((error) => {
        console.error('错误:', error);
    });
}

HTML 代码:

<form onsubmit="submitForm(); return false;">
    <input type="file" id="fileInput" multiple>
    <button type="submit">提交</button>
</form>

2. 附件作为文件名或路径传递(传递文件路径)

在这种情况下,前端直接传递文件路径或文件名,后端根据存储位置生成完整的 URL。

后端实现

2.1. 数据模型

与上面类似,但我们不再处理文件上传,而是直接接收文件路径。

class Data(models.Model):
    name = models.CharField(max_length=255)
    data = models.JSONField()  # 存储嵌套的 JSON 数据
    created_at = models.DateTimeField(auto_now_add=True)
    
    attachments = models.JSONField(default=list)  # 附件字段(存储文件路径)

    def save(self, *args, **kwargs):
        self.save_attachments_with_urls()
        super().save(*args, **kwargs)

    def save_attachments_with_urls(self):
        """
        处理附件路径,生成绝对 URL
        """
        base_url = "https://s3.amazonaws.com/my-bucket/"
        for attachment in self.attachments:
            file_name = attachment.get("file_name")
            if file_name:
                # 如果前端传递的是文件路径,生成 URL
                attachment["url"] = f"{base_url}{file_name}"

2.2. 数据接收与处理视图

class DataView(APIView):
    def post(self, request, *args, **kwargs):
        data = request.data  # 获取请求中的数据
        attachments = data.get("attachments", [])
        
        # 如果附件是文件路径(而非文件),处理它们并生成 URL
        for attachment in attachments:
            file_name = attachment.get("file_name")
            if file_name:
                attachment["url"] = f"https://s3.amazonaws.com/my-bucket/{file_name}"

        # 将附件添加到数据
        data["attachments"] = attachments
        
        # 反序列化并保存数据
        serializer = DataSerializer(data=data)
        if serializer.is_valid():
            serializer.save()
            return Response(serializer.data, status=status.HTTP_201_CREATED)
        return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

前端实现

前端直接传递文件名或路径,无需上传文件。

function submitForm() {
    let data = {
        name: "示例数据",
        data: {
            "大类1": {
                "小类1": {
                    "参数1": "值1"
                }
            }
        },
        attachments: [
            { "file_name": "file1.pdf" },
            { "file_name": "file2.pdf" }
        ]
    };

    fetch('/api/data/', {
        method: 'POST',
        headers: {
            'Content-Type': 'application/json'
        },
        body: JSON.stringify(data)
    })
    .then(response => response.json())
    .then(data => {
        console.log('成功:', data);
    })
    .catch((error) => {
        console.error('错误:', error);
    });
}

总结

1. 附件作为文件上传

  • 前端:使用 FormData 上传文件,其他数据作为 JSON 传递。
  • 后端:使用 Django 的 FileField 处理文件上传,生成文件的绝对 URL 后存储。

2. 附件作为文件名或路径传递

  • 前端:直接传递文件名或路径。
  • 后端:根据传递的路径生成文件的绝对 URL,并将其保存在数据库中。

通过这种方式,无论是上传文件还是传递文件路径,数据和附件都可以顺利处理,并且附件的 URL 会被存储到数据库中供后续查询。


理解了你的需求,你想把前端传递的数据(如附件文件名)处理成符合层级结构的格式,并存储到数据库中。你已经有了数据结构,其中附件的值是文件名的数组,而文件名需要上传到 S3 并存储到附件表中,最后在保存到数据库时,将附件文件的 S3 存储路径存储到对应的大类、小类、参数数据中。

解决方案概述:

  1. 解析前端数据:你传递的结构数据是按层级组织的,包括大类、小类、参数、附件。
  2. 处理附件文件:需要根据文件名从上传的文件中找到对应的文件,上传到 S3,并返回 S3 键。
  3. 保存数据:将上传后的文件 S3 键存储在附件模型中,最终将数据按照层级结构存储,并在每个参数下插入附件的序列化对象。

代码实现:

1. 数据库模型

你已经有了一个 Attachment 模型存储附件,附件表的字段可能包含 file_name, s3objectkeyfile_url(URL 地址),如:

class Attachment(models.Model):
    file_name = models.CharField(max_length=255)
    s3objectkey = models.CharField(max_length=255)  # S3 object key
    file_url = models.URLField()  # 完整的文件 URL

    def get_absolute_url(self):
        return self.file_url

2. 上传文件到 S3

假设你有一个 W3Storage 类来处理文件上传到 S3,并获取 s3objectkey

class W3Storage:
    def putobject(self, file):
        # 处理文件上传到 S3
        # 假设上传成功后返回 S3 object key
        return f"s3://bucket/{file.name}"

    def get_absolute_url(self, s3objectkey):
        # 根据 s3objectkey 生成文件的完整 URL
        return f"https://example.com/{s3objectkey}"

3. 处理数据与附件上传

接下来,你需要一个方法来处理数据并上传文件,下面是一个完整的序列化器实现:

class BulkDataItemSerializer(serializers.ListSerializer):
    """
    序列化器:批量处理数据项,包括文件的上传和层级数据的创建。
    """
    child = DataItemSerializer()

    def create(self, validated_data):
        """
        批量创建 DataItem 并处理文件上传。
        """
        storage = W3Storage()  # 初始化 S3 存储类
        new_items = []  # 存储创建的 DataItem 对象

        for item_data in validated_data:
            data_content = item_data.get('data_content', {})

            # 处理附件字段
            if 'attachments' in data_content:
                attachments = data_content['attachments']
                processed_attachments = {}

                # 处理每个附件项
                for attachment_key, file_names in attachments.items():
                    # file_names 是一个列表,可能是文件名的列表
                    s3_keys = []

                    for file_name in file_names:
                        # 从 request.FILES 获取文件对象
                        matched_file = self.context['files'].get(file_name)
                        if matched_file:
                            # 上传文件并获取 S3 键
                            s3objectkey = storage.putobject(matched_file)
                            # 获取文件的完整 URL
                            file_url = storage.get_absolute_url(s3objectkey)

                            # 创建附件并保存到数据库
                            attachment = Attachment.objects.create(
                                file_name=file_name,
                                s3objectkey=s3objectkey,
                                file_url=file_url
                            )

                            # 保存附件的 S3 键
                            s3_keys.append(attachment.get_absolute_url())

                    # 将上传后的文件的 URL 更新到 data_content 字段
                    processed_attachments[attachment_key] = s3_keys

                # 更新 data_content 中的附件信息
                data_content['attachments'] = processed_attachments
                item_data['data_content'] = data_content

            # 创建新的 DataItem 对象
            new_item = DataItem.objects.create(**item_data)
            new_items.append(new_item)

        return new_items

4. 解析层级数据并存储到数据库

你的前端数据结构看起来是这样的:

{
  "大类1": {
    "小类1": {
      "参数1": "值1",
      "附件": ["文件名1", "文件名2"]
    },
    "小类2": {
      "参数2": "值2",
      "附件": ["文件名3"]
    }
  },
  "大类2": {
    "小类3": {
      "参数3": "值3"
    }
  }
}

在这个数据中,每个大类下的小类,参数字段下可能会包含 附件 字段,这个字段是文件名的数组。

5. 前端文件上传

前端传递的数据格式应该是这样:

{
  "data_content": {
    "大类1": {
      "小类1": {
        "参数1": "值1",
        "附件": ["file1.jpg", "file2.jpg"]
      },
      "小类2": {
        "参数2": "值2",
        "附件": ["file3.jpg"]
      }
    }
  },
  "files": {
    "file1.jpg": "file1_object",
    "file2.jpg": "file2_object",
    "file3.jpg": "file3_object"
  }
}

其中 files 是前端上传的文件对象集合,文件名是键,文件内容是值(假设为文件对象)。

6. 完整的代码和注释

class BulkDataItemSerializer(serializers.ListSerializer):
    """
    列表序列化器:批量处理数据项,包括文件的上传和层级数据的创建。
    """
    child = DataItemSerializer()

    def create(self, validated_data):
        """
        批量创建 DataItem 并处理文件上传。
        """
        storage = W3Storage()  # 初始化 S3 存储类
        new_items = []  # 存储创建的 DataItem 对象

        # 遍历每条数据
        for item_data in validated_data:
            data_content = item_data.get('data_content', {})

            # 处理附件字段
            if 'attachments' in data_content:
                attachments = data_content['attachments']
                processed_attachments = {}

                # 处理每个附件项
                for attachment_key, file_names in attachments.items():
                    s3_keys = []  # 用于保存上传文件的 S3 键

                    # 遍历附件文件名
                    for file_name in file_names:
                        matched_file = self.context['files'].get(file_name)
                        if matched_file:
                            # 上传文件到 S3
                            s3objectkey = storage.putobject(matched_file)
                            file_url = storage.get_absolute_url(s3objectkey)

                            # 创建附件对象并保存
                            attachment = Attachment.objects.create(
                                file_name=file_name,
                                s3objectkey=s3objectkey,
                                file_url=file_url
                            )

                            # 保存文件的 S3 URL
                            s3_keys.append(attachment.get_absolute_url())

                    # 更新 data_content 字段中的附件信息
                    processed_attachments[attachment_key] = s3_keys

                data_content['attachments'] = processed_attachments
                item_data['data_content'] = data_content

            # 创建新的 DataItem 并保存
            new_item = DataItem.objects.create(**item_data)
            new_items.append(new_item)

        return new_items

关键步骤:

  1. 遍历数据:首先遍历每条数据,根据大类、小类、参数的层级结构存储数据。
  2. 处理附件:对于每条数据中的附件字段,判断它是单个文件还是多个文件,然后逐个文件上传到 S3。
  3. 上传文件到 S3:使用 W3Storage.putobject 方法上传文件并获取文件的 S3 键,然后在附件表中存储文件的 s3objectkeyfile_url
  4. 保存数据到数据库:处理完附件后,更新数据中的附件字段,最后创建并保存数据项(DataItem)到数据库。

结果:

前端的数据结构会按照层级顺序存储,并且每个附件字段会被替换成文件的 S3 URL。你可以在数据库中看到每个大类、小类、参数的数据,以及附件的 URL 信息。


这个方案通过添加 hash引用次数 字段来优化附件管理,避免重复上传和存储同样的文件,同时能有效管理附件的生命周期。以下是完整的实现思路及代码,包括数据库设计、文件上传、数据写入逻辑,以及删除或修改操作。


数据库设计

附件模型 (Attachment)

from django.db import models

class Attachment(models.Model):
    file_name = models.CharField(max_length=255)  # 原始文件名
    s3_object_key = models.CharField(max_length=512)  # S3 object key
    file_url = models.URLField()  # S3 文件绝对 URL
    file_hash = models.CharField(max_length=64, unique=True)  # 文件的唯一 Hash 值 (MD5/SHA256)
    reference_count = models.PositiveIntegerField(default=1)  # 引用次数

    def delete_s3_file(self):
        """
        删除 S3 中存储的文件。
        """
        from w3storage import W3Storage
        storage = W3Storage()
        storage.delete_object(self.s3_object_key)

    def delete(self, *args, **kwargs):
        """
        重写 delete 方法,确保删除时同步删除 S3 文件。
        """
        self.delete_s3_file()
        super().delete(*args, **kwargs)

文件处理逻辑

文件上传时的哈希计算

在上传文件到 S3 之前,先计算文件的哈希值 (hash),以检测是否有重复的文件。

import hashlib

def calculate_file_hash(file):
    """
    计算文件的哈希值 (SHA256)。
    :param file: 文件对象
    :return: 文件的 SHA256 哈希值
    """
    sha256 = hashlib.sha256()
    for chunk in file.chunks():
        sha256.update(chunk)
    return sha256.hexdigest()

数据写入逻辑

在数据写入时,检查 Attachment 表中是否已经存在相同的哈希值。如果存在,直接更新数据中的附件信息;否则,上传文件到 S3 并创建新的附件记录。

from .models import Attachment
from w3storage import W3Storage

def handle_attachments(data, files):
    """
    处理数据中的附件。
    :param data: 数据结构,包含附件字段
    :param files: 上传的文件字典
    :return: 处理后的数据结构
    """
    storage = W3Storage()

    def process_attachment(file):
        # 计算文件哈希
        file_hash = calculate_file_hash(file)

        # 检查是否已存在相同文件
        attachment, created = Attachment.objects.get_or_create(file_hash=file_hash, defaults={
            'file_name': file.name,
            's3_object_key': storage.putobject(file.name, file),
            'file_url': storage.get_absolute_url(file.name),
            'reference_count': 1,
        })

        if not created:
            # 如果文件已存在,增加引用次数
            attachment.reference_count += 1
            attachment.save()

        return {
            'file_name': attachment.file_name,
            'file_url': attachment.file_url,
            'hash': attachment.file_hash,
        }

    def traverse_and_process(data):
        # 递归处理层级数据结构中的附件
        for key, value in data.items():
            if isinstance(value, dict):
                traverse_and_process(value)
            elif key == "附件":
                processed_attachments = []
                for file_name in value:
                    if file_name in files:
                        processed_attachments.append(process_attachment(files[file_name]))
                data[key] = processed_attachments

    traverse_and_process(data)
    return data

示例调用

files = {
    "file1.jpg": uploaded_file1,
    "file2.jpg": uploaded_file2,
}

data = {
    "大类1": {
        "小类1": {
            "参数1": "值1",
            "附件": ["file1.jpg", "file2.jpg"]
        },
        "小类2": {
            "参数2": "值2",
            "附件": ["file1.jpg"]  # 重复文件
        }
    }
}

processed_data = handle_attachments(data, files)

修改和删除逻辑

修改引用次数

在数据被修改时,对附件的引用次数进行更新。如果引用次数为 0,则删除文件记录以及对应的 S3 文件。

def update_attachment_references(file_hashes_to_remove):
    """
    更新附件的引用次数。
    :param file_hashes_to_remove: 要减少引用次数的文件哈希列表
    """
    for file_hash in file_hashes_to_remove:
        try:
            attachment = Attachment.objects.get(file_hash=file_hash)
            attachment.reference_count -= 1
            if attachment.reference_count <= 0:
                attachment.delete()  # 自动删除 S3 文件
            else:
                attachment.save()
        except Attachment.DoesNotExist:
            continue

序列化器中的处理

如果使用 DRF 的序列化器,可以在 save 方法中调用上述逻辑。

from rest_framework import serializers

class NestedDataSerializer(serializers.Serializer):
    data = serializers.JSONField()
    files = serializers.DictField(child=serializers.FileField())

    def save(self, **kwargs):
        data = self.validated_data['data']
        files = self.validated_data['files']
        processed_data = handle_attachments(data, files)
        # 存储处理后的数据到数据库
        # Your model saving logic here
        return processed_data

删除操作示例

file_hashes_to_remove = ["existing_file_hash1", "existing_file_hash2"]
update_attachment_references(file_hashes_to_remove)

整体逻辑总结

  1. 哈希计算:使用文件的哈希值避免重复上传。
  2. 引用计数管理:每次新增文件时增加引用次数,修改或删除时减少引用次数。
  3. S3 文件删除:当引用次数为 0 时,删除附件记录和对应的 S3 文件。
  4. 数据与附件关联:通过 hash 和引用逻辑简化数据和文件的绑定关系。

在修改操作时,需要先对原有的附件数据进行处理:减少引用次数,并在引用为 0 时删除对应记录;然后处理新的附件数据,增加引用次数或创建新的记录。以下是完整的实现方案和代码。


修改操作逻辑

def update_attachments(existing_data, updated_data, files):
    """
    更新附件的引用计数,同时更新数据中的附件信息。
    :param existing_data: 修改前的数据,包含附件信息
    :param updated_data: 修改后的数据,包含附件信息
    :param files: 上传的文件字典
    :return: 处理后的新数据
    """
    def get_attachments(data):
        """
        提取数据中的所有附件哈希列表。
        :param data: 数据结构
        :return: 附件哈希列表
        """
        attachments = []

        def traverse(data):
            for key, value in data.items():
                if isinstance(value, dict):
                    traverse(value)
                elif key == "附件" and isinstance(value, list):
                    for item in value:
                        if isinstance(item, dict) and 'hash' in item:
                            attachments.append(item['hash'])

        traverse(data)
        return attachments

    # 1. 提取现有数据的附件哈希
    existing_hashes = set(get_attachments(existing_data))

    # 2. 使用新数据处理附件,获取新的数据
    updated_data = handle_attachments(updated_data, files)

    # 3. 提取新数据的附件哈希
    updated_hashes = set(get_attachments(updated_data))

    # 4. 计算需要减少引用的附件和新增引用的附件
    hashes_to_remove = existing_hashes - updated_hashes
    hashes_to_add = updated_hashes - existing_hashes

    # 5. 更新引用计数
    update_attachment_references(hashes_to_remove)

    for hash_to_add in hashes_to_add:
        try:
            attachment = Attachment.objects.get(file_hash=hash_to_add)
            attachment.reference_count += 1
            attachment.save()
        except Attachment.DoesNotExist:
            pass  # 这不应该发生,所有新附件应已创建

    return updated_data

修改操作调用示例

existing_data = {
    "大类1": {
        "小类1": {
            "参数1": "值1",
            "附件": [
                {"file_name": "file1.jpg", "file_url": "url1", "hash": "hash1"},
                {"file_name": "file2.jpg", "file_url": "url2", "hash": "hash2"}
            ]
        }
    }
}

updated_data = {
    "大类1": {
        "小类1": {
            "参数1": "新值1",
            "附件": [
                "file2.jpg",  # 仍然是旧文件
                "file3.jpg"   # 新增文件
            ]
        }
    }
}

files = {
    "file3.jpg": uploaded_file3
}

new_data = update_attachments(existing_data, updated_data, files)

修改逻辑步骤

  1. 提取原有数据中的附件:通过遍历原有数据,获取所有的附件哈希。
  2. 更新新数据的附件信息
    • 处理新数据中的附件,将文件上传到 S3,并生成附件记录。
    • 提取新数据的附件哈希列表。
  3. 对比引用差异
    • 减少引用次数:对 hashes_to_remove 中的附件,减少引用,引用为 0 时删除记录。
    • 增加引用次数:对 hashes_to_add 中的附件,增加引用次数。
  4. 返回更新后的数据:返回处理完毕的新数据结构。

删除逻辑的改进

删除数据时同样需要减少附件的引用次数,如果引用次数为 0,则删除附件记录和 S3 文件。

def delete_attachments(data):
    """
    删除数据中的附件引用。
    :param data: 数据结构
    """
    def get_attachments(data):
        attachments = []

        def traverse(data):
            for key, value in data.items():
                if isinstance(value, dict):
                    traverse(value)
                elif key == "附件" and isinstance(value, list):
                    for item in value:
                        if isinstance(item, dict) and 'hash' in item:
                            attachments.append(item['hash'])

        traverse(data)
        return attachments

    hashes_to_remove = set(get_attachments(data))
    update_attachment_references(hashes_to_remove)

调用示例:

data_to_delete = {
    "大类1": {
        "小类1": {
            "参数1": "值1",
            "附件": [
                {"file_name": "file1.jpg", "file_url": "url1", "hash": "hash1"}
            ]
        }
    }
}

delete_attachments(data_to_delete)

优化点

  1. 引用管理:通过 hash 对附件引用进行唯一标识,避免重复存储和上传。
  2. 批量处理bulk_updatebulk_create 可优化数据库操作。
  3. 并发支持:在高并发场景下,需要为 Attachment 表的 reference_count 字段增加乐观锁机制(如 F 表达式)。

这样设计和实现,可以确保附件在数据修改、删除时正确引用和释放,同时减少存储和网络传输的冗余。


// 统计rowspan并打平数据
function calculateRowspansAndFlatten(data) {
    // 计算rowspan的数据
    const dataWithRowspans = JSON.parse(JSON.stringify(data)); // 深拷贝以不影响原数据
    addRowspansToData(dataWithRowspans);
    // 打平数据
    const flattenedData = flattenData(data);
    return {
        dataWithRowspans,
        flattenedData
    };
}

// 计算并添加rowspan到原数据
function addRowspansToData(data) {
    for (const category in data) {
        if (!data.hasOwnProperty(category)) continue;
        
        let categoryRowspan = 0;
        const subcategories = data[category];
        
        for (const subcategory in subcategories) {
            if (!subcategories.hasOwnProperty(subcategory)) continue;
            
            // 计算小类的rowspan - 只计算参数的数量
            const subcategoryRowspan = Object.keys(subcategories[subcategory]).length;
            categoryRowspan += subcategoryRowspan;
            
            // 添加rowspan到小类
            subcategories[subcategory]._rowspan = subcategoryRowspan;
        }
        
        // 添加rowspan到大类
        data[category]._rowspan = categoryRowspan;
    }
}

// 打平数据并保留层级信息
function flattenData(data) {
    const flattened = [];
    
    // 处理参数和附件
    const processParams = (category, subcategory, params) => {
        for (const param in params) {
            if (!params.hasOwnProperty(param)) continue;
            
            const value = params[param];
            if (param === "附件" && Array.isArray(value)) {
                value.forEach(attachment => {
                    flattened.push({
                        level1: category,
                        level2: subcategory,
                        level3: param,
                        value: attachment
                    });
                });
            } else {
                flattened.push({
                    level1: category,
                    level2: subcategory,
                    level3: param,
                    value: value
                });
            }
        }
    };

    // 遍历数据结构
    for (const category in data) {
        if (!data.hasOwnProperty(category)) continue;
        
        for (const subcategory in data[category]) {
            if (!data[category].hasOwnProperty(subcategory)) continue;
            processParams(category, subcategory, data[category][subcategory]);
        }
    }
    
    return flattened;
}

// 处理数据
const { dataWithRowspans, flattenedData } = calculateRowspansAndFlatten(dataToDelete);