pd库读取xlsx文件问题

Posted by Shallow Dreamer on September 10, 2023

当您使用Pandas库(通常使用pd.read_excel函数)读取一个Excel文件时,并且使用header=[0,1]参数指定了多级表头时,Pandas会尝试自动填充没有数据的列,以构建多级索引。如果您希望避免这种自动填充行为,可以在读取数据后手动处理DataFrame,删除或替换不需要的列名。

以下是一个示例,演示如何读取Excel文件后手动处理多级索引的列名,以删除不需要的列名或将其替换为适当的值:

import pandas as pd

# 读取Excel文件
df = pd.read_excel('your_excel_file.xlsx', header=[0, 1])

# 手动处理多级索引的列名
# 例如,删除第一级索引的列名和第二级索引的第一个列名
df.columns = df.columns.droplevel(0)
df.columns = [f'Column_{i}' if col == '' else col for i, col in enumerate(df.columns)]

# 现在df的列名已经被处理,不再包含自动填充的列名
# 您可以继续处理数据框,或者将其用于您的分析或操作

在上述示例中,我们使用df.columns.droplevel(0)删除了第一级索引的列名,然后使用列表推导式将没有名称的列名替换为适当的列名。这个方法允许您手动处理列名,以满足您的需求。您可以根据需要进一步调整列名的处理方式。

当您使用Pandas库读取一个Excel文件,并且设置header=[0,1]来读取多级表头时,Pandas会尝试填充没有数据的列。这是因为Excel文件通常会在上层表头中省略某些单元格,但Pandas会尽力填充这些空白单元格,以确保表头的多级索引完整。

如果您希望在读取后删除这些填充的列,可以使用以下方法:

import pandas as pd

# 读取Excel文件并设置多级表头
df = pd.read_excel('your_excel_file.xlsx', header=[0, 1])

# 删除所有具有NaN值的列
df = df.dropna(axis=1, how='all')

# 如果要删除特定级别的表头,可以使用以下方法
# 删除第一级表头的空列
df = df.loc[:, (slice(None), ~df.columns.get_level_values(0).duplicated())]

# 删除第二级表头的空列
df = df.loc[:, (slice(None), ~df.columns.get_level_values(1).duplicated())]

# 打印处理后的DataFrame
print(df)

上述代码中,我们首先使用dropna方法删除所有具有NaN值的列,然后使用~df.columns.get_level_values(0).duplicated()来删除第一级表头的重复列,并使用~df.columns.get_level_values(1).duplicated()来删除第二级表头的重复列。这将删除由Pandas填充的空白列,并确保表头中没有多余的列。

根据您的数据和需求,您可以根据需要进行进一步的自定义操作。

当使用pandas库的read_excel函数读取一个Excel文件并设置header=[0, 1]时,pandas会尝试填充缺少数据的列,并根据上面的列来补全表头。这是pandas的默认行为,但如果您希望禁止这个自动填充操作,可以使用usecols参数来选择特定的列,而不是设置header参数。

以下是一个示例,演示如何使用usecols参数来读取Excel文件并选择特定的列,而不进行自动填充:

import pandas as pd

# 读取Excel文件,只选择特定的列,而不进行自动填充
df = pd.read_excel('your_excel_file.xlsx', usecols=[0, 1, 2])  # 选择第1、2、3列

# 打印DataFrame
print(df)

在上面的示例中,usecols参数用于指定要读取的列索引,而不依赖于header参数的设置。这样,您可以精确地选择要读取的列,而不会进行自动填充。

如果您需要保留更多的灵活性,可以在usecols参数中传递列名而不是索引,这样您可以根据需要选择要包括的列。例如:

df = pd.read_excel('your_excel_file.xlsx', usecols=['Column1', 'Column2', 'Column3'])

这将根据列名选择要包括的列。确保将列名替换为您的实际列名。

当您使用Pandas库的read_excel函数读取一个Excel文件,并且在设置header参数为[0, 1]时,如果第一行的表头自动填充了没有数据的列,这可能是因为Excel文件中存在合并单元格或空白单元格。Pandas会将这些空白单元格的值填充到相邻的单元格,导致列名被错位。

