drf数据校验批量写入

Posted by Shallow Dreameron October 8, 2024

如果你已经在 Django 项目的设置中将每个请求设置为一个事务,结合你在序列化器中的实现可以确保事务的自动管理。这意味着每个请求都将在一个数据库事务中处理,以下是如何整合这个特性:

1. 在序列化器中进行逐一检查

你可以继续使用之前的自定义序列化器。在这里,我们不需要显式地使用 transaction.atomic(),因为 Django 会自动为每个请求处理事务。

2. 序列化器的实现

确保序列化器的 create 方法能够处理数据的检查、删除和插入。

from rest_framework import serializers
from your_app.models import YourModel, AnotherModel

class YourModelSerializer(serializers.ModelSerializer):
    class Meta:
        model = YourModel
        fields = '__all__'

    def validate(self, attrs):
        # 逐一检查每个字段的有效性
        if 'field_name' not in attrs or attrs['field_name'] == '':
            raise serializers.ValidationError({"field_name": "This field is required."})
        return attrs

    def create(self, validated_data):
        # 删除原数据
        YourModel.objects.all().delete()
        # 批量创建新数据
        return YourModel.objects.bulk_create([YourModel(**item) for item in validated_data])

class AnotherModelSerializer(serializers.ModelSerializer):
    class Meta:
        model = AnotherModel
        fields = '__all__'

    def validate(self, attrs):
        # 逐一检查另一个模型的字段有效性
        if 'another_field' not in attrs or attrs['another_field'] == '':
            raise serializers.ValidationError({"another_field": "This field is required."})
        return attrs

    def create(self, validated_data):
        # 删除原数据
        AnotherModel.objects.all().delete()
        # 批量创建新数据
        return AnotherModel.objects.bulk_create([AnotherModel(**item) for item in validated_data])

3. 视图的实现

在视图中,你可以简单地调用序列化器并处理响应。在每个请求中,事务会自动管理。

from rest_framework import viewsets
from your_app.models import YourModel, AnotherModel
from your_app.serializers import YourModelSerializer, AnotherModelSerializer
from rest_framework.response import Response
from rest_framework import status

class YourModelViewSet(viewsets.ModelViewSet):
    queryset = YourModel.objects.all()
    serializer_class = YourModelSerializer

    def create(self, request, *args, **kwargs):
        # 批量处理 YourModel 的数据
        your_model_data = request.data.get('your_model_data', [])
        another_model_data = request.data.get('another_model_data', [])

        # 处理 YourModel 的数据
        your_model_serializer = YourModelSerializer(data=your_model_data, many=True)
        try:
            your_model_serializer.is_valid(raise_exception=True)
        except serializers.ValidationError as e:
            return Response(e.detail, status=status.HTTP_400_BAD_REQUEST)

        # 处理 AnotherModel 的数据
        another_model_serializer = AnotherModelSerializer(data=another_model_data, many=True)
        try:
            another_model_serializer.is_valid(raise_exception=True)
        except serializers.ValidationError as e:
            return Response(e.detail, status=status.HTTP_400_BAD_REQUEST)

        # 删除原数据和插入新数据
        your_model_serializer.create(your_model_serializer.validated_data)
        another_model_serializer.create(another_model_serializer.validated_data)

        return Response({
            "your_model": your_model_serializer.data,
            "another_model": another_model_serializer.data
        }, status=status.HTTP_201_CREATED)

4. 错误处理与事务管理

如果在 validatecreate 方法中抛出 ValidationError,Django 会自动回滚事务。这意味着任何未通过验证的数据不会被保存到数据库中。

总结

  • 事务管理:由于你已经在设置中为每个请求配置了事务管理,Django 将自动处理所有事务。
  • 逐一检查:在序列化器中使用 validate 方法逐一检查数据有效性。
  • 删除和插入:在序列化器的 create 方法中实现删除原数据并插入新数据。

这种方法确保了你的应用能够在保持数据一致性的同时,进行批量操作和错误处理。


