文件数据库导入方案

Posted by Shallow Dreameron February 6, 2025

根据需求设计如下Django数据库模型方案,满足高扩展性和权限控制要求:

from django.db import models
from django.dispatch import receiver
from django.db.models.signals import pre_delete

class GlobalCollection(models.Model):
    """全局收藏表(所有应用共享)"""
    UPLOAD_SOURCE_CHOICES = [
        ('local', '本地上传'),
        ('project', '项目收藏'),
        ('third_party', '第三方平台')
    ]
    
    file_identifier = models.CharField(max_length=255, unique=True, verbose_name="文件唯一标识")
    source_platform = models.CharField(max_length=50, choices=UPLOAD_SOURCE_CHOICES)
    original_url = models.URLField(null=True, blank=True, verbose_name="原始链接")
    created_at = models.DateTimeField(auto_now_add=True)
    meta_data = models.JSONField(default=dict, verbose_name="元数据")

    class Meta:
        db_table = 'global_collection'
        verbose_name = '全局收藏'

class AppSpecificStorage(models.Model):
    """应用私有存储表(每个应用独立)"""
    app_name = models.CharField(max_length=50, verbose_name="应用名称")
    global_file = models.ForeignKey(GlobalCollection, on_delete=models.CASCADE)
    local_path = models.FileField(upload_to='app_files/', verbose_name="本地存储路径")
    last_modified = models.DateTimeField(auto_now=True)

    class Meta:
        db_table = 'app_specific_storage'
        unique_together = ('app_name', 'global_file')

class ParameterFile(models.Model):
    """参数文件主表"""
    STATUS_CHOICES = [
        ('active', '生效中'),
        ('deleted', '已删除')
    ]
    
    file_name = models.CharField(max_length=255)
    version = models.PositiveIntegerField(default=1)
    app = models.CharField(max_length=50, verbose_name="所属应用")
    status = models.CharField(max_length=10, choices=STATUS_CHOICES, default='active')
    collection_ref = models.OneToOneField(GlobalCollection, null=True, on_delete=models.SET_NULL)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = 'parameter_files'
        indexes = [models.Index(fields=['app', 'status'])]

class ParameterHistory(models.Model):
    """参数删除历史表"""
    original_file = models.ForeignKey(ParameterFile, on_delete=models.CASCADE)
    deleted_at = models.DateTimeField(auto_now_add=True)
    snapshot = models.JSONField(verbose_name="文件快照")

    class Meta:
        db_table = 'parameter_history'

class ThirdPartyPermission(models.Model):
    """第三方平台权限控制表"""
    PERMISSION_LEVELS = [
        ('read', '只读'),
        ('write', '可写入'),
        ('admin', '完全控制')
    ]
    
    app_name = models.CharField(max_length=50)
    platform_name = models.CharField(max_length=50)
    access_token = models.CharField(max_length=255)
    permission_level = models.CharField(max_length=10, choices=PERMISSION_LEVELS)
    expires_at = models.DateTimeField()

    class Meta:
        db_table = 'third_party_permissions'
        unique_together = ('app_name', 'platform_name')

# 信号处理:删除时记录历史
@receiver(pre_delete, sender=ParameterFile)
def create_deletion_history(sender, instance, **kwargs):
    ParameterHistory.objects.create(
        original_file=instance,
        snapshot={
            'file_name': instance.file_name,
            'version': instance.version,
            'app': instance.app
        }
    )

模型详细说明:

  1. GlobalCollection(全局收藏表)
    • 核心字段:file_identifier(唯一标识)、source_platform(来源平台类型)
    • 设计特点:使用JSONField存储元数据,支持灵活扩展
    • 作用:作为全平台文件统一注册中心,其他表通过外键引用
  2. AppSpecificStorage(应用私有存储)
    • 核心字段:app_name(应用标识)、local_path(本地存储路径)
    • 设计特点:与GlobalCollection建立外键关联,实现”一次收藏,多应用复制”
    • 唯一约束:确保同一应用不会重复存储相同文件
  3. ParameterFile(参数文件主表)
    • 版本控制:通过version字段实现简单版本管理
    • 状态管理:status字段跟踪文件生命周期
    • 关联收藏:与GlobalCollection建立一对一关系,支持文件溯源
  4. ParameterHistory(删除历史)
    • 数据持久化:使用JSONField保存删除时的完整快照
    • 自动记录:通过Django信号自动触发历史记录
  5. ThirdPartyPermission(第三方权限控制)
    • 权限分级:定义三种访问级别,满足不同场景需求
    • 时效控制:expires_at字段实现权限有效期管理

扩展性设计:

  • 多平台支持:通过source_platform字段区分不同来源,未来新增平台只需扩展枚举值
  • 元数据扩展:所有模型的JSONField为未来新增字段提供存储空间,避免频繁修改表结构
  • 权限系统:独立的ThirdPartyPermission表实现细粒度权限控制,可扩展OAuth等认证方式

典型工作流程:

  1. 用户上传文件到全局收藏表
  2. 各应用从全局表复制文件到自己的存储表
  3. 参数修改通过版本号跟踪,删除操作自动记录历史
  4. 第三方数据访问前检查权限表中配置的有效权限

该方案通过合理的表结构设计和Django高级功能的使用,在保证系统核心功能的同时,为未来扩展保留了充足的空间。


以下是针对文件存储在S3且支持跨表流转的优化Django模型方案:

from django.db import models
from django.dispatch import receiver
from django.db.models.signals import pre_delete
from storages.backends.s3boto3 import S3Boto3Storage

