django-celery

Posted by Shallow Dreamer on June 4, 2023

celery_app.config_from_object('django.conf.settings') 是用来将 Django 项目的配置加载到 Celery 应用程序中的。如果执行该语句时出现问题,可能有以下几个原因:

  1. Django 项目没有正确配置

    确保您的 Django 项目已经正确配置。在 settings.py 文件中,必须包含一些与 Celery 相关的配置项,例如 CELERY_BROKER_URLCELERY_RESULT_BACKEND 等。

  2. 无法找到 Django 项目

    如果执行 celery_app.config_from_object('django.conf.settings') 时出现错误,可能是因为无法找到 Django 项目。请确保您在正确的位置运行 Celery worker,以便能够找到 Django 项目并加载其配置。

  3. 问题出现在其他地方

    如果上述两个步骤都没有解决问题,那么问题可能出现在其他地方。建议检查 Celery 的日志文件以获取更多信息,或者提供更多的错误信息以便更好地帮助您。

下面是一个示例配置,可以参考一下:

# 在 settings.py 文件中添加如下配置

CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'

CELERY_TASK_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_RESULT_SERIALIZER = 'json'

CELERY_IMPORTS = (
    'myapp.tasks',
)

在启动 Celery worker 时,需要指定 Django 项目的配置模块,并将其传递给 config_from_object 方法,例如:

celery -A myproject worker --loglevel=info --settings=myproject.settings

其中,myproject 是您的 Django 项目名称,settings 是 Django 项目中的配置模块。在代码中,可以使用如下语句加载配置:

app = Celery('myapp')
app.config_from_object('django.conf:settings', namespace='CELERY')

以下是一个使用 Django 和 Celery 的项目 demo,可以在 Python 3.11 版本中运行:

  1. 创建 Django 项目

    首先创建一个 Django 项目,可以使用如下命令:

    django-admin startproject myproject
    
  2. 安装 Celery

    在项目目录下使用 pip 工具安装 Celery:

    pip install celery
    
  3. 配置 Celery

    在 Django 项目的 settings.py 中添加 Celery 相关配置:

    # 指定 broker
    CELERY_BROKER_URL = 'redis://localhost:6379/0'
    
    # 设置结果存储
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
    
    # 指定任务模块路径
    CELERY_IMPORTS = ('myapp.tasks',)
    
  4. 创建任务

    在 myapp 应用下创建 tasks.py 文件,定义一个简单的任务:

    from celery import shared_task
       
    @shared_task
    def add(x, y):
        return x + y
    
  5. 启动 worker

    在项目根目录下创建一个 celery.py 文件,启动 worker:

    from myproject import settings
    from celery import Celery
       
    app = Celery('myproject')
       
    # 加载配置
    app.conf.update(settings.__dict__.get('CELERY_CONFIG', {}))
       
    if __name__ == '__main__':
        app.start()
    

    启动 worker:

    celery -A myproject worker -l info
    
  6. 调用任务

    在视图函数中调用任务:

    from django.http import JsonResponse
    from myapp.tasks import add
       
    def myview(request):
        result = add.delay(1, 2)
        return JsonResponse({'task_id': result.id})
    
  7. 查看任务结果

    可以通过异步方式获取任务结果:

    from myapp.tasks import add
       
    result = add.delay(1, 2)
    print(result.id)  # 查看任务 id
    print(result.get())  # 获取任务结果(会阻塞当前线程)
    

    或者使用回调函数获取任务结果:

    from myapp.tasks import add
       
    def on_task_complete(result):
        print(result)
       
    result = add.delay(1, 2)
    result.then(on_task_complete)
    

这个 demo 展示了如何在 Django 中使用 Celery 来执行异步任务,可以根据自己的需求进行进一步扩展。需要注意的是,实际应用中还需要对 Celery 进行更细致的配置,例如设置 worker 数量、超时时间等。

在使用Celery时,你可以在配置文件中指定日志的设置,让Celery自动创建日志文件,记录Celery的运行情况。

以下是使用Celery创建日志文件的一个示例:

# 在celery.py文件中

import logging
from celery import Celery

app = Celery('your_project_name')

app.config_from_object('django.conf:settings', namespace='CELERY')

# 创建一个logger对象,用于记录日志。
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# 创建一个日志处理器,用于指定日志的格式和写入方式
handler = logging.FileHandler('/path/to/logs/celery.log')
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)

# 将处理器添加到logger对象中,并指定日志的级别
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)

在上述代码中,我们首先创建了一个名为logger的对象,用于记录日志信息。然后,我们创建了一个FileHandler对象,该对象指定了日志文件的路径和名称,并设置了日志级别为DEBUG。接下来,我们指定了日志记录的格式,将其添加到处理器中,并将处理器添加到logger对象中。

最后,我们将日志级别设置为DEBUG,并在Celery的配置中初始化这个logger对象。这样,在Celery运行期间,就会自动创建并记录日志文件/path/to/logs/celery.log,并将Celery的运行信息记录到日志文件中。

请注意,你需要将/path/to/logs/celery.log替换为实际的日志文件路径和名称。

