为了提升性能,可以通过以下几种优化方式来加快清理不再被引用的S3文件的过程,特别是对于大规模文件操作和复杂数据库遍历的场景:
性能优化方案:
- 批量查询数据库:
- 避免一次性加载所有数据,而是分批次从数据库中获取数据,减少内存压力。例如,使用Django的
iterator()
方法来节省内存。
# 批量迭代获取数据库中的附件信息 def get_current_files_in_batches(batch_size=1000): current_files = set() for record in YourModel.objects.iterator(chunk_size=batch_size): json_data = json.loads(record.json_field) current_files.update(json_data.get('attachments', [])) return current_files
- 避免一次性加载所有数据,而是分批次从数据库中获取数据,减少内存压力。例如,使用Django的
- 异步并行处理:
- 使用异步任务处理S3文件的删除操作,可以通过Python的
concurrent.futures
或celery
来并发处理文件的删除,以加快删除速度。
使用
concurrent.futures
示例:import boto3 from concurrent.futures import ThreadPoolExecutor s3_client = boto3.client('s3') bucket_name = 'your-bucket-name' def delete_file(file_name): s3_client.delete_object(Bucket=bucket_name, Key=file_name) print(f"Deleted {file_name} from S3") def delete_s3_files_in_parallel(files_to_delete): with ThreadPoolExecutor(max_workers=10) as executor: # 使用10个并发线程 executor.map(delete_file, files_to_delete)
- 使用异步任务处理S3文件的删除操作,可以通过Python的
- 批量删除S3文件:
- S3支持批量删除最多1000个文件,可以一次性删除多个文件,这将显著提升删除效率。
def delete_s3_files_in_bulk(files_to_delete): s3_client = boto3.client('s3') bucket_name = 'your-bucket-name' objects = [{'Key': file_name} for file_name in files_to_delete] # S3 批量删除操作 s3_client.delete_objects(Bucket=bucket_name, Delete={'Objects': objects})
- 减少数据库查询的开销:
- 将所有查询操作合并到一个操作中,以减少数据库交互的次数。可以直接查询所有JSON字段中包含的附件,并利用
JSONB
字段的索引提高查询速度。 - 在Postgres中,可以使用
jsonb_array_elements
函数来直接从数据库的JSON字段中提取数组元素:
SELECT jsonb_array_elements(json_field->'attachments') AS file_name FROM your_model;
- 将所有查询操作合并到一个操作中,以减少数据库交互的次数。可以直接查询所有JSON字段中包含的附件,并利用
- 缓存机制:
- 如果文件列表变化不频繁,可以将数据库中已引用的文件信息缓存到内存或Redis中,避免每次都从数据库中重新查询,尤其是当附件数据量较大时。
- 合理的调度频率:
- 根据业务需求调整清理任务的调度频率。频繁地遍历和清理可能会增加系统负载,建议根据文件的增长速度和删除需求设定合理的调度间隔。
- 分页获取S3文件:
- 如果S3中存储的文件很多,可以分页获取文件来避免一次性加载大量文件,减少API调用和内存消耗。
def get_all_s3_files(): s3_client = boto3.client('s3') bucket_name = 'your-bucket-name' s3_files = set() paginator = s3_client.get_paginator('list_objects_v2') for page in paginator.paginate(Bucket=bucket_name): for content in page.get('Contents', []): s3_files.add(content['Key']) return s3_files
优化后的流程:
- 批量遍历数据库,每次获取一定量的附件数据,减少内存占用。
- 并行处理S3删除操作,利用多线程加快文件删除速度。
- 批量删除S3文件,通过S3批量删除API每次删除最多1000个文件,提升效率。
- 减少数据库查询,通过优化查询语句和索引,提升数据提取速度。
这种方式可以确保系统具备较好的性能,即便在大量数据和文件操作的场景下,也能够保持高效的执行速度。
以下是一个完整可行的代码示例,它将数据库与历史数据库中的JSON格式附件数据提取出来,与S3存储中的文件进行比对,并并行且批量删除不再使用的文件。该代码包括数据库的批量遍历、S3文件的批量删除、并行执行删除操作等优化,确保较好的性能。
完整代码实现
import json
import boto3
from concurrent.futures import ThreadPoolExecutor
from django.db import connection
# S3 client initialization
s3_client = boto3.client('s3')
bucket_name = 'your-bucket-name'
# 获取当前数据库中所有引用的文件
def get_current_files_in_batches(batch_size=1000):
current_files = set()
# 直接使用SQL查询,优化性能
with connection.cursor() as cursor:
cursor.execute("SELECT json_field FROM your_model_table_name")
while True:
rows = cursor.fetchmany(batch_size)
if not rows:
break
for row in rows:
json_data = json.loads(row[0]) # 解析JSON字段
current_files.update(json_data.get('attachments', [])) # 提取附件数组
return current_files
# 获取历史数据库中所有引用的文件
def get_historical_files_in_batches(batch_size=1000):
historical_files = set()
# 直接使用SQL查询,优化性能
with connection.cursor() as cursor:
cursor.execute("SELECT json_field FROM your_historical_model_table_name")
while True:
rows = cursor.fetchmany(batch_size)
if not rows:
break
for row in rows:
json_data = json.loads(row[0]) # 解析JSON字段
historical_files.update(json_data.get('attachments', [])) # 提取附件数组
return historical_files
# 获取S3中所有文件(分页处理)
def get_all_s3_files():
s3_files = set()
paginator = s3_client.get_paginator('list_objects_v2')
# 分页列出S3中的所有文件
for page in paginator.paginate(Bucket=bucket_name):
for content in page.get('Contents', []):
s3_files.add(content['Key'])
return s3_files
# 批量删除S3文件
def delete_s3_files_in_bulk(files_to_delete):
# 将要删除的文件以S3批量删除格式组织
objects = [{'Key': file_name} for file_name in files_to_delete]
# 批量删除
if objects:
s3_client.delete_objects(Bucket=bucket_name, Delete={'Objects': objects})
print(f"Deleted {len(files_to_delete)} files from S3")
# 使用并发进行批量删除
def delete_s3_files_concurrently(files_to_delete, max_workers=10, batch_size=1000):
# 将待删除的文件按批次分割
files_to_delete_batches = [files_to_delete[i:i + batch_size] for i in range(0, len(files_to_delete), batch_size)]
# 使用线程池并发删除
with ThreadPoolExecutor(max_workers=max_workers) as executor:
executor.map(delete_s3_files_in_bulk, files_to_delete_batches)
# 主执行逻辑
def clean_up_s3():
# 1. 获取当前数据库中的文件
print("Fetching current database files...")
current_files = get_current_files_in_batches()
# 2. 获取历史数据库中的文件
print("Fetching historical database files...")
historical_files = get_historical_files_in_batches()
# 3. 获取S3中所有文件
print("Fetching S3 files...")
s3_files = get_all_s3_files()
# 4. 合并当前和历史数据库中的文件
referenced_files = current_files.union(historical_files)
# 5. 找出不在数据库中的文件(需要删除的文件)
files_to_delete = list(s3_files.difference(referenced_files))
print(f"Found {len(files_to_delete)} files to delete from S3.")
# 6. 并发删除不再引用的S3文件
if files_to_delete:
print("Deleting files from S3...")
delete_s3_files_concurrently(files_to_delete)
print("Cleanup complete.")
else:
print("No files to delete from S3.")
# 执行清理任务
if __name__ == "__main__":
clean_up_s3()
代码说明:
- 获取数据库文件:
get_current_files_in_batches()
和get_historical_files_in_batches()
从当前数据库和历史数据库中批量获取JSON字段中的附件数组信息,避免一次性加载所有数据引发的内存压力。
- 获取S3文件:
get_all_s3_files()
使用分页机制获取S3存储桶中的所有文件,以避免单次请求文件过多引发的内存问题。
- 批量删除S3文件:
delete_s3_files_in_bulk()
使用S3批量删除接口,一次最多可以删除1000个文件。delete_s3_files_concurrently()
使用ThreadPoolExecutor
进行并发操作,以提升删除效率。
- 清理逻辑:
clean_up_s3()
整合所有步骤,最终找到不在数据库中的文件,并进行并发批量删除。
适用场景:
- 数据量较大时,可以有效避免一次性加载过多数据,节省内存。
- 使用多线程并发删除S3文件,提升性能,适用于需要高效处理大量S3文件的场景。
- 利用S3的批量删除功能,一次性删除多个文件,减少API调用次数。