根据需求设计如下Django数据库模型方案,满足高扩展性和权限控制要求:
from django.db import models
from django.dispatch import receiver
from django.db.models.signals import pre_delete
class GlobalCollection(models.Model):
"""全局收藏表(所有应用共享)"""
UPLOAD_SOURCE_CHOICES = [
('local', '本地上传'),
('project', '项目收藏'),
('third_party', '第三方平台')
]
file_identifier = models.CharField(max_length=255, unique=True, verbose_name="文件唯一标识")
source_platform = models.CharField(max_length=50, choices=UPLOAD_SOURCE_CHOICES)
original_url = models.URLField(null=True, blank=True, verbose_name="原始链接")
created_at = models.DateTimeField(auto_now_add=True)
meta_data = models.JSONField(default=dict, verbose_name="元数据")
class Meta:
db_table = 'global_collection'
verbose_name = '全局收藏'
class AppSpecificStorage(models.Model):
"""应用私有存储表(每个应用独立)"""
app_name = models.CharField(max_length=50, verbose_name="应用名称")
global_file = models.ForeignKey(GlobalCollection, on_delete=models.CASCADE)
local_path = models.FileField(upload_to='app_files/', verbose_name="本地存储路径")
last_modified = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'app_specific_storage'
unique_together = ('app_name', 'global_file')
class ParameterFile(models.Model):
"""参数文件主表"""
STATUS_CHOICES = [
('active', '生效中'),
('deleted', '已删除')
]
file_name = models.CharField(max_length=255)
version = models.PositiveIntegerField(default=1)
app = models.CharField(max_length=50, verbose_name="所属应用")
status = models.CharField(max_length=10, choices=STATUS_CHOICES, default='active')
collection_ref = models.OneToOneField(GlobalCollection, null=True, on_delete=models.SET_NULL)
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
db_table = 'parameter_files'
indexes = [models.Index(fields=['app', 'status'])]
class ParameterHistory(models.Model):
"""参数删除历史表"""
original_file = models.ForeignKey(ParameterFile, on_delete=models.CASCADE)
deleted_at = models.DateTimeField(auto_now_add=True)
snapshot = models.JSONField(verbose_name="文件快照")
class Meta:
db_table = 'parameter_history'
class ThirdPartyPermission(models.Model):
"""第三方平台权限控制表"""
PERMISSION_LEVELS = [
('read', '只读'),
('write', '可写入'),
('admin', '完全控制')
]
app_name = models.CharField(max_length=50)
platform_name = models.CharField(max_length=50)
access_token = models.CharField(max_length=255)
permission_level = models.CharField(max_length=10, choices=PERMISSION_LEVELS)
expires_at = models.DateTimeField()
class Meta:
db_table = 'third_party_permissions'
unique_together = ('app_name', 'platform_name')
# 信号处理:删除时记录历史
@receiver(pre_delete, sender=ParameterFile)
def create_deletion_history(sender, instance, **kwargs):
ParameterHistory.objects.create(
original_file=instance,
snapshot={
'file_name': instance.file_name,
'version': instance.version,
'app': instance.app
}
)
模型详细说明:
- GlobalCollection(全局收藏表)
- 核心字段:
file_identifier
(唯一标识)、source_platform
(来源平台类型) - 设计特点:使用JSONField存储元数据,支持灵活扩展
- 作用:作为全平台文件统一注册中心,其他表通过外键引用
- 核心字段:
- AppSpecificStorage(应用私有存储)
- 核心字段:
app_name
(应用标识)、local_path
(本地存储路径) - 设计特点:与GlobalCollection建立外键关联,实现”一次收藏,多应用复制”
- 唯一约束:确保同一应用不会重复存储相同文件
- 核心字段:
- ParameterFile(参数文件主表)
- 版本控制:通过
version
字段实现简单版本管理 - 状态管理:
status
字段跟踪文件生命周期 - 关联收藏:与GlobalCollection建立一对一关系,支持文件溯源
- 版本控制:通过
- ParameterHistory(删除历史)
- 数据持久化:使用JSONField保存删除时的完整快照
- 自动记录:通过Django信号自动触发历史记录
- ThirdPartyPermission(第三方权限控制)
- 权限分级:定义三种访问级别,满足不同场景需求
- 时效控制:
expires_at
字段实现权限有效期管理
扩展性设计:
- 多平台支持:通过
source_platform
字段区分不同来源,未来新增平台只需扩展枚举值 - 元数据扩展:所有模型的JSONField为未来新增字段提供存储空间,避免频繁修改表结构
- 权限系统:独立的ThirdPartyPermission表实现细粒度权限控制,可扩展OAuth等认证方式
典型工作流程:
- 用户上传文件到全局收藏表
- 各应用从全局表复制文件到自己的存储表
- 参数修改通过版本号跟踪,删除操作自动记录历史
- 第三方数据访问前检查权限表中配置的有效权限
该方案通过合理的表结构设计和Django高级功能的使用,在保证系统核心功能的同时,为未来扩展保留了充足的空间。
以下是针对文件存储在S3且支持跨表流转的优化Django模型方案:
from django.db import models
from django.dispatch import receiver
from django.db.models.signals import pre_delete
from storages.backends.s3boto3 import S3Boto3Storage
# 自定义S3存储路径(按应用隔离)
class AppS3Storage(S3Boto3Storage):
def __init__(self, app_name=None, **kwargs):
self.app_name = app_name
super().__init__(**kwargs)
def get_valid_name(self, name):
return f"{self.app_name}/{name}"
# 核心文件元数据表(S3文件中心)
class GlobalFileRegistry(models.Model):
FILE_CATEGORIES = [
('param', '参数文件'),
('sim', '仿真文件'),
('result', '计算结果')
]
file_key = models.CharField(max_length=255, unique=True, verbose_name="S3对象键")
category = models.CharField(max_length=20, choices=FILE_CATEGORIES)
upload_source = models.CharField(max_length=20) # 复用原有来源类型
owner_app = models.CharField(max_length=50, verbose_name="所属应用")
s3_bucket = models.CharField(max_length=63, default="main-bucket")
metadata = models.JSONField(default=dict)
created_at = models.DateTimeField(auto_now_add=True)
is_public = models.BooleanField(default=False)
class Meta:
db_table = 'global_file_registry'
indexes = [models.Index(fields=['owner_app', 'category'])]
# 跨应用文件引用表
class CrossAppReference(models.Model):
REFERENCE_TYPES = [
('param2sim', '参数->仿真'),
('sim2result', '仿真->结果')
]
source_file = models.ForeignKey(
GlobalFileRegistry,
related_name='outgoing_refs',
on_delete=models.CASCADE
)
target_app = models.CharField(max_length=50)
target_file = models.ForeignKey(
GlobalFileRegistry,
related_name='incoming_refs',
on_delete=models.CASCADE
)
reference_type = models.CharField(max_length=20, choices=REFERENCE_TYPES)
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
db_table = 'cross_app_references'
unique_together = ('source_file', 'target_file')
# 参数模块专用表
class ParameterFile(models.Model):
STATUS_CHOICES = [
('active', '生效中'),
('archived', '已归档')
]
global_file = models.OneToOneField(
GlobalFileRegistry,
on_delete=models.CASCADE,
related_name='param_meta'
)
version = models.PositiveIntegerField(default=1)
status = models.CharField(max_length=10, choices=STATUS_CHOICES)
dependencies = models.ManyToManyField(
'self',
symmetrical=False,
through='ParameterDependency'
)
def s3_storage(self):
return AppS3Storage(app_name="parameters")
class Meta:
db_table = 'parameter_files'
# 参数依赖关系表
class ParameterDependency(models.Model):
parent = models.ForeignKey(
ParameterFile,
related_name='child_dependencies',
on_delete=models.CASCADE
)
child = models.ForeignKey(
ParameterFile,
related_name='parent_dependencies',
on_delete=models.CASCADE
)
required = models.BooleanField(default=True)
class Meta:
db_table = 'parameter_dependencies'
# 仿真模块专用表
class SimulationTask(models.Model):
global_file = models.ForeignKey(
GlobalFileRegistry,
on_delete=models.CASCADE,
related_name='sim_tasks'
)
param_ref = models.ForeignKey(
ParameterFile,
null=True,
on_delete=models.SET_NULL
)
status = models.CharField(max_length=20)
def s3_storage(self):
return AppS3Storage(app_name="simulations")
class Meta:
db_table = 'simulation_tasks'
# 信号处理:删除时同步清理S3文件
@receiver(pre_delete, sender=GlobalFileRegistry)
def cleanup_s3_files(sender, instance, **kwargs):
from django.core.files.storage import default_storage
try:
default_storage.delete(instance.file_key)
except Exception as e:
logger.error(f"S3删除失败:{instance.file_key} - {str(e)}")
关键改进说明:
- S3存储集成
- 通过
AppS3Storage
类实现按应用分目录存储 GlobalFileRegistry
记录所有文件的S3位置和元数据- 自动清理机制确保删除记录时同步删除S3文件
- 通过
- 跨表流转设计
CrossAppReference
表显式记录文件引用关系- 各模块通过外键关联到全局注册表,同时保持自身业务字段独立
- 参数文件支持多级依赖关系(通过中间表实现)
- 权限控制增强
is_public
字段控制文件可见性范围- 各模块通过
owner_app
字段实现数据隔离 - S3存储桶策略配合IAM实现细粒度访问控制
- 典型流转场景示例
```python
参数文件被仿真模块引用
param_file = ParameterFile.objects.get(id=1) sim_file = GlobalFileRegistry.objects.create( file_key=”simulations/task123.sim”, category=”sim”, owner_app=”simulation_app” )
建立跨应用引用
CrossAppReference.objects.create( source_file=param_file.global_file, target_app=”simulation_app”, target_file=sim_file, reference_type=”param2sim” )
获取实际文件URL
sim_storage = sim_file.sim_tasks.first().s3_storage() url = sim_storage.url(sim_file.file_key)
### 扩展性设计:
- **存储扩展**:通过修改`AppS3Storage`类可支持多桶存储策略
- **类型扩展**:在`FILE_CATEGORIES`中添加新类型即可支持新文件类型
- **关系扩展**:`CrossAppReference`表可添加新的reference_type实现新流转场景
该方案通过全局注册表+S3存储的核心设计,既保持了各业务模块表的独立性,又通过明确的引用关系实现文件流转。配合Django存储框架和信号机制,确保整个系统具备良好的扩展性和可维护性。
------
以下是针对删除应用文件不影响其他应用的优化方案:
```python
from django.db import models
from django.db.models import F
from django.dispatch import receiver
from django.db.models.signals import pre_delete
class GlobalFileRegistry(models.Model):
"""增强版全局文件注册表"""
file_key = models.CharField(max_length=255, unique=True)
reference_count = models.PositiveIntegerField(default=0) # 新增引用计数器
s3_bucket = models.CharField(max_length=63)
metadata = models.JSONField(default=dict)
is_locked = models.BooleanField(default=False) # 新增锁定标志
def acquire_reference(self):
"""原子操作增加引用计数"""
GlobalFileRegistry.objects.filter(pk=self.pk).update(
reference_count=F('reference_count') + 1
)
self.refresh_from_db()
def release_reference(self):
"""原子操作减少引用计数"""
GlobalFileRegistry.objects.filter(pk=self.pk).update(
reference_count=F('reference_count') - 1
)
self.refresh_from_db()
class AppFileLink(models.Model):
"""应用文件关联表(替代直接外键)"""
app_name = models.CharField(max_length=50)
global_file = models.ForeignKey(GlobalFileRegistry, on_delete=models.DO_NOTHING)
local_metadata = models.JSONField(default=dict)
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
unique_together = ('app_name', 'global_file')
class ParameterFile(models.Model):
"""参数文件表(示例应用专用表)"""
app_link = models.OneToOneField(AppFileLink, on_delete=models.CASCADE)
version = models.PositiveIntegerField(default=1)
status = models.CharField(max_length=20)
@property
def s3_url(self):
return f"s3://{self.app_link.global_file.s3_bucket}/{self.app_link.global_file.file_key}"
class ParameterHistory(models.Model):
"""独立历史记录表"""
app_name = models.CharField(max_length=50) # 新增应用标识
file_snapshot = models.JSONField()
operation_time = models.DateTimeField(auto_now_add=True)
# 信号处理:删除应用关联时更新引用计数
@receiver(pre_delete, sender=AppFileLink)
def handle_app_file_delete(sender, instance, **kwargs):
# 原子操作减少引用计数
instance.global_file.release_reference()
# 检查是否需要进行物理删除
if instance.global_file.reference_count == 0 and not instance.global_file.is_locked:
# 异步任务删除S3文件(使用Celery等)
from .tasks import delete_s3_file
delete_s3_file.delay(
bucket=instance.global_file.s3_bucket,
key=instance.global_file.file_key
)
instance.global_file.delete()
关键机制说明:
- 引用计数系统
- 新增
reference_count
字段跟踪文件被引用的次数 - 使用原子操作保证计数准确性(
F表达式
) - 只有引用归零且未锁定时才执行物理删除
- 新增
- 应用关联隔离设计
- 通过
AppFileLink
中间表建立应用与文件的关联 - 删除应用记录时只解除关联,不影响核心注册表
- 唯一约束确保同一应用不会重复关联文件
- 通过
- 历史记录隔离
- 历史表显式记录
app_name
标识来源应用 - 删除应用时只需删除该应用的关联记录,历史数据完整保留
- 历史表显式记录
- 安全删除流程
# 示例删除操作流程 def safe_delete_app_file(app_name, file_id): # 查找应用关联记录 link = AppFileLink.objects.get(app_name=app_name, id=file_id) # 创建历史快照 ParameterHistory.objects.create( app_name=app_name, file_snapshot={ 'key': link.global_file.file_key, 'metadata': link.local_metadata } ) # 执行删除(自动触发信号处理) link.delete()
扩展优势:
- 多应用共存:不同应用通过独立
AppFileLink
记录访问同一物理文件 - 删除安全:删除操作只影响当前应用的关联关系,其他应用的引用保持不变
- 历史追溯:通过
app_name
字段可精确查询各应用的操作历史 - 文件锁定:
is_locked
字段可防止关键文件被误删
该方案通过引用计数和关联表的设计,实现了应用级文件管理的完全隔离。当删除某个应用的文件时,系统只会解除该应用与文件的关联关系,只有当所有应用都解除关联后,系统才会自动清理物理文件,从根本上保证了跨应用数据的安全性。
以下是完整的Django数据库模型方案,满足文件存储在S3、应用隔离删除、历史记录独立性和权限控制的要求:
# models.py
from django.db import models
from django.db.models import F
from django.dispatch import receiver
from django.db.models.signals import pre_delete, post_save
from storages.backends.s3boto3 import S3Boto3Storage
import logging
logger = logging.getLogger(__name__)
# ---------------------------
# 自定义S3存储配置
# ---------------------------
class AppAwareS3Storage(S3Boto3Storage):
"""按应用分目录的S3存储"""
def __init__(self, app_name=None, **kwargs):
self.app_name = app_name
super().__init__(**kwargs)
def get_valid_name(self, name):
if self.app_name:
return f"{self.app_name}/{name}"
return name
# ---------------------------
# 核心模型
# ---------------------------
class GlobalFileRegistry(models.Model):
"""全局文件元数据注册中心"""
FILE_STATES = [
('active', '活跃'),
('archived', '已归档'),
('locked', '锁定')
]
file_key = models.CharField(max_length=512, unique=True, verbose_name="S3对象键")
bucket_name = models.CharField(max_length=63, default="main-bucket")
reference_count = models.PositiveIntegerField(default=0, verbose_name="引用计数")
state = models.CharField(max_length=20, choices=FILE_STATES, default='active')
global_metadata = models.JSONField(default=dict, verbose_name="全局元数据")
created_at = models.DateTimeField(auto_now_add=True)
modified_at = models.DateTimeField(auto_now=True)
class Meta:
indexes = [
models.Index(fields=['file_key']),
models.Index(fields=['state'])
]
verbose_name = "全局文件注册表"
def __str__(self):
return f"{self.bucket_name}/{self.file_key}"
def acquire_reference(self):
"""原子增加引用计数"""
GlobalFileRegistry.objects.filter(pk=self.pk).update(
reference_count=F('reference_count') + 1
)
self.refresh_from_db()
def release_reference(self):
"""原子减少引用计数"""
GlobalFileRegistry.objects.filter(pk=self.pk).update(
reference_count=F('reference_count') - 1
)
self.refresh_from_db()
class AppFileLink(models.Model):
"""应用与文件的关联关系表"""
app_name = models.CharField(max_length=50, verbose_name="应用名称")
global_file = models.ForeignKey(
GlobalFileRegistry,
on_delete=models.CASCADE,
related_name='app_links'
)
local_metadata = models.JSONField(default=dict, verbose_name="应用私有元数据")
is_primary = models.BooleanField(default=False, verbose_name="主引用")
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
unique_together = ('app_name', 'global_file')
verbose_name = "应用文件关联表"
def save(self, *args, **kwargs):
"""新建关联时更新引用计数"""
if not self.pk: # 新建关联
self.global_file.acquire_reference()
super().save(*args, **kwargs)
# ---------------------------
# 业务模型示例
# ---------------------------
class ParameterFile(models.Model):
"""参数文件业务表"""
app_link = models.OneToOneField(
AppFileLink,
on_delete=models.CASCADE,
related_name='parameter_file'
)
version = models.PositiveIntegerField(default=1)
status = models.CharField(max_length=20, default='draft')
class Meta:
verbose_name = "参数文件"
@property
def storage(self):
"""获取应用专属存储实例"""
return AppAwareS3Storage(app_name=self.app_link.app_name)
def get_presigned_url(self, expiration=3600):
"""生成预签名URL"""
return self.storage.url(
self.app_link.global_file.file_key,
parameters={'ExpiresIn': expiration}
)
class ParameterHistory(models.Model):
"""参数文件操作历史"""
app_name = models.CharField(max_length=50)
operation = models.CharField(max_length=20)
snapshot = models.JSONField()
timestamp = models.DateTimeField(auto_now_add=True)
class Meta:
indexes = [models.Index(fields=['app_name', 'timestamp'])]
verbose_name = "参数历史记录"
# ---------------------------
# 权限控制模型
# ---------------------------
class ThirdPartyPermission(models.Model):
"""第三方平台访问权限"""
PERMISSION_CHOICES = [
('read', '只读'),
('write', '读写'),
('admin', '管理')
]
app_name = models.CharField(max_length=50)
platform = models.CharField(max_length=50)
access_key = models.CharField(max_length=128)
secret_key = models.CharField(max_length=256)
permission_level = models.CharField(max_length=10, choices=PERMISSION_CHOICES)
valid_until = models.DateTimeField()
class Meta:
unique_together = ('app_name', 'platform')
verbose_name = "第三方权限"
# ---------------------------
# 信号处理
# ---------------------------
@receiver(pre_delete, sender=AppFileLink)
def handle_app_link_delete(sender, instance, **kwargs):
"""处理应用关联删除"""
try:
# 记录删除操作到历史表
ParameterHistory.objects.create(
app_name=instance.app_name,
operation='delete',
snapshot={
'file_key': instance.global_file.file_key,
'metadata': instance.local_metadata
}
)
# 释放引用计数
instance.global_file.release_reference()
# 检查是否需要物理删除
if (instance.global_file.reference_count == 0 and
instance.global_file.state != 'locked'):
# 异步删除S3文件(示例使用线程,生产环境应使用Celery)
from threading import Thread
def async_delete():
try:
storage = AppAwareS3Storage()
storage.delete(instance.global_file.file_key)
instance.global_file.delete()
except Exception as e:
logger.error(f"删除失败: {str(e)}")
Thread(target=async_delete).start()
except Exception as e:
logger.error(f"删除处理异常: {str(e)}")
@receiver(post_save, sender=GlobalFileRegistry)
def log_file_activity(sender, instance, created, **kwargs):
"""记录全局文件变更"""
if created:
action = 'create'
else:
action = 'update'
ParameterHistory.objects.create(
app_name='system',
operation=action,
snapshot={
'file_key': instance.file_key,
'state': instance.state,
'references': instance.reference_count
}
)
# ---------------------------
# 工具函数
# ---------------------------
def create_app_file(app_name, file_key, metadata=None):
"""创建新的应用文件"""
global_file = GlobalFileRegistry.objects.create(
file_key=file_key,
global_metadata=metadata or {}
)
link = AppFileLink.objects.create(
app_name=app_name,
global_file=global_file,
local_metadata={'uploader': 'system'}
)
return link
def transfer_file(source_app, target_app, file_key):
"""跨应用文件流转"""
try:
source_link = AppFileLink.objects.get(
app_name=source_app,
global_file__file_key=file_key
)
# 创建新关联
new_link = AppFileLink.objects.create(
app_name=target_app,
global_file=source_link.global_file,
local_metadata={'source': source_app}
)
return new_link
except AppFileLink.DoesNotExist:
raise ValueError("源文件不存在")
def lock_global_file(file_key):
"""锁定全局文件"""
file = GlobalFileRegistry.objects.get(file_key=file_key)
file.state = 'locked'
file.save()
return file
方案特点说明:
- S3存储集成
- 使用
AppAwareS3Storage
类实现按应用目录存储 - 自动生成预签名URL(
get_presigned_url
方法) - 异步删除机制避免阻塞主线程
- 使用
- 引用计数系统
- 原子操作保证计数准确性(
acquire_reference
/release_reference
) - 只有引用归零且未锁定时触发物理删除
- 文件状态管理(活跃/归档/锁定)
- 原子操作保证计数准确性(
- 应用隔离设计
- 通过
AppFileLink
中间表解耦应用与物理文件 - 删除应用关联只影响当前应用
- 独立的历史记录表保留完整操作日志
- 通过
- 安全机制
- 文件锁定功能防止误删关键文件
- 详细的异常处理和日志记录
- 第三方权限表实现细粒度访问控制
- 扩展性设计
- 全局元数据(
global_metadata
)和应用私有元数据(local_metadata
)分离 - 支持多种文件类型扩展(通过
FILE_STATES
和业务模型继承) - 内置文件流转工具函数(
transfer_file
)
- 全局元数据(
典型操作示例
1. 创建新文件
new_file = create_app_file(
app_name='simulation',
file_key='params/v1/config.json',
metadata={'author': 'AI'}
)
2. 跨应用流转文件
try:
transfer_file(
source_app='simulation',
target_app='analysis',
file_key='params/v1/config.json'
)
except ValueError as e:
print(f"流转失败: {str(e)}")
3. 安全删除应用文件
# 查找要删除的关联记录
link = AppFileLink.objects.get(app_name='analysis', global_file__file_key='params/v1/config.json')
link.delete() # 自动触发信号处理
4. 生成预签名URL
param_file = ParameterFile.objects.get(id=1)
print(param_file.get_presigned_url(expiration=3600)) # 1小时有效期的URL
验证要点
- 引用计数验证
- 创建多个应用关联,检查全局文件的
reference_count
是否正确递增 - 删除关联记录时验证计数递减逻辑
- 创建多个应用关联,检查全局文件的
- 删除隔离验证
- 删除一个应用的关联后,确认其他应用的关联仍然有效
- 验证只有最后一个引用删除后才触发物理删除
- 历史记录完整性
- 执行删除操作后检查历史表中相关记录
- 确认不同应用的操作日志独立存储
- S3集成测试
- 上传文件后验证S3存储路径是否符合预期
- 删除操作后检查S3文件是否被正确移除
该方案通过中间表、引用计数和状态管理实现了应用间的文件独立管理,同时保持文件流转能力。结合Django信号机制和异步任务处理,确保系统在高并发场景下的稳定性和数据一致性。
以下是针对前端支持两种文件上传方式的后端适配方案和Vue3前端实现示例:
后端适配方案
新增API接口
# views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from .models import GlobalFileRegistry, AppFileLink
class FileUploadAPI(APIView):
def post(self, request):
upload_type = request.data.get('type')
app_name = request.headers.get('X-App-Name')
if upload_type == 'new':
# 处理新文件上传
file = request.FILES.get('file')
file_key = f"{app_name}/{file.name}"
# 存储到S3(示例代码)
storage = AppAwareS3Storage(app_name=app_name)
storage.save(file_key, file)
# 创建数据库记录
global_file = GlobalFileRegistry.objects.create(
file_key=file_key,
bucket_name=storage.bucket_name,
global_metadata={'size': file.size}
)
AppFileLink.objects.create(
app_name=app_name,
global_file=global_file,
local_metadata={'upload_type': 'new'}
)
return Response({'id': global_file.id}, status=status.HTTP_201_CREATED)
elif upload_type == 'existing':
# 处理现有文件选择
file_id = request.data.get('file_id')
try:
existing_file = GlobalFileRegistry.objects.get(id=file_id)
# 验证权限(示例)
if not existing_file.app_links.filter(app_name=app_name).exists():
AppFileLink.objects.create(
app_name=app_name,
global_file=existing_file,
local_metadata={'source': 'existing'}
)
return Response({'id': existing_file.id})
return Response({'error': '无权访问该文件'}, status=403)
except GlobalFileRegistry.DoesNotExist:
return Response({'error': '文件不存在'}, status=404)
return Response({'error': '无效的上传类型'}, status=400)
class FileListAPI(APIView):
def get(self, request):
app_name = request.headers.get('X-App-Name')
files = GlobalFileRegistry.objects.filter(
app_links__app_name=app_name
).values('id', 'file_key', 'created_at')
return Response(list(files))
前端Vue3实现
组件实现
<!-- FileUpload.vue -->
<template>
<div class="upload-container">
<div class="upload-type-selector">
<button @click="toggleType('new')" :class="{ active: currentType === 'new' }">
上传新文件
</button>
<button @click="toggleType('existing')" :class="{ active: currentType === 'existing' }">
选择已有文件
</button>
</div>
<div v-if="currentType === 'new'" class="file-upload">
<input type="file" @change="handleFileSelect" ref="fileInput" />
<button @click="uploadFile" :disabled="!selectedFile">
上传文件
</button>
</div>
<div v-else class="existing-files">
<div class="file-list">
<div v-for="file in files" :key="file.id" class="file-item" @click="selectFile(file)">
<span class="file-name"></span>
<span class="file-date"></span>
</div>
</div>
<button @click="confirmSelection" :disabled="!selectedExistingFile">
确认选择
</button>
</div>
</div>
</template>
<script setup>
import { ref, onMounted } from 'vue'
import axios from 'axios'
const currentType = ref('new')
const selectedFile = ref(null)
const selectedExistingFile = ref(null)
const files = ref([])
// 获取可用文件列表
const loadExistingFiles = async () => {
try {
const response = await axios.get('/api/files/', {
headers: { 'X-App-Name': 'your-app-name' }
})
files.value = response.data
} catch (error) {
console.error('获取文件列表失败:', error)
}
}
// 切换上传类型
const toggleType = (type) => {
currentType.value = type
if (type === 'existing') loadExistingFiles()
}
// 处理文件选择
const handleFileSelect = (event) => {
selectedFile.value = event.target.files[0]
}
// 上传新文件
const uploadFile = async () => {
const formData = new FormData()
formData.append('file', selectedFile.value)
formData.append('type', 'new')
try {
const response = await axios.post('/api/upload/', formData, {
headers: {
'Content-Type': 'multipart/form-data',
'X-App-Name': 'your-app-name'
}
})
alert(`文件上传成功!ID: ${response.data.id}`)
} catch (error) {
console.error('上传失败:', error)
}
}
// 选择现有文件
const selectFile = (file) => {
selectedExistingFile.value = file
}
// 确认选择现有文件
const confirmSelection = async () => {
try {
const response = await axios.post('/api/upload/', {
type: 'existing',
file_id: selectedExistingFile.value.id
}, {
headers: { 'X-App-Name': 'your-app-name' }
})
alert(`文件选择成功!ID: ${response.data.id}`)
} catch (error) {
console.error('选择失败:', error)
}
}
// 辅助函数
const formatDate = (timestamp) => {
return new Date(timestamp).toLocaleDateString()
}
onMounted(loadExistingFiles)
</script>
<style scoped>
.upload-container {
max-width: 600px;
margin: 20px auto;
padding: 20px;
border: 1px solid #ccc;
border-radius: 8px;
}
.upload-type-selector {
margin-bottom: 20px;
display: flex;
gap: 10px;
}
button {
padding: 8px 16px;
cursor: pointer;
background: #f0f0f0;
border: 1px solid #ddd;
border-radius: 4px;
}
button.active {
background: #007bff;
color: white;
border-color: #007bff;
}
.file-list {
max-height: 300px;
overflow-y: auto;
margin-bottom: 15px;
}
.file-item {
padding: 10px;
border: 1px solid #eee;
margin-bottom: 5px;
cursor: pointer;
display: flex;
justify-content: space-between;
}
.file-item:hover {
background-color: #f8f9fa;
}
</style>
系统工作流程
- 新文件上传流程:
- 用户选择本地文件 → 前端调用
/api/upload/
的POST请求(类型为new
) - 后端保存文件到S3 → 创建全局文件记录 → 返回新文件ID
- 前端显示上传成功信息
- 用户选择本地文件 → 前端调用
- 现有文件选择流程:
- 用户切换至现有文件视图 → 前端调用
/api/files/
获取可用文件列表 - 用户选择文件 → 前端调用
/api/upload/
的POST请求(类型为existing
) - 后端验证权限 → 创建关联记录 → 返回文件ID
- 前端显示选择成功信息
- 用户切换至现有文件视图 → 前端调用
关键特性说明
- 前端双模式支持:
- 通过状态切换按钮实现两种模式的UI切换
- 动态加载现有文件列表
- 统一的提交处理接口
- 后端安全验证:
- 通过
X-App-Name
请求头识别来源应用 - 现有文件选择时验证应用权限
- 使用FormData处理文件上传
- 通过
- 数据一致性保障:
- 新文件上传自动创建所有关联记录
- 现有文件选择只增加引用计数
- 所有操作记录历史日志
- 扩展性设计:
- 前端组件可复用不同业务模块
- 后端接口支持多种文件类型扩展
- 权限验证逻辑可扩展更复杂的规则
适配建议
- 权限增强:
- 添加JWT认证头代替简单的
X-App-Name
- 实现细粒度的文件访问控制策略
- 添加JWT认证头代替简单的
- 性能优化:
- 添加文件分页加载功能
- 实现前端上传进度显示
- 使用Web Workers处理大文件分片上传
- 用户体验改进:
- 添加文件搜索过滤功能
- 实现拖拽上传支持
- 添加文件预览功能
该方案通过前后端协同设计,完整实现了两种文件处理模式的切换和协同工作。后端保持原有的存储架构不变,通过扩展API接口支持新功能,前端通过状态管理实现无缝的交互体验,同时保持代码的可维护性和扩展性。
以下是支持跨平台文件复制的完整解决方案:
后端适配方案
新增模型和API
# models.py
class ExternalPlatform(models.Model):
name = models.CharField(max_length=50, unique=True)
bucket_name = models.CharField(max_length=63)
access_key = models.CharField(max_length=128)
secret_key = models.CharField(max_length=256)
region = models.CharField(max_length=50, default='us-east-1')
class ExternalFileReference(models.Model):
platform = models.ForeignKey(ExternalPlatform, on_delete=models.CASCADE)
external_file_key = models.CharField(max_length=512)
internal_file = models.ForeignKey(GlobalFileRegistry, on_delete=models.CASCADE)
copied_at = models.DateTimeField(auto_now_add=True)
# views.py
from boto3 import client
from django.conf import settings
class ExternalFileAPI(APIView):
def get(self, request):
"""获取外部平台文件列表"""
platform_id = request.query_params.get('platform_id')
try:
platform = ExternalPlatform.objects.get(id=platform_id)
s3 = client('s3',
aws_access_key_id=platform.access_key,
aws_secret_access_key=platform.secret_key,
region_name=platform.region
)
# 示例:获取前100个文件
response = s3.list_objects_v2(Bucket=platform.bucket_name, MaxKeys=100)
files = [{'key': obj['Key'], 'size': obj['Size']}
for obj in response.get('Contents', [])]
return Response(files)
except Exception as e:
return Response({'error': str(e)}, status=500)
def post(self, request):
"""复制外部文件到本平台"""
app_name = request.headers.get('X-App-Name')
platform_id = request.data.get('platform_id')
external_key = request.data.get('file_key')
try:
# 获取平台配置
platform = ExternalPlatform.objects.get(id=platform_id)
# 生成新文件路径
new_key = f"external/{platform.name}/{external_key.split('/')[-1]}"
# 初始化S3客户端
source_s3 = client('s3',
aws_access_key_id=platform.access_key,
aws_secret_access_key=platform.secret_key,
region_name=platform.region
)
dest_s3 = client('s3',
aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY
)
# 执行复制操作
copy_source = {'Bucket': platform.bucket_name, 'Key': external_key}
dest_s3.copy_object(
CopySource=copy_source,
Bucket=settings.AWS_STORAGE_BUCKET_NAME,
Key=new_key
)
# 创建数据库记录
global_file = GlobalFileRegistry.objects.create(
file_key=new_key,
bucket_name=settings.AWS_STORAGE_BUCKET_NAME,
global_metadata={'source': 'external'}
)
ExternalFileReference.objects.create(
platform=platform,
external_file_key=external_key,
internal_file=global_file
)
AppFileLink.objects.create(
app_name=app_name,
global_file=global_file,
local_metadata={'copied_from': platform.name}
)
return Response({'id': global_file.id}, status=201)
except Exception as e:
logger.error(f"文件复制失败: {str(e)}")
return Response({'error': '文件复制失败'}, status=500)
前端Vue3实现
<!-- ExternalFileSelector.vue -->
<template>
<div class="external-file-selector">
<div class="platform-selection">
<select v-model="selectedPlatform">
<option v-for="platform in platforms" :value="platform.id">
</option>
</select>
<button @click="loadExternalFiles">加载文件</button>
</div>
<div v-if="externalFiles.length" class="file-list">
<div v-for="file in externalFiles"
:key="file.key"
class="file-item"
@click="selectedFile = file.key"
:class="{ selected: selectedFile === file.key }">
<span class="file-name"></span>
<span class="file-size"></span>
</div>
</div>
<div class="actions">
<button @click="copyFile" :disabled="!selectedFile">
复制到本平台
</button>
<div v-if="copyProgress" class="progress">
复制进度: %
</div>
</div>
</div>
</template>
<script setup>
import { ref, onMounted } from 'vue'
import axios from 'axios'
const platforms = ref([])
const selectedPlatform = ref(null)
const externalFiles = ref([])
const selectedFile = ref(null)
const copyProgress = ref(0)
// 加载已配置的外部平台
const loadPlatforms = async () => {
try {
const response = await axios.get('/api/external-platforms/')
platforms.value = response.data
} catch (error) {
console.error('加载平台失败:', error)
}
}
// 加载外部文件列表
const loadExternalFiles = async () => {
try {
const response = await axios.get('/api/external-files/', {
params: { platform_id: selectedPlatform.value }
})
externalFiles.value = response.data
} catch (error) {
console.error('加载文件失败:', error)
}
}
// 执行文件复制
const copyFile = async () => {
try {
const response = await axios.post('/api/external-files/', {
platform_id: selectedPlatform.value,
file_key: selectedFile.value
}, {
headers: { 'X-App-Name': 'your-app-name' },
onUploadProgress: progressEvent => {
if (progressEvent.total) {
copyProgress.value = Math.round(
(progressEvent.loaded * 100) / progressEvent.total
)
}
}
})
alert(`文件复制成功!新文件ID: ${response.data.id}`)
selectedFile.value = null
copyProgress.value = 0
} catch (error) {
console.error('复制失败:', error)
alert('文件复制失败,请检查控制台')
}
}
// 辅助函数
const formatSize = bytes => {
const units = ['B', 'KB', 'MB', 'GB']
let size = bytes
let unitIndex = 0
while (size >= 1024 && unitIndex < units.length - 1) {
size /= 1024
unitIndex++
}
return `${size.toFixed(1)} ${units[unitIndex]}`
}
onMounted(loadPlatforms)
</script>
<style scoped>
.external-file-selector {
border: 1px solid #e0e0e0;
padding: 20px;
border-radius: 8px;
margin-top: 20px;
}
.platform-selection {
display: flex;
gap: 10px;
margin-bottom: 20px;
}
select {
padding: 8px;
border-radius: 4px;
border: 1px solid #ddd;
}
.file-list {
max-height: 400px;
overflow-y: auto;
margin-bottom: 20px;
}
.file-item {
padding: 12px;
border: 1px solid #eee;
margin-bottom: 8px;
cursor: pointer;
display: flex;
justify-content: space-between;
transition: all 0.2s;
}
.file-item:hover {
background-color: #f8f9fa;
}
.file-item.selected {
border-color: #007bff;
background-color: #e7f1ff;
}
.progress {
margin-top: 10px;
color: #28a745;
font-weight: bold;
}
</style>
系统工作流程
- 配置外部平台:
- 管理员在后台配置外部平台信息(S3凭证、存储桶等)
- 配置信息加密存储在
ExternalPlatform
表中
- 选择外部文件:
- 用户选择目标平台
- 前端调用
/api/external-files/
获取文件列表 - 用户选择要复制的文件
- 执行复制操作:
- 前端调用
/api/external-files/
的POST请求 - 后端使用AWS SDK执行跨桶复制
- 创建本地文件记录和关联关系
- 前端调用
- 结果反馈:
- 前端显示复制进度和结果
- 复制后的文件出现在本地文件列表中
关键安全措施
- 凭证管理:
- 外部平台凭证加密存储
- 使用最小权限原则配置S3访问策略
- 访问控制:
- 前端只能选择已配置的平台
- 后端验证用户权限后才执行复制操作
- 数据完整性:
- 使用S3的ETag验证文件完整性
- 记录完整的复制日志到数据库
- 错误处理:
- 实现自动重试机制
- 记录详细的错误日志
- 提供事务回滚机制
性能优化建议
- 大文件处理:
# 使用分片复制 def copy_large_file(source_s3, dest_s3, copy_source, dest_bucket, dest_key): # 初始化分片上传 upload_id = dest_s3.create_multipart_upload( Bucket=dest_bucket, Key=dest_key )['UploadId'] # 获取源文件大小 head = source_s3.head_object( Bucket=copy_source['Bucket'], Key=copy_source['Key'] ) size = head['ContentLength'] # 分片复制(每部分5MB) part_size = 5 * 1024 * 1024 parts = [] for i in range(0, size, part_size): copy_range = f"bytes={i}-{min(i+part_size-1, size-1)}" part = dest_s3.upload_part_copy( Bucket=dest_bucket, Key=dest_key, CopySource=copy_source, UploadId=upload_id, PartNumber=len(parts)+1, CopySourceRange=copy_range ) parts.append({'PartNumber': len(parts)+1, 'ETag': part['CopyPartResult']['ETag']}) # 完成分片上传 dest_s3.complete_multipart_upload( Bucket=dest_bucket, Key=dest_key, UploadId=upload_id, MultipartUpload={'Parts': parts} )
- 异步任务处理:
# tasks.py @shared_task def async_copy_file(platform_id, external_key, app_name): try: # 此处实现上述复制逻辑 # ... return {'success': True, 'file_id': global_file.id} except Exception as e: return {'success': False, 'error': str(e)}
- 前端优化:
<!-- 添加进度显示 --> <template> <div v-if="copyProgress > 0" class="progress-bar"> <div :style="{ width: copyProgress + '%' }"></div> </div> </template> <style> .progress-bar { height: 8px; background: #eee; border-radius: 4px; overflow: hidden; margin: 10px 0; } .progress-bar div { height: 100%; background: #007bff; transition: width 0.3s ease; } </style>
验证要点
- 端到端测试:
- 测试从选择外部平台到文件复制的完整流程
- 验证复制后的文件可正常访问和使用
- 权限测试:
- 尝试使用无效凭证访问外部平台
- 验证无权用户无法执行复制操作
- 大文件测试:
- 测试多GB级别文件的复制
- 验证分片复制功能的可靠性
- 回滚测试:
- 在复制过程中中断操作
- 验证系统能正确清理未完成的文件
该方案通过前后端协同实现了安全的跨平台文件复制功能,在保持原有架构的基础上扩展了外部平台支持。通过S3的直接复制功能保障了传输效率,结合数据库事务和异步任务处理确保了系统的可靠性,前端组件化的实现方式方便集成到现有系统中。