在这种情况下,由于数据已经通过 openpyxl 解析并存储在 Redis 中,你可以在序列化器中从 Redis 读取数据并进行校验。校验时,如果发现错误,仍然需要返回单元格编号及错误信息;如果所有数据都符合要求,则返回校验通过的结果。

实现步骤:

  1. 数据存储在 Redis:Excel 文件通过 openpyxl 解析后,结果会存入 Redis,每个数据项都包含单元格的编号(例如 A1, B2)和其值。
  2. 序列化器校验:在序列化器中读取 Redis 中存储的解析数据,然后通过自定义的校验函数逐一校验每个单元格的值。
  3. 返回结果:校验通过时返回所有数据,如果校验失败,则返回错误信息和对应的单元格编号。

示例代码

1. 存储数据到 Redis

你可以将解析后的数据结构化存储在 Redis 中,例如使用 json 格式或 hash 结构,每个单元格数据都带有其编号。

import redis
import json
from openpyxl import load_workbook

# 初始化 Redis 连接
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)

def parse_and_store_xlsx(file_path, redis_key):
    workbook = load_workbook(file_path)
    sheet = workbook.active
    parsed_data = []

    for row in sheet.iter_rows(min_row=1, max_row=sheet.max_row, min_col=1, max_col=sheet.max_column):
        for cell in row:
            cell_data = {
                'cell_reference': cell.coordinate,  # 单元格编号
                'value': cell.value  # 单元格值
            }
            parsed_data.append(cell_data)

    # 将解析的数据存入 Redis
    redis_client.set(redis_key, json.dumps(parsed_data))

2. 序列化器中的校验

在序列化器中,从 Redis 获取存储的数据并逐一校验,返回结果或错误。

from rest_framework import serializers
import redis
import json

# 初始化 Redis 连接
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)

def validate_cell_value(value, cell_reference):
    """
    校验单元格值的函数
    """
    errors = []
    
    # 示例:期望值为数字类型
    if not isinstance(value, (int, float)):
        errors.append(f"Invalid value at {cell_reference}: Expected a number, got {type(value).__name__}")
    
    return errors

class RedisDataValidationSerializer(serializers.Serializer):
    """
    序列化器,用于从 Redis 获取数据并校验
    """
    redis_key = serializers.CharField()

    def validate(self, data):
        redis_key = data.get('redis_key')

        # 从 Redis 中获取数据
        raw_data = redis_client.get(redis_key)
        if not raw_data:
            raise serializers.ValidationError("No data found in Redis for the given key")

        # 解析 Redis 中的数据
        parsed_data = json.loads(raw_data)
        errors = []
        validated_data = []

        # 校验数据
        for cell_data in parsed_data:
            cell_value = cell_data.get('value')
            cell_reference = cell_data.get('cell_reference')

            # 调用校验函数
            cell_errors = validate_cell_value(cell_value, cell_reference)

            if cell_errors:
                errors.extend(cell_errors)
            else:
                # 如果校验通过,保存有效数据
                validated_data.append({
                    "cell": cell_reference,
                    "value": cell_value
                })

        if errors:
            # 返回错误信息和单元格编号
            raise serializers.ValidationError(errors)

        # 返回校验后的数据
        return validated_data

3. 视图中的实现

使用序列化器来校验 Redis 中的数据:

from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status

class ValidateRedisDataView(APIView):
    def post(self, request, *args, **kwargs):
        serializer = RedisDataValidationSerializer(data=request.data)
        
        if serializer.is_valid():
            validated_data = serializer.validated_data
            return Response(validated_data, status=status.HTTP_200_OK)
        else:
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

4. 前端请求

在前端发送请求时,传递 Redis 的 key:

const redisKey = "your-redis-key";

fetch('/api/validate-redis-data/', {
    method: 'POST',
    headers: {
        'Content-Type': 'application/json',
    },
    body: JSON.stringify({ redis_key: redisKey })
})
.then(response => response.json())
.then(data => console.log(data))
.catch(error => console.error(error));