以上就是使用Celery创建日志文件的一个示例。你可以根据自己的需求来进行配置和调整,以满足自己的日志记录需求。

在Django中使用Celery进行并发处理任务时,可能会遇到自动创建新进程的问题。这可能是由于Celery未正确配置或安装导致的。

以下是一些可能的解决方法:

  1. 确保你已经正确的安装了Celery,并将其添加到了你的项目中。

  2. 检查你的Celery配置文件,确保你正确地设置了Celery的worker数量和concurrency参数。如果这些参数设置不正确,可能会导致Celery创建过多的进程。

  3. 如果你已经尝试了上述方法但仍无法解决问题,请检查你的代码是否正确地使用了Celery。特别是,确保你正确地调用了Celery的task函数,并在必要时传递了正确的参数。

  4. 如果问题仍然存在,则可以尝试重新启动Celery,并在启动期间记录所有输出消息和错误。这些日志可能包含有关问题的更多详细信息,从而帮助你更好地理解问题并解决它。

总之,要解决这个问题,你需要仔细检查你的代码和Celery的配置,并尝试排除所有可能的错误。如果你无法解决问题,请考虑向Celery社区或其他专业人士寻求帮助。

Celery 5.1.2 版本理论上应该支持 Python 3.6 和 Python 3.7 及以上的版本,因此问题不太可能是由 Celery 版本本身引起的。

如果在 Python 3.7 或更高版本中发现 Celery 自动创建新进程导致无法正常处理任务,可能是因为某些库或组件的兼容性问题。建议采取以下步骤来排除问题:

  1. 检查你所使用的所有依赖项是否都已正确安装,并且与所使用的 Python 版本兼容。特别是检查 Celery 的依赖项,如 kombu、billiard 等等,官方推荐使用最新稳定版。

  2. 检查你的代码是否在使用 Celery 时运行了其他线程或进程。在多线程环境下,有些函数可能无法正常工作,从而导致问题。确保你的代码没有这种情况出现。

  3. 如果你使用的是 Django,建议尝试将 CELERY_TASK_ALWAYS_EAGER 设置为 False。这个参数用于指定是否强制同步执行 Celery 任务。设置为 True 时,任务会立即执行,而不是加入到队列中等待异步执行。在某些情况下,这可能导致 Celery 出现奇怪的行为。

  4. 如果上述步骤都没有解决问题,请考虑重装 Python、Celery 和其他相关依赖项。在重新安装之前,可能需要卸载所有现有的库和组件,并确保你的代码不再有任何残留文件。

总之,要解决这个问题,需要仔细排查每一个可能的因素,并逐一进行测试和排除。如果问题仍然存在,请考虑向 Celery 社区或其他专业人士寻求帮助。

好的,以下是一个使用 Django、Redis 和 Celery 的项目示例:

  1. 创建 Django 项目
django-admin startproject myproject
  1. 安装 Django、Redis 和 Celery
pip install django redis celery
  1. 配置 Django,启用 Redis 和 Celery

修改 myproject/settings.py 文件,添加以下配置项:

# myproject/settings.py

# 其他配置项...

# 配置 Redis
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
BROKER_URL = f'redis://{REDIS_HOST}:{REDIS_PORT}/0'
CELERY_RESULT_BACKEND = f'redis://{REDIS_HOST}:{REDIS_PORT}/1'

# 配置 Celery
CELERY_BROKER_URL = BROKER_URL
CELERY_RESULT_BACKEND = CELERY_RESULT_BACKEND

CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = False

CELERY_BEAT_SCHEDULE = {
    # 添加定时任务
    # ...
}

在上述配置中,我们指定了 Redis 的地址和端口,并将它们作为 Celery 和 Django 的 broker 和 result backend。此外,我们还指定了 Celery 的序列化格式、时区等参数,如果需要可以配置定时任务。

  1. 创建并运行 Celery worker

创建一个新文件 celery.py 并添加以下内容:

# celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

app = Celery('myproject')

app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks()

myproject/__init__.py 文件中添加以下代码,让 Celery 能够找到 entrypoint:

# myproject/__init__.py

from .celery import app as celery_app

__all__ = ('celery_app',)

最后,在项目的根目录下启动 Celery worker:

celery -A myproject worker -l INFO
  1. 创建一个异步任务

myapp/tasks.py 文件中添加以下内容,定义一个异步任务:

# myapp/tasks.py

from celery import shared_task

@shared_task
def add(x, y):
    return x + y
  1. 在 Django 中调用异步任务

在 Django 任何你需要调用异步任务的地方,引入上面定义的 add() 函数,并使用 delay() 方法来异步执行它:

# myapp/views.py

from django.shortcuts import render
from .tasks import add

def home(request):
    result = add.delay(4, 4)
    return render(request, 'home.html', {'result': result.get()})
  1. 运行 Django 项目

运行 Django 项目:

python manage.py runserver

现在,你可以在浏览器中访问 http://127.0.0.1:8000/,查看异步任务返回的结果。

这就是一个简单的 Django、Redis 和 Celery 的项目示例。你可以根据自己的需求和实际情况进行修改和扩展。