删除不在数据库中的s3文件

Posted by Shallow Dreameron September 8, 2024

为了提升性能,可以通过以下几种优化方式来加快清理不再被引用的S3文件的过程,特别是对于大规模文件操作和复杂数据库遍历的场景:

性能优化方案:

  1. 批量查询数据库
    • 避免一次性加载所有数据,而是分批次从数据库中获取数据,减少内存压力。例如,使用Django的iterator()方法来节省内存。
    # 批量迭代获取数据库中的附件信息
    def get_current_files_in_batches(batch_size=1000):
        current_files = set()
        for record in YourModel.objects.iterator(chunk_size=batch_size):
            json_data = json.loads(record.json_field)
            current_files.update(json_data.get('attachments', []))
        return current_files
    
  2. 异步并行处理
    • 使用异步任务处理S3文件的删除操作,可以通过Python的concurrent.futurescelery来并发处理文件的删除,以加快删除速度。

    使用concurrent.futures示例:

    import boto3
    from concurrent.futures import ThreadPoolExecutor
       
    s3_client = boto3.client('s3')
    bucket_name = 'your-bucket-name'
       
    def delete_file(file_name):
        s3_client.delete_object(Bucket=bucket_name, Key=file_name)
        print(f"Deleted {file_name} from S3")
       
    def delete_s3_files_in_parallel(files_to_delete):
        with ThreadPoolExecutor(max_workers=10) as executor:  # 使用10个并发线程
            executor.map(delete_file, files_to_delete)
    
  3. 批量删除S3文件
    • S3支持批量删除最多1000个文件,可以一次性删除多个文件,这将显著提升删除效率。
    def delete_s3_files_in_bulk(files_to_delete):
        s3_client = boto3.client('s3')
        bucket_name = 'your-bucket-name'
        objects = [{'Key': file_name} for file_name in files_to_delete]
       
        # S3 批量删除操作
        s3_client.delete_objects(Bucket=bucket_name, Delete={'Objects': objects})
    
  4. 减少数据库查询的开销
    • 将所有查询操作合并到一个操作中,以减少数据库交互的次数。可以直接查询所有JSON字段中包含的附件,并利用JSONB字段的索引提高查询速度。
    • 在Postgres中,可以使用jsonb_array_elements函数来直接从数据库的JSON字段中提取数组元素:
    SELECT jsonb_array_elements(json_field->'attachments') AS file_name
    FROM your_model;
    
  5. 缓存机制
    • 如果文件列表变化不频繁,可以将数据库中已引用的文件信息缓存到内存或Redis中,避免每次都从数据库中重新查询,尤其是当附件数据量较大时。
  6. 合理的调度频率
    • 根据业务需求调整清理任务的调度频率。频繁地遍历和清理可能会增加系统负载,建议根据文件的增长速度和删除需求设定合理的调度间隔。
  7. 分页获取S3文件
    • 如果S3中存储的文件很多,可以分页获取文件来避免一次性加载大量文件,减少API调用和内存消耗。
    def get_all_s3_files():
        s3_client = boto3.client('s3')
        bucket_name = 'your-bucket-name'
        s3_files = set()
        paginator = s3_client.get_paginator('list_objects_v2')
       
        for page in paginator.paginate(Bucket=bucket_name):
            for content in page.get('Contents', []):
                s3_files.add(content['Key'])
           
        return s3_files
    

优化后的流程:

  1. 批量遍历数据库,每次获取一定量的附件数据,减少内存占用。
  2. 并行处理S3删除操作,利用多线程加快文件删除速度。
  3. 批量删除S3文件,通过S3批量删除API每次删除最多1000个文件,提升效率。
  4. 减少数据库查询,通过优化查询语句和索引,提升数据提取速度。

这种方式可以确保系统具备较好的性能,即便在大量数据和文件操作的场景下,也能够保持高效的执行速度。


以下是一个完整可行的代码示例,它将数据库与历史数据库中的JSON格式附件数据提取出来,与S3存储中的文件进行比对,并并行且批量删除不再使用的文件。该代码包括数据库的批量遍历、S3文件的批量删除、并行执行删除操作等优化,确保较好的性能。

完整代码实现

import json
import boto3
from concurrent.futures import ThreadPoolExecutor
from django.db import connection

# S3 client initialization
s3_client = boto3.client('s3')
bucket_name = 'your-bucket-name'

