使用Celery(和其他任务队列)的常见问题

这是我在使用Celery的Django项目中屡见不鲜的一些问题。 它们可能适用于其他任务队列,我只是没有使用太多。

1.使数据多于引用
如果在任务参数中复制数据库中的数据,则在执行任务之前,队列中的数据可能会过时。 Celery关于任务状态的文档更完整地描述这一点,大家可以看官方文档。

从版本4开始,在Celery上意外地做到这一点并不容易,该版本将默认序列化程序从Pickle更改为JSON。 (如果不确定使用的是哪个序列化程序,请检查设置。)

但是,仍然可以排队数据而不是引用。 例如,假设您在任务参数中使用用户的电子邮件地址而不是ID,将延迟1分钟的电子邮件放入队列。 如果用户在询问运行之前更改了电子邮件地址,则电子邮件将发送到错误的地址。

2.在数据库事务中排队任务
尽管您可能会认为Celery用于“以后/最终”执行任务,但它也可以快速执行它们!如果您从数据库事务中排队任务,则该任务可能在数据库提交事务之前执行。这可能意味着您的任务需要访问的数据(例如新的模型实例)不可见,并且引发了DidNotExist异常。

解决方案是在事务提交后使用Django的transaction.on_commit使您的任务排队。 Django文档和Celery文档中对此都有很好的描述。

如果您不在视图中使用事务,那么您还可以在任务所需的所有数据都存在之前执行任务。我建议您在视图中使用事务-最简单的方法是使用Django的ATOMIC_REQUESTS设置。

**更新(2020-02-03):从数据库副本读取任务也会发生陈旧数据问题。他建议的解决方案是通过模型上的updated_at字段进行过滤,以确保您获得了足够的最新时间戳。如果您使用的是MySQL,还可以通过检查已在副本上应用了全局事务ID来执行此操作。

3.在任务中不使用数据库事务
尽管Django可以通过ATOMIC_REQUESTS在视图中轻松使用数据库事务,你自己规划代码路径,这包括Celery任务。如果您不使用transaction.atomic()包装任务,或者在任务正文中使用它,则可能会遇到数据完整性问题。

值得审核您的任务,以查找应在哪里使用transaction.atomic()。您甚至可以为Celery的@shared_task添加特定于项目的包装器,以将@atomic添加到您的任务中。

4.默认的“不公平”任务分配
默认情况下,Celery工作者将向其工作者进程发送批任务,并在其中将它们在内存中重新排队。这样做是为了提高吞吐量,因为工作进程无需等待代理程序的任务。但这确实意味着一个进程中的长时间运行任务可以支撑在其后排的较快任务,即使其他工作进程可以自由运行它们也是如此。

以我的经验,这种默认行为从来都不是理想的。对于项目而言,具有运行时间截然不同的任务非常普遍,从而导致这种阻塞。

您可以通过使用-O fair运行celery worker来禁用它。有关“ Prefork池预取设置”的Celery文档有更好的解释。

5.在很长的将来使用长倒数或eta
Celery为任务队列提供eta和countdown参数。这些使您可以安排任务以供以后执行。

不幸的是,这些工作方式并未内置在brokers中。这些延迟的任务将最终出现在您的队列的最前面,在后面的非延迟任务之前。 Celery worker进程获取延迟的任务,并将其“放在一边”在内存中,然后获取未延迟的任务。

对于许多此类任务,Celery工作进程将使用大量内存来保存这些任务。重新启动工作进程还将需要重新获取队列开头的所有延迟任务。我已经看到一种情况,其中有足够多的延迟任务,重新启动需要几分钟才能开始进行实际工作!

如果您需要将任务延迟几分钟以上,则应避免eta和倒数计时。最好在模型实例中添加一个要执行操作的时间字段,并使用定期任务来排入实际执行时间。

6. ACKS行为
Celery的默认行为是立即确认任务,将其从您的brokers队列中删除。如果它们被中断,例如由于服务器随机崩溃而中断,Celery不会重试该任务。

如果您的任务不是幂等的(可以无问题地重复),那么这很好。但这不适用于处理随机错误,例如数据库连接随机断开。在这种情况下,工作会丢失,因为Celery会在尝试之前将其从队列中删除。

相反的行为“迟到”,仅在成功完成后才确认任务。这是其他许多排队系统(例如SQS)的鼓励行为。

Celery在FAQ中的文档中对此进行了介绍:“我应该使用retry还是acks_late?”。这是一个细微的问题,但是我确实认为默认的“尽早收拾”行为倾向于违反直觉。

我建议在您的Celery配置中将acks_late = True设置为默认值,并考虑通过哪种模式适合每个任务。您可以通过将acks_late传递给@shared_task装饰器,在每个任务功能上对其进行重新配置。

6.不重试错过的工作
任务可能由于多种原因而崩溃,其中许多是您无法控制的。 例如,如果数据库服务器崩溃,则Celery可能无法执行任务,并引发“连接失败”错误。

解决此问题的最简单方法是使用第二个定期的“清扫任务”,该任务扫描并重复/排队错过的工作。 AWS的最新文章《避免无法克服的队列积压》显示,它们将此类任务称为“反熵清除程序”。

7.以向后不兼容的方式更改任务签名
想象一下,我们有一个像这样的任务功能:

并且我们对其进行了更改,以添加一个新的必需参数:

当我们部署此更改时,来自第一个版本的代码的所有排队任务将失败,并显示以下信息:

您应该以与数据库迁移相同的考虑来对待任务功能签名。

如果要添加新参数,请先为其提供默认值-类似于将数据库列添加为可空值。 该版本的代码生效后,您可以删除默认值。

同样,如果要删除参数,请先为其提供默认值。

如果要删除任务,请先删除呼叫站点,然后再删除任务本身。

我没有任何工具可以帮助执行此操作,我们进行了一些本地的Django系统检查。 这些确保我们仔细考虑了对任务及其论点所做的每项更改。

我希望这可以帮助您享受Celery或您使用的任何任务队列.

https://adamj.eu/tech/2020/02/03/common-celery-issues-on-django-projects/