要处理这种情况,您可以采取以下几种方法:

  1. 手动清理Excel文件: 打开Excel文件,检查是否存在合并单元格或额外的空白单元格,然后手动删除它们。确保每个列名都在正确的单元格中,然后再次尝试使用Pandas读取文件。

  2. 跳过无效的行: 使用skiprows参数来跳过文件中不需要的行。您可以通过指定要跳过的行数来排除文件的顶部部分。这样,您可以跳过导致问题的行,然后手动指定列名。

import pandas as pd

# 跳过前n行
df = pd.read_excel('your_excel_file.xlsx', header=[0, 1], skiprows=n)
  1. 手动指定列名: 如果Excel文件中的列名混乱或有问题,您可以手动指定列名,而不依赖于Pandas自动解析。首先,读取数据而不指定列名,然后使用df.columns属性手动指定列名。
import pandas as pd

# 读取数据
df = pd.read_excel('your_excel_file.xlsx', header=[0, 1])

# 手动指定列名
column_names = ['Column1', 'Column2', 'Column3', ...]  # 替换为您的列名
df.columns = column_names

这些方法中的一种应该能够帮助您解决问题,具体取决于您的Excel文件的结构和内容。

当您使用pd.read_excel函数从Excel文件中读取数据并设置header=[0, 1]时,Pandas会尝试填充没有数据的列。这是因为Pandas默认情况下会尝试合并相同级别的索引,以便更容易处理数据。如果您希望禁止此行为,您可以在读取数据后手动处理列头。以下是一种方法:

import pandas as pd

# 读取Excel文件
df = pd.read_excel('your_file.xlsx', header=[0, 1])

# 手动处理列头,删除包含空值的列
df.columns = df.columns.map(lambda x: f'{x[0]}' if pd.notna(x[1]) else '')

# 如果需要,您还可以重置列索引
df = df.reset_index(drop=True)

在上述代码中,我们首先读取了Excel文件,然后使用map函数和一个lambda函数来处理列头。lambda函数检查第二级别的列标题是否包含空值,如果包含空值,则用第一级别的列标题来替代。这将删除那些没有数据的列。最后,如果需要,我们还可以使用reset_index函数来重置DataFrame的列索引。

这种方法允许您手动处理列头,以满足您的需求,而不受默认合并相同级别索引的影响。

当使用Pandas库(pd)读取一个Excel文件并设置header=[0,1]时,Pandas会尝试在读取Excel文件时处理空白或缺失的列名称。默认情况下,Pandas会将缺失的列名称填充为NaN

如果您想要控制如何处理缺失的列名称,可以在读取Excel文件后对数据帧进行处理。以下是一些处理方法:

  1. 删除缺失的列:

    您可以使用dropna方法删除包含NaN值的列。例如:

    import pandas as pd
    
    # 读取Excel文件
    df = pd.read_excel('your_file.xlsx', header=[0, 1])
    
    # 删除缺失的列
    df = df.dropna(axis=1, how='all')
    

    这将删除在所有行中都缺失数据的列。

  2. 重命名列:

    如果您想要为缺失的列名称指定自定义名称,您可以使用rename方法。例如:

    import pandas as pd
    
    # 读取Excel文件
    df = pd.read_excel('your_file.xlsx', header=[0, 1])
    
    # 为缺失的列名称指定自定义名称
    df = df.rename(columns={('', ''): 'CustomColumnName'})
    

    这将把所有缺失的列名称重命名为自定义列名。

  3. 使用fillna:

    您还可以使用fillna方法将缺失的列名称填充为您指定的值。例如:

    import pandas as pd
       
    # 读取Excel文件
    df = pd.read_excel('your_file.xlsx', header=[0, 1])
       
    # 将缺失的列名称填充为自定义值
    df.columns = df.columns.fillna('CustomColumnName')
    

    这将把所有缺失的列名称填充为自定义列名。

根据您的需求,选择上述任何一种方法来处理缺失的列名称。这样,您可以更灵活地控制数据帧的结构。


明白了,如果是处理前端传递的文件并存储到S3,我们可以直接处理文件对象,不需要考虑本地路径。以下是修改后的代码:

import msoffcrypto
import openpyxl
from io import BytesIO