# 获取当前数据库中所有引用的文件
def get_current_files_in_batches(batch_size=1000):
    current_files = set()
    
    # 直接使用SQL查询,优化性能
    with connection.cursor() as cursor:
        cursor.execute("SELECT json_field FROM your_model_table_name")
        while True:
            rows = cursor.fetchmany(batch_size)
            if not rows:
                break
            for row in rows:
                json_data = json.loads(row[0])  # 解析JSON字段
                current_files.update(json_data.get('attachments', []))  # 提取附件数组
    return current_files

# 获取历史数据库中所有引用的文件
def get_historical_files_in_batches(batch_size=1000):
    historical_files = set()

    # 直接使用SQL查询,优化性能
    with connection.cursor() as cursor:
        cursor.execute("SELECT json_field FROM your_historical_model_table_name")
        while True:
            rows = cursor.fetchmany(batch_size)
            if not rows:
                break
            for row in rows:
                json_data = json.loads(row[0])  # 解析JSON字段
                historical_files.update(json_data.get('attachments', []))  # 提取附件数组
    return historical_files

# 获取S3中所有文件(分页处理)
def get_all_s3_files():
    s3_files = set()
    paginator = s3_client.get_paginator('list_objects_v2')

    # 分页列出S3中的所有文件
    for page in paginator.paginate(Bucket=bucket_name):
        for content in page.get('Contents', []):
            s3_files.add(content['Key'])
    
    return s3_files

# 批量删除S3文件
def delete_s3_files_in_bulk(files_to_delete):
    # 将要删除的文件以S3批量删除格式组织
    objects = [{'Key': file_name} for file_name in files_to_delete]
    
    # 批量删除
    if objects:
        s3_client.delete_objects(Bucket=bucket_name, Delete={'Objects': objects})
        print(f"Deleted {len(files_to_delete)} files from S3")

# 使用并发进行批量删除
def delete_s3_files_concurrently(files_to_delete, max_workers=10, batch_size=1000):
    # 将待删除的文件按批次分割
    files_to_delete_batches = [files_to_delete[i:i + batch_size] for i in range(0, len(files_to_delete), batch_size)]
    
    # 使用线程池并发删除
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        executor.map(delete_s3_files_in_bulk, files_to_delete_batches)

# 主执行逻辑
def clean_up_s3():
    # 1. 获取当前数据库中的文件
    print("Fetching current database files...")
    current_files = get_current_files_in_batches()

    # 2. 获取历史数据库中的文件
    print("Fetching historical database files...")
    historical_files = get_historical_files_in_batches()

    # 3. 获取S3中所有文件
    print("Fetching S3 files...")
    s3_files = get_all_s3_files()

    # 4. 合并当前和历史数据库中的文件
    referenced_files = current_files.union(historical_files)
    
    # 5. 找出不在数据库中的文件(需要删除的文件)
    files_to_delete = list(s3_files.difference(referenced_files))

    print(f"Found {len(files_to_delete)} files to delete from S3.")

    # 6. 并发删除不再引用的S3文件
    if files_to_delete:
        print("Deleting files from S3...")
        delete_s3_files_concurrently(files_to_delete)
        print("Cleanup complete.")
    else:
        print("No files to delete from S3.")

# 执行清理任务
if __name__ == "__main__":
    clean_up_s3()

代码说明:

  1. 获取数据库文件
    • get_current_files_in_batches()get_historical_files_in_batches() 从当前数据库和历史数据库中批量获取JSON字段中的附件数组信息,避免一次性加载所有数据引发的内存压力。
  2. 获取S3文件
    • get_all_s3_files() 使用分页机制获取S3存储桶中的所有文件,以避免单次请求文件过多引发的内存问题。
  3. 批量删除S3文件
    • delete_s3_files_in_bulk() 使用S3批量删除接口,一次最多可以删除1000个文件。
    • delete_s3_files_concurrently() 使用ThreadPoolExecutor进行并发操作,以提升删除效率。
  4. 清理逻辑
    • clean_up_s3() 整合所有步骤,最终找到不在数据库中的文件,并进行并发批量删除。

适用场景:

  • 数据量较大时,可以有效避免一次性加载过多数据,节省内存。
  • 使用多线程并发删除S3文件,提升性能,适用于需要高效处理大量S3文件的场景。
  • 利用S3的批量删除功能,一次性删除多个文件,减少API调用次数。