👌明白了,你的意思是:
- 你已经写好
delay_on_commit(给单个 task 用的,不需要我重写)。 - 父任务里面只要合理地 循环调用你的
delay_on_commit,就能完成「一批一批发送子任务」+「记录子任务信息」+「在合适的时机收集结果」的逻辑。
在父任务里常见的两种“节奏”
1. 一个一个发(逐个 delay_on_commit)
@app.task
def parent_task():
results = []
for item in get_items():
# 使用你现有的 delay_on_commit
r = subtask.delay_on_commit(item.id)
results.append(r)
# 在事务提交后,results 里会有 AsyncResult
transaction.on_commit(lambda: record_results(results))
这里 record_results(results) 就是你自己的逻辑,可能是:
def record_results(results):
for r in results:
MyTaskLog.objects.create(task_id=r.id, ...)
2. 一批一批发(类似 group,但不用 group)
@app.task
def parent_task():
batch = []
results = []
for item in get_items():
batch.append(subtask.s(item.id))
if len(batch) >= 10: # 比如每 10 个子任务一批
for sig in batch:
results.append(sig.delay_on_commit()) # 用你现有的 API
batch = []
# 最后一批
for sig in batch:
results.append(sig.delay_on_commit())
# 事务提交后再统一处理
transaction.on_commit(lambda: record_results(results))
这样,你就能控制 发送节奏(一次 1 个 / 10 个 / N 个),而且子任务仍然通过你写好的 delay_on_commit 发出。
结果收集
发送后你通常要「等子任务跑完再收集结果」:
def collect_results(results):
# results 是 AsyncResult 的列表
values = [r.get() for r in results] # 会阻塞直到子任务都完成
return values
在父任务里用:
transaction.on_commit(lambda: collect_results(results))
如果你不想阻塞,可以只记录 task_id 到数据库,后面单独用 AsyncResult(task_id) 去收。
✅ 结论:
你完全可以在父任务里 直接调用你写好的 delay_on_commit,
- 单个循环就是逐个发,
- 批量循环就是一次发多个,
- 想收结果就用
AsyncResult.get(), - 想只追踪就存
task_id。
啊,理解了 👍
你现在的需求是 批处理 + 串行节奏控制:
- 父任务里分批发送子任务。
- 每一批子任务必须等全部完成 → 收集结果 → 记录数据库。
- 然后再继续下一批,避免一次性把所有子任务丢给 worker 把 worker 堵死。
这其实是「分批 group + 等待」的模式,用你现有的 delay_on_commit 也能做到。
示例实现
from celery import group
from django.db import transaction
from myapp.models import MyTaskLog
@app.task
def parent_task(items, batch_size=10):
"""
items: 子任务的参数列表
batch_size: 每批处理多少个子任务
"""
results = []
# 分批
for i in range(0, len(items), batch_size):
batch_items = items[i:i+batch_size]
batch_results = []
for item in batch_items:
# 用你自己写好的 delay_on_commit
r = subtask.delay_on_commit(item)
batch_results.append(r)
def after_commit(batch_results=batch_results):
# 等事务提交后才能真正发子任务
# 这里可以阻塞等待子任务完成
values = [r.get() for r in batch_results] # 等这一批都跑完
for v in values:
MyTaskLog.objects.create(result=v) # 记录结果
# 在事务提交后执行批次
transaction.on_commit(after_commit)
# 注意:这里 transaction.on_commit 的回调是在当前事务提交时执行
# 如果 parent_task 本身没有用事务,可以直接同步调用 after_commit()
after_commit()
# 这一批完成后再进入下一批
results.extend(batch_results)
return [r.id for r in results]
节奏控制的要点
- 批次发送
每批
batch_size个子任务,用delay_on_commit发出。 - 等待批次完成
用
[r.get() for r in batch_results]等待 worker 把这一批跑完。 这会阻塞父任务,直到这一批完成。 - 记录结果 在批次完成后,保存结果(或 task_id + 状态)到数据库。
- 继续下一批 只有当前批跑完,才会发下一批,保证 worker 不会一次性被占满。
如果父任务自己不在事务里
如果 parent_task 只是普通 Celery 任务,没有事务上下文,delay_on_commit 就相当于直接 apply_async。
这种情况下,你可以直接:
batch_results = [subtask.delay_on_commit(item) for item in batch_items]
values = [r.get() for r in batch_results]
就能做到 一批一批跑 + 等结果。