# 自定义S3存储路径(按应用隔离)
class AppS3Storage(S3Boto3Storage):
    def __init__(self, app_name=None, **kwargs):
        self.app_name = app_name
        super().__init__(**kwargs)
    
    def get_valid_name(self, name):
        return f"{self.app_name}/{name}"

# 核心文件元数据表(S3文件中心)
class GlobalFileRegistry(models.Model):
    FILE_CATEGORIES = [
        ('param', '参数文件'),
        ('sim', '仿真文件'),
        ('result', '计算结果')
    ]
    
    file_key = models.CharField(max_length=255, unique=True, verbose_name="S3对象键")
    category = models.CharField(max_length=20, choices=FILE_CATEGORIES)
    upload_source = models.CharField(max_length=20)  # 复用原有来源类型
    owner_app = models.CharField(max_length=50, verbose_name="所属应用")
    s3_bucket = models.CharField(max_length=63, default="main-bucket")
    metadata = models.JSONField(default=dict)
    created_at = models.DateTimeField(auto_now_add=True)
    is_public = models.BooleanField(default=False)

    class Meta:
        db_table = 'global_file_registry'
        indexes = [models.Index(fields=['owner_app', 'category'])]

# 跨应用文件引用表
class CrossAppReference(models.Model):
    REFERENCE_TYPES = [
        ('param2sim', '参数->仿真'),
        ('sim2result', '仿真->结果')
    ]
    
    source_file = models.ForeignKey(
        GlobalFileRegistry,
        related_name='outgoing_refs',
        on_delete=models.CASCADE
    )
    target_app = models.CharField(max_length=50)
    target_file = models.ForeignKey(
        GlobalFileRegistry,
        related_name='incoming_refs',
        on_delete=models.CASCADE
    )
    reference_type = models.CharField(max_length=20, choices=REFERENCE_TYPES)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        db_table = 'cross_app_references'
        unique_together = ('source_file', 'target_file')

# 参数模块专用表
class ParameterFile(models.Model):
    STATUS_CHOICES = [
        ('active', '生效中'),
        ('archived', '已归档')
    ]
    
    global_file = models.OneToOneField(
        GlobalFileRegistry,
        on_delete=models.CASCADE,
        related_name='param_meta'
    )
    version = models.PositiveIntegerField(default=1)
    status = models.CharField(max_length=10, choices=STATUS_CHOICES)
    dependencies = models.ManyToManyField(
        'self',
        symmetrical=False,
        through='ParameterDependency'
    )

    def s3_storage(self):
        return AppS3Storage(app_name="parameters")

    class Meta:
        db_table = 'parameter_files'

# 参数依赖关系表
class ParameterDependency(models.Model):
    parent = models.ForeignKey(
        ParameterFile,
        related_name='child_dependencies',
        on_delete=models.CASCADE
    )
    child = models.ForeignKey(
        ParameterFile,
        related_name='parent_dependencies',
        on_delete=models.CASCADE
    )
    required = models.BooleanField(default=True)

    class Meta:
        db_table = 'parameter_dependencies'

# 仿真模块专用表
class SimulationTask(models.Model):
    global_file = models.ForeignKey(
        GlobalFileRegistry,
        on_delete=models.CASCADE,
        related_name='sim_tasks'
    )
    param_ref = models.ForeignKey(
        ParameterFile,
        null=True,
        on_delete=models.SET_NULL
    )
    status = models.CharField(max_length=20)
    
    def s3_storage(self):
        return AppS3Storage(app_name="simulations")

    class Meta:
        db_table = 'simulation_tasks'

# 信号处理:删除时同步清理S3文件
@receiver(pre_delete, sender=GlobalFileRegistry)
def cleanup_s3_files(sender, instance, **kwargs):
    from django.core.files.storage import default_storage
    try:
        default_storage.delete(instance.file_key)
    except Exception as e:
        logger.error(f"S3删除失败:{instance.file_key} - {str(e)}")

关键改进说明:

  1. S3存储集成
    • 通过AppS3Storage类实现按应用分目录存储
    • GlobalFileRegistry记录所有文件的S3位置和元数据
    • 自动清理机制确保删除记录时同步删除S3文件
  2. 跨表流转设计
    • CrossAppReference表显式记录文件引用关系
    • 各模块通过外键关联到全局注册表,同时保持自身业务字段独立
    • 参数文件支持多级依赖关系(通过中间表实现)
  3. 权限控制增强
    • is_public字段控制文件可见性范围
    • 各模块通过owner_app字段实现数据隔离
    • S3存储桶策略配合IAM实现细粒度访问控制
  4. 典型流转场景示例 ```python

    参数文件被仿真模块引用

    param_file = ParameterFile.objects.get(id=1) sim_file = GlobalFileRegistry.objects.create( file_key=”simulations/task123.sim”, category=”sim”, owner_app=”simulation_app” )

建立跨应用引用

CrossAppReference.objects.create( source_file=param_file.global_file, target_app=”simulation_app”, target_file=sim_file, reference_type=”param2sim” )

获取实际文件URL

sim_storage = sim_file.sim_tasks.first().s3_storage() url = sim_storage.url(sim_file.file_key)


### 扩展性设计:
- **存储扩展**:通过修改`AppS3Storage`类可支持多桶存储策略
- **类型扩展**:在`FILE_CATEGORIES`中添加新类型即可支持新文件类型
- **关系扩展**:`CrossAppReference`表可添加新的reference_type实现新流转场景

该方案通过全局注册表+S3存储的核心设计,既保持了各业务模块表的独立性,又通过明确的引用关系实现文件流转。配合Django存储框架和信号机制,确保整个系统具备良好的扩展性和可维护性。

------

以下是针对删除应用文件不影响其他应用的优化方案:

```python
from django.db import models
from django.db.models import F
from django.dispatch import receiver
from django.db.models.signals import pre_delete

