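It turns out that in Celery you cannot chain two groups together. I suspect this is because a group chained with a task automatically becomes a chord. From the Celery docs (http://docs.celeryproject.org/en/latest/userguide/canvas.html):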
> Chaining a group together with another task will automatically upgrade it to be a chord.
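As a rough mental model only (plain Python threads, not Celery's implementation), a chord runs every task in its header in parallel and then calls its body once, passing the list of all header results. `run_chord` and `make_task` are hypothetical names used just for this sketch.

```python
from concurrent.futures import ThreadPoolExecutor
import time


def run_chord(header, callback):
    """Toy model of chord semantics: run every header callable concurrently,
    then invoke the callback once with the list of all header results."""
    with ThreadPoolExecutor(max_workers=max(1, len(header))) as pool:
        futures = [pool.submit(fn) for fn in header]
        # result() blocks, so the callback cannot start until every header task is done.
        results = [f.result() for f in futures]
    return callback(results)


def make_task(value, delay):
    """Hypothetical stand-in for a task: sleeps for `delay` seconds, then returns `value`."""
    def task():
        time.sleep(delay)
        return value
    return task


header = [make_task(1, 0.03), make_task(2, 0.01), make_task(3, 0.02)]
print(run_chord(header, sum))   # 6
print(run_chord(header, list))  # [1, 2, 3]
```

The results come back in header order even though the tasks finish in a different order, because they are collected from the futures in submission order.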
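A group returns a parent task. When two groups are chained, I suspect that once the first group finishes, the chord launches its callback "task", and that this "task" is actually the parent task of the second group. I further suspect that this parent task completes as soon as it has launched all the subtasks in its group, so whatever comes after the second group is executed right away.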
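The suspected mechanism can be pictured with a toy timeline built on plain Python threads. This is not how Celery works internally; `run_workflow`, `start_group` and the delays are hypothetical, and the only behaviour modelled is the assumption stated above: the second group's "parent" returns as soon as it has launched its children, unless something explicitly waits for them.

```python
import threading
import time


def run_workflow(wait_for_group2):
    """Toy timeline of group1 -> group2 -> task3; returns the order of events."""
    events = []
    lock = threading.Lock()

    def record(event):
        with lock:
            events.append(event)

    def child(name, delay):
        time.sleep(delay)
        record(name + " done")

    def start_group(names, delay):
        # Hypothetical "parent task" of a group: launches every child
        # and returns straight away without waiting for any of them.
        threads = [threading.Thread(target=child, args=(n, delay)) for n in names]
        for t in threads:
            t.start()
        return threads

    # group1 acts as a chord header: the next step waits for all of it.
    for t in start_group(["g1_a", "g1_b"], 0.01):
        t.join()

    group2 = start_group(["g2_a", "g2_b"], 0.1)
    if wait_for_group2:
        # Chord-style barrier: block until every child of group2 has finished.
        for t in group2:
            t.join()
    record("task3 start")

    # Let any still-running children finish so the returned log is complete.
    for t in group2:
        t.join()
    return events


for wait in (False, True):
    print(wait, run_workflow(wait))
```

Without the barrier, "task3 start" is logged before either group2 child reports "done"; with the barrier it is always the last event.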
To demonstrate this, here is some example code. You need a Celery worker (and its broker) already running.
```python
# celery_experiment.py
from celery import shared_task, group, chain, chord
from celery.signals import task_postrun, task_prerun
import time
import logging
import random

random.seed()
logging.basicConfig(level=logging.DEBUG)


### HANDLERS ###
@task_prerun.connect()
def task_starting_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):
    # Log when a task called with an `id` keyword starts running.
    try:
        logging.info('[%s] starting' % kwargs['id'])
    except KeyError:
        pass


@task_postrun.connect()
def task_finished_handler(sender=None, task_id=None, task=None, args=None, kwargs=None,
                          retval=None, state=None, **kwds):
    # Log when that task has finished.
    try:
        logging.info('[%s] finished' % kwargs['id'])
    except KeyError:
        pass


def random_sleep(id):
    slp = random.randint(1, 3)
    logging.info('[%s] sleep for %ssecs' % (id, slp))
    time.sleep(slp)


@shared_task  # registered with whichever Celery app is current
def thing(id):
    logging.info('[%s] begin' % id)
    random_sleep(id)
    logging.info('[%s] end' % id)


def exec_exp():
    st = thing.si(id='st')
    st_arr = [thing.si(id='st_arr1_a'), thing.si(id='st_arr1_b'), thing.si(id='st_arr1_c')]
    st_arr2 = [thing.si(id='st_arr2_a'), thing.si(id='st_arr2_b')]
    st2 = thing.si(id='st2')
    st3 = thing.si(id='st3')
    st4 = thing.si(id='st4')
    grp1 = group(st_arr)
    grp2 = group(st_arr2)
    # chn works: the two groups are separated by a single task (st2)
    chn = (st | grp1 | st2 | grp2 | st3 | st4)
    # chn2 chains grp1 and grp2 directly; st3 then starts before grp2 finishes
    # chn2 = (st | st2 | grp1 | grp2 | st3 | st4)
    r = chn()
    # r2 = chn2()


if __name__ == '__main__':
    exec_exp()
```
I have the following workflow:
```python
group1 = group(task1.si(), task1.si(), task1.si())
group2 = group(task2.si(), task2.si(), task2.si())
workflow = chain(group1, group2, task3.si())
```
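Intuitively, task3 should run only after all the tasks in group2 have finished.
In practice, task3 runs after group1 has started but before it has finished.
What am I doing wrong?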