def read_encrypted_excel(file_object, password):
    """
    处理加密的Excel文件
    :param file_object: 文件对象(可以是BytesIO或文件流)
    :param password: Excel文件的密码
    :return: (workbook, decrypted_buffer)
    """
    try:
        # 创建msoffcrypto对象
        office_file = msoffcrypto.OfficeFile(file_object)
        # 设置密码
        office_file.load_key(password=password)
        
        # 解密到内存缓冲区
        decrypted_buffer = BytesIO()
        office_file.decrypt(decrypted_buffer)
        
        # 将指针移到开始位置
        decrypted_buffer.seek(0)
        
        # 使用openpyxl读取解密后的数据
        wb = openpyxl.load_workbook(decrypted_buffer, read_only=True, data_only=True)
        return wb, decrypted_buffer
            
    except Exception as e:
        print(f"处理Excel文件时出错: {str(e)}")
        raise

# 示例:处理上传的文件并保存到S3
def process_uploaded_excel(file_object, password, s3_client, bucket_name, s3_key):
    try:
        # 读取加密文件
        wb, decrypted_buffer = read_encrypted_excel(file_object, password)
        
        # 处理Excel数据
        ws = wb.active
        data = []
        for row in ws.iter_rows(values_only=True):
            data.append(row)
            
        # 可选:将解密后的文件上传到S3
        decrypted_buffer.seek(0)
        s3_client.upload_fileobj(
            decrypted_buffer,
            bucket_name,
            s3_key
        )
        
        return data
        
    except Exception as e:
        print(f"处理上传文件时出错: {str(e)}")
        raise
        
    finally:
        if 'decrypted_buffer' in locals():
            decrypted_buffer.close()

"""
# FastAPI示例用法:
from fastapi import UploadFile, File

@app.post("/upload-excel/")
async def upload_excel(
    file: UploadFile = File(...),
    password: str = Form(...),
    s3_client = Depends(get_s3_client)
):
    try:
        # 读取上传的文件内容
        file_content = await file.read()
        file_object = BytesIO(file_content)
        
        # 处理Excel文件
        data = process_uploaded_excel(
            file_object,
            password,
            s3_client,
            "your-bucket-name",
            f"excels/{file.filename}"
        )
        
        return {"status": "success", "data": data}
        
    except Exception as e:
        return {"status": "error", "message": str(e)}
"""

这个版本的代码:

  1. 直接处理文件对象而不是文件路径
  2. 支持处理前端上传的文件
  3. 可以直接与S3集成
  4. 使用内存缓冲区处理文件,不需要写入本地文件系统
  5. 包含了适当的异常处理和资源清理

注意事项:

  1. 确保服务器有足够的内存处理上传的文件
  2. 对于特别大的文件,可能需要考虑流式处理或分块处理
  3. 建议设置文件大小限制
  4. 可以根据需要添加文件类型验证

这个实现应该可以很好地配合你的S3存储方案工作。如果你需要其他特定的处理逻辑,可以根据需要进行调整。


以下是一个可以根据工作表名获取特定工作表数据的实现:

import msoffcrypto
import openpyxl
from io import BytesIO
from typing import Generator, Any, Dict, List, Optional