class GlobalFileRegistry(models.Model):
    """增强版全局文件注册表"""
    file_key = models.CharField(max_length=255, unique=True)
    reference_count = models.PositiveIntegerField(default=0)  # 新增引用计数器
    s3_bucket = models.CharField(max_length=63)
    metadata = models.JSONField(default=dict)
    is_locked = models.BooleanField(default=False)  # 新增锁定标志

    def acquire_reference(self):
        """原子操作增加引用计数"""
        GlobalFileRegistry.objects.filter(pk=self.pk).update(
            reference_count=F('reference_count') + 1
        )
        self.refresh_from_db()

    def release_reference(self):
        """原子操作减少引用计数"""
        GlobalFileRegistry.objects.filter(pk=self.pk).update(
            reference_count=F('reference_count') - 1
        )
        self.refresh_from_db()

class AppFileLink(models.Model):
    """应用文件关联表(替代直接外键)"""
    app_name = models.CharField(max_length=50)
    global_file = models.ForeignKey(GlobalFileRegistry, on_delete=models.DO_NOTHING)
    local_metadata = models.JSONField(default=dict)
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        unique_together = ('app_name', 'global_file')

class ParameterFile(models.Model):
    """参数文件表(示例应用专用表)"""
    app_link = models.OneToOneField(AppFileLink, on_delete=models.CASCADE)
    version = models.PositiveIntegerField(default=1)
    status = models.CharField(max_length=20)
    
    @property
    def s3_url(self):
        return f"s3://{self.app_link.global_file.s3_bucket}/{self.app_link.global_file.file_key}"

class ParameterHistory(models.Model):
    """独立历史记录表"""
    app_name = models.CharField(max_length=50)  # 新增应用标识
    file_snapshot = models.JSONField()
    operation_time = models.DateTimeField(auto_now_add=True)

# 信号处理:删除应用关联时更新引用计数
@receiver(pre_delete, sender=AppFileLink)
def handle_app_file_delete(sender, instance, **kwargs):
    # 原子操作减少引用计数
    instance.global_file.release_reference()
    
    # 检查是否需要进行物理删除
    if instance.global_file.reference_count == 0 and not instance.global_file.is_locked:
        # 异步任务删除S3文件(使用Celery等)
        from .tasks import delete_s3_file
        delete_s3_file.delay(
            bucket=instance.global_file.s3_bucket,
            key=instance.global_file.file_key
        )
        instance.global_file.delete()

