Django使用Celery实现定时任务

Celery是一个专注于实时处理和任务调度的分布式任务队列,,支持使用任务队列的方式在分布式的机器/进程/线程上执行任务调度。
Celery本身不是任务队列, 是管理分布式任务队列的工具。 它封装了操作常见任务队列的各种操作, 我们使用它可以快速进行任务队列的使用与管理。

一、celery 组件

Celery采用典型的生产者-消费者模型,主要由以下组成:

  • 1、Celery Beat: 任务调度器。 Beat 进程会读取配置文件的内容, 周期性的将配置中到期需要执行的任务发送给任务队列。
  • 2、Producer : 任务生产者. 调用 Celery API , 函数或者装饰器, 产生任务并交给任务队列处理的都是任务生产者。
  • 2、消息队列broker:消息代理, 队列本身,也称为消息中间件。接受任务生产者发送过来的任务消息, 存进队列再按序分发给任务消费方。broker 实际上就是一个 MQ 队列服务,可以使用redis、rabbitmq等作为broker。
  • 3、Celery Worker: 任务消费者, broker 通知 worker 队列 中有任务,worker队列 中取出任务执行,每一个 worker 就是一个进程。通常会在多台服务器运行多个消费者, 提高运行效率。
  • 4、存储结果的backend:执行结果存储在 backend,默认也会存储在 broker 使用的 MQ队列 服务中,也可以单独配置用何种服务做backend。

Celery架构图
celery

二、配置定时任务

原生celery,非djcelery模块

安装redis

安装celery

1
pip install celery

Django项目配置

1、settings.py

1
2
3
4
5
6
7
BROKER_URL  = 'redis://:password@localhost:6379/0'
CELERY_RESULT_BACKEND = 'redis://:password@localhost:6379/0'

CELERY_ACCEPT_CONTENT = ['application/json',]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = TIME_ZONE

2、创建celery.py

在项目配置目录(和settings.py同级)下新建 celery.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from celery import Celery, platforms
from celery.schedules import crontab
from datetime import timedelta
import os

project_name = os.path.split(os.path.abspath('.'))[-1]
project_setting = '%s.settings' % project_name

os.environ.setdefault('DJANGO_SETTINGS_MODULE',project_setting)

app = Celery(project_name)
app.config_from_object('django.conf:settings')

app.autodiscover_tasks()

# 允许root 用户运行celery
platforms.C_FORCE_ROOT = True

# 定时任务
app.conf.update(
CELERYBEAT_SCHEDULE = {
'task-1':{
'task':'trade.tasks.myjob1',
# 每天1:01点执行
'schedule':crontab(minute=1,hour=11)
},
'task-2': {
'task': 'trade.tasks.myjob2',
# 每5分钟执行一次
'schedule':crontab(minute=5),
'args': (1, 2)
}
}
)

timedeltadatetime 中的一个对象,需要 from datetime import timedelta 引入,有如下几个参数:

  • days:天
  • seconds:秒
  • microseconds:微妙
  • milliseconds:毫秒
  • minutes:分
  • hours:小时

crontab 的参数有:

  • month_of_year:月份
  • day_of_month:日期
  • day_of_week:周
  • hour:小时
  • minute:分钟

3、修改__init__.py

在项目配置目录(和settings.py同级)编辑 __init__.py文件中增加如下内容,确保django启动的时候celery.py 里面的定时任务app能够被加载到:

1
2
3
4
5
from .celery import app as celery_app
import pymysql

pymysql.install_as_MySQLdb()
__all__ = ['celery_app']

4、创建tasks.py

在应用目录下创建 tasks.py 文件:

1
2
3
4
5
6
7
8
9
from celery import shared_task

@shared_task
def myJob1():
pass

@shared_task
def myJob2(x,y):
return x + y

启动celery beat

1、celery 启动了一个 beat 进程一直在不断的判断是否有任务需要执行。
在项目主目录下执行命令:

1
celery -A website beat -l info

2、同时要启动 worker 去执行任务:

1
celery -A website worker -l info

一种更简单的启动方式,只需启动一次即可,代替上面两个步骤:

1
celery -A website worker -B -l info

三、Celery内存泄露

1、长时间运行 Celery 有可能发生内存泄露,可以配置 CELERYD_MAX_TASKS_PER_CHILD,让Worker在执行n个任务杀掉子进程再启动新的子进程,可以防止内存泄露。
settings.py:

1
CELERYD_MAX_TASKS_PER_CHILD = 10

2、Worker 执行大量任务后有可能出现僵死的情况,解决办法是设置一个 crontab 定时重启 Worker