def stream_encrypted_excel_by_sheet(
    file_object, 
    password: str, 
    sheet_names: List[str] = None,
    chunk_size: int = 1000
) -> Generator[Dict[str, List], None, None]:
    """
    流式处理加密Excel文件中的指定工作表
    :param file_object: 文件对象
    :param password: Excel密码
    :param sheet_names: 需要读取的工作表名列表,如果为None则读取所有工作表
    :param chunk_size: 每次处理的行数
    :yield: 返回包含工作表名和数据的字典
    """
    try:
        # 创建msoffcrypto对象并解密
        office_file = msoffcrypto.OfficeFile(file_object)
        office_file.load_key(password=password)
        
        # 解密到内存缓冲区
        decrypted_buffer = BytesIO()
        office_file.decrypt(decrypted_buffer)
        decrypted_buffer.seek(0)
        
        # 使用openpyxl以只读模式打开
        wb = openpyxl.load_workbook(
            decrypted_buffer, 
            read_only=True, 
            data_only=True
        )
        
        # 获取所有工作表名
        available_sheets = wb.sheetnames
        
        # 如果未指定工作表名,则处理所有工作表
        if sheet_names is None:
            sheet_names = available_sheets
        
        # 验证请求的工作表是否存在
        invalid_sheets = [name for name in sheet_names if name not in available_sheets]
        if invalid_sheets:
            raise ValueError(f"工作表 {', '.join(invalid_sheets)} 不存在。可用的工作表: {', '.join(available_sheets)}")
        
        # 处理每个工作表
        for sheet_name in sheet_names:
            ws = wb[sheet_name]
            current_chunk = []
            
            # 逐行读取数据
            for row in ws.iter_rows(values_only=True):
                current_chunk.append(row)
                
                if len(current_chunk) >= chunk_size:
                    yield {
                        'sheet_name': sheet_name,
                        'data': current_chunk
                    }
                    current_chunk = []
            
            # 处理最后剩余的数据
            if current_chunk:
                yield {
                    'sheet_name': sheet_name,
                    'data': current_chunk
                }
            
    except Exception as e:
        print(f"处理Excel文件时出错: {str(e)}")
        raise
        
    finally:
        if 'decrypted_buffer' in locals():
            decrypted_buffer.close()
        if 'wb' in locals():
            wb.close()

# 使用示例
def process_excel_sheets(file_object, password: str, sheet_names: List[str] = None):
    try:
        # 处理指定工作表的数据
        for chunk in stream_encrypted_excel_by_sheet(file_object, password, sheet_names):
            sheet_name = chunk['sheet_name']
            data = chunk['data']
            print(f"Processing sheet: {sheet_name}")
            for row in data:
                # 处理每一行数据
                print(row)
                
    except ValueError as ve:
        print(f"工作表错误: {str(ve)}")
    except Exception as e:
        print(f"处理文件时出错: {str(e)}")

# FastAPI示例用法
"""
from fastapi import UploadFile, File, Form
from typing import List

@app.post("/process-excel-sheets/")
async def process_sheets(
    file: UploadFile = File(...),
    password: str = Form(...),
    sheet_names: List[str] = Form(...)
):
    try:
        # 读取上传的文件内容
        file_content = await file.read()
        file_object = BytesIO(file_content)
        
        # 存储处理结果
        results = {}
        
        # 处理指定的工作表
        for chunk in stream_encrypted_excel_by_sheet(file_object, password, sheet_names):
            sheet_name = chunk['sheet_name']
            if sheet_name not in results:
                results[sheet_name] = []
            results[sheet_name].extend(chunk['data'])
        
        return {"status": "success", "data": results}
        
    except ValueError as ve:
        return {"status": "error", "message": str(ve)}
    except Exception as e:
        return {"status": "error", "message": f"处理文件时出错: {str(e)}"}
    finally:
        file_object.close()
"""

# 示例:获取工作表名列表
def get_sheet_names(file_object, password: str) -> List[str]:
    """
    获取Excel文件中所有工作表的名称
    """
    try:
        office_file = msoffcrypto.OfficeFile(file_object)
        office_file.load_key(password=password)
        
        decrypted_buffer = BytesIO()
        office_file.decrypt(decrypted_buffer)
        decrypted_buffer.seek(0)
        
        wb = openpyxl.load_workbook(decrypted_buffer, read_only=True)
        return wb.sheetnames
        
    finally:
        if 'decrypted_buffer' in locals():
            decrypted_buffer.close()

主要特点:

  1. 灵活的工作表选择
    • 可以指定要读取的工作表
    • 如果不指定,则读取所有工作表
    • 包含工作表存在性验证
  2. 流式处理
    • 每个工作表的数据都是分块处理的
    • 减少内存使用
    • 支持大文件处理
  3. 错误处理
    • 详细的错误信息
    • 包含工作表验证
    • 资源正确释放

使用示例:

# 读取特定工作表
sheet_names = ['Sheet1', 'Sheet2']
process_excel_sheets(file_object, password, sheet_names)

# 读取所有工作表
process_excel_sheets(file_object, password)

# 获取所有工作表名
sheet_names = get_sheet_names(file_object, password)
print(f"可用的工作表: {sheet_names}")

这个实现可以帮助你有效地处理多工作表的Excel文件,同时保持内存使用效率。