任务分发测试

Posted by Shallow Dreameron October 10, 2025

👌明白了,你的意思是:

  • 你已经写好 delay_on_commit(给单个 task 用的,不需要我重写)。
  • 父任务里面只要合理地 循环调用你的 delay_on_commit,就能完成「一批一批发送子任务」+「记录子任务信息」+「在合适的时机收集结果」的逻辑。

在父任务里常见的两种“节奏”

1. 一个一个发(逐个 delay_on_commit)

@app.task
def parent_task():
    results = []
    for item in get_items():
        # 使用你现有的 delay_on_commit
        r = subtask.delay_on_commit(item.id)
        results.append(r)

    # 在事务提交后,results 里会有 AsyncResult
    transaction.on_commit(lambda: record_results(results))

这里 record_results(results) 就是你自己的逻辑,可能是:

def record_results(results):
    for r in results:
        MyTaskLog.objects.create(task_id=r.id, ...)

2. 一批一批发(类似 group,但不用 group)

@app.task
def parent_task():
    batch = []
    results = []

    for item in get_items():
        batch.append(subtask.s(item.id))
        if len(batch) >= 10:   # 比如每 10 个子任务一批
            for sig in batch:
                results.append(sig.delay_on_commit())  # 用你现有的 API
            batch = []

    # 最后一批
    for sig in batch:
        results.append(sig.delay_on_commit())

    # 事务提交后再统一处理
    transaction.on_commit(lambda: record_results(results))

这样,你就能控制 发送节奏(一次 1 个 / 10 个 / N 个),而且子任务仍然通过你写好的 delay_on_commit 发出。


结果收集

发送后你通常要「等子任务跑完再收集结果」:

def collect_results(results):
    # results 是 AsyncResult 的列表
    values = [r.get() for r in results]  # 会阻塞直到子任务都完成
    return values

在父任务里用:

transaction.on_commit(lambda: collect_results(results))

如果你不想阻塞,可以只记录 task_id 到数据库,后面单独用 AsyncResult(task_id) 去收。


✅ 结论: 你完全可以在父任务里 直接调用你写好的 delay_on_commit

  • 单个循环就是逐个发,
  • 批量循环就是一次发多个,
  • 想收结果就用 AsyncResult.get()
  • 想只追踪就存 task_id

啊,理解了 👍

你现在的需求是 批处理 + 串行节奏控制

  • 父任务里分批发送子任务。
  • 每一批子任务必须等全部完成 → 收集结果 → 记录数据库。
  • 然后再继续下一批,避免一次性把所有子任务丢给 worker 把 worker 堵死。

这其实是「分批 group + 等待」的模式,用你现有的 delay_on_commit 也能做到。


示例实现

from celery import group
from django.db import transaction
from myapp.models import MyTaskLog


@app.task
def parent_task(items, batch_size=10):
    """
    items: 子任务的参数列表
    batch_size: 每批处理多少个子任务
    """

    results = []

    # 分批
    for i in range(0, len(items), batch_size):
        batch_items = items[i:i+batch_size]

        batch_results = []
        for item in batch_items:
            # 用你自己写好的 delay_on_commit
            r = subtask.delay_on_commit(item)
            batch_results.append(r)

        def after_commit(batch_results=batch_results):
            # 等事务提交后才能真正发子任务
            # 这里可以阻塞等待子任务完成
            values = [r.get() for r in batch_results]   # 等这一批都跑完
            for v in values:
                MyTaskLog.objects.create(result=v)      # 记录结果

        # 在事务提交后执行批次
        transaction.on_commit(after_commit)

        # 注意:这里 transaction.on_commit 的回调是在当前事务提交时执行
        # 如果 parent_task 本身没有用事务,可以直接同步调用 after_commit()
        after_commit()

        # 这一批完成后再进入下一批
        results.extend(batch_results)

    return [r.id for r in results]

节奏控制的要点

  1. 批次发送 每批 batch_size 个子任务,用 delay_on_commit 发出。
  2. 等待批次完成[r.get() for r in batch_results] 等待 worker 把这一批跑完。 这会阻塞父任务,直到这一批完成。
  3. 记录结果 在批次完成后,保存结果(或 task_id + 状态)到数据库。
  4. 继续下一批 只有当前批跑完,才会发下一批,保证 worker 不会一次性被占满。

如果父任务自己不在事务里

如果 parent_task 只是普通 Celery 任务,没有事务上下文,delay_on_commit 就相当于直接 apply_async。 这种情况下,你可以直接:

batch_results = [subtask.delay_on_commit(item) for item in batch_items]
values = [r.get() for r in batch_results]

就能做到 一批一批跑 + 等结果