总结

  • Redis 存储:先通过 openpyxl 解析 Excel 文件,将数据存储到 Redis。
  • 序列化器校验:序列化器从 Redis 获取解析后的数据,并逐一调用校验函数校验每个单元格。
  • 返回结果:校验失败时返回带有单元格编号的错误信息;校验通过时返回所有经过验证的数据。

这样,你可以方便地从 Redis 读取并校验数据,同时跟踪每个单元格的编号和错误信息。


你已经将校验逻辑封装在类的 __call__ 方法中,这是一个很好的设计。接下来,我会帮助你将这一逻辑融入到当前方案中,使其适用于你基于类的校验器。

假设的校验器类设计

你已经将校验逻辑放在了类的 __call__ 方法中,意味着你可以像调用函数一样调用该类。这里是一个可能的校验器类的结构:

class DataValidator:
    def __call__(self, cell_value, cell_reference):
        """
        校验单元格的值,并返回错误信息(如果有)
        """
        # 示例校验逻辑(非空校验)
        if cell_value is None:
            return {
                "cell": cell_reference,
                "error": f"Value is missing at {cell_reference}"
            }
        return None

1. 修改文件解析和校验的逻辑

将校验器类的调用整合到文件解析过程中。

import redis
import hashlib
import json
from openpyxl import load_workbook

# 初始化 Redis 连接
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)

class DataValidator:
    def __call__(self, cell_value, cell_reference):
        """
        校验单元格的值,并返回错误信息(如果有)
        """
        if cell_value is None:
            return {
                "cell": cell_reference,
                "error": f"Value is missing at {cell_reference}"
            }
        return None

def hash_file(file):
    """
    计算文件的二进制哈希值,生成 Redis key
    """
    file_hash = hashlib.md5()
    file_hash.update(file.read())
    file.seek(0)  # 重置文件指针,确保文件后续可以被正确读取
    return file_hash.hexdigest()

def parse_and_validate_xlsx(file, validator):
    """
    解析并校验文件数据,使用传入的 validator 进行校验
    """
    workbook = load_workbook(file)
    sheet = workbook.active
    errors = []
    validated_data = []
    
    for row in sheet.iter_rows(min_row=1, max_row=sheet.max_row, min_col=1, max_col=sheet.max_column):
        for cell in row:
            cell_value = cell.value
            cell_reference = cell.coordinate  # 单元格编号
            
            # 使用 validator 进行校验
            error = validator(cell_value, cell_reference)
            if error:
                errors.append(error)
            else:
                validated_data.append({
                    "cell": cell_reference,
                    "value": cell_value
                })
    
    redis_key = hash_file(file)
    
    if errors:
        # 如果有错误,存储错误信息到 Redis
        redis_client.set(redis_key, json.dumps(errors))
        return False  # 返回信号表明有错误
    else:
        # 校验通过,存储校验后的数据
        redis_client.set(redis_key, json.dumps(validated_data))
        return True  # 返回信号表明数据全部合格

2. 修改视图中的实现

在视图中实例化你的校验器类,并将其传递到文件解析函数中:

from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status

class UploadAndValidateExcelView(APIView):
    def post(self, request, *args, **kwargs):
        file = request.FILES.get('file')
        if not file:
            return Response({"error": "No file uploaded"}, status=status.HTTP_400_BAD_REQUEST)
        
        # 实例化校验器
        validator = DataValidator()
        
        # 解析并校验文件
        all_valid = parse_and_validate_xlsx(file, validator)
        
        # 获取校验结果
        redis_key = hash_file(file)
        validation_result = get_validation_result(redis_key)
        
        if all_valid:
            return Response({"status": "success", "data": validation_result}, status=status.HTTP_200_OK)
        else:
            return Response({"status": "error", "errors": validation_result}, status=status.HTTP_400_BAD_REQUEST)

总结:

  • 校验器封装:你可以将校验逻辑集中到 __call__ 方法中,通过类实例化并直接调用。
  • 文件解析与校验结合:在文件解析时,传入校验器并调用它来检查每个单元格的数据。
  • Redis 存储结果:通过 Redis 存储校验结果,分别保存通过的有效数据或错误信息。

这样,校验逻辑的扩展和维护都变得更加方便和灵活。