关键机制说明:

  1. 引用计数系统
    • 新增reference_count字段跟踪文件被引用的次数
    • 使用原子操作保证计数准确性(F表达式
    • 只有引用归零且未锁定时才执行物理删除
  2. 应用关联隔离设计
    • 通过AppFileLink中间表建立应用与文件的关联
    • 删除应用记录时只解除关联,不影响核心注册表
    • 唯一约束确保同一应用不会重复关联文件
  3. 历史记录隔离
    • 历史表显式记录app_name标识来源应用
    • 删除应用时只需删除该应用的关联记录,历史数据完整保留
  4. 安全删除流程
    # 示例删除操作流程
    def safe_delete_app_file(app_name, file_id):
     # 查找应用关联记录
     link = AppFileLink.objects.get(app_name=app_name, id=file_id)
        
     # 创建历史快照
     ParameterHistory.objects.create(
         app_name=app_name,
         file_snapshot={
             'key': link.global_file.file_key,
             'metadata': link.local_metadata
         }
     )
        
     # 执行删除(自动触发信号处理)
     link.delete()
    

扩展优势:

  • 多应用共存:不同应用通过独立AppFileLink记录访问同一物理文件
  • 删除安全:删除操作只影响当前应用的关联关系,其他应用的引用保持不变
  • 历史追溯:通过app_name字段可精确查询各应用的操作历史
  • 文件锁定is_locked字段可防止关键文件被误删

该方案通过引用计数和关联表的设计,实现了应用级文件管理的完全隔离。当删除某个应用的文件时,系统只会解除该应用与文件的关联关系,只有当所有应用都解除关联后,系统才会自动清理物理文件,从根本上保证了跨应用数据的安全性。


以下是完整的Django数据库模型方案,满足文件存储在S3、应用隔离删除、历史记录独立性和权限控制的要求:

# models.py
from django.db import models
from django.db.models import F
from django.dispatch import receiver
from django.db.models.signals import pre_delete, post_save
from storages.backends.s3boto3 import S3Boto3Storage
import logging

logger = logging.getLogger(__name__)

# ---------------------------
# 自定义S3存储配置
# ---------------------------
class AppAwareS3Storage(S3Boto3Storage):
    """按应用分目录的S3存储"""
    def __init__(self, app_name=None, **kwargs):
        self.app_name = app_name
        super().__init__(**kwargs)
    
    def get_valid_name(self, name):
        if self.app_name:
            return f"{self.app_name}/{name}"
        return name

# ---------------------------
# 核心模型
# ---------------------------
class GlobalFileRegistry(models.Model):
    """全局文件元数据注册中心"""
    FILE_STATES = [
        ('active', '活跃'),
        ('archived', '已归档'),
        ('locked', '锁定')
    ]
    
    file_key = models.CharField(max_length=512, unique=True, verbose_name="S3对象键")
    bucket_name = models.CharField(max_length=63, default="main-bucket")
    reference_count = models.PositiveIntegerField(default=0, verbose_name="引用计数")
    state = models.CharField(max_length=20, choices=FILE_STATES, default='active')
    global_metadata = models.JSONField(default=dict, verbose_name="全局元数据")
    created_at = models.DateTimeField(auto_now_add=True)
    modified_at = models.DateTimeField(auto_now=True)

    class Meta:
        indexes = [
            models.Index(fields=['file_key']),
            models.Index(fields=['state'])
        ]
        verbose_name = "全局文件注册表"

    def __str__(self):
        return f"{self.bucket_name}/{self.file_key}"

    def acquire_reference(self):
        """原子增加引用计数"""
        GlobalFileRegistry.objects.filter(pk=self.pk).update(
            reference_count=F('reference_count') + 1
        )
        self.refresh_from_db()

    def release_reference(self):
        """原子减少引用计数"""
        GlobalFileRegistry.objects.filter(pk=self.pk).update(
            reference_count=F('reference_count') - 1
        )
        self.refresh_from_db()

class AppFileLink(models.Model):
    """应用与文件的关联关系表"""
    app_name = models.CharField(max_length=50, verbose_name="应用名称")
    global_file = models.ForeignKey(
        GlobalFileRegistry,
        on_delete=models.CASCADE,
        related_name='app_links'
    )
    local_metadata = models.JSONField(default=dict, verbose_name="应用私有元数据")
    is_primary = models.BooleanField(default=False, verbose_name="主引用")
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        unique_together = ('app_name', 'global_file')
        verbose_name = "应用文件关联表"

    def save(self, *args, **kwargs):
        """新建关联时更新引用计数"""
        if not self.pk:  # 新建关联
            self.global_file.acquire_reference()
        super().save(*args, **kwargs)

# ---------------------------
# 业务模型示例
# ---------------------------
class ParameterFile(models.Model):
    """参数文件业务表"""
    app_link = models.OneToOneField(
        AppFileLink,
        on_delete=models.CASCADE,
        related_name='parameter_file'
    )
    version = models.PositiveIntegerField(default=1)
    status = models.CharField(max_length=20, default='draft')

    class Meta:
        verbose_name = "参数文件"

    @property
    def storage(self):
        """获取应用专属存储实例"""
        return AppAwareS3Storage(app_name=self.app_link.app_name)

    def get_presigned_url(self, expiration=3600):
        """生成预签名URL"""
        return self.storage.url(
            self.app_link.global_file.file_key,
            parameters={'ExpiresIn': expiration}
        )

class ParameterHistory(models.Model):
    """参数文件操作历史"""
    app_name = models.CharField(max_length=50)
    operation = models.CharField(max_length=20)
    snapshot = models.JSONField()
    timestamp = models.DateTimeField(auto_now_add=True)

    class Meta:
        indexes = [models.Index(fields=['app_name', 'timestamp'])]
        verbose_name = "参数历史记录"

# ---------------------------
# 权限控制模型
# ---------------------------
class ThirdPartyPermission(models.Model):
    """第三方平台访问权限"""
    PERMISSION_CHOICES = [
        ('read', '只读'),
        ('write', '读写'),
        ('admin', '管理')
    ]
    
    app_name = models.CharField(max_length=50)
    platform = models.CharField(max_length=50)
    access_key = models.CharField(max_length=128)
    secret_key = models.CharField(max_length=256)
    permission_level = models.CharField(max_length=10, choices=PERMISSION_CHOICES)
    valid_until = models.DateTimeField()

    class Meta:
        unique_together = ('app_name', 'platform')
        verbose_name = "第三方权限"

# ---------------------------
# 信号处理
# ---------------------------
@receiver(pre_delete, sender=AppFileLink)
def handle_app_link_delete(sender, instance, **kwargs):
    """处理应用关联删除"""
    try:
        # 记录删除操作到历史表
        ParameterHistory.objects.create(
            app_name=instance.app_name,
            operation='delete',
            snapshot={
                'file_key': instance.global_file.file_key,
                'metadata': instance.local_metadata
            }
        )
        
        # 释放引用计数
        instance.global_file.release_reference()
        
        # 检查是否需要物理删除
        if (instance.global_file.reference_count == 0 and 
            instance.global_file.state != 'locked'):
            # 异步删除S3文件(示例使用线程,生产环境应使用Celery)
            from threading import Thread
            def async_delete():
                try:
                    storage = AppAwareS3Storage()
                    storage.delete(instance.global_file.file_key)
                    instance.global_file.delete()
                except Exception as e:
                    logger.error(f"删除失败: {str(e)}")
            Thread(target=async_delete).start()
            
    except Exception as e:
        logger.error(f"删除处理异常: {str(e)}")

@receiver(post_save, sender=GlobalFileRegistry)
def log_file_activity(sender, instance, created, **kwargs):
    """记录全局文件变更"""
    if created:
        action = 'create'
    else:
        action = 'update'
    
    ParameterHistory.objects.create(
        app_name='system',
        operation=action,
        snapshot={
            'file_key': instance.file_key,
            'state': instance.state,
            'references': instance.reference_count
        }
    )

# ---------------------------
# 工具函数
# ---------------------------
def create_app_file(app_name, file_key, metadata=None):
    """创建新的应用文件"""
    global_file = GlobalFileRegistry.objects.create(
        file_key=file_key,
        global_metadata=metadata or {}
    )
    
    link = AppFileLink.objects.create(
        app_name=app_name,
        global_file=global_file,
        local_metadata={'uploader': 'system'}
    )
    
    return link

def transfer_file(source_app, target_app, file_key):
    """跨应用文件流转"""
    try:
        source_link = AppFileLink.objects.get(
            app_name=source_app,
            global_file__file_key=file_key
        )
        
        # 创建新关联
        new_link = AppFileLink.objects.create(
            app_name=target_app,
            global_file=source_link.global_file,
            local_metadata={'source': source_app}
        )
        
        return new_link
    except AppFileLink.DoesNotExist:
        raise ValueError("源文件不存在")

def lock_global_file(file_key):
    """锁定全局文件"""
    file = GlobalFileRegistry.objects.get(file_key=file_key)
    file.state = 'locked'
    file.save()
    return file

方案特点说明:

  1. S3存储集成
    • 使用AppAwareS3Storage类实现按应用目录存储
    • 自动生成预签名URL(get_presigned_url方法)
    • 异步删除机制避免阻塞主线程
  2. 引用计数系统
    • 原子操作保证计数准确性(acquire_reference/release_reference
    • 只有引用归零且未锁定时触发物理删除
    • 文件状态管理(活跃/归档/锁定)
  3. 应用隔离设计
    • 通过AppFileLink中间表解耦应用与物理文件
    • 删除应用关联只影响当前应用
    • 独立的历史记录表保留完整操作日志
  4. 安全机制
    • 文件锁定功能防止误删关键文件
    • 详细的异常处理和日志记录
    • 第三方权限表实现细粒度访问控制
  5. 扩展性设计
    • 全局元数据(global_metadata)和应用私有元数据(local_metadata)分离
    • 支持多种文件类型扩展(通过FILE_STATES和业务模型继承)
    • 内置文件流转工具函数(transfer_file

典型操作示例

1. 创建新文件

new_file = create_app_file(
    app_name='simulation',
    file_key='params/v1/config.json',
    metadata={'author': 'AI'}
)

2. 跨应用流转文件

try:
    transfer_file(
        source_app='simulation',
        target_app='analysis',
        file_key='params/v1/config.json'
    )
except ValueError as e:
    print(f"流转失败: {str(e)}")

3. 安全删除应用文件

# 查找要删除的关联记录
link = AppFileLink.objects.get(app_name='analysis', global_file__file_key='params/v1/config.json')
link.delete()  # 自动触发信号处理

4. 生成预签名URL

param_file = ParameterFile.objects.get(id=1)
print(param_file.get_presigned_url(expiration=3600))  # 1小时有效期的URL

验证要点

  1. 引用计数验证
    • 创建多个应用关联,检查全局文件的reference_count是否正确递增
    • 删除关联记录时验证计数递减逻辑
  2. 删除隔离验证
    • 删除一个应用的关联后,确认其他应用的关联仍然有效
    • 验证只有最后一个引用删除后才触发物理删除
  3. 历史记录完整性
    • 执行删除操作后检查历史表中相关记录
    • 确认不同应用的操作日志独立存储
  4. S3集成测试
    • 上传文件后验证S3存储路径是否符合预期
    • 删除操作后检查S3文件是否被正确移除

该方案通过中间表、引用计数和状态管理实现了应用间的文件独立管理,同时保持文件流转能力。结合Django信号机制和异步任务处理,确保系统在高并发场景下的稳定性和数据一致性。


以下是针对前端支持两种文件上传方式的后端适配方案和Vue3前端实现示例:

后端适配方案

新增API接口

# views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from .models import GlobalFileRegistry, AppFileLink

class FileUploadAPI(APIView):
    def post(self, request):
        upload_type = request.data.get('type')
        app_name = request.headers.get('X-App-Name')

        if upload_type == 'new':
            # 处理新文件上传
            file = request.FILES.get('file')
            file_key = f"{app_name}/{file.name}"
            
            # 存储到S3(示例代码)
            storage = AppAwareS3Storage(app_name=app_name)
            storage.save(file_key, file)
            
            # 创建数据库记录
            global_file = GlobalFileRegistry.objects.create(
                file_key=file_key,
                bucket_name=storage.bucket_name,
                global_metadata={'size': file.size}
            )
            
            AppFileLink.objects.create(
                app_name=app_name,
                global_file=global_file,
                local_metadata={'upload_type': 'new'}
            )
            
            return Response({'id': global_file.id}, status=status.HTTP_201_CREATED)

        elif upload_type == 'existing':
            # 处理现有文件选择
            file_id = request.data.get('file_id')
            try:
                existing_file = GlobalFileRegistry.objects.get(id=file_id)
                # 验证权限(示例)
                if not existing_file.app_links.filter(app_name=app_name).exists():
                    AppFileLink.objects.create(
                        app_name=app_name,
                        global_file=existing_file,
                        local_metadata={'source': 'existing'}
                    )
                    return Response({'id': existing_file.id})
                return Response({'error': '无权访问该文件'}, status=403)
            except GlobalFileRegistry.DoesNotExist:
                return Response({'error': '文件不存在'}, status=404)

        return Response({'error': '无效的上传类型'}, status=400)

class FileListAPI(APIView):
    def get(self, request):
        app_name = request.headers.get('X-App-Name')
        files = GlobalFileRegistry.objects.filter(
            app_links__app_name=app_name
        ).values('id', 'file_key', 'created_at')
        return Response(list(files))

前端Vue3实现

组件实现

<!-- FileUpload.vue -->
<template>
  <div class="upload-container">
    <div class="upload-type-selector">
      <button @click="toggleType('new')" :class="{ active: currentType === 'new' }">
        上传新文件
      </button>
      <button @click="toggleType('existing')" :class="{ active: currentType === 'existing' }">
        选择已有文件
      </button>
    </div>

    <div v-if="currentType === 'new'" class="file-upload">
      <input type="file" @change="handleFileSelect" ref="fileInput" />
      <button @click="uploadFile" :disabled="!selectedFile">
        上传文件
      </button>
    </div>

    <div v-else class="existing-files">
      <div class="file-list">
        <div v-for="file in files" :key="file.id" class="file-item" @click="selectFile(file)">
          <span class="file-name"></span>
          <span class="file-date"></span>
        </div>
      </div>
      <button @click="confirmSelection" :disabled="!selectedExistingFile">
        确认选择
      </button>
    </div>
  </div>
</template>

<script setup>
import { ref, onMounted } from 'vue'
import axios from 'axios'

const currentType = ref('new')
const selectedFile = ref(null)
const selectedExistingFile = ref(null)
const files = ref([])

// 获取可用文件列表
const loadExistingFiles = async () => {
  try {
    const response = await axios.get('/api/files/', {
      headers: { 'X-App-Name': 'your-app-name' }
    })
    files.value = response.data
  } catch (error) {
    console.error('获取文件列表失败:', error)
  }
}

// 切换上传类型
const toggleType = (type) => {
  currentType.value = type
  if (type === 'existing') loadExistingFiles()
}

// 处理文件选择
const handleFileSelect = (event) => {
  selectedFile.value = event.target.files[0]
}

// 上传新文件
const uploadFile = async () => {
  const formData = new FormData()
  formData.append('file', selectedFile.value)
  formData.append('type', 'new')

  try {
    const response = await axios.post('/api/upload/', formData, {
      headers: {
        'Content-Type': 'multipart/form-data',
        'X-App-Name': 'your-app-name'
      }
    })
    alert(`文件上传成功!ID: ${response.data.id}`)
  } catch (error) {
    console.error('上传失败:', error)
  }
}

// 选择现有文件
const selectFile = (file) => {
  selectedExistingFile.value = file
}

// 确认选择现有文件
const confirmSelection = async () => {
  try {
    const response = await axios.post('/api/upload/', {
      type: 'existing',
      file_id: selectedExistingFile.value.id
    }, {
      headers: { 'X-App-Name': 'your-app-name' }
    })
    alert(`文件选择成功!ID: ${response.data.id}`)
  } catch (error) {
    console.error('选择失败:', error)
  }
}

// 辅助函数
const formatDate = (timestamp) => {
  return new Date(timestamp).toLocaleDateString()
}

onMounted(loadExistingFiles)
</script>

<style scoped>
.upload-container {
  max-width: 600px;
  margin: 20px auto;
  padding: 20px;
  border: 1px solid #ccc;
  border-radius: 8px;
}

.upload-type-selector {
  margin-bottom: 20px;
  display: flex;
  gap: 10px;
}

button {
  padding: 8px 16px;
  cursor: pointer;
  background: #f0f0f0;
  border: 1px solid #ddd;
  border-radius: 4px;
}

button.active {
  background: #007bff;
  color: white;
  border-color: #007bff;
}

.file-list {
  max-height: 300px;
  overflow-y: auto;
  margin-bottom: 15px;
}

.file-item {
  padding: 10px;
  border: 1px solid #eee;
  margin-bottom: 5px;
  cursor: pointer;
  display: flex;
  justify-content: space-between;
}

.file-item:hover {
  background-color: #f8f9fa;
}
</style>

系统工作流程

  1. 新文件上传流程
    • 用户选择本地文件 → 前端调用/api/upload/的POST请求(类型为new
    • 后端保存文件到S3 → 创建全局文件记录 → 返回新文件ID
    • 前端显示上传成功信息
  2. 现有文件选择流程
    • 用户切换至现有文件视图 → 前端调用/api/files/获取可用文件列表
    • 用户选择文件 → 前端调用/api/upload/的POST请求(类型为existing
    • 后端验证权限 → 创建关联记录 → 返回文件ID
    • 前端显示选择成功信息

关键特性说明

  1. 前端双模式支持
    • 通过状态切换按钮实现两种模式的UI切换
    • 动态加载现有文件列表
    • 统一的提交处理接口
  2. 后端安全验证
    • 通过X-App-Name请求头识别来源应用
    • 现有文件选择时验证应用权限
    • 使用FormData处理文件上传
  3. 数据一致性保障
    • 新文件上传自动创建所有关联记录
    • 现有文件选择只增加引用计数
    • 所有操作记录历史日志
  4. 扩展性设计
    • 前端组件可复用不同业务模块
    • 后端接口支持多种文件类型扩展
    • 权限验证逻辑可扩展更复杂的规则

适配建议

  1. 权限增强
    • 添加JWT认证头代替简单的X-App-Name
    • 实现细粒度的文件访问控制策略
  2. 性能优化
    • 添加文件分页加载功能
    • 实现前端上传进度显示
    • 使用Web Workers处理大文件分片上传
  3. 用户体验改进
    • 添加文件搜索过滤功能
    • 实现拖拽上传支持
    • 添加文件预览功能

该方案通过前后端协同设计,完整实现了两种文件处理模式的切换和协同工作。后端保持原有的存储架构不变,通过扩展API接口支持新功能,前端通过状态管理实现无缝的交互体验,同时保持代码的可维护性和扩展性。


以下是支持跨平台文件复制的完整解决方案:

后端适配方案

新增模型和API

# models.py
class ExternalPlatform(models.Model):
    name = models.CharField(max_length=50, unique=True)
    bucket_name = models.CharField(max_length=63)
    access_key = models.CharField(max_length=128)
    secret_key = models.CharField(max_length=256)
    region = models.CharField(max_length=50, default='us-east-1')

class ExternalFileReference(models.Model):
    platform = models.ForeignKey(ExternalPlatform, on_delete=models.CASCADE)
    external_file_key = models.CharField(max_length=512)
    internal_file = models.ForeignKey(GlobalFileRegistry, on_delete=models.CASCADE)
    copied_at = models.DateTimeField(auto_now_add=True)

# views.py
from boto3 import client
from django.conf import settings

class ExternalFileAPI(APIView):
    def get(self, request):
        """获取外部平台文件列表"""
        platform_id = request.query_params.get('platform_id')
        try:
            platform = ExternalPlatform.objects.get(id=platform_id)
            s3 = client('s3',
                aws_access_key_id=platform.access_key,
                aws_secret_access_key=platform.secret_key,
                region_name=platform.region
            )
            # 示例:获取前100个文件
            response = s3.list_objects_v2(Bucket=platform.bucket_name, MaxKeys=100)
            files = [{'key': obj['Key'], 'size': obj['Size']} 
                    for obj in response.get('Contents', [])]
            return Response(files)
        except Exception as e:
            return Response({'error': str(e)}, status=500)

    def post(self, request):
        """复制外部文件到本平台"""
        app_name = request.headers.get('X-App-Name')
        platform_id = request.data.get('platform_id')
        external_key = request.data.get('file_key')
        
        try:
            # 获取平台配置
            platform = ExternalPlatform.objects.get(id=platform_id)
            
            # 生成新文件路径
            new_key = f"external/{platform.name}/{external_key.split('/')[-1]}"
            
            # 初始化S3客户端
            source_s3 = client('s3',
                aws_access_key_id=platform.access_key,
                aws_secret_access_key=platform.secret_key,
                region_name=platform.region
            )
            dest_s3 = client('s3',
                aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
                aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY
            )
            
            # 执行复制操作
            copy_source = {'Bucket': platform.bucket_name, 'Key': external_key}
            dest_s3.copy_object(
                CopySource=copy_source,
                Bucket=settings.AWS_STORAGE_BUCKET_NAME,
                Key=new_key
            )
            
            # 创建数据库记录
            global_file = GlobalFileRegistry.objects.create(
                file_key=new_key,
                bucket_name=settings.AWS_STORAGE_BUCKET_NAME,
                global_metadata={'source': 'external'}
            )
            
            ExternalFileReference.objects.create(
                platform=platform,
                external_file_key=external_key,
                internal_file=global_file
            )
            
            AppFileLink.objects.create(
                app_name=app_name,
                global_file=global_file,
                local_metadata={'copied_from': platform.name}
            )
            
            return Response({'id': global_file.id}, status=201)
            
        except Exception as e:
            logger.error(f"文件复制失败: {str(e)}")
            return Response({'error': '文件复制失败'}, status=500)

前端Vue3实现

<!-- ExternalFileSelector.vue -->
<template>
  <div class="external-file-selector">
    <div class="platform-selection">
      <select v-model="selectedPlatform">
        <option v-for="platform in platforms" :value="platform.id">
          
        </option>
      </select>
      <button @click="loadExternalFiles">加载文件</button>
    </div>

    <div v-if="externalFiles.length" class="file-list">
      <div v-for="file in externalFiles" 
           :key="file.key"
           class="file-item"
           @click="selectedFile = file.key"
           :class="{ selected: selectedFile === file.key }">
        <span class="file-name"></span>
        <span class="file-size"></span>
      </div>
    </div>

    <div class="actions">
      <button @click="copyFile" :disabled="!selectedFile">
        复制到本平台
      </button>
      <div v-if="copyProgress" class="progress">
        复制进度: %
      </div>
    </div>
  </div>
</template>

<script setup>
import { ref, onMounted } from 'vue'
import axios from 'axios'

const platforms = ref([])
const selectedPlatform = ref(null)
const externalFiles = ref([])
const selectedFile = ref(null)
const copyProgress = ref(0)

// 加载已配置的外部平台
const loadPlatforms = async () => {
  try {
    const response = await axios.get('/api/external-platforms/')
    platforms.value = response.data
  } catch (error) {
    console.error('加载平台失败:', error)
  }
}

// 加载外部文件列表
const loadExternalFiles = async () => {
  try {
    const response = await axios.get('/api/external-files/', {
      params: { platform_id: selectedPlatform.value }
    })
    externalFiles.value = response.data
  } catch (error) {
    console.error('加载文件失败:', error)
  }
}

// 执行文件复制
const copyFile = async () => {
  try {
    const response = await axios.post('/api/external-files/', {
      platform_id: selectedPlatform.value,
      file_key: selectedFile.value
    }, {
      headers: { 'X-App-Name': 'your-app-name' },
      onUploadProgress: progressEvent => {
        if (progressEvent.total) {
          copyProgress.value = Math.round(
            (progressEvent.loaded * 100) / progressEvent.total
          )
        }
      }
    })
    
    alert(`文件复制成功!新文件ID: ${response.data.id}`)
    selectedFile.value = null
    copyProgress.value = 0
  } catch (error) {
    console.error('复制失败:', error)
    alert('文件复制失败,请检查控制台')
  }
}

// 辅助函数
const formatSize = bytes => {
  const units = ['B', 'KB', 'MB', 'GB']
  let size = bytes
  let unitIndex = 0
  while (size >= 1024 && unitIndex < units.length - 1) {
    size /= 1024
    unitIndex++
  }
  return `${size.toFixed(1)} ${units[unitIndex]}`
}

onMounted(loadPlatforms)
</script>

<style scoped>
.external-file-selector {
  border: 1px solid #e0e0e0;
  padding: 20px;
  border-radius: 8px;
  margin-top: 20px;
}

.platform-selection {
  display: flex;
  gap: 10px;
  margin-bottom: 20px;
}

select {
  padding: 8px;
  border-radius: 4px;
  border: 1px solid #ddd;
}

.file-list {
  max-height: 400px;
  overflow-y: auto;
  margin-bottom: 20px;
}

.file-item {
  padding: 12px;
  border: 1px solid #eee;
  margin-bottom: 8px;
  cursor: pointer;
  display: flex;
  justify-content: space-between;
  transition: all 0.2s;
}

.file-item:hover {
  background-color: #f8f9fa;
}

.file-item.selected {
  border-color: #007bff;
  background-color: #e7f1ff;
}

.progress {
  margin-top: 10px;
  color: #28a745;
  font-weight: bold;
}
</style>

系统工作流程

  1. 配置外部平台
    • 管理员在后台配置外部平台信息(S3凭证、存储桶等)
    • 配置信息加密存储在ExternalPlatform表中
  2. 选择外部文件
    • 用户选择目标平台
    • 前端调用/api/external-files/获取文件列表
    • 用户选择要复制的文件
  3. 执行复制操作
    • 前端调用/api/external-files/的POST请求
    • 后端使用AWS SDK执行跨桶复制
    • 创建本地文件记录和关联关系
  4. 结果反馈
    • 前端显示复制进度和结果
    • 复制后的文件出现在本地文件列表中

关键安全措施

  1. 凭证管理
    • 外部平台凭证加密存储
    • 使用最小权限原则配置S3访问策略
  2. 访问控制
    • 前端只能选择已配置的平台
    • 后端验证用户权限后才执行复制操作
  3. 数据完整性
    • 使用S3的ETag验证文件完整性
    • 记录完整的复制日志到数据库
  4. 错误处理
    • 实现自动重试机制
    • 记录详细的错误日志
    • 提供事务回滚机制

性能优化建议

  1. 大文件处理
    # 使用分片复制
    def copy_large_file(source_s3, dest_s3, copy_source, dest_bucket, dest_key):
        # 初始化分片上传
        upload_id = dest_s3.create_multipart_upload(
            Bucket=dest_bucket,
            Key=dest_key
        )['UploadId']
           
        # 获取源文件大小
        head = source_s3.head_object(
            Bucket=copy_source['Bucket'],
            Key=copy_source['Key']
        )
        size = head['ContentLength']
           
        # 分片复制(每部分5MB)
        part_size = 5 * 1024 * 1024
        parts = []
        for i in range(0, size, part_size):
            copy_range = f"bytes={i}-{min(i+part_size-1, size-1)}"
            part = dest_s3.upload_part_copy(
                Bucket=dest_bucket,
                Key=dest_key,
                CopySource=copy_source,
                UploadId=upload_id,
                PartNumber=len(parts)+1,
                CopySourceRange=copy_range
            )
            parts.append({'PartNumber': len(parts)+1, 'ETag': part['CopyPartResult']['ETag']})
           
        # 完成分片上传
        dest_s3.complete_multipart_upload(
            Bucket=dest_bucket,
            Key=dest_key,
            UploadId=upload_id,
            MultipartUpload={'Parts': parts}
        )
    
  2. 异步任务处理
    # tasks.py
    @shared_task
    def async_copy_file(platform_id, external_key, app_name):
        try:
            # 此处实现上述复制逻辑
            # ...
            return {'success': True, 'file_id': global_file.id}
        except Exception as e:
            return {'success': False, 'error': str(e)}
    
  3. 前端优化
    <!-- 添加进度显示 -->
    <template>
      <div v-if="copyProgress > 0" class="progress-bar">
        <div :style="{ width: copyProgress + '%' }"></div>
      </div>
    </template>
       
    <style>
    .progress-bar {
      height: 8px;
      background: #eee;
      border-radius: 4px;
      overflow: hidden;
      margin: 10px 0;
    }
    .progress-bar div {
      height: 100%;
      background: #007bff;
      transition: width 0.3s ease;
    }
    </style>
    

验证要点

  1. 端到端测试
    • 测试从选择外部平台到文件复制的完整流程
    • 验证复制后的文件可正常访问和使用
  2. 权限测试
    • 尝试使用无效凭证访问外部平台
    • 验证无权用户无法执行复制操作
  3. 大文件测试
    • 测试多GB级别文件的复制
    • 验证分片复制功能的可靠性
  4. 回滚测试
    • 在复制过程中中断操作
    • 验证系统能正确清理未完成的文件

该方案通过前后端协同实现了安全的跨平台文件复制功能,在保持原有架构的基础上扩展了外部平台支持。通过S3的直接复制功能保障了传输效率,结合数据库事务和异步任务处理确保了系统的可靠性,前端组件化的实现方式方便集成